一、使用开发工具创建SpringBoot项目
我使用的idea
可以直接在最开始建项目的时候选择所需依赖
我直接选择了kafka和lombok,如下图所示是idea帮我自动引入的依赖
二、编写配置文件application.yml
# 连接Kafka
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
# 生产者 key value的序列化方式
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者 key value的反序列化方式
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
group-id: kafka-test
三、创建生产者接口
package com.example.producerserver.controller;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author 荣_rjy
* @create 2023/9/6 16:26
* @describe
*/
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource
private KafkaTemplate<String, String> kafka;
@PostMapping
public String data(@RequestBody String msg) {
// 通过Kafka发出数据
kafka.send("test", msg);
return "ok";
}
}
四、创建消费者
package com.example.producerserver.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
/**
* @author 荣_rjy
* @create 2023/9/6 16:27
* @describe
*/
@Configuration
public class KafkaConsumer {
// 指定要监听的 topic
@KafkaListener(topics = "test")
public void consumeTopic(String msg) {
// 参数: 从topic中收到的 value值
System.out.println("收到的信息: " + msg);
}
}
五、测试生产者
六、启动 kafka
配置文件的方式启动(本地)
#cmd 命令直接输入zkServer启动zookeeper服务
zkServer
#cmd 命令直接输入zkCli启动zookeeper客户端
zkCli
#cmd 命令定位到kafka的安装目录 我的安装目录是 E:\kafka\kafka_2.12-3.5.1
#cmd 命令再该目录中启动kafka
bin\windows\kafka-server-start.bat config\server.properties
linux 远程启动 kafka
配置文件的形式启动
进入该文件夹下
可以看到该文件夹下有如下文件数据
输入命令可以启动 kafka,但是不建议如此,因为需要在配置文件中进行一定的配置,本地的 linux 服务器并没有配置好
sh kafka_start.sh
使用 docker 启动 kafka
# 首先安装 kafka 需要用到的 zookeeper
docker pull wurstmeister/zookeeper
# 运行zookeeper容器
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 安装kafka容器
docker pull wurstmeister/kafka
# 运行kafka容器(需要注意的是,如果该kafka需要远程连接,如该kafka安装到了linux服务器上,但是需要本地的项目去连接该服务时,KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 需要改成KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你对应的需要对外暴露的ip地址:9092),否则,本地项目在连接kafka时会去连接localhost地址的kafka服务
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka