Kafka

发布于:2025-07-14 ⋅ 阅读:(15) ⋅ 点赞:(0)

1. Kafka启动方式

我下载的是kafka_2.13-3.9.1版本,官网下载

1.1. 自带的zookeeper(也可独立安装)

# 先确认在 kafka 目录下
cd /path/to/kafka_2.13-3.9.1
# 后台启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 后台启动Kafka
nohup bin/kafka-server-start.sh config/server.properties &

关闭:

# 关闭 Zookeeper
bin/zookeeper-server-stop.sh config/zookeeper.properties
# 关闭 Kafka
bin/kafka-server-stop.sh config/server.properties

1.2. jdk安装

运行Kafka需要安装jdk的,去官网下载一个linux版本的jdk17,解压到/usr/local目录下,然后vim /etc/profile进行 环境变量修改:

在这里插入图片描述

然后source /etc/profile重新加载。

1.2. KRaft启动

从 Kafka 2.8 开始,Kafka 引入了 KRaft 模式(Kafka Raft Metadata mode),就是 用 Raft 协议自己管理元数据,不再需要 ZooKeeper。

# 生成集群uuid(这个命令会打印出一个随机 UUID,比如76ubojBlQu2J0_esxcZt4g )
bin/kafka-storage.sh random-uuid
# 初始化存储目录
bin/kafka-storage.sh format -t 76ubojBlQu2J0_esxcZt4g -c config/kraft/server.properties
# 启动kafka
bin/kafka-server-start.sh config/kraft/server.properties

2. 使用Docker启动Kafka

拉取kafka镜像

docker pull apache/kafka:3.9.1

启动kafka容器

docker run -p 9092:9092 apache/kafka:3.9.1
# 后台运行
docker run -d --name kafka apache/kafka:3.9.1

3. 主题Topic

Topic 用于存储事件(Events)

  • 事件也称为记录或消息,比如支付交易、手机地理位置更新、运输订单、物联网设备或医疗设备的传感器测量数据等。
  • 事件被组织和存储在 Topic 中。
  • 简单来说,Topic 类似于文件系统的文件夹,Events 是该文件夹中的文件。

3.1. 使用kafka-topics.sh脚本创建Topic

这里我是在docker容器中执行了。

# 进入Kafka容器
docker exec -it kafka bash
## 进入Kafka目录
cd /opt/kafka
# 执行脚本创建hello主题
bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

在这里插入图片描述

# 查看主题列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看主题详细信息
bin/kafka-topics.sh --describe --topic hello --bootstrap-server localhost:9092
# 删除hello主题
bin/kafka-topics.sh --delete --topic hello --bootstrap-server localhost:9092

在这里插入图片描述
在这里插入图片描述

3.2. 修改主题

  • 修改主题配置(Configs):比如调整 retention.ms(消息保留时间)、segment.bytes(日志分段大小)、cleanup.policy(清理策略)等配置项,可以用 kafka-configs.sh 命令来修改。
  • 修改分区数量(Partition Count):可以增加分区数,但不能减少。分区数一旦增加,不能再减少。
  • 修改副本因子(Replication Factor):副本因子不能直接通过命令修改,需要通过手动重新分配副本实现,比较复杂,一般用 kafka-reassign-partitions.sh 工具配合 JSON 文件进行副本重分配。
# 修改hello主题的消息保留时间为 1 天
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello --alter --add-config retention.ms=86400000
# 给hello主题增加到 3 个分区
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic hello --partitions 3

3.3. 写入Events

kafka 客户端通过网络与 Kafka Brokers(代理/服务节点)进行通信,可以写(或读)主题 Topic 中的事件 Events。

在这里插入图片描述
Kafka Brokers 一旦收到事件 Events,就会将事件 Event 以持久和容错的方式存储起来,可以永久存储。

使用 kafka-console-producer.sh 脚本写入事件:

# 在容器内
bin/kafka-console-producer.sh --topic hello --bootstrap-server localhost:9092

执行命令后,命令行会变成可输入模式。你输入什么就写到 hello 这个 topic:

在这里插入图片描述

每输入一行,Kafka 就写一条消息。ctrl + c 退出

3.4. 读取Events

使用 kafka-console-consumer.sh 消费者客户端读取之前写入的事件:

bin/kafka-console-consumer.sh --topic hello --from-beginning --bootstrap-server localhost:9092

–from-beginning: 从头开始读(包括历史所有消息)。
–bootstrap-server localhost:9092:指定 Kafka Broker 地址。

你可以打开另外一个窗口执行消费者,然后在生产者那边发消息,消费者接收消息,进而完成通信。

在这里插入图片描述

4. 外部环境连接Kafka

打开idea,安装插件Kafka。输入虚拟机ip 192.168.116.100:9092 发现连接不上,因为我们使用的是默认启动配置。

kafka镜像使用文档

# 在容器中执行,找到server.properties配置文件
cd /etc/kafka/docker

要把这个配置文件复制到虚拟机本地(自己创建一个目录)中

