C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(五)

发布于:2025-08-03 ⋅ 阅读:(8) ⋅ 点赞:(0)

目录

虚拟机管理

代码 

测试:

交换机路由管理

代码:

测试:


虚拟机管理

虚拟机模块是对上述三个数据管理模块的整合,并基于数据之间的关联关系进行联合操作。

  • 定义虚拟机类包含以下成员:
    • 交换机数据管理模块句柄
    • 队列数据管理模块句柄c. 绑定数据管理模块句柄d. 消息数据管理模块句柄
  • 虚拟机包含操作:
    • 提供声明交换机的功能(存在则 OK,不存在则创建)
    • 提供删除交换机的功能(删除交换机的同时删除关联绑定信息)
    • 提供声明队列的功能(存在则 OK,不存在则创建,创建的同时创建队列关联消息管理对象)
    • 提供删除队列的功能(删除队列的同时删除关联绑定信息,删除关联消息管理对象及队列所有消息)
    • 提供交换机-队列绑定的功能
    • 提供交换机-队列解绑的功能
    • 提供获取交换机相关的所有绑定信息功能
    • 提供新增消息的功能
    • 提供获取指定队列队首消息的功能
    • 提供消息确认删除的功能
  • 虚拟机管理操作:
    • 增删查

代码 

#pragma once

#include "exchange.hpp"
#include "queue.hpp"
#include "binding.hpp"
#include "message.hpp"

namespace jiuqi
{
    class VirtualHost
    {
    public:
        using ptr = std::shared_ptr<VirtualHost>;
        VirtualHost(const std::string name, const std::string &basedir, const std::string &dbfile)
            :_host_name(name),
             _emp(std::make_shared<ExchangeManager>(dbfile)),
             _qmp(std::make_shared<QueueManager>(dbfile)),
             _bmp(std::make_shared<BindingManager>(dbfile)),
             _mmp(std::make_shared<MessageManager>(basedir)) 
        {
            QueueMap qm = _qmp->allQueue();
            for (auto &q : qm)
            {
                _mmp->initQueueMessage(q.first);
            }
        }

        bool declareExchange(const std::string &name,
                             ExchangeType type,
                             bool durable,
                             bool auto_delete,
                             const google::protobuf::Map<std::string, std::string> &args)
        {
            return _emp->declareExchange(name, type, durable, auto_delete, args);
        }
        void deleteExchange(const std::string &name)
        {
            // 要删除相应的绑定信息
            _bmp->removeExchangeBinding(name);
            _emp->deleteExchange(name);
        }

        Exchange::ptr selectExchange(const std::string &ename)
        {
            return _emp->selectExchange(ename);
        }

        bool declareQueue(const std::string &name,
                          bool durable,
                          bool exclusive,
                          bool auto_delete,
                          const google::protobuf::Map<std::string, std::string> &args)
        {
            _mmp->initQueueMessage(name);
            return _qmp->declareQueue(name, durable, exclusive, auto_delete, args);
        }
        void deleteQueue(const std::string &name)
        {
            _mmp->destoryQueueMessage(name);
            _bmp->removeQueueBinding(name);
            _qmp->deleteQueue(name);
        }

        bool bind(const std::string &ename, const std::string &qname, const std::string &key)
        {
            Exchange::ptr ep = _emp->selectExchange(ename);
            if (ep == nullptr) 
            {
                DEBUG("队列绑定失败,交换机%s不存在", ename.c_str());
                return false;
            }
            MsgQueue::ptr qp = _qmp->selectQueue(qname);
            if (qp == nullptr) 
            {
                DEBUG("队列绑定失败,队列%s不存在", qname.c_str());
                return false;
            }            
            return _bmp->bind(ename, qname, key, ep->durable && qp->durable);
        }
        void unBind(const std::string &ename, const std::string &qname)
        {   
            _bmp->unbind(ename, qname);
        }
        QueueBindingMap exchangeBinding(const std::string &ename)
        {
            return _bmp->getExchangeBindings(ename);
        }

