C++ - 仿 RabbitMQ 实现消息队列--muduo快速上手

发布于:2025-07-16 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

Muduo 库是什么

 Muduo 库常见接口介绍

muduo::net::TcpServer 类基础介绍

muduo::net::EventLoop 类基础介绍

muduo::net::TcpConnection 类基础介绍

muduo::net::TcpClient 类基础介绍

muduo::net::Buffer 类基础介绍

Muduo 库快速上手

echo 服务器

echo客户端

编译命令

基于 muduo 库函数实现 protobuf 协议的通信

大佬代码

服务器

客户端

模仿实现服务器客户端

proto结构的定义

服务器

客户端

编译命令


Muduo 库是什么

        Muduo 由陈硕大佬开发,是一个基于非阻塞 IO 事件驱动的 C++高并发 TCP 网络编程库。 它是一款基于主从 Reactor 模型的网络库,其使用的线程模型是 one loop per thread, 所谓 one loop per thread 指的是:

  • 一个线程只能有一个事件循环(EventLoop), 用于响应计时器和 IO 事件。
  • 一个文件描述符只能由一个线程进行读写,换句话说就是一个 TCP 连接必须归属于某个 EventLoop 管理。

 Muduo 库常见接口介绍

muduo::net::TcpServer 类基础介绍

// Shared-ownership handle to a TcpConnection.
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
// Callback that receives the affected connection.
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
// Callback that receives the connection, a Buffer pointer and a Timestamp.
typedef std::function<void (const TcpConnectionPtr&, Buffer*, Timestamp)> MessageCallback;
// Address type (excerpt): built from an IP string and a port.
class InetAddress : public muduo::copyable
{ 
public:
 // ip: address string; port: port number; ipv6: defaults to false.
 InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
// TCP server interface (excerpt of the declarations used in this article).
class TcpServer : noncopyable
{ 
public:
 // Port-reuse option accepted by the constructor.
 enum Option
 { 
 kNoReusePort,
 kReusePort,
 };
 // loop: object that performs event monitoring; listenAddr: ip/port to bind;
 // nameArg: server name; option: port reuse, disabled by default.
 TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); 
 // Sets the number of threads.
 void setThreadNum(int numThreads); 
 // Starts the server.
 void start();
 /// Called when a new connection has been established.
 void setConnectionCallback(const ConnectionCallback& cb)
 { connectionCallback_ = cb; }
 /// Business-logic message callback: invoked when a message arrives on a connection.
 void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
};

说明:

  • 构造函数:
    • loop:进行事件监控的对象。
    • listenAddr:绑定的ip,port。
    • nameArg:服务器的名称。
    • option:是否启用端口复用,默认不启用。
  • setThreadNum:设置线程数目。
  • start:启动服务器。
  • setConnectionCallback:设置连接建立成功或连接关闭时要调用的回调函数。
  • setMessageCallback:设置收到客户端发来的信息时要调用的回调函数,也就是处理数据的函数。

muduo::net::EventLoop 类基础介绍

class EventLoop : noncopyable
{ 
public:
 /// Loops forever.
 /// Must be called in the same thread as creation of the object. 
 void loop();
 /// Quits loop.
 /// This is not 100% thread safe, if you call through a raw pointer, 
 /// better to call through shared_ptr<EventLoop> for 100% safety. 
 void quit();
 TimerId runAt(Timestamp time, TimerCallback cb);
 /// Runs callback after @c delay seconds.
 /// Safe to call from other threads.
 TimerId runAfter(double delay, TimerCallback cb);
 /// Runs callback every @c interval seconds.
 /// Safe to call from other threads.
 TimerId runEvery(double interval, TimerCallback cb);
 /// Cancels the timer.
 /// Safe to call from other threads. void cancel(TimerId timerId);private: std::atomic<bool> quit_; std::unique_ptr<Poller> poller_; mutable MutexLock mutex_; std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};

说明:在此项目中,我们只需要知道 loop() 用于启动事件监控即可,它和服务器一起启动。

muduo::net::TcpConnection 类基础介绍

// TCP connection interface (excerpt). For this project the relevant members
// are connected()/disconnected() (connection state) and send() (send data to
// the peer).
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:
 /// Constructs a TcpConnection with a connected sockfd
 ///
 /// User should not create this object.
 TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress&  localAddr, const InetAddress& peerAddr);
 bool connected() const { return state_ == kConnected; }
 bool disconnected() const { return state_ == kDisconnected; }

 void send(string&& message); // C++11
 void send(const void* message, int len);
 void send(const StringPiece& message);
 //  void send(Buffer&& message); // C++11
 void send(Buffer* message); // this one will swap data
 void shutdown(); // NOT thread safe, no simultaneous calling
 // Per-connection user context stored as boost::any.
 void setContext(const boost::any& context)
 { context_ = context; }
 const boost::any& getContext() const
 { return context_; }
 boost::any* getMutableContext()
 { return &context_; }
 void setConnectionCallback(const ConnectionCallback& cb)
 { connectionCallback_ = cb; }
 void setMessageCallback(const MessageCallback& cb)
 { messageCallback_ = cb; }