docker cp bf9006c4b67e:/etc/kafka/docker/server.properties /home/fgh/kafka/docker

vim 打开复制好的配置文件,找到 # Socket Server Settings #那一行。

在这里插入图片描述

修改后:

在这里插入图片描述

然后通过挂载进行文件映射:
文件映射(Volume Mount):就是把宿主机(你的虚拟机)的某个目录 挂载 到容器的某个目录。
容器里对挂载目录的读写,其实就是对宿主机对应目录的读写。

# 先把刚刚启动的容器删了
docker rm kafka
# 再执行命令
docker run -d --name kafka -p 9092:9092 --volume /home/fgh/kafka/docker:/mnt/shared/config apache/kafka:3.9.1

就可以连接成功了

在这里插入图片描述

5. 其他连接工具

Offset Explorer

CMAK(基于Zookeeper启动的Kafka才能用)
在这里插入图片描述

EFAK

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

5. SpringBoot集成Kafka

配依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.2.4</version>
</dependency>

application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.116.100:9092

写个代码测试:

Consumer类

package com.fg.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @KafkaListener(topics = "hello-topic", groupId = "hello-group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

Producer类

package com.fg.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("hello-topic", message);
    }
}

通过测试类测试:

package com.fg;

import com.fg.producer.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class KafkaTest {

    @Autowired
    private Producer producer;

    @Test
    public void test() {
        producer.sendMessage("Hello Kafka!");
    }
}

在这里插入图片描述

6. Kafka的几个概念

在这里插入图片描述

Topic:可以理解成一个消息队列的名字,或者是一个分类桶,所有消息都按主题来分类。

Producer:负责发送消息的人。生产者是消息的 发布方,把数据写到某个 Topic 里。

Consumer:负责读消息的人。消费者是消息的 订阅方,从某个 Topic 里读取消息。

Partition:Topic 里的分片。每个Topic 可以分成一个或多个 Partition,当创建 Topic 时,如果不指定数量,默认就是 1 个。

Offset:消息的位移标记。每个分区里的消息都有一个从 0 开始递增的序号,叫做 Offset。消费者消费到哪里,就记到哪里,下次可以从上次的 Offset 继续读。

7. Kafka的使用

7.1. 读取最早的(历史)消息

默认情况下,当启动一个新的消费者组时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费。如果希望从第一条消息开始消费,需要进行消费者配置。

我比较懒,所以统一在 yml 里配置相关。也可自己写代码配置(推荐)。

在这里插入图片描述

注意: 如果之前已经用相同的消费者组ID消费过该主题,并且Kafka已经保存了该消费者组的偏移量,那么即使设置了earliest 也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况,需要手动设置偏移量或者使用一个新的消费者组ID

手动重置偏移量:

./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers>--group <your-consumer-group>--topic <your-topic>--reset-offsets --to-earliest --execute

在这里插入图片描述

7.2. 发送Message

在这里插入图片描述

// 发送Message对象消息
public void sendMessage2() {
    Message<String> message = MessageBuilder.withPayload("Hello Kafka!")
            .setHeader(KafkaHeaders.TOPIC, "hello-topic").build();
    kafkaTemplate.send(message);
}
// 发送ProducerRecord对象消息
public void sendMessage3() {
    // Headers里面可以存放自定义信息(key-value),消费者接收到该消息后,可以拿到里面的信息
    RecordHeaders headers = new RecordHeaders();
    headers.add("username", "fg".getBytes());
    ProducerRecord<String, String> record = new ProducerRecord<>(
            "hello-topic",    // Topic 名称
            0,                     // 分区 ID(可选)
            System.currentTimeMillis(),  // 时间戳(可选)
            "k1",                  // 消息 Key
            "Hello Kafka!",        // 消息 Value(主体内容)
            headers                // 自定义 Headers
    );
    kafkaTemplate.send(record);
}
// 发送指定分区的消息
public void sendMessage4() {
    kafkaTemplate.send("hello-topic", 0, System.currentTimeMillis(), "k2", "Hello Kafka!");
}
// 发送默认topic消息
public void sendMessage5() {
    kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "Hello Kafka!");
}

需要配置:

在这里插入图片描述

7.3. 接收Message

send()sendDefault()方法都返回CompletableFuture<SendResult<K, V>>

CompletableFuture 是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量。

因为调用kafkaTemplate.send()方法发送消息时,Kafka可能需要一些时间来处理该消息(例如:网路延迟、消息序列化、Kafka集群的负载等),如果send()是同步的,那么发送消息可能会阻塞调用线程,直到消息发送成功或发生错误,这会导致应用程序性能下降,尤其在高并发场景下。

使用CompletableFuturesend()方法可以立即返回一个表示异步操作结果的未来对象,而不是等待操作完成,这样调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果。

  • 使用CompletableFuture.get()方法同步阻塞等待发送结果
