C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(四)

发布于:2025-07-28 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

 

队列消息管理

proto 文件

消息的持久化管理

消息的管理

单个队列消息的管理

消息的总体对外管理

测试


 

队列消息管理

        因为消息数据需要在网络中进行传输,因此消息的类型定义使用 protobuf 进行,因为protobuf 中自带了序列化和反序列化功能,因此操作起来会简便一些。
        需要特别说明的是,消息的存储并没有使用数据库,因为消息长度通常不定,且有些消息可能会非常庞大,因此并不适合存储在数据库中,因此我们的处理方式(包括RabbitMQ)是直接将消息存储在文件中进行管理,而内存中管理的消息只需要记录好自己在文件中的所在位置和长度即可。
        为了便于管理,消息管理以队列为单元进行管理,因此每个队列都会有自己独立的数据存储文件,在存储消息时,先存储消息的长度,再存储消息主体。

  • 创建消息类型的 proto 文件,并使用 protobuf 命令生成相对应的代码文件。
    • 属性:消息 ID, 路由主题,持久化模式标志
    • 消息内容
    • 有效标志(持久化需要)
    • 持久化位置(内存中)
    • 持久化消息长度(内存中)
  • 消息的持久化管理
    • 管理数据
      • 队列消息文件存储的路径
      • 队列消息的存储文件名
      • 队列消息的临时交换文件名
    • 管理操作
      • 日志消息存储在文件中(长度+(属性+内容+有效位)序列化消息,连续存储即可)
      • 提供队列消息文件创建/删除功能
      • 提供队列消息的新增持久化/删除持久化
      • 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新生成新的消息存储文件)
  • 消息的管理(以队列为单位进行管理)
    • 队列消息管理数据
      • 队列名称
      • 待推送消息链表iii. 持久化消息 hashiv. 待确认消息 hashv. 有效消息数量
      • 已经持久化消息总量
      • 持久化管理句柄
    • 队列管理操作
      • 新增消息
      • 获取队首消息(获取的同时将消息加入待确认队列)
      • 移除指定待确认消息
      • 获取队列待消费&待确认消息数量v. 恢复队列历史消息。
      • 销毁队列所有消息
      • 判断队列消息是否为空
    • 消息的总体对外管理
      • 初始化新建队列的消息管理结构,并创建消息存储文件
      • 删除队列的消息管理结构,以及消息存储文件
      • 向指定队列新增消息
      • 获取指定队列队首消息
      • 确认指定队列待确认消息(删除)
      • 判断指定队列消息是否为空

proto 文件

syntax = "proto3";
package jiuqi;

enum ExchangeType
{
    UNKONWTYPE = 0;
    DIRECT = 1;
    FANOUT = 2;
    TOPIC = 3;
};

enum DelivertMode
{
    UNKNOWMODE = 0;
    UNDURABLE = 1;
    DURABLE = 2;
};

message BasicProperties
{
    string id = 1;
    DelivertMode deliver_mode = 2;
    string routing_key = 3;
};

message Message
{
    message Payload
    {
        BasicProperties properties = 1; 
        string body = 2;  // 主体信息
        string valid = 3; // 有效标志
    }

    Payload payload = 1; // 真正用于持久化的字段
    uint32 offset = 2;
    uint32 length = 3;
};

