C++ - 仿 RabbitMQ 实现消息队列--案例

发布于:2025-08-09 ⋅ 阅读:(14) ⋅ 点赞:(0)

实现生产者客户端

#include "connection.hpp"

int main()
{
    jiuqi::AsyncWorker::ptr worker = std::make_shared<jiuqi::AsyncWorker>();
    jiuqi::Connection::ptr client = std::make_shared<jiuqi::Connection>("127.0.0.1", 8080, worker);
    jiuqi::Channel::ptr channel = client->openChannel();
    google::protobuf::Map<std::string, std::string> map;
    channel->declareExchange("exchange1", jiuqi::ExchangeType::TOPIC, true, false, map);
    channel->declareQueue("queue1", true, false, false, map);
    channel->declareQueue("queue2", true, false, false, map);
    channel->queueBind("exchange1", "queue1", "queue1");
    channel->queueBind("exchange1", "queue2", "news.music.#");

    for (int i = 0; i < 10; i++)
    {
        jiuqi::BasicProperties bp;
        bp.set_id(jiuqi::UUIDHelper::uuid());
        bp.set_deliver_mode(jiuqi::DelivertMode::DURABLE);
        bp.set_routing_key("news.music.pop");
        channel->basicPublish("exchange1", &bp, "hello world");
    }
    client->closeChannel(channel);
    return 0;
}

实现消费者客户端

#include <iostream>

#include "connection.hpp"

// Message callback for the consumer demo.
//
// Writes "<ctag>消费了: <body>" to stdout, then acknowledges the message on
// `channel` using the id stored in its properties.
//
// channel: channel on which the acknowledgement is sent.
// ctag:    consumer tag, printed as the prefix of the log line.
// bp:      message properties; dereferenced without a null check.
// body:    message payload, printed verbatim.
void cb(jiuqi::Channel::ptr &channel, const std::string &ctag, const jiuqi::BasicProperties *bp, const std::string &body)
{
    std::cout << ctag << "消费了: " << body << std::endl;

    const auto &messageId = bp->id();
    channel->basicAck(messageId);
}

// Consumer demo: subscribes to the queue named on the command line and
// acknowledges every message it receives for a fixed period.
//
// Usage: <program> queuename
//
// Returns 0 after the subscription window has elapsed and the channel has
// been closed, or -1 when the argument count is wrong.
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        // Diagnostics go to stderr so they never mix with the consumer's
        // regular output, and the real program name is shown when available.
        const char *program = (argc > 0 && argv[0] != nullptr) ? argv[0] : "./consumer_client";
        std::cerr << "usage: " << program << " queuename\n";
        return -1;
    }

    // How long the demo keeps the subscription open before shutting down.
    const std::chrono::seconds subscriptionWindow(100);

    jiuqi::AsyncWorker::ptr worker = std::make_shared<jiuqi::AsyncWorker>();
    jiuqi::Connection::ptr client = std::make_shared<jiuqi::Connection>("127.0.0.1", 8080, worker);
    jiuqi::Channel::ptr channel = client->openChannel();

    // Declare the same exchange, queues and bindings as the producer demo so
    // the consumer can be started first.
    google::protobuf::Map<std::string, std::string> map;
    channel->declareExchange("exchange1", jiuqi::ExchangeType::TOPIC, true, false, map);
    channel->declareQueue("queue1", true, false, false, map);
    channel->declareQueue("queue2", true, false, false, map);
    channel->queueBind("exchange1", "queue1", "queue1");
    channel->queueBind("exchange1", "queue2", "news.music.#");

    // Bind the channel as cb's first argument; the remaining three arguments
    // (tag, properties, body) are supplied by the caller of the callback.
    // NOTE(review): func stores a copy of `channel`; if the channel keeps the
    // callback alive this is a shared_ptr ownership cycle. Confirm that
    // closeChannel() releases the callback.
    auto func = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
    // The `false` argument is presumably auto-ack disabled (cb acks manually) - TODO confirm.
    channel->basicConsumer("consumer1", argv[1], false, func);

    // Keep the process alive while messages are delivered to cb.
    std::this_thread::sleep_for(subscriptionWindow);

    client->closeChannel(channel);

    return 0;
}


网站公告

今日签到

点亮在社区的每一天
去签到