kafka的部署

发布于:2025-07-16 ⋅ 阅读:(21) ⋅ 点赞:(0)

目录

一、kafka简介

1.1、概述

1.2、消息系统介绍

1.3、点对点消息传递模式

1.4、发布-订阅消息传递模式

二、kafka术语解释

2.1、结构概述

2.2、broker

2.3、topic

2.4、producer

2.5、consumer

2.6、consumer group

2.7、leader

2.8、follower

2.9、partition

2.10、offset

2.11、replica

2.12、message

2.13、zookeeper

三、kafka架构

四、kafka的部署

4.1、软件下载

4.1.1、jdk的安装

4.1.2、zookeeper安装

4.1.3、kafka的安装

4.2、单机模式

4.3、集群部署

4.3.1、针对每一个节点的hosts文件添加节点的ip映射信息

4.3.2、时间同步

4.3.3、zookeeper配置 

4.3.4、创建对应的服务id

4.3.5、zoo.cfg参数解析

4.3.6、集群kafka配置


一、kafka简介

1.1、概述

kafka 是由 linkedin 公司开发,是一个分布式、分区、多副本、多生产者、多消费者,基于 zookeeper 的分布式 日志系统(也可以作为MQ 系统),常见可以用于 web/nginx 日志、访问日志、消息服务等, Linkedin2010 年将项目贡献给了Apache 基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息详细。
设计目标如下:
1. 一时间复杂度为 O(1) 的方式提供消息持久能力,即使对 TB 级以上的数据也能保证常数时间的访问性能。
2. 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒 100k 条消息的传输。
3. 支持 Kafka Server 间的消息分布,以及分布式消费,同时保证每个 partition 内的消息顺序传输。
4. 同时支持离线数据和实时数据处理。
5. Scale out :支持在线水平扩展。

1.2、消息系统介绍

一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需要关注数据,无需要关系数据再两个或者 多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消 息,有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布 - 订阅模式。
kafka 无疑也是一种消息订阅模式的系统。

1.3、点对点消息传递模式

在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费队列中的数据。但是一条消息只 能被消费一次,当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式及时有多 个消费者同时消费数据,也能保证数据处理的顺序,架构示意图如下

1.4、发布-订阅消息传递模式

在该模式中,消息呗持久化到一个 topic 中。与点对点消息系统不同的是,消费者可以订阅一个或者多个 topic,消费者可以消费 topic 中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在该模式下,消息的生产者称为发布者,消费者称为订阅这,架构示意图如下:

二、kafka术语解释

2.1、结构概述

上图中一个 topic 配置了 3 partition Partition1 有两个 offset 0 1 Partition2 4 offset Partition3 1个offset 。副本的 id 和副本所在的机器的 id 恰好相同。
如果一个 topic 的副本数为 3 ,那么 Kafka 将在集群中为每个 partition 创建 3 个相同的副本。集群中的每个 broker存储一个或多个partition 。多个 producer consumer 可同时生产和消费数据。

2.2、broker

一台 Kafka 服务器就是一个 Broker ,一个集群由多个 Broker 组成,一个 Broker 可以容纳多个 Topic Broker 和Broker之间没有 Master Standy 的概念,他们之间的地位基本是平等的。
Kafka 集群包含一个或者多个服务器,服务器节点成为 broker
broker 存储 topic 的数据,如果某 topic N partion, 集群有 N broker
broker 存储 topic 的数据。如果某 topic N partition ,集群有 N broker ,那么每个 broker 存储该 topic 的一个partition
如果某 topic N partition ,集群有 (N+M) broker ,那么其中有 N broker 存储该 topic 的一个 partition ,剩下的M broker 不存储该 topic partition 数据。
如果某 topic N partition ,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

2.3、topic

每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic 。(物理上不同 Topic 的消息分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名。

2.4、producer

topic 中的数据分割为一个或多个 partition 。每个 topic 至少有一个 partition 。每个 partition 中的数据使用多个 segment文件存储。 partition 中的数据是有序的,不同 partition 间的数据丢失了数据的顺序。如果 topic 有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将 partition 数目设为1

2.5、consumer

消费者可以从 broker 中读取数据。消费者可以消费多个 topic 中的数据。

2.6、consumer group

每个 Consumer 属于一个特定的 Consumer Group (可为每个 Consumer 指定 group name ,若不指定 groupname则属于默认的 group )。

2.7、leader

每个 partition 有多个副本,其中有且仅有一个作为 Leader Leader 是当前负责数据的读写的 partition

2.8、follower

Follower 跟随 Leader ,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower Follower Leader 保持数据同步。如果Leader 失效,则从 Follower 中选举出一个新的 Leader 。当 Follower Leader 挂掉、卡住或者同步太慢,leader 会把这个 follower “in sync replicas” ISR )列表中删除,重新创建一个 Follower