消息的持久化管理

    class MessageMapper
    {
    public:
        MessageMapper(const std::string &basedir, const std::string &qname)
            : _qname(qname)
        {
            std::string dir = basedir;
            if (dir.back() != '/')
                dir.push_back('/');
            _datafile = dir + qname + DATAFILE_SUBFIX;
            _tmpfile = dir + qname + TMPFILE_SUBFIX;
            assert(FileHelper::createDirectory(basedir));
            assert(createMsgFile());
        }

        bool createMsgFile()
        {
            if (FileHelper::exists(_datafile))
                return true;
            bool ret = FileHelper::createFile(_datafile);
            if (!ret)
            {
                ERROR("创建队列文件失败: %s", _datafile.c_str());
            }
            return ret;
        }

        void removeMsgFile()
        {
            FileHelper::removeFile(_datafile);
            FileHelper::removeFile(_tmpfile);
        }

        bool insert(MessagePtr &msg)
        {
            // 对有效信息序列化
            std::string body = msg->payload().SerializeAsString();
            // 写入文件
            FileHelper helper(_datafile);
            size_t msg_size = body.size();
            // 设置偏移量与数据长度
            msg->set_offset(helper.size());
            msg->set_length(body.size());
            // 先写入数据长度
            if (!helper.write((char *)&msg_size, helper.size(), sizeof(size_t)))
            {
                DEBUG("向%s写入数据长度失败", _datafile.c_str());
                return false;
            }
            // 写入数据主体消息
            if (!helper.write(body))
            {
                DEBUG("向%s写入数据失败", _datafile.c_str());
                return false;
            }
            return true;
        }

        bool remove(MessagePtr &msg)
        {
            // 将msg的有效标志设置为'0'
            msg->mutable_payload()->set_valid("0");
            // 对msg序列化
            std::string body = msg->payload().SerializeAsString();
            if (body.size() != msg->length())
            {
                DEBUG("不能修改文件中数据有效位:有效载荷不一致");
                return false;
            }
            // 覆盖写入
            FileHelper helper(_datafile);
            size_t msg_size = body.size();
            // 依旧先写入数据长度
            if (!helper.write((char *)&msg_size, msg->offset(), sizeof(size_t)))
            {
                DEBUG("向%s写入数据长度失败", _datafile.c_str());
                return false;
            }
            // 在写入数据主体消息
            if (!helper.write(body.c_str(), msg->offset() + sizeof(size_t), body.size()))
            {
                DEBUG("向%s写入数据失败", _datafile.c_str());
                return false;
            }
            return true;
        }

        // 垃圾回收
        std::list<MessagePtr> gc()
        {
            std::list<MessagePtr> result;
            bool ret;
            ret = load(result);
            if (!ret)
            {
                DEBUG("加载有效数据失败");
                return result;
            }

            FileHelper::createFile(_tmpfile);
            for (auto &msg : result)
            {
                ret = insert(_tmpfile, msg);
                if (!ret)
                {
                    DEBUG("向临时文件 %s 写入数据失败", _tmpfile.c_str());
                    return result;
                }
            }
            ret = FileHelper::removeFile(_datafile);
            if (!ret)
            {
                DEBUG("删除原文件 %s 失败", _datafile.c_str());
                return result;
            }
            ret = FileHelper::rename(_tmpfile, _datafile);
            if (!ret)
            {
                DEBUG("修改临时文件名称失败");
                return result;
            }
            return result;
        }

    private:
        bool insert(const std::string &filename, MessagePtr &msg)
        {
            // 对有效信息序列化
            std::string body = msg->payload().SerializeAsString();
            // 写入文件
            FileHelper helper(filename);
            size_t msg_size = body.size();
            // 设置偏移量与数据长度
            msg->set_offset(helper.size());
            msg->set_length(body.size());
            if (!helper.write((char *)&msg_size, helper.size(), sizeof(size_t)))
            {
                DEBUG("向%s写入数据长度失败", filename.c_str());
                return false;
            }
            if (!helper.write(body))
            {
                DEBUG("向%s写入数据失败", filename.c_str());
                return false;
            }
            return true;
        }


        bool load(std::list<MessagePtr> &result)
        {
            // 加载文件所有有效数据
            FileHelper helper(_datafile);
            size_t offset = 0, msgSize;
            size_t fsize = helper.size();
            bool ret;
            while (offset < fsize)
            {
                // 先读取数据长度
                ret = helper.read((char *)&msgSize, offset, sizeof(size_t));
                if (!ret)
                {
                    DEBUG("读取消息长度失败");
                    return false;
                }

                offset += sizeof(size_t);
                //在读取主体信息
                std::string msg_body(msgSize, 0);
                ret = helper.read(msg_body, offset, msgSize);
                if (!ret)
                {
                    DEBUG("读取消息失败");
                    return false;
                }
                offset += msgSize;
                // 进行反序列化
                MessagePtr msgp = std::make_shared<Message>();
                msgp->mutable_payload()->ParseFromString(msg_body);

                if (msgp->payload().valid() == "0")
                    continue;
                result.push_back(msgp);
            }
            return true;
        }

    private:
        std::string _qname;
        std::string _datafile;
        std::string _tmpfile;
    };

