Kafka学习记录

发布于:2025-08-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

1.概述以及环境准备

1.1 概述

1.1.1 概念

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。

1.1.2 Kafka版本迭代演进

Kafka前期项目版本似乎有点凌乱,Kafka在1.x之前的版本,是采用4位版本号;比如:0.8.2.2、0.9.0.1、0.10.0.0...等等;

在1.x之后,kafka 采用 Major.Minor.Patch 三位版本号;

Major表示大版本,通常是一些重大改变,因此彼此之间功能可能会不兼容;

Minor表示小版本,通常是一些新功能的增加;

Patch表示修订版,主要为修复一些重点Bug而发布的版本;

比如:Kafka 2.1.3,大版本就是2,小版本是1,Patch版本为3,是为修复Bug发布的第3个版本;

Kafka总共发布了8个大版本,分别是0.7.x、0.8.x、0.9.x、0.10.x、0.11.x、1.x、2.x 3.x 版本,截止目前,最新版本是Kafka 3.7.0,也是最新稳定版本;

1.1.3 Kafka运行环境前置要求

Kafka是由Scala语言编写而成,而Scala运行在Java虚拟机上,并兼容现有的Java程序,因此部署Kakfa的时候,需要先安装JDK(必须安装Java 8+以上的版本)。

1.1.4 启动Kafka

Apache Kafka可以使用ZooKeeper或KRaft启动;但只能使用其中一种方式,不能同时使用;

  • KRaft:是Kafka内置共识机制,用于取代 Apache ZooKeeper,早期kafka的运行依赖于zookeeper,现在kafka3.0之后有了KRaft,没有zookeeper也可以独立运行了;

Kafka启动基于Zookeeper(目前kafka内部已经自带了zookeeper)

1、进入到kafka家目录下的bin目录中运行命令,启动zookeeper./zookeeper-server-start.sh ../config/zookeeper.properties &

2、进入到kafka家目录下的bin目录中运行命令,启动kafka./kafka-server-start.sh ../config/server.properties &

3、进入到kafka家目录下的bin目录中运行命令,关闭Kafka:./kafka-server-stop.sh ../config/server.properties

4、进入到kafka家目录下的bin目录中运行命令,关闭zookeeper: ./zookeeper-server-stop.sh ../config/zookeeper.properties

1.1.5 使用独立的Zookeeper启动Kafka

下载好独立的zookeeper后需要先配置Zookeeper:

进入到zookeeper家目录下的conf目录下,执行命令:cp zoo_sample.cfg  zoo.cfg(zookeeper的配置文件名必须叫zoo.cfg,这里通过样例文件复制一份出来即可)

zoo.cfg 不需要修改,直接使用即可

1、必须先启动Zookeeper

 zookeeper启动默认会占用8080端口,需要其修改配置文件zoo.cfg,在结尾添加如下配置:

admin.serverPort=9089

到zookeeper家目录下的bin目录下运行命令,启动Zookeeper:zkServer.sh start

到zookeeper家目录下的bin目录下运行命令,关闭Zookeeper:zkServer.sh stop

2、进入到kafka家目录下的bin目录中运行命令,启动kafka./kafka-server-start.sh ../config/server.properties &

1.1.6 使用KRaft启动运行Kafka

1、多个kafka节点和单个都需要先,生成Cluster UUID(集群UUID):到kafka的家目录/bin目录下执行命令: ./kafka-storage.sh random-uuid

2、格式化日志目录:到kafka的家目录/bin目录下执行命令:./kafka-storage.sh format -t 刚刚生成的集群UUID -c ../config/kraft/server.properties

3、启动Kafka,到kafka的家目录/bin目录下执行命令:./kafka-server-start.sh ../config/kraft/server.properties &

4、关闭Kafka,到kafka的家目录/bin目录下执行命令:./kafka-server-stop.sh ../config/kraft/server.properties

1.1.7 使用Docker启动运行Kafka

Docker安装:

安装最新版的Docker:

1、yum install yum-utils -y

2、yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

3、yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin -y

查看是否安装成功,查看docker版本:docker --versiondocker version,docker -v


Docker启动:

启动:systemctl start docker 或者 service docker start

停止:systemctl stop docker 或者 service docker stop

重启:systemctl restart docker 或者 service docker restart


检查Docker进程的运行状态:systemctl status docker 或者 service docker status

查看docker进程:ps -ef | grep docker

查看docker系统信息:docker info

查看所有的帮助信息:docker --help

查看某个commond命令的帮助信息:docker commond --help


使用Docker启动kafka

1、拉取Kafka镜像:docker pull apache/kafka:3.7.0

2、启动Kafka容器:docker run -p 9092:9092 apache/kafka:3.7.0

查看已安装的镜像:docker images

删除镜像:docker rmi apache/kafka:3.7.0

2.Kafka操作

2.1 Kafka操作

2.1.1创建主题Topic

在使用Kafka之前,必须先在kafka服务器中创建主题(Topic):

        主题(Topic)类似于文件系统中的文件夹。

        主题(Topic)用于存储事件(Events)。

                事件(Events)也称为记录或消息,比如支付交易、手机地理位置更新、运输订单、物联网设备或医疗设备的传感器测量数据等等都是事件(Events)。

                事件(Events)被组织和存储在主题(Topic)中。

                简单来说,主题(Topic)类似于文件系统中的文件夹,事件(Events)是该文件夹中的文件。

2.1.2 Kafka操作

1、创建kafka中的topic

创建主题使用这个命令:kafka-topics.sh

1、不带任何参数会告知该脚本如何使用:./kafka-topics.sh

2、创建主题:./kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

3、列出所有的主题:./kafka-topics.sh --list --bootstrap-server localhost:9092

4、删除主题:./kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

5、显示主题详细信息:./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

6、修改主题信息:./kafka-topics.sh --alter --topic quickstart-events --partitions 5  --bootstrap-server localhost:9092

