C++ - 仿 RabbitMQ 实现消息队列(2)(Protobuf 和 Muduo 初识)

发布于:2025-05-21 ⋅ 阅读:(22) ⋅ 点赞:(0)

我们之前把仿 RabbitMQ 实现消息队列的环境搭建好了,我们今天主要是来使用一下protobuf和Muduo库做一个简单的应用,为之后的项目做准备。

如果之前环境还没有装好的小伙伴可以点击这里:

https://blog.csdn.net/qq_67693066/article/details/147733266

Protobuf

首先,我们查看一下我们有没有装protobuf:
在这里插入图片描述

// 定义protobuf的版本
syntax = "proto3";
package chat;

//定义消息体
message ChatMessage{
    string sender = 1; //发送方
    string text = 2; //文本内容
    int64 timestamp = 3; //时间戳
}

//消息请求
message ChatRequest{
    enum Type
    {
        SEND = 0;
        QUERY = 1;
    }

    Type type = 1;
    ChatMessage message = 2;
}

//消息回应
message ChatResponse{
    bool success = 1;
    string info = 2; //回应信息
    repeated ChatMessage messages = 3;
}

然后执行protoc --cpp_out=. chat.proto,然后就会多出两个文件:
在这里插入图片描述

这段代码是使用 Protocol Buffers (protobuf) 定义的一组消息格式,具体来说:

  • syntax = "proto3";:指定使用 proto3 版本的语法。Proto3 相比于 Proto2 简化了很多特性,并且支持更多语言。

  • package chat;:定义了一个包声明 chat,这有助于防止不同项目之间的命名冲突。

接下来是三个消息类型定义:

  1. message ChatMessage:定义了一个名为 ChatMessage 的消息类型,表示一条聊天消息。它包含了以下字段:

    • string sender = 1;:发送者的名称。
    • string text = 2;:消息内容。
    • int64 timestamp = 3;:消息的时间戳,使用 Unix 时间格式(从 1970 年 1 月 1 日开始计算的秒数)。
  2. message ChatRequest:定义了一个名为 ChatRequest 的消息类型,用于请求聊天操作。它包括:

    • enum Type:一个枚举类型 Type,包含两种可能的值 SEND = 0;QUERY = 1;,分别代表发送消息和查询消息的请求类型。
    • Type type = 1;:请求的类型,使用上述 Type 枚举中的一个值。
    • ChatMessage message = 2;:如果这是一个发送消息的请求,这里会包含一条 ChatMessage
  3. message ChatResponse:定义了一个名为 ChatResponse 的消息类型,用于响应聊天请求。它包括:

    • bool success = 1;:表示请求是否成功处理。
    • string info = 2;:提供额外的信息或错误描述。
    • repeated ChatMessage messages = 3;:一个 ChatMessage 类型的列表,用于返回多条消息(例如查询请求的结果)。

这些定义可以被编译成多种编程语言(如 C++, Java, Python等),以便在不同的系统或服务之间传输结构化的数据。

我们看看里面的核心方法有哪些:
从您提供的 Protobuf 生成的 C++ 代码中,最核心的方法可以分为以下几类:


1. 序列化/反序列化方法(最核心)

这些方法是 Protobuf 自动生成的核心功能,用于消息的二进制编码和解码:

_InternalSerialize()
uint8_t* ChatMessage::_InternalSerialize(
    uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const
  • 作用:将消息对象序列化为二进制格式
  • 调用链:被 SerializeToArray()/SerializeToString() 调用
  • 重要性:所有网络传输和持久化存储的基础
_InternalParse()
const char* ChatMessage::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx)
  • 作用:从二进制数据反序列化为消息对象
  • 调用链:被 ParseFromArray()/ParseFromString() 调用
  • 重要性:接收数据时的关键解析方法

2. 内存管理方法

SharedCtor()/SharedDtor()
void ChatMessage::SharedCtor(::_pb::Arena* arena, bool is_message_owned)
void ChatMessage::SharedDtor()
  • 作用:构造/析构时的内存管理
  • 特点:处理字符串字段的内存分配和释放
InternalSwap()
void ChatMessage::InternalSwap(ChatMessage* other)
  • 作用:高效交换两个对象的内容
  • 使用场景:被赋值运算符和移动语义操作调用

3. 字段访问方法

生成的 getter/setter

如:

// string sender = 1;
inline const std::string& sender() const;
inline void set_sender(const std::string& value);
  • 作用:提供类型安全的字段访问接口

4. 工具方法