        bool basicPublish(const std::string &qname, BasicProperties *bp, const std::string &body)
        {
            MsgQueue::ptr qp = _qmp->selectQueue(qname);
            if (qp == nullptr) 
            {
                DEBUG("发布消息失败,队列%s不存在", qname.c_str());
                return false;
            }
            return _mmp->insert(qname, bp, body, qp->durable);
        }
        
        MessagePtr basicConsume(const std::string &qname)
        {
            return _mmp->front(qname);
        }
        void basicAck(const std::string &qname, const std::string &msgid)
        {
            return _mmp->ack(qname, msgid);
        }

        bool existsExchange(const std::string name)
        {
            return _emp->exists(name);
        }

        bool existsQueue(const std::string name)
        {
            return _qmp->exists(name);
        }

        bool existsBinding(const std::string ename, const std::string qname)
        {
            return _bmp->exists(ename, qname);
        }

        QueueMap allqueue()
        {
            return _qmp->allQueue();
        }

        void clear()
        {
            _emp->clear();
            _qmp->clear();
            _bmp->clear();
            _mmp->clear();
        }
    private:
        std::string _host_name;
        ExchangeManager::ptr _emp;
        QueueManager::ptr _qmp;
        BindingManager::ptr _bmp;
        MessageManager::ptr _mmp;
    };
} 

说明:

  • 删除交换机时,同时要删除相应的绑定信息。
  • 删除队列时,同时要删除相应的绑定信息和队列消息。

测试:

#include <gtest/gtest.h>
#include "../mqserver/vhost.hpp"

google::protobuf::Map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}};

class HostTest : public testing::Test
{
public:
    static void SetUpTestCase()
    {
        std::cout << "所有单元测试前初始化环境\n";
    }

    static void TearDownTestCase()
    {
        std::cout << "所有单元测试完毕后清理环境\n";
    }

    void SetUp() override
    {
        vhp = std::make_shared<jiuqi::VirtualHost>("host", "./data/queue", "./data/host.db");
        vhp->declareExchange("exchange1", jiuqi::ExchangeType::FANOUT, true, false, map);
        vhp->declareExchange("exchange2", jiuqi::ExchangeType::FANOUT, true, false, map);
        vhp->declareExchange("exchange3", jiuqi::ExchangeType::FANOUT, true, false, map);

        vhp->declareQueue("queue1", true, false, false, map);
        vhp->declareQueue("queue2", true, false, false, map);
        vhp->declareQueue("queue3", true, false, false, map);

        vhp->bind("exchange1", "queue1", "news.music.#");
        vhp->bind("exchange1", "queue2", "news.music.#");
        vhp->bind("exchange1", "queue3", "news.music.#");

        vhp->bind("exchange2", "queue1", "news.music.#");
        vhp->bind("exchange2", "queue2", "news.music.#");
        vhp->bind("exchange2", "queue3", "news.music.#");

        vhp->bind("exchange3", "queue1", "news.music.#");
        vhp->bind("exchange3", "queue2", "news.music.#");
        vhp->bind("exchange3", "queue3", "news.music.#");

        vhp->basicPublish("queue1", nullptr, "hello1");
        vhp->basicPublish("queue1", nullptr, "hello2");
        vhp->basicPublish("queue1", nullptr, "hello3");

        vhp->basicPublish("queue2", nullptr, "hello1");
        vhp->basicPublish("queue2", nullptr, "hello2");
        vhp->basicPublish("queue2", nullptr, "hello3");

        vhp->basicPublish("queue3", nullptr, "hello1");
        vhp->basicPublish("queue3", nullptr, "hello2");
        vhp->basicPublish("queue3", nullptr, "hello3");
    }

    void TearDown() override
    {
        vhp->clear();
    }

    jiuqi::VirtualHost::ptr vhp;
};

