Kafka
kafka是一种消息队列,主要用来大量数据状态下的消息队列,一般用来做日志的处理,既然是效力队列,那么kafka也就拥有消息队列的相应的特性了。
消息队列的好处
解耦和
耦合 的状态标识当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能
异步处理
异步处理代替了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可
流量削峰
高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力
kafka的消费模式
一种是一对一的消费,也即点对点的通信,即一个发送一个接收,第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的拉取信息消费。
消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费,消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
这种模式也成为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic种,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据后,数据yao'q不会被清除,kafka会默认保留一段时间,然后再删除。
Kafka的基础架构
kafka像其他Mq一样,也有自己的基础构架,主要存在生产者Producer、kafka集群Broker、消费者Consumer、注册消息Zookeeper
Producer:消费生产者,向Kafka中发布消息的角色
Consumer:消息消费者,即从kafka中拉取消息消费的客户端
Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者,某一分区中的消息只能够一个消费者组中的一个消费者所消费
Broker:经纪人,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic
Topic主题,可以理解为一个队列,生产者和消费者都是面向一个Topic
Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。
一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader爆出同步,消费的话也是在Leader中消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。
文件存储
kafka文件存储也是通过本地落盘的方式存储的,主要通过相应的log与index等文件保存具体的消息文件
生产者不断向log文件追加消息文件,为了防止log文件过大导致定位效率低下,kafka的log文件以1G为一个分界点,当.log文件大小超过1G时,此时会创建一个新的.log文件,同时为了快速定位大文件中消息位置,kafka采取了分片和索引的机制来加速定位
在kafka的存储log的地方,即文件的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括.index和.log文件组成。
advertised.listeners=PLAINTEXT://112.126.74.249:9092
Springboot整合kafka
在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中ip为公网ip)
advertised.listeners=PLAINTEXT://112.126.74.249:9092
在执行代码kafkaTemplate.send("topic1",normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本,所以,可以在项目中新建一个配置类专门用来初始化topic
@Configuration public class KafkaInitialConfiguration { // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2 @Bean public NewTopic initialTopic() { return new NewTopic("testtopic",8, (short) 2 ); } • // 如果要修改分区数,只需修改配置值重启项目即可 // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小 @Bean public NewTopic updateTopic() { return new NewTopic("testtopic",10, (short) 2 ); } }
新建项目
引入pom依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
application.propertise配置
###########【Kafka集群】########### spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 ###########【初始化生产者配置】########### # 重试次数 spring.kafka.producer.retries=0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks=1 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了 • # 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 自定义分区器 # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner • ###########【初始化消费者配置】########### # 默认的消费组ID spring.kafka.consumer.properties.group.id=defaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms=1000 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-reset=latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms=120000 # 消费请求超时时间 spring.kafka.consumer.properties.request.timeout.ms=180000 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消费端监听的topic不存在时,项目启动会报错(关掉) spring.kafka.listener.missing-topics-fatal=false # 设置批量消费 # spring.kafka.listener.type=batch # 批量消费每次最多消费多少条消息 # spring.kafka.consumer.max-poll-records=50
实体类
package com.ywj.kafuka.beans;
import lombok.Data;
import java.util.Date;
/**
* @ClassName Meagess
* @Author ywj
* @Describe
* @Date 2019/3/19 0019 19:42
*/
@Data
public class Message {
private Long id; //id
private String msg; //消息
private Date sendTime; //时间戳
}
消息发送者
package com.ywj.kafuka.provider;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.ywj.kafuka.beans.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
/**
* @ClassName KafkaSender
* @Author ywj
* @Describe
* @Date 2019/3/19 0019 19:48
*/
@Component
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
//发送消息方法
public void send() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
kafkaTemplate.send("message", gson.toJson(message));
}
}
message是kafka里的topic,这个topic在java程序中不需要提前在kafka中设置的,因为他会在发送的时候自动创建你设置的topic,gson.toJson(message)是消息内容
带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功或失败时做补偿处理,有两种写法
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
消息接收者
package com.ywj.kafuka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @ClassName KafkaReceiver
* @Author ywj
* @Describe
* @Date 2019/3/19 0019 19:51
*/
@Component
@Slf4j
public class KafkaReceiver {
@KafkaListener(topics = {"message"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
接收消息直接使用@KafkaListener注解即可,并在监听中设置监听的topic,topics是一个数组所以可以绑定多个主题,上面的代码中修改为@KafkaListener(topics={"message","kafka"})就可以同时监听两个topic的消息了,需要注意的是;这里的topic需要和消息发送类KafkaSender.java中设置的topic一致
如果想指定topic、指定partition、指定offset来消费
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
id:消费者id
groupid:消费组id
topics:监听的topic,可监听多个
topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offect监听
上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息
topics和topicPartitions不能同时使用
批量消费
设置application.propertise开启批量消费
# 设置批量消费 spring.kafka.listener.type=batch # 批量消费每次最多消费多少条消息 spring.kafka.consumer.max-poll-records=50
接收消息时用List来接收,监听代码如下
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
ConsumerAwareListenerErrorHandler异常处理器
通过异常处理器,可以处理consumer在消费时发生的异常
新建一个ConsumerAwareListenerErrorHandler类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName方到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}
•
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
throw new Exception("简单消费-模拟异常");
}
•
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}
消息过滤器
消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
配置消息过滤只需要为监听器工厂配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息会被抛弃,返回false时,消息能正常抵达监听容器
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;
// 消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
•
// 消息过滤监听
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
上面实现了一个过滤奇数,接收偶数的过滤策略,我们向topic1发送0~99共100条消息,可以看到监听器只消费了偶数
消息转发
存在应用从A获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息转发。
在springboot集成kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
return record.value()+"-forward message";
}
定时启动,停止监听器
默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让他在我们指定的时间开始工作,或者在我们指定的时间点停止工作,使用kafkaListenerEndpointRegistry
禁止监听器自启动
创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry在springIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
@EnableScheduling
@Component
public class CronTimer {
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
// 监听器
@KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
// 定时启动监听器
@Scheduled(cron = "0 42 11 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}
// 定时停止监听器
@Scheduled(cron = "0 45 11 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}
启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作
11:42分监听器开始工作,消费消息
11:45监听器停止工作
SpringbootkafkaApplication
package com.ywj.kafuka;
import com.ywj.kafuka.beans.Message;
import com.ywj.kafuka.consumer.KafkaReceiver;
import com.ywj.kafuka.provider.KafkaSender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class SpringbootkafukaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(SpringbootkafukaApplication.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
for (int i = 0; i < 3; i++) {
//调用消息发送类中的消息发送方法
sender.send();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
自定义分区器
kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区,就是所谓的分区策略,kafka提供了默认的分区策略,同时也支持自定义分区策略,其路由机制为:
若发送消息时制定了分区(即自定义分区策略),则直接将消息append到指定分区
若发送消息时未指定patition,但指定了key(kafka允许为每条消息设置一个key),则对key值就行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个key的所有消息都进入到相同的分区
patition和key都未指定,则使用kafka默认的分区策略,轮循选出一个patition
自定义分区策略,讲消息发送到我们指定的partition,首先新建一个分区器类实现partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区规则(这里假设全部发到0号分区)
// ......
return 0;
}
•
@Override
public void close() {
•
}
•
@Override
public void configure(Map<String, ?> configs) {
•
}
}
在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区规则(这里假设全部发到0号分区)
// ......
return 0;
}
•
@Override
public void close() {
•
}
•
@Override
public void configure(Map<String, ?> configs) {
•
}
}
如果在发送消息时需要创建事务,可以使用KafkaTemplate 的executelnTransaction方法来声明事务(测试未成功)
@GetMapping("/kafka/transaction")
public void sendMessage7(){
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
});
•
// 不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
}
Kafka导致重复消费原因和解决
问题根本
已经消费了数据,但是offset还没来得及提交(比如kafka没有或者不知道该数据已经被消费)
强行kill线程,导致消费后的数据,offset没有提交(消息系统宕机,重启等)
设置offset为自动提交,关闭kafka时,如果在close之前,调用consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费 例如:
try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); } catch (Exception e) { }
上面代码会导致部分offset没提交,下次启动时会重复消费
解决方法:设置offset自动提交给false
整合了spring配置的修改如下配置
spring配置
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest
整合了API方式的修改enable.auto.commit为false
API配
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");
一旦设置了enable.auto.commit为true,kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。从顺序上来说。poll方法的逻辑是先提交上一批消息的位移,在处理下一批消息,因此他能保证不出现消息丢失的情况。
(最常见):消费后的数据,当offset还没有提交时,partition就断开连接,比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30s),那么就会re—blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费 offset机制:使得kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。好比看一本书中的数千标记,每次通过书签标记(offset)就能快速找到该从哪里开始消费
kafka对于offset的处理有两种提交方式:
自动提交(默认) Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。
对于自动提交偏移量,如果auto_commit_interval_ms的值设置的过大,当消费者再自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好
手动提交(可以灵活的控制offset)
对于手动提交offset主要有3种方式:1 同步提交 2 异步提交 3 同步+异步 组合的方式提交
同步模式下提交失败的时候一致尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出相应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。每轮循一个批次,手动提交一次,只有当前批次的消息提交完成时才会触发poll来获取下一轮的消息
异步手动提交偏移量+回调函数
异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在brocker做出响应的时候记录错误信息
对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
异步+同步 组合的方式提交偏移量
针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能进行补救,同步会一致重试,直到提交成功。
当消费者重新分配partition时,可能出现从头开始消费的情况,导致重发问题