Kafka应用Demo:按主题订阅消费消息

发布于:2024-05-08 ⋅ 阅读:(30) ⋅ 点赞:(0)

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {
    private static final String NEO_TOPIC = "elon-topic";
    private KafkaProducer<String, String> producer = null;

    public KafkaProducerService() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.128:9092");
        props.put("acks", "0");
        props.put("group.id", "1111");
        props.put("retries", "2");
        //设置key和value序列化方式
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        //生产者实例
        producer = new KafkaProducer<>(props);
    }

    /**
     * 外部调用的发消息接口
     */
    public void sendMessage() {
        for (int i = 0; i < 10; ++i) {
            int p = i % 2;
            ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));
            producer.send(record);
        }
    }
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String NEO_TOPIC = "elon-topic";

    Properties properties = new Properties();
    private KafkaConsumer consumer = null;

    public KafkaConsumerService() {
        properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Broker
        properties.put("group.id", "neo1");              // 指定消费组群 ID
        properties.put("max.poll.records", "5");
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
        properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象

        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-events
        new Thread(this::receiveMessage).start();
    }

    public void receiveMessage() {
        try {
            while (true) {
                synchronized (this) {
                    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    LOGGER.info("Fetch record num:{}", records.count());
                    for (ConsumerRecord<String,String> record: records) {
                        String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        LOGGER.info("Received:" + info);
                        Thread.sleep(100);
                    }
                    consumer.commitSync();
                }
            }
        } catch (Exception e){

        } finally {
            consumer.close();
        }
    }

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述