TEST_F(HostTest, init_test)
{
    ASSERT_EQ(vhp->existsExchange("exchange1"), true);
    ASSERT_EQ(vhp->existsExchange("exchange2"), true);
    ASSERT_EQ(vhp->existsExchange("exchange3"), true);

    ASSERT_EQ(vhp->existsQueue("queue1"), true);
    ASSERT_EQ(vhp->existsQueue("queue2"), true);
    ASSERT_EQ(vhp->existsQueue("queue3"), true);

    ASSERT_EQ(vhp->existsBinding("exchange1", "queue1"), true);
    ASSERT_EQ(vhp->existsBinding("exchange1", "queue2"), true);
    ASSERT_EQ(vhp->existsBinding("exchange1", "queue3"), true);

    ASSERT_EQ(vhp->existsBinding("exchange2", "queue1"), true);
    ASSERT_EQ(vhp->existsBinding("exchange2", "queue2"), true);
    ASSERT_EQ(vhp->existsBinding("exchange2", "queue3"), true);

    ASSERT_EQ(vhp->existsBinding("exchange3", "queue1"), true);
    ASSERT_EQ(vhp->existsBinding("exchange3", "queue2"), true);
    ASSERT_EQ(vhp->existsBinding("exchange3", "queue3"), true);

    jiuqi::MessagePtr msg1 = vhp->basicConsume("queue1");
    jiuqi::MessagePtr msg2 = vhp->basicConsume("queue1");
    jiuqi::MessagePtr msg3 = vhp->basicConsume("queue1");
    jiuqi::MessagePtr msg4 = vhp->basicConsume("queue1");

    ASSERT_EQ(msg1->payload().body(), "hello1");
    ASSERT_EQ(msg2->payload().body(), "hello2");
    ASSERT_EQ(msg3->payload().body(), "hello3");
    ASSERT_EQ(msg4, nullptr);

    msg1 = vhp->basicConsume("queue2");
    msg2 = vhp->basicConsume("queue2");
    msg3 = vhp->basicConsume("queue2");
    msg4 = vhp->basicConsume("queue2");

    ASSERT_EQ(msg1->payload().body(), "hello1");
    ASSERT_EQ(msg2->payload().body(), "hello2");
    ASSERT_EQ(msg3->payload().body(), "hello3");
    ASSERT_EQ(msg4, nullptr);

    msg1 = vhp->basicConsume("queue3");
    msg2 = vhp->basicConsume("queue3");
    msg3 = vhp->basicConsume("queue3");
    msg4 = vhp->basicConsume("queue3");

    ASSERT_EQ(msg1->payload().body(), "hello1");
    ASSERT_EQ(msg2->payload().body(), "hello2");
    ASSERT_EQ(msg3->payload().body(), "hello3");
    ASSERT_EQ(msg4, nullptr);
}