2.9、partition

为了实现可扩展性,一个非常大的 Topic 可以被分为多个 Partion, 从而分布到多台 Broker 上。 Partion 中的每条消息都会被分配一个自增Id(Offset) Kafka 只保证按一个 Partion 中的顺序将消息发送给消费者,但是不保证单个Topic 中的多个 Partion 之间的顺序。

2.10、offset

消息在 Topic Partion 中的位置,同一个 Partion 中的消息随着消息的写入,其对应的 Offset 也自增,结构图如下:

2.11、replica

副本。 Topic Partion 含有 N replica,N 为副本因子。其中一个 Replica Leader, 其他都为 Follower,Leader 处理Partition 的所有读写请求,与此同时, Follower 会定期去同步 Leader 上的数据。

2.12、message

通讯的基本单位,消息

2.13、zookeeper

存放 Kafka 集群相关元数据的组件。在 ZK 集群中会保存 Topic 的状态消息,例如分区的个数,分区的组成,分区的分布情况等;保存Broker 的状态消息;报错消费者的消息等。通过这些消息, Kafka 很好的将消息生产,消息存储,消息消费的过程结合起来。

三、kafka架构

Kafka 集群中生产者将消息发送给以 Topic 命名的消息队列 Queue 中,消费者订阅发往以某个 Topic 命名的消息队列Queue 中的消息。其中 Kafka 集群由若干个 Broker 组成, Topic 由若干个 Partition 组成,每个 Partition 里面的消息通过Offset 来获取。
一个典型的 Kafka 集群中包含若干个 Producer( 可以是某个模块下发的 Command, 或者是 Web 前端产生的 PageView,或者是服务器日志,系统 CPU,Memor ) ,若干个 Broker Kafka 集群支持水平扩展,一般 Broker数量越多,整个Kafka 集群的吞吐率也就越高),若干个 ConsumerGroup, 以及一个 Zookeeper 集群。 Kafka 通过zookeeper 管理集群配置。 Producer 使用 Push 模式将消息发不到 Broker 上, consumer 使用 Pull 模式从Broker上订阅并消费消息。

四、kafka的部署

4.1、软件下载

无论单机部署还是集群,这一步都不能省

4.1.1、jdk的安装

由于带GUI界面的安装,是自带jdk版本的,我们可以选择使用默认jdk
自带JDK,这种JDK可以使用java -version检查,如果使用javac就不行了,所以进行安装

sudo yum install java-1.8.0-openjdk-devel -y

4.1.2、zookeeper安装

Apache ZooKeeper

选择3.5.7版本

上传服务器,安装

解压
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin zookeeper3.5.7
mv zookeeper3.5.7/ /opt

创建软链接
ln -s /opt/zookeeper3.5.7/ /opt/zookeeper

配置环境变量
vim /etc/profile

添加
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/bin

source /etc/profile

将Zookeeper提供的配置文件复制一份,复制成Zookeeper默认寻找的文件
cd /opt/zookeeper/conf
ls
cp zoo_sample.cfg zoo.cfg
cd ..

创建数据存放目录
mkdir data
chmod 755 /opt/zookeeper/data

修改数据存放位置
cd conf/
vim zoo.cfg

##修改以下配置
dataDir=/opt/zookeeper/data

启动 Zookeeper,Zookeeper的bin目录下
cd ..
./bin/zkServer.sh start zoo.cfg

检测zookeeper是否正常

jps # 看到控制台成功输出 QuorumPeerMain,表示启动成功

./bin/zkServer.sh status zoo.cfg ## Mode: standalone表示ok

4.1.3、kafka的安装

 https://kafka.apache.org/downloads

选择 kafka_2.12-3.8.0.tgz 进行下载,Scala 2.12 和 Scala 2.13 主要是使用Scala编译的版本不同,两者皆可

上传服务器,安装

解压
tar -zxvf kafka_2.12-2.7.0.tgz
mv kafka_2.12-2.7.0 /opt
cd /opt

创建软链接
ln -s /opt/kafka_2.12-2.7.0/ /opt/kafka
ls

配置环境变量
vim /etc/profile

添加
export KAFKA_HOME=/opt/kafka
export PATH=:$PATH:${KAFKA_HOME}

source /etc/profile

4.2、单机模式

在Kafka的config目录下存在相关的配置信息——本次我们只想让Kafka快速启动起来只关注server.properties文件即可

cd ${KAFKA_HOME}/config
ls
#connect-console-sink.properties    connect-file-source.properties   consumer.properties  server.properties
#connect-console-source.properties  connect-log4j.properties         kraft                tools-log4j.properties
#connect-distributed.properties     connect-mirror-maker.properties  log4j.properties     trogdor.conf
#connect-file-sink.properties       connect-standalone.properties    producer.properties  zookeeper.properties