2、在主题(Topic)中写入一些事件(Events

Kafka客户端通过网络与Kafka Brokers进行通信,可以 读/写 主题Topic中的事件Events;

Kafka Brokers一旦收到事件Event,就会将事件Event以持久和容错的方式存储起来,可以永久地存储;

通过 kafka-console-producer.sh 脚本写入事件Events:

  • 不带任何参数会告知该脚本如何使用:./kafka-console-producer.sh
  • ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
  • 每一次换行是一个事件Event
  • 使用Ctrl+C退出,停止发送事件Event到主题Topic

3、从主题Topic中读取事件Events

使用kafka-console-consumer.sh消费者客户端读取之前写入的事件Event

  • 不带任何参数会告知该脚本如何使用:./kafka-console-consumer.sh
  • ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
  • --from-beginning 表示从kafka最早的消息开始消费
  • 使用Ctrl+C停止消费者客户端;

事件Events是持久存储在Kafka中的,所以它们可以被任意读取多次;

2.2 在另外一台机器上连接Kafka

1、先在Linux下使用docker启动Kafka容器:docker run -d -p 9092:9092 apache/kafka:3.7.0

2、在windows下安装外部连接工具;

3、使用外部连接工具连接Kafka;

这里使用idea中的kafka插件作为客户端连接kafka服务器: 

如果外部环境连接不上Kafka怎么办?

文件输入:提供一个本地kafka属性配置文件,替换docker容器中的默认配置文件;

0.先随便创建并运行一个kafka容器:docker run -p 9092:9092 apache/kafka:3.7.0

①然后执行这个命令进入到此容器的命令行中:docker exec -it 容器id /bin/bash

②把docker的kafka容器中的server.properties文件复制到linux中:

docker cp 容器id:/etc/kafka/docker/server.properties /opt/kafka/docker

③在Linux下编辑这个配置文件:server.properties

listeners=PLAINTEXT://0.0.0.0:9092(listeners用于指定kafka 监听来自于哪个网卡的请求)

advertised.listeners=PLAINTEXT://192.168.11.128:9092(指定客户端或其他broker连接kafka服务器时,实际使用连接地址,即“对外宣传”的地址)

advertise的含义表示宣称的、公布的,Kafka服务对外开放的IP和端口;

④文件映射:docker run --volume /opt/kafka/docker:/mnt/shared/config -d -p 9092:9092 apache/kafka:3.7.0

其他的Kafka图形界面连接工具:

1、Offset Explorer (以前叫 Kafka Tool),官网:Offset Explorer

        直接进入官网点击下载到window系统中即可连接linux中的kafka:

2、CMAK(以前叫 Kafka Manager) 官网:https://github.com/yahoo/CMAK

        是一个web后台管理系统,可以管理kafka,和kafka一起运行在Linux系统中,项目地址: https://github.com/yahoo/CMAK

        注意该管控台运行需要JDK11版本的支持。

        下载:https://github.com/yahoo/CMAK/releases

        下载下来是一个zip压缩包,直接 unzip解压:unzip cmak-3.0.0.6.zip,解压后即完成了安装;

CMAK客户端需要基于zookeeper的方式启动kafka才可以使用该web管理后台,否则不行;

        1、CMAK配置:

                修改CMAK家目录中的conf目录下的application.conf配置文件:

                kafka-manager.zkhosts="192.168.184.128:2181"

                cmak.zkhosts="127.0.0.1:2181"

        2、CMAK启动:

                切换到CMAK家目录/bin目录下执行命令:./cmak -Dconfig.file=../conf/application.conf -java-home /usr/local/jdk-11.0.22(这里必须用jdk11启动,jdk17是不行的)

                其中-Dconfig.file是指定配置文件,-java-home是指定jdk11所在位置,如果机器上已经是jdk11,则不需要指定;

        3、CMAK访问:

                启动之后CMAK默认端口为9000,在windows系统下访问:http://192.168.11.128:9000/

3、EFAK(以前叫 kafka-eagle) 官网:EFAK

EFAK概述:

        EFAK一款优秀的开源免费的Kafka集群监控工具;(国人开发并开源)

        官网:EFAK    Github:https://github.com/smartloli/EFAK

EFAK下载与安装:

                下载:https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz

                安装,需要解压两次:

                        1、tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

                        2、cd kafka-eagle-bin-3.0.1

                        3、tar -zxvf efak-web-3.0.1-bin.tar.gz

                        4、cd efak-web-3.0.1

EFAK的配置:

        1、安装MySQL数据库,需要MySQL,并创建数据库ke,数据库创建好即可,其他什么都不用管;

        2、修改配置文件EFAK家目录//conf/system-config.properties

                主要修改Zookeeper配置MySQL数据库配置

                

        3、在/etc/profile文件中配置环境变量KE_HOME,在profile文件的最后添加:

                export KE_HOME=/usr/local/efak-web-3.0.1

                export PATH=$KE_HOME/bin:$PATH

                执行source让环境变量配置生效:source /etc/profile

启动EFAK(也是和kafka一起运行在Linux系统中)

                1、EFAK需要kafka采用zookeeper的方式启动才能使用;

                2、在EFAK安装目录的bin目录下执行这个命令:./ke.sh start 命令使用:ke.sh [start|status|stop|restart|stats]

访问EFAK

        1、在window系统下访问:http://192.168.11.128:8048/

        2、登录账号:admin , 密码:123456

3.Spring Boot集成Kafka开发

3.1 最基础的kafka程序

1、新建项目:新建一个空项目,然后再使用SpringBoot脚手架Spring Initializr在空项目下创建好SpringBoot项目:

下面是kafka的依赖:

  • 这个不是kafka的起步依赖,但是SpringBoot中其实也自动配置好了kafka
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、配置SpringBoot配置文件

spring:
  application:
    name: springnboot-01-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.129:9092
    #生产者配置信息

    #消费者配置信息


3、写代码

编写生产者(写入事件)

package com.example.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {

    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    //KafkaTemplate<K, V>类的泛型分别表示发送消息的key和value的类型
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void send(){
        //先使用最基础的send方法发送事件:向某个主题发生某个消息
        kafkaTemplate.send("hello-topic","hello kafka");
    }
}

消费者(读取事件)

package com.example.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    //采用监听的方式接收事件(消息、数据)
    //默认只会监听最新发到kafka的消息,消费者启动之前发送到kafka的消息是监听不到的
    @KafkaListener(topics = "hello-topic",groupId = "hello-group")
    public void onEvent(String event){
        System.out.println("读取到事件:" + event);
    }
}

主启动类:

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Springnboot01KafkaBaseApplication {

    public static void main(String[] args) {
        SpringApplication.run(Springnboot01KafkaBaseApplication.class, args);
    }

}

 测试类:

package com.example;

import com.example.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springnboot01KafkaBaseApplicationTests {
    @Resource
    private EventProducer eventProducer;

    @Test
    void test01() {
        eventProducer.send();
    }

}

先运行主启动类,再运行测试类中的test01方法发生消息,结果为:

3.2 kafka的几个概念

Kafka的几个概念:

1、生产者Producer:

2、消费者Consumer:

3、主题Topic:

4、分区Partition:在Kafka中,每个topic可以有一个或多个partition分区,当创建topic时,如果不指定该topic的partition数量,那么默认就是1个partition。

5、偏移量Offset:offset是标识分区中消息的唯一位置,从0开始,然后顺序往上增长,每条消息进来分区都会对应一个offset

6、默认情况下,当启动一个新的消费者组(编写消费者需要指定一个消费者组)时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费消息。如果希望从第一条消息开始消费,需要在SpringBoot配置文件中将消费者的auto.offset.reset属性设置为earliest:

spring:
  application:
    name: springnboot-01-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.129:9092
    #生产者配置信息

    #消费者配置信息
    consumer:
      auto-offset-reset: earliest

注意: 如果之前已经用相同的消费者组ID消费过该主题中的消息,这时Kafka就已经记录了该消费者组的偏移量(也就是这个消费者组消费到哪个消息),那么即使设置了auto.offset.reset=earliest,该设置也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况下,你需要手动重置偏移量使用一个新的消费者组ID

  • 手动重置偏移量
    • 在kafka家目录/bin下执行下面命令:
      #将偏移量重置到00
      ./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute
      
      ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group-02 --topic hello-topic --reset-offsets --to-earliest --execute
      
      #将偏移量重置到最新的那条消息
      ./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
      

3.3 消费者消费消息时的偏移量策略配置

消息消费时偏移量策略的配置:

spring:
  kafka:
    #消费者配置信息
    consumer:
      auto-offset-reset: earliest
取值: earliest 、latest、 none 、exception
earliest 自动将偏移量重置为最早的偏移量,从0开始消费;
latest: 自动将偏移量重置为最新偏移量,从该分区中的最后一条消息的下一个位置开始消费;
none: 如果没有为消费者组找到以前的偏移量(也就是以前消费者组有没有消费过消息),则向消费者抛出异常;
exception: 向消费者抛出异常; spring- kafka 不支持)

3.4 生产者向kafka发送消息的方法

package com.example.producer;

