SpringBoot集成Kafka实战应用

发布于:2025-09-03 ⋅ 阅读:(13) ⋅ 点赞:(0)

目录

使用Kafka-Client实现消息收发

引入依赖

发送端:

消费端:

SpringBoot集成

引入maven依赖

消费端


在上一篇我们深度解析了Kafka的运行操作原理以及集群消息消费机制等,请点击下方链接获取

Kafka消息队列深度解析与实战指南

        本篇我们将着重实战

  • 使用Kafka-Client实现消息收发

引入依赖

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

    <version>3.0.0</version>

</dependency>

发送端:

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import com.google.gson.Gson;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@SpringBootTest
class KafkaProducerTests {
    private final static String TOPIC_NAME = "muse-rp";
    private final static Integer PARTITION_ONE = 0;
    private final static Integer PARTITION_TWO = 1;
    private final static Gson GSON = new Gson();

    /**
     * 同步阻塞——消息发送
     */
    @Test
    void testBlockingSendMsg() {
        /** 初始化生产者属性信息 */
        Properties properties = initProducerProp();

        /** 创建消息发送的客户端 */
        Producer<Integer, String> producer = new KafkaProducer<>(properties);
        Message message;
        for (int i=0; i< 3; i++) {
            /** 构造消息 */
            message = new Message(i, "BLOCKING_MSG_"+i);

            ProducerRecord<Integer, String> producerRecord;
            int SEND_MSG_METHOD = 0;
            switch (SEND_MSG_METHOD) {
                case 0: /** 【发送方式1】未指定发送的分区 */
                    producerRecord = new ProducerRecord<>(TOPIC_NAME, GSON.toJson(message));
                    break;
                case 1: /** 【发送方式2】未指定发送的分区,根据第二个参数key来判断发送到哪个分区*/
                    producerRecord = new ProducerRecord<>(TOPIC_NAME, message.getMegId(), GSON.toJson(message));
                    break;
                default: /** 【发送方式3】指定发送的分区 */
                    producerRecord = new ProducerRecord(TOPIC_NAME, PARTITION_ONE, message.getMegId(), GSON.toJson(message));
            }

            /** 同步阻塞——等待消息发送成功 */
            try {
                Future<RecordMetadata> recordMetadataFuture = producer.send(producerRecord);
                log.info("调用send方法完毕,msg={}", producerRecord.value());
                RecordMetadata recordMetadata = recordMetadataFuture.get();
                log.info("[topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(), recordMetadata.partition(),
                        recordMetadata.offset());
            } catch (Throwable e) {
                log.error("发送消息异常!", e);
            }
        }
        producer.close(); // close方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer
    }

    /**
     * 异步回调——消息发送
     */
    @Test
    void testNoBlockingSendMsg() {
        /** 初始化生产者属性信息 */
        Properties properties = initProducerProp();

        /** 创建消息发送的客户端 */
        Producer<Integer, String> producer = new KafkaProducer<>(properties);
        CountDownLatch latch = new CountDownLatch(5);
        Message message;
        for (int i=0; i< 5; i++) {
            message = new Message(i, "NO_BLOCKING_MSG_" + i);

            /** 指定发送的分区 */
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord(TOPIC_NAME, PARTITION_ONE,
                    message.getMegId(), GSON.toJson(message));

            /** 异步回调方式发送消息 */
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    log.error("消息发送失败!", exception);
                }
                if (metadata != null) {
                    log.info("[topic]={}, [partition]={}, [offset]={}", metadata.topic(), metadata.partition(),
                            metadata.offset());
                }
                latch.countDown();
            });
            log.info("调用send方法完毕,msg={}", producerRecord.value());
        }
        producer.close();
    }

    /**
     * 初始化生产者属性
     */
    private Properties initProducerProp() {
        Properties properties = new Properties();
        // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095"); // 配置kafka的Broker列表
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        /**
         * 发出消息持久化机制参数
         * acks=0:  表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息
         * acks=1:  表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。
         *          这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。
         * acks=-1: 表示kafka ISR列表中所有的副本同步数据成功,才返回消息给客户端,这是最强的数据保证。min.insync.replicas 这个配置是
         *          用来设置同步副本个数的下限的, 并不是只有 min.insync.replicas 个副本同步成功就返回ack。而是,只要acks=all就意味着
         *          ISR列表里面的副本必须都要同步成功。
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        /**
         * 发送失败重试的次数,默认是间隔100ms
         * 重试能保证消息发送的可靠性,但是也可能造成消息重复发送,所以需要在消费者端做好幂等性处理
         */
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms

        /**
         * 设置发送消息的本地缓冲区
         * 如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值为32MB
         */
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024);

        /**
         * 设置批量发送消息的大小
         * kafka本地线程会从缓冲区去取数据,然后批量发送到Broker,默认值16KB,即:一个批次满足16KB就会发送出去。
         */
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);

        /**
         * 默认值为0,表示消息必须立即被发送,但这样会影响性能
         * 一般设置10ms左右,也就是说这个消息发送完后会进入本地的一个批次中,如果10ms内,这个批次满足了16KB,那么就会随着批次一起被发送出去
         * 如果10ms内,批次没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长
         */
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        /** 把发送的key和消息value从字符串序列化为字节数组 */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

}

消费端:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import com.google.gson.Gson;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@SpringBootTest
class KafkaConsumerTests {
    private final static String TOPIC_NAME = "muse-rp";
    private final static String CONSUMER_GROUP_NAME = "museGroup";
    private final static Integer PARTITION_ONE = 0;
    private final static Gson GSON = new Gson();

    /**
     * 自动提交offset
     */
    @Test
    void testAutoCommitOffset() throws Throwable {
        Properties properties = initConsumerProp();
        /** 是否自动提交offset,默认:true*/
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        /** 自动提交offset的时间间隔 */
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        /** 配置Rebalance策略 */
//        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays
//                .asList(RangeAssignor.class, CooperativeStickyAssignor.class));
        /** 创建消息发送的客户端 */
        Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
        int RECV_MSG_METHOD = 0;
        switch (RECV_MSG_METHOD) {
            case 0: /** 【接收方式1】未指定接收的分区 */
                consumer.subscribe(Lists.newArrayList(TOPIC_NAME));
                break;
            case 1: /** 【接收方式2】指定分区消费 */
                consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                break;
            case 2: /** 【接收方式3】指定从头开始消费 */
                consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                break;
            default: /**【接收方式4】指定分区和offset进行消费*/
                consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                consumer.seek(new TopicPartition(TOPIC_NAME, PARTITION_ONE), 10);
        }
        ConsumerRecords<Integer, String> records;
        while (true) {
            /** 长轮询的方式拉取消息 */
            records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : records) {
                log.info(" [topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
                        record.offset(), record.value());
            }
            Thread.sleep(3000);
        }
    }

    /**
     * 手动提交offset
     */
    @Test
    void testManualCommitOffset() throws Throwable {
        Properties properties = initConsumerProp();
        /** 是否自动提交offset,默认:true*/
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        /** 创建消息发送的客户端 */
        Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Lists.newArrayList(TOPIC_NAME));
        // consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));

        ConsumerRecords<Integer, String> records;
        while (true) {
            /** 长轮询的方式拉取消息 */
            records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : records) {
                log.info(" [topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
                        record.offset(), record.value());
            }

            boolean isSync = true;
            if (records.count() > 0) {
                if (isSync) {
                    /** 【手动同步提交offset】当前线程会阻塞直到offset提交成功;常用同步提交方式 */
                    consumer.commitSync();
                } else {
                    /** 【手动异步提交offset】当前线程提交offset不会阻塞,可以继续执行后面的逻辑代码 */
                    consumer.commitAsync((offsets, exception) -> {
                        log.error("offset={}", GSON.toJson(offsets));
                        if (exception != null) {
                            log.error("提交offset发生异常!", exception);
                        }
                    });
                }
            }
            Thread.sleep(1000);
        }
    }


    /**
     * 初始化消费者配置
     */
    private Properties initConsumerProp() {
        Properties properties = new Properties();
        // 配置kafka的Broker列表
        // properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        /** 配置消费组——museGroup */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

        /**
         * offset的重置策略——例如创建一个新的消费组,offset是不存在的,如何对offset赋值消费
         * latest(默认):只消费自己启动之后发送到主题的消息。
         * earliest:第一次从头开始消费,以后按照消费offset记录继续消费。
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        /** Consumer给Broker发送心跳的时间间隔 */
        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

        /** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);

        /** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        /** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);

        /** key和value的反序列化 */
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return properties;
    }

}
  • SpringBoot集成

引入maven依赖

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>
import javax.annotation.Resource;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.muse.springbootdemo.entity.Message;

import lombok.extern.slf4j.Slf4j;


@Slf4j
@Service
public class ProducerService {
    private final static String TOPIC_NAME = "muse-rp";
    private final static Integer PARTITION_ONE = 0;
    private final static Integer PARTITION_TWO = 1;

    @Resource
    private KafkaTemplate<String, Message> kafkaTemplate;

    /**
     * 同步阻塞——消息发送
     */
    public void blockingSendMsg() throws Throwable {
        Message message;
        for (int i=0; i< 5; i++) {
            message = new Message(String.valueOf(i), "BLOCKING_MSG_SPRINGBOOT_" + i);
            ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
                    "" + message.getMegId(), message);
            SendResult<String, Message> sendResult = future.get();
            RecordMetadata recordMetadata = sendResult.getRecordMetadata();
            log.info("---BLOCKING_MSG_SPRINGBOOT--- [topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(),
                    recordMetadata.partition(), recordMetadata.offset());
        }
    }

    /**
     * 异步回调——消息发送
     */
    public void noBlockingSendMsg() {
        Message message;
        for (int i=0; i< 5; i++) {
            message = new Message(String.valueOf(i), "NO_BLOCKING_MSG_SPRINGBOOT_" + i);
            ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
                    "" + message.getMegId(), message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("消息发送失败!", ex);
                }

                @Override
                public void onSuccess(SendResult<String, Message> result) {
                    RecordMetadata recordMetadata = result.getRecordMetadata();
                    log.info("---NO_BLOCKING_MSG_SPRINGBOOT---[topic]={}, [partition]={}, [offset]={}",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                }
            });
        }
    }
}

消费端

package com.muse.springbootdemo.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;

import com.muse.springbootdemo.entity.Message;

import lombok.extern.slf4j.Slf4j;


@Slf4j
@Service
public class ConsumerService {

    private final static String TOPIC_NAME = "muse-rp";
    private final static String CONSUMER_GROUP_NAME = "museGroup";

    /**
     * 消息消费演示
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP_NAME)
    public void listenGroup(ConsumerRecord<String, Message> record) {
        log.info(" [topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
                record.offset(), record.value());
    }

}

下一篇将解析关于Kafka应用过程中的常见问题及大厂高频面试题

  •  包括 1)防止消息丢失;2)防止重复消费通过幂等处理;3)顺序消费需单分区单消费者;4)消息积压时提升消费能力;5)延迟队列通过时间判断实现;6)高吞吐依靠页面缓存+顺序写+零拷贝技术等问题 将在12h内更新
  • Kafka应用过程中的高频问题


网站公告

今日签到

点亮在社区的每一天
去签到