Kafka复习:(6)生产者拦截器

发布于:2023-08-24 ⋅ 阅读:(73) ⋅ 点赞:(0)

一、定义生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Producer interceptor that prefixes every outgoing record value with
 * {@code "prefix:"} and counts how many sends were acknowledged successfully
 * versus how many failed.
 *
 * <p>The counters are {@link AtomicLong}s because interceptor callbacks may be
 * invoked from more than one thread, and a plain {@code volatile long++} is a
 * non-atomic read-modify-write that can lose updates.
 */
public class MyProducerInterceptor implements ProducerInterceptor<String,String> {
    /** Number of acknowledgements received without an exception. */
    private final AtomicLong sendSuccess = new AtomicLong();
    /** Number of acknowledgements received with an exception. */
    private final AtomicLong sendFail = new AtomicLong();

    /**
     * Builds a new record whose value is the original value prefixed with
     * {@code "prefix:"}; topic, partition, timestamp, key and headers are
     * copied from the incoming record unchanged.
     *
     * <p>A {@code null} value is turned into the string {@code "prefix:null"}
     * by the string concatenation.
     *
     * @param record the record handed to the producer's send call
     * @return the rewritten record that continues through the send path
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String newValue = "prefix:" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), newValue, record.headers());
    }

    /**
     * Updates the success/failure counters for one completed send.
     *
     * @param metadata  metadata of the record (not used here)
     * @param exception {@code null} when the send succeeded, otherwise the failure cause
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess.incrementAndGet();
        } else {
            sendFail.incrementAndGet();
        }
    }

    /**
     * Prints the accumulated counters to standard output: a header line,
     * then the failure count, then the success count.
     */
    @Override
    public void close() {
        System.out.println("fail and success:");
        System.out.println(sendFail.get());
        System.out.println(sendSuccess.get());
    }

    /**
     * No configuration is needed by this interceptor.
     *
     * @param configs the producer configuration (ignored)
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty: nothing to configure.
    }
}

二、使用生产者拦截器


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Small driver that sends ten string records to the {@code study2} topic
 * through a producer configured with {@code MyProducerInterceptor}.
 */
public class ProducerInterceptorTest {
    /**
     * Configures a string producer with the interceptor, sends ten records
     * synchronously (waiting for each acknowledgement), and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        System.out.println(StringSerializer.class.getName());
        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyProducerInterceptor.class.getName());

        // Remembered instead of re-asserted immediately, so that the producer's
        // close() below runs without a pending interrupt on this thread.
        boolean interrupted = false;

        // try-with-resources guarantees the producer is closed even when send()
        // or anything else inside the loop throws.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>("study2", "hello ,my sister 8");
                Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
                try {
                    // Block until this record is acknowledged before sending the next one.
                    future.get();
                } catch (InterruptedException e) {
                    // Stop sending; the interrupt status is restored in the finally block.
                    interrupted = true;
                    break;
                } catch (ExecutionException e) {
                    // A single failed send is reported and the loop continues.
                    e.printStackTrace();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到