public void sendMessage6() {
    CompletableFuture<SendResult<String, String>> completableFuture =
            kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "Hello Kafka!");
    try {
        // 阻塞等待
        SendResult<String, String> result = completableFuture.get();
        System.out.println("结果:" + result);  // 结果:SendResult [producerRecord=ProducerRecord(topic=default-topic, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=k3, value=Hello Kafka!, timestamp=1752240587138), recordMetadata=default-topic-0@0]
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}
  • 使用thenAccept(), thenApply(), thenRun()等方法来注册回调函数,回调函数将在CompletableFuture完成时被执行
public void sendMessage7() {
    CompletableFuture<SendResult<String, String>> completableFuture =
            kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "Hello Kafka!");
    completableFuture
            .thenAccept(result -> {
                System.out.println("结果:" + result);
            })
            .exceptionally(ex -> {
                System.out.println("异常:" + ex.getMessage());
                return null;  // 必须返回点啥,哪怕是 null
            });
    System.out.println("消息发送已提交,主线程继续执行...");
}
  1. thenAccept(Consumer):拿到异步结果,执行一些依赖结果的后续操作,不返回新结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
future.thenAccept(result -> {
    System.out.println("结果是: " + result);
});
  1. thenApply(Function):拿到异步结果,做一些处理,返回新的结果(可变换)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> newFuture = future.thenApply(result -> {
    return result.toUpperCase(); // 转成大写
});
newFuture.thenAccept(System.out::println); // 输出 HELLO
  1. thenRun(Runable):前面的异步结果执行完成后,执行一个无参、无返回值的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
future.thenRun(() -> {
    System.out.println("前面的任务执行完了,我就跑!");
});
  1. whenComplete(BiConsumer):无论成功还是失败,都会执行。可以同时拿到结果和异常(一个成功一个 null,或者相反)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("出错啦");
    return "hello";
});
future.whenComplete((result, ex) -> {
    if (ex != null) {
        System.out.println("发生异常: " + ex.getMessage());
    } else {
        System.out.println("执行结果: " + result);
    }
});
  1. exceptionally(Function<Throwable, T>):只在出现异常时执行,用于返回一个默认值,避免整个链挂掉。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("炸了");
    return "hello";
}).exceptionally(ex -> {
    System.out.println("捕获到异常: " + ex.getMessage());
    return "default";
});

future.thenAccept(System.out::println); // 输出 default

7.4. 发送对象消息

在 Kafka 中,消息本质是字节流,要发对象的话需要:
把对象 序列化(一般转成 JSON) → 作为消息的 value 发送出去

配置:

在这里插入图片描述

用的JSON序列化,所以要加这个依赖才可将Java对象转成JSON。

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

在这里插入图片描述

public void sendMessage8() {
    User user = User.builder().id(1).username("fg").age(23).build();
    // 分区可为 null,由 Kafka 分配)
    CompletableFuture<SendResult<String, Object>> completableFuture =
            kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), "k3", user);
    // 注册回调 - 成功时执行
    completableFuture.thenAccept(result -> {
        System.out.println("发送成功: " + result);
    });
    // 注册回调 - 异常时执行
    completableFuture.exceptionally(ex -> {
        System.out.println("发送失败: " + ex.getMessage());
        return null;
    });
    System.out.println("已异步发送对象,主线程继续执行...");
}

7.5. Replica副本

Replica:副本,为实现备份功能,保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有 1 个或多个副本。

  • Leader Replica:每个分区多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自 leader 副本。
  • Folower Replica:每个分区多个副本中的“从”副本,实时从 leader 副本中同步数据,保持和 leader 副本数据的同步,leader 副本发生故障时,某个 follower 副本会成为新的 leader。

设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic

在这里插入图片描述

Kafka 的核心数据单元是分区(partition),每个分区会被复制多份(称为“分区副本”),分布在不同的 Broker 节点上,以实现冗余和高可用。每个分区在任何时刻只有一个“Leader”副本负责处理所有读写请求,而1其他“Follower”副本则持续从 Leader 拉去数据以保持同步。如果 Leader 副本所在的 Broker 发生故障,Kafka 会自动从同步状态良好(在ISR中)的 Follower 副本里选举出一个新的 Leader 继续提供服务,确保数据安全和服务连续性。所谓“主题副本”指的就是一个主题下所有分区副本的总和,而“节点副本”则是指存储在某个特定 Broker 上的所有分区副本的集合。

方式一:通过命令行在创建topic时指定分区和副本

./kafka-topic.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server 192.168.116.100:9092

方式二:通过代码指定分区和副本

package com.fg.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    // 创建一个topic,名称为heTopic,分区数为5,副本数为1
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("heTopic", 3, (short) 1);
    }

    // 如果要修改分区数,只需修改配置重启项目即可,修改分区数并不会导致数据丢失,但是分区数只能增大不能减少
    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("heTopic", 5, (short) 1);
    }
}

在这里插入图片描述

7.6. 生产分区策略