private:
 enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
 EventLoop* loop_;
 ConnectionCallback connectionCallback_;
 MessageCallback messageCallback_;
 WriteCompleteCallback writeCompleteCallback_;
 boost::any context_;
};

在此项目中,我们只需要了解以下接口:

  • connected()/disconnected():判断链接的状态,即是否是链接状态。
  • void send(string&& message):向对端发送数据。

muduo::net::TcpClient 类基础介绍

// TCP client interface (excerpt).
class TcpClient : noncopyable
{
public:
 // TcpClient(EventLoop* loop);
 // TcpClient(EventLoop* loop, const string& host, uint16_t port);
 // loop: event-monitoring object; serverAddr: server address; nameArg: client name.
 TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg);
 ~TcpClient(); // force out-line dtor, for std::unique_ptr members.
 void connect();    // connect to the server
 void disconnect(); // close the connection
 void stop();
 // Returns the Connection object used by this client. After connect() has
 // been issued the connection may not be established yet.
 TcpConnectionPtr connection() const
 {
   MutexLockGuard lock(mutex_);
   return connection_;
 }
 /// Callback invoked when the connection to the server succeeds.
 void setConnectionCallback(ConnectionCallback cb)
 { connectionCallback_ = std::move(cb); }
 /// Callback invoked when a message from the server is received.
 void setMessageCallback(MessageCallback cb)
 { messageCallback_ = std::move(cb); }
private:
 EventLoop* loop_;
 ConnectionCallback connectionCallback_;
 MessageCallback messageCallback_;
 WriteCompleteCallback writeCompleteCallback_;
 TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
/*
 Note: muduo is asynchronous on both the server and the client side.
 A client must not send data before its connection has been fully
 established, so the built-in CountDownLatch class can be used to
 synchronize with connection establishment.
*/
class CountDownLatch : noncopyable
{
public:
 explicit CountDownLatch(int count);

 // Blocks the caller until the count has dropped to zero.
 void wait()
 {
   MutexLockGuard lock(mutex_);
   while (count_ > 0)
   {
     condition_.wait();
   }
 }

 // Decrements the count and wakes all waiters once it reaches zero.
 void countDown()
 {
   MutexLockGuard lock(mutex_);
   --count_;
   if (count_ == 0)
   {
     condition_.notifyAll();
   }
 }

 int getCount() const;
private:
 mutable MutexLock mutex_;
 Condition condition_ GUARDED_BY(mutex_);
 int count_ GUARDED_BY(mutex_);
};

说明:

  • 构造函数:
    • loop:事件监控对象。
    • serverAddr:服务器地址。
    • nameArg:客户端名称。
  • connect/disconnect:连接服务器/关闭连接。
  • 两种回调函数与服务器一致。
  • 注释中也给出,我们需要通过CountDownLatch类控制正确获取到链接,后续demo中会演示,用法和条件变量类似。

muduo::net::Buffer 类基础介绍

class Buffer : public muduo::copyable
{ 
public:
 static const size_t kCheapPrepend = 8;
 static const size_t kInitialSize = 1024;
 explicit Buffer(size_t initialSize = kInitialSize) : buffer_(kCheapPrepend + initialSize), readerIndex_(kCheapPrepend), writerIndex_(kCheapPrepend);
 void swap(Buffer& rhs)
 size_t readableBytes() const
 size_t writableBytes() const
 const char* peek() const
 const char* findEOL() const
 const char* findEOL(const char* start) const
 void retrieve(size_t len)
 void retrieveInt64()
 void retrieveInt32()
 void retrieveInt16()
 void retrieveInt8()
 string retrieveAllAsString()
 string retrieveAsString(size_t len)
 void append(const StringPiece& str)
 void append(const char* /*restrict*/ data, size_t len)
 void append(const void* /*restrict*/ data, size_t len)
 char* beginWrite()
 const char* beginWrite() const
 void hasWritten(size_t len)
 void appendInt64(int64_t x)
 void appendInt32(int32_t x)
 void appendInt16(int16_t x)
 void appendInt8(int8_t x)
 int64_t readInt64()
 int32_t readInt32()
 int16_t readInt16()
 int8_t readInt8()
 int64_t peekInt64() const
 int32_t peekInt32() const
 int16_t peekInt16() const
 int8_t peekInt8() const
 void prependInt64(int64_t x)
 void prependInt32(int32_t x)
 void prependInt16(int16_t x)
 void prependInt8(int8_t x)
 void prepend(const void* /*restrict*/ data, size_t len)
private:
 std::vector<char> buffer_;
 size_t readerIndex_;
 size_t writerIndex_;
 static const char kCRLF[];
};

主要了解retrieveAllAsString()接口,就是将数据转换为字符串。

Muduo 库快速上手

        我们使用 Muduo 网络库来实现一个简单echo服务器和客户端 快速上手 Muduo 库。

echo 服务器

#include "../include/muduo/net/TcpServer.h"
#include "../include/muduo/net/EventLoop.h"
#include "../include/muduo/net/TcpConnection.h"
#include <iostream>
#include <functional>
#include <string>

class EchoServer
{
public:
    EchoServer(int port)
        :_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "EchoServer", muduo::net::TcpServer::kReusePort)
    {
        _server.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1));
        _server.setMessageCallback(std::bind(&EchoServer::onMessage, this, std::placeholders::_1,
                                    std::placeholders::_2, std::placeholders::_3));
    }
    void start()
    {
        _server.start(); // 开始事件监听
        _baseloop.loop();// 开始事件监控,这是一个死循环阻塞接口
    }
private:
    // 在连接建立成功的时候和关闭连接的时候调用
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        // 新连接建立成功是的回调函数
        if(conn->connected())
            std::cout << "新连接建立成功: " << conn->peerAddress().toIpPort() << std::endl;
        else
            std::cout << "新连接关闭: " << conn->peerAddress().toIpPort() << std::endl;
    }

    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buff, muduo::Timestamp)
    {
        std::string msg(buff->retrieveAllAsString());
        conn->send(msg);
    }
private:
    // 用来进行事件监控的对象
    muduo::net::EventLoop _baseloop;
    // 用来设置回调函数的对象
    muduo::net::TcpServer _server;
};

int main()
{
    EchoServer server(8080);
    server.start();
    return 0;
}

说明:

  • 服务器只需要包含两个成员属性,注意:_baseloop必须在_server之前构造,因为_server的构造需要用到_baseloop。
  • start():启动服务器和事件监控。
  • onConnection/onMessage:收到新链接或者客户端数据时的回调函数,要在构造函数中设置到_server中。

echo客户端
 

#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpConnection.h"
#include "muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
#include <string>

class EchoClient
{
public:
    EchoClient(const std::string &ip, int port)
        :_latch(1), 
         _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "EchoClient")
    {
        _client.setConnectionCallback(std::bind(&EchoClient::onConnection, this, std::placeholders::_1));
        _client.setMessageCallback(std::bind(&EchoClient::onMessage, this, std::placeholders::_1,
                                    std::placeholders::_2, std::placeholders::_3));
    }

    void connect()
    {
        _client.connect();
        _latch.wait();    // 阻塞等待
    }

    bool send(const std::string &msg)
    {
        if(_conn && _conn->connected())
        {
            _conn->send(msg);
            return true;
        }
        else
        {
            std::cout << "未建立连接" << std::endl;
            return false;
        }
    }
private:
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        if(conn->connected())
        {
            _conn = conn;
            _latch.countDown(); //唤醒主线程中的阻塞
            std::cout << "建立连接" << std::endl;
        }   
        else
        {
            _conn.reset();
            std::cout << "关闭连接" << std::endl;
        }
    }

    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buff, muduo::Timestamp time)
    {
        std::cout << "server echo: " << buff->retrieveAllAsString() << std::endl;
    }

private:
    // 这里_loopthread必须在_client之前构造, 要不然无法获得相应的Loop
    muduo::CountDownLatch _latch;
    muduo::net::EventLoopThread _loopthread;
    muduo::net::TcpClient _client;
    muduo::net::TcpConnectionPtr _conn;
};

// Reads whitespace-delimited words from stdin and sends each one to the echo
// server at 127.0.0.1:8080 until input ends or the connection is lost.
int main()
{
    EchoClient client("127.0.0.1", 8080);
    client.connect();

    while(1)
    {
        std::string buff;
        std::cout << "请输入: ";
        // Stop on EOF or a failed read instead of spinning on a dead stream.
        if(!(std::cin >> buff))
            break;

        // Send each message exactly once and use the result to detect a
        // dropped connection.
        if(!client.send(buff))
        {
            std::cout << "连接已断开,无法发送消息" << std::endl;
            break;
        }
    }
    return 0;
}

说明:

  • 成员属性:
    • _latch:封装了条件变量的对象,用来确保正确获得连接,必须初始化为1。
    • _loopthread:客户端不能像服务器一样一直进行事件监控的循环,EventLoopThread对象就可以重新创建一个线程来进行事件监控,必须在_client之前初始化。
    • _client:客户端对象。
    • _conn:建立好的连接,用来发送数据。
  • connect():关键要等待连接真正建立好,将_conn初始化。
  • send():向服务器发送数据。
  • onConnection():关键在于建立连接后初始化_conn,并唤醒客户端。
  • onMessage():收到服务器应答后调用的回调函数。

编译命令

# `all` is not a file; mark it phony so a stray file named "all" cannot mask it.
.PHONY: all
all:server client
# Echo server: links muduo_net before muduo_base, plus pthread.
server:server.cc
	g++ -o $@ $^ -I../include -L../lib -lmuduo_net -lmuduo_base -pthread
# Echo client: same include/library paths and link flags as the server.
client:client.cc
	g++ -o $@ $^ -I../include -L../lib -lmuduo_net -lmuduo_base -pthread

这里要链接的库的路径根据自己设置的路径来调整。

基于 muduo 库函数实现 protobuf 协议的通信

        上面的echo服务器和客户端无法解决数据粘包的问题,也无法区分并处理多种业务,所以我们模仿陈硕大佬的示例代码,完成一个基于 protobuf 协议的服务器和客户端,实现翻译和加法计算两种业务(简单实现)。

大佬代码

服务器

#include "examples/protobuf/codec/codec.h"
#include "examples/protobuf/codec/dispatcher.h"
#include "examples/protobuf/codec/query.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// Shared-pointer aliases for the two protobuf message types the server handles.
typedef std::shared_ptr<muduo::Query> QueryPtr;
typedef std::shared_ptr<muduo::Answer> AnswerPtr;

