Kafka学习

发布于:2024-12-18 ⋅ 阅读:(90) ⋅ 点赞:(0)

Kafka

kafka是一种消息队列,主要用来大量数据状态下的消息队列,一般用来做日志的处理,既然是效力队列,那么kafka也就拥有消息队列的相应的特性了。

消息队列的好处

解耦和

耦合 的状态标识当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能

异步处理

异步处理代替了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可

流量削峰

高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力

kafka的消费模式

一种是一对一的消费,也即点对点的通信,即一个发送一个接收,第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的拉取信息消费。

消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费,消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。

这种模式也成为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic种,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据后,数据yao'q不会被清除,kafka会默认保留一段时间,然后再删除。

Kafka的基础架构

kafka像其他Mq一样,也有自己的基础构架,主要存在生产者Producer、kafka集群Broker、消费者Consumer、注册消息Zookeeper

  • Producer:消费生产者,向Kafka中发布消息的角色

  • Consumer:消息消费者,即从kafka中拉取消息消费的客户端

  • Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者,某一分区中的消息只能够一个消费者组中的一个消费者所消费

  • Broker:经纪人,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic

  • Topic主题,可以理解为一个队列,生产者和消费者都是面向一个Topic

  • Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)

  • Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower

  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。

  • Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。

一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader爆出同步,消费的话也是在Leader中消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。

文件存储

kafka文件存储也是通过本地落盘的方式存储的,主要通过相应的log与index等文件保存具体的消息文件

生产者不断向log文件追加消息文件,为了防止log文件过大导致定位效率低下,kafka的log文件以1G为一个分界点,当.log文件大小超过1G时,此时会创建一个新的.log文件,同时为了快速定位大文件中消息位置,kafka采取了分片和索引的机制来加速定位

在kafka的存储log的地方,即文件的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括.index和.log文件组成。

advertised.listeners=PLAINTEXT://112.126.74.249:9092

Springboot整合kafka

  1. 在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中ip为公网ip)

    advertised.listeners=PLAINTEXT://112.126.74.249:9092

  2. 在执行代码kafkaTemplate.send("topic1",normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本,所以,可以在项目中新建一个配置类专门用来初始化topic

    @Configuration
    public class KafkaInitialConfiguration {
        // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
        @Bean
        public NewTopic initialTopic() {
            return new NewTopic("testtopic",8, (short) 2 );
        }
    •
         // 如果要修改分区数,只需修改配置值重启项目即可
        // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
        @Bean
        public NewTopic updateTopic() {
            return new NewTopic("testtopic",10, (short) 2 );
        }
    }

  3. 新建项目

    引入pom依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    application.propertise配置

    ###########【Kafka集群】###########
    spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
    ###########【初始化生产者配置】###########
    # 重试次数
    spring.kafka.producer.retries=0
    # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
    spring.kafka.producer.acks=1
    # 批量大小
    spring.kafka.producer.batch-size=16384
    # 提交延时
    spring.kafka.producer.properties.linger.ms=0
    # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
    # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
    •
    # 生产端缓冲区大小
    spring.kafka.producer.buffer-memory = 33554432
    # Kafka提供的序列化和反序列化类
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 自定义分区器
    # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
    •
    ###########【初始化消费者配置】###########
    # 默认的消费组ID
    spring.kafka.consumer.properties.group.id=defaultConsumerGroup
    # 是否自动提交offset
    spring.kafka.consumer.enable-auto-commit=true
    # 提交offset延时(接收到消息后多久提交offset)
    spring.kafka.consumer.auto.commit.interval.ms=1000
    # 当kafka中没有初始offset或offset超出范围时将自动重置offset
    # earliest:重置为分区中最小的offset;
    # latest:重置为分区中最新的offset(消费分区中新产生的数据);
    # none:只要有一个分区不存在已提交的offset,就抛出异常;
    spring.kafka.consumer.auto-offset-reset=latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    spring.kafka.consumer.properties.session.timeout.ms=120000
    # 消费请求超时时间
    spring.kafka.consumer.properties.request.timeout.ms=180000
    # Kafka提供的序列化和反序列化类
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    # 消费端监听的topic不存在时,项目启动会报错(关掉)
    spring.kafka.listener.missing-topics-fatal=false
    # 设置批量消费
    # spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    # spring.kafka.consumer.max-poll-records=50

实体类

package com.ywj.kafuka.beans;
 
import lombok.Data;
 
import java.util.Date;
 
/**
 * @ClassName Meagess
 * @Author ywj
 * @Describe
 * @Date 2019/3/19 0019 19:42
 */
@Data
public class Message {
    private Long id;    //id
 
    private String msg; //消息
 
    private Date sendTime;  //时间戳
 
}

消息发送者

package com.ywj.kafuka.provider;
 
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.ywj.kafuka.beans.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
import java.util.Date;
import java.util.UUID;
 
/**
 * @ClassName KafkaSender
 * @Author ywj
 * @Describe
 * @Date 2019/3/19 0019 19:48
 */
@Component
@Slf4j
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private Gson gson = new GsonBuilder().create();
 
    //发送消息方法
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("message", gson.toJson(message));
    }
}

message是kafka里的topic,这个topic在java程序中不需要提前在kafka中设置的,因为他会在发送的时候自动创建你设置的topic,gson.toJson(message)是消息内容

带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功或失败时做补偿处理,有两种写法

@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送消息失败:"+ex.getMessage());
        }
 
        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}

消息接收者

package com.ywj.kafuka.consumer;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import java.util.Optional;
 
/**
 * @ClassName KafkaReceiver
 * @Author ywj
 * @Describe
 * @Date 2019/3/19 0019 19:51
 */
@Component
@Slf4j
public class KafkaReceiver {
    @KafkaListener(topics = {"message"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
 
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
 
 
}

接收消息直接使用@KafkaListener注解即可,并在监听中设置监听的topic,topics是一个数组所以可以绑定多个主题,上面的代码中修改为@KafkaListener(topics={"message","kafka"})就可以同时监听两个topic的消息了,需要注意的是;这里的topic需要和消息发送类KafkaSender.java中设置的topic一致

如果想指定topic、指定partition、指定offset来消费

@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = { "0" }),
        @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
    System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
  • id:消费者id

  • groupid:消费组id

  • topics:监听的topic,可监听多个

  • topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offect监听

上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息

topics和topicPartitions不能同时使用

批量消费

设置application.propertise开启批量消费

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

接收消息时用List来接收,监听代码如下

​
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消费一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}

ConsumerAwareListenerErrorHandler异常处理器

通过异常处理器,可以处理consumer在消费时发生的异常

新建一个ConsumerAwareListenerErrorHandler类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName方到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器

// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("消费异常:"+message.getPayload());
        return null;
    };
}
•
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("简单消费-模拟异常");
}
•
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("批量消费一次...");
    throw new Exception("批量消费-模拟异常");
}

消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为监听器工厂配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息会被抛弃,返回false时,消息能正常抵达监听容器

@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;
​
    // 消息过滤器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return factory;
    }
•
    // 消息过滤监听
    @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

上面实现了一个过滤奇数,接收偶数的过滤策略,我们向topic1发送0~99共100条消息,可以看到监听器只消费了偶数

消息转发

存在应用从A获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息转发。

在springboot集成kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容

@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}

定时启动,停止监听器

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让他在我们指定的时间开始工作,或者在我们指定的时间点停止工作,使用kafkaListenerEndpointRegistry

  1. 禁止监听器自启动

  2. 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器

新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry在springIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动

@EnableScheduling
@Component
public class CronTimer {
​
    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/
    @Autowired
    private KafkaListenerEndpointRegistry registry;
​
    @Autowired
    private ConsumerFactory consumerFactory;
​
    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自启动
        container.setAutoStartup(false);
        return container;
    }
​
    // 监听器
    @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