生产者写入消息到topic,Kafka 将依据不同的策略将数据分配到不同的分区中。

  • 默认分配策略:BuiltlnPartitioner
    • 有 key:partition = Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions
    • 没有 key:使用 Sticky Partitioner,它不是单纯随机或轮询,而是会在一段时间或一个 batch 内,把所有消息发到同一个分区,直到 batch 满或时间到,再换一个分区。
  • 轮询分配策略:RoundRobinPartition,和默认的 Sticky 不一样,它直接轮询所有可用分区。只要没有 key,消息就按轮询顺序依次分到各分区。
  • 自定义分配策略:实现接口org.apache.kafka.clients.producer.Partitioner,重写 partition 方法。
public void sendMessage9() {
    // 有key:相同key保证落到同一个分区
    kafkaTemplate.send("heTopic", "key1", "Hello, with key!");
    // 无key:Sticky分区,同一批尽量stick到同一个分区
    for (int i = 0; i < 3; i++) {
        kafkaTemplate.send("heTopic", "Hello, no key!");
    }
}

如果是无key,使用轮询策略:

在这里插入图片描述

写一个自定义策略:

package com.fg.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * @param topic      消息主题
     * @param key        消息的key
     * @param keyBytes   序列化后的key
     * @param value      消息的value
     * @param valueBytes 序列化后的value
     * @param cluster    集群元数据,包含分区元数据信息(有多少个分区,leader 状态等等)
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes != null) {
            // 有key,走默认的hash分配
            return (Math.abs(Utils.murmur2(keyBytes))) % numPartitions;
        }
        // 没有key,轮询分配
        return counter.getAndIncrement() % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

配置:

在这里插入图片描述

测试:

