python--使用pika库操作rabbitmq实现需求

发布于:2024-04-27 ⋅ 阅读:(28) ⋅ 点赞:(0)
Author: wencoo
Blog:https://wencoo.blog.csdn.net/
Date: 22/04/2024
Email: jianwen056@aliyun.com
Wechat:wencoo824
QQ:1419440391
Details:

在这里插入图片描述

目录

正文 或 背景

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

有这样业务场景,算法部的同事有一个算法需要集成,有国内和国外两条链路,使用rabbitmq对应的试两个队列,但是他的算法只能开启一个实例,不能开两个进程,否则计算资源不足会崩溃(此处我想吐槽,做算法这帮人,工资又高,结果工程能力太差啦,所谓的算法也不过把github的开源库拿来改改参数,怎么好意思叫算法,搞不懂现在的中国互联网环境了),基于以上原因,需求则是需要在只开一个实力的情况下,消费两个队列里的任务。

由于算法模型都是python写的,所以要使用python来处理mq,python处理mq的驱动库是pika,下面来学习一下pika一些使用操作。

pika链接mq

import pika
import time


credentials = pika.PlainCredentials('admin', 'Hasx9527')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '180.184.35.67',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()

pika指定消费数量

# 可选:指定消费者处理队列消息的数量
channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息

pika自动消费实现

def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())
    time.sleep(10)

# 可选:指定消费者处理队列消息的数量
channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息

# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('test.python.pika.1',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

pika获取队列任务数量

queue_info = channel.queue_declare(queue,passive=True)
message_count = queue_info.method.message_count
print(f"队列任务数量为:{message_count}")

pika主动获取任务消费

method_frame, header_frame, body = channel.basic_get(queue)
if method_frame:
    # print(method_frame, header_frame, body)  # 收到的全部数据  就要body
    print(body)          # 是b'xxx'的格式
    channel.basic_ack(method_frame.delivery_tag)  #用自动应答 这个应该不要
else:
    print('消费单条 没有收到消息')

根据需求实现的完整示例代码

import pika
import time


credentials = pika.PlainCredentials('admin', 'Hasx9527')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '180.184.35.67',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()

queue1 = 'test.python.pika.1'
queue2 = 'test.python.pika.2'
def queueConsumer(queueName):
	while True:
		# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
		queue = queueName
		channel.queue_declare(queue , durable = True)
		# 可选:指定消费者处理队列消息的数量
		channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息
		method_frame, header_frame, body = channel.basic_get(queue)
		if method_frame:
			print(body)          # 是b'xxx'的格式
			time.sleep(10)
			channel.basic_ack(method_frame.delivery_tag)  #用自动应答 这个应该不要
		else:
			time.sleep(1)
			break

def queueConsumer2(queueName,queueName2):
	while True:
		# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
		queue = queueName2
		channel.queue_declare(queue , durable = True)
		queue_info = channel.queue_declare(queueName,durable = True,passive=True)
		message_count = queue_info.method.message_count
		if message_count != 0:
			break

		# 可选:指定消费者处理队列消息的数量
		channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息
		method_frame, header_frame, body = channel.basic_get(queue)
		if method_frame:
			print(body)          # 是b'xxx'的格式
			time.sleep(10)
			channel.basic_ack(method_frame.delivery_tag)  #用自动应答 这个应该不要
		else:
			time.sleep(1)
			break

while True:
	print("开始运行队列1================\n")
	queueConsumer(queue1)

	print("开始运行队列2================\n")
	queueConsumer2(queue1,queue2)
	

参考

由于笔者的水平有限, 加之编写的同时还要参与开发工作,文中难免会出现一些错误或者不准确的地方,恳请读者批评指正。如果读者有任何宝贵意见,可以加我微信 wencoo824。QQ:1419440391。

技术交流

欢迎加微信,搜索"wencoo824",进行技术交流,备注”博客音视频技术交流“

音视频领域其他技术文章的链接

opengl相关文章

ffmpeg相关文章

ffmpeg原理相关文章

ffmpeg源码分析相关文章

ffmpeg指令相关文章

ffmpeg报错相关文章

libass相关文章

c/c++相关文章

linux相关文章

其他文章

后面都是一些废话,不用看,刷分的

推广一个AI学习网站

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

中国软件行业倡议书

精简软件开发,电脑性能越来越好,打出的程序安装包越来越大,磁盘,内存越吃越多,这不是好现象,手机同理,大家觉得呢,欢迎发表看法,各抒己见。

手机app随意读取用户通讯录,就是流氓行为,即使有时候弹窗提示是否授权,选择了否,但是他其实还是悄悄读取你的通讯录,并且随便给你的通讯录好友发推广信息,这一点是非常反感的,并且也触犯了用户的权益,这不仅是流氓行为,更是违法行为,某软件就不说了。

作者有话说

个人简介:多年工作工程经验,擅长linux下软件开发,qt,ffmpeg音视频二次开发。

欢迎各位叨扰作者,如果有什么项目合作,创业合伙需要研发,网站推广,猎头服务,内推等等,尽管来联系,对于能挣钱的事,作者可是很感兴趣的哦。

关于内卷

劝大家一句,不要内卷,内卷只能害了别人,害了自己。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传