完全用python 实现消息中间件5

发布于:2024-08-08 ⋅ 阅读:(154) ⋅ 点赞:(0)

为了实现自动清理功能,我们需要定期检查消息队列,并删除那些已经过期的消息。由于我们使用的是内存中的消息队列,我们可以使用Python的time模块来跟踪时间,并在适当的时候清理过期的消息。
以下是一个更新后的示例,它包含了一个定时任务,用于清理过期的消息:

from fastapi import FastAPI, HTTPException, Depends, status
from typing import Dict, List, Optional
import json
from threading import Lock
from uuid import uuid4
import time
app = FastAPI()
# 存储消息的字典,键为频道名,值为消息队列
channels: Dict[str, List[Dict[str, str]]] = {}
# 消息锁,用于并发控制
lock = Lock()
# 消息结构示例
message_example = {
    "id": "message_id",
    "content": "Hello, World!",
    "status": "sent",  # 例如:sent, received, acknowledged
    "expiration_time": 60  # 消息过期时间(秒)
}
# 消息队列的装饰器,用于并发控制
def queue_decorator(channel: str):
    def decorator(func):
        def wrapper(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)
        return wrapper
    return decorator
# 清理过期的消息的定时任务
def clean_expired_messages():
    while True:
        time.sleep(60)  # 每分钟检查一次
        for channel, messages in channels.items():
            for message in messages:
                if message["expiration_time"] and time.time() > message["expiration_time"]:
                    messages.remove(message)
# 启动定时任务
import threading
threading.Thread(target=clean_expired_messages).start()
@app.post("/publish/{channel}")
async def publish_message(channel: str, message: Dict[str, str]):
    # 生成唯一的消息ID
    message_id = str(uuid4())
    message["id"] = message_id
    message["status"] = "sent"
    message["expiration_time"] = time.time() + message["expiration_time"]  # 设置过期时间
    
    # 添加消息到消息队列
    if channel not in channels:
        channels[channel] = []
    channels[channel].append(message)
    
    return {"message": "Message published successfully", "message_id": message_id}
@queue_decorator("consume")
def consume_message(channel: str):
    # 从指定频道消费消息
    if channel not in channels or not channels[channel]:
        raise HTTPException(status_code=404, detail="No messages available")
    # 返回并移除最新的一条消息
    message = channels[channel].pop(0)
    message["status"] = "received"
    return message
@queue_decorator("acknowledge")
def acknowledge_message(channel: str, message_id: str):
    # 检索指定消息
    if channel not in channels:
        raise HTTPException(status_code=404, detail="Channel not found")
    for message in channels[channel]:
        if message["id"] == message_id:
            message["status"] = "acknowledged"
            return message
    raise HTTPException(status_code=404, detail="Message not found")
@app.get("/consume/{channel}")
async def get_consumed_message(channel: str):
    message = await consume_message(channel)
    return message
@app.get("/acknowledge/{channel}/{message_id}")
async def get_acknowledged_message(channel: str, message_id: str):
    message = await acknowledge_message(channel, message_id)
    return message
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

在这个版本中,我们添加了以下功能:

  • 自动清理功能:启动了一个后台线程,每隔一段时间检查每个频道中的消息,并删除

网站公告

今日签到

点亮在社区的每一天
去签到