TEST_F(HostTest, remove_test)
{
    vhp->deleteExchange("exchange1");
    ASSERT_EQ(vhp->existsBinding("exchange1", "queue1"), false);
    ASSERT_EQ(vhp->existsBinding("exchange1", "queue2"), false);
    ASSERT_EQ(vhp->existsBinding("exchange1", "queue3"), false);
    
    vhp->deleteQueue("queue1");
    ASSERT_EQ(vhp->existsBinding("exchange1", "queue1"), false);
    ASSERT_EQ(vhp->existsBinding("exchange2", "queue1"), false);
    ASSERT_EQ(vhp->existsBinding("exchange3", "queue1"), false);   
    
    jiuqi::MessagePtr msg1 = vhp->basicConsume("queue1");
    ASSERT_EQ(msg1, nullptr);
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

交换机路由管理

        客户端将消息发布到指定的交换机,交换机这时候要考虑这条数据该放入到哪些与自己绑定的队列中,而这个考量是通过交换机类型以及匹配规则来决定的:

  1. 广播交换:直接将消息交给所有绑定的队列,无需匹配
  2. 直接交换:队列绑定信息中的 binding_key 与消息中的 routing_key 一致则匹配成功,否则失败。
  3. 主题交换:只有匹配队列主题的消息才会被放入队列中

        其中广播交换和直接交换,都非常简单,唯一较为难以理解的是主题交换。在这里我们需要先对 binding_key 和 routing_key 作以了解:

binding_key
        是由数字字母下划线构成的, 并且使用 . 分成若干部分,并支持 * 和 # 通配符。
例如:news.music.#,这用于表示交换机绑定的当前队列是一个用于发布音乐新闻的队列。

  • 支持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独立部分, 不能和其他数字字母混用, 
    • 比如 a.*.b 是合法的, a.*a.b 是不合法的
    • * 可以匹配任意一个单词(注意是单词不是字母)
    • # 可以匹配零个或者多个任意单词(注意是单词不是字母)
  • 注意事项: a.#.b
    • 一个单词中不能既出现 * 又出现 #, 也就是,一个单词中只能有一个通配符,且必须独立存在
    • #通配符两边不能出现其他通配符,因为 # 可以匹配任意多个任意单词,因此连续出现是没有意义的。

routing_key
        是由数据、字母和下划线构成, 并且可以使用 . 划分成若干部分。
例如:news.music.pop,这用于表示当前发布的消息是一个流行音乐的新闻.
比如,在进行队列绑定时,某队列的 binding_key 约定为:news.music.#表示这个队列用于发布音乐新闻。而这时候客户端发布了一条消息,其中 routing_key 为:news.music.pop 则可以匹配成功,而如果发布消息的 routing_key 为:news.sport.football,这时候就会匹配失败。

匹配算法
        定义一个二维数组来标记每次匹配的结果,通过最终数组末尾位置的结果来查看是否整体匹配成功。
        使用 routing_key 中的每个单词,与 binding_key 中的单词进行逐个匹配,根据匹配结果来标记数组内容,最终以数组中的末尾标记来确定是否匹配成功。
        该动态规划的核心主要在推导递推公式, 下面我们通过几个示例来推导递推公式。

 示例 1
binding_key = "bbb.ddd"; routing_key = "aaa.ddd"
定义:dp[2][2]

aaa ddd
bbb 0 0
ddd 0 1

binding_key = "aaa.ddd"; routing_key = "aaa.ddd"
定义:dp[2][2]

aaa ddd
aaa 1 0
ddd 0 1

         从上述例子中理解,两个单词匹配成功,并不是将位置无脑标记为 1,而是需要考虑父级单词是否匹配成功,只有父级是匹配成功的,本次匹配成功才有意义。
        所以理解一个关键点:当一个 routing_key 单词,与 binding_key 单词匹配成功,则应该继承上一个单词(上一行和上一列)的匹配结果。
        单词匹配成功: dp[i][j] = dp[i - 1][j - 1]。
        但是,在将思想转换为代码时,我们考虑当 aaa 匹配成功时,从左上继承结果,但是这时候是没有左上位置的,因此对于代码的逻辑就出现了一个例外的点(代码处理额外增加了难度)。
        因此,为了便于将思想转换为代码,因此我们的数组大小定义行列分别额外多申请一
行一列,并将 dp[0][0]位置置 1。

 

dp aaa ddd
1 0 0
aaa 0 1 0
ddd 0 0 1

        这样初始将 dp[0][0] 位置置 1, 其他数组位置全部置 0; 这样只要单词匹配成功,则
从左上位置继承结果。

示例 2:#通配符的特殊

binding_key = "#"; routing_key = "aaa.bbb"

aaa bbb
1 0 0
# 0 1 0

        从这个例子中,能看出,当出现#通配符的时候是比较特殊的,如果 bbb 与#匹配成功的时候,从左上继承结果,得到的结果是 0,匹配失败,但是实际结果应该是成功的。因此,得出结论:当遇到通配符 # 时,不仅从左上继承结果,还可以从上一个单词与#的匹配结果处(左边)继承,即: dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] 。

aaa bbb
1 0 0
# 0 1 1

示例 3:#通配符的特殊

binding_key = "aaa.#"; routing_key = "aaa"

aaa
1 0
aaa 0 1
# 0 0

        从上例中,看出,当 aaa 与#匹配成功时,从左边和左上继承的结果这时候都是 0,这也是不合理的。
        结论,因此当遇到 # 通配符匹配成功时,不仅从 左上,左边继承结果,也可以从上方
继承结果,即:dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j]。