// Protobuf query server. Incoming bytes go from server_ to codec_, which hands
// each decoded message to dispatcher_, which calls the handler registered for
// that message type.
class QueryServer : noncopyable
{
 public:
  // Builds the pipeline: dispatcher_ falls back to onUnknownMessage, codec_
  // forwards decoded messages to dispatcher_, and server_ forwards raw data
  // to codec_.
  QueryServer(EventLoop* loop,
              const InetAddress& listenAddr)
  : server_(loop, listenAddr, "QueryServer"),
    dispatcher_(std::bind(&QueryServer::onUnknownMessage, this, _1, _2, _3)),
    codec_(std::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3))
  {
    // Register one handler per concrete message type.
    dispatcher_.registerMessageCallback<muduo::Query>(
        std::bind(&QueryServer::onQuery, this, _1, _2, _3));
    dispatcher_.registerMessageCallback<muduo::Answer>(
        std::bind(&QueryServer::onAnswer, this, _1, _2, _3));
    server_.setConnectionCallback(
        std::bind(&QueryServer::onConnection, this, _1));
    // The codec, not this class, receives the server's raw message callback.
    server_.setMessageCallback(
        std::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));
  }

  // Starts the TCP server; the caller runs the event loop.
  void start()
  {
    server_.start();
  }

 private:
  // Logs peer -> local address and whether the connection is UP or DOWN.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->peerAddress().toIpPort() << " -> "
        << conn->localAddress().toIpPort() << " is "
        << (conn->connected() ? "UP" : "DOWN");
  }

  // Fallback for message types without a registered handler: log the type
  // name and shut the connection down.
  void onUnknownMessage(const TcpConnectionPtr& conn,
                        const MessagePtr& message,
                        Timestamp)
  {
    LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
    conn->shutdown();
  }

  // Handles a Query: logs it, replies with a fixed Answer through the codec,
  // then shuts the connection down.
  void onQuery(const muduo::net::TcpConnectionPtr& conn,
               const QueryPtr& message,
               muduo::Timestamp)
  {
    LOG_INFO << "onQuery:\n" << message->GetTypeName() << message->DebugString();
    Answer answer;
    answer.set_id(1);
    answer.set_questioner("Chen Shuo");
    answer.set_answerer("blog.csdn.net/Solstice");
    answer.add_solution("Jump!");
    answer.add_solution("Win!");
    codec_.send(conn, answer);

    conn->shutdown();
  }

  // Handles an Answer: logs the type name and shuts the connection down.
  void onAnswer(const muduo::net::TcpConnectionPtr& conn,
                const AnswerPtr& message,
                muduo::Timestamp)
  {
    LOG_INFO << "onAnswer: " << message->GetTypeName();
    conn->shutdown();
  }

  // Declared in the same order as the constructor's initializer list;
  // codec_ is bound to &dispatcher_, which is declared before it.
  TcpServer server_;
  ProtobufDispatcher dispatcher_;  // dispatches each message by its type
  ProtobufCodec codec_;            // parses protobuf data and sends protobuf messages
};

// Usage: <program> port
// Runs a QueryServer on the given port, or prints the usage line when no port
// argument is supplied.
int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoop loop;
    // argv[1] is converted with atoi; no validation is done here.
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    InetAddress serverAddr(port);
    QueryServer server(&loop, serverAddr);
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s port\n", argv[0]);
  }
}

关键点说明:

  • 先看成员属性:
    • dispatcher_:任务的分发器,也就是基于不同的任务进行不同的处理。

    • codec_:解析protobuf数据,将数据交给dispatcher_处理。还可以根据连接向对端发送protobuf结构。
  • 设计的思路:先向dispatcher_中注册针对不同任务的处理函数,再将server_对数据的处理绑定到codec_对数据处理的函数上,即让codec_代替server_处理数据,codec_解析完数据后交给dispatcher_进行真正的任务处理。
  • 关于代码中的QueryPtr/AnswerPtr,它们指向的Query/Answer都是需要自己定义的proto结构体。

客户端

#include "examples/protobuf/codec/dispatcher.h"
#include "examples/protobuf/codec/codec.h"
#include "examples/protobuf/codec/query.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// Shared-pointer aliases for the message types the client handles.
typedef std::shared_ptr<muduo::Empty> EmptyPtr;
typedef std::shared_ptr<muduo::Answer> AnswerPtr;

// Message sent once the connection is up; main() points it at a local
// Query or Empty object.
google::protobuf::Message* messageToSend;

