Spring Boot 集成 org.springframework.kafka
的完整代码示例,包含 pom 依赖、配置类、生产者、消费者、测试入口,做到开箱即用。
1. pom.xml
依赖
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.2.4</version> <!-- 选用最新稳定版本 -->
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
</dependencies>
2. application.yml
配置
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
consumer:
group-id: test-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
3. Kafka 配置类(可选,定制用)
package com.example.kafkademo.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopicConfig {
// 自动创建一个 topic(分区=3,副本=1)
@Bean
public NewTopic topic() {
return new NewTopic("test-topic", 3, (short) 1);
}
}
4. 生产者 Service
package com.example.kafkademo.producer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "test-topic";
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// 发送消息
public void sendMessage(String key, String message) {
kafkaTemplate.send(TOPIC, key, message)
.thenAccept(result -> {
System.out.printf("Sent message=[%s] with key=[%s] to partition=[%d] offset=[%d]%n",
message, key,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
});
}
}
5. 消费者 Listener
package com.example.kafkademo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(ConsumerRecord<String, String> record) {
System.out.printf("Consumed message: key=%s value=%s partition=%d offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
6. 控制器测试入口
package com.example.kafkademo.controller;
import com.example.kafkademo.producer.KafkaProducerService;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducerService producerService;
public KafkaController(KafkaProducerService producerService) {
this.producerService = producerService;
}
@GetMapping("/send")
public String send(@RequestParam String key, @RequestParam String msg) {
producerService.sendMessage(key, msg);
return "Message sent: " + msg;
}
}
7. 启动类
package com.example.kafkademo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
8. 使用方式
启动 Kafka & Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
启动 Spring Boot 应用
mvn spring-boot:run
在浏览器/命令行发送消息
curl "http://localhost:8080/kafka/send?key=1&msg=HelloKafka"
控制台会看到消费者打印出消费到的消息。