springboot项目详细配置rabbitmq及使用rabbitmq完成评论功能
RabbitMQ 简介
RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,适用于分布式系统中的异步通信、任务分发和事件驱动架构。
核心概念
- Producer(生产者):发送消息的应用。
- Consumer(消费者):接收消息的应用。
- Queue(队列):存储消息的缓冲区,遵循先进先出(FIFO)原则。
- Exchange(交换机):接收生产者发送的消息,并根据路由规则将消息分发到队列。
- Binding(绑定):定义交换机和队列之间的关联规则。
- Message(消息):包含有效载荷(payload)和元数据(如路由键、头信息等)。
常见交换机类型
- Direct Exchange:根据消息的路由键(routing key)精确匹配队列。
- Fanout Exchange:将消息广播到所有绑定的队列(忽略路由键)。
- Topic Exchange:通过通配符匹配路由键,支持灵活的消息分发。
- Headers Exchange:基于消息头(headers)而非路由键进行匹配。
主要特性
- 可靠性:支持消息持久化、确认机制(acknowledgments)和事务。
- 灵活性:支持多种协议(AMQP、MQTT、STOMP等)和插件扩展。
- 集群与高可用:支持镜像队列和故障转移。
- 跨平台:提供多种语言客户端(如Python、Java、Go等)。
典型应用场景
- 异步任务处理(如耗时操作解耦)。
- 微服务间的松耦合通信。
- 日志收集与事件广播。
- 流量削峰(通过队列缓冲高并发请求)。
在pox添加RabbitMQ依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在application.yml中配置RabbitMQ
spring:rabbitmq:host: 192.168.0.64port: 5672username: ${RABBITMQ_USERNAME:root}password: ${RABBITMQ_PASSWORD:123456}virtual-host: /connection-timeout: 15000requested-heartbeat: 30publisher-returns: true
在config文件创建RabbitMQConfigProperties配置类
/*** RabbitMQ配置属性类** 该类用于读取和存储RabbitMQ相关的配置信息,包括连接参数、认证信息和连接设置等。* 通过Spring的@ConfigurationProperties注解自动绑定application.yml或application.properties* 中以"spring.rabbitmq"为前缀的配置项。*/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQConfigProperties {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Value("${spring.rabbitmq.connection-timeout}")private int connectionTimeout;@Value("${spring.rabbitmq.requested-heartbeat}")private int requestedHeartbeat;@Value("${spring.rabbitmq.publisher-returns}")private boolean publisherReturns;
}
在config文件创建RabbitMQConnectionConfig配置类用于RabbitMQ连接工厂
/*** RabbitMQ连接配置类* 用于配置和创建RabbitMQ连接工厂bean*/
@Configuration
public class RabbitMQConnectionConfig {@Autowiredprivate RabbitMQConfigProperties rabbitMQConfigProperties;/*** 创建RabbitMQ连接工厂bean* 通过读取配置属性来构建CachingConnectionFactory实例,用于管理RabbitMQ连接** @return 配置好的ConnectionFactory实例*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(rabbitMQConfigProperties.getHost());connectionFactory.setPort(rabbitMQConfigProperties.getPort());connectionFactory.setUsername(rabbitMQConfigProperties.getUsername());connectionFactory.setPassword(rabbitMQConfigProperties.getPassword());connectionFactory.setVirtualHost(rabbitMQConfigProperties.getVirtualHost());connectionFactory.setConnectionTimeout(rabbitMQConfigProperties.getConnectionTimeout());connectionFactory.setRequestedHeartBeat(rabbitMQConfigProperties.getRequestedHeartbeat());connectionFactory.setPublisherReturns(rabbitMQConfigProperties.isPublisherReturns());return connectionFactory;}
}
在config文件创建RabbitMQConfig用于配置消息队列的交换机、队列、绑定关系以及消息转换器
/*** RabbitMQ配置类* 用于配置消息队列的交换机、队列、绑定关系以及消息转换器*/
@Configuration
public class RabbitMQConfig {// 交换机名称public static final String COMMENT_EXCHANGE = "comment.exchange";// 队列名称public static final String COMMENT_QUEUE = "comment.queue";public static final String ANSWER_QUEUE = "answer.queue";// 路由键public static final String COMMENT_ROUTING_KEY = "comment.create";public static final String ANSWER_ROUTING_KEY = "answer.create";/*** 声明评论交换机* 创建一个持久化的topic类型交换机用于处理评论相关消息** @return Exchange 评论交换机实例*/@Beanpublic Exchange commentExchange() {return ExchangeBuilder.topicExchange(COMMENT_EXCHANGE).durable(true).build();}/*** 声明评论队列* 创建一个持久化的评论队列用于存储评论消息** @return Queue 评论队列实例*/@Beanpublic Queue commentQueue() {return QueueBuilder.durable(COMMENT_QUEUE).build();}/*** 声明回答队列* 创建一个持久化的回答队列用于存储回答消息** @return Queue 回答队列实例*/@Beanpublic Queue answerQueue() {return QueueBuilder.durable(ANSWER_QUEUE).build();}/*** 绑定评论队列和交换机* 将评论队列通过指定路由键绑定到评论交换机上** @return Binding 评论队列绑定关系实例*/@Beanpublic Binding commentBinding() {return BindingBuilder.bind(commentQueue()).to(commentExchange()).with(COMMENT_ROUTING_KEY).noargs();}/*** 绑定回答队列和交换机* 将回答队列通过指定路由键绑定到评论交换机上** @return Binding 回答队列绑定关系实例*/@Beanpublic Binding answerBinding() {return BindingBuilder.bind(answerQueue()).to(commentExchange()).with(ANSWER_ROUTING_KEY).noargs();}/*** 消息转换器* 配置JSON格式的消息转换器用于对象序列化和反序列化** @return MessageConverter 消息转换器实例*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
在utils文件创建CommentMessageProducer工具类
/*** 评论消息生产者类* 用于将评论和回答消息发送到RabbitMQ消息队列*/
@Component
public class CommentMessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送评论消息到RabbitMQ* 将评论对象通过指定的交换机和路由键发送到消息队列中** @param comment 评论对象,包含评论的相关信息*/public void sendCommentMessage(ReleaseComment comment){rabbitTemplate.convertAndSend(RabbitMQConfig.COMMENT_EXCHANGE,RabbitMQConfig.COMMENT_ROUTING_KEY,comment);}/*** 发送回答消息到RabbitMQ* 将回答对象通过指定的交换机和路由键发送到消息队列中** @param answer 回答对象,包含回答的相关信息*/public void sendAnswerMessage(ReleaseAnswer answer){rabbitTemplate.convertAndSend(RabbitMQConfig.COMMENT_EXCHANGE,RabbitMQConfig.ANSWER_ROUTING_KEY,answer);}
}
在baens文件创建实体类ReleaseAnswer、ReleaseComment、CommentWithAnswersDTO
/*** 发布回复评论* @TableName release_answer*/
@Data
@TableName("release_answer")
public class ReleaseAnswer {@Idprivate String id;@TableField(value = "comment_id")private String commentId;@TableField(value = "user_id")private String userId;@TableField(value = "user_name")private String userName;@TableField(value = "course_id")private String courseId;@TableField(value = "course_name")private String courseName;@TableField(value = "content")private String content;@JsonFormat(pattern = "yyyy-MM-dd")@TableField(value = "create_time")private Date createTime;@JsonFormat(pattern = "yyyy-MM-dd")@TableField(value = "update_time")private Date updateTime;
}/*** 发布评论* @TableName release_comment*/
@Data
@TableName("release_comment")
public class ReleaseComment {@Idprivate String id;@TableField(value = "user_id")private String userId;@TableField(value = "user_name")private String userName;@TableField(value = "content")private String content;@TableField(value = "course_id")private String courseId;@TableField(value = "course_name")private String courseName;@JsonFormat(pattern = "yyyy-MM-dd")@TableField(value = "create_time")private Date createTime;@JsonFormat(pattern = "yyyy-MM-dd")@TableField(value = "update_time")private Date updateTime;
}/*** 一次性返回完整的数据结构*/
@Data
public class CommentWithAnswersDTO {private ReleaseComment comment;private List<ReleaseAnswer> answers;
}
ReleaseAnswerController
@RestController
@RequestMapping("/api")
@Tag(name = "发布答案接口")
public class ReleaseAnswerController {@Autowiredprivate ReleaseAnswerService releaseAnswerService;@Autowiredprivate ReleaseCommentService releaseCommentService;@Autowiredprivate UsersService usersService;@Autowiredprivate CoursesService coursesService;@Autowiredprivate CommentMessageProducer commentMessageProducer;@PostMapping("/add/answer")@Operation(summary = "回答评论")public ApiResult<ReleaseAnswer> addAnswer(@RequestParam String commentId,@RequestParam String userId,@RequestParam String content,@RequestParam String courseId) {QueryWrapper<Users> UsersWrapper = new QueryWrapper<>();UsersWrapper.eq("id", userId);Users user = usersService.getOne(UsersWrapper);if (user == null) {return new ApiResult<>(400, "用户不存在", null);}QueryWrapper<Courses> CoursesWrapper = new QueryWrapper<>();CoursesWrapper.eq("id", courseId);Courses course = coursesService.getOne(CoursesWrapper);if (course == null) {return new ApiResult<>(400, "课程不存在", null);}QueryWrapper<ReleaseComment> ReleaseCommentWrapper = new QueryWrapper<>();ReleaseCommentWrapper.eq("id", commentId);ReleaseComment comment = releaseCommentService.getOne(ReleaseCommentWrapper);if (comment == null) {return new ApiResult<>(400, "评论不存在", null);}ReleaseAnswer releaseAnswer = new ReleaseAnswer();releaseAnswer.setId(UuidUtils.generate());releaseAnswer.setCommentId(commentId);releaseAnswer.setUserId(userId);releaseAnswer.setUserName(user.getUsername());releaseAnswer.setCourseId(courseId);releaseAnswer.setCourseName(course.getName());releaseAnswer.setContent(content);releaseAnswer.setUpdateTime(new Date());releaseAnswer.setCreateTime(new Date());commentMessageProducer.sendAnswerMessage(releaseAnswer);releaseAnswerService.save(releaseAnswer);return new ApiResult<>(200, "添加成功", null);}@GetMapping("/comment/course/{courseId}")@Operation(summary = "获取课程下的所有评论")public ApiResult<List<CommentWithAnswersDTO>> getCommentsByCourseId(@PathVariable String courseId) {QueryWrapper<ReleaseComment> commentWrapper = new QueryWrapper<>();commentWrapper.eq("course_id", courseId);List<ReleaseComment> comments = releaseCommentService.list(commentWrapper);System.out.println(comments);// 转换为包含回复的DTOList<CommentWithAnswersDTO> result = comments.stream().map(comment -> {CommentWithAnswersDTO dto = new CommentWithAnswersDTO();dto.setComment(comment);// 查询该评论的所有回复QueryWrapper<ReleaseAnswer> answerWrapper = new QueryWrapper<>();answerWrapper.eq("comment_id", comment.getId());List<ReleaseAnswer> answers = releaseAnswerService.list(answerWrapper);dto.setAnswers(answers);return dto;}).collect(Collectors.toList());return new ApiResult<>(200, "获取成功", result);}
}
ReleaseCommentController
@RestController
@RequestMapping("/api")
@Tag(name = "发布评论接口")
public class ReleaseCommentController {@Autowiredprivate ReleaseCommentService releaseCommentService;@Autowiredprivate UsersService usersService;@Autowiredprivate CoursesService coursesService;@Autowiredprivate CommentMessageProducer commentMessageProducer;@PostMapping("/add/comment")@Operation(summary = "发布评论")public ApiResult<ReleaseComment> addComment(@RequestParam String userId,@RequestParam String content,@RequestParam String courseId) {QueryWrapper<Users> UsersWrapper = new QueryWrapper<>();UsersWrapper.eq("id", userId);Users user = usersService.getOne(UsersWrapper);if (user == null) {return new ApiResult<>(400, "用户不存在", null);}QueryWrapper<Courses> CoursesWrapper = new QueryWrapper<>();CoursesWrapper.eq("id", courseId);Courses course = coursesService.getOne(CoursesWrapper);if (course == null) {return new ApiResult<>(400, "课程不存在", null);}ReleaseComment releaseComment = new ReleaseComment();releaseComment.setId(UuidUtils.generate());releaseComment.setUserId(userId);releaseComment.setUserName(user.getUsername());releaseComment.setCourseId(courseId);releaseComment.setCourseName(course.getName());releaseComment.setContent(content);releaseComment.setUpdateTime(new Date());releaseComment.setCreateTime(new Date());commentMessageProducer.sendCommentMessage(releaseComment);releaseCommentService.save(releaseComment);return new ApiResult<>(200, "添加成功", releaseComment);}}