一、理论
消息(Message)是指在应用间传送的数据。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
Kafka的核心概念及角色
Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)。
Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
Producer:指消息的生产者,负责发布消息到kafka broker。
Consumer:指消息的消费者,从kafka broker拉取数据,并消费这些已发布的消息。
Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition,每个partition都是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Consumer Group:消费者组,可以给每个Consumer指定消费组,若不指定消费者组,则属于默认的group。
Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息。
Zookeeper是一种分布式协调技术,用于解决分布式环境中多个进程的同步控制,让它们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。
Zookeeper集群架构
server:leader、follower、observer
client
leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。
follower:跟随者角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。
observer:观察者角色,用于接收客户端的请求,并将写请求转发给leader,同时同步leader状态,但是不参与投票。ovserver目的是扩展系统,提高伸缩性。
client:客户端角色,用于向zookeeper发起请求。
broker是分布式部署并且相互之间相互独立,使用Zookeeper注册系统集中管理kafka中的broker。
二、实践
1、环境
kafka1 192.168.10.101
kafka2 192.168.10.102
kafka3 192.168.10.103
2、过程
# 永久修改主机名
[root@localhost ~]# hostnamectl set-hostname kafka1
[root@localhost ~]# bash
# 添加地址映射
[root@kafka1 ~]# cat /etc/hosts
192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3
# 安装zookeeper依赖环境,java
[root@kafka1 ~]# dnf -y install java
[root@kafka1 ~]# tar zxf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
[root@kafka1 ~]# cd /etc/zookeeper/
[root@kafka1 zookeeper]# cd conf/
[root@kafka1 conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@kafka1 conf]# mv zoo_sample.cfg zoo.cfg
[root@kafka1 zookeeper]# cat conf/zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data #数据目录
clientPort=2181 # 客户端监听端口
server.1=192.168.10.101:2888:3888
server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888
# 指定服务器地址及监听端口 2888是集群内机器通讯使用(leader监听此端口),3888是选举leader使用的。
# 创建数据目录
[root@kafka1 zookeeper]# mkdir zookeeper-data
[root@kafka1 zookeeper]# ls
bin docs LICENSE.txt README.md zookeeper-data
conf lib NOTICE.txt README_packaging.md
# 创建id节点
[root@kafka1 zookeeper]# echo '1' > /etc/zookeeper/zookeeper-data/myid
[root@kafka2 zookeeper]# echo '2' > /etc/zookeeper/zookeeper-data/myid
[root@kafka3 zookeeper]# echo '3' > /etc/zookeeper/zookeeper-data/myid
[root@kafka1 zookeeper]# /etc/zookeeper/bin/zkServer.sh start
[root@kafka1 ~]# tar zxf kafka_2.13-2.4.1.tgz
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
[root@kafka1 ~]# cd /etc/kafka/
[root@kafka1 kafka]# vim config/server.properties
broker.id=1
# 21行,其他两个的id分别是2和3
listeners=PLAINTEXT://192.168.10.101:9092
# 31行,其他节点改成各自的IP地址
log.dirs=/etc/kafka/kafka-logs
# 60行
num.partitions=1
# 65行,分片数量,不能超过节点数。
zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:2181
# 123行,填写集群中各节点的地址和端口。
# 创建日志目录
[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
[root@kafka1 kafka]# nohup ./bin/kafka-server-start.sh config/server.properties &
[1] 2316
# 如果启动不了,可以将/etc/kafka/kafka-logs中的数据清楚再试试。
[root@kafka1 kafka]# nohup: 忽略输入并把输出追加到 'nohup.out'
[root@kafka1 kafka]# netstat -anpt | grep 2181 # 查看zookeeper状态
tcp6 0 0 :::2181 :::* LISTEN 4151/java
tcp6 0 0 192.168.10.101:34716 192.168.10.102:2181 ESTABLISHED 4242/java
[root@kafka1 kafka]# netstat -anpt | grep 9092 # 查看kafka的状态
tcp6 0 0 192.168.10.101:9092 :::* LISTEN 4242/java
tcp6 0 0 192.168.10.101:58724 192.168.10.101:9092 ESTABLISHED 4242/java
tcp6 0 0 192.168.10.101:9092 192.168.10.101:58724 ESTABLISHED 4242/java
# 修改脚本,否则脚本中会提示大量egrep已经过时警告。
[root@kafka1 kafka]# sed -i 's/egrep/grep -E/' bin/kafka-run-class.sh
# 在任意一个节点上创建topic(主题)
[root@kafka1 kafka]# bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
# 列出topic
[root@kafka1 kafka]# bin/kafka-topics.sh --list --zookeeper kafka1:2181
test
[root@kafka1 kafka]# bin/kafka-topics.sh --list --zookeeper kafka2:2181
test
[root@kafka1 kafka]# bin/kafka-topics.sh --list --zookeeper kafka3:2181
test
# 生产消息
[root@kafka1 kafka]# bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test
>11
>22
>33
>1
>2
>3
# 消费消息
[root@kafka2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test
11
22
33
1
2
3
# 消费消息
[root@kafka3 kafka]# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test
22
33
1
2
3