【kafka】生产者拦截器和消费者拦截器

发布于:2023-01-23 ⋅ 阅读:(295) ⋅ 点赞:(0)

拦截器(Interceptor)是在kafka 0.10.0.0引入的,主要用来实现client端的一些定制化控制逻辑。

生产者拦截器

生产者拦截器的接口是org.apache.kafka.clients.producer.ProducerInterceptor,它包含了以下方法:

(1)void configure(Map<String, ?> configs);

该方法继承Configurable接口,获取配置信息和初始化数据时调用

(2)ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

将消息序列化和计算分区之前调用,用户可以在该方法中对消息进行任何操作,但最好不要修改消息的topic、key、partition等信息。
*
***(3)void onAcknowledgement(RecordMetadata metadata, Exception exception);

**在消息被应答之前或消息发送失败时调用
*
*(4)void close();

关闭拦截器时执行一些资源清理的工作

示例代码:

public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFail = 0;

    @Override
    public ProducerRecord<String,String> onSend(ProducerRecord<String,String> record) {

        String value = "test-"+record.value();

        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), value, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

        if (exception==null){
            sendSuccess++;
        }else {
            sendFail++;
        }

    }

    @Override
    public void close() {
        System.out.println("发送成功:"+sendSuccess+",失败:"+sendFail);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

实现自定义的ProducerInterceptor后需要配置kafka参数interceptor.classes

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());

如果生产了5条消息,这5条消息都会增加前缀“test-”,在消息发送完成后客户端会打印“"发送成功:5,失败:0”

KafkaProducer还可以配置多个拦截器形成拦截器链,拦截器链会按照interceptor.classes参数配置的拦截器顺序执行。在拦截器链中,如果某个而拦截器执行失败,那么下一个拦截器会接着上一个而执行成功的拦截器继续执行

消费者拦截器

消费者拦截器的接口org.apache.kafka.clients.consumer.ConsumerInterceptor,该接口包含以下方法:

**(1)ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

poll()方法返回前调用*
*****(2)void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

提交万消费位移之后调用*
*****(3)void close();**

关闭拦截器时执行一些资源清理的工作

示例代码:

下面代码实现了过滤掉消息时间戳与当前时间超过10秒的消息

public class ConsumerInterceptorTimestamp implements ConsumerInterceptor<String, String> {

    private static final long EXPIRE_TIME = 10*1000;

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {

        long now = System.currentTimeMillis();
        HashMap<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();

        for (TopicPartition partition : records.partitions()) {

            List<ConsumerRecord<String, String>> tpRecordList = records.records(partition);
            ArrayList<ConsumerRecord<String, String>> newTpRecordList = new ArrayList<>();

            for (ConsumerRecord<String, String> record : tpRecordList) {
                // 判断比较时间
                if (now- record.timestamp()<EXPIRE_TIME){
                    newTpRecordList.add(record);
                }
            }

            if (!newTpRecordList.isEmpty()){
                newRecords.put(partition,newTpRecordList);
            }

        }

        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((t,o)->{
            System.out.println(t+"---"+o.offset());
        });
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

同样的实现自定义拦截器后需要配置参数interceptor.classes

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTimestamp.class.getName());

消费者同样有拦截器链的概念,同样按配置顺序执行,如果某个拦截器执行失败,同样下一个拦截器接着上一个执行成功的拦截器继续执行

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到