public void sendMessage10() {
    // RoundRobin分区,不同key尽量均匀分配到不同分区
    for (int i = 0; i < 10; i++) {
        try {
            kafkaTemplate.send("heTopic", "Hello RoundRobin " + i).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

在这里插入图片描述

7.7. 生产消息拦截

生产者发送消息的流程:

在这里插入图片描述

拦截器:实现ProducerInterceptor<K, V>接口,对即将发送的 ProducerRecord 做修改或过滤。

package com.fg.config;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * 自定义 Kafka 生产者拦截器:
 * 主要用于在消息发送前后做一些额外处理,比如:
 * - 给消息做统一标记
 * - 统计成功/失败
 * - 打印日志
 * - 丢弃不合法消息等
 */
public class CustomProducerInterceptor implements ProducerInterceptor<String, Object> {

    /**
     * 发送消息前会调用,可以在这里对消息进行修改或过滤,如果返回null,则消息会被丢弃
     *
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        // 1.获取原始消息内容
        Object originalValue = record.value();
        // 2.在消息尾部追加标记
        String newValue = originalValue + " | intercepted";
        // 3.返回新的消息
        return new ProducerRecord<>(
                record.topic(),        // 保持原 topic
                record.partition(),    // 保持原分区(也可以自定义)
                record.timestamp(),    // 保持原时间戳
                record.key(),          // 保持原 key
                newValue,              // 替换后的 value
                record.headers()       // 保持原 header
        );
    }

    /**
     * 消息被Broker成功接收或失败后会调用,可以用于统计消息发送成功率、记录日志等
     *
     * @param recordMetadata
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            // 消息发送成功,打印分区信息
            System.out.println("消息发送成功,分区:" + recordMetadata.partition());
        } else {
            // 消息发送失败,打印错误信息
            System.out.println("消息发送失败,错误信息:" + e.getMessage());
        }
    }

    /**
     * Producer 关闭时会调用,用于清理资源
     * 如果没有需要释放的资源可以留空
     */
    @Override
    public void close() {
    }

    /**
     * 获取生产者的配置,可以在这里做初始化操作
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

在这里插入图片描述

7.8. 接收消息内容

在 Spring Kafka 中,
@Payload用于绑定 Kafka 消息体(就是生产者的 value)。
@Header用于绑定 Kafka 消息头(比如 topic、key、partition、timestamp、自定义 header)。
ConsumerRecord<String, String> record 接收消息所有内容。

public void sendMessage11() {
    kafkaTemplate.send(
            MessageBuilder.withPayload("这是消息内容")
                    .setHeader(KafkaHeaders.TOPIC, "heTopic")
                    .setHeader(KafkaHeaders.KEY, "key2")
                    .setHeader("customHeader", "自定义头信息")
                    .build()
    );
}
@KafkaListener(topics = "heTopic", groupId = "he-group")
public void receiveMessage(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_KEY) String key,
        @Header("customHeader") String customHeader,
        ConsumerRecord<String, String> record
) {
    System.out.println("消息体: " + message);  // 消息体: "这是消息内容 | intercepted"
    System.out.println("来自 topic: " + topic);  // 来自 topic: heTopic
    System.out.println("分区: " + partition);  // 分区: 1
    System.out.println("key: " + key);  // key: key2
    System.out.println("自定义 header: " + customHeader);  // 自定义 header: 自定义头信息
    System.out.println("ConsumerRecord: " + record); // ConsumerRecord: ConsumerRecord(topic = heTopic, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1752309403290, serialized key size = 4, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = customHeader, value = [-24, -121, -86, -27, -82, -102, -28, -71, -119, -27, -92, -76, -28, -65, -95, -26, -127, -81]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 117, 115, 116, 111, 109, 72, 101, 97, 100, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = __TypeId__, value = [106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103])], isReadOnly = false), key = key2, value = "这是消息内容 | intercepted")
}

发送消息对象

配置:

在这里插入图片描述

public void sendMessage12() {
    User user = User.builder().id(1).username("fg").age(23).build();
    kafkaTemplate2.send(
            MessageBuilder.withPayload(user)
                    .setHeader(KafkaHeaders.TOPIC, "heTopic")
                    .build()
    );
}
@KafkaListener(topics = "heTopic", groupId = "user-group")
public void receiveMessage(
        @Payload User user,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        ConsumerRecord<String, String> record
) {
    System.out.println("收到User对象: " + user + "来自Topic: " + topic + ", 分区: " + partition);  // 收到User对象: User(id=1, username=fg, age=23)来自Topic: heTopic, 分区: 1
}

Acknowledgment: 是 Spring Kafka 提供的一个接口,用于手动提交 Kafka 消费者的 offset。

应用场景:

  • 对消息要严格保证幂等性或一致性必须业务成功后再提交offset)。
  • 想实现失败重试(不提交offset,下次还会消费到同一条消息)。

默认情况下,如果 ack-mode 是recordbatchtime,Spring Kafka 会自动提交 offset。如果你想自己控制什么时候提交(比如处理完业务再提交,或出现异常不提交),就用 Acknowledgment.acknowledge(),不ack.acknowledge()就相当于不提交,这条消息会在下一轮重新被消费。

设置手动提交:

在这里插入图片描述

@KafkaListener(topics = "heTopic", groupId = "user-group")
public void receiveMessage(
        @Payload User user,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        ConsumerRecord<String, String> record,
        Acknowledgment ack
) {
    try {
        log.info("处理业务逻辑...");
        System.out.println("收到User对象: " + user + "来自Topic: " + topic + ", 分区: " + partition);
        // 手动提交offset
        ack.acknowledge();
        log.info("已手动提交offset");
    } catch (Exception e) {
        log.error("处理失败,不提交offset,稍后会重试", e);
    }
}

7.9. 指定消费

在 Spring Kafka 里,如果你想精准指定 topic、partition、offset 来消费,可以用@KafkaListener的 topicPartitions 属性,而不是简单写 topics。

可用来在调试或回放数据时,从某个确定的 offset 重放消费。

配置项 说明
topic 要监听的 topic
partitions 要监听的分区(从 0 开始)
partitionOffsets 指定从哪个分区的哪个 offset 开始
initialOffset 指定初始 offset(如果比当前最小 offset 还小,会从最小的可用 offset 开始)。initialOffset 只在 新的 groupId 或者这个分区之前没有提交 offset 时生效。如果 Kafka 里已经有保存的 offset,还是会从保存的 offset 开始消费。
@KafkaListener(
  topicPartitions = {
    @TopicPartition(
      topic = "heTopic",
      partitions = {"0", "1", "2"}, // 指定分区
      partitionOffsets = {
        @PartitionOffset(partition = "2", initialOffset = "15")
      }
    )
  },
  groupId = "he-group"
)

7.10. 批量消费

Kafka 消息是批量拉取的,但 Spring Kafka 默认是单条处理即使一次 poll 拉了 N 条,也一条一条执行 @KafkaListener)。

需设置:

在这里插入图片描述

在这里插入图片描述

public void sendMessage13() {
    for (int i = 0; i < 5; i++) {
        User user = new User(i, "fg-" + i, 20 + i);
        kafkaTemplate2.send("heTopic", user);
    }
}
@KafkaListener(topics = "heTopic", groupId = "user-group")
public void receiveMessage(List<User> users) {
    users.forEach(u -> {
        log.info("收到消息:{}", u);
    });
}

7.11. 消费消息拦截

消费拦截器是在 Kafka 消费者端,消息真正被业务代码处理前的“拦截器”或“切面”,可以进行日志记录、消息过滤、消息修改、性能监控、安全校验等。

package com.fg.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

@Slf4j
public class CustomConsumerInterceptor implements ConsumerInterceptor<String, Object> {

    /**
     * 消费消息之前调用,可以在这里对消息进行处理、过滤、修改等操作
     *
     * @param records 本次批量消费的所有消息
     * @return 返回要传递给业务代码的消息集合
     */
    @Override
    public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
        log.info("[拦截器] 拦截到批量消息,记录数: {}", records.count());
        for (TopicPartition partition : records.partitions()) {
            records.records(partition).forEach(record -> {
                System.out.println("[拦截器] 拦截到消息: " + record.key() + " -> " + record.value());
            });
        }
        // 不修改直接返回原消息
        return records;
    }

    /**
     * 当消费者提交offset时调用
     *
     * @param offsets 提交的offset信息
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    /**
     * 关闭拦截器时调用
     */
    @Override
    public void close() {
    }

    /**
     * 配置拦截器时调用,传入消费者配置参数
     *
     * @param configs 配置参数
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

在这里插入图片描述

7.12. 消息转发

消息转发就是应用 A 从 TopicA 接收到消息,经过处理后转发到 TopicB,再由应用 B 监听接收该消息,即一个应用处理完后将该消息转发至其他应用处理。

在这里插入图片描述

public void sendMessage14() {
    User user = User.builder().id(1).username("fg").age(23).build();
    kafkaTemplate2.send("sourceTopic", user);
}
public class Consumer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "sourceTopic", groupId = "forward-group")
    public void listenAndForward(@Payload User user) {
        log.info("收到[原topic]消息:{}", user);
        // 模拟业务
        user.setUsername(user.getUsername() + "_processed");
        // 转发到新topic
        kafkaTemplate.send("targetTopic", user);
        log.info("[转发]消息已转发到 targetTopic:{}", user);
    }

    @KafkaListener(topics = "targetTopic", groupId = "forward-group")
    public void listenForward(@Payload User user) {
        log.info("收到转发后的消息:{}", user);
    }
}

7.13. 消费分区策略

Kafka 的 Topic 是由多个分区(Partition)组成的,分区是 Kafka 并行度和扩展性的核心。

  • 生产者可以把消息写到指定分区或按分区器分配。
  • 消费者以消费者组为单位来消费分区:一个分区在同一时刻只会被一个消费者消费。
  • Kafka Broker 会自动把分区平均分配给组内消费者,叫分区再平衡。

在这里插入图片描述

策略 类名 说明
RangeAssignor org.apache.kafka.clients.consumer.RangeAssignor 默认策略:按分区范围顺序分配,Kafka 会把分区按照顺序分配给消费者,尽量让每个消费者拿到连续分区。适合单 Topic,分区数较多且消费者数较少,顺序性好。
RoundRobinAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor 轮询分配,尽量让分区均匀分布到所有消费者。
StickyAssignor org.apache.kafka.clients.consumer.StickyAssignor 粘性分配:在尽量均衡分配的同时,最大化保持前后分配不变,减少 Rebalance 的波动。
CooperativeStickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor Cooperative Sticky:和 Sticky 类似,但支持 渐进式 Rebalance,避免全量重分配,更平滑。
自定义分配器 实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 可以按自定义逻辑分配(比如按用户ID、地区、权限等做定制)。

想用什么策略在 yml 配置即可。

我这里写个自定义分配器策略:

package com.fg.config;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

import java.nio.ByteBuffer;
import java.util.*;

public class CustomPartitionAssignor implements ConsumerPartitionAssignor {

    /**
     * 返回自定义策略名称
     * Kafka会根据这个名字找到你的策略
     */
    @Override
    public String name() {
        return "simple-custom";
    }

    /**
     * 可选: 返回用户自定义元数据,通常用于给 assign 传递额外信息
     * 这里直接返回 null 就行,没用到
     */
    @Override
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        return null;
    }

    /**
     * 核心方法: 分配逻辑
     * cluster: 当前集群元数据信息
     * subscriptions: 所有消费者的订阅信息
     */
    @Override
    public GroupAssignment assign(Cluster cluster, GroupSubscription subscriptions) {
        // 结果Map: key是消费者ID, value是分配给它的分区
        Map<String, Assignment> assignmentMap = new HashMap<>();
        // 所有消费者
        List<String> consumers = new ArrayList<>(subscriptions.groupSubscription().keySet());
        Collections.sort(consumers);
        // 所有分区
        List<TopicPartition> partitions = new ArrayList<>();
        for (Subscription sub : subscriptions.groupSubscription().values()) {
            for (String topic : sub.topics()) {
                cluster.partitionsForTopic(topic).forEach(info ->
                        partitions.add(new TopicPartition(topic, info.partition())));
            }
        }

        // 简单轮询分配
        for (int i = 0; i < partitions.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            assignmentMap.computeIfAbsent(consumer, k -> new Assignment(new ArrayList<>()))
                    .partitions().add(partitions.get(i));
        }

        System.out.println("【分配结果】" + assignmentMap);
        return new GroupAssignment(assignmentMap);
    }

    /**
     * 分配完成后执行,可用于打印分配结果、记录日志等
     */
    @Override
    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
        System.out.println("【onAssignment】" + metadata.memberId() + " 分配到: " + assignment.partitions());
    }
}

