目录
Kafka介绍
Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。它的主要应用场景:实时计算分析、数据同步、消息异步、服务解耦、日志收集等,下图演示通过Kafka实现服务解耦:
Kafka有如下核心概念:
1、Producer:kafka消息生产者
2、Broker:kafka集群中的每个节点,这些节点数据被zookeeper管理
3、Consumer:Kafka消息消费者
4、Topic:kafka消息主题,是逻辑概念
5、Partition:分区,具体存储kafka消息的地方,一个Topic有多个Partition
6、Replica:副本,在kafka集群中,每个分区在不同的broker节点中有副本,Kafka 通过副本机制实现高可用。
7、ISR(In-Sync Replicas):由于拉取及时,所有与 Leader 副本保持一定程度同步的副本集合。
8、OSR(Out-of-Sync Replicas):由于拉取不及时,与 Leader 副本同步滞后过多的副本集合。
9、consumer和consumer group:一个消费者组内有多个消费者,组内的消费者会按照kafka的均衡策略分别处理不同的分区消息,一个分区信息只会被组内的一个消费者消费;不同组内的消费者,都会消费分区信息,相当于广播。
Kafka安装
环境信息如下:
1、kafka版本:kafka_2.12-2.1.0.tgz
2、kafka-manager版本:kafka-manager-2.0.0.2.zip
安装步骤如下:
1、解压kafka
tar -zxvf kafka_2.12-2.1.0.tgz -C ../install/
2、建立kafka日志路径
cd /usr/install/kafka_2.12
mkdir kafka-logs
3、修改kafka配置文件信息,其中zookeeper集群安装参考:ZooKeeper集群安装-CSDN博客
vim /usr/install/kafka_2.12/config/server.properties
#broker节点id,在集群环境下需要不同
broker.id=0
#端口
port=9092
host.name=192.168.136.128
#分区个数
num.partitions=5
#日志目录
log.dirs=/usr/install/kafka_2.12/kafka-logs
#zookeeper集群
zookeeper.connect=192.168.136.128:2181,192.168.136.129:2181,192.168.136.130:2181
4、启动kafka,启动时需要指定配置文件
cd /usr/install/kafka_2.12/bin
./kafka-server-start.sh /usr/install/kafka_2.12/config/server.properties &
5、解压kafka-manager
unzip kafka-manager-2.0.0.2.zip -d /usr/install/
6、修改kafka-manager配置文件application.conf
vim /usr/install/kafka-manager-2.0.0.2/conf/application.conf
kafka-manager.zkhosts="192.168.136.128:2181,192.168.136.129:2181,192.168.136.130:2181"
7、启动kafka-manager
/usr/install/kafka-manager-2.0.0.2/bin/kafka-manager &
8、验证安装效果
登录http://192.168.136.130:9000/,kafka-manager安装在192.168.136.130节点,创建了myCluster集群,效果如下:
生产者、消费者客户端JAVA程序
步骤如下:
1、添加项目依赖,添加spring-kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、生产者代码
package com.gingko.quickstart.kafka;
import com.alibaba.fastjson.JSON;
import com.gingko.quickstart.entity.ProductInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer {
public static void main(String[] args) {
KafkaProducer kafkaProducer = null;
try {
Properties properties = new Properties();
//集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.128:9092");
//标记生产者id
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"producer_id");
//KEY和Value的序列化,kafka发送的都是二进制字节码,key作用:计算此条消息投递到主题的具体哪个分区partition
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/**
* acks:指定发送消息后,Broker端至少有多少个副本接收到该消息;默认acks=1;
* acks=0:生产者发送消息之后不需要等待任何服务端的响应;
* acks=-1、acks=all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
* 面试:设置acks=-1 or acks=a11 是不是一定能够保障消息的可靠性呢?
* 不一定,只是说了ISR中的副本都写入了消息,如果OSR的副本不一定完全写入
*/
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
//该参数用来限制生产者客户端能发送的消息的最大值
//properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
//retries和retry.backoff.msretries:重试次数和重试间隔,默认100
//properties.put(ProducerConfig.RETRIES_CONFIG,3);
//properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,100);
//linger.ms:这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0
//properties.put(ProducerConfig.LINGER_MS_CONFIG,0);
//batch.size:累计多少条消息,则一次进行批量发送;
//properties.put(ProducerConfig.BATCH_SIZE_CONFIG,100);
//buffer.memory:缓存提升性能参数,默认为32M
//properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432L);
//消息主题
String topic = "test-kafka";
for(int i=0;i<10;i++) {
kafkaProducer = new KafkaProducer(properties);
ProductInfo productInfo = new ProductInfo(i+"","小米YU7");
ProducerRecord<String,String> record = new ProducerRecord<>(topic, JSON.toJSONString(productInfo));
kafkaProducer.send(record);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
kafkaProducer.close();
}
}
}
3、消费者代码
package com.gingko.quickstart.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
//集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.128:9092");
//KEY和Value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/**
* 消费者组,一个消费者组内有多个消费者,组内的消费者会按照kafka的均衡策略分别处理不同的分区消息,一个分区信息只会被组内的一个消费者消费;
* 不同组内的消费者,都会消费分区信息,相当于广播
*/
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
/**
* 消费者从什么位置开始拉取消息,有三种方式:"latest","earliest","none'
* none
* latest 对于其他分组不同GROUP_ID_CONFIG的消费者,从一个分区的最后提交的offset开始拉取消息,默认值
* earliest 对于其他分组不同GROUP_ID_CONFIG的消费者,从最开始的起始位置拉取消息
*/
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
//消息手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//一次拉取最大数据量,默认为50M
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);
//一次fetch请求,从一个partition中取得的records最大大小 默认1M
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
//Consumer每次调用poll()时取到的records的最大数默认为500条
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//消费对象
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
//订阅主题
String topic = "test-kafka";
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
//拉取获取消息,获取此主题的所有消息
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
//主题对应的分区列表
Set<TopicPartition> topicPartitions = consumerRecords.partitions();
//遍历每个分区
topicPartitions.forEach(topicPartition -> {
//获取此分区的对应的消息列表
List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
//遍历每条消息
records.forEach(record -> {
String key = record.key();
String value = record.value();
//消息在partition中的位置
long offset = record.offset();
String str = "收到的消息,key:" + key + ",value:" + value + ",offset:" + offset + ",partition:" + topicPartition;
System.out.println(str);
//对于消息可靠性有要求的,建议消息一条条提交
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
//手动提交,提交的offset,向后移动一位
long commitOffset = offset + 1;
offsetMap.put(topicPartition,new OffsetAndMetadata(commitOffset));
kafkaConsumer.commitSync(offsetMap);
});
});
}
}
}
4、测试验证,分别运行Consumer和Producer后,在Consumer控制台显示如下信息,符合预期。
消费者客户端JAVA程序-多线程实现
实际项目中,分区partition中的消息会快速积累很多,为了满足消息的及时性,项目需要及时可靠的将消息消费出去,下图示例演示了通过多个消费者并发的消费多个分区消息的思想实现消息快速消费。
由于KafkaConsumer对象是线程不安全的,所以每个线程需要独立拥有KafkaConsumer对象,示例代码如下:
package com.gingko.quickstart.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
//集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.128:9092");
//KEY和Value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/**
* 消费者组,一个消费者组内有多个消费者,组内的消费者会按照kafka的均衡策略分别处理不同的分区消息,一个分区信息只会被组内的一个消费者消费;
* 不同组内的消费者,都会消费分区信息,相当于广播
*/
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
/**
* 消费者从什么位置开始拉取消息,有三种方式:"latest","earliest","none'
* none
* latest 对于其他分组不同GROUP_ID_CONFIG的消费者,从一个分区的最后提交的offset开始拉取消息,默认值
* earliest 对于其他分组不同GROUP_ID_CONFIG的消费者,从最开始的起始位置拉取消息
*/
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
//消息手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//一次拉取最大数据量,默认为50M
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);
//一次fetch请求,从一个partition中取得的records最大大小 默认1M
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
//Consumer每次调用poll()时取到的records的最大数默认为500条
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//订阅主题
String topic = "test-kafka";
//构造线程池,5个分区,用5个消费者线程来消费
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i=0;i<5;i++) {
//5个分区消息任务
executorService.submit(new MutiThreadConsumer(properties,topic));
}
executorService.shutdown();//关闭线程池
}
}
package com.gingko.quickstart.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
* 多线程消费者
*/
public class MutiThreadConsumer implements Runnable{
//消费者
private KafkaConsumer<String,String> kafkaConsumer;
public MutiThreadConsumer(Properties properties,String topic) {
kafkaConsumer = new KafkaConsumer<String, String>(properties);
//订阅主题
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
while (true) {
try {
//拉取获取消息,获取此主题的所有消息
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
//主题对应的分区列表
Set<TopicPartition> topicPartitions = consumerRecords.partitions();
//遍历每个分区
topicPartitions.forEach(topicPartition -> {
//获取此分区的对应的消息列表
List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
//遍历每条消息
records.forEach(record -> {
String key = record.key();
String value = record.value();
//消息在partition中的位置
long offset = record.offset();
String str = "收到的消息,key:" + key + ",value:" + value + ",offset:" + offset + ",partition:" + topicPartition
+ ",被线程" + Thread.currentThread().getName() + "消费了";
System.out.println(str);
//对于消息可靠性有要求的,建议消息一条条提交
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
//手动提交,提交的offset,向后移动一位
long commitOffset = offset + 1;
offsetMap.put(topicPartition,new OffsetAndMetadata(commitOffset));
kafkaConsumer.commitSync(offsetMap);
});
});
}finally {
if(null != kafkaConsumer) {
kafkaConsumer.close();
}
}
}
}
}
测试控制台输出信息如下,符合预期。
Kafka与Springboot整合
整合步骤如下:
1、【生产者工程、消费者工程】添加项目依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gingko</groupId>
<artifactId>kafka-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.7.6</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.gingko.producer.KafkaProducerApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2、编写生产者工程配置文件
# 应用服务 WEB 访问端口
server:
port: 8080
spring:
kafka:
bootstrap-servers: 192.168.136.128:9092 #kafka集群地址
producer:
#key,value 序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#acks:指定发送消息后,Broker端至少有多少个副本接收到该消息;默认acks=1;
#acks=0:生产者发送消息之后不需要等待任何服务端的响应;
#cks=-1、acks=all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
acks: 1
3、编写生产者发送消息的服务,核心类:KafkaTemplate
package com.gingko.producer.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Random;
@Service
public class KafkaProducerService {
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
/**
* 发送消息
* @param topic 消息主题
* @param content 消息内容
*/
public void sendMsg(String topic,Object content) {
kafkaTemplate.send(topic,content);
}
}
4、编写生产者测试案例
package com.gingko.producer.service;
import com.alibaba.fastjson.JSON;
import com.gingko.producer.entity.ProductInfo;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
public class KafkaProducerServiceTest {
@Resource
private KafkaProducerService kafkaProducerService;
@Test
public void testSendMsg() {
for(int i=0;i<10;i++) {
ProductInfo productInfo = new ProductInfo(i+"","小米YU7");
kafkaProducerService.sendMsg("test-kafka",JSON.toJSONString(productInfo));
}
}
}
5、编写消费者工程配置文件
# 应用服务 WEB 访问端口
server:
port: 8081
spring:
kafka:
bootstrap-servers: 192.168.136.128:9092 #kafka集群地址
consumer:
enable-auto-commit: false #手工提交
#消费者从什么位置开始拉取消息,有三种方式:"latest","earliest","none'
#latest 对于其他分组不同GROUP_ID_CONFIG的消费者,从一个分区的最后提交的offset开始拉取消息,默认值
# earliest 对于其他分组不同GROUP_ID_CONFIG的消费者,从最开始的起始位置拉取消息
auto-offset-reset: latest
# key,value反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual
# 5个线程同时消费
concurrency: 5
6、编写消费者消费消息程序,核心注解:@KafkaListener
package com.gingko.kafkaconsumer.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class KafkaConsumerService {
/**
* 消费消息
* @param record 消息记录
* @param acknowledgment 手工提交信息
* @param consumer
*/
@KafkaListener(topics = "test-kafka",groupId = "test-group")
public void consumerMsg(ConsumerRecord<String,Object> record,
Acknowledgment acknowledgment, Consumer consumer) {
log.info("消费的消息是:{}",record.value());
acknowledgment.acknowledge();//手工确认
}
}
7、启动消费者工程和生产者工程的测试案例,控制台显示如下,符合预期。