import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void send(){
        //先使用最基础的send(String topic, @Nullable V data)方法发送事件:向某个主题发生某个消息
        kafkaTemplate.send("hello-topic","hello kafka");
    }


    //使用send(Message<?> message)方法发送事件:向某个主题发生某个消息
    public void send2(){
        //1.使用MessageBuilder创建消息对象,泛型表示发生的消息是什么类型的
        Message<String> message = MessageBuilder.withPayload("hello kafka") //1.1调用这个方法设置要发送的消息
                .setHeader(KafkaHeaders.TOPIC,"test-topic") //1.2调用这个方法设置消息要发送到test-topic主题
                .build();

        kafkaTemplate.send(message);
    }

    //使用send(ProducerRecord<K, V> record)方法发送消息
    public void send3(){
        //Headers对象里面可以放一些信息,到时候消费者接受到消息后,可以拿到此对象中放的那些信息
        Headers headers = new RecordHeaders();
        headers.add("photo","12345678901".getBytes(StandardCharsets.UTF_8));
        headers.add("name","张三".getBytes(StandardCharsets.UTF_8));

        //1.创建ProducerRecord对象,其泛型分别表示消息的key和value的类型
        //1.1:构造方法ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        //1.1.1:ProducerRecord类的构造方法的每个参数分别为向哪个主题的哪个分区发送消息,发送消息时的时间,消息的key和value,消息的headers
        ProducerRecord<String,String> record = new ProducerRecord<>("test-topic02",
                0,
                System.currentTimeMillis(),
                "key1",
                "hello kafka",
                headers
                );
        kafkaTemplate.send(record);
    }

    /*
    * 使用send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)方法发送消息
    *   这个send方法的每个参数代表向topic主题的partition分区发送消息,发送消息的时间,消息的key和value
    */
    public void send4(){
        kafkaTemplate.send("test-topic02",0,System.currentTimeMillis(),"key2","hello kafka");
    }

     /*
     *  使用sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)方法发送消息
     *      这个sendDefault方法的每个参数代表向yml配置文件中配置的默认主题的partition分区发送消息,发送消息的时间,消息的key和value
     */
    public void send5(){
        kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key3","hello kafka");
    }
}

 kafkaTemplate.send(...) 和 kafkaTemplate.sendDefault(...) 方法发送消息的区别?

主要区别是每次发送消息到Kafka时,是否需要指定主题topic;

1、kafkaTemplate.send(...) 方法需要指定要发送消息的目标主题topic

2、kafkaTemplate.sendDefault() 该方法不需要指定要发送消息的目标主题topic

kafkaTemplate.send(...) 方法适用于需要根据业务逻辑或外部输入动态确定消息目标topic的场景;

kafkaTemplate.sendDefault() 方法适用于总是需要将消息发送到特定默认topic的场景;kafkaTemplate.sendDefault() 是一个便捷方法,它使用配置中指定的默认主题topic来发送消息,如果应用中所有消息都发送到同一个主题时采用该方法非常方便,可以减少代码的重复或满足特定的业务需求;

3.5 获取生产者发送消息后的结果

3.4节介绍的所有send()方法和sendDefault()方法都会返回CompletableFuture<SendResult<K, V>>类型的对象。

  • CompletableFuture<T>是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量。(CompletableFuture单词含义为未来能够完成的
  • 其中泛型SendResult<K, V>类封装了kafkaTemplate对象发送消息后的发送结果。

因为调用 kafkaTemplate.send() 方法发送消息时,Kafka需要一些时间来处理该消息(例如:网络延迟、消息序列化、Kafka集群的负载等),如果 send() 方法是同步的,那么发送消息可能会阻塞调用线程,直到消息发送成功或发生错误这会导致应用程序的性能下降,尤其是在高并发场景下。

使用 CompletableFuture类后,kafkaTemplate.send方法发送消息时会立即返回一个表示异步操作结果的未来对象(在消息发送完成后可以从这个对象中获取发送结果),而不是等待操作完成,这样,调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果。

实例:

package com.example.producer;

import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    /*
     *  阻塞等待的方式获取发送结果,如果还没拿到发送结果就会阻塞等待,直到获取到结果
     */
    public void send6(){
        CompletableFuture<SendResult<String, String>> completableFuture
                = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "key3", "hello kafka");

        //在消息发送完成后可以从这个对象中获取发送结果
        try {
            //1.阻塞等待的方式获取发送结果,如果还没拿到发送结果就会在这里阻塞等待,直到获取到结果
            SendResult<String, String> sendResult = completableFuture.get();
            if (sendResult.getRecordMetadata() != null){
                //如果SendResult对象中的RecordMetadata对象不为空,就表示消息发送到kafka服务器
                System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());
            }
            System.out.println("producerRecord" + sendResult.getProducerRecord());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     *  异步方式获取发送结果:
     *      使用CompletableFuture对象内的thenAccept(), thenApply(), thenRun()等方法来注册回调函数,回调函数会在CompletableFuture对象完成时被执行
     */
    public void send7(){
        CompletableFuture<SendResult<String, String>> completableFuture
                = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "key3", "hello kafka");

        //在消息发送完成后可以从这个对象中获取发送结果
        try {
            //1.使用非阻塞的方式获取发送消息的结果
            completableFuture.thenAccept((t) -> {
                if (t.getRecordMetadata() != null){
                    //如果SendResult对象中的RecordMetadata对象不为空,就表示消息发送到kafka服务器
                    System.out.println("消息发送成功:" + t.getRecordMetadata().toString());
                }
                System.out.println("producerRecord" + t.getProducerRecord());
            }).exceptionally((t) -> {
                //如果发送消息过程中出现异常,就会执行这个回调方法
                t.printStackTrace();
                return null;
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

3.6 生产者发送对象类型的消息

生产者类:

package com.example.producer;

import com.example.entity.User;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;



    public void send8(){
        User user = new User();
        user.setId(1);
        user.setName("张三");
        user.setAge(18);
        //分区传null表示让kafka自己选择将消息发到哪个分区
        kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), "key3", user);
    }
}

测试类:

package com.example;

import com.example.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springnboot01KafkaBaseApplicationTests {
    @Resource
    private EventProducer eventProducer;


    @Test
    void test08() {
        eventProducer.send8();
    }

}

此时会出现异常:org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.entity.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

因为kafka默认使用字符串序列化器StringSerializer来序列化消息的key和value,而此时发送过去的消息是一个对象,所以会报错,这个时候需要配置kafka使用的序列化方式:

spring:
  application:
    name: springnboot-01-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.130:9092
    #生产者配置信息
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer #指定消息key和消息value的编码(序列化)方式
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    #消费者配置信息
    consumer:
      auto-offset-reset: earliest
    template:
      default-topic: default-topic  #当代码中没有指定向哪个主题发送消息时,默认使用这里配置的主题

4. Kafka的核心概念

4.1 Replica副本

Replica:副本kafka实现备份功能保证kafka集群中的某个节点发生故障时使该节点上partition中的数据不丢失,且 Kafka仍然能够继续工作,Kafka提供了副本机制,一个topic主题中的每个分区patition都有1个或多个副本;

Replica副本分为Leader ReplicaFollower Replica

  • Leader:每个分区的多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自leader副本
  • Follower:每个分区的多个副本中的“从”副本,实时从leader副本中同步数据,保持和leader副本数据的同步,leader副本发生故障时,某个follower副本会成为新的leader副本

注意:设置副本的个数不能为0,也不能大于节点个数,否则将不能创建Topic;

4.2 创建主题时指定分区和副本

在直接使用send()方法发送消息时,kafka会自动创建topic,这种情况下创建的topic默认只有一个分区,分区只有1个副本,也就是有它自己本身的副本(主副本),没有额外的副本备份;

我们可以在项目中新建一个配置类专门用来初始化topic:

package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic(){
        //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic
        //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数
        return new NewTopic("heTopic", 5, (short) 1);
    }
}

