1. Kafka启动方式
我下载的是kafka_2.13-3.9.1版本,官网下载
1.1. 自带的zookeeper(也可独立安装)
# 先确认在 kafka 目录下
cd /path/to/kafka_2.13-3.9.1
# 后台启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 后台启动Kafka
nohup bin/kafka-server-start.sh config/server.properties &
关闭:
# 关闭 Zookeeper
bin/zookeeper-server-stop.sh config/zookeeper.properties
# 关闭 Kafka
bin/kafka-server-stop.sh config/server.properties
1.2. jdk安装
运行Kafka需要安装jdk的,去官网下载一个linux版本的jdk17,解压到/usr/local
目录下,然后vim /etc/profile
进行 环境变量修改:
然后source /etc/profile
重新加载。
1.2. KRaft启动
从 Kafka 2.8 开始,Kafka 引入了 KRaft 模式(Kafka Raft Metadata mode),就是 用 Raft 协议自己管理元数据,不再需要 ZooKeeper。
# 生成集群uuid(这个命令会打印出一个随机 UUID,比如76ubojBlQu2J0_esxcZt4g )
bin/kafka-storage.sh random-uuid
# 初始化存储目录
bin/kafka-storage.sh format -t 76ubojBlQu2J0_esxcZt4g -c config/kraft/server.properties
# 启动kafka
bin/kafka-server-start.sh config/kraft/server.properties
2. 使用Docker启动Kafka
拉取kafka镜像
docker pull apache/kafka:3.9.1
启动kafka容器
docker run -p 9092:9092 apache/kafka:3.9.1
# 后台运行
docker run -d --name kafka apache/kafka:3.9.1
3. 主题Topic
Topic 用于存储事件(Events)
- 事件也称为记录或消息,比如支付交易、手机地理位置更新、运输订单、物联网设备或医疗设备的传感器测量数据等。
- 事件被组织和存储在 Topic 中。
- 简单来说,Topic 类似于文件系统的文件夹,Events 是该文件夹中的文件。
3.1. 使用kafka-topics.sh脚本创建Topic
这里我是在docker容器中执行了。
# 进入Kafka容器
docker exec -it kafka bash
## 进入Kafka目录
cd /opt/kafka
# 执行脚本创建hello主题
bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092
# 查看主题列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看主题详细信息
bin/kafka-topics.sh --describe --topic hello --bootstrap-server localhost:9092
# 删除hello主题
bin/kafka-topics.sh --delete --topic hello --bootstrap-server localhost:9092
3.2. 修改主题
- 修改主题配置(Configs):比如调整 retention.ms(消息保留时间)、segment.bytes(日志分段大小)、cleanup.policy(清理策略)等配置项,可以用 kafka-configs.sh 命令来修改。
- 修改分区数量(Partition Count):可以增加分区数,但不能减少。分区数一旦增加,不能再减少。
- 修改副本因子(Replication Factor):副本因子不能直接通过命令修改,需要通过手动重新分配副本实现,比较复杂,一般用 kafka-reassign-partitions.sh 工具配合 JSON 文件进行副本重分配。
# 修改hello主题的消息保留时间为 1 天
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello --alter --add-config retention.ms=86400000
# 给hello主题增加到 3 个分区
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic hello --partitions 3
3.3. 写入Events
kafka 客户端通过网络与 Kafka Brokers(代理/服务节点)进行通信,可以写(或读)主题 Topic 中的事件 Events。
Kafka Brokers 一旦收到事件 Events,就会将事件 Event 以持久和容错的方式存储起来,可以永久存储。
使用 kafka-console-producer.sh 脚本写入事件:
# 在容器内
bin/kafka-console-producer.sh --topic hello --bootstrap-server localhost:9092
执行命令后,命令行会变成可输入模式。你输入什么就写到 hello 这个 topic:
每输入一行,Kafka 就写一条消息。ctrl + c 退出
3.4. 读取Events
使用 kafka-console-consumer.sh 消费者客户端读取之前写入的事件:
bin/kafka-console-consumer.sh --topic hello --from-beginning --bootstrap-server localhost:9092
–from-beginning: 从头开始读(包括历史所有消息)。
–bootstrap-server localhost:9092:指定 Kafka Broker 地址。
你可以打开另外一个窗口执行消费者,然后在生产者那边发消息,消费者接收消息,进而完成通信。
4. 外部环境连接Kafka
打开idea,安装插件Kafka。输入虚拟机ip 192.168.116.100:9092 发现连接不上,因为我们使用的是默认启动配置。
# 在容器中执行,找到server.properties配置文件
cd /etc/kafka/docker
要把这个配置文件复制到虚拟机本地(自己创建一个目录)中
docker cp bf9006c4b67e:/etc/kafka/docker/server.properties /home/fgh/kafka/docker
vim 打开复制好的配置文件,找到 # Socket Server Settings #那一行。
修改后:
然后通过挂载进行文件映射:
文件映射(Volume Mount):就是把宿主机(你的虚拟机)的某个目录 挂载 到容器的某个目录。
容器里对挂载目录的读写,其实就是对宿主机对应目录的读写。
# 先把刚刚启动的容器删了
docker rm kafka
# 再执行命令
docker run -d --name kafka -p 9092:9092 --volume /home/fgh/kafka/docker:/mnt/shared/config apache/kafka:3.9.1
就可以连接成功了
5. 其他连接工具
CMAK(基于Zookeeper启动的Kafka才能用)
5. SpringBoot集成Kafka
配依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.2.4</version>
</dependency>
application.yml
spring:
kafka:
bootstrap-servers: 192.168.116.100:9092
写个代码测试:
Consumer类
package com.fg.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@KafkaListener(topics = "hello-topic", groupId = "hello-group")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
Producer类
package com.fg.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("hello-topic", message);
}
}
通过测试类测试:
package com.fg;
import com.fg.producer.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class KafkaTest {
@Autowired
private Producer producer;
@Test
public void test() {
producer.sendMessage("Hello Kafka!");
}
}
6. Kafka的几个概念
Topic:可以理解成一个消息队列的名字,或者是一个分类桶,所有消息都按主题来分类。
Producer:负责发送消息的人。生产者是消息的 发布方,把数据写到某个 Topic 里。
Consumer:负责读消息的人。消费者是消息的 订阅方,从某个 Topic 里读取消息。
Partition:Topic 里的分片。每个Topic 可以分成一个或多个 Partition,当创建 Topic 时,如果不指定数量,默认就是 1 个。
Offset:消息的位移标记。每个分区里的消息都有一个从 0 开始递增的序号,叫做 Offset。消费者消费到哪里,就记到哪里,下次可以从上次的 Offset 继续读。
7. Kafka的使用
7.1. 读取最早的(历史)消息
默认情况下,当启动一个新的消费者组时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费。如果希望从第一条消息开始消费,需要进行消费者配置。
我比较懒,所以统一在 yml 里配置相关。也可自己写代码配置(推荐)。
注意: 如果之前已经用相同的消费者组ID消费过该主题,并且Kafka已经保存了该消费者组的偏移量,那么即使设置了earliest 也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况,需要手动设置偏移量或者使用一个新的消费者组ID。
手动重置偏移量:
./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers>--group <your-consumer-group>--topic <your-topic>--reset-offsets --to-earliest --execute
7.2. 发送Message
// 发送Message对象消息
public void sendMessage2() {
Message<String> message = MessageBuilder.withPayload("Hello Kafka!")
.setHeader(KafkaHeaders.TOPIC, "hello-topic").build();
kafkaTemplate.send(message);
}
// 发送ProducerRecord对象消息
public void sendMessage3() {
// Headers里面可以存放自定义信息(key-value),消费者接收到该消息后,可以拿到里面的信息
RecordHeaders headers = new RecordHeaders();
headers.add("username", "fg".getBytes());
ProducerRecord<String, String> record = new ProducerRecord<>(
"hello-topic", // Topic 名称
0, // 分区 ID(可选)
System.currentTimeMillis(), // 时间戳(可选)
"k1", // 消息 Key
"Hello Kafka!", // 消息 Value(主体内容)
headers // 自定义 Headers
);
kafkaTemplate.send(record);
}
// 发送指定分区的消息
public void sendMessage4() {
kafkaTemplate.send("hello-topic", 0, System.currentTimeMillis(), "k2", "Hello Kafka!");
}
// 发送默认topic消息
public void sendMessage5() {
kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "Hello Kafka!");
}
需要配置:
7.3. 接收Message
send()
和sendDefault()
方法都返回CompletableFuture<SendResult<K, V>>
。
CompletableFuture 是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量。
因为调用kafkaTemplate.send()
方法发送消息时,Kafka可能需要一些时间来处理该消息(例如:网路延迟、消息序列化、Kafka集群的负载等),如果send()
是同步的,那么发送消息可能会阻塞调用线程,直到消息发送成功或发生错误,这会导致应用程序性能下降,尤其在高并发场景下。
使用CompletableFuture
,send()
方法可以立即返回一个表示异步操作结果的未来对象,而不是等待操作完成,这样调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture
会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果。
- 使用
CompletableFuture.get()
方法同步阻塞等待发送结果
public void sendMessage6() {
CompletableFuture<SendResult<String, String>> completableFuture =
kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "Hello Kafka!");
try {
// 阻塞等待
SendResult<String, String> result = completableFuture.get();
System.out.println("结果:" + result); // 结果:SendResult [producerRecord=ProducerRecord(topic=default-topic, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=k3, value=Hello Kafka!, timestamp=1752240587138), recordMetadata=default-topic-0@0]
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
- 使用
thenAccept(), thenApply(), thenRun()
等方法来注册回调函数,回调函数将在CompletableFuture
完成时被执行
public void sendMessage7() {
CompletableFuture<SendResult<String, String>> completableFuture =
kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "Hello Kafka!");
completableFuture
.thenAccept(result -> {
System.out.println("结果:" + result);
})
.exceptionally(ex -> {
System.out.println("异常:" + ex.getMessage());
return null; // 必须返回点啥,哪怕是 null
});
System.out.println("消息发送已提交,主线程继续执行...");
}
thenAccept(Consumer)
:拿到异步结果,执行一些依赖结果的后续操作,不返回新结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
future.thenAccept(result -> {
System.out.println("结果是: " + result);
});
thenApply(Function)
:拿到异步结果,做一些处理,返回新的结果(可变换)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> newFuture = future.thenApply(result -> {
return result.toUpperCase(); // 转成大写
});
newFuture.thenAccept(System.out::println); // 输出 HELLO
thenRun(Runable)
:前面的异步结果执行完成后,执行一个无参、无返回值的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
future.thenRun(() -> {
System.out.println("前面的任务执行完了,我就跑!");
});
whenComplete(BiConsumer)
:无论成功还是失败,都会执行。可以同时拿到结果和异常(一个成功一个 null,或者相反)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("出错啦");
return "hello";
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("发生异常: " + ex.getMessage());
} else {
System.out.println("执行结果: " + result);
}
});
exceptionally(Function<Throwable, T>)
:只在出现异常时执行,用于返回一个默认值,避免整个链挂掉。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("炸了");
return "hello";
}).exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "default";
});
future.thenAccept(System.out::println); // 输出 default
7.4. 发送对象消息
在 Kafka 中,消息本质是字节流,要发对象的话需要:
把对象 序列化(一般转成 JSON) → 作为消息的 value 发送出去。
配置:
用的JSON序列化,所以要加这个依赖才可将Java对象转成JSON。
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
public void sendMessage8() {
User user = User.builder().id(1).username("fg").age(23).build();
// 分区可为 null,由 Kafka 分配)
CompletableFuture<SendResult<String, Object>> completableFuture =
kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), "k3", user);
// 注册回调 - 成功时执行
completableFuture.thenAccept(result -> {
System.out.println("发送成功: " + result);
});
// 注册回调 - 异常时执行
completableFuture.exceptionally(ex -> {
System.out.println("发送失败: " + ex.getMessage());
return null;
});
System.out.println("已异步发送对象,主线程继续执行...");
}
7.5. Replica副本
Replica:副本,为实现备份功能,保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有 1 个或多个副本。
- Leader Replica:每个分区多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自 leader 副本。
- Folower Replica:每个分区多个副本中的“从”副本,实时从 leader 副本中同步数据,保持和 leader 副本数据的同步,leader 副本发生故障时,某个 follower 副本会成为新的 leader。
设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic。
Kafka 的核心数据单元是分区(partition),每个分区会被复制多份(称为“分区副本”),分布在不同的 Broker 节点上,以实现冗余和高可用。每个分区在任何时刻只有一个“Leader”副本负责处理所有读写请求,而1其他“Follower”副本则持续从 Leader 拉去数据以保持同步。如果 Leader 副本所在的 Broker 发生故障,Kafka 会自动从同步状态良好(在ISR中)的 Follower 副本里选举出一个新的 Leader 继续提供服务,确保数据安全和服务连续性。所谓“主题副本”指的就是一个主题下所有分区副本的总和,而“节点副本”则是指存储在某个特定 Broker 上的所有分区副本的集合。
方式一:通过命令行在创建topic时指定分区和副本
./kafka-topic.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server 192.168.116.100:9092
方式二:通过代码指定分区和副本
package com.fg.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConfig {
// 创建一个topic,名称为heTopic,分区数为5,副本数为1
@Bean
public NewTopic initialTopic() {
return new NewTopic("heTopic", 3, (short) 1);
}
// 如果要修改分区数,只需修改配置重启项目即可,修改分区数并不会导致数据丢失,但是分区数只能增大不能减少
@Bean
public NewTopic updateTopic() {
return new NewTopic("heTopic", 5, (short) 1);
}
}
7.6. 生产分区策略
生产者写入消息到topic,Kafka 将依据不同的策略将数据分配到不同的分区中。
- 默认分配策略:BuiltlnPartitioner
- 有 key:
partition = Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions
- 没有 key:使用 Sticky Partitioner,它不是单纯随机或轮询,而是会在一段时间或一个 batch 内,把所有消息发到同一个分区,直到 batch 满或时间到,再换一个分区。
- 有 key:
- 轮询分配策略:RoundRobinPartition,和默认的 Sticky 不一样,它直接轮询所有可用分区。只要没有 key,消息就按轮询顺序依次分到各分区。
- 自定义分配策略:实现接口
org.apache.kafka.clients.producer.Partitioner
,重写 partition 方法。
public void sendMessage9() {
// 有key:相同key保证落到同一个分区
kafkaTemplate.send("heTopic", "key1", "Hello, with key!");
// 无key:Sticky分区,同一批尽量stick到同一个分区
for (int i = 0; i < 3; i++) {
kafkaTemplate.send("heTopic", "Hello, no key!");
}
}
如果是无key,使用轮询策略:
写一个自定义策略:
package com.fg.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
/**
* @param topic 消息主题
* @param key 消息的key
* @param keyBytes 序列化后的key
* @param value 消息的value
* @param valueBytes 序列化后的value
* @param cluster 集群元数据,包含分区元数据信息(有多少个分区,leader 状态等等)
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (keyBytes != null) {
// 有key,走默认的hash分配
return (Math.abs(Utils.murmur2(keyBytes))) % numPartitions;
}
// 没有key,轮询分配
return counter.getAndIncrement() % numPartitions;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public void close() {
}
}
配置:
测试:
public void sendMessage10() {
// RoundRobin分区,不同key尽量均匀分配到不同分区
for (int i = 0; i < 10; i++) {
try {
kafkaTemplate.send("heTopic", "Hello RoundRobin " + i).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
7.7. 生产消息拦截
生产者发送消息的流程:
拦截器:实现ProducerInterceptor<K, V>
接口,对即将发送的 ProducerRecord 做修改或过滤。
package com.fg.config;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 自定义 Kafka 生产者拦截器:
* 主要用于在消息发送前后做一些额外处理,比如:
* - 给消息做统一标记
* - 统计成功/失败
* - 打印日志
* - 丢弃不合法消息等
*/
public class CustomProducerInterceptor implements ProducerInterceptor<String, Object> {
/**
* 发送消息前会调用,可以在这里对消息进行修改或过滤,如果返回null,则消息会被丢弃
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
// 1.获取原始消息内容
Object originalValue = record.value();
// 2.在消息尾部追加标记
String newValue = originalValue + " | intercepted";
// 3.返回新的消息
return new ProducerRecord<>(
record.topic(), // 保持原 topic
record.partition(), // 保持原分区(也可以自定义)
record.timestamp(), // 保持原时间戳
record.key(), // 保持原 key
newValue, // 替换后的 value
record.headers() // 保持原 header
);
}
/**
* 消息被Broker成功接收或失败后会调用,可以用于统计消息发送成功率、记录日志等
*
* @param recordMetadata
* @param e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// 消息发送成功,打印分区信息
System.out.println("消息发送成功,分区:" + recordMetadata.partition());
} else {
// 消息发送失败,打印错误信息
System.out.println("消息发送失败,错误信息:" + e.getMessage());
}
}
/**
* Producer 关闭时会调用,用于清理资源
* 如果没有需要释放的资源可以留空
*/
@Override
public void close() {
}
/**
* 获取生产者的配置,可以在这里做初始化操作
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
}
}
7.8. 接收消息内容
在 Spring Kafka 中,
@Payload
用于绑定 Kafka 消息体(就是生产者的 value)。
@Header
用于绑定 Kafka 消息头(比如 topic、key、partition、timestamp、自定义 header)。
ConsumerRecord<String, String> record
接收消息所有内容。
public void sendMessage11() {
kafkaTemplate.send(
MessageBuilder.withPayload("这是消息内容")
.setHeader(KafkaHeaders.TOPIC, "heTopic")
.setHeader(KafkaHeaders.KEY, "key2")
.setHeader("customHeader", "自定义头信息")
.build()
);
}
@KafkaListener(topics = "heTopic", groupId = "he-group")
public void receiveMessage(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header("customHeader") String customHeader,
ConsumerRecord<String, String> record
) {
System.out.println("消息体: " + message); // 消息体: "这是消息内容 | intercepted"
System.out.println("来自 topic: " + topic); // 来自 topic: heTopic
System.out.println("分区: " + partition); // 分区: 1
System.out.println("key: " + key); // key: key2
System.out.println("自定义 header: " + customHeader); // 自定义 header: 自定义头信息
System.out.println("ConsumerRecord: " + record); // ConsumerRecord: ConsumerRecord(topic = heTopic, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1752309403290, serialized key size = 4, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = customHeader, value = [-24, -121, -86, -27, -82, -102, -28, -71, -119, -27, -92, -76, -28, -65, -95, -26, -127, -81]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 117, 115, 116, 111, 109, 72, 101, 97, 100, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = __TypeId__, value = [106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103])], isReadOnly = false), key = key2, value = "这是消息内容 | intercepted")
}
发送消息对象
配置:
public void sendMessage12() {
User user = User.builder().id(1).username("fg").age(23).build();
kafkaTemplate2.send(
MessageBuilder.withPayload(user)
.setHeader(KafkaHeaders.TOPIC, "heTopic")
.build()
);
}
@KafkaListener(topics = "heTopic", groupId = "user-group")
public void receiveMessage(
@Payload User user,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
ConsumerRecord<String, String> record
) {
System.out.println("收到User对象: " + user + "来自Topic: " + topic + ", 分区: " + partition); // 收到User对象: User(id=1, username=fg, age=23)来自Topic: heTopic, 分区: 1
}
Acknowledgment: 是 Spring Kafka 提供的一个接口,用于手动提交 Kafka 消费者的 offset。
应用场景:
- 对消息要严格保证幂等性或一致性必须业务成功后再提交offset)。
- 想实现失败重试(不提交offset,下次还会消费到同一条消息)。
默认情况下,如果 ack-mode 是record
、batch
或time
,Spring Kafka 会自动提交 offset。如果你想自己控制什么时候提交(比如处理完业务再提交,或出现异常不提交),就用 Acknowledgment.acknowledge()
,不ack.acknowledge()
就相当于不提交,这条消息会在下一轮重新被消费。
设置手动提交:
@KafkaListener(topics = "heTopic", groupId = "user-group")
public void receiveMessage(
@Payload User user,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
ConsumerRecord<String, String> record,
Acknowledgment ack
) {
try {
log.info("处理业务逻辑...");
System.out.println("收到User对象: " + user + "来自Topic: " + topic + ", 分区: " + partition);
// 手动提交offset
ack.acknowledge();
log.info("已手动提交offset");
} catch (Exception e) {
log.error("处理失败,不提交offset,稍后会重试", e);
}
}
7.9. 指定消费
在 Spring Kafka 里,如果你想精准指定 topic、partition、offset 来消费,可以用@KafkaListener
的 topicPartitions 属性,而不是简单写 topics。
可用来在调试或回放数据时,从某个确定的 offset 重放消费。
配置项 | 说明 |
---|---|
topic |
要监听的 topic |
partitions |
要监听的分区(从 0 开始) |
partitionOffsets |
指定从哪个分区的哪个 offset 开始 |
initialOffset |
指定初始 offset(如果比当前最小 offset 还小,会从最小的可用 offset 开始)。initialOffset 只在 新的 groupId 或者这个分区之前没有提交 offset 时生效。如果 Kafka 里已经有保存的 offset,还是会从保存的 offset 开始消费。 |
@KafkaListener(
topicPartitions = {
@TopicPartition(
topic = "heTopic",
partitions = {"0", "1", "2"}, // 指定分区
partitionOffsets = {
@PartitionOffset(partition = "2", initialOffset = "15")
}
)
},
groupId = "he-group"
)
7.10. 批量消费
Kafka 消息是批量拉取的,但 Spring Kafka 默认是单条处理即使一次 poll 拉了 N 条,也一条一条执行 @KafkaListener)。
需设置:
public void sendMessage13() {
for (int i = 0; i < 5; i++) {
User user = new User(i, "fg-" + i, 20 + i);
kafkaTemplate2.send("heTopic", user);
}
}
@KafkaListener(topics = "heTopic", groupId = "user-group")
public void receiveMessage(List<User> users) {
users.forEach(u -> {
log.info("收到消息:{}", u);
});
}
7.11. 消费消息拦截
消费拦截器是在 Kafka 消费者端,消息真正被业务代码处理前的“拦截器”或“切面”,可以进行日志记录、消息过滤、消息修改、性能监控、安全校验等。
package com.fg.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
@Slf4j
public class CustomConsumerInterceptor implements ConsumerInterceptor<String, Object> {
/**
* 消费消息之前调用,可以在这里对消息进行处理、过滤、修改等操作
*
* @param records 本次批量消费的所有消息
* @return 返回要传递给业务代码的消息集合
*/
@Override
public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
log.info("[拦截器] 拦截到批量消息,记录数: {}", records.count());
for (TopicPartition partition : records.partitions()) {
records.records(partition).forEach(record -> {
System.out.println("[拦截器] 拦截到消息: " + record.key() + " -> " + record.value());
});
}
// 不修改直接返回原消息
return records;
}
/**
* 当消费者提交offset时调用
*
* @param offsets 提交的offset信息
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
/**
* 关闭拦截器时调用
*/
@Override
public void close() {
}
/**
* 配置拦截器时调用,传入消费者配置参数
*
* @param configs 配置参数
*/
@Override
public void configure(Map<String, ?> configs) {
}
}
7.12. 消息转发
消息转发就是应用 A 从 TopicA 接收到消息,经过处理后转发到 TopicB,再由应用 B 监听接收该消息,即一个应用处理完后将该消息转发至其他应用处理。
public void sendMessage14() {
User user = User.builder().id(1).username("fg").age(23).build();
kafkaTemplate2.send("sourceTopic", user);
}
public class Consumer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "sourceTopic", groupId = "forward-group")
public void listenAndForward(@Payload User user) {
log.info("收到[原topic]消息:{}", user);
// 模拟业务
user.setUsername(user.getUsername() + "_processed");
// 转发到新topic
kafkaTemplate.send("targetTopic", user);
log.info("[转发]消息已转发到 targetTopic:{}", user);
}
@KafkaListener(topics = "targetTopic", groupId = "forward-group")
public void listenForward(@Payload User user) {
log.info("收到转发后的消息:{}", user);
}
}
7.13. 消费分区策略
Kafka 的 Topic 是由多个分区(Partition)组成的,分区是 Kafka 并行度和扩展性的核心。
- 生产者可以把消息写到指定分区或按分区器分配。
- 消费者以消费者组为单位来消费分区:一个分区在同一时刻只会被一个消费者消费。
- Kafka Broker 会自动把分区平均分配给组内消费者,叫分区再平衡。
策略 | 类名 | 说明 |
---|---|---|
RangeAssignor | org.apache.kafka.clients.consumer.RangeAssignor |
默认策略:按分区范围顺序分配,Kafka 会把分区按照顺序分配给消费者,尽量让每个消费者拿到连续分区。适合单 Topic,分区数较多且消费者数较少,顺序性好。 |
RoundRobinAssignor | org.apache.kafka.clients.consumer.RoundRobinAssignor |
轮询分配,尽量让分区均匀分布到所有消费者。 |
StickyAssignor | org.apache.kafka.clients.consumer.StickyAssignor |
粘性分配:在尽量均衡分配的同时,最大化保持前后分配不变,减少 Rebalance 的波动。 |
CooperativeStickyAssignor | org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
Cooperative Sticky:和 Sticky 类似,但支持 渐进式 Rebalance,避免全量重分配,更平滑。 |
自定义分配器 | 实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor |
可以按自定义逻辑分配(比如按用户ID、地区、权限等做定制)。 |
想用什么策略在 yml 配置即可。
我这里写个自定义分配器策略:
package com.fg.config;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import java.nio.ByteBuffer;
import java.util.*;
public class CustomPartitionAssignor implements ConsumerPartitionAssignor {
/**
* 返回自定义策略名称
* Kafka会根据这个名字找到你的策略
*/
@Override
public String name() {
return "simple-custom";
}
/**
* 可选: 返回用户自定义元数据,通常用于给 assign 传递额外信息
* 这里直接返回 null 就行,没用到
*/
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
return null;
}
/**
* 核心方法: 分配逻辑
* cluster: 当前集群元数据信息
* subscriptions: 所有消费者的订阅信息
*/
@Override
public GroupAssignment assign(Cluster cluster, GroupSubscription subscriptions) {
// 结果Map: key是消费者ID, value是分配给它的分区
Map<String, Assignment> assignmentMap = new HashMap<>();
// 所有消费者
List<String> consumers = new ArrayList<>(subscriptions.groupSubscription().keySet());
Collections.sort(consumers);
// 所有分区
List<TopicPartition> partitions = new ArrayList<>();
for (Subscription sub : subscriptions.groupSubscription().values()) {
for (String topic : sub.topics()) {
cluster.partitionsForTopic(topic).forEach(info ->
partitions.add(new TopicPartition(topic, info.partition())));
}
}
// 简单轮询分配
for (int i = 0; i < partitions.size(); i++) {
String consumer = consumers.get(i % consumers.size());
assignmentMap.computeIfAbsent(consumer, k -> new Assignment(new ArrayList<>()))
.partitions().add(partitions.get(i));
}
System.out.println("【分配结果】" + assignmentMap);
return new GroupAssignment(assignmentMap);
}
/**
* 分配完成后执行,可用于打印分配结果、记录日志等
*/
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
System.out.println("【onAssignment】" + metadata.memberId() + " 分配到: " + assignment.partitions());
}
}
7.14. 消息、数据的存储
Kafka 的存储就是以分区为单位,分段文件顺序写,索引文件快速查找,快照 & 元数据保证一致性,保留策略保证高效回收,整个机制天然支持海量消息的高效写读。
Kafka 的所有事件(消息、数据)都存储在/tmp/kafka-logs
目录中,可通过log.dirs=/tmp/kafka-logs
配置。
Kafka 的所有事件(消息、数据)都是以日志文件的方式来保存。
Kafka 一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:<topic_name>-<partition_id>
。
每次消费一个消息并且提交以后,会保存当前消费到的最近一个 offset。这个进度就是 offset:它表示当前分区里“已经消费到哪里了”。
- 自动提交(
enable.auto.commit=true
) - 手动提交(显式调用 commitSync 或 commitAsync)
每次提交时,消费者会把【消费到的最新 offset】写到一个特殊的内置 topic:__consumer_offsets
。
__consumer_offsets
是 Kafka 内置的系统 Topic,专门用于存储是所有 consumer group 的 offset 元数据,默认有 50 个分区。
offset.metadata.max.retention.ms=604800000 # 默认保留 7 天
offsets.topic.partitions=50 # 默认分 50 个分区
分区规则: Kafka 根据 consumer group id 做哈希运算,确定写到 __consumer_offsets 的哪个分区。
offsetPartition = Math.abs(groupId.hashCode()) % offsetsTopicPartitionsCount
分区哈希保证了:
- 相同的 group id 的 offset 元数据始终写到同一个分区。
- 多分区提高并行度,多个 consumer group 提交 offset 时不会互相阻塞。
7.15. Offset
生产者Offset
生产者发送一条消息到 Kafka 的 broker 的某个 topic 下某个 partition 中。
Kafka 内部会为每条消息分配一个唯一的 offset,该 offset 就是该消息在 partition 中的位置。
消费者Offset
消费者 offset 是消费者需要知道自己已经读取到哪个位置了,接下来从那个位置开始继续读取消息。
每个消费者组(Consumer Group)中的消费者都会独立地维护自己的 offset,当消费者从某个 partition 读取消息时,它会记录当前读取到的 offset,这样即使消费者崩溃或重启,它也可以从上次读取的位置继续读取,而不会重复读取或遗漏消息。(消费者offset需要消费消息并提交(ack)后才记录)
Offset的生命周期
- 生产者(Producer) 生产消息时,Kafka Broker 会把消息追加到分区末尾,分配下一个 Offset。
- 消费者(Consumer) 消费消息时,会记录自己消费到哪里了,就是 Offset。
- Kafka 不会自动删除已消费的消息,而是基于保留策略(时间/大小/日志压缩)来删除。
8. Kafka集群
在 Kafka 集群中,副本的个数大于 0 且小于等于 Broker 数,Producer 把消息发送到某个 topic 的分区(Partition),每个分区都有一个主副本(Leader)和多个从副本(Follower),Producer 只写 Leader,Leader 接收后同步给 Follower 保证数据可靠性;Consumer 只从分区的 Leader 拉取消息读取,Broker 之间通过副本同步实现高可用,当 Leader 挂掉时,Follower 会被选举成新的 Leader,整个过程保证消息不丢失、不重复且高可用。
8.1. 基于Zookeeper的集群搭建
我用的镜像版本新的,默认只能用 Kraft 搭建,所以这里就不用docker了。
启动三个Kafka来配置吧。
准备好这几个。
然后启动zookeeper,接着启动三个Kafka。连接之后就可以看到三个 brokers 了。
然后现在去 SpringBoot 配置连接:
8.2. ISR(In-Sync Replicas)副本
指的是“和 Leader 保持同步的副本集合”。
它包含:当前分区的 Leader 副本和所有跟 Leader 同步进度在容忍范围内的 Follower 副本。
工作流程:
- 写请求先写到 Leader 副本。
- Follower 副本从 Leader 拉取数据进行同步(这是“拉”而不是 Leader 主动推送)。
- 由于网络和拉取的频率限制,Follower 副本的最新数据可能比 Leader 少一点,但 Kafka 允许这个差距存在(可配置)。
踢出机制:
如果某个 Follower 副本因为宕机、网络异常等原因长时间没能跟上 Leader(落后太多),Kafka 会把它从 ISR 中踢出去。
replica.lag.time.max.ms
(默认 30 秒)
如果某个 Follower 副本在这段时间内没有成功追上 Leader 的最新消息(即同步延迟超过 30 秒),则该副本会被移出 ISR。replica.lag.max.messages
表示 Follower 落后 Leader 多少条消息时被剔除 ISR。但该参数在新版 Kafka 中已被废弃,不建议使用。
被踢出去后,它暂时失去选举 Leader 的资格。
当它重新追上 Leader(Catch-up),会被重新加入 ISR。
ISR 就是 Kafka 用来保证高可用和数据一致性的核心机制之一,确保即使单个 Broker 挂了,也能用“与 Leader 保持同步的其他副本”来继续提供服务。
8.3. LEO(Log End Offset)
LEO 是指一个分区日志中,当前已经写入的最后一条消息的下一个可用的 offset,也就是日志的末尾位置。它标记了这个副本中下一条消息将写入的位置。
作用:
- Leader 副本的 LEO 代表该分区日志的最新写入进度。
- Follower 副本的 LEO 表示该副本当前同步到的最新消息位置。
与 ISR 关系: Follower 是否属于 ISR,通常会根据它的 LEO 与 Leader 的 LEO 之间的差值来判断。如果差距太大(超出阈值),就会被剔除 ISR。
举例:假设 Leader 的 LEO 是 1000,说明它已经写入了 0~999 条消息。一个 Follower 的 LEO 是 995,说明它同步到了第 995 条消息,还落后 Leader 5 条。如果阈值设置是最大允许落后 10 条消息,那么这个 Follower 仍在 ISR 中。
LEO 就是 Kafka 日志的“尾巴”,用来标识数据的写入和同步进度,是 Leader 和 Follower 协调复制和判断同步状态的重要指标。
8.4. HW(High Watermark)
HW 表示该分区日志中所有 ISR 副本都已同步完成的最大 offset。换句话说,HW 是所有副本中“最落后”副本的 LEO 的最小值。
作用:
- 消费者只能读取不超过 HW 的消息,即只读取所有同步副本都确认写入的消息。
- 保证消费者不会读取到尚未被所有同步副本确认的数据,避免“脏数据”或消息丢失。
- 当 Leader 收到写入请求后,会等待消息被大多数 ISR 副本确认(写入),然后更新 HW。
与 LEO 的区别:
- LEO 表示“日志末尾”位置(最新消息的偏移量),可能包含未被所有副本确认的消息。
- HW 是“安全可读点”,代表所有同步副本都确认的消息最大偏移量。
举例:假设 Leader 的 LEO 是 1000,表示它已经写入了消息到 offset 999。ISR 副本中最慢的副本 LEO 是 995,那么 HW 就是 995。消费者只能安全读取到 offset 995(含)之前的消息。
8.5. 基于KRaft的集群搭建
KRaft 是 Kafka 从传统依赖 ZooKeeper 管理元数据(Broker 状态、Topic 配置、分区信息、副本状态等)转向内置 Raft 协议的集群元数据管理方式。
在 KRaft 模式下,Kafka 节点既承担 Broker 的数据读写存储角色,又可以作为 Controller 节点来选举、复制和维护集群的元数据,依靠 Raft 协议取代 ZooKeeper 实现一致性。
Controller
- KRaft模式下,Controller 负责整个集群的元数据管理(原本由Zookeeper承担)。
- 在一个时刻,只会有一个 Controller Leader,它负责处理:
- 元数据更新(比如 Topic 创建、分区分配、ISR 维护)
- 与其他 Broker 的协调
- Raft 日志写入和复制
- 其他 Controller 节点是 Follower,跟随 Leader 同步 Raft 元数据日志。
Broker
- Broker 就是实际存储数据、处理生产者写入和消费者拉取的服务节点。
- 在 KRaft 模式中,一个节点既可以是 Broker,又可以是 Controller(通过 process.roles 配置角色)。
Raft 选举节点(元数据节点)
- 所有配置为 controller 角色的节点共同组成一个 Controller Quorum(Raft 选举组)。
- 通过 Raft 协议在 Controller 节点间选举 Leader,保证元数据一致性。
- 这部分就是代替 ZooKeeper 的核心:Raft 协议的日志复制 + 一致性选举。
上面是一种方式,我现在用docker部署。
先配置 docker-compose.yml
version: "3"
services:
kafka1:
image: apache/kafka:3.9.1
container_name: kafka1
hostname: kafka1
ports:
- "9092:9092"
volumes:
- kafka1-data:/var/lib/kafka/data
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
kafka2:
image: apache/kafka:3.9.1
container_name: kafka2
hostname: kafka2
ports:
- "9093:9092"
volumes:
- kafka2-data:/var/lib/kafka/data
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
kafka3:
image: apache/kafka:3.9.1
container_name: kafka3
hostname: kafka3
ports:
- "9094:9092"
volumes:
- kafka3-data:/var/lib/kafka/data
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9094
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
volumes:
kafka1-data:
kafka2-data:
kafka3-data:
然后再执行命令生成集群UUID,每个节点用同一个UUID进行format。
docker run --rm apache/kafka:3.9.1 bash -c "/opt/kafka/bin/kafka-storage.sh random-uuid"
# 第一个节点
docker run -it --rm \
-v kafka1-data:/var/lib/kafka/data \
apache/kafka:3.9.1 \
kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /etc/kafka/kafka.properties --ignore-formatted
# 第二个节点
docker run -it --rm \
-v kafka2-data:/var/lib/kafka/data \
apache/kafka:3.9.1 \
bash -c "/opt/kafka/bin/kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /opt/kafka/config/kraft/server.properties --ignore-formatted"
# 第三个节点
docker run -it --rm \
-v kafka3-data:/var/lib/kafka/data \
apache/kafka:3.9.1 \
bash -c "/opt/kafka/bin/kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /opt/kafka/config/kraft/server.properties --ignore-formatted"
最后执行启动
docker-compose up -d
测试: