个人名片:
博主:酒徒ᝰ.
个人简介:沉醉在酒中,借着一股酒劲,去拼搏一个未来。
本篇励志:三人行,必有我师焉。
本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》,SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 点击观看
三、SpringAMQP
2.Work Queue 工作队列模型
Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
RabbitMQ的工作队列(Work Queue)是一种重要的消息队列模式,它在分布式系统中扮演着处理密集型任务的职责。
一、背景与定义
在分布式系统中,消息队列是一种常见的数据交换方式。RabbitMQ作为一种流行的开源消息代理软件,提供了多种消息队列模式,其中工作队列是一种特殊的形式。它主要解决多个消费者有序执行密集型任务的问题,确保每个任务只能被一个消费者正确执行。
二、工作队列特点
- 有序性:RabbitMQ工作队列保证消息的有序消费。当多个消费者同时从队列中获取消息时,每个消费者都会按照消息的原始顺序逐一处理。
- 可靠性:一旦一个消费者从工作队列中获取到一个消息并开始处理,该消息将被锁定,确保其他消费者无法同时处理该消息。如果处理失败,消息不会从队列中删除,可以由其他消费者再次尝试处理。
- 效率性:通过将密集型任务分发到多个消费者并行处理,RabbitMQ工作队列可以提高系统的处理能力。
- 灵活性:工作队列支持自定义消息处理逻辑,消费者可以根据需要实现不同的业务处理逻辑,从而满足多样化的应用需求。
三、实现原理与具体应用
- 消息存储:RabbitMQ工作队列将消息持久化存储在磁盘上,确保在系统崩溃时不会丢失消息。同时,它支持消息的内存缓存,以提高读取效率。
- 消息处理:当消费者从工作队列中获取到消息时,会将其发送到指定的处理器进行处理。处理器可以根据需要实现自定义业务逻辑,例如对数据进- 行处理、调用其他服务或更新数据库等。
- 反馈机制:在处理完消息后,消费者可以将结果反馈给生产者。这种机制可以帮助生产者了解消息的处理情况,以便进行后续处理或重试。
四、与其他技术的比较
- 对比Kafka:Kafka是一种流处理平台,虽然也支持分布式消息队列,但与RabbitMQ的工作队列有所不同。Kafka强调实时流数据的处理,适用于大数据分析、日志收集等场景。而RabbitMQ工作队列更注重消息的有序性和可靠性,适用于需要保证数据一致性的分布式系统。
- 对比ActiveMQ:ActiveMQ是另一种流行的消息代理软件,它也支持工作队列模式。与RabbitMQ相比,ActiveMQ在性能和稳定性方面稍逊一筹。RabbitMQ具有更高效的内存管理、更高的吞吐量和更好的可扩展性。
五、应用设计与实践
在设计高效率、高收益、可靠性强、灵活性强、监控管理方便的应用系统时,我们可以利用RabbitMQ工作队列的特点进行优化。以下是一些经验技巧:
- 合理配置资源:根据业务需求和系统负载,合理配置RabbitMQ和消费者的资源。确保在工作负载高峰期时,系统能够承受并保持稳定运行。
- 优化消息处理逻辑:在实现自定义处理器时,应尽量减少处理过程中的IO操作和耗时操作,以提高消息处理速度。同时,可以采用线程池或进程池的方式,提高处理并发度。
- 异常处理与监控:在消费者处理消息过程中,应捕获并处理异常情况。同时,通过监控管理工具实时监控系统的性能指标和消息队列的状态,以便及时发现问题并进行调整。
- 合理使用重试机制:对于可能失败的消息处理任务,可以启用重试机制。当消费者处理失败时,可以自动将消息重新放回队列中,由其他消费者再次尝试处理。需要注意的是,重试机制可能会导致消息延迟,需权衡利弊进行使用。
- 合理调整工作队列参数:RabbitMQ提供了许多可配置的参数,如消息的存活时间、消费者的并发数等。根据实际需求和性能测试结果,合理调整这些参数以优化系统性能。
- 监控与分析:借助RabbitMQ提供的监控工具和分析插件,可以对消息队列的性能、消费者和生产者的行为进行分析与优化。这对于发现和解决系统瓶颈、提高整体性能具有重要作用。
总结
RabbitMQ的工作队列作为一种重要的消息队列模式,在分布式系统中扮演着处理密集型任务的职责。通过有序性、可靠性、效率性和灵活性的特点,它为应用系统提供了高效、可靠、灵活的消息处理机制。相比其他消息处理技术,RabbitMQ工作队列在实际应用中具有独特的优势和注意事项。掌握其实现原理和应用技巧,可以帮助我们更好地设计高效率、高收益、可靠性强、灵活性强、监控管理方便的应用系统。随着技术的不断发展和应用场景的不断变化,我们期待RabbitMQ工作队列在未来的实践中继续发挥重要作用,并有可能进一步扩展
案例:模拟WorkQueue,实现一个队列绑定多个消费者
步骤1:生产者循环发送消息到simple.queue
在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
步骤2:编写两个消费者,都监听simple.queue
在consumer服务中添加一个消费者,也监听simple.queue:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接受到消息:【"+ msg +"】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.out.println("消费者2接受到消息:【"+ msg +"】" + LocalTime.now());
Thread.sleep(200);
}
消费者1为奇数,消费者2为偶数。
消费预取限制
修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
总结:
Work模型的使用:
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量