Kafka消息队列

发布于:2025-06-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

一,什么是消息队列

1,消息队列的特征

2,为什么需要消息队列

二,Kafka基础人门

1,kafka相关术语

2,kafka拓扑架构

三,Zokkeeper概念介绍

1,zookeeper应用举例

2,zookeeper的工作原理是什么

3,zookeeper集群架构

4,zookeeper的工作流程

四,单节点部署Kafka

1,安装zookeeper

2,安装kafka

3,创建topic

4,查看服务进程并测试

五,集群部署Kafka

2,安装zookeeper

3,安装并配置kafka

4,测试

5,使用python脚本测试队列

一,什么是消息队列

      消息队列(Message Queue,MQ)是一种异步通信机制,用于应用程序之间或组件之间传递消息。它通过队列的形式暂存消息,允许生产者(发送消息的一方)和消费者(接收消息的一方)解耦,从而实现异步处理、流量削峰、系统扩展等功能。

      消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

1,消息队列的特征

  • 存储:存储与依赖于使用套接字的基本TCP和UDP协议的传统请求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止
  • 异步:与请求和响应系统不同,消息队列通过缓冲消息可以在应用程序中实现一定程度的异步性,允许源进程发送消息并在队列中累积消息,而目标进程则可以挑选消息进行处理。 这样,应用程序就可以在某些故障情况下运行,例如连接断断续续或源进程或目标进程故障。
    路由:消息队列还可以提供路由功能,其中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信模式

2,为什么需要消息队列

  • 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
  • 冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
  • 扩展性:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可
  • 灵活性和峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
  • 可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  • 顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition 内的消息的有序性
  • 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度
    不一致的情况
  • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们

二,Kafka基础人门

      Kafka 是一种高吞吐量的分布式发布/订阅消息系统,这里简单举个例子:现在是个大数据时代各种商业、社交、搜索、浏览都会产生大量的数据。那么如何快速收集这些数据,如何实时的分析这些数据,是一个必须要解决的问题,同时,这也形成了一个业务需求模型,即生产者生产(produce)各种数据,消费者(consume)消费(分析、处理)这些数据。那么面对这些需求,如何高效、稳定的完成数据的生产和消费呢?这就需要在生产者与消费者之间,建立一个通信的桥梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间如何传递消息。

      kafka是 Apache 组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 平台的数据分析、低时延的实时系统、storm/spark 流式处理引擎等。kafka现在已被多家大型公司作为多种类型的数据管道和消息系统使用。

1,kafka相关术语

  • Broker:Kafka 集群包含一个或多个服务器,每个服务器被称为 broker(经纪人)。
  • Topic:每条发布到 Kafka 集群的消息都有一个分类,这个类别被称为 Topic(主
    题)。
  • Producer:消息的生产者,负载发布消息到kafka broker
  • Consumer:指消息的消费者,从kafka broker 拉取数据,并消费这些已发布的消息。
  • Partition:是物理上的概念,每个 Topic 包含一个或多个 Partition,每个 partition 都是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)
  • Consumer Group:消费者组,可以给每个Consumer 指定消费组,若不指定消费者组,则属于默认的 group。
  • Message:消息,通信的基本单位,每个producer 可以向一个 topic 发布一些消息。

2,kafka拓扑架构

一个典型的 Kafka 集群包含若干 Producer,若于 broker、若干 Consumer Group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pu1l 模式从 broker 订阅并消费消息。

      从图中可以看出,典型的消息系统有生产者(Producer),存储系统(broker)和消费者(Consumer)组成,Kafka作为分布式的消息系统支持多个生产者和多个消费者,生产者可以将消息分布到集群中不同节点的不同 Partition 上,消费者也可以消费集群中多个节点上的多个 Partition。在写消息时允许多个生产者写到同一个 Partition 中,但是读消息时一个 Partition 只允许被一个消费组中的一个消费者所消费,而一个消费者可以消费多个Partition。也就是说同一个消费组下消费者对 Partition 是互斥的,而不同消费组之间是共享的

      katka文持洞总持人化仔储,持久化数据你仔在 kafka的日志又件中,,任生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的个数或尺寸、大小达到一定阀值时,再统一写到磁盘上,这样不但提高了 kafka的执行效率,也减少了磁盘 I0 调用次数。

三,Zokkeeper概念介绍

ZooKeeper是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和 Slave 误以为出现两个 activemaster,最终使得整个集群处于混乱状态