如果要修改分区数,只需修改配置值重启项目即可,修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小,如果主题已经存在并不会重新创建。

4.3 生产者发送消息时的分区策略

生产者发送消息到topic时,Kafka将依据不同的策略将数据分配到不同的分区中:

1、默认分配策略:BuiltInPartitioner 

        有指定消息的key:Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;()

        没有指定消息的key:是使用随机数 % numPartitions

2、轮询分配策略:RoundRobinPartitioner实现类 (实现的接口:Partitioner)

  • 轮流往每个分区中发消息:如何配置:
    • package com.example.config;
      
      import org.apache.kafka.clients.admin.NewTopic;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.RoundRobinPartitioner;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.core.DefaultKafkaProducerFactory;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.kafka.core.ProducerFactory;
      
      import java.util.HashMap;
      import java.util.Map;
      
      @Configuration
      public class KafkaConfig {
      
          @Value("${spring.kafka.bootstrap-servers}")
          private String bootstrapServers;
      
          @Value("${spring.kafka.producer.value-serializer}")
          private String valueSerializer;
      
          @Value("${spring.kafka.producer.key-serializer}")
          private String keySerializer;
      
      
          /**
           * 创建并返回一个包含Kafka生产者相关配置的Map集合
           */
          public Map<String,Object> producerConfigs(){
              Map<String, Object> props = new HashMap<>();
              //指定kafka的连接地址
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              //指定key的序列化器
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
              //指定值的序列化器
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
              //指定分区策略为轮询的方式
              props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
              return props;
          }
      
          /**
           * 创建并返回一个生产者工厂,使用前面定义的producerConfigs()方法提供的配置
           */
          public ProducerFactory<String,?> producerFactory(){
              return new DefaultKafkaProducerFactory<>(producerConfigs());
          }
      
          /**
           * KafkaTemplate:用于覆盖默认的KafkaTemplate对象,使用前面创建的producerFactory()构建
           */
          @Bean
          public KafkaTemplate<String,?> kafkaTemplate(){
              return new KafkaTemplate<>(producerFactory());
          }
      
      
          /**
           * 创建一个主题
           */
          @Bean
          public NewTopic newTopic(){
              //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic
              //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数
              return new NewTopic("heTopic", 9, (short) 1);
          }
      }
      

3、自定义分配策略

定义一个类实现kafka提供的Partitioner接口:

package com.example.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPatitioner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //在这个方法里面写分区策略的代码即可,其他两个方法可以不用管
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

之后在配置类中配置一下这个类即可:

package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;


    /**
     * 创建并返回一个包含Kafka生产者相关配置的Map集合
     */
    public Map<String,Object> producerConfigs(){
        Map<String, Object> props = new HashMap<>();
        //指定kafka的连接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //指定key的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        //指定值的序列化器
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        //指定分区策略为轮询的方式
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPatitioner.class);
        return props;
    }

    /**
     * 创建并返回一个生产者工厂,使用前面定义的producerConfigs()方法提供的配置
     */
    public ProducerFactory<String,?> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * KafkaTemplate:用于覆盖默认的KafkaTemplate对象,使用前面创建的producerFactory()构建
     */
    @Bean
    public KafkaTemplate<String,?> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }


    /**
     * 创建一个主题
     */
    @Bean
    public NewTopic newTopic(){
        //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic
        //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数
        return new NewTopic("heTopic", 9, (short) 1);
    }
}

4.4 生产者发送消息的流程

首先,生产者发消息时,首先会经过拦截器(可以有0个到多个拦截器,默认是没有拦截器的),然后会走序列化器,分别对消息的key和value进行序列化,默认使用字符串序列化器,然后走分区器(消息会发到哪个分区),最后才会把消息发到topic中。

4.4.1 拦截生产者发送的消息

自定义拦截器拦截生产者发送的消息,实现ProducerInterceptor<K, V>接口,这个接口的泛型分别表示消息的key和value是什么类型:

package com.example.config;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {
    /**
     * 发送消息时会先调用此方法,对消息进行拦截,可以对消息进行一些处理,比如记录日志等
     * @param producerRecord 拦截到的完整的消息对象
     * @return 返回原本的完整消息对象,继续传给下面的方法
     */
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        System.out.println("拦截消息:" + producerRecord.toString());
        return producerRecord;
    }

    /**
     * 生产者发送消息给kafka服务器,服务器会返回一个响应,表示服务器是否收到这个消息
     * @param recordMetadata 如果服务器收到消息,就可以从RecordMetadata对象中拿到偏移量等信息
     * @param e 如果没有收到消息,RecordMetadata对象就会为null,会出现异常信息
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (recordMetadata != null){
            System.out.println("服务器收到消息:" + recordMetadata.offset());
        }else {
            System.out.println("消息发送失败:" + e.getMessage());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

消息拦截器定义好后,还需要在生产者配置中配置一下拦截器让其生效:

package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;


    /**
     * 创建并返回一个包含Kafka生产者相关配置的Map集合
     */
    public Map<String,Object> producerConfigs(){
        Map<String, Object> props = new HashMap<>();
        //指定kafka的连接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //指定key的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        //指定值的序列化器
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        //指定分区策略为轮询的方式
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        //添加自定义的拦截器
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerProducerInterceptor.class.getName());
        return props;
    }

    /**
     * 创建并返回一个生产者工厂,使用前面定义的producerConfigs()方法提供的配置
     */
    public ProducerFactory<String,?> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * KafkaTemplate:用于覆盖默认的KafkaTemplate对象,使用前面创建的producerFactory()构建
     */
    @Bean
    public KafkaTemplate<String,?> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }


    /**
     * 创建一个主题
     */
    @Bean
    public NewTopic newTopic(){
        //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic
        //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数
        return new NewTopic("heTopic", 9, (short) 1);
    }
}

之后使用生产者发送消息:
 

    public void send8(){
        User user = new User();
        user.setId(1);
        user.setName("张三");
        user.setAge(18);
        //分区传null表示让kafka自己选择将消息发到哪个分区
        kafkaTemplate2.send("heTopic", user);
    }


    @Test
    void test08() {
        eventProducer.send8();
    }

控制台输出:

拦截消息:ProducerRecord(topic=heTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=com.example.entity.User@3688baab, timestamp=null)
2025-08-05T00:52:57.036+08:00  INFO 20980 --- [springnboot-01-kafka-base] [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: Some(5L6g3nShT-eMCtK--X86sw)
2025-08-05T00:52:57.036+08:00  INFO 20980 --- [springnboot-01-kafka-base] [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 2 with epoch 0
服务器收到消息:2

5. 消费者消费消息详解

5.1 获取生产者发送的消息

package com.example.springboot02kafkabase.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = "helloTopic",groupId = "helloGroup")
    public void onEvent(@Payload String event,
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                        //@Header(value = KafkaHeaders.RECEIVED_KEY) String key,
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition){
        System.out.println("读取到事件:" + event + ",topic:" + topic + ",partition:" + partition);
    }

}

代码讲解:

  • @KafkaListener: Spring Kafka提供的注解,用于声明Kafka消息监听器

  • @Payload: 标识用于获取消息体内容(即消息的实际内容)并注入到方法形参中,而不是消息头或其他元数据。

  • @Header注解:标识用于从消息头中获取指定值,并注入到方法的形参中

  • KafkaHeaders类: 包含Kafka消息头常量定义的类

    • 常用常量

      常量 说明 示例值
      RECEIVED_TOPIC 消息来源的主题 "test-topic"
      RECEIVED_PARTITION 消息来源的分区ID 2
      RECEIVED_KEY 消息的键 "user-123"
      RECEIVED_TIMESTAMP 消息的时间戳 1625097600000
      OFFSET 消息的偏移量 15
      GROUP_ID 消费者组ID "my-group"