ByteSizeLong()
size_t ChatMessage::ByteSizeLong() const
  • 作用:计算消息序列化后的字节大小
  • 用途:预先分配缓冲区大小
IsInitialized()
bool ChatMessage::IsInitialized() const
  • 作用:检查所有 required 字段是否已设置(在 proto3 中主要用作占位符)

关键设计特点:

  1. 零拷贝优化:通过 ArenaStringPtr 管理字符串字段,减少内存拷贝
  2. 延迟解析:Parse 方法支持按需解析字段
  3. 类型安全:强类型的 getter/setter 方法
  4. 二进制紧凑:采用 Tag-Length-Value 编码格式

实际使用中最常用的方法:

// 序列化
message.SerializeToString(&output_string);

// 反序列化
message.ParseFromString(input_string);

// 字段访问
message.set_sender("Alice");
const std::string& sender = message.sender();

这些生成的代码构成了 Protobuf 高效序列化能力的核心,而 Muduo 网络库则会利用这些方法来实现网络消息的收发和处理。

muduo

Muduo 由陈硕大佬开发,是一个基于非阻塞 IO 和事件驱动的 C++高并发 TCP 网络编
程库。 它是一款基于主从 Reactor 模型的网络库,其使用的线程模型是 one loop per
thread, 所谓 one loop per thread 指的是:

  • 一个线程只能有一个事件循环(EventLoop), 用于响应计时器和 IO 事件
  • 一个文件描述符只能由一个线程进行读写,换句话说就是一个 TCP 连接必须归属
    于某个 EventLoop 管理

在这里插入图片描述

Reactor模式

Reactor 模型是事件驱动的网络编程模型,它的核心思想是**“当事件发生时,才进行处理”**,而不是阻塞等待。Muduo 网络库正是基于 Reactor 模式设计的。我用最通俗的方式解释它的原理和工作流程:


1. Reactor 是什么?

  • 类比:就像餐厅的服务员

    • 传统阻塞模型:一个服务员全程服务一桌客人(上菜、倒水、结账都卡住)
    • Reactor 模型:一个服务员监听所有桌子的需求(有需求才过去处理)
  • 核心三要素

    1. Event Loop(事件循环):不断检查是否有事件发生
    2. Demultiplexer(事件分发器):监听哪些socket有事件(如 select/epoll)
    3. Event Handler(事件处理器):处理具体事件(如 onMessage)
    

2. Reactor 工作流程

以 Muduo 的 TCP 服务器为例:

  ┌───────────────────────────────────────────────────┐
  │  Event Loop                                      │
  │   ┌──────────────────────────────┐               │
  │   │  epoll_wait (监听所有socket) │◀──新连接/数据──┤
  │   └──────────────┬───────────────┘               │
  │         检测到事件│                              │
  │   ┌──────────────▼───────────────┐               │
  │   │  根据事件类型分发到对应处理器  │               │
  │   └──────────────┬───────────────┘               │
  │          │                                       │
  │   ┌──────▼──────┐  ┌─────────────┐  ┌─────────┐ │
  │   │ 新连接处理器 │  │ 数据处理器  │  │ 错误处理器│ │
  │   │ (onConnection)│  │ (onMessage) │  │ (onError) │ │
  │   └─────────────┘  └─────────────┘  └─────────┘ │
  └───────────────────────────────────────────────────┘

3. 与传统阻塞模型的对比

特性 Reactor 模型 阻塞模型
线程使用 单线程处理多连接(IO多路复用) 每个连接一个线程
资源消耗 低(1个线程管理成千上万个连接) 高(线程数=连接数)
响应方式 事件触发(有数据才唤醒) 阻塞等待(没数据也占着线程)
典型实现 Muduo、Nginx、Redis 早期Java BIO

4. Reactor 在 Muduo 中的体现

// 1. 创建事件循环(Reactor核心)
EventLoop loop;

// 2. 创建服务器并注册事件处理器
TcpServer server(&loop, listenAddr, "EchoServer");
server.setConnectionCallback(onConnection);  // 新连接事件处理器
server.setMessageCallback(onMessage);        // 数据到达事件处理器

// 3. 启动事件循环(开始监听事件)
loop.loop();  // 内部调用 epoll_wait
  • 当事件发生时
    1. epoll_wait 返回活跃的 socket
    2. EventLoop 根据事件类型(新连接/数据到达)调用对应的 Callback
    3. 你的 onMessage 等函数被触发执行

5. 为什么用 Reactor?

  • 高性能:单线程可处理数万并发连接(如 Nginx)
  • 低延迟:没有线程切换开销
  • 适合场景
    • 短连接服务(HTTP)
    • 高并发低延迟(即时通讯)
    • 需要精细控制IO的场合(代理服务器)

6. 通俗理解案例

假设你经营一家奶茶店:

  • 传统模式:每个顾客配一个店员(线程),店员必须等顾客做完决定才能服务下一个
  • Reactor模式
    • 1个前台(EventLoop)监听所有顾客(socket)
    • 顾客举手(事件发生)时,前台派空闲店员(Callback)去处理
    • 没顾客举手时,前台可以喝茶休息(epoll_wait 休眠)

我们来看一个简单的例子:

#include <muduo/net/TcpServer.h>    // TcpServer 类,用于创建 TCP 服务器
#include <muduo/net/EventLoop.h>     // EventLoop 事件循环类,负责 I/O 多路复用和事件处理
#include <muduo/base/Logging.h>      // 日志系统,用于输出调试、信息、错误等日志

using namespace muduo;
using namespace muduo::net;

// 定义消息回调函数:当客户端发送数据时,该函数会被调用
// 参数说明:
// - conn: 表示当前连接对象的智能指针
// - buf: 接收到的数据缓冲区(muduo 的 Buffer 类型)
// - time: 接收数据的时间戳(本例中未使用)
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
    // 将接收的数据转换为字符串
    std::string msg = buf->retrieveAllAsString();  // 提取全部数据并清空缓冲区

    // 在终端打印接收到的消息内容
    printf("[Server] Received: %s\n", msg.c_str());

    // 将接收到的数据原样返回给客户端
    conn->send(buf);  // send 函数接受的是 Buffer* 类型,所以可以直接传入 buf
}

int main()
{
    // 设置日志输出级别为 DEBUG,这样会显示更多的调试信息
    // 默认情况下,muduo 可能只输出 INFO 和以上级别的日志
    Logger::setLogLevel(Logger::DEBUG);

    // 创建一个 EventLoop 对象,它是整个 muduo 网络模型的核心
    // 所有的网络事件(如 accept、read、write)都会在这个 loop 中处理
    EventLoop loop;  // 核心事件循环

    // 创建 TcpServer 实例
    // 参数说明:
    // - loop: 使用哪个 EventLoop 来驱动服务器
    // - InetAddress(8888): 监听本地所有 IP 地址(0.0.0.0)上的 8888 端口
    // - "SimpleServer": 服务器名称,用于日志输出中的标识
    TcpServer server(&loop, InetAddress(8888), "SimpleServer");

    // 设置消息回调函数
    // 当服务器从客户端读取到数据时,就会调用这个 onMessage 函数
    server.setMessageCallback(onMessage);

    // 启动服务器
    // 这个函数不会立即开始监听端口,而是在下一次进入 EventLoop 后执行初始化操作
    server.start();

    // 启动事件循环
    // 进入主事件循环后,程序将一直运行,等待客户端连接和数据的到来
    loop.loop();

    return 0;
}

📌 编译命令建议(请确保 muduo 已正确安装):

g++ -std=c++11 echo_server.cpp -lmuduo_net -lmuduo_base -lpthread -o echo_server

✅ 测试方法:

  1. 启动服务端:

    ./echo_server
    
  2. 打开另一个终端,使用 telnet 或 nc 测试:

    telnet 127.0.0.1 8888
    

    或者:

    nc localhost 8888
    
  3. 在客户端输入任意文本,例如:

    Hello Muduo!
    
  4. 你会看到服务端打印类似信息:

    [Server] Received: Hello Muduo!
    
  5. 同时客户端也会收到同样的回显消息。


这样我们可以利用muduo库编写一个简单的程序,客户端连接到服务端之后,服务端回显客户端发送的数据:

客户端代码:

client.h
#ifndef CHAT_CLIENT_H
#define CHAT_CLIENT_H

#include <muduo/net/TcpClient.h>
#include <functional>

class ChatClient {
public:
    using ConnectionCallback = std::function<void()>;
    
    ChatClient(muduo::net::EventLoop* loop, 
              const muduo::net::InetAddress& serverAddr);
    
    void connect();
    void sendMessage(const std::string& text);
    bool isConnected() const;

private:
    void onConnection(const muduo::net::TcpConnectionPtr& conn);
    void onMessage(const muduo::net::TcpConnectionPtr& conn,
                  muduo::net::Buffer* buf,
                  muduo::Timestamp time);

    muduo::net::TcpClient client_;
    muduo::net::TcpConnectionPtr conn_;
};