1,zookeeper应用举例

  • 什么是单点故障问题

所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。那我们的解决方法就是通过对集群master 角色的选取,来解决分布式系统单点故障的问题。

  • 传统的方式是怎么解决单点故障的,以及由哪些缺点

传统的方式是采用一个备用节点,这个备用节点定期向主节点发送 ping 包,主节点收到 ping 包以后向备用节点发送回复 Ack 信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务。
这种传统解决单点故障的方法,虽然在一定程度上解决了问题,但是有一个隐患,就是网络问题,可能会存在这样一种情况:主节点并没有出现故障,只是在回复 ack 响应的时候网络发生了故障,这样备用节点就无法收到回复,那么它就会认为主节点出现了故障,接着,备用节点将接管主节点的服务,并成为新的主节点,此时,分布式系统中就出现了两个主节点(双Master 节点)的情况,双 Master 节点的出现,会导致分布式系统的服务发生混乱。这样的话,整个分布式系统将变得不可用。为了防止出现这种情况,就需要引入 ZooKeeper 来解决这种问题。

2,zookeeper的工作原理是什么

主要分为三种情形

  • master启动

在分布式系统中引入 Zookeeper 以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是主节点A和主节点B,当两个主节点都启动后,它们都会向 ZooKeeper 中注册节点信息。我们假设主节点A注册的节点信息是master00001,主节点B注册的节点信息是 master00002 ,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法为例,编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节点A将会获得锁成为主节点,然后主节点B将被阻塞成为一个备用节点。这样,通过这种方式 Zookeeper 就完
成了对两个 Master 进程的调度。完成了主、备节点的分配和协作。
Lln

  • master故障

如果主节点 A 发生了故障,这时候它在 ZooKeeper 所注册的节点信息会被自动删除,而 ZooKeeper 会自动感知节点的变化,发现主节点A故障后,会再次发出选举,这时候 主节点B将在选举中获胜,替代主节点A 成为新的主节点,这样就完成了主、被节点的重新选举。

  • master恢复

如果主节点恢复了,它会再次向 ZooKeeper 注册自身的节点信息,只不过这时候它注册的节点信息将会变成 master00003,而不是原来的信息。ZooKeeper会感知节点的变化再次发动选举,这时候,主节点B在选举中会再次获胜继续担任主节点,主节点A会担任备用节点。

3,zookeeper集群架构

  • Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。fo1lower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。
  • observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步 leader 状态,但是不参与投票。0bserver 目的是扩展系统,提高伸缩性。
  • client:客户端角色,用于向zookeeper 发起请求

4,zookeeper的工作流程

      Zookeeper 修改数据的流程: Zookeeper 集群中每个 Server 在内存中存储了一份数据,在 Zookeeper 启动时,将从实例中选举一个Server 作为 leader,Leader 负责处理数据更新等操作,当且仅当大多数 Server 在内存中成功修改数据,才认为数据修改成功。
      Zookeeper 写的流程为:客户端 Client 首先和一个 Server 或者 0bserve 通信,发起写请求,然后 Server 将写请求转发给Leader,Leader 再将写请求转发给其它 Server,其它 Server 在接收到写请求后写入数据并响应 Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应Client,完成一次写操作过程。

四,单节点部署Kafka

1,安装zookeeper

dnf -y install java       ##zookeeper运行依赖java环境,需要安装java

[root@localhost ~]# ls
anaconda-ks.cfg  apache-zookeeper-3.6.0-bin.tar.gz  kafka_2.13-2.4.1.tgz