示例 4:#通配符的特殊

binding_key = "#.aaa"; routing_key = "aaa";

aaa
1 0
# 0 1
aaa 0 0

        观察上述例子,当 aaa 匹配成功时,从左上继承匹配结果,这时候继承到的是 0 ,这是有问题的。
        因此,当 binding_key 中以起始行以#开始时,应该将起始行的第 0 列置为 1,以便于后边的匹配结果继承。

aaa
1 0
# 1 1
aaa 0 1

代码:

#pragma once
#include <iostream>
#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"

namespace jiuqi
{
    class Router
    {
    public:
        static bool isLegalRoutingKey(const std::string &routingkey)
        {
            // routingkey: 只需要检查是否包含非法字符即可
            for (auto &ch : routingkey)
            {
                if ((ch >= 'a' && ch <= 'z') ||
                    (ch >= 'A' && ch <= 'Z') ||
                    (ch >= '0' && ch <= '9') ||
                    ch == '_' || ch == '.')
                    continue;
                return false;
            }
            return true;
        }

        static bool isLegalBindingKey(const std::string &bindingkey)
        {
            // 不能包含非法字符
            for (auto &ch : bindingkey)
            {
                if ((ch >= 'a' && ch <= 'z') ||
                    (ch >= 'A' && ch <= 'Z') ||
                    (ch >= '0' && ch <= '9') ||
                    ch == '_' || ch == '.' ||
                    ch == '#' || ch == '*')
                    continue;
                return false;
            }

            // "#"与"*"必须独立出现
            std::vector<std::string> sub_words;
            StrHelper::split(bindingkey, ".", sub_words);
            for (auto &word : sub_words)
            {
                if (word.size() > 1 &&
                    (word.find("*") != std::string::npos ||
                     word.find("#") != std::string::npos))
                    return false;
            }

            //"#"两边不能有通配符
            for (int i = 1; i < sub_words.size(); i++)
            {
                if (sub_words[i] == "#" && sub_words[i - 1] == "#")
                    return false;
                if (sub_words[i] == "#" && sub_words[i - 1] == "*")
                    return false;
                if (sub_words[i] == "*" && sub_words[i - 1] == "#")
                    return false;
            }
            return true;
        }

        static bool route(ExchangeType type, const std::string &routingkey, const std::string &bindingkey)
        {
            if (type == ExchangeType::FANOUT)
                return true;
            else if (type == ExchangeType::DIRECT)
                return routingkey == bindingkey;

            // #可以匹配多个单词, *可以匹配一个单词
            std::vector<std::string> routing_sub_words;
            std::vector<std::string> binding_sub_words;

            // 处理空字符串的特殊情况
            if (routingkey.empty() && bindingkey.empty())
                return true;
            if (routingkey.empty() || bindingkey.empty())
                return bindingkey == "#"; // 只有绑定键是"#"时才匹配空路由键
                
            int n_route = StrHelper::split(routingkey, ".", routing_sub_words);
            int n_bind = StrHelper::split(bindingkey, ".", binding_sub_words);

            std::vector<std::vector<bool>> dp(n_bind + 1, std::vector<bool>(n_route + 1, false));
            dp[0][0] = true;

            // bindingkey与#起始
            for (int i = 1; i <= n_bind; i++)
            {
                if (routing_sub_words[i - 1] == "#")
                {
                    dp[i][0] = true;
                    continue;
                }
                break;
            }

            for (int i = 1; i <= n_bind; i++)
            {
                for (int j = 1; j <= n_route; j++)
                {
                    // 如果bindword是*或者两者单词相同就从左上方继承
                    if (binding_sub_words[i - 1] == "*" ||
                        binding_sub_words[i - 1] == routing_sub_words[j - 1])
                        dp[i][j] = dp[i - 1][j - 1];
                    // 如果bindword是#, 从左方,左上方,上方继承
                    else if (binding_sub_words[i - 1] == "#")
                        dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];
                }
            }
            return dp[n_bind][n_route];
        }
    };
}

