【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

发布于:2025-06-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

Kafka从入门到实战:构建高吞吐量分布式消息系统

一、Kafka概述

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。

Kafka核心特性

  1. 高吞吐量:即使是非常普通的硬件,Kafka也能支持每秒数百万条消息
  2. 可扩展性:集群可以无缝扩展,无需停机
  3. 持久性:消息持久化到磁盘,并支持数据备份防止数据丢失
  4. 分布式:天然支持分布式部署,具有容错能力
  5. 实时性:消息产生后立即对消费者可见

二、Kafka核心概念

1. 基本组件

  • Producer:消息生产者,向Kafka集群发送消息
  • Consumer:消息消费者,从Kafka集群读取消息
  • Broker:Kafka服务器节点,组成Kafka集群
  • Topic:消息类别,生产者发送消息到特定Topic,消费者订阅特定Topic
  • Partition:Topic物理上的分组,一个Topic可以分为多个Partition
  • Replica:Partition的副本,保证数据可靠性
  • Consumer Group:一组Consumer实例共同消费一个Topic

2. 消息存储机制

Kafka采用顺序写入磁盘的方式存储消息,这种设计使得Kafka即使使用普通磁盘也能实现很高的吞吐量。每个Partition是一个有序的、不可变的消息序列,新消息被追加到Partition末尾。

3. 消息传递语义

  • 至少一次(At least once):消息不会丢失,但可能被重复消费
  • 至多一次(At most once):消息可能丢失,但不会被重复消费
  • 精确一次(Exactly once):消息恰好被消费一次

三、Kafka环境搭建

1. 单机版安装

# 下载Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

# 启动Zookeeper(新版本Kafka已内置,无需单独安装)
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动Kafka
bin/kafka-server-start.sh config/server.properties

2. 集群部署

修改config/server.properties文件:

# 每个broker必须有唯一的id
broker.id=1

# 监听地址
listeners=PLAINTEXT://:9092

# 日志目录
log.dirs=/tmp/kafka-logs

# Zookeeper连接地址
zookeeper.connect=localhost:2181

# 副本数量
default.replication.factor=3

# 分区数量
num.partitions=3

在不同节点上启动多个Broker实例即组成集群。

四、Kafka基础操作

1. Topic管理

# 创建Topic
bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1

# 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看Topic详情
bin/kafka-topics.sh --describe --topic test-topic \
--bootstrap-server localhost:9092

# 删除Topic
bin/kafka-topics.sh --delete --topic test-topic \
--bootstrap-server localhost:9092

2. 生产消费消息

# 启动生产者
bin/kafka-console-producer.sh --topic test-topic \
--bootstrap-server localhost:9092

# 启动消费者
bin/kafka-console-consumer.sh --topic test-topic \
--bootstrap-server localhost:9092 --from-beginning

五、Java客户端开发

1. 添加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

2. 生产者示例

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class SimpleProducer {
   
    public static void main(String[] args) {
   
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 可选配置
        props.put("acks", "all"); // 确保所有副本都收到消息
        props.put("retries", 3); // 发送失败重试次数
        props.put("linger.ms", 1); // 发送延迟

        Producer<String, String> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < 100; i++) {
   
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
            
            producer.