使用ConsumerRecord<K,V>来接收消息:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = "helloTopic",groupId = "helloGroup")
    public void onEvent(ConsumerRecord<String,String> consumerRecord){
        System.out.println("consumerRecord:" + consumerRecord.toString());
        System.out.println("key:" + consumerRecord.key());
        System.out.println("value:" + consumerRecord.value());
        System.out.println("topic:" + consumerRecord.topic());
    }
}

生产者发送消息后的控制台:

ConsumerRecord<K,V>讲解:

  • 作用: 用于消费者接收消息表示消费者接收到的单条消息记录通过这个对象可以获取到最完整的消息数据,一次性访问消息的所有数据。

  • 泛型参数:

    • 第一个 String: 消息键(key)的类型

    • 第二个 String: 消息值(value)的类型

  • 主要方法

    方法 返回类型 说明
    key() String 获取消息的键
    value() String 获取消息的值(正文内容)
    topic() String 获取消息所属主题
    partition() int 获取消息所在分区ID
    offset() long 获取消息在分区中的偏移量
    timestamp() long 获取消息时间戳
    headers() Headers 获取消息头信息

配置消费者监听的主题和指定消费者组的时候可以不写死,而是写在配置文件:

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.130:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    #consumer:

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup


package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = "${kafka.topic.name}",groupId = "${kafka.consumer.group}")
    public void onEvent(ConsumerRecord<String,String> consumerRecord){
        System.out.println("consumerRecord:" + consumerRecord.toString());
        System.out.println("key:" + consumerRecord.key());
        System.out.println("value:" + consumerRecord.value());
        System.out.println("topic:" + consumerRecord.topic());
    }
}

5.2 开启消费者手动确认消息收到模式

先在SpringBoot配置文件中开启相应配置:

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.130:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    #consumer:

    #配置消息监听器
    listener:
      ack-mode: manual  #开启消息监听的手动确认模式

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup


然后在消费者方法形参上添加Acknowledgment对象,最后再在代码中使用Acknowledgment对象调用acknowledge方法:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = "${kafka.topic.name}",groupId = "${kafka.consumer.group}")
    public void onEvent(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                        @Payload ConsumerRecord<String,String> consumerRecord,
                        Acknowledgment ack){
        System.out.println("消费者接收到的单条消息记录:" + consumerRecord.toString());

        ack.acknowledge(); //手动确认消息,告诉kafka服务器,该消息已经确认收到,默认情况下kafka是自动确认的
    }

}

注意:如果消费者不确认收到消息,kafka服务器会认为消息没有收到,相当于消息没有被消费,会造成重复消费消息;原理就是确认消息后,kafka会将offset偏移量更新,如果不确认收到消息偏移量就没有更新所以会重复消费消息。

实际的使用实例:

默认情况下,Kafka消费者消费消息后,会自动发送确认信息给Kafka服务器,表示消息已经被成功消费。但在某些场景下,我们希望在消息处理成功后(也就是业务处理成功)再发送确认收到消息,以便Kafka能够重新发送该消息。

下面代码中,如果业务处理中发送异常,消费者就不会确认收到消息,消费者还可以再次接收到刚刚处理失败的那条消息。

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = "${kafka.topic.name}",groupId = "${kafka.consumer.group}")
    public void onEvent(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                        @Payload ConsumerRecord<String,String> consumerRecord,
                        Acknowledgment ack){
        try{
            //-----------业务代码开始-------------
            System.out.println("消费者接收到的单条消息记录:" + consumerRecord.toString());
            //-----------业务代码结束-------------

            //业务处理完成后,消费者告诉kafka服务器收到消息
            ack.acknowledge(); 
        }catch (Exception e){
            e.printStackTrace();
        }

    }

}

5.3 消费者接收消息的时候指定topic、partition、offset进行消费

application.yml

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.130:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    consumer:
      auto-offset-reset: earliest

    #配置消息监听器
    listener:
      ack-mode: manual  #开启消息监听的手动确认模式

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup


消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(groupId = "${kafka.consumer.group}",
            topicPartitions = {@TopicPartition(
            topic = "${kafka.topic.name}",  //指定监听的topic
            partitions = {"0","1","2"},  //指定监听的分区,设置从哪个偏移量开始消费
                    //3分区和4分区的从3和4偏移量后面的数据开始消费
            partitionOffsets = {@PartitionOffset(partition = "3",initialOffset = "3"),@PartitionOffset(partition = "4",initialOffset = "4")}
    )})
    public void onEvent(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                        @Payload ConsumerRecord<String,String> consumerRecord,
                        Acknowledgment ack){
        try{
            //-----------业务代码开始-------------
            System.out.println("消费者接收到的单条消息记录:" + consumerRecord.toString());
            //-----------业务代码结束-------------

            //业务处理完成后,消费者告诉kafka服务器收到消息
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }

    }

}

kafka配置类:
 

package com.example.springboot02kafkabase.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic newTopic(){
        return new NewTopic("helloTopic",5, (short) 1);
    }
}

生产者:

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        for (int i = 0; i < 25; i++) {
            //指定消息的key,让消息可以发送到不同的分区中
            kafkaTemplate.send("helloTopic","k" + i,"hello kafka");
        }
    }
}

测试类:

package com.example.springboot02kafkabase;

import com.example.springboot02kafkabase.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot02KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    public void test01(){
        eventProducer.send();
    }
}

运行单元测试后的控制台:

5.4 开启消费者批量消费消息

1、配置application.yml开启批量消费:

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.130:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    consumer:
      auto-offset-reset: earliest
      max-poll-records: 20  #每次最多批量消费的消息数量

    #配置消息监听器
    listener:
      type: batch   #配置为batch,表示批量消费消息

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup


2、消费者接收消息时用List集合来接收

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class EventConsumer {
    @KafkaListener(topics = {"batchTopic"},groupId = "batchGroup")
    public void onEvent(List<ConsumerRecord<String, String>> consumerRecordList){
        try{
            //-----------业务代码开始-------------
            System.out.println("批量消费消息,消息总数=" + consumerRecordList.size() + ",consumerRecordList=" + consumerRecordList);
            //-----------业务代码结束-------------
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

其余代码:
生产者

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        for (int i = 0; i < 125; i++) {
            //指定消息的key,让消息可以发送到不同的分区中
            kafkaTemplate.send("batchTopic","k" + i,"hello kafka");
        }
    }
}

测试类:

package com.example.springboot02kafkabase;

import com.example.springboot02kafkabase.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot02KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    public void test01(){
        eventProducer.send();
    }
}

运行单元测试后,可以看到服务的控制台,消费者一次接收20条消息到List集合中:

5.5 消费消息时对消息进行拦截

在消费者消费消息之前,通过配置拦截器可以对消息进行拦截,在消息被实际处理之前对其进行一些操作,例如记录日志、修改消息内容或执行一些安全检查等。

1、自定义一个类实现kafka的ConsumerInterceptor拦截器接口:

package com.example.springboot02kafkabase.config.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {

    /**
     * 在消费消息之前执行该方法
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        System.out.println("onConsume方法执行" + consumerRecords);
        return consumerRecords;
    }

    /**
     * 在消息拿到之后,提交offset之前执行该方法
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        System.out.println("onCommit方法执行" + map);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

2、在Kafka消费者的ConsumerFactory配置中注册这个拦截器:

package com.example.springboot02kafkabase.config;

import com.example.springboot02kafkabase.interceptor.CustomConsumerInterceptor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueSerializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keySerializer;

    /**
     * 创建并返回一个包含Kafka消费者相关配置的Map集合
     */
    public Map<String,Object> consumerConfigs(){
        Map<String, Object> props = new HashMap<>();
        //指定kafka的连接地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //指定key的序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keySerializer);
        //指定值的序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerializer);
        //添加自定义的消费者拦截器
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
        return props;
    }

    /**
     * 创建并返回一个消费者工厂,使用前面定义的consumerConfigs()方法提供的配置
     */
    @Bean
    public ConsumerFactory<String,String> myConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 创建并返回一个Kafka监听器工厂,使用前面定义的消费者工厂
     * @param myConsumerFactory
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory(ConsumerFactory<String,String> myConsumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(myConsumerFactory);
        return listenerContainerFactory;
    }
}

3、编写消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = {"batchTopic"},groupId = "batchGroup",containerFactory = "myKafkaListenerContainerFactory")
    public void onEvent(ConsumerRecord<String, String> consumerRecord){
        try{
            //-----------业务代码开始-------------
            System.out.println("消费消息,consumerRecord=" + consumerRecord);
            //-----------业务代码结束-------------
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

生产者发送消息:
 

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        //指定消息的key,让消息可以发送到不同的分区中
        kafkaTemplate.send("batchTopic","hello kafka");
    }
}

application.yml

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.131:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    consumer:
      auto-offset-reset: earliest
      max-poll-records: 20  #消费者每次最多批量消费的消息数量
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    #配置消息监听器
    listener:
      type: batch   #配置为batch,表示批量消费消息

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup


运行测试类:

package com.example.springboot02kafkabase;

import com.example.springboot02kafkabase.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot02KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    public void test01(){
        eventProducer.send();
    }
}

查看控制台可以发送消费者拦截器成功拦截到消息并处理:

5.6 消息转发

消息转发就是应用A的消费者从TopicA接收到消息经过处理后,再转发到TopicB,再由应用B的消费者监听接收该消息,即一个应用处理完成后将该消息转发至其他应用处理,这在实际开发中,是可能存在这样的需求的。

消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = {"topicA"},groupId = "aGroup")
    @SendTo(value = "topicB")  //指定要转发到的主题
    public String onEventA(ConsumerRecord<String, String> consumerRecord){
        System.out.println("消费消息A,consumerRecord=" + consumerRecord);
        return consumerRecord.value() + "---forward message";   //需要将消息返回
    }

    @KafkaListener(topics = {"topicB"},groupId = "bGroup")
    public void onEventB(ConsumerRecord<String, String> consumerRecord){
        System.out.println("消费消息B,consumerRecord=" + consumerRecord);
    }
}

生产者:

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        //向TopicA发送消息hello kafka
        kafkaTemplate.send("topicA","hello kafka");
    }
}

application.yml:

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.131:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    consumer:
      auto-offset-reset: earliest
      max-poll-records: 20  #消费者每次最多批量消费的消息数量
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup

5.7 消费者消费消息的分区策略

Kafka消费者消费消息的分区策略:指定topic中的哪些分区由哪些消费者来消费。

Kafka有多种分区分配策略,默认的分区分配策略是RangeAssignor,除了RangeAssignor策略外,Kafka还有其他分区分配策略:

  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

这些策略各有特点,可以根据实际的应用场景和需求来选择适合的分区分配策略。

5.7.1 RangeAssignor分区分配策略

Kafka消费者默认的消息消费分区分配策略是:RangeAssignor(范围分配)假设如下:

  • 一个主题myTopic有10个分区;(p0 - p9
  • 一个消费者组内有3个消费者:consumer1、consumer2、consumer3;

RangeAssignor消费分区策略:

1、计算每个消费者应得的分区数:分区总数(10)/  消费者组中消费者数量(3)= 3 ... 余1

  • 每个消费者理论上应该得到3个分区,但由于有余数1,所以第1个消费者会多得到一个分区
  • consumer1(作为第一个消费者)将得到 3 + 1 = 4 个分区
  • consumer2 consumer3 将各得到 3 个分区

2、具体分配:分区编号从0到9,按照编号顺序为消费者分配分区:

  • consumer1 将分配得到分区 0、1、2、3;
  • consumer2 将分配得到分区 4、5、6;
  • consumer3 将分配得到分区 7、8、9;

总结:RangeAssignor策略是根据消费者组内的消费者数量和主题的分区数量,来均匀地为每个消费者分配分区。

RangeAssignor分区分配策略的例子:

KafkaConfig:

package com.example.springboot02kafkabase.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic newTopic(){
        return new NewTopic("myTopic", 10, (short) 1);
    }
}

消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    /**
     * concurrency属性:为同一个 @KafkaListener 创建多个消费者实例(线程),并并行消费同一个Topic中的分区的消息
     * @param consumerRecord
     */
    @KafkaListener(topics = {"myTopic"},groupId = "myGroup",concurrency = "3")
    public void onEventB(ConsumerRecord<String, String> consumerRecord){
        System.out.println(Thread.currentThread().getName() + "---消费消息,consumerRecord=" + consumerRecord);
    }
}

生产者:

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        //向TopicA发送消息hello kafka
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send("myTopic","k" + i,"hello kafka");
        }
    }
}

测试类:

package com.example.springboot02kafkabase;

import com.example.springboot02kafkabase.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot02KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    public void test01(){
        eventProducer.send();
    }
}

最后在控制台输出中可以发现每个消费者都按默认的分区策略在分配到的分区中消费消息:

5.7.2 RoundRobinAssignor分区分配策略

继续以前面的例子数据,采用RoundRobinAssignor策略进行测试

轮询的消息消费分区分配策略就是:每个消费者轮流的按顺序消费每个分区

1、在配置类中去指定分区分配策略:

package com.example.springboot02kafkabase.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueSerializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keySerializer;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;


    @Bean
    public NewTopic newTopic(){
        return new NewTopic("myTopic", 10, (short) 1);
    }

    /**
     * 创建并返回一个包含Kafka消费者相关配置的Map集合
     */
    public Map<String,Object> consumerConfigs(){
        Map<String, Object> props = new HashMap<>();
        //指定kafka的连接地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //指定key的序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keySerializer);
        //指定值的序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerializer);
        //指定消费者偏移量策略
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //指定消息消费分区器,这里指定为轮询的消息消费分区器
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        return props;
    }

    /**
     * 创建并返回一个消费者工厂,使用前面定义的consumerConfigs()方法提供的配置
     */
    @Bean
    public ConsumerFactory<String,String> myConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 创建并返回一个Kafka监听器工厂,使用前面定义的消费者工厂
     * @param myConsumerFactory
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory(ConsumerFactory<String,String> myConsumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(myConsumerFactory);
        return listenerContainerFactory;
    }
}

消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    /**
     * concurrency属性:为同一个 @KafkaListener 创建多个消费者实例(线程),并并行消费同一个Topic中的分区的消息
     * @param consumerRecord
     */
    @KafkaListener(topics = {"myTopic"},groupId = "myGroup",concurrency = "3",containerFactory = "myKafkaListenerContainerFactory")
    public void onEventB(ConsumerRecord<String, String> consumerRecord){
        System.out.println(Thread.currentThread().getName() + "---消费消息,consumerRecord=" + consumerRecord);
    }
}

生产者:

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        //向TopicA发送消息hello kafka
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send("myTopic","k" + i,"hello kafka");
        }
    }
}

application:

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.131:9092
    #生产者配置信息
    #producer:


    #消费者配置信息
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest

测试类:

package com.example.springboot02kafkabase;

import com.example.springboot02kafkabase.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot02KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    public void test01(){
        eventProducer.send();
    }
}

5.7.3 StickyAssignor消费分区分配策略

