kafka快速部署、集成、调优

发布于:2025-08-02 ⋅ 阅读:(18) ⋅ 点赞:(0)

kafka快速部署、集成、调优

1.部署

1.1 docker部署

参考:https://blog.csdn.net/taotao_guiwang/article/details/135508643
在这里插入图片描述

1.2 kafka部署

# 赋权,在kafka_zk_install目录,执行:
chmod -R 777 .
# 执行
./kafka_zk_install.sh

在这里插入图片描述

2. spring boot集成

核心代码,KafkaController.java:

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    }

}

MyConsumer.java:

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    }

}

application.yml:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 10.86.97.210:9092,10.86.97.210:9093,10.86.97.210:9094
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE

浏览器访问:
http://localhost:8080/send

控制台打印:
在这里插入图片描述

3.相关资源

百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到