// Protobuf query client. Wired like the server: client_ feeds raw data to
// codec_, which hands decoded messages to dispatcher_, which calls the
// handler registered for that message type.
class QueryClient : noncopyable
{
 public:
  // Builds the pipeline and registers handlers for Answer and Empty.
  QueryClient(EventLoop* loop,
              const InetAddress& serverAddr)
  : loop_(loop),
    client_(loop, serverAddr, "QueryClient"),
    dispatcher_(std::bind(&QueryClient::onUnknownMessage, this, _1, _2, _3)),
    codec_(std::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3))
  {
    dispatcher_.registerMessageCallback<muduo::Answer>(
        std::bind(&QueryClient::onAnswer, this, _1, _2, _3));
    dispatcher_.registerMessageCallback<muduo::Empty>(
        std::bind(&QueryClient::onEmpty, this, _1, _2, _3));
    client_.setConnectionCallback(
        std::bind(&QueryClient::onConnection, this, _1));
    // The codec receives the client's raw message callback.
    client_.setMessageCallback(
        std::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));
  }

  // Initiates the connection; the caller runs the event loop.
  void connect()
  {
    client_.connect();
  }

 private:

  // Logs the connection state. When connected, sends *messageToSend;
  // when disconnected, quits the event loop.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->localAddress().toIpPort() << " -> "
        << conn->peerAddress().toIpPort() << " is "
        << (conn->connected() ? "UP" : "DOWN");

    if (conn->connected())
    {
      codec_.send(conn, *messageToSend);
    }
    else
    {
      loop_->quit();
    }
  }

  // Fallback for message types without a registered handler: log only.
  void onUnknownMessage(const TcpConnectionPtr&,
                        const MessagePtr& message,
                        Timestamp)
  {
    LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
  }

  // Logs the type name and debug string of a received Answer.
  void onAnswer(const muduo::net::TcpConnectionPtr&,
                const AnswerPtr& message,
                muduo::Timestamp)
  {
    LOG_INFO << "onAnswer:\n" << message->GetTypeName() << message->DebugString();
  }

  // Logs the type name of a received Empty message.
  void onEmpty(const muduo::net::TcpConnectionPtr&,
               const EmptyPtr& message,
               muduo::Timestamp)
  {
    LOG_INFO << "onEmpty: " << message->GetTypeName();
  }

  // Declared in the same order as the constructor's initializer list.
  EventLoop* loop_;
  TcpClient client_;
  ProtobufDispatcher dispatcher_;
  ProtobufCodec codec_;
};

// Usage: <program> host_ip port [q|e]
// Connects to host_ip:port and sends a Query by default, or an Empty message
// when the third argument starts with 'e'.
int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 2)
  {
    EventLoop loop;
    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
    InetAddress serverAddr(argv[1], port);

    // Both candidate messages are locals of this scope; messageToSend
    // points at one of them while loop.loop() runs below.
    muduo::Query query;
    query.set_id(1);
    query.set_questioner("Chen Shuo");
    query.add_question("Running?");
    muduo::Empty empty;
    messageToSend = &query;

    if (argc > 3 && argv[3][0] == 'e')
    {
      messageToSend = &empty;
    }

    QueryClient client(&loop, serverAddr);
    client.connect();
    loop.loop();
  }
  else
  {
    printf("Usage: %s host_ip port [q|e]\n", argv[0]);
  }
}

与服务器非常相似。

模仿实现服务器客户端

        现在我们模仿一下大佬来实现一下自己的服务器客户端。

proto结构的定义

syntax = "proto3";

package jiuqi;

// Translation request: the text to translate.
message TranslateRequest {
    string msg = 1;
}

// Translation response: the translated text.
message TranslateResponse {
    string msg = 1;
}

// Addition request: the two operands.
message AddRequest {
    int32 num1 = 1;
    int32 num2 = 2;
}

// Addition response: the sum.
message AddResponse {
    int32 res = 1;
}

服务器

#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "request.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <iostream>
#include <unistd.h>
#include <unordered_map>

// Protobuf-based server offering two services: word translation and integer
// addition. Raw data goes from _server to _codec, which hands decoded
// messages to _dispatcher, which calls the handler for that message type.
class Server
{
public:
    using MessagePtr = std::shared_ptr<google::protobuf::Message>;
    using TranslateRequestPtr = std::shared_ptr<jiuqi::TranslateRequest>;
    using TranslateResponsePtr = std::shared_ptr<jiuqi::TranslateResponse>;
    using AddRequestPtr = std::shared_ptr<jiuqi::AddRequest>;
    using AddResponsePtr = std::shared_ptr<jiuqi::AddResponse>;