关键说明:

  • 移除某条消息时,我们并不选择直接在文件中删除它,而是将它的标志位置为‘0’,等到垃圾回收时再进行删除。
  • 在垃圾回收时,我们先从文件中读取到所有有效的消息,再将其写入到临时文件中,更新每条消息的写入位置,最后对更改文件名即可。

消息的管理

单个队列消息的管理

    class QueueMessage
    {
    public:
        using ptr = std::shared_ptr<QueueMessage>;
        QueueMessage(const std::string &basedir, const std::string &qname)
            : _qname(qname), _mapper(basedir, qname), _valid_count(0), _total_count(0)
        {
        }

        void recovery()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _msgs = _mapper.gc();
            for (auto &msg : _msgs)
            {
                _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
            }
            _valid_count = _total_count = _msgs.size();
        }

        bool insert(const BasicProperties *bp, std::string body, bool queue_is_durable)
        {
            // 构造消息对象
            MessagePtr msg = std::make_shared<Message>();
            msg->mutable_payload()->set_body(body);
            if (bp != nullptr)
            {
                msg->mutable_payload()->mutable_properties()->set_id(bp->id());
                msg->mutable_payload()->mutable_properties()->set_deliver_mode(bp->deliver_mode());
                msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
            }
            else
            {
                msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
                msg->mutable_payload()->mutable_properties()->set_deliver_mode(DelivertMode::DURABLE);
                msg->mutable_payload()->mutable_properties()->set_routing_key("");
            }

            if (!queue_is_durable) msg->mutable_payload()->mutable_properties()->set_deliver_mode(DelivertMode::UNDURABLE);

            std::unique_lock<std::mutex> lock(_mutex);
            // 判断是否需要持久化
            if (msg->payload().properties().deliver_mode() == DelivertMode::DURABLE)
            {
                msg->mutable_payload()->set_valid("1"); // 设置有效位为"1"
                bool ret = _mapper.insert(msg);
                if (!ret)
                {
                    DEBUG("持久化存储消息: %s 失败", body.c_str());
                    return ret;
                }
                _valid_count++;
                _total_count++;
                _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
            }
            _msgs.push_back(msg);
            return true;
        }

        MessagePtr front()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if(_msgs.size() == 0) return nullptr;
            MessagePtr msg = _msgs.front();
            _msgs.pop_front();
            // 向待确认hash中添加
            _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
            return msg;
        }

        bool remove(const std::string &msgid)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _waitack_msgs.find(msgid);

            if (it == _waitack_msgs.end())
                return true;

            if (it->second->payload().properties().deliver_mode() == DelivertMode::DURABLE)
            {
                bool ret = _mapper.remove(it->second);
                if (!ret)
                {
                    DEBUG("删除消息(%s)失败", msgid.c_str());
                }
                _durable_msgs.erase(msgid);
                _valid_count--;
                gc();
            }
            _waitack_msgs.erase(msgid);
            return true;
        }

        size_t getable_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _msgs.size();
        }
        size_t total_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _total_count;
        }
        size_t durable_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _durable_msgs.size();
        }
        size_t waitack_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _waitack_msgs.size();
        }
        void clear()
        {
            _mapper.removeMsgFile();
            _msgs.clear();
            _durable_msgs.clear();
            _waitack_msgs.clear();
        }

    private:
        bool GCCheck()
        {
            return _total_count >= 2000 && _valid_count * 2 < _total_count;
        }

        void gc()
        {
            if (!GCCheck())
                return;
            std::list<MessagePtr> msgs = _mapper.gc();
            for (auto &msg : msgs)
            {
                auto it = _durable_msgs.find(msg->payload().properties().id());
                if (it == _durable_msgs.end())
                {
                    DEBUG("垃圾回收后, 有一条消息在内存中没有找到");
                    _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
                    _msgs.push_back(msg);
                }
                // 更新存储位置
                it->second->set_offset(msg->offset());
                it->second->set_length(msg->length());
            }
            _valid_count = _total_count = msgs.size();
        }

    private:
        std::mutex _mutex;
        std::string _qname;  // 队列名称
        size_t _valid_count; // 持久化有效消息数量
        size_t _total_count; // 持久化消息总量
        MessageMapper _mapper;
        std::list<MessagePtr> _msgs;                               // 待推送消息
        std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hash
        std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash
    };