尽可能保持消费者与分区之间的分配关系不变,即使消费者组中消费者成员发生变化,减少不必要的分区重分配;

尽量保持现有的分区分配不变,对新加入的消费者或离开的消费者进行分区调整。这样,大多数消费者可以继续消费它们之前消费的分区,只有少数消费者需要处理额外的分区;所以叫“粘性”分配;

5.7.4 CooperativeStickyAssignor消费分区策略

与 StickyAssignor 类似,但增加了对协作式重新平衡的支持,即消费者可以在它离开消费者组之前通知协调器,以便协调器可以预先计划分区迁移,而不是在消费者突然离开时立即进行分区重分配;

6. Kafka事件(消息、数据)的存储

  1. kafka中的所有事件(消息、数据)都存储在/tmp/kafka-logs目录中,可以通过kafka的server.properties配置文件中的配置项log.dirs=/tmp/kafka-logs配置事件存储的目录。
  2. Kafka主题的所有事件(消息、数据)都是以日志文件的方式来保存,某个主题的某个分区可以看成就是一个目录,专门存储该主题该分区的数据
  3. Kafka一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:<topic_name>-<partition_id>
  4. 比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录/tmp/kafka-log下就有 3 个目录,firstTopic-0、firstTopic-1、firstTopic-2
    1. 00000000000000000000.index  消息索引文件
    2. 00000000000000000000.log  消息数据文件,也就是发送的实际消息内容存储在这个文件中
    3. 00000000000000000000.timeindex  消息的时间戳索引文件
    4. 00000000000000000006.snapshot  快照文件,生产者发生故障或重启时能够恢复并继续之前的操作
    5. leader-epoch-checkpoint  记录每个分区当前领导者的epoch以及领导者开始写入消息时的起始偏移量
    6. partition.metadata  存储关于特定分区的元数据(metadata)信息

每次消费者消费一个消息并且提交以后,会保存当前消费到的最近的一个offset,在kafka中,内置了一个名叫__consumer_offsets的topic, 消费者提交的offset信都息会写入到该topic中,__consumer_offsets保存了每个consumer group某一时刻提交的offset信息,__consumer_offsets默认有50个分区;

consumer_group在消费消息后提交的偏移量保存在__consumer_offsets的哪个分区中的计算公式:Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;

7. Offset详解

7.1 生产者Offset

生产者发送一条消息到Kafka broker的某个topic下的某个partition中;

生产者offset:生产者发送的每条消息,Kafka内部都会为其分配一个唯一的offset(从0开始顺序增长),该offset是该消息在partition中的位置。

代码演示:

消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = {"offsetTopic"},groupId = "offsetGroup")
    public void onEventB(ConsumerRecord<String, String> consumerRecord){
        System.out.println(Thread.currentThread().getName() + "---消费消息,consumerRecord=" + consumerRecord);
    }
}

生产者:

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        //向TopicA发送消息hello kafka
        for (int i = 0; i < 2; i++) {
            kafkaTemplate.send("offsetTopic","k" + i,"hello kafka");
        }
    }
}

发送消息后通过kafka客户端工具查看,因为我执行了三次生产者代码。所以一共发了六条消息:

  • 通过第一张图可以看到,offsetTopic中的0号分区的开始偏移量是0,结束偏移量是6,现在偏移量的位置是在6(也就是生产者的偏移量现在是最后一条消息的位置的下一个位置,下次放消息就放到6号位置)。

7.2 消费者Offset

消费者offset:记录消费者要消费的分区中的下一条消息的位置,接下来需要从分区的哪个位置开始消费消息。

每个消费者组(Consumer Group)中的每个消费者都会独立维护自己所消费分区的offset,当消费者从某个partition读取消息时,它会记录当前读取到的offset,这样,即使消费者崩溃或重启,它也可以从上次读取的offset位置继续消费,而不会重复消费或遗漏消息;(注意:消费者的offset需要消费消息并提交后才记录offset)。

每个消费者组启动开始监听消息的时候,默认是从消息的最新位置(也就是生产者offset)开始监听消息,即把最新位置作为消费者的offset

  • 启动消费者开始监听的时候,如果分区中还没有生产者发送过消息,则最新的消费者偏移量就是0,从最新的位置开始消费消息     (情况:如果之后出现宕机就算先用生产者发送消息,消费者还是从0开始消费消息,因为最开始启动的时候已经记录消费者的偏移量是0了)
  • 启动消费者开始监听的时候,如果分区中已经有生产者发送过消息,则最新的消费者偏移量就是生产者的offset,在消费者开始监听之前发送的消息默认是消费不了的

消费者消费消息后,如果不提交确认(ack),则消费者offset不更新,提交了才更新

查看消费者组的详情命令行命令:./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group osGroup --describe

结论:消费者从什么位置开始消费,就看消费者的offset是多少,消费者offset是多少,它启动后,可以通过上面的命令查看;

8. Kafka集群

8.1 Kafka集群搭建

一种是基于Zookeeper方式的集群搭建

一种是基于Kraft方式的集群搭建

8.1.1 kafka集群搭建(基于Zookeeper方式)

1、kafka是一个压缩包,直接解压即可使用,所以我们就解压三个kafka(在三台机器上分别安装三个kafka,或在一台机器上安装三个kafka,用不同的目录、不同的端口)

2、配置kafka的这个配置文件:server.properties 

  • (1)三台kafka分别配置为:
    • broker.id=1、broker.id=2、broker.id=3
    • 该配置项是每个broker的唯一id,取值在0~255之间;
  • (2)三台分别配置listener=PAINTEXT:IP:PORT
    • listeners=PLAINTEXT://0.0.0.0:9091
    • listeners=PLAINTEXT://0.0.0.0:9092
    • listeners=PLAINTEXT://0.0.0.0:9093
  • (3)三台分别配置advertised.listeners=PAINTEXT:IP:PORT
    • advertised.listeners=PLAINTEXT://192.168.11.128:9091
    • advertised.listeners=PLAINTEXT://192.168.11.128:9092
    • advertised.listeners=PLAINTEXT://192.168.11.128:9093
  • (3)配置消息存放的日志目录
    • log.dirs=/tmp/kafka-logs-9091
    • log.dirs=/tmp/kafka-logs-9092
    • log.dirs=/tmp/kafka-logs-9093
    • 这是极为重要的配置项,kafka所有数据就是写入这个目录下的磁盘文件中
  • (4)配置kafka的server.properties文件中的zookeeper连接地址
    • zookeeper.connect=localhost:2181
    • 如果zookeeper是集群,则:​​​zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

3、kafka集群启动:

  1. 先启动Zookeeper,切换到zookeeper家目录/bin目录下执行命令./zkServer.sh start
  2. 启动三个Kafka,切换到bin目录:./kafka-server-start.sh ../config/server.properties
  3. 查看topic详情:./kafka-topics.sh --bootstrap-server 127.0.0.1:9091 --describe --topic clusterTopic

4、测试:

  • 使用idea的zookeeper客户端插件连接zookeeper:

  • 使用idea的kafka客户端插件连接三台kafka:

  • 在SpringBoot中连接kafka集群

先编写application.yml

spring:
  application:
    name: springnboot-02-kafka-base
  #配置afka相关信息
  kafka:
    #kafka的连接地址
    bootstrap-servers: 192.168.184.132:9091,192.168.184.132:9092,192.168.184.132:9093
    #生产者配置信息
    #producer:


    #消费者配置信息
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

KafkaConfig:

package com.example.springboot02kafkabase.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic newTopic(){
        //这里可以将主题中分区的副本数设置成3,因为现在有三个kafka节点了
        //分区的副本数不能为0,也不能大于节点的个数
        return new NewTopic("clusterTopic",3,(short)3);
    }
}

