目录
kafka 体验卡
# 解压 kafka
tar -zxvf kafka_2.13-3.8.1.tgz
# 进入安装目录
cd /home/app/kafka/kafka_2.13-3.8.1
# 基于默认配置 单机启动 kafka
nohup bin/kafka-server-start.sh config/server.properties &
# 基于默认配置 启动 kafka 自带的 zookeeper 服务
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 创建一个名为 test 的 topic,kafka默认端口为 9092
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create
# 查看 topic 列表bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test --list
创建一个基于控制台的 向 test topic 发消息的生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
创建一个基于控制台的 接受 test topic 的消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
核心概念
partition
- 一个 Topic 可以被分成多个 Partition(物理分区),每个 Partition 是一个 有序、不可变的消息日志,消息以追加的方式写入 Partition。
- Kafka 通过多个 Partition 来实现消息吞吐量的提升。可以把不同的 Partition 分配到不同的 Broker 上,从而分散负载,在创建topci的时候可以指定 partition 的数量,这一组 partition 会选举一个 leader 负责对外的读写请求,其他的 Replicas(副本) 负责复制数据
- 通过 Replication(副本),每个 Partition 可以有多个副本分布在不同的 Broker 上,防止数据丢失。
- Kafka 只保证 同一个 Partition 内的消息是有序的,不同 Partition 之间不保证顺序。
- 不同的消费者可以消费不同的 Partition,提高消费能力。
消费者组
- 在创建消费者的时候指定一样的
group.id
就是一组消费者组
搭建Zookeeper集群
基于3台服务器搭建zookeeper集群,以其中一个服务器为例子(下载 apache-zookeeper-3.8.4-bin.tar 的时候注意名字带有 -bin的才是)
# 解压zookeeper
cd /home/app/zookeeper
tar -xzf apache-zookeeper-3.8.4-bin.tar.gz
cd /home/app/zookeeper/apache-zookeeper-3.8.4-bin
# 创建 data 和 logs 目录mkdir -p ./data ./logs
# 复制一份默认的配置文件模板,并重命名为conf/zoo.cfg(zookeeper默认识别配置文件zoo.cfg)
cp conf/zoo_sample.cfg conf/zoo.cfg
# 配置zoo.cfg
vim
# 创建 myid 文件,该文件 中的 1 是 zookeeper 的唯一标识,相当于身份证echo "1" > /home/app/zookeeper/apache-zookeeper-3.8.4-bin/data/myid
# 启动 zookeeper
bin/zkServer.sh --config conf start
搭建Kafak集群
基于3台服务器搭建kafka集群,以其中一个服务器为例子
编辑 server.properties 配置文件
vim config/server.properties
后台启动 kafkanohup /home/app/kafka/kafka_2.13-3.8.1/bin/kafka-server-start.sh config/server.properties &
Kafka 消息流转模型
消费者组分组消费机制
消费者消费完消息要给服务端响应,如果没有给响应,服务端就会认为消费者消费失败,就会重新给消费者组的其他成员推送消息。
消息消费到哪里了是由服务端的 offset 决定的 不由消费者决定。
- 当服务端消息丢失的时候,消费者是不知道的,会继续向服务端拉消息,为了解决这个问题可以在编写 消费者代码的时候去设置当拉不到消息的时候抛异常等。
- 当服务端的 offset 记录出错的时,那么消费者拉取到的消息就是错误的,为了解决这个问题,企业中常用做法是:当消费者拉取消息的时候,把这个 topic 作为hash key ,offset 作为 value 缓存到 redis 中。当下一次拉取消息的时候会去读 redis,比较上一次的 offset 就可以知道来取的消息是否有误,还可以做幂等性校验。
生产者拦截器机制
生产者拦截器(Producer Interceptor) 是一种允许你在消息发送到 Kafka 之前或之后执行自定义逻辑的机制。这种机制可以帮助你实现诸如监控、修改消息内容、记录日志等功能,而无需改变业务逻辑代码。
实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口,并重写以下方法:
注意:此方法在 Kafka 客户端内部线程中执行,因此应尽量避免在此方法中执行耗时操作,以免影响性能。
消息序列号机制
kafka 服务端是以 二进制数组 接收传输的,因此在发送和接收消息都要做序列号处理。
kafka大多处理的是海量数据,这个 序列化的性能 以及 序列号后的比特大小 对 kafka 的影响会被放大N倍, 序列号后的比特大小 也是影响 消息存储 所占磁盘多少的一个很大因素。
对与传 常规的 字符串 或者 数字,可以直接使用 kafak 提供的序列号工具即可。如果传的是 对象User 就需要自定义序列号功能,因为kafak 没有提供 这种对象的 序列号。至于如何写出高性能序列号 交给AI吧
消息分区路由机制
- 顺序保证:Kafka 只能保证在单个分区内的消息顺序。因此,如果你的应用需要保持某些消息的顺序,可以通过控制这些消息进入同一个分区来实现。
- 负载均衡:通过合理地将消息分发到不同的分区,可以有效地分散负载,提高系统的吞吐量和响应速度。
- 容错能力:每个分区都有多个副本(Replicas),这提供了数据冗余,增强了系统的容错能力和可靠性。
生产者分区路由机制
- 如果在
ProducerRecord<K, V>
中指定了 Key,那么 Kafka 将根据 Key 的哈希值来选择分区。也可以通过设置partitioner.class
属性来自定义分区策略,从而改变默认的分区逻辑 - 如果没有指定 Key,Kafka 会采用轮询的方式将消息均匀地分配给所有可用的分区。注意,这里的“轮询”是基于每个生产者的独立计数器进行的,而不是全局的。
- 通过实现
org.apache.kafka.clients.producer.Partitioner
接口来自定义分区逻辑。例如,根据业务需求将特定类型的消息定向到特定的分区。
当一个主题有多个分区,并且这些分区需要被分配给一个消费者组中的多个消费者时,Kafka 使用所谓的“分区分配策略”来决定哪个消费者应该负责哪个分区。
消费者分区路由机制
- RangeAssignor:将每个主题的分区按顺序排列,并平均分配给消费者。如果不能均分,则前面的消费者会多分配一个或几个分区。
RoundRobinAssignor:将所有主题的所有分区按照字典序排序,然后轮询地分配给消费者
StickyAssignor:保持现有的分区分配尽可能不变的同时,尽量做到均衡分配。当发生再平衡时,它会尝试最小化分区迁移的数量
生产者消息缓存机制
发送应答机制
🛠️ acks 参数详解
是控制生产者发送消息后如何得到确认的核心参数。 客户端生产者可以配置
1. acks=0
- 含义:生产者不会等待来自服务器的任何确认。消息一旦被发送出去,就认为已经成功了。
- 优点:提供最高的吞吐量,因为不需要等待确认。
- 缺点:如果消息没有到达 broker 或者 broker 在接收消息后崩溃,消息将会丢失。
2. acks=1
- 含义:生产者会等待 leader 副本确认已收到消息。这意味着只要 leader 已经写入消息,即使 follower 还没有复制该消息,生产者也会认为消息已成功发送。
- 优点:相比于
acks=0
提供更高的可靠性,同时保持较好的性能。 - 缺点:如果 leader 在确认之后但在同步给 follower 之前崩溃,则消息可能会丢失。
3. acks=all
或 -1
- 含义:生产者将等待所有同步副本(包括leader partition 和 副本)确认收到消息。这是最强的一致性保障模式。
- 优点:提供了最高级别的数据可靠性,确保消息不仅被 leader 接收,还至少被一个 follower 成功复制。
- 缺点:由于需要等待更多的确认,这会导致较高的延迟,并可能降低系统的吞吐量。

生产者消息幂等性

生产者消息压缩 与 消息事务机制
消息压缩机制

消息事务机制
生产者的幂等性只能保证在单机中?
事务机制依赖于幂等性来避免重复消息的问题。
Producer提供了对应的API:
SpringBoot 集成 Kafka
<!-- spring 社区提供的 kafka 依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>