在这里插入图片描述

7.14. 消息、数据的存储

Kafka 的存储就是以分区为单位,分段文件顺序写,索引文件快速查找,快照 & 元数据保证一致性,保留策略保证高效回收,整个机制天然支持海量消息的高效写读。

Kafka 的所有事件(消息、数据)都存储在/tmp/kafka-logs目录中,可通过log.dirs=/tmp/kafka-logs配置。

Kafka 的所有事件(消息、数据)都是以日志文件的方式来保存。

Kafka 一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:<topic_name>-<partition_id>

在这里插入图片描述

每次消费一个消息并且提交以后,会保存当前消费到的最近一个 offset。这个进度就是 offset:它表示当前分区里“已经消费到哪里了”。

  • 自动提交(enable.auto.commit=true
  • 手动提交(显式调用 commitSync 或 commitAsync)

每次提交时,消费者会把【消费到的最新 offset】写到一个特殊的内置 topic:__consumer_offsets

__consumer_offsets 是 Kafka 内置的系统 Topic,专门用于存储是所有 consumer group 的 offset 元数据,默认有 50 个分区。

offset.metadata.max.retention.ms=604800000  # 默认保留 7 天
offsets.topic.partitions=50                 # 默认分 50 个分区

分区规则: Kafka 根据 consumer group id 做哈希运算,确定写到 __consumer_offsets 的哪个分区。

offsetPartition = Math.abs(groupId.hashCode()) % offsetsTopicPartitionsCount

分区哈希保证了:

  • 相同的 group id 的 offset 元数据始终写到同一个分区。
  • 多分区提高并行度,多个 consumer group 提交 offset 时不会互相阻塞。

7.15. Offset

生产者Offset

生产者发送一条消息到 Kafka 的 broker 的某个 topic 下某个 partition 中。

Kafka 内部会为每条消息分配一个唯一的 offset,该 offset 就是该消息在 partition 中的位置。

在这里插入图片描述

消费者Offset

消费者 offset 是消费者需要知道自己已经读取到哪个位置了,接下来从那个位置开始继续读取消息。

每个消费者组(Consumer Group)中的消费者都会独立地维护自己的 offset,当消费者从某个 partition 读取消息时,它会记录当前读取到的 offset,这样即使消费者崩溃或重启,它也可以从上次读取的位置继续读取,而不会重复读取或遗漏消息。(消费者offset需要消费消息并提交(ack)后才记录)

Offset的生命周期
  • 生产者(Producer) 生产消息时,Kafka Broker 会把消息追加到分区末尾,分配下一个 Offset。
  • 消费者(Consumer) 消费消息时,会记录自己消费到哪里了,就是 Offset。
  • Kafka 不会自动删除已消费的消息,而是基于保留策略(时间/大小/日志压缩)来删除。

8. Kafka集群

在 Kafka 集群中,副本的个数大于 0 且小于等于 Broker 数,Producer 把消息发送到某个 topic 的分区(Partition),每个分区都有一个主副本(Leader)和多个从副本(Follower),Producer 只写 Leader,Leader 接收后同步给 Follower 保证数据可靠性;Consumer 只从分区的 Leader 拉取消息读取,Broker 之间通过副本同步实现高可用,当 Leader 挂掉时,Follower 会被选举成新的 Leader,整个过程保证消息不丢失、不重复且高可用。

在这里插入图片描述

8.1. 基于Zookeeper的集群搭建

我用的镜像版本新的,默认只能用 Kraft 搭建,所以这里就不用docker了。

启动三个Kafka来配置吧。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

准备好这几个。

在这里插入图片描述

然后启动zookeeper,接着启动三个Kafka。连接之后就可以看到三个 brokers 了。

在这里插入图片描述

然后现在去 SpringBoot 配置连接:

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

8.2. ISR(In-Sync Replicas)副本

指的是“和 Leader 保持同步的副本集合”。
它包含:当前分区的 Leader 副本和所有跟 Leader 同步进度在容忍范围内的 Follower 副本。

工作流程:

  • 写请求先写到 Leader 副本。
  • Follower 副本从 Leader 拉取数据进行同步(这是“拉”而不是 Leader 主动推送)。
  • 由于网络和拉取的频率限制,Follower 副本的最新数据可能比 Leader 少一点,但 Kafka 允许这个差距存在(可配置)。

踢出机制:

  • 如果某个 Follower 副本因为宕机、网络异常等原因长时间没能跟上 Leader(落后太多),Kafka 会把它从 ISR 中踢出去。

    • replica.lag.time.max.ms (默认 30 秒)
      如果某个 Follower 副本在这段时间内没有成功追上 Leader 的最新消息(即同步延迟超过 30 秒),则该副本会被移出 ISR。
    • replica.lag.max.messages
      表示 Follower 落后 Leader 多少条消息时被剔除 ISR。但该参数在新版 Kafka 中已被废弃,不建议使用。
  • 被踢出去后,它暂时失去选举 Leader 的资格。

  • 当它重新追上 Leader(Catch-up),会被重新加入 ISR。

ISR 就是 Kafka 用来保证高可用和数据一致性的核心机制之一,确保即使单个 Broker 挂了,也能用“与 Leader 保持同步的其他副本”来继续提供服务。

8.3. LEO(Log End Offset)

LEO 是指一个分区日志中,当前已经写入的最后一条消息的下一个可用的 offset,也就是日志的末尾位置。它标记了这个副本中下一条消息将写入的位置。

作用:

  • Leader 副本的 LEO 代表该分区日志的最新写入进度。
  • Follower 副本的 LEO 表示该副本当前同步到的最新消息位置。

与 ISR 关系: Follower 是否属于 ISR,通常会根据它的 LEO 与 Leader 的 LEO 之间的差值来判断。如果差距太大(超出阈值),就会被剔除 ISR。

举例:假设 Leader 的 LEO 是 1000,说明它已经写入了 0~999 条消息。一个 Follower 的 LEO 是 995,说明它同步到了第 995 条消息,还落后 Leader 5 条。如果阈值设置是最大允许落后 10 条消息,那么这个 Follower 仍在 ISR 中。

LEO 就是 Kafka 日志的“尾巴”,用来标识数据的写入和同步进度,是 Leader 和 Follower 协调复制和判断同步状态的重要指标。

8.4. HW(High Watermark)

HW 表示该分区日志中所有 ISR 副本都已同步完成的最大 offset。换句话说,HW 是所有副本中“最落后”副本的 LEO 的最小值。

在这里插入图片描述

作用:

  • 消费者只能读取不超过 HW 的消息,即只读取所有同步副本都确认写入的消息。
  • 保证消费者不会读取到尚未被所有同步副本确认的数据,避免“脏数据”或消息丢失。
  • 当 Leader 收到写入请求后,会等待消息被大多数 ISR 副本确认(写入),然后更新 HW。

与 LEO 的区别:

  • LEO 表示“日志末尾”位置(最新消息的偏移量),可能包含未被所有副本确认的消息。
  • HW 是“安全可读点”,代表所有同步副本都确认的消息最大偏移量。

举例:假设 Leader 的 LEO 是 1000,表示它已经写入了消息到 offset 999。ISR 副本中最慢的副本 LEO 是 995,那么 HW 就是 995。消费者只能安全读取到 offset 995(含)之前的消息。

8.5. 基于KRaft的集群搭建

KRaft 是 Kafka 从传统依赖 ZooKeeper 管理元数据(Broker 状态、Topic 配置、分区信息、副本状态等)转向内置 Raft 协议的集群元数据管理方式。

在 KRaft 模式下,Kafka 节点既承担 Broker 的数据读写存储角色,又可以作为 Controller 节点来选举、复制和维护集群的元数据,依靠 Raft 协议取代 ZooKeeper 实现一致性。

在这里插入图片描述

Controller

  • KRaft模式下,Controller 负责整个集群的元数据管理(原本由Zookeeper承担)。
  • 在一个时刻,只会有一个 Controller Leader,它负责处理:
    • 元数据更新(比如 Topic 创建、分区分配、ISR 维护)
    • 与其他 Broker 的协调
    • Raft 日志写入和复制
  • 其他 Controller 节点是 Follower,跟随 Leader 同步 Raft 元数据日志。

Broker

  • Broker 就是实际存储数据、处理生产者写入和消费者拉取的服务节点。
  • 在 KRaft 模式中,一个节点既可以是 Broker,又可以是 Controller(通过 process.roles 配置角色)。

Raft 选举节点(元数据节点)

  • 所有配置为 controller 角色的节点共同组成一个 Controller Quorum(Raft 选举组)。
  • 通过 Raft 协议在 Controller 节点间选举 Leader,保证元数据一致性。
  • 这部分就是代替 ZooKeeper 的核心:Raft 协议的日志复制 + 一致性选举。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

上面是一种方式,我现在用docker部署。

先配置 docker-compose.yml

version: "3"
services:
  kafka1:
    image: apache/kafka:3.9.1
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    volumes:
      - kafka1-data:/var/lib/kafka/data
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3

  kafka2:
    image: apache/kafka:3.9.1
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9092"
    volumes:
      - kafka2-data:/var/lib/kafka/data
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3

  kafka3:
    image: apache/kafka:3.9.1
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9092"
    volumes:
      - kafka3-data:/var/lib/kafka/data
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9094
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3

volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:

然后再执行命令生成集群UUID,每个节点用同一个UUID进行format。

docker run --rm apache/kafka:3.9.1 bash -c "/opt/kafka/bin/kafka-storage.sh random-uuid"
# 第一个节点
docker run -it --rm \
  -v kafka1-data:/var/lib/kafka/data \
  apache/kafka:3.9.1 \
  kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /etc/kafka/kafka.properties --ignore-formatted
# 第二个节点
docker run -it --rm \
  -v kafka2-data:/var/lib/kafka/data \
  apache/kafka:3.9.1 \
  bash -c "/opt/kafka/bin/kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /opt/kafka/config/kraft/server.properties --ignore-formatted"
# 第三个节点
docker run -it --rm \
  -v kafka3-data:/var/lib/kafka/data \
  apache/kafka:3.9.1 \
  bash -c "/opt/kafka/bin/kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /opt/kafka/config/kraft/server.properties --ignore-formatted"

最后执行启动

docker-compose up -d

在这里插入图片描述

在这里插入图片描述

测试:

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到