关键说明:

  • 插入消息时,我们将消息先插入到一个msgs链表中,对于需要持久化的消息在使用mapper的方法写入文件。
  • 获取消息时,我们并不直接删除消息,而是将其放入待确认消息的hash表中,等待确认。
  • 当确认消息时在调用remove函数,从待确认消息的hash表中移除,再修改文件中对应消息的有效标志位,再考虑是否需要垃圾回收(回收条件为,总消息数量大于2000且有效消息数量小于一半)。
  • 我们还提供了一个recovery方法,在创建这个对象时调用这个方法来恢复文件中的所有消息。

消息的总体对外管理

    class MessageManager
    {
    public:
        using ptr = std::shared_ptr<MessageManager>;
        MessageManager(const std::string &basedir) : _basedir(basedir) {}

        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            for (auto &msg : _queue_msgs)
            {
                msg.second->clear();
            }
            _queue_msgs.clear();
        }

        void initQueueMessage(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it != _queue_msgs.end())
                    return;
                qmp = std::make_shared<QueueMessage>(_basedir, qname);
                _queue_msgs.insert(std::make_pair(qname, qmp));
            }
            qmp->recovery();
        }

        void destoryQueueMessage(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("销毁队列数据 %s 失败:不存在", qname.c_str());
                    return;
                }
                qmp = it->second;
                _queue_msgs.erase(qname);
            }
            qmp->clear();
        }

        bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool queue_is_durable)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("向 %s 插入数据失败:队列不存在", qname.c_str());
                    return false;
                }
                qmp = _queue_msgs[qname];
            }
            return qmp->insert(bp, body, queue_is_durable);
        }

        MessagePtr front(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("获取 %s 队首消息失败:队列不存在", qname.c_str());
                    return nullptr;
                }
                qmp = _queue_msgs[qname];
            }
            return qmp->front();
        }

        void ack(const std::string &qname, const std::string &msgid)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("确认 %s 消息失败:队列不存在", qname.c_str());
                    return;
                }
                qmp = _queue_msgs[qname];
            }
            qmp->remove(msgid);
        }

        size_t getable_count(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("获取 %s 待推送消息数量失败:队列不存在", qname.c_str());
                    return 0;
                }
                qmp = _queue_msgs[qname];
            }
            return qmp->getable_count();
        }
        size_t total_count(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("获取 %s 持久化消息总数量失败:队列不存在", qname.c_str());
                    return 0;
                }
                qmp = _queue_msgs[qname];
            }
            return qmp->total_count();
        }
        size_t durable_count(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("获取 %s 有效持久化消息数量失败:队列不存在", qname.c_str());
                    return 0;
                }
                qmp = _queue_msgs[qname];
            }
            return qmp->durable_count();
        }
        size_t waitack_count(const std::string &qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if (it == _queue_msgs.end())
                {
                    DEBUG("获取 %s 待确认消息数量失败:队列不存在", qname.c_str());
                    return 0;
                }
                qmp = _queue_msgs[qname];
            }
            return qmp->waitack_count();
        }

    private:
        std::mutex _mutex;
        std::string _basedir;
        std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
    };

