一:docker-compose.yml:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
volumes:
- ./zookeeper-data:/var/lib/zookeeper/data
- ./zookeeper-log:/var/lib/zookeeper/logkafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://自己的ip:9092
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1#新版使用CFG
KAFKA_CFG_PROCESS_ROLES: broker
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://自己的ip:9092
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_NUM_PARTITIONS: 1
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
volumes:
- ./kafka-data:/var/lib/kafka/data#kafka可视化界面
kafka-manager:
image: hlebalbau/kafka-manager:latest
ports:
- "9000:9000"
environment:
ZK_HOSTS: "zookeeper:2181"
APPLICATION_SECRET: "random-secret-key"
KAFKA_MANAGER_LOG_LEVEL: "INFO"
depends_on:
- zookeeper
- kafka
二:创建权限
sudo chown -R 1000:1000 ./zookeeper-data
sudo chown -R 1000:1000 ./zookeeper-log
sudo chown -R 1000:1000 ./kafka-data
三:启动容器
docker-compose up -d
四:测试功能
1. 进入Kafka容器:
docker exec -it kafka bash
2. 创建一个测试主题:
kafka-topics --create --topic order1-topic --bootstrap-server 上面配置的ip:9092 --replication-factor 1 --partitions 1
3: 启动一个生产者:
kafka-console-producer --topic order1-topic --bootstrap-server 上面配置的ip:9092
4. 在另一个终端窗口,启动一个消费者:
docker exec -it kafka kafka-console-consumer --topic order1-topic --bootstrap-server 上面配置的ip:9092 --from-beginning
5:测试成功
生产者:
消费者:
五:kafka 可视化界面使用
打开界面进行添加Cluster,添加成功如下图所示:
点击进入可以进行Topics和Brokers的管理
五:项目中进行配置和使用
1:pom设置,因为我后面要用Flink,所以kafka.就直接引用flink-connector-kafka
<!--实时订单处理--> <!-- Flink 核心依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <!-- Kafka Source Connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <!-- Redis Sink 客户端 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--实时订单处理-->
application-dev.yml: 设置后,Consumer可以直接使用@KafkaListener监听消息
spring: kafka: listener: missing-topics-fatal: false bootstrap-servers: 自己的ip:9092 consumer: auto-offset-reset: earliest enable-auto-commit: false key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 heartbeat.interval.ms: 5000 max.poll.interval.ms: 300000 metadata.max.age.ms: 3000 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
2:多 Topic + 多 Group 实现方式:
功能 | 实现方式 |
多 Topic | 多个 @KafkaListener 方法 |
多 Group | 每个监听器指定不同的 groupId |
负载均衡 | 多实例使用相同 groupId |
广播模式 | 多实例使用不同 groupId |
动态配置 | 使用 application.yml + @Value 注入 |
自定义容器工厂 | 使用 ConcurrentKafkaListenerContainerFactory |
3:
3.1 KafkaConfig:
package com.zbkj.front.config.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Configuration
@EnableKafka
public class KafkaConfig {
// ========== 公共方法 ==========
public Map<String, Object> commonConsumerProps(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "自己的ip:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
// ========== 消费者组 1 - order-group ==========
@Bean
public ConsumerFactory<String, String> orderConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(commonConsumerProps("order-group"));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(orderConsumerFactory());
factory.setConcurrency(1); // 可根据分区数设置并发度
return factory;
}
// ========== 消费者组 2 - payment-group ==========
@Bean
public ConsumerFactory<String, String> paymentConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(commonConsumerProps("payment-group"));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> paymentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(paymentConsumerFactory());
factory.setConcurrency(1);
return factory;
}
// ========== Kafka Producer Bean ==========
@Bean
public KafkaProducer<String, String> orderKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "自己的ip:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
return new KafkaProducer<>(props);
}
}
3.2 OrderConsumer
package com.zbkj.front.config.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderConsumer {
@KafkaListener(topics = "orderCreate-topic", containerFactory = "orderKafkaListenerContainerFactory")
public void consumeOrder(ConsumerRecord<String, String> record) {
log.info("[订单服务] 收到消息 topic={}, offset={}, key={}, value={}",
record.topic(), record.offset(), record.key(), record.value());
}
}
3.3 PaymentConsumer
package com.zbkj.front.config.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class PaymentConsumer {
@KafkaListener(topics = "orderPayment-topic", containerFactory = "paymentKafkaListenerContainerFactory")
public void consumePayment(ConsumerRecord<String, String> record) {
log.info("[支付服务] 收到消息 topic={}, offset={}, key={}, value={}",
record.topic(), record.offset(), record.key(), record.value());
}
}
3.4 testController
package com.zbkj.front.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zbkj.common.model.order.Order;
import com.zbkj.common.result.CommonResult;
import com.zbkj.front.event.OrderEvent;
import com.zbkj.front.event.OrderStatusEum;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Slf4j
@RestController
@RequestMapping("api/front/test")
@Api(tags = "测试")
public class testController {
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
@Qualifier("orderKafkaProducer")
private KafkaProducer<String, String> orderKafkaProducer;
@ApiOperation(value = "测试")
@RequestMapping(value = "/test/orderCreate", method = RequestMethod.GET)
public CommonResult<String> orderTopic(@Validated String orderNo) {
Order order= new Order();
order.setOrderNo(orderNo);
order.setPayPrice(new BigDecimal(100));
// 添加到订单创建的地方
sendOrderCreatedEvent(order,"orderCreate-topic");
return CommonResult.success();
}
@ApiOperation(value = "测试")
@RequestMapping(value = "/test/orderPayment", method = RequestMethod.GET)
public CommonResult<String> orderPayment(@Validated String orderNo) {
Order order= new Order();
order.setOrderNo(orderNo);
order.setPayPrice(new BigDecimal(100));
// 添加到订单创建的地方
sendOrderCreatedEvent(order,"orderPayment-topic");
return CommonResult.success();
}
public void sendOrderCreatedEvent(Order order,String topic) {
try {
// 构建订单事件对象
OrderEvent event = new OrderEvent(
String.valueOf(order.getOrderNo()),
order.getPayPrice(),
LocalDateTime.now().toString(),
OrderStatusEum.CREATED
);
// 序列化为 JSON
String json = objectMapper.writeValueAsString(event);
// 创建 Kafka 消息记录(使用订单号作为 key)
ProducerRecord<String, String> record = new ProducerRecord<>(topic, order.getOrderNo(), json);
// 发送消息
orderKafkaProducer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("Kafka消息发送失败 topic={}, key={}, error={}",
record.topic(), record.key(), exception.getMessage());
} else {
log.info("Kafka消息发送成功 topic={}, key={}, partition={}, offset={}",
metadata.topic(), record.key(), metadata.partition(), metadata.offset());
}
});
} catch (Exception e) {
log.error("发送Kafka订单创建事件异常 orderNo={}", order.getOrderNo(), e);
}
}
}
执行完后:
2025-05-28 15:56:53.768 [kafka-producer-network-thread | producer-1] INFO com.zbkj.front.controller.testController - Kafka消息发送成功 topic=orderCreate-topic, key=PT672174822506216634598, partition=0, offset=0
2025-05-28 15:56:53.772 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO com.zbkj.front.config.kafka.OrderConsumer - [订单服务] 收到消息 topic=orderCreate-topic, offset=0, key=PT672174822506216634598, value={"orderId":"PT672174822506216634598","merId":null,"amount":100,"timestamp":"2025-05-28T15:56:49.726","status":"CREATED"}
2025-05-28 15:57:22.687 [kafka-producer-network-thread | producer-1] INFO com.zbkj.front.controller.testController - Kafka消息发送成功 topic=orderPayment-topic, key=SH377174799246359695171, partition=0, offset=0
2025-05-28 15:57:22.688 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO com.zbkj.front.config.kafka.PaymentConsumer - [支付服务] 收到消息 topic=orderPayment-topic, offset=0, key=SH377174799246359695171, value={"orderId":"SH377174799246359695171","merId":null,"amount":100,"timestamp":"2025-05-28T15:57:20.754","status":"CREATED"}