​
    // 定时启动监听器
    @Scheduled(cron = "0 42 11 * * ? ")
    public void startListener() {
        System.out.println("启动监听器...");
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }
​
    // 定时停止监听器
    @Scheduled(cron = "0 45 11 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作

11:42分监听器开始工作,消费消息

11:45监听器停止工作

SpringbootkafkaApplication

package com.ywj.kafuka;
 
import com.ywj.kafuka.beans.Message;
import com.ywj.kafuka.consumer.KafkaReceiver;
import com.ywj.kafuka.provider.KafkaSender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
 
@SpringBootApplication
public class SpringbootkafukaApplication {
 
 
 
    public static void main(String[] args) {
 
        ConfigurableApplicationContext context = SpringApplication.run(SpringbootkafukaApplication.class, args);
        KafkaSender sender = context.getBean(KafkaSender.class);
        for (int i = 0; i < 3; i++) {
            //调用消息发送类中的消息发送方法
            sender.send();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

自定义分区器

kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区,就是所谓的分区策略,kafka提供了默认的分区策略,同时也支持自定义分区策略,其路由机制为:

  • 若发送消息时制定了分区(即自定义分区策略),则直接将消息append到指定分区

  • 若发送消息时未指定patition,但指定了key(kafka允许为每条消息设置一个key),则对key值就行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个key的所有消息都进入到相同的分区

  • patition和key都未指定,则使用kafka默认的分区策略,轮循选出一个patition

自定义分区策略,讲消息发送到我们指定的partition,首先新建一个分区器类实现partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区规则(这里假设全部发到0号分区)
        // ......
        return 0;
    }
•
    @Override
    public void close() {
•
    }
•
    @Override
    public void configure(Map<String, ?> configs) {
•
    }
}

在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区规则(这里假设全部发到0号分区)
        // ......
        return 0;
    }
•
    @Override
    public void close() {
•
    }
•
    @Override
    public void configure(Map<String, ?> configs) {
•
    }
}

如果在发送消息时需要创建事务,可以使用KafkaTemplate 的executelnTransaction方法来声明事务(测试未成功)

@GetMapping("/kafka/transaction")
public void sendMessage7(){
    // 声明事务:后面报错消息不会发出去
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    });
•
    // 不声明事务:后面报错但前面消息已经发送成功了
   kafkaTemplate.send("topic1","test executeInTransaction");
   throw new RuntimeException("fail");
}

Kafka导致重复消费原因和解决

问题根本

已经消费了数据,但是offset还没来得及提交(比如kafka没有或者不知道该数据已经被消费)

  1. 强行kill线程,导致消费后的数据,offset没有提交(消息系统宕机,重启等)

  2. 设置offset为自动提交,关闭kafka时,如果在close之前,调用consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费 例如:

    try {
        consumer.unsubscribe();
    } catch (Exception e) {
    }
    ​
    try {
        consumer.close();
    } catch (Exception e) {
    }

    上面代码会导致部分offset没提交,下次启动时会重复消费

    解决方法:设置offset自动提交给false

    整合了spring配置的修改如下配置

    spring配置

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=latest

    整合了API方式的修改enable.auto.commit为false

    API配

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");

    一旦设置了enable.auto.commit为true,kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。从顺序上来说。poll方法的逻辑是先提交上一批消息的位移,在处理下一批消息,因此他能保证不出现消息丢失的情况。

  3. (最常见):消费后的数据,当offset还没有提交时,partition就断开连接,比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30s),那么就会re—blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费 offset机制:使得kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。好比看一本书中的数千标记,每次通过书签标记(offset)就能快速找到该从哪里开始消费

    kafka对于offset的处理有两种提交方式:

    • 自动提交(默认) Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。

      对于自动提交偏移量,如果auto_commit_interval_ms的值设置的过大,当消费者再自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好

    • 手动提交(可以灵活的控制offset)

      对于手动提交offset主要有3种方式:1 同步提交 2 异步提交 3 同步+异步 组合的方式提交

      • 同步模式下提交失败的时候一致尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出相应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。每轮循一个批次,手动提交一次,只有当前批次的消息提交完成时才会触发poll来获取下一轮的消息

      • 异步手动提交偏移量+回调函数

        异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在brocker做出响应的时候记录错误信息

        对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。

      • 异步+同步 组合的方式提交偏移量

        针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能进行补救,同步会一致重试,直到提交成功。

  4. 当消费者重新分配partition时,可能出现从头开始消费的情况,导致重发问题