说明:

        此类其实就是封装了多个队列消息管理类:

  • 初始化即加入某个队列消息时,就恢复对应文件的所有有效消息。
  • 插入消息时,就调用对应队列管理的插入方法。
  • 取出消息时,就调用对应队列管理的取出消息方法。
  • 确认消息时,就调用对应队列管理的删除消息方法。

测试

#include "../mqserver/message.hpp"
#include <gtest/gtest.h>

jiuqi::MessageManager::ptr mmp;

class MessageTest : public testing::Environment
{
public:
    virtual void SetUp() override
    {
        mmp = std::make_shared<jiuqi::MessageManager>("./queue/");
        mmp->initQueueMessage("queue1");
    }

    virtual void TearDown() override
    {
        // mmp->clear();
    }
};

// 恢复历史数据测试
TEST(MessageTest, recovery_test)
{
    ASSERT_EQ(mmp->total_count("queue1"), 3);
    ASSERT_EQ(mmp->durable_count("queue1"), 3);
    ASSERT_EQ(mmp->getable_count("queue1"), 3);
    ASSERT_EQ(mmp->waitack_count("queue1"), 0);
}

// 插入数据测试
TEST(MessageTest, insert_test)
{
    jiuqi::BasicProperties pro;
    pro.set_id(jiuqi::UUIDHelper::uuid());
    pro.set_deliver_mode(jiuqi::DelivertMode::DURABLE);
    pro.set_routing_key("news.music.pop");
    mmp->insert("queue1", nullptr, "message1", jiuqi::DelivertMode::DURABLE);
    mmp->insert("queue1", &pro, "message2", jiuqi::DelivertMode::DURABLE);
    mmp->insert("queue1", nullptr, "message3", jiuqi::DelivertMode::DURABLE);
    mmp->insert("queue1", nullptr, "message4", jiuqi::DelivertMode::UNDURABLE);
    ASSERT_EQ(mmp->total_count("queue1"), 3);
    ASSERT_EQ(mmp->durable_count("queue1"), 3);
    ASSERT_EQ(mmp->getable_count("queue1"), 4);
    ASSERT_EQ(mmp->waitack_count("queue1"), 0);
}

// 获取数据测试
TEST(MessageTest, select_test)
{
    jiuqi::MessagePtr msg1 = mmp->front("queue1");
    ASSERT_EQ(msg1->payload().body(), "message1");
    ASSERT_EQ(mmp->getable_count("queue1"), 2);
    ASSERT_EQ(mmp->waitack_count("queue1"), 1);

    jiuqi::MessagePtr msg2 = mmp->front("queue1");
    ASSERT_EQ(msg2->payload().body(), "message2");
    ASSERT_EQ(mmp->getable_count("queue1"), 1);
    ASSERT_EQ(mmp->waitack_count("queue1"), 2);

    jiuqi::MessagePtr msg3 = mmp->front("queue1");
    ASSERT_EQ(msg3->payload().body(), "message3");
    ASSERT_EQ(mmp->getable_count("queue1"), 0);
    ASSERT_EQ(mmp->waitack_count("queue1"), 3);

    mmp->ack("queue1", msg1->payload().properties().id());
    mmp->ack("queue1", msg2->payload().properties().id());
    mmp->ack("queue1", msg3->payload().properties().id());
}

TEST(MessageTest, destory_test)
{
    mmp->destoryQueueMessage("queue1");
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    testing::AddGlobalTestEnvironment(new MessageTest);
    return RUN_ALL_TESTS();
}

 


网站公告

今日签到

点亮在社区的每一天
去签到