Spring Boot + Spring Kafka 集成

发布于:2025-08-19 ⋅ 阅读:(19) ⋅ 点赞:(0)

Spring Boot 集成 org.springframework.kafka 的完整代码示例,包含 pom 依赖、配置类、生产者、消费者、测试入口,做到开箱即用。


1. pom.xml 依赖

<dependencies>
    <!-- Spring Boot Web. Also pulls in spring-boot-starter-logging (SLF4J API + Logback). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka.
         No explicit version: like the starter above, the version is left to Spring Boot's
         dependency management so that spring-kafka always matches the Boot release in use.
         Pinning a version here can silently mix incompatible Spring Kafka / Spring Boot lines. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Logging API (version managed by Spring Boot).
         Do NOT add slf4j-simple: Logback is already on the classpath via the web starter,
         and a second SLF4J binding makes Spring Boot's Logback logging system fail or
         log through the wrong backend. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
</dependencies>

2. application.yml 配置

# Spring Boot settings for the Kafka client used by this example (spring.kafka.* properties).
spring:
  kafka:
    # Broker address the producer and consumer connect to (a local single broker here).
    bootstrap-servers: localhost:9092
    producer:
      # Both keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Acknowledgement level requested by the producer; "all" is the strictest Kafka setting.
      acks: all
    consumer:
      # Default consumer group; the @KafkaListener in this example names the same group explicitly.
      group-id: test-group
      # Mirror of the producer side: keys and values are read back as strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Where to start reading when the group has no committed offset yet: the earliest available record.
      auto-offset-reset: earliest

3. Kafka 配置类(可选,定制用)

package com.example.kafkademo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the Kafka topic used by this demo as a Spring bean.
 *
 * <p>The topic is described by a {@link NewTopic} bean so that it can be created
 * automatically instead of by hand (3 partitions, replication factor 1).
 */
@Configuration
public class KafkaTopicConfig {

    /** Name of the demo topic. */
    private static final String TOPIC_NAME = "test-topic";

    /** Number of partitions requested for the topic. */
    private static final int PARTITION_COUNT = 3;

    /** Replication factor requested for the topic. */
    private static final short REPLICATION_FACTOR = 1;

    /**
     * Describes the demo topic.
     *
     * @return the topic definition for {@code test-topic}
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic(TOPIC_NAME, PARTITION_COUNT, REPLICATION_FACTOR);
    }
}

4. 生产者 Service

package com.example.kafkademo.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Publishes string messages to the demo topic through a {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    /** Topic every message of this service is sent to. */
    private static final String TOPIC = "test-topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to send records; injected by Spring
     */
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message asynchronously and reports the outcome on the console.
     *
     * <p>The method returns as soon as the send has been handed to the template; the
     * outcome is printed later from the completion callback. Both outcomes are handled:
     * a success-only callback would silently drop send failures (for example an
     * unreachable broker), leaving the caller with no trace that the message was lost.
     *
     * @param key     record key
     * @param message record value
     */
    public void sendMessage(String key, String message) {
        kafkaTemplate.send(TOPIC, key, message)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        // Surface the failure instead of letting the future swallow it.
                        System.err.printf("Failed to send message=[%s] with key=[%s] to topic=[%s]: %s%n",
                                message, key, TOPIC, ex);
                        return;
                    }
                    System.out.printf("Sent message=[%s] with key=[%s] to partition=[%d] offset=[%d]%n",
                            message, key,
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                });
    }
}

5. 消费者 Listener

package com.example.kafkademo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Consumes records from {@code test-topic} as a member of consumer group
 * {@code test-group} and prints each one to standard output.
 */
@Component
public class KafkaConsumerListener {

    /**
     * Prints the key, value, partition and offset of a received record.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(ConsumerRecord<String, String> record) {
        final String key = record.key();
        final String value = record.value();
        final int partition = record.partition();
        final long offset = record.offset();

        System.out.printf("Consumed message: key=%s value=%s partition=%d offset=%d%n",
                key, value, partition, offset);
    }
}

6. 控制器测试入口

package com.example.kafkademo.controller;

import com.example.kafkademo.producer.KafkaProducerService;
import org.springframework.web.bind.annotation.*;

/**
 * HTTP entry point for trying out the producer: {@code GET /kafka/send?key=...&msg=...}.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducerService producer;

    /**
     * @param producerService service that publishes messages to Kafka; injected by Spring
     */
    public KafkaController(KafkaProducerService producerService) {
        producer = producerService;
    }

    /**
     * Hands the given key/message pair to the producer service and echoes the message back.
     *
     * <p>The response is built right after the call to the producer service returns;
     * it is not derived from the result of the send.
     *
     * @param key record key taken from the {@code key} request parameter
     * @param msg record value taken from the {@code msg} request parameter
     * @return a confirmation string containing the message text
     */
    @GetMapping("/send")
    public String send(@RequestParam String key, @RequestParam String msg) {
        producer.sendMessage(key, msg);
        final String confirmation = "Message sent: " + msg;
        return confirmation;
    }
}

7. 启动类

package com.example.kafkademo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point of the Kafka demo.
 */
@SpringBootApplication
public class KafkaDemoApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Class, String...) shortcut, spelled out.
        final SpringApplication application = new SpringApplication(KafkaDemoApplication.class);
        application.run(args);
    }
}

8. 使用方式

  1. 启动 Zookeeper 与 Kafka(先启动 Zookeeper,再启动 Kafka;若使用 KRaft 模式的 Kafka,则无需启动 Zookeeper)

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    
  2. 启动 Spring Boot 应用

    mvn spring-boot:run
    
  3. 在浏览器/命令行发送消息

    curl "http://localhost:8080/kafka/send?key=1&msg=HelloKafka"
    
  4. 应用控制台会先后看到生产者打印的发送结果(Sent message=...)和消费者打印的消费记录(Consumed message: ...)。



网站公告

今日签到

点亮在社区的每一天
去签到