RabbitMQ是什么?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
PS:也可能直接问什么是消息队列?消息队列就是一个使用队列来通信的组件
什么是消息队列:我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
RabbitMQ特点?
- 可靠性: RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
- 灵活的路由 : 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个 交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。
- 扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
- 高可用性 : 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
- 多种协议: RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。
- 多语言客户端 :RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
- 管理界面 : RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等。
- 令插件机制 : RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自 己的插件。
AMQP是什么?
RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。
AMQP协议3层?
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
AMQP 模型的几大组件?
- 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
- 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
- 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
说说生产者 Producer 和消费者 Consumer ?
生产者
- 消息生产者,就是投递消息的一方。
- 消息一般包含两个部分:消息体(payload)和标签(Label)。
消费者
- 消费消息,也就是接收消息的一方。
- 消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
为什么需要消息队列?
从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。
从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥10亿日活的微信。此时,我们需要有一个「工具」来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。因此,消息队列就应运而生了。
它常用来实现:异步处理、服务解耦、流量控制(削峰)。
说说 Broker 服务节点、Queue 队列、Exchange 交换器?
- Broker可以看做RabbitMQ的服务节点。一般请下一个Broker可以看做一个RabbitMQ服务器。
- Queue:RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
- Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。
消息队列有什么优缺点
优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。缺点有以下几个:
- 系统可用性降低 系统引入的外部依赖越多,越容易挂掉。万一 MQ 挂了,MQ 一挂,整套系统崩 溃,你不就完了?
- 系统复杂度提高 硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?
- 怎么保证消息传递的顺序性?问题一大堆。hjx2151942196
- 一致性问题 A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致 了。
如何保证消息的可靠性?
消息到MQ的过程中搞丢?MQ自己搞丢?MQ到消费过程中搞丢?
- 生产者到RabbitMQ:事务机制和Confirm机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
- 事务机制:(效率低,需要阻塞消息的发送无法发送下一条)
- channnel.txSelect()开启事务
- channel.txCommit()用于提交事务
- channel.txRollback()用于回滚机制
- Confirm机制(生产者确认机制、默认是未开启Confirm模式)
- 单条confirm模式(开启模式后会在生产方生成唯一消息ID(deliveryTag 自增)然后消息队列会给用户传递参数包括deliveryTag、ACK/Nack标志)用户阻塞等待ack或者nack,效率大于事务机制,因为单条Confirm模式比事务机制少了一条指令。
- 批量Confirm模式(先开启confirm模式,发送多条之后再调用waitForConfirms()方法确认,但是如果出现了false 或者 超时 当前批次需要全部重发,效率并不比单Confirm模式高)
- 异步Confirm模式(该模式通过addConfirmListener监听实现 handleAck 和 handleNack方法,通过SortedSet来维护一个发生序列集合,按照ack机制来删除set中的元素)
- 事务机制:(效率低,需要阻塞消息的发送无法发送下一条)
- RabbitMQ自身:持久化、集群、普通模式、镜像模式。
- 持久化(默认不是持久化的):持久化消息会影响RabbitMQ的性能,因为写入硬盘速度比写入内存的不止一点。但是三点都持久化了也不能保证数据的不丢失。需要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送到broker端。如果是在持久化时短时间丢失还未及落盘则需要配置RabbitMQ的镜像队列生成副本提高可用性。
- 交换机持久化:如果当第三个参数durable 为true时 表示需要持久化 服务重启时还会存在。如果需要消息队列持久化的强保证,可以使用publisher confirms。
- 队列持久化:
- 消息的持久化:消息持久化的前提是队列持久化
- 持久化(默认不是持久化的):持久化消息会影响RabbitMQ的性能,因为写入硬盘速度比写入内存的不止一点。但是三点都持久化了也不能保证数据的不丢失。需要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送到broker端。如果是在持久化时短时间丢失还未及落盘则需要配置RabbitMQ的镜像队列生成副本提高可用性。
- RabbitMQ到消费者:basicAck机制、死信队列、消息补偿机制。
- basicAck 机制
- 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)
- RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
- 死信队列
- 变成了 “ 死信队列 ” 后被重新投递 (publish) 到另一个 Exchange ,然后重新消费。说白了就是没有被消费 的消息换个地方重新被消费
- 消息补偿机制
- 发消息
- 收到消息确认
- 检查比对是不是收到消息了
- 发生业务操作,业务数据写入数据库
- 生产者将消息发送给MQ的队列Q1
- 发送了一条与step2中一摸一样的延迟消息到对了Q3
- 消费者监听Q1,获取到了step2中发送的业务消息
- 消费者在收到生产者的业务消息后,发送了一条确认消息(记录收到的消息信息)到Q2
- 回调检查服务监听了Q2,获取到了消费者发送的确认消息
- 回调检查服务将这条确认消息写入数据库等待之后的比对
- Q3中的延迟消息延迟时间已到,被回调检查服务接收到,之后就拿着这条延迟消息在数据库中比对,如果比对成功,证明消费者接收到了生产者的业务消息并处理成功(如果不处理成功谁会傻了吧唧发送确认消息呢);如果比对失败,证明消费者没有接收到生产者的业务消息,或者说消费者接收到了业务消息之后始终没有处理成功相关的业务并发送确认消息。这时回调检查服务就会调用生产者的相关业务接口,让生产者再次发送这条失败的消息
- 有一种最极端的情况,step2和step3的消息都发送失败了或者说在消息传递过程中发生意外丢失了!定时检查服务会一直轮询保存确认消息的数据库中的消息数据,并于生产者的业务数据库中的业务数据进行比对,如果两者比对数量一致,则代表业务执行没有问题;如果比对不一致,确认消息数据库的数据量小于生产者业务数据量的话,就证明消费者没有接收到生产者发送的消息。这时定时检查服务会通知生产者再次发送消息到MQ的队列Q1
- basicAck 机制
什么是 RoutingKey 路由键?
生产者将消息发送给交换器的时候,会指定一个RoutingKey,用来指定这个消息的路由规则,这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding绑定?
通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,这样RabbitMq就知道如何正确路由消息到队列了。
交换机4种类型?
RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。
主要有以下4种。
- fanout(扇形交换机):把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
- 需要随时增加减少业务处理的队列,例如注册、下单等功能需要增加送积分功能,只需要增加一个绑定到交换机的队列去处理新业务,无需修改旧的业务逻辑,从而达到业务解耦,非常容易扩展。
- direct(直连交换机):把消息路由到BindingKey和RoutingKey完全匹配的队列中。
- 有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
- topic(主题交换机):
- 匹配规则:
- RoutingKey 为一个 点号’.': 分隔的字符串。比如: java.xiaoka.show
- BindingKey和RoutingKey一样也是点号“.“分隔的字符串。
- BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个
- 消息需要基于多重条件进行路由到达对应队列,例如:日志系统,不仅可以根据日志的级别而且能根据日志的来源进行订阅。
- 匹配规则:
- headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。
生产者消息运转?
- Producer 先连接到 Broker ,建立连接 Connection ,开启一个信道(Channel)。
- Producer 声明一个交换器并设置好相关属性。
- Producer 声明一个队列并设置好相关属性。
- Producer 通过路由键将交换器和队列绑定起来。
- Producer 发送消息到Broker,其中包含路由键、交换器等信息。
- 相应的交换器根据接收到的路由键查找匹配的队列。
- 如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
- 关闭信道。
- 管理连接。
消费者接收消息过程?
- Producer 先连接到 Broker,建立连接 Connection ,开启一个信道(Channel)。
- 向 Broker 请求消费响应的队列中消息,可能会设置响应的回调函数。
- 等待 Broker 回应并投递相应队列中的消息,接收消息。
- 消费者确认收到的消息 ack。
- RabbitMQ 从队列中删除已经确定的消息。
- 关闭信道。
- 关闭连接。
交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理?
- mandatory 参数为:true 返回消息给生产者。
- mandatory 参数为:false 直接丢弃。
无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitM 会调用Basic.Return命令将消息返回给生产者 。通过调用channel.addReturnListener 来添加ReturnListener 监听器实现。同时return message之前channel不能关闭。
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println( "Basic.Return 返回的结果是: "+message );
}
});
死信队列?
DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
:::info
配置死信队列
:::
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
需要对业务队列配置一个Map的参数来绑定死信队列的RoutingKey。
导致的死信的几种原因?
- 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
- 消息TTL过期。
- 队列满了,无法再添加。
延迟队列?
存储对应的延迟消息,指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
怎么实现延迟队列:
- TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒
- 如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。
一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
如果是采用的为队列设置过期时间的话,这里面的消息过期后可以直接删掉也可以转入死信队列,而如果单独为消息设置过期时间的话,过期后就只能是直接删除该消息,不会转入死信队列中。
只有当过期消息到达队列的头部时,它们才会被真实地丢弃(或死信路由)。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
优先级队列?
- 优先级高的队列会先被消费。
- 可以通过x-max-priority参数来实现。
- 当消费速度大于生产速度且Broker没有堆积的情况下,优先级显得没有意义。
事务机制?
RabbitMQ 客户端中与事务机制相关的方法有三个:
- channel.txSelect 用于将当前的信道设置成事务模式。
- channel.txCommit 用于提交事务 。
- channel.txRollback 用于事务回滚,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,通过txRollback来回滚。
发送确认机制?
生产者把信道设置为confirm确认模式,设置后,所有再改信道发布的消息都会被指定一个唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息到达对应的目的地了。
Confirm机制(生产者确认机制、默认是未开启Confirm模式)
- 单条confirm模式(开启模式后会在生产方生成唯一消息ID(deliveryTag 自增)然后消息队列会给用户传递参数包括deliveryTag、ACK/Nack标志)用户阻塞等待ack或者nack,效率大于事务机制,因为单条Confirm模式比事务机制少了一条指令。
- 批量Confirm模式(先开启confirm模式,发送多条之后再调用waitForConfirms()方法确认,但是如果出现了false 或者 超时 当前批次需要全部重发,效率并不比单Confirm模式高)
- 异步Confirm模式(该模式通过addConfirmListener监听实现 handleAck 和 handleNack方法,通过SortedSet来维护一个发生序列集合,按照ack机制来删除set中的元素)
消费者获取消息的方式?
- 推
- 拉
:::info
push方式
:::
消息消费的过程:
- mq接收到消息
- mq主动将消息推送给消费者(消费者需提供一个消费接口)
mq属于主动方,消费者属于一种被动消费,一旦有消息到达mq,会触发mq推送机制,将消息推送给消费者,不管消费者处于何种状态。
优点:
- 消费者代码较少:对于消费者来说,只需提供一个消费接口给mq即可;mq将接收到的消息,随即推送到指定的消费接口
- 消息实时性比较高:对于消费者来说,消息一旦到达mq,mq会立即推送给消费者
缺点:
- 消费者属于被动方,消息量比较大时,对消费者性能要求比较高;若消费者机器资源有限,可能会导致压力过载,引发宕机的情况。
- 对消费者可用性要求比较高:当消费者不可用时,会导致很push失败,在mq方需要考虑至少推送成功一次,这块的设计下章节会做说明。
:::info
pull方式
:::
消息消费的过程:
- 消费端采用轮询的方式,从mq服务中拉取消息进行消费
- 消费完成通知mq删除已消费成功的消息
- 继续拉取消息消费
对于消费者来说,是主动方,可以采用线程池的方式,根据机器的性能来增加或缩小线程池的大小,控制拉取消息的速度,可以很好的控制自身的压力。
优点:
- 消费者可以根据自己的性能主动控制消息拉去的速度,控制自己的压力,不至于把自己弄跨
- 实时性相对于push方式会低一些
- 消费者属于主动方,控制权更大一些
缺点:
- 消费方需要实现消息拉取的代码
- 消费速度较慢时,可能导致mq中消息积压,消息消费延迟等
总结
- 消费者性能较好,对实时性要求比较高的,可以采用push的方式
- 消费者性能有限,建议采用pull的方式
- 整体上来说,主要在于消费者的性能,机器的性能如果没有问题,push和pull都是可以的
消费者某些原因无法处理当前接受的消息如何来拒绝?
- channel.basicNack
- channel.basicReject
消息传输保证层级?
At most once:最多一次。消息可能会丢失,但不会重复传输。
At least once:最少一次。消息绝不会丢失,但可能会重复传输。
Exactly once:恰好一次,每条消息肯定仅传输一次。
了解Virtual Host吗?
每一个RabbitMQ服务器都能创建虚拟的消息服务器,也叫虚拟主机(virtual host),简称vhost。
默认为“/”。
集群中的节点类型?
内存节点:ram,将变更写入内存。
磁盘节点:disc,磁盘写入操作。
RabbitMQ要求最少有一个磁盘节点。
队列结构?
通常由以下两部分组成?
rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack) 等。
backing_queue:是消息存储的具体形式和引擎,并向 rabbit amqqueue process提供相关的接口以供调用。
RabbitMQ中消息可能有的几种状态?
alpha: 消息内容(包括消息体、属性和 headers) 和消息索引都存储在内存中 。
beta: 消息内容保存在磁盘中,消息索引保存在内存中。
gamma: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有 。
delta: 消息内容和索引都在磁盘中 。
在何种场景下使用了消息中间件?
- 接口之间耦合比较严重
- 面对大流量并发时,容易被冲垮
- 存在性能问题
生产者如何将消息可靠投递到MQ?
- Client发送消息给MQ
- MQ将消息持久化后,发送Ack消息给Client,此处有可能因为网络问题导致Ack消息无法发送到Client,那么Client在等待超时后,会重传消息;
- Client收到Ack消息后,认为消息已经投递成功。
如何将消息可靠投递到消费者?
- 将消息push给Client(或Client来pull消息)
- Client得到消息并做完业务逻辑
- Client发送Ack消息给MQ,通知MQ删除该消息,此处有可能因为网络问题导致Ack失败,那么Client会重复消息,这里就引出消费幂等的问题;
- MQ将已消费的消息删除
消费幂等性?
生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId,1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃。
:::info
setnx 命令(分布式锁),若给定的key不存在,执行set操作,返回1,若给定的Key已存在,不做任何操作,返回0。
:::
数据库中解决幂等性(补偿机制 + 乐观锁)
- 当确认消息发送失败,回调检查服务会用延迟消息比对数据库中是否存在确认消息,如果没有会通知生产者继续发送同样的业务数据
- 当定时检查服务检查确认消息小于业务数据的时候也会调用生产者重发业务消息到MQ
这时候队列Q1中可能会有多条一摸一样的消息,当消费者的服务恢复正常之后,Q1队列中这些一样的消息都会被Q1读取并进行业务处理。此时如果消费者没有幂等性的保证的话,就会出现“多扣钱”的问题。
-- 消息内容:
{id = 1, money = 500, operation = subtract, version = 1}
-- 第一次执行
update account set money = money - 500, version = version + 1 where id = 1 and version = 1;
--第二次执行的时候,version = 2,所以这条sql无法匹配到数据就不做扣钱处理了
update account set money = money - 500, version = version + 1 where id = 1 and version = 1;
如何保证RabbitMQ消息队列的高可用?
RabbitMQ 有三种模式:单机模式,普通集群模式,镜像集群模式。
- 单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式
- 普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。
- 镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据(元数据指RabbitMQ的配置数据)还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。
消息基于什么传输?
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
集群中的节点?
- 内存节点:ram,将变更写入内存。
- 磁盘节点:disc,磁盘写入操作。
RabbitMQ 要求最少有一个磁盘节点。
集群节点类型有哪几种?
内存节点:保存状态到内存,但持久化的队列和消息还是会保存到磁盘;
磁盘节点:保存状态到内存和磁盘,一个集群中至少需要一个磁盘节点
如何避免消息重复投递或者重复消费?
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
这个问题针对业务场景来答分以下几点:
- 拿到这个消息做数据库的insert操作。然后给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
- 拿到这个消息做Redis的set的操作,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
- 如果上面两种情况还不行。准备一个第三方介质,来做消费记录。以Redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入Redis。那消费者开始消费前,先去Redis中查询有没消费记录即可。
routing_key 和 binding_key 最大长度?
255字节
消息怎么路由?
从概念上来说,消息路由必须有三部分:交换器、路由、绑定。生产者把消息到交换器上;绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。
消息到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。
常用的交换器主要分为一下三种:
- direct:如果路由键完全匹配,消息就被投递到相应的队列
- fanout:如果交换器收到消息,将会广播到所有绑定的队列上
- topic:可以使来自不同源头的消息能够到达同一个队列。使用topic交换器时,可以使用通配符。比如:“*” 匹配特定位置的任意文本, “.” 把路由键分为了几部分,“#” 匹配所有规则等。特别注意:发往topic交换器的消息不能随意的设置选择键(routing_key),必须是由"."隔开的一系列的标识符组成。
RabbitMQ 消息发布订阅?
- 每个消费者监听自己的队列;
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
能够在地理位置上分开数据使用集群?
不能。第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用 HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常见问题;第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务疲于处理;第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而RabbitMQ 目前无法处理(该问题主要是说 Mnesia)。
什么情况出现blackholed?
blackholed 问题是指,向 exchange 投递了 message ,而由于各种原因导致该message 丢失,但发送者却不知道。
可导致 blackholed 的情况:
- 向未绑定 queue 的exchange 发送 message;
- exchange 以 binding_key key_A 绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 却是 key_B。
如何解决消息队列的延时以及过期失效问题消息队列满了以后该怎么处理有几百万消息持续积压几小时怎么办?
消息积压处理办法:临时紧急扩容:
- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
- MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq消息队列块满了:
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
如何自动删除长时间未消费的消息?
为队列或者消息是在定时。