kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

发布于:2024-05-01 ⋅ 阅读:(36) ⋅ 点赞:(0)

kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

直接上代码

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka整体配置类
 *
 * @author Dean
 */
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.admin.client-id}")
    private String adminClientId;

    @Bean
    public AdminClient adminClient() {
        Map<String, Object> configs = new HashMap<>(5);
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(AdminClientConfig.CLIENT_ID_CONFIG, adminClientId);
        return AdminClient.create(configs);
    }

    /**
     * 如果有多个消费组,需要定义多个不同的ConcurrentKafkaListenerContainerFactory
     *
     * @return ConcurrentKafkaListenerContainerFactory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        String groupId = "dmGroup";
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //是否自动提交ack
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //一次拉取最大数据量,默认值为500,如果拉取时不足配置的条数则有多少拉取多少
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        //是否批量这个设置好像只对配置了@KafkaListener的方法有用
        factory.setBatchListener(false);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        //手动提交ack
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/**
 * kafka中间件的逻辑封装类
 */
@Slf4j
@Service
public class KafkaListenerManagement {
    private final Map<String, MessageListenerContainer> containers = new ConcurrentHashMap<>();
    /**
     * 如果有多个消费组,需要注入多个不同的ConcurrentKafkaListenerContainerFactory
     */
    private final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory;

    @Autowired
    public KafkaListenerManagement(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /**
     * 开启Topic的监听
     *
     * @param topic            topic
     * @param bizLogicConsumer 消息的业务逻辑处理
     */
    public void startListening(String topic, BiConsumer<String, Acknowledgment> bizLogicConsumer) {
        //必须手动提交ACK,否则停止监听后重新监听可能导致拉取到重复的记录
        AcknowledgingMessageListener<String, String> messageListener =
                (message, acknowledgment) -> bizLogicConsumer.accept(message.value(), acknowledgment);

        MessageListenerContainer container = containerFactory.createContainer(topic);
        container.setupMessageListener(messageListener);
        container.start();
        containers.put(topic, container);
    }

    /**
     * 暂停监听
     *
     * @param topic topic
     */
    public void pauseListening(String topic) {
        MessageListenerContainer container = containers.get(topic);
        container.pause();
    }

    /**
     * 暂停后继续监听
     *
     * @param topic topic
     */
    public void resumeListening(String topic) {
        MessageListenerContainer container = containers.get(topic);
        container.resume();
    }

    /**
     * 停止监听
     *
     * @param topic topic
     */
    public void stopListening(String topic) {
        MessageListenerContainer container = containers.remove(topic);
        if (container != null) {
            container.stop();
        }
    }
}
/**
 * Kafka生产者
 *
 * @author LiuChang
 */
@Service
public class KafkaProducerManagement {
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 异步发送
     *
     * @param topic   topic
     * @param message 消息
     * @return ListenableFuture
     */
    public ListenableFuture<SendResult<String, String>> send(String topic, String message) {
        return kafkaTemplate.send(topic, message);
    }
}
import com.feiynn.kafka.management.KafkaListenerManagement;
import com.feiynn.kafka.management.KafkaProducerManagement;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * 业务逻辑类
 * 注意:业务逻辑类不建议直接调用kafka的API,都调用封装后的Kafka相关的Management类
 *
 * @author Dean
 */
@Slf4j
@Service
public class BizService {
    @Resource
    private KafkaListenerManagement kafkaListenerManagement;
    @Resource
    private KafkaProducerManagement kafkaProducerManagement;

    /**
     * 开启topic监听后的业务逻辑
     *
     * @param topic topic
     */
    public void startListening(String topic) {
        kafkaListenerManagement.startListening(topic, (data, acknowledgment) -> {
            //消息处理业务逻辑
            log.info("Received message value: [{}]", data);
            try {
                //降低消费速率,方便观察日志
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            acknowledgment.acknowledge();
        });
    }

    /**
     * 停止topic监听
     *
     * @param topic topic
     */
    public void stopListening(String topic) {
        kafkaListenerManagement.stopListening(topic);
    }

    /**
     * 暂停监听
     *
     * @param topic topic
     */
    public void pauseListening(String topic) {
        kafkaListenerManagement.pauseListening(topic);
    }

    /**
     * 暂停后继续监听
     *
     * @param topic topic
     */
    public void resumeListening(String topic) {
        kafkaListenerManagement.resumeListening(topic);
    }

    /**
     * 发送消息
     *
     * @param topic   topic
     * @param message 消息
     */
    public void sendMsg(String topic, String message) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaProducerManagement.send(topic, message);
        //添加回调逻辑,异步获取发送结果
        listenableFuture.addCallback((sendResult) -> {
            //发送成功
            log.trace("Send [{}] success", message);
        }, (e) -> {
            //发送失败,可以执行降级策略,或者把消息写入日志后续进行统一处理
            log.error("Send [{}] failed", message, e);
        });
    }
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = KafkaAdvancedApplication.class)
public class KafkaAdvancedTest {
    @Resource
    private BizService bizService;

    /**
     * 测试topic监听的开启、暂停、暂停后重新开始、停止
     */
    @Test
    public void startStopListening() throws InterruptedException {
 
        String topicDm = "dm0";

        //开启topic监听
        bizService.startListening(topicDm);
        TimeUnit.SECONDS.sleep(2);
        //消息前缀,用来区分是上一次发送的未消费完的消息还是本次发送的消息
        String msgPre = LocalTime.now().toString();
        log.info("msgPre=[{}]", msgPre);
        for (int i = 0; i < 2000; i++) {
            bizService.sendMsg(topicDm, "Msg_" + msgPre + "_" + i);
        }
        TimeUnit.SECONDS.sleep(5);

        log.info("pause listening begin");
        bizService.pauseListening(topicDm);
        log.info("pause listening success");
        //暂停监听成功后,消费者会把配置max.poll.records条数的消息消费完才会真正停止,因此停顿足够长的时间后观察消息消费的日志是否会暂停输出
        TimeUnit.SECONDS.sleep(20);
        log.info("resume listening");
        //暂停后重新开启消息监听
        bizService.resumeListening(topicDm);
        TimeUnit.SECONDS.sleep(20);

        //新一轮暂停与重启
        log.info("pause listening again");
        bizService.pauseListening(topicDm);
        TimeUnit.SECONDS.sleep(10);
        log.info("resume listening again");
        bizService.resumeListening(topicDm);

        //继续消费一段时间
        TimeUnit.SECONDS.sleep(10);
        //消费一段时间后停止监听
        log.info("stop listening");
        bizService.stopListening(topicDm);
        TimeUnit.SECONDS.sleep(20);

        //重新开启topic监听
        log.info("start listening again");
        bizService.startListening(topicDm);
        TimeUnit.SECONDS.sleep(120);
    }

}

直接运行测试用例,通过观察日志,即可看出各种操作效果

遇到的问题

遇到停止监听topic后,从看到消费消息的日志观察,有时会一直打印,有时会打印一段时间就停止打印的问题,最终发现暂停监听方法调用成功后,消费者会把配置max.poll.records条数的消息消费完才会真正暂停或者停止。
另外如果不是手动提交ack,停止stop(不是暂停pause)订阅topic然后后重新开始订阅(start),可能会出现重复消费消息的问题,改成手动提交ack后问题不再出现。
还考虑到max.poll.interval.ms 最大拉取时间间隔是5分钟,尝试了暂停5分30秒看是否消费者会被因为rebalance,导致在resume重新监听无法成功,测试结果是没有问题,可以成功继续监听并消费消息。
代码已使用到生产环境。


网站公告

今日签到

点亮在社区的每一天
去签到