[root@localhost ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@localhost ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

[root@localhost ~]# cd /etc/zookeeper/conf
[root@localhost conf]# mv zoo_sample.cfg zoo.cfg
[root@localhost conf]# ls
configuration.xsl  log4j.properties  zoo.cfg

[root@localhost conf]# vim zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data

在/etc/zookeeper/目录下创建zookeeper-data目录
[root@localhost zookeeper]# mkdir zookeeper-data

##切换到指定目录,启动zookeeper服务
cd /etc/zookeeper/bin    
[root@localhost bin]# ./zkServer.sh start

2,安装kafka

[root@localhost ~]# tar zxvf kafka_2.13-2.4.1.tgz
[root@localhost ~]# mv kafka_2.13-2.4.1 /etc/kafka
[root@localhost ~]# cd /etc/kafka

[root@localhost kafka]# vim config/server.properties 
log.dirs=/etc/kafka/kafka-logs           ##60行修改

[root@localhost kafka]# mkdir kafka-logs

##启动kafka服务
[root@localhost ~]# cd /etc/kafka/bin
[root@localhost bin]# ./kafka-server-start.sh ../config/server.properties &

3,创建topic

[root@localhost bin]# pwd
/etc/kafka/bin
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testaaa
##创建topic,名字为testaaa

4,查看服务进程并测试

[root@localhost bin]# netstat -anpt |grep java
tcp6       0      0 :::45561                :::*                    LISTEN      5055/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      5055/java           
tcp6       0      0 :::9092                 :::*                    LISTEN      5098/java           
tcp6       0      0 :::43721                :::*                    LISTEN      5098/java           
tcp6       0      0 :::8080                 :::*                    LISTEN      5055/java           
tcp6       0      0 127.0.0.1:2181          127.0.0.1:56962         ESTABLISHED 5055/java           
tcp6       0      0 127.0.0.1:9092          127.0.0.1:53582         ESTABLISHED 5098/java           
tcp6       0      0 127.0.0.1:53582         127.0.0.1:9092          ESTABLISHED 5098/java           
tcp6       0      0 127.0.0.1:56962         127.0.0.1:2181          ESTABLISHED 5098/java 

##生产消息
[root@localhost bin]./kafka-console-producer.sh --broker-list 127.0.0.1:9092 -topic testaaa
>123
>456
>789

##打开一个新的终端,查看消息
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 -topic testaaa
123
456
789

五,集群部署Kafka

1,准备工作

192.168.10.101 Zookeeper ,kafka
192.168.10.105 Zookeeper ,kafka
192.168.10.106 Zookeeper ,kafka

2,基础环境设置

systemctl stop firewalld

setenforce 0

dnf -y install java  ##三台都要安装java

##三台服务器修改名字
hostnamectl set-hostname kafka1
hostnamectl set-hostname kafka2
hostnamectl set-hostname kafka3

cat /etc/hosts        ##在该文件中添加
192.168.10.101 kafka1
192.168.10.105 kafka2
192.168.10.106 kafka3

2,安装zookeeper

三个节点都要操作

[root@kafka1 ~]# cd /etc/zookeeper/conf
[root@kafka1 conf]# ls
configuration.xsl  zoo.cfg               zoo_sample.cfg
log4j.properties   zoo.cfg.dynamic.next

[root@kafka1 conf]# vim zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data          ##修改并添加几行
clientPort=2181
server.1=192.168.10.101:2888:3888          
server.2=192.168.10.105:2888:3888
server.3=192.168.10.106:2888:3888

[root@kafka1 conf]# mkdir /etc/zookeeper/zookeeper-data/

echo '1'>//etc/zookeeper/zookeeper-data/myid
echo '2'>//etc/zookeeper/zookeeper-data/myid
echo '3'>//etc/zookeeper/zookeeper-data/myid

[root@kafka1 ~]# cd /etc/zookeeper/bin 
[root@kafka1 bin]# ./zkServer.sh setart          ##一定要启动zookeeper

3,安装并配置kafka

[root@kafka1 ~]# cd /etc/kafka/config

[root@kafka1 config]# vim server.properties 
broker.id=1           ##id值不能一样,其他两个id为2和三
listeners=PLAINTEXT://192.168.10.101:9092      ##三台填写自己的ip
log.dirs=/etc/kafka/kafka-logs
zookeeper.connect=192.168.10.101:2181,192.168.10.105:2181,192.168.10.106:2181

##启动kafka
[root@kafka1 ~]# cd /etc/kafka/bin
[root@kafka1 bin]# ./kafka-server-start.sh ../config/server.properties &

4,测试

任意一台服务器创建topic
./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test1111111

./kafka-console-producer.sh --broker-list kafka1:9092 -topic test1111111       生产消息

./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test1111111      另一台消费消息

5,使用python脚本测试队列

准备好两个脚本分布式:kafka_consumer ##消费者

                                        kafka_producer  ##生产者

生产者脚本 

[root@kafka1 opt]# cat kafka_producer.py 
from kafka import KafkaProducer
import json
import time

# 配置Kafka连接
producer = KafkaProducer(
    bootstrap_servers=['192.168.10.101:9092'],  # Kafka服务器地址
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # 序列化消息为JSON
    retries=3  # 重试次数
)

# 主题名称
TOPIC = 'test_topic'

def send_message(key, value):
    """发送消息到Kafka"""
    try:
        # 发送消息并获取future对象
        future = producer.send(TOPIC, key=key.encode('utf-8'), value=value)
        # 阻塞等待确认
        record_metadata = future.get(timeout=10)
        print(f"消息发送成功: 主题={record_metadata.topic}, 分区={record_metadata.partition}, 偏移量={record_metadata.offset}")
    except Exception as e:
        print(f"消息发送失败: {e}")

if __name__ == "__main__":
    # 发送示例消息
    for i in range(10):
        message = {
            'id': i,
            'timestamp': time.time(),
            'content': f'这是第{i}条测试消息'
        }
        send_message(key=f'msg_{i}', value=message)
        time.sleep(1)  # 每秒发送一条
        
    # 确保所有消息都被发送
    producer.flush()
    # 关闭连接
    producer.close() 

 消费者脚本

[root@kafka2 opt]# cat kafka_consumer.py 
from kafka import KafkaConsumer
import json

# 配置Kafka连接
consumer = KafkaConsumer(
    'test_topic',  # 订阅的主题
    bootstrap_servers=['192.168.10.101:9092'],  # Kafka服务器地址
    auto_offset_reset='earliest',  # 从最早的消息开始消费
    group_id='my-group',  # 消费者组ID
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # 反序列化JSON消息
    enable_auto_commit=True,  # 自动提交偏移量
    auto_commit_interval_ms=1000  # 自动提交间隔(毫秒)
)

def consume_messages():
    """消费Kafka消息"""
    try:
        print("开始消费消息...")
        for message in consumer:
            # 消息元数据
            print(f"分区: {message.partition}, 偏移量: {message.offset}")
            # 消息键
            if message.key:
                print(f"键: {message.key.decode('utf-8')}")
            # 消息值
            print(f"值: {message.value}")
            print("-" * 50)
    except KeyboardInterrupt:
        print("停止消费")
    finally:
        # 关闭消费者连接
        consumer.close()

if __name__ == "__main__":
    consume_messages()

测试生产者发送信息

[root@kafka1 opt]# python3 kafka_producer.py 
消息发送成功: 主题=test_topic, 分区=0, 偏移量=10
消息发送成功: 主题=test_topic, 分区=0, 偏移量=11
消息发送成功: 主题=test_topic, 分区=0, 偏移量=12
消息发送成功: 主题=test_topic, 分区=0, 偏移量=13
消息发送成功: 主题=test_topic, 分区=0, 偏移量=14
消息发送成功: 主题=test_topic, 分区=0, 偏移量=15
消息发送成功: 主题=test_topic, 分区=0, 偏移量=16
消息发送成功: 主题=test_topic, 分区=0, 偏移量=17
消息发送成功: 主题=test_topic, 分区=0, 偏移量=18
消息发送成功: 主题=test_topic, 分区=0, 偏移量=19

测试消费者接收信息

[root@kafka2 opt]# python3 kafka_consumer.py 
开始消费消息...
分区: 0, 偏移量: 0
键: msg_0
值: {'id': 0, 'timestamp': 1749021196.3487701, 'content': '这是第0条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 1
键: msg_1
值: {'id': 1, 'timestamp': 1749021197.5858512, 'content': '这是第1条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 2
键: msg_2
值: {'id': 2, 'timestamp': 1749021198.5925498, 'content': '这是第2条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 3
键: msg_3
值: {'id': 3, 'timestamp': 1749021199.5999444, 'content': '这是第3条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 4
键: msg_4
值: {'id': 4, 'timestamp': 1749021200.6062272, 'content': '这是第4条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 5
键: msg_5
值: {'id': 5, 'timestamp': 1749021201.6123135, 'content': '这是第5条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 6
键: msg_6
值: {'id': 6, 'timestamp': 1749021202.6162555, 'content': '这是第6条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 7
键: msg_7
值: {'id': 7, 'timestamp': 1749021203.619262, 'content': '这是第7条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 8
键: msg_8
值: {'id': 8, 'timestamp': 1749021204.6262918, 'content': '这是第8条测试消息'}
--------------------------------------------------
分区: 0, 偏移量: 9
键: msg_9
值: {'id': 9, 'timestamp': 1749021205.6304817, 'content': '这是第9条测试消息'}


网站公告

今日签到

点亮在社区的每一天
去签到