最后启动SpringBoot应用,可以在zookeeper客户端中看到对应的主题已经创建出来:

向kafka集群中发送消息,并消费消息:

消费者:

package com.example.springboot02kafkabase.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
    @KafkaListener(topics = {"clusterTopic"},groupId = "clusterGroup")
    public void onEventB(ConsumerRecord<String, String> consumerRecord){
        System.out.println(Thread.currentThread().getName() + "---消费消息,consumerRecord=" + consumerRecord);
    }
}

生产者:

package com.example.springboot02kafkabase.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
    //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void send(){
        //向TopicA发送消息hello kafka
        for (int i = 0; i < 2; i++) {
            kafkaTemplate.send("clusterTopic","k" + i,"hello kafka");
        }
    }
}

测试类:

package com.example.springboot02kafkabase;

import com.example.springboot02kafkabase.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot02KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    public void test01(){
        eventProducer.send();
    }
}

运行test01方法发送消息,最终可以在控制台看到消息被消费:

8.1.2 Kafka集群搭建(基于KRaft方式)

服务器规划:

将三个kafka服务器都安装在同一台机器上,也可以将三个kafka服务器安装在不同的机器上:

  • 如果搭建在三台机器上,则端口号可以搞成一样的;如果搭建在同一台机器上端口号不能一样,否则会造成端口冲突
ip=192.168.11.129:9091   roles=broker,controller   
node.id=1

ip=192.168.11.129:9092   roles=broker,controller   
node.id=2

ip=192.168.11.129:9093   roles=broker,controller  
node.id=3

搭建步骤:

1、准备三个KafkaKafka是一个压缩包,直接解压即可使用,所以我们就解压出三个Kafka即可

2、配置kafka集群的server.properties配置文件:kafka家目录/config/kraft/server.properties

(1)三台分别找到下面的配置项并如下配置,用于指定kafka服务器的id:

  • broker.id=1
  • broker.id=2
  • broker.id=3

2)三台分别找到下面的配置项并如下配置节点的角色:

  • process.roles=broker,controller

3)三台分别配置参与投票的节点

  • controller.quorum.voters=1@192.168.11.129:9081,2@192.168.11.129:9082,3@192.168.11.129:9083

4)三台配置各自监听本机的ip和端口

  • listeners=PLAINTEXT://0.0.0.0:9091,CONTROLLER://0.0.0.0:9081
  • listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9082
  • listeners=PLAINTEXT://0.0.0.0:9093,CONTROLLER://0.0.0.0:9083

5)三台配置对外开放访问的ip和端口

  • advertised.listeners=PLAINTEXT://192.168.11.129:9091
  • advertised.listeners=PLAINTEXT://192.168.11.129:9092
  • advertised.listeners=PLAINTEXT://192.168.11.129:9093

(6)三台分别配置日志目录

  • log.dirs=/tmp/kraft-combined-logs-9091
  • log.dirs=/tmp/kraft-combined-logs-9092
  • log.dirs=/tmp/kraft-combined-logs-9093

3、kafka集群启动

  1. 生成Cluster UUID(集群UUID),在kafka家目录/bin下执行命令: ./kafka-storage.sh random-uuid
  2. 格式化日志目录,在kafka家目录/bin下执行命令(三台kafka都需要执行一下):./kafka-storage.sh format -t 集群UUID -c ../config/kraft/server.properties
  3. 启动Kafka,在kafka家目录/bin下执行命令:./kafka-server-start.sh ../config/kraft/server.properties &
  4. 关闭Kafka,在kafka家目录/bin下执行命令:./kafka-server-stop.sh ../config/kraft/server.properties

4、测试:和使用zookeeper搭建kafka集群的时候一样

8.1.3 Kafka集群架构分析

上图中:

三台kafka组成一个kafka集群,基于一个zookeeper或Kraft运行

  • Topic A在三台kafka上都有,然后这个主题有两个分区,每个分区有三个副本(一个主副本和两个从副本),每个副本都在不同的kafka节点上
  • Topic B在三台kafka上都有,然后这个主题有一个分区,每个分区有三个副本(一个主副本和两个从副本),每个副本都在不同的kafka节点上
  • Topic C在三台kafka上都有,然后这个主题有一个分区,每个分区有三个副本(一个主副本和两个从副本),每个副本都在不同的kafka节点上
  • 在kafka集群中,最终体现形式就是每个主题的每个分区的副本在不同的kafka节点上,从同一个分区的主副本和从副本不会在同一个kafka服务器上。
  • 分区的主副本放在哪个kafka broker中是由kafka内部机制决定的

下面分析下我们用程序创建出来的clusterTopic:

  • 箭头从左往右看:
    • 第一个箭头表示clusterTopic主题
    • 第二个箭头表示clusterTopic主题一共有多少个分区副本
    • 第三个箭头表示clusterTopic主题有多少个分区
    • 第四个箭头表示clusterTopic主题中每个分区的id
    • 第五个箭头表示此分区的主副本在哪台kafka节点上(kafka的broker id)
    • 第六个箭头表示此分区有几个副本

9. Kafka的一些重要概念

kafka服务器 broker

主题 topic

事件  Event message、消息、数据)

生产者 producer

消费者 consumer

消费组 consumer group

分区 partition

偏移量offset(分为生产者偏移量,消费者偏移量)

Replica副本:分为 Leader Replica 和 Follower Replica

ISR副本:在同步中的副本 (In-Sync Replicas)

LEO:日志末端偏移量 (Log End Offset)

HW:高水位值 (High Water mark)

9.1 ISR副本

ISR副本:在同步中的副本 (In-Sync Replicas),包含了Leader副本和所有与Leader副本保持同步的Follower副本

写请求首先由 Leader 副本处理,之后 Follower 副本会从 Leader 上拉取写入的消息,这个过程会有一定的延迟,导致 Follower 副本中保存的消息略少于 Leader 副本,但是只要没有超出阈值都可以容忍,但是如果一个 Follower 副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,Leader就会把它踢出去,Kafka 通过ISR副本集合来维护一个“可用且消息量与Leader相差不多的副本集合,它是整个副本集合的一个子集”

在Kafka中,一个副本要成为ISR(In-Sync Replicas)副本,需要满足一定条件:

1、Leader副本本身就是一个ISR副本;

2、Follower副本最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超过指定的阈值,超过阈值则该Follower副本将ISR列表剔除

  • replica.lag.time.max.ms:默认是30秒;如果该Follower在此时间间隔内一直没有追上过Leader副本的所有消息,则该Follower副本就会被剔除ISR列表
  • replica.lag.max.messages:落后了多少条消息该Follower副本就会被剔除ISR列表该配置参数现在新版本的Kafka已经过时了

9.2 LEO

日志末端偏移量 (Log End Offset)记录该副本消息日志(log)中下一条消息的偏移量注意是下一条消息,也就是说,如果LEO=10,那么表示该副本保存了偏移量值是[0, 9]的10条消息;

9.3 HW

(High Watermark),即高水位值,它代表一个偏移量offset信息,表示从副本复制主副本消息的复制进度,也就是从副本复制主副本中的消息已经复制到哪个位置了。即在HW之前的所有消息都已经被成功写入副本中并且可以在所有的副本中找到,因此,消费者可以安全地消费这些已成功复制的消息。

对于同一个副本而言,小于等于HW值的所有消息都被认为是“已备份”的(replicated),消费者只能拉取到这个offset之前的消息,确保了数据的可靠性;

9.4 ISR、HW、LEO的关系


网站公告

今日签到

点亮在社区的每一天
去签到