【Kafka】消息队列Kafka知识总结
【一】消息队列
【1】什么是消息队列
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
参与消息传递的双方称为 生产者 和 消费者 ,生产者负责发送消息,消费者负责处理消息。
【2】消息队列有什么用
通常来说,使用消息队列主要能为我们的系统带来下面三点好处:
(1)异步处理
(2)削峰/限流
(3)降低系统耦合性
除了这三点之外,消息队列还有其他的一些应用场景,例如实现分布式事务、顺序保证和数据流处理。
(1)异步处理
将用户请求中包含的耗时操作,通过消息队列实现异步处理,将对应的消息发送到消息队列之后就立即返回结果,减少响应时间,提高用户体验。随后,系统再对消息进行消费。
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
(2)削峰/限流
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
(3)降低系统耦合性
使用消息队列还可以降低系统耦合性。如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
生产者(客户端)发送消息到消息队列中去,消费者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。
消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
例如,我们商城系统分为用户、订单、财务、仓储、消息通知、物流、风控等多个服务。用户在完成下单后,需要调用财务(扣款)、仓储(库存管理)、物流(发货)、消息通知(通知用户发货)、风控(风险评估)等服务。使用消息队列后,下单操作和后续的扣款、发货、通知等操作就解耦了,下单完成发送一个消息到消息队列,需要用到的地方去订阅这个消息进行消息即可。
另外,为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了另外 5 种消息模型。
(4)实现分布式事务
分布式事务的解决方案之一就是 MQ 事务。
RocketMQ、 Kafka、Pulsar、QMQ 都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。
(5)顺序保证
在很多应用场景中,处理数据的顺序至关重要。消息队列保证数据按照特定的顺序被处理,适用于那些对数据顺序有严格要求的场景。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。
(6)延时/定时处理
消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持定时/延时消息。
(7)即时通讯
MQTT(消息队列遥测传输协议)是一种轻量级的通讯协议,采用发布/订阅模式,非常适合于物联网(IoT)等需要在低带宽、高延迟或不可靠网络环境下工作的应用。它支持即时消息传递,即使在网络条件较差的情况下也能保持通信的稳定性。
RabbitMQ 内置了 MQTT 插件用于实现 MQTT 功能(默认不启用,需要手动开启)。
(8)数据流处理
针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理。
【3】使用消息队列会带来哪些问题
(1)系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
(2)系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
(3)一致性问题: 上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了
【4】JMS和AMQP
(1)JMS是什么
JMS(JAVA Message Service,java 消息服务)是 Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS 定义了五种不同的消息正文格式以及调用的消息类型,允许你发送并接收以一些不同形式的数据:
(1)StreamMessage:Java 原始值的数据流
(2)MapMessage:一套名称-值对
(3)TextMessage:一个字符串对象
(4)ObjectMessage:一个序列化的 Java 对象
(5)BytesMessage:一个字节的数据流
ActiveMQ(已被淘汰) 就是基于 JMS 规范实现的。
(2)JMS两种消息模型
(1)点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
(2)发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者。
(3)AMQP是什么
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于 AMQP 协议实现的
(4)JMS对比AMQP
(5)总结
(1)AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
(2)JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
(3)由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
【5】RPC和消息队列的区别
RPC 和消息队列都是分布式微服务系统中重要的组件之一,简单对比一下两者:
(1)从用途来看
RPC 主要用来解决两个服务的远程通信问题,不需要了解底层网络的通信机制。通过 RPC 可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。消息队列主要用来降低系统耦合性、实现任务异步、有效地进行流量削峰。
(2)从通信方式来看
RPC 是双向直接网络通讯,消息队列是单向引入中间载体的网络通讯。
(3)从架构上来看
消息队列需要把消息存储起来,RPC 则没有这个要求,因为前面也说了 RPC 是双向直接网络通讯。
(4)从请求处理的时效性来看
通过 RPC 发出的调用一般会立即被处理,存放在消息队列中的消息并不一定会立即被处理。
RPC 和消息队列本质上是网络通讯的两种不同的实现机制,两者的用途不同
【6】消息队列和线程池异步的区别
【二】分布式消息队列选型
【三】Kafka基础
【1】Kafka是什么?主要应用场景?
Kafka 是一个分布式流式处理平台。
流平台具有三个关键功能:
(1)消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
(2)容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
(3)流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
Kafka 主要有两大应用场景:
(1)消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
(2)数据处理: 构建实时的流数据处理程序来转换或处理数据流。
【2】优势
(1)极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
(2)生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。
【3】核心概念
Kafka 将生产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些 Topic(主题),如下图所示:
(1)Producer(生产者) : 产生消息的一方。
(2)Consumer(消费者) : 消费消息的一方。
(3)Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
(4)Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
(5)Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。这正如我上面所画的图一样。Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列
【4】副本机制
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
生产者和消费者只与 leader 副本交互。可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
多副本的好处:
(1)Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
(2)Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
【5】Zookeeper的作用
下图是本地 Zookeeper ,和本地的 Kafka 关联上,ZooKeeper 主要为 Kafka 提供元数据的管理的功能。
Zookeeper 主要为 Kafka 做了下面这些事情:
(1)Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
(2)Topic 注册:在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1
(3)负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
(4)……
在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。
【四】Kafka问题解决
【1】保证消息的消费顺序
在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:
(1)更改用户会员等级。
(2)根据会员等级计算订单价格。
假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。
Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
(1)1 个 Topic 只对应一个 Partition。
(2)(推荐)发送消息的时候指定 key/Partition。
【2】保证消息不丢失
(1)生产者丢失消息
生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。
所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
如果消息发送失败的话,我们检查失败的原因之后重新发送即可!
另外,这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。
(2)消费者丢失消息
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
(3)Kafka丢失消息
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
1-设置 acks = all 副本全部收到消息再回调
解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。当我们配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息. 该模式的延迟会很高.
2-设置 replication.factor >= 3 副本数
为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
3-设置 min.insync.replicas > 1
一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。
4-设置 unclean.leader.election.enable = false
我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
【3】保证消息不重复消费
kafka 出现消息重复消费的原因:
(1)服务端侧已经消费的数据没有成功提交 offset(根本原因,消费者没来得及提交offset就挂了)。
(2)Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
(1)消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
(2)将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。
那么这里会有个问题:什么时候提交 offset 合适?
1-处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
2-拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
(1)关闭自动提交,开启手动提交offset
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
enable-auto-commit: false # 关闭自动提交
auto-offset-reset: latest # 可选:设置为latest避免启动时重复消费
max-poll-records: 50 # 每次拉取的最大记录数
max-poll-interval-ms: 300000 # 处理消息的最大间隔
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual # 配置手动提交
(2)消费者处理完消息后再手动提交
@Service
public class OrderConsumerService {
@Autowired
private OrderService orderService;
@KafkaListener(topics = "orders-topic", groupId = "order-consumer-group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
String orderJson = record.value();
OrderDTO orderDTO = JSON.parseObject(orderJson, OrderDTO.class);
// 1. 处理订单(幂等性)
orderService.processOrder(orderDTO);
// 2. 手动提交offset
ack.acknowledge();
} catch (Exception e) {
log.error("处理订单消息失败: {}", e.getMessage(), e);
// 根据异常类型决定是否重试或跳过
if (e instanceof RetryableException) {
// 重试异常,不提交offset
throw e;
} else {
// 非重试异常,记录日志并提交offset
log.error("非重试异常,跳过消息: {}", record.value());
ack.acknowledge();
}
}
}
}
(3)添加数据库唯一约束实现幂等消费
消费消息的时候,根据数据库的唯一键判断消息是否已经消费过了,如果已消费就跳过
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageDeduplicationService deduplicationService;
@Transactional
public void processOrder(OrderDTO orderDTO) {
String messageId = orderDTO.getOrderId();
// 1. 检查消息是否已处理(Redis去重)
if (!deduplicationService.isMessageProcessed(messageId)) {
log.info("订单已处理,跳过: {}", messageId);
return;
}
// 2. 再次检查数据库中是否存在(双重检查)
Optional<Order> existingOrder = orderRepository.findByOrderId(messageId);
if (existingOrder.isPresent()) {
log.info("订单已存在,跳过: {}", messageId);
return;
}
// 3. 处理新订单
Order order = new Order();
order.setOrderId(messageId);
order.setAmount(orderDTO.getAmount());
order.setStatus("CREATED");
// 4. 保存订单
orderRepository.save(order);
// 5. 后续业务逻辑...
processPayment(order);
updateInventory(order);
}
// 其他方法...
}
使用redis校验的逻辑
@Service
public class MessageDeduplicationService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String DEDUPLICATION_KEY = "processed_messages:";
private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时过期
/**
* 检查消息是否已处理
*/
public boolean isMessageProcessed(String messageId) {
String key = DEDUPLICATION_KEY + messageId;
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", EXPIRE_TIME, TimeUnit.SECONDS));
}
}
【4】Kafka重试机制
(1)消费失败会怎么样?
在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了?
生产者代码:
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i))
}
消费者消代码:
@KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple")
private void customer(String message) throws InterruptedException {
log.info("kafka customer:{}",message);
Integer n = Integer.parseInt(message);
if (n%5==0){
throw new RuntimeException();
}
}
在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95 重试多次后会被跳过。
2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Skipping seek of: test-0@95
2023-08-10 12:03:32.918 TRACE 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Seeking: test-0 to: 96
2023-08-10 12:03:32.918 INFO 9700 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-apple-1, groupId=apple] Seeking to offset 96 for partition test-0
因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。
(2)默认会重试多少次?
默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔?
看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑:
@Override
public boolean recovered(ConsumerRecord << ? , ? > record, Exception exception,
@Nullable MessageListenerContainer container,
@Nullable Consumer << ? , ? > consumer) throws InterruptedException {
if (this.noRetries) {
// 不支持重试
attemptRecovery(record, exception, null, consumer);
return true;
}
// 取已经失败的消费记录集合
Map < TopicPartition, FailedRecord > map = this.failures.get();
if (map == null) {
this.failures.set(new HashMap < > ());
map = this.failures.get();
}
// 获取消费记录所在的Topic和Partition
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
// 通知注册的重试监听器,消息投递失败
this.retryListeners.forEach(rl - >
rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));
// 获取下一次重试的时间间隔
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
if (nextBackOff != BackOffExecution.STOP) {
this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
return false;
} else {
attemptRecovery(record, exception, topicPartition, consumer);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
}
return true;
}
}
其中, BackOffExecution.STOP 的值为 -1。
@FunctionalInterface
public interface BackOffExecution {
long STOP = -1;
long nextBackOff();
}
nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP,既超过这个最大执行次数后才会停止重试。
public long nextBackOff() {
this.currentAttempts++;
if (this.currentAttempts <= getMaxAttempts()) {
return getInterval();
}
else {
return STOP;
}
}
那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是:
public DefaultErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}
SeekUtils.DEFAULT_BACK_OFF 定义的是:
public static final int DEFAULT_MAX_FAILURES = 10;
public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);
DEFAULT_MAX_FAILURES 的值是 10,currentAttempts 从 0 到 9,所以总共会执行 10 次,每次重试的时间间隔为 0。
最后,简单总结一下:Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
(3)如何自定义重试次数以及时间间
从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff 控制的,FixedBackOff 是 DefaultErrorHandler 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
// 自定义重试时间间隔以及次数
FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);
factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));
factory.setConsumerFactory(consumerFactory);
return factory;
}
(4)如何在重试失败后进行告警?
自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。
@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {
public DelErrorHandler(FixedBackOff backOff) {
super(null,backOff);
}
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}
DefaultErrorHandler 只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等
(5)重试失败后的数据如何再次处理?
当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
@RetryableTopic 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}
当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。
【五】实现思路案例
【1】配置文件
spring:
kafka:
bootstrap-servers: localhost:9092
# 生产者配置
producer:
acks: all # 所有副本都确认才返回成功
retries: 3 # 重试次数
retry-backoff-ms: 100 # 重试间隔
buffer-memory: 33554432 # 缓冲区大小
max-in-flight-requests-per-connection: 1 # 限制每个连接的未确认请求数
enable-idempotence: true # 启用幂等性
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
consumer:
group-id: order-consumer-group
enable-auto-commit: false # 关闭自动提交
auto-offset-reset: earliest # 从最早的消息开始消费
max-poll-records: 50 # 每次拉取的最大记录数
max-poll-interval-ms: 300000 # 处理消息的最大间隔
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
spring.kafka.listener.error-handler: customErrorHandler # 配置死信队列处理器
# 监听器配置
listener:
ack-mode: manual
concurrency: 3
missing-topics-fatal: false
【2】Broker 配置(确保消息持久化)
# server.properties
min.insync.replicas=2 # 至少2个副本同步
default.replication.factor=3 # 默认3个副本
unclean.leader.election.enable=false # 禁止非ISR节点成为leader
log.flush.interval.messages=1 # 每写入一条消息就刷新到磁盘
【3】生产者服务
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendOrder(OrderDTO orderDTO) {
String topic = "orders-topic";
String key = orderDTO.getOrderId();
String value = JSON.toJSONString(orderDTO);
// 发送消息并添加回调
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, value);
future.addCallback(
result -> log.info("订单消息发送成功: orderId={}, partition={}, offset={}",
orderDTO.getOrderId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset()),
ex -> log.error("订单消息发送失败: orderId={}", orderDTO.getOrderId(), ex)
);
}
}
【4】消费者服务
@Service
public class OrderConsumerService {
@Autowired
private OrderService orderService;
@Autowired
private MessageDeduplicationService deduplicationService;
@KafkaListener(
topics = "orders-topic",
groupId = "order-consumer-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
String orderJson = record.value();
OrderDTO orderDTO = JSON.parseObject(orderJson, OrderDTO.class);
// 检查消息是否已处理
if (!deduplicationService.isMessageProcessed(orderDTO.getOrderId())) {
log.info("订单已处理,跳过: {}", orderDTO.getOrderId());
ack.acknowledge();
return;
}
// 处理订单
orderService.processOrder(orderDTO);
// 手动提交offset
ack.acknowledge();
} catch (Exception e) {
log.error("处理订单消息失败: {}", e.getMessage(), e);
// 根据异常类型决定是否重试或跳过
if (e instanceof RetryableException) {
// 重试异常,不提交offset
throw e;
} else {
// 非重试异常,记录日志并提交offset
log.error("非重试异常,跳过消息: {}", record.value());
ack.acknowledge();
}
}
}
}
【5】订单服务(幂等性)
双重校验,首先redis去重,然后mysql查询校验
@Service
public class MessageDeduplicationService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String DEDUPLICATION_KEY = "processed_messages:";
private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时过期
/**
* 检查消息是否已处理
*/
public boolean isMessageProcessed(String messageId) {
String key = DEDUPLICATION_KEY + messageId;
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", EXPIRE_TIME, TimeUnit.SECONDS));
}
}
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Transactional
public void processOrder(OrderDTO orderDTO) {
// 检查订单是否已存在
Optional<Order> existingOrder = orderRepository.findByOrderId(orderDTO.getOrderId());
if (existingOrder.isPresent()) {
log.info("订单已处理,跳过: {}", orderDTO.getOrderId());
return;
}
// 处理新订单
Order order = new Order();
order.setOrderId(orderDTO.getOrderId());
order.setAmount(orderDTO.getAmount());
order.setStatus("CREATED");
order.setCreateTime(new Date());
// 保存订单
orderRepository.save(order);
// 后续业务逻辑...
processPayment(order);
updateInventory(order);
}
private void processPayment(Order order) {
// 支付处理逻辑
log.info("处理订单支付: {}", order.getOrderId());
}
private void updateInventory(Order order) {
// 更新库存逻辑
log.info("更新库存: {}", order.getOrderId());
}
}
【6】消费失败监控与告警
全局异常处理器
@Configuration
public class KafkaErrorHandlerConfig {
@Bean
public KafkaListenerErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler(
new FixedBackOff(1000, 3) // 重试3次,间隔1秒
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory,
KafkaListenerErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 配置错误处理器
factory.setErrorHandler(errorHandler);
return factory;
}
}
自定义错误处理器
@Component
public class CustomErrorHandler implements ConsumerAwareListenerErrorHandler {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private DeadLetterQueueService deadLetterQueueService;
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
log.error("消费消息失败: {}", message.getPayload(), exception);
// 获取原始消息
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) message.getHeaders().get(KafkaHeaders.RECEIVED_RECORD);
// 记录失败消息到数据库
saveFailedMessage(record, exception);
// 发送告警
sendAlert(record, exception);
// 发送到死信队列
deadLetterQueueService.sendToDlq(record, exception);
// 返回null表示处理完成,不需要重试
return null;
}
private void saveFailedMessage(ConsumerRecord<?, ?> record, Exception exception) {
FailedMessage failedMessage = new FailedMessage();
failedMessage.setTopic(record.topic());
failedMessage.setPartition(record.partition());
failedMessage.setOffset(record.offset());
failedMessage.setKey(record.key() != null ? record.key().toString() : null);
failedMessage.setValue(record.value().toString());
failedMessage.setException(exception.getMessage());
failedMessage.setStackTrace(ExceptionUtils.getStackTrace(exception));
failedMessage.setFailedTime(new Date());
// 保存到数据库
// failedMessageRepository.save(failedMessage);
}
private void sendAlert(ConsumerRecord<?, ?> record, Exception exception) {
String alertMessage = "Kafka消费失败: " +
"Topic=" + record.topic() +
", Partition=" + record.partition() +
", Offset=" + record.offset() +
", Exception=" + exception.getMessage();
// 发送告警(邮件、短信、钉钉等)
log.error(alertMessage);
}
}
【7】死信队列服务
@Service
public class DeadLetterQueueService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private FailedMessageRepository failedMessageRepository;
/**
* 发送消息到死信队列
*/
public void sendToDlq(ConsumerRecord<?, ?> record, Exception exception) {
String dlqTopic = record.topic() + ".DLQ";
// 创建死信消息
Map<String, Object> headers = new HashMap<>();
headers.put("original_topic", record.topic());
headers.put("original_partition", record.partition());
headers.put("original_offset", record.offset());
headers.put("exception_message", exception.getMessage());
headers.put("exception_stacktrace", ExceptionUtils.getStackTrace(exception));
// 构建消息
Message<String> message = MessageBuilder
.withPayload(record.value().toString())
.setHeaders(new MessageHeaders(headers))
.build();
// 发送到死信队列
kafkaTemplate.send(dlqTopic, record.key().toString(), record.value().toString());
// 记录到数据库
saveFailedMessage(record, exception);
}
/**
* 从死信队列重试消费
*/
@KafkaListener(topics = "orders.DLQ", groupId = "dlq-retry-group")
public void retryFromDlq(ConsumerRecord<String, String> record) {
try {
// 解析消息
OrderDTO orderDTO = JSON.parseObject(record.value(), OrderDTO.class);
// 尝试重新处理
orderService.processOrder(orderDTO);
// 处理成功,记录日志
log.info("从死信队列重试成功: {}", orderDTO.getOrderId());
// 更新数据库状态
updateRetryStatus(record, true);
} catch (Exception e) {
log.error("从死信队列重试失败: {}", record.value(), e);
// 更新数据库状态
updateRetryStatus(record, false);
// 可以考虑发送到重试队列或归档队列
sendToRetryQueue(record, e);
}
}
// 其他方法...
}