springboot集成kafka

发布于:2025-07-06 ⋅ 阅读:(16) ⋅ 点赞:(0)

springboot集成kafka

  1. 初始化springboot环境

在这里插入图片描述
在这里插入图片描述

如果在java版本中找不到8的话,把Server URL改成 https://start.aliyun.com
在这里插入图片描述

然后什么都不选直接创建

  1. pom依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.10</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.10</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.17</version>
        </dependency>
  1. 创建application
    如果新建的项目中没有resources文件夹的话,这样创建
    在这里插入图片描述
    在这里插入图片描述
    application.yml具体内容
server:
  port: 9090
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 127.0.0.1:9092  #bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
    consumer:
      group-id: myGroup
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
      batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
      buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类
  1. logback配置文件
    创建logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <!-- 日志存放路径  logs/job 设置为相对项目目录-->
    <property name="log.path" value="logs/job" />
    <!-- 日志输出格式 时间 线程 日志级别 类 方法 对应的行数 输出信息 这样设置后输出格式如下 -->
    <!-- 15:09:27.204 [http-nio-8080-exec-10] DEBUG c.e.s.l.TestLog - [getVersion,26] - debug详细信息 -->
    <property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />

    <!-- 控制台输出 appender  -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- 日志内容输出格式设置为定义好的 log.pattern-->
            <pattern>${log.pattern}</pattern>
        </encoder>
    </appender>

    <!-- 系统日志输出 appender  class 中的log.pattern 表示日志滚动输出 -->
    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 日志首次输出的文件地址  -->
        <file>${log.path}/info.log</file>
        <!-- 滚动输出策略:基于时间创建日志文件 ,这样第二天输出的日志,就会按照 fileNamePattern 新建日志文件 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 日志文件名格式 -->
            <fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- 日志最大的历史 60天 -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <!-- 日志内容输出格式设置为定义好的 log.pattern-->
            <pattern>${log.pattern}</pattern>
        </encoder>
        <!-- 日志内容输出过滤器 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <!-- 过滤的级别 -->
            <level>INFO</level>
            <!-- 匹配时的操作:接收(记录) -->
            <onMatch>ACCEPT</onMatch>
            <!-- 不匹配时的操作:拒绝(不记录) -->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="file_debug" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/debug.log</file>
        <!-- 循环政策:基于时间创建日志文件 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 日志文件名格式 -->
            <fileNamePattern>${log.path}/debug.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- 日志最大的历史 60天 -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <!-- 过滤的级别 -->
            <level>DEBUG</level>
            <!-- 匹配时的操作:接收(记录) -->
            <onMatch>ACCEPT</onMatch>
            <!-- 不匹配时的操作:拒绝(不记录) -->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/error.log</file>
        <!-- 循环政策:基于时间创建日志文件 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 日志文件名格式 -->
            <fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- 日志最大的历史 60天 -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <!-- 过滤的级别 -->
            <level>ERROR</level>
            <!-- 匹配时的操作:接收(记录) -->
            <onMatch>ACCEPT</onMatch>
            <!-- 不匹配时的操作:拒绝(不记录) -->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 系统模块日志级别控制 name 设置为你自己的项目根路径 如com.example.logback-->
    <!-- level 设置日志输出的级别为debug 这样系统在进行日志输出时 只要级别在 debug  之后都可以打印 -->
    <!-- 日志输出级别 trace< debug < info< warn < error  -->
    <logger name="com.example.logback" level="debug" />
    <!-- Spring日志级别控制-->
    <logger name="org.springframework" level="warn" />
   <!-- kafka日志级别控制-->
    <logger name="org.apache.kafka.clients" level="info" />

    <!--系统操作日志 root 根路径的日志级别 info -->
    <root level="info">
        <!-- 将定义好的几个日志输出 追加到 root 上 -->
        <!-- console 控制台输出  -->
        <appender-ref ref="console" />
        <!-- console info级别输出  -->
        <appender-ref ref="file_info" />
        <!-- console debug级输出  -->
        <appender-ref ref="file_debug" />
        <!-- console error级输出  -->
        <appender-ref ref="file_error" />
    </root>
</configuration>


  1. 消息体
public class Message {

    private Long id;

    private String msg;

    private Date sendTime;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}
  1. 生产者消息封装
@Component
public class KafkaProduct {
    static Logger logger = LoggerFactory.getLogger(KafkaProduct.class);


    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProduct(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void product(String msg) {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(msg);
        message.setSendTime(new Date());

        logger.info("消息发送: {}", JSONUtil.toJsonStr(message));

        kafkaTemplate.send("cousumer01", JSONUtil.toJsonStr(message));
    }
}
  1. 消费者接受
@Component
public class KafkaConsumer {

    static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);


    @KafkaListener(topics = {"cousumer01"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional.ofNullable(record.value())
                .ifPresent(message -> {
                    log.info("记录record : {}", record);
                    log.info("消费消息message : {}", message);
                });
    }
}
  1. 接口定义触发
@RestController
public class InterfaceController {
    @Resource
    private KafkaProduct kafkaProduct;

    @GetMapping("/sendMsg")
    public String sendMessage(@RequestParam("msg") String msg) {
        kafkaProduct.product(msg);
        return "发送成功";
    }
}
  1. 测试
    在这里插入图片描述
    控制台输出
    在这里插入图片描述
    在这里插入图片描述

简单应用是这样,但是这里缺乏很多东西。比如消息丢失在不同场景怎么处理?怎么保证消费者不会多次消费接收到同一条消息?