kafka消费者接收不到消息

发布于:2024-03-04 ⋅ 阅读:(53) ⋅ 点赞:(0)

背景:

   对kafka消息进行监听,生产者发了消息,但是消费端没有接到消息,监听代码

消费端,kafka配置

spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";

spring.kafka.properties.security.protocol=SASL_PLAINTEXT

spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256

#=============== provider =======================

spring.kafka.producer.retries=0

# 每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================

# 指定默认消费者group id

spring.kafka.consumer.group-id=dq

spring.kafka.consumer.auto-offset-reset=latest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"})
public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {

}

offset explorer发现生产者发送了消息,offset是0

问题解决:

后来查看生产者kafka配置,发现他们的enable-auto-commit是false:

spring.kafka.consumer.enable-auto-commit=false

修改kafka配置

spring.kafka.consumer.enable-auto-commit=false

# 在侦听器容器中运行的线程数

spring.kafka.listener.concurrency=5

# listner负责ack,每调用commit方法,立即向服务器提交

spring.kafka.listener.ack-mode=manual_immediate

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到