    // Listens on 0.0.0.0:<port> with port reuse enabled and wires the
    // server -> codec -> dispatcher pipeline.
    Server(int port) :_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
                                "server", muduo::net::TcpServer::kReusePort),
                      _dispatcher(std::bind(&Server::onUnknownMessage, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
                      _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
    {
        // Register one handler per request type.
        _dispatcher.registerMessageCallback<jiuqi::TranslateRequest>(std::bind(&Server::onTranslate, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _dispatcher.registerMessageCallback<jiuqi::AddRequest>(std::bind(&Server::onAdd, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

        // The codec handles the server's raw data; this class only sees decoded messages.
        _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
    }

    // Starts the server, then enters the event loop (blocks).
    void Start()
    {
        _server.start();
        _baseloop.loop();
    }

private:
    // Looks str up in a fixed dictionary.
    // Returns the mapped word, or "没听懂" when str is not in the dictionary.
    std::string translate(const std::string &str)
    {
        // Built once and reused, instead of rebuilding the table per request.
        static const std::unordered_map<std::string, std::string> dict = {
            {"hello", "你好"},
            {"你好", "hello"},
        };
        const auto it = dict.find(str);
        if(it == dict.end())
            return "没听懂";
        return it->second;
    }

    // Handles a TranslateRequest: translates its text and replies with a TranslateResponse.
    void onTranslate(const muduo::net::TcpConnectionPtr& conn,
                        const TranslateRequestPtr& message,
                        muduo::Timestamp)
    {
        const std::string translated = translate(message->msg());
        jiuqi::TranslateResponse resp;
        resp.set_msg(translated);
        _codec.send(conn, resp);
    }

    // Handles an AddRequest: replies with an AddResponse holding num1 + num2.
    void onAdd(const muduo::net::TcpConnectionPtr& conn,
                        const AddRequestPtr& message,
                        muduo::Timestamp)
    {
        int num1 = message->num1();
        int num2 = message->num2();
        int res = num1+num2;
        jiuqi::AddResponse resp;
        resp.set_res(res);
        _codec.send(conn, resp);
    }

    // Fallback for message types without a registered handler: log the type
    // name and shut the connection down.
    void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,
                        const MessagePtr& message,
                        muduo::Timestamp)
    {
        LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
        conn->shutdown();
    }

    // Logs connection establishment and closure.
    void onConnection(const muduo::net::TcpConnectionPtr& conn)
    {
        if(conn->connected())
        {
            LOG_INFO << "连接建立成功";
        }
        else
        {
            LOG_INFO << "连接关闭";
        }
    }

private:
    // Declaration order is initialization order: _baseloop precedes _server
    // (which takes its address), and _dispatcher precedes _codec (which is
    // bound to it).
    muduo::net::EventLoop _baseloop;
    muduo::net::TcpServer _server;  // server object
    ProtobufDispatcher _dispatcher; // request dispatcher; handlers are registered on it
    ProtobufCodec _codec;           // protobuf protocol handler for received request data
};

int main()
{
    Server server(8080);
    server.Start();
    return 0;
}

客户端

#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "request.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"

#include <iostream>
#include <unistd.h>


// Protobuf-based client for the translate/add services. Its event loop runs
// on a background thread; responses are logged by the registered handlers.
class Client
{
public:
    using MessagePtr = std::shared_ptr<google::protobuf::Message>;
    using TranslateRequestPtr = std::shared_ptr<jiuqi::TranslateRequest>;
    using TranslateResponsePtr = std::shared_ptr<jiuqi::TranslateResponse>;
    using AddRequestPtr = std::shared_ptr<jiuqi::AddRequest>;
    using AddResponsePtr = std::shared_ptr<jiuqi::AddResponse>;

    // Starts the loop thread, creates the TcpClient on that loop and wires
    // the client -> codec -> dispatcher pipeline.
    // ip is taken by const reference to avoid a needless copy.
    Client(const std::string &ip, int port)
        :_latch(1),
         _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "client"),
         _dispatcher(std::bind(&Client::onUnknownMessage, this,
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
         _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
    {
        // Register one handler per response type.
        _dispatcher.registerMessageCallback<jiuqi::TranslateResponse>(std::bind(&Client::onTranslate, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _dispatcher.registerMessageCallback<jiuqi::AddResponse>(std::bind(&Client::onAdd, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

        // The codec handles the client's raw data.
        _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));
    }

    // Initiates the connection and blocks until onConnection() releases the latch.
    // NOTE: the latch is only released on a successful connect, so this waits
    // indefinitely if the connection is never established.
    void connect()
    {
        _client.connect();
        _latch.wait();
    }

    // Sends a TranslateRequest carrying msg.
    void translate(const std::string &msg)
    {
        jiuqi::TranslateRequest request;
        request.set_msg(msg);
        send(request);
    }

    // Sends an AddRequest carrying the two operands.
    void add(int num1, int num2)
    {
        jiuqi::AddRequest request;
        request.set_num1(num1);
        request.set_num2(num2);
        send(request);
    }

private:
    // Sends msg through the codec.
    // Returns false (after printing a notice) when there is no live connection.
    bool send(const google::protobuf::Message &msg)
    {
        if(_conn && _conn->connected())
        {
            _codec.send(_conn, msg);
            return true;
        }
        else
        {
            std::cout << "未建立连接" << std::endl;
            return false;
        }
    }

    // Logs the translated text from a TranslateResponse.
    void onTranslate(const muduo::net::TcpConnectionPtr&,
                const TranslateResponsePtr& message,
                muduo::Timestamp)
    {
        LOG_INFO << "翻译结果: " << message->msg();
    }

    // Logs the sum from an AddResponse.
    void onAdd(const muduo::net::TcpConnectionPtr&,
                const AddResponsePtr& message,
                muduo::Timestamp)
    {
        LOG_INFO << "加法结果: " << message->res();
    }

    // Fallback for message types without a registered handler: log only.
    void onUnknownMessage(const muduo::net::TcpConnectionPtr&,
                        const MessagePtr& message,
                        muduo::Timestamp)
    {
      LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
    }

    // On connect: remember the connection and wake the thread blocked in connect().
    // On disconnect: only report it.
    void onConnection(const muduo::net::TcpConnectionPtr& conn)
    {
        if(conn->connected())
        {
            _conn = conn;
            _latch.countDown();
            std::cout << "连接建立成功" << std::endl;
        }
        else
        {
            std::cout << "连接关闭成功" << std::endl;
        }
    }

private:
    // Declaration order is initialization order: _loopthread precedes _client
    // (which needs its loop), and _dispatcher precedes _codec (which is bound to it).
    muduo::CountDownLatch _latch;           // synchronizes connect() with connection establishment
    muduo::net::EventLoopThread _loopthread;// thread running the event loop
    muduo::net::TcpClient _client;
    ProtobufDispatcher _dispatcher;         // response dispatcher
    ProtobufCodec _codec;                   // protocol handler
    muduo::net::TcpConnectionPtr _conn;     // the client's connection

};

// Interactive driver: repeatedly asks for an operation (1 = translate,
// 2 = add), reads its arguments and sends the request to 127.0.0.1:8080.
// Exits when stdin ends or a value cannot be parsed.
int main()
{
    Client client("127.0.0.1", 8080);
    client.connect();

    while(true)
    {
        int op = 0;
        std::cout << "输入要进行的操作(1.翻译, 2.加法): ";
        // A failed read leaves the stream in a fail state; stop instead of
        // looping forever on it.
        if(!(std::cin >> op))
            break;

        if(op == 1)
        {
            std::string str;
            std::cout << "输入要翻译的单词: ";
            if(!(std::cin >> str))
                break;
            client.translate(str);
        }
        else if(op == 2)
        {
            int num1 = 0, num2 = 0;
            std::cout << "输入两个加数: ";
            // Never send operands that were not actually read.
            if(!(std::cin >> num1 >> num2))
                break;
            client.add(num1, num2);
        }
        else
        {
            // Anything other than 1 or 2 is rejected rather than treated as addition.
            std::cout << "无效的操作" << std::endl;
            continue;
        }
        // Give the asynchronous response time to be logged before the next prompt.
        sleep(1);
    }

    return 0;
}

编译命令

# `all` is not a file; mark it phony so a stray file named "all" cannot mask it.
.PHONY: all
all:server client
# Each target compiles its own source together with the generated protobuf
# code and the codec implementation, then links muduo, protobuf and zlib.
server:protobufserver.cc request.pb.cc ../include/muduo/proto/codec.cc
	g++ -o $@ $^ -I../include -I. -L../lib -lmuduo_net -lmuduo_base -lprotobuf -lz -pthread
client:protobufclient.cc request.pb.cc ../include/muduo/proto/codec.cc
	g++ -o $@ $^ -I../include -I. -L../lib -lmuduo_net -lmuduo_base -lprotobuf -lz -pthread

网站公告

今日签到

点亮在社区的每一天
去签到