RabbitMQ 简介
RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,适用于分布式系统中的异步通信、任务分发和事件驱动架构。
核心概念
- Producer(生产者):发送消息的应用。
- Consumer(消费者):接收消息的应用。
- Queue(队列):存储消息的缓冲区,遵循先进先出(FIFO)原则。
- Exchange(交换机):接收生产者发送的消息,并根据路由规则将消息分发到队列。
- Binding(绑定):定义交换机和队列之间的关联规则。
- Message(消息):包含有效载荷(payload)和元数据(如路由键、头信息等)。
常见交换机类型
- Direct Exchange:根据消息的路由键(routing key)精确匹配队列。
- Fanout Exchange:将消息广播到所有绑定的队列(忽略路由键)。
- Topic Exchange:通过通配符匹配路由键,支持灵活的消息分发。
- Headers Exchange:基于消息头(headers)而非路由键进行匹配。
主要特性
- 可靠性:支持消息持久化、确认机制(acknowledgments)和事务。
- 灵活性:支持多种协议(AMQP、MQTT、STOMP等)和插件扩展。
- 集群与高可用:支持镜像队列和故障转移。
- 跨平台:提供多种语言客户端(如Python、Java、Go等)。
典型应用场景
- 异步任务处理(如耗时操作解耦)。
- 微服务间的松耦合通信。
- 日志收集与事件广播。
- 流量削峰(通过队列缓冲高并发请求)。
在pox添加RabbitMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml中配置RabbitMQ
spring:
rabbitmq:
host: 192.168.0.64
port: 5672
username: ${RABBITMQ_USERNAME:root}
password: ${RABBITMQ_PASSWORD:123456}
virtual-host: /
connection-timeout: 15000
requested-heartbeat: 30
publisher-returns: true
在config文件创建RabbitMQConfigProperties配置类
/**
* RabbitMQ配置属性类
*
* 该类用于读取和存储RabbitMQ相关的配置信息,包括连接参数、认证信息和连接设置等。
* 通过Spring的@ConfigurationProperties注解自动绑定application.yml或application.properties
* 中以"spring.rabbitmq"为前缀的配置项。
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQConfigProperties {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.connection-timeout}")
private int connectionTimeout;
@Value("${spring.rabbitmq.requested-heartbeat}")
private int requestedHeartbeat;
@Value("${spring.rabbitmq.publisher-returns}")
private boolean publisherReturns;
}
在config文件创建RabbitMQConnectionConfig配置类用于RabbitMQ连接工厂
/**
* RabbitMQ连接配置类
* 用于配置和创建RabbitMQ连接工厂bean
*/
@Configuration
public class RabbitMQConnectionConfig {
@Autowired
private RabbitMQConfigProperties rabbitMQConfigProperties;
/**
* 创建RabbitMQ连接工厂bean
* 通过读取配置属性来构建CachingConnectionFactory实例,用于管理RabbitMQ连接
*
* @return 配置好的ConnectionFactory实例
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitMQConfigProperties.getHost());
connectionFactory.setPort(rabbitMQConfigProperties.getPort());
connectionFactory.setUsername(rabbitMQConfigProperties.getUsername());
connectionFactory.setPassword(rabbitMQConfigProperties.getPassword());
connectionFactory.setVirtualHost(rabbitMQConfigProperties.getVirtualHost());
connectionFactory.setConnectionTimeout(rabbitMQConfigProperties.getConnectionTimeout());
connectionFactory.setRequestedHeartBeat(rabbitMQConfigProperties.getRequestedHeartbeat());
connectionFactory.setPublisherReturns(rabbitMQConfigProperties.isPublisherReturns());
return connectionFactory;
}
}
在config文件创建RabbitMQConfig用于配置消息队列的交换机、队列、绑定关系以及消息转换器
/**
* RabbitMQ配置类
* 用于配置消息队列的交换机、队列、绑定关系以及消息转换器
*/
@Configuration
public class RabbitMQConfig {
// 交换机名称
public static final String COMMENT_EXCHANGE = "comment.exchange";
// 队列名称
public static final String COMMENT_QUEUE = "comment.queue";
public static final String ANSWER_QUEUE = "answer.queue";
// 路由键
public static final String COMMENT_ROUTING_KEY = "comment.create";
public static final String ANSWER_ROUTING_KEY = "answer.create";
/**
* 声明评论交换机
* 创建一个持久化的topic类型交换机用于处理评论相关消息
*
* @return Exchange 评论交换机实例
*/
@Bean
public Exchange commentExchange() {
return ExchangeBuilder.topicExchange(COMMENT_EXCHANGE).durable(true).build();
}
/**
* 声明评论队列
* 创建一个持久化的评论队列用于存储评论消息
*
* @return Queue 评论队列实例
*/
@Bean
public Queue commentQueue() {
return QueueBuilder.durable(COMMENT_QUEUE).build();
}
/**
* 声明回答队列
* 创建一个持久化的回答队列用于存储回答消息
*
* @return Queue 回答队列实例
*/
@Bean
public Queue answerQueue() {
return QueueBuilder.durable(ANSWER_QUEUE).build();
}
/**
* 绑定评论队列和交换机
* 将评论队列通过指定路由键绑定到评论交换机上
*
* @return Binding 评论队列绑定关系实例
*/
@Bean
public Binding commentBinding() {
return BindingBuilder.bind(commentQueue()).to(commentExchange()).with(COMMENT_ROUTING_KEY).noargs();
}
/**
* 绑定回答队列和交换机
* 将回答队列通过指定路由键绑定到评论交换机上
*
* @return Binding 回答队列绑定关系实例
*/
@Bean
public Binding answerBinding() {
return BindingBuilder.bind(answerQueue()).to(commentExchange()).with(ANSWER_ROUTING_KEY).noargs();
}
/**
* 消息转换器
* 配置JSON格式的消息转换器用于对象序列化和反序列化
*
* @return MessageConverter 消息转换器实例
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
在utils文件创建CommentMessageProducer工具类
/**
* 评论消息生产者类
* 用于将评论和回答消息发送到RabbitMQ消息队列
*/
@Component
public class CommentMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送评论消息到RabbitMQ
* 将评论对象通过指定的交换机和路由键发送到消息队列中
*
* @param comment 评论对象,包含评论的相关信息
*/
public void sendCommentMessage(ReleaseComment comment){
rabbitTemplate.convertAndSend(
RabbitMQConfig.COMMENT_EXCHANGE,
RabbitMQConfig.COMMENT_ROUTING_KEY,
comment
);
}
/**
* 发送回答消息到RabbitMQ
* 将回答对象通过指定的交换机和路由键发送到消息队列中
*
* @param answer 回答对象,包含回答的相关信息
*/
public void sendAnswerMessage(ReleaseAnswer answer){
rabbitTemplate.convertAndSend(
RabbitMQConfig.COMMENT_EXCHANGE,
RabbitMQConfig.ANSWER_ROUTING_KEY,
answer
);
}
}
在baens文件创建实体类ReleaseAnswer、ReleaseComment、CommentWithAnswersDTO
/**
* 发布回复评论
* @TableName release_answer
*/
@Data
@TableName("release_answer")
public class ReleaseAnswer {
@Id
private String id;
@TableField(value = "comment_id")
private String commentId;
@TableField(value = "user_id")
private String userId;
@TableField(value = "user_name")
private String userName;
@TableField(value = "course_id")
private String courseId;
@TableField(value = "course_name")
private String courseName;
@TableField(value = "content")
private String content;
@JsonFormat(pattern = "yyyy-MM-dd")
@TableField(value = "create_time")
private Date createTime;
@JsonFormat(pattern = "yyyy-MM-dd")
@TableField(value = "update_time")
private Date updateTime;
}
/**
* 发布评论
* @TableName release_comment
*/
@Data
@TableName("release_comment")
public class ReleaseComment {
@Id
private String id;
@TableField(value = "user_id")
private String userId;
@TableField(value = "user_name")
private String userName;
@TableField(value = "content")
private String content;
@TableField(value = "course_id")
private String courseId;
@TableField(value = "course_name")
private String courseName;
@JsonFormat(pattern = "yyyy-MM-dd")
@TableField(value = "create_time")
private Date createTime;
@JsonFormat(pattern = "yyyy-MM-dd")
@TableField(value = "update_time")
private Date updateTime;
}
/**
* 一次性返回完整的数据结构
*/
@Data
public class CommentWithAnswersDTO {
private ReleaseComment comment;
private List<ReleaseAnswer> answers;
}
ReleaseAnswerController
@RestController
@RequestMapping("/api")
@Tag(name = "发布答案接口")
public class ReleaseAnswerController {
@Autowired
private ReleaseAnswerService releaseAnswerService;
@Autowired
private ReleaseCommentService releaseCommentService;
@Autowired
private UsersService usersService;
@Autowired
private CoursesService coursesService;
@Autowired
private CommentMessageProducer commentMessageProducer;
@PostMapping("/add/answer")
@Operation(summary = "回答评论")
public ApiResult<ReleaseAnswer> addAnswer(
@RequestParam String commentId,
@RequestParam String userId,
@RequestParam String content,
@RequestParam String courseId
) {
QueryWrapper<Users> UsersWrapper = new QueryWrapper<>();
UsersWrapper.eq("id", userId);
Users user = usersService.getOne(UsersWrapper);
if (user == null) {
return new ApiResult<>(400, "用户不存在", null);
}
QueryWrapper<Courses> CoursesWrapper = new QueryWrapper<>();
CoursesWrapper.eq("id", courseId);
Courses course = coursesService.getOne(CoursesWrapper);
if (course == null) {
return new ApiResult<>(400, "课程不存在", null);
}
QueryWrapper<ReleaseComment> ReleaseCommentWrapper = new QueryWrapper<>();
ReleaseCommentWrapper.eq("id", commentId);
ReleaseComment comment = releaseCommentService.getOne(ReleaseCommentWrapper);
if (comment == null) {
return new ApiResult<>(400, "评论不存在", null);
}
ReleaseAnswer releaseAnswer = new ReleaseAnswer();
releaseAnswer.setId(UuidUtils.generate());
releaseAnswer.setCommentId(commentId);
releaseAnswer.setUserId(userId);
releaseAnswer.setUserName(user.getUsername());
releaseAnswer.setCourseId(courseId);
releaseAnswer.setCourseName(course.getName());
releaseAnswer.setContent(content);
releaseAnswer.setUpdateTime(new Date());
releaseAnswer.setCreateTime(new Date());
commentMessageProducer.sendAnswerMessage(releaseAnswer);
releaseAnswerService.save(releaseAnswer);
return new ApiResult<>(200, "添加成功", null);
}
@GetMapping("/comment/course/{courseId}")
@Operation(summary = "获取课程下的所有评论")
public ApiResult<List<CommentWithAnswersDTO>> getCommentsByCourseId(@PathVariable String courseId) {
QueryWrapper<ReleaseComment> commentWrapper = new QueryWrapper<>();
commentWrapper.eq("course_id", courseId);
List<ReleaseComment> comments = releaseCommentService.list(commentWrapper);
System.out.println(comments);
// 转换为包含回复的DTO
List<CommentWithAnswersDTO> result = comments.stream().map(comment -> {
CommentWithAnswersDTO dto = new CommentWithAnswersDTO();
dto.setComment(comment);
// 查询该评论的所有回复
QueryWrapper<ReleaseAnswer> answerWrapper = new QueryWrapper<>();
answerWrapper.eq("comment_id", comment.getId());
List<ReleaseAnswer> answers = releaseAnswerService.list(answerWrapper);
dto.setAnswers(answers);
return dto;
}).collect(Collectors.toList());
return new ApiResult<>(200, "获取成功", result);
}
}
ReleaseCommentController
@RestController
@RequestMapping("/api")
@Tag(name = "发布评论接口")
public class ReleaseCommentController {
@Autowired
private ReleaseCommentService releaseCommentService;
@Autowired
private UsersService usersService;
@Autowired
private CoursesService coursesService;
@Autowired
private CommentMessageProducer commentMessageProducer;
@PostMapping("/add/comment")
@Operation(summary = "发布评论")
public ApiResult<ReleaseComment> addComment(
@RequestParam String userId,
@RequestParam String content,
@RequestParam String courseId
) {
QueryWrapper<Users> UsersWrapper = new QueryWrapper<>();
UsersWrapper.eq("id", userId);
Users user = usersService.getOne(UsersWrapper);
if (user == null) {
return new ApiResult<>(400, "用户不存在", null);
}
QueryWrapper<Courses> CoursesWrapper = new QueryWrapper<>();
CoursesWrapper.eq("id", courseId);
Courses course = coursesService.getOne(CoursesWrapper);
if (course == null) {
return new ApiResult<>(400, "课程不存在", null);
}
ReleaseComment releaseComment = new ReleaseComment();
releaseComment.setId(UuidUtils.generate());
releaseComment.setUserId(userId);
releaseComment.setUserName(user.getUsername());
releaseComment.setCourseId(courseId);
releaseComment.setCourseName(course.getName());
releaseComment.setContent(content);
releaseComment.setUpdateTime(new Date());
releaseComment.setCreateTime(new Date());
commentMessageProducer.sendCommentMessage(releaseComment);
releaseCommentService.save(releaseComment);
return new ApiResult<>(200, "添加成功", releaseComment);
}
}