打开配置文件,并主要注意以下几个配置
vim server.properties

broker.id=0 #kafka服务节点的唯一标识,这里是单机不用修改
#     listeners = PLAINTEXT://host1:9092  别忘了设置成自己的主机名
listeners=PLAINTEXT://host1:9092 #kafka底层监听的服务地址,注意是使用主机名,不是ip。
# log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。
log.dirs=/opt/kafka/data ##kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录)

log.retention.hours=168 #日志的留存策略,默认168小时也就是一周
# zookeeper 的连接地址 ,别忘了设置成自己的主机名,单机情况下可以使用 localhost
zookeeper.connect=host1:2181

启动kafka

./bin/kafka-server-start.sh -daemon config/server.properties #后台启动kafka

使用 jps 查看是否成功启动kafka
jps
34843 QuorumPeerMain
21756 Jps
116076 Kafka

4.3、集群部署

4.3.1、针对每一个节点的hosts文件添加节点的ip映射信息

vim  /etc/hosts
192.168.157.80 host1
192.168.157.81 host2
192.168.157.82 host3

4.3.2、时间同步

yum install ntp -y
ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com #两个时钟同步地址选择一个就行

4.3.3、zookeeper配置 

vim /opt/zookeeper/conf/zoo.cfg
##额外添加以下配置
server.1=host1:2888:3888 #数据同步端口:领导选举时服务器监听的端口
server.2=host2:2888:3888
server.3=host3:2888:3888

4.3.4、创建对应的服务id

# host1
echo 1 > /opt/zookeeper/data/myid #在这个文件中写入自己服务的id号
# host2
echo 2 > /opt/zookeeper/data/myid
# host3
echo 3 > /opt/zookeeper/data/myid

4.3.5、zoo.cfg参数解析

tickTime=2000: 通信心跳数,用于设置Zookeeper服务器与客户端之间的心跳时间间隔,单位是毫秒。这个时间间隔是Zookeeper使用的基本时间单位,用于服务器之间或客户端与服务器之间维持心跳的时间间隔。

initLimit=10: LF初始通信时限,用于设置集群中的Follower跟随者服务器与Leader领导者服务器之间启动时能容忍的最多心跳数。如果在这个时限内(10个心跳时间)领导和根随者没有发出心跳通信,就视为失效的连接,领导和根随者彻底断开。

syncLimit=5: LF同步通信时限,用于设置集群启动后,Leader与Follower之间的最大响应时间单位。假如响应超过这个时间(syncLimit * tick Time -> 10秒),Leader就认为Follower已经死掉,会将Follower从服务器列表中删除。

dataDir: 数据文件目录+数据持久化路径,主要用于保存Zookeeper中的数据。

dataLogDir: 日志文件目录,用于存储Zookeeper的日志文件。

clientPort=2181: 客户端连接端口,用于监听客户端连接的端口

4.3.6、集群kafka配置

 server.properties配置文件

cd ${KAFKA_HOME}/config
vim server.properties

broker.id=0 #kafka服务节点的唯一标识
#     listeners = PLAINTEXT://your.host.name:9092  别忘了设置成自己的主机名
listeners=PLAINTEXT://host1:9092 #集群中需要设置成每个节点自己的
# log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。
log.dirs=/opt/kafka/data ##kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录)
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 #日志的留存策略,默认168小时也就是一周
# zookeeper 集群的连接地址 
zookeeper.connect=host1:2181,host2:2181,host3:2181

其余配置:

##修改差异配置
cd ${KAFKA_HOME}/config
vim server.properties

# host2节点
broker.id=1
listeners=PLAINTEXT://host2:9092
# host3节点
broker.id=2
listeners=PLAINTEXT://host3:9092

 kafka集群即可正常启动

kafka其余命令

./bin/kafka-server-stop.sh #关闭kafka
kafka-console-consumer.sh #消费命令
kafka-console-producer.sh #生产命令
kafka-consumer-groups.sh #查看消费者组,重置消费位点等
kafka-topics.sh #查询topic状态,新建,删除,扩容
kafka-acls.sh #配置,查看kafka集群鉴权信息
kafka-configs.sh #查看,修改kafka配置
kafka-mirror-maker.sh #kafka集群间同步命令
kafka-preferred-replica-election.sh #重新选举topic分区leader
kafka-producer-perf-test.sh #kafka自带生产性能测试命令
kafka-reassign-partitions.sh #kafka数据重平衡命令
kafka-run-class.sh #kafka执行脚本

网站公告

今日签到

点亮在社区的每一天
去签到