#endif
client.cpp
#include "client.h"
#include <muduo/base/Logging.h>
#include <iostream>

using namespace muduo;
using namespace muduo::net;

ChatClient::ChatClient(EventLoop* loop, const InetAddress& serverAddr)
    : client_(loop, serverAddr, "ChatClient") {
    client_.setConnectionCallback(
        std::bind(&ChatClient::onConnection, this, _1));
    client_.setMessageCallback(
        std::bind(&ChatClient::onMessage, this, _1, _2, _3));
}

void ChatClient::connect() {
    client_.connect();
}

bool ChatClient::isConnected() const {
    return conn_ && conn_->connected();
}

void ChatClient::sendMessage(const std::string& text) {
    if (!isConnected()) {
        std::cout << "Not connected to server" << std::endl;
        return;
    }
    conn_->send(text);
}

void ChatClient::onConnection(const TcpConnectionPtr& conn) {
    if (conn->connected()) {
        conn_ = conn;
        LOG_INFO << "Connected to server";
    } else {
        conn_.reset();
        LOG_INFO << "Disconnected from server";
    }
}

void ChatClient::onMessage(const TcpConnectionPtr& conn,
                         Buffer* buf,
                         Timestamp time) {
    std::string msg(buf->retrieveAllAsString());
    std::cout << "Server reply: " << msg << std::endl;
}

主函数:

client_main.cpp
#include "../client/client.h"
#include <muduo/net/EventLoop.h>
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<bool> g_running(true);

void handleInput(ChatClient& client) {
    std::string line;
    while (g_running.load() && std::getline(std::cin, line)) {
        if (line == "quit") {
            g_running.store(false);
            break;
        }
        client.sendMessage(line);
    }
}

int main(int argc, char* argv[]) {
    if (argc < 3) {
        std::cerr << "Usage: " << argv[0] 
                  << " <server_ip> <server_port>" << std::endl;
        return 1;
    }

    muduo::net::EventLoop loop;
    muduo::net::InetAddress serverAddr(argv[1], std::stoi(argv[2]));

    ChatClient client(&loop, serverAddr);
    std::thread inputThread(handleInput, std::ref(client));
    
    client.connect();
    loop.loop();
    
    g_running.store(false);
    if (inputThread.joinable()) {
        inputThread.join();
    }

    return 0;
}

服务端代码:

server.h
#ifndef CHAT_SERVER_H
#define CHAT_SERVER_H

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h> 
#include <unordered_set>
#include "chat.pb.h"

class ChatServer {
public:
    ChatServer(muduo::net::EventLoop* loop,
              const muduo::net::InetAddress& listenAddr);
    
    void start();

private:
    void onConnection(const muduo::net::TcpConnectionPtr& conn);
    void onMessage(const muduo::net::TcpConnectionPtr& conn,
                  muduo::net::Buffer* buf,
                  muduo::Timestamp time);
    
    void handleRequest(const muduo::net::TcpConnectionPtr& conn,
                      const chat::ChatRequest& request);
    
    muduo::net::TcpServer server_;
    std::unordered_set<muduo::net::TcpConnectionPtr> connections_;
    std::vector<chat::ChatMessage> chatHistory_;
};

#endif // CHAT_SERVER_H
server.cpp
#include "server.h"
#include "chat.pb.h"
#include <muduo/base/Logging.h>

using namespace muduo;
using namespace muduo::net;

ChatServer::ChatServer(EventLoop* loop, const InetAddress& listenAddr)
    : server_(loop, listenAddr, "ChatServer") {
    server_.setMessageCallback(
        std::bind(&ChatServer::onMessage, this, _1, _2, _3));
}

void ChatServer::start() {
    server_.start();
}

void ChatServer::onMessage(const TcpConnectionPtr& conn,
                          Buffer* buf,
                          Timestamp time) {
    // 获取并打印完整消息内容
    std::string msg = buf->retrieveAllAsString();
    printf("收到客户端消息[长度:%lu]: %s\n", msg.size(), msg.c_str());
    
    // 简单回复确认(包含原始消息)
    std::string reply = "Server收到消息: " + msg;
    conn->send(reply);
}

主函数:

server_main.cpp
#include "../server/server.h"
#include <muduo/net/EventLoop.h>

int main() {
    muduo::net::EventLoop loop;
    muduo::net::InetAddress listenAddr(8888);
    ChatServer server(&loop, listenAddr);
    
    server.start();
    loop.loop();
    
    return 0;
}

效果如下:
在这里插入图片描述
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到