目录
一、消息队列
1.什么是消息队列
a.消息是指在应用间传输的数据
b.消息队列是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递
2.消息队列的特征
a.存储:
与依赖于使用套节子的基本TCP和UDP协议的传统请求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些信息或将其从消息队列中显式移除为止
b.异步:
与请求和响应系统不同,消息队列通过缓冲消息可以在应用程序中实现一定程度的异步性,允许源进程发送消息并在队列中累计消息,而目标简称可以挑选消息进行处理
路由:消息队列还可以提供路由功能,其中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信方式
3.为什么需要消息队列
a.解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
b.冗余:消息队列把数据进行持久化知道它们已经被完全处理,通过这一方式规避了数据丢失风险。消息队列采用“插入-获取-删除”范式
c.扩展性:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可
d.灵活性 & 峰值处理能力:使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
e.可恢复性:消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统中恢复后被处理
f.顺序保证:大部分的消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理
g.缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况
h.异步通信:允许用户把一个消息放入队列,但并不处理它
二、Kafka基础与入门
1.Kafka基本概念
a.Kafka是一种高吞吐量的分布式发布/订阅消息系统
b.Kafka是Apache组织下的一个开源系统,它的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop平台的数据分析、低延时的实施系统storm/spark流式处理引擎
2.Kafka相关术语
a.Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)
b.Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)
c.Producer:指消息的消费者,从kafka broker拉取数据,并消费这些已发布的消息
d.Consumer:指消息的消费者,从kafka broker拉取数据,ing消费这些已发布的消息
e.Partition:是物理上的概念,每个Topic包含一个或多个Partition,每个partition都是一个有序的队列。partition中的每条消息都会被分配一个有序的id
f.Consumer Group:消费者组,可以给每个Consumer指定消费组,若不指定消费者组,则属于默认的group
g.Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息
3.Kafka拓扑架构
a.一个典型的kafka集群包含若干Producer、若干broker、若干Consumer Group、以及一个Zookeeper集群
b.典型的消息系统有生产者(Producer),存储系统(boker)和消费者(Consumer)组成
c.kafka支持消息持久化存储,持久化数据保存在kafka的日志文件中,在生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在boker中进行存储,为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的个数或尺寸、大小达到一定的贬职时,再统一写到磁盘上,这样不但提高了kafka的执行效率,也减少了磁盘IO调用次数
d.kafka中每条消息写到partition中,是顺序写入磁盘中的
4.Topic与partition
a.Kafka中topic(主题)是以partition的形式存放的,每个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量
b.Topic为什么要设置多个Partition? 因为kafka是基于文件存储的,通过配置多个partition可以将消息内容分散存储到多个broker上,这样可以避免文件尺寸达到单机磁盘的上限。同时,将一个topic切分成任意多个partitions,可以保证消息存储、消息消费的效率
c.再存储结构上,每个partiton再物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件
d.再每个partition中有多个大小相等的segment数据文件,每个segment的大小相同的,但是每条消息的大小是不同的,因此segment<br/>数据文件消息数量不一定相等
5.Producer生产机制
a.Producer是消息和数据的生产者,他发送消息到broker时,会根据Parition机制选择将其存储到哪一个Partition
b.如果Partition机制设置的合理,所有消息都可以均匀的分布到不同的Partition里,这样可以实现数据的负载均衡
6.Consumer消费机制
a.Kafka发布消息的模式:队列模式和发布/订阅模式
b.Kafka中的Producer和consumer采用的时push、pull模式,即producer向broker进行push消息,comsumer从bork进行pull消息,push和pull对于消息的生产和消费时异步进行的
三、Zookeeper概念介绍
1.zookeeper概述
a.Zookeeper是一种分布式协调技术,主要是用来解决分布式环境当中多个进程之间的同步控制,让它们有序的去访问某种共享资源,防止造成资源竞争的后果
b.Zookeeper是一种分布式应用所设计的高可用、高性能的开源协调服务,它提高了一行基本服务:分布式锁服务,同时也提供了数据的维护和管理机制
2.zookeeper应用举例
a.什么单点故障问题?
就是再一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了
b.传统的方式是怎么解决单点故障的?以及有哪些缺点?
传统的方式是采用一个备用节点,这个备用节点定期向主节点发送ping包,主节点收到ping包以后向备用节点发送恢复ACK信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。当主节点故障时,备用节点就无法收到回复信息,此时,备用节点就任务主节点宕机,然后接替它成为新的主节点继续提供服务
3.zookeeper的工作原理是什么
a.master启动
在分布式系统中引入Zookeeper以后,就可以配置多个主节点,这里以配置两个主节点为例,假设它们时主节点A和主节点B,当两个主节点都启动后,它们都想Zookeeper中注册节点信息
b.master故障
如果主节点A发生了故障,这时候它在Zookeeper所注册的节点信息会被自动删除,而Zookeeper会自动感知节点的变化,发现主节点A故障后,会再次发出选举,这时候主节点B将在选举获胜,替代主节点A成为新的主节点
c.master恢复
如果主节点恢复了,它会再次向Zookeeper注册自身的节点信息,只不过这时候它注册的节点信息将会变成master0003,而不是原来的信息。Zookeeper会感知节点信息的变化再次选举,这时候,主节点B在选举中再次获胜,继续担任主节点,主节点A会担任备用节点
4.zookeeper集群架构
Server的三个角色
Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态
Follower:跟随者角色,用于接收客户端的请求并把返回结果给客户端妙哉选举过程中参与投票
observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步leader状态,但不参与投票。Observer目的是扩展系统,提高伸缩性
5.zookeeper的工作流程
a.Zookeeper修改数据的流程:Zookeeper集群中每个Server在内存中存储了一份数据,在Zookeeper启动时,将从实例中选举一个Server作为leader ,Leader负责处理数据更新等操作,当且仅当大多数Server在内存中成功修改数据,才认为数据修改成功
b.Zookeeper写的流程:客户端Client首先和一个Server或者Observe通信,发起写请求,然后Server将写请求转发给Leader,Leader再将写请求转发给其它Server,其它Server在接收到写请求后写入数据并响应Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应Cliect,完成一次写操作过程
四、Zookeeper在Kafka中的作用
1.Broker注册
a.Broker是分布式并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper
b.在Zookeeper上会有一个专门用来进行Borker服务器列表记录的节点:/brokers/ids
c.Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完之后,每个Broker就会将自己的IP地址和端口号记录到该节点中
2.Topic注册
a.在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门节点来记录:/brokers/topics
b.Kafka中每个Topic都会以/broker/topics/[topic]的形式被记录
3.生产者负载均衡
a.四层负载均衡
b.使用Zookeeper进行负载均衡
4.消费者负载均衡
Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理的从对应Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息指挥发送给分组中的一个消费者
5.记录消息分区与消费者的关系
a.消费组下由多个消费者,
b.对于每个消费者组,Kafka会为其分配一个全局唯一的Group ID,Group内部的所有消费者共享该ID。
c.订阅的topic下的每个分区只能分配给你某个group下的一个consumer
d.Kafka为每个消费者分配一个Consumer ID 采用“Hostname:UUID”
e.在Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此需要在Zookeeper上记录消息分区域Consmer 之间的关系
6.消息消费进度Offset记录
在消费者对指定消息分区进行消息消费的过程中,需要定时的将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或其它消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费
7.消费者注册
a.注册搭配消费者分组
b.对消费者分组中的消费者变化的注册监听
c.对Broker服务器变化注册监听
d.进行消费者负载均衡
五、集群部署Kafka
1.资源清单
主机 |
操作系统 |
IP地址 |
Kafka1 |
OpenEuler24.03 |
192.168.16.142 |
Kafka2 |
OpenEuler24.03 |
192.168.16.143 |
Kafka3 |
OpenEuler24.03 |
192.168.16.144 |
2.基础环境设置
a.修改主机名
hostnamectl set-hostname kafka1
hostnamectl set-hostname kafka2
hostnamectl set-hostname kafka3
b.添加地址映射
vi /etc/hosts
192.168.16.142 kafka1
192.168.16.143 kafka2
192.168.16.144 kafka3
3.安装Zookeeper
a.安装zookeeper
dnf -y install java tar
tar zxf apache-zookeeper-3.6.0-bin.tar.gz
mv apache-zookeeper-3.6.0-bin /etc/zookeeper
cd /etc/zookeeper/conf/
mv zoo_sample.cfg zoo.cfg
b.编辑配置文件
vi /etc/zookeeper/conf/zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data
server.1=192.168.16.142:2888:3888
server.2=192.168.16.143:2888:3888
server.3=192.168.16.144:2888:3888
c.新增数据目录
mkdir /etc/zookeeper/zookeeper-data
d.创建ID节点
#kafka1
echo '1' > /etc/zookeeper/zookeeper-data/myid
#kafka2
echo '2' > /etc/zookeeper/zookeeper-data/myid
#kafka3
echo '3' > /etc/zookeeper/zookeeper-data/myid
e.启动服务
cd /etc/zookeeper/
./bin/zkServer.sh start
./bin/zkServer.sh status
ss -nlpt
4.安装kafka
a.安装kafka
cd
tar zxf kafka_2.13-2.4.1.tgz
mv kafka_2.13-2.4.1 /etc/kafka
b.配置kafka配置文件(三台21行、31行修改不同)
vi /etc/kafka/config/server.properties
#修改21行,注意其它的两个ID分别为2和3
broker.id=1
#修改31行去#号,修改成各自节点的IP地址
listeners=PLAINTEXT://192.168.16.142:9092
#修改60行路径
log.dirs=/etc/kafka/kafka-logs
#65行分片数量
num.partitions=1
#123行,填写集群中各节点的地址和端口
zookeeper.connect=192.168.16.142:2181,192.168.16.143:2181,192.168.16.144:2181
c.启动kafka服务
mkdir /etc/kafka/kafka-logs
cd /etc/kafka
nohup ./bin/kafka-server-start.sh config/server.properties &
ss -nlpt
5.测试
a.修改脚本
sed -i 's/egrep/grep -E/' bin/kafka-run-class.sh
b.创建topic(任意一台)
bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
c.列出topic(任意一台)
bin/kafka-topics.sh --list --zookeeper kafka1:2181
d.生产消息
bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test
e.消费消息
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test