测试:


#include "../mqserver/route.hpp"
#include <gtest/gtest.h>

class RouteTest : public testing::Environment
{
public:
    virtual void SetUp() override
    {
    }

    virtual void TearDown() override
    {
    }
};

TEST(RouteTest, legal_route_test)
{
    std::string rk1 = "news._music.pop";
    std::string rk2 = "news.._music.pop";
    std::string rk3 = "news..,_music.pop";
    std::string rk4 = "news._123music.pop";
    ASSERT_EQ(jiuqi::Router::isLegalRoutingKey(rk1), true);
    ASSERT_EQ(jiuqi::Router::isLegalRoutingKey(rk2), true);
    ASSERT_EQ(jiuqi::Router::isLegalRoutingKey(rk3), false);
    ASSERT_EQ(jiuqi::Router::isLegalRoutingKey(rk4), true);
}

TEST(RouteTest, legal_bind_test)
{
    std::string bk1 = "news._music.pop";
    std::string bk2 = "news.._music.pop";
    std::string bk3 = "news..,_music.pop";
    std::string bk4 = "news._123music.pop";
    std::string bk5 = "news.#._123music.pop";
    std::string bk6 = "news.*._123music.pop";
    std::string bk7 = "news._#123music.pop";
    std::string bk8 = "news.#.*._123music.pop";
    std::string bk9 = "news.#.#._123music.pop";
    std::string bk10 = "news.*.*._123music.pop";
    std::string bk11 = "news.*.#._123music.pop";

    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk1), true);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk2), true);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk3), false);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk4), true);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk5), true);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk6), true);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk7), false);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk8), false);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk9), false);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk10), true);
    ASSERT_EQ(jiuqi::Router::isLegalBindingKey(bk11), false);
}

// FANOUT交换器路由测试
TEST(RouteTest, Route_Fanout_AlwaysMatch)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::FANOUT, "any.key", "any.binding"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::FANOUT, "", ""), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::FANOUT, "a.b.c", "x.y.z"), true);
}

// DIRECT交换器路由测试
TEST(RouteTest, Route_Direct_ExactMatch)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::DIRECT, "same.key", "same.key"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::DIRECT, "a.b.c", "a.b.c"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::DIRECT, "", ""), true);
}

TEST(RouteTest, Route_Direct_NotMatch)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::DIRECT, "same.key", "different.key"), false);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::DIRECT, "a.b.c", "a.b"), false);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::DIRECT, "a.b", "a.b.c"), false);
}

// TOPIC交换器路由测试
TEST(RouteTest, Route_Topic_ExactMatch)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "a.b.c"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "news.sports", "news.sports"), true);
}

TEST(RouteTest, Route_Topic_StarWildcard)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "a.*.c"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "*.b.*"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "a.*"), false);
}

TEST(RouteTest, Route_Topic_HashWildcard)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "a.#"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "#"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a", "a.#"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "#.c"), true);
}

TEST(RouteTest, Route_Topic_MixedWildcards)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c.d", "a.*.#"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c.d", "a.#.d"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c.d", "a.*.d"), false);
}

TEST(RouteTest, Route_Topic_EdgeCases)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "", "#"), true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "", "*"), false);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC, "a.b.c", "a.#.#"), true);
}

TEST(RouteTest, Route_Topic_ComplexCases)
{
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC,
                                   "news.sports.football.scores", "news.sports.#"),
              true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC,
                                   "news.sports.football.scores", "news.*.football.#"),
              true);
    ASSERT_EQ(jiuqi::Router::route(jiuqi::ExchangeType::TOPIC,
                                   "news.sports.football.scores", "news.*.basketball.#"),
              false);
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    testing::AddGlobalTestEnvironment(new RouteTest);
    return RUN_ALL_TESTS();
}