八、客户端模块的实现
客户端实现的总体框架
在 RabbitMQ 中,应用层提供消息服务的核心实体是 信道(Channel)。
用户想要与消息队列服务器交互时,通常不会直接操作底层的 TCP 连接,而是通过信道来进行各种消息的发布、订阅、确认等操作。
信道可以看作是在单个 TCP 连接之上的轻量级虚拟连接,它负责封装具体的协议细节,屏蔽了网络通信的复杂性。
用户只需要调用信道提供的接口,发送消息或接收消息,无需关心底层数据如何传输、协议如何实现。
简单来说,用户面向的是信道服务接口,而信道背后处理了连接管理、数据编解码、协议交互等工作,实现了业务与网络通信的解耦。
客户端设计视角和服务器视角的对比
方面 | 客户端视角(信道) | 服务端视角(连接+信道) |
---|
主要关注点 | 调用信道接口完成消息操作 | 管理连接和信道,执行底层协议与业务逻辑 |
抽象层级 | 抽象出具体网络和协议细节 | 解析信道请求,实现消息路由和持久化 |
资源管理 | 不关心连接具体实现,只用信道 | 管理物理连接、信道状态及相关资源 |
多路复用 | 多信道复用同一连接,简化调用 | 维护多信道,保证隔离与并发性能 |
用户责任 | 只需调用信道接口 | 处理协议解析、消息调度、资源分配 |
- 连接模块
客户端和服务端进行网络连接的基础。
一个直接面向用户的模块,内部包含多个对外提供服务的接口,用户需要什么服务进行调用对应的接口即可,其中包含交换机的声明/删除,队列的声明与删除,队列的绑定与解绑,消息的发布与订阅,订阅和解除订阅。
表达了客户端与服务器之间在消息队列系统中协作的流程
在仿 RabbitMQ 的消息队列系统中,客户端首先通过订阅者模块注册自身的消费者身份,并指定对应的消息处理回调函数;随后通过信道模块在单一的 TCP 连接上实现多路复用,创建多个逻辑信道以并行处理不同的消息服务(如发布、订阅、队列管理等)。客户端通过连接模块建立与服务器的连接,并在信道中发起具体的请求服务。
服务器接收到连接请求后,由服务器端的连接管理器创建连接上下文,并根据信道中携带的请求类型,路由到对应的处理模块(如交换机、队列或消息模块),执行相应的业务逻辑。处理结果再由服务器的异步线程池将数据封装好并通过网络返回给客户端,客户端的异步事件机制或线程池再触发对应的回调函数完成消息消费流程。
基于以上模块实现客户端的思路就非常清晰了
1、实例化异步线程对象
2、实例化连接对象
3、通过连接对象进行创建信道
4、根据信道进行获取自己的所需服务
5、关闭信道
6、关闭连接
8.1、订阅者模块
订阅者对象的设计
一个不向用户进行直接展示的模块,在客户端中进行体现的作用就是对角色的描述,表示这就是一个消费者,用户通常不直接操作订阅逻辑,而是通过定义“消费者”的方式进行消息处理逻辑的注册。
一个信道只有一个订阅者,所以说不需要进行订阅者的管理。订阅者这个模块很简单,没有涉及到一些业务模块的内容,业务模块的服务都在信道模块进行提供。
订阅者模块(消费者)这个类中成员变量的设计
- 首先需要定义消费者ID,描述了收到该条消息后该如何进行对这条消息进行处理。
- 其次是要进行订阅的哪个队列的ID和自动删除标志,描述了收到消息后是否需要对消息进行回复,是否要进行自动删除。
- 最后是回调函数,描述了从队列中进行获取消息后应该如何进行处理,这部分由用户进行决定。
订阅者模块的实现
using ConsumerCallback = std::function<void(const std::string, const ys::BasicProperties *bp, const std::string)>;
struct Consumer
{
using ptr=std::shared_ptr<Consumer>;
std::string _cid;
std::string _qname;
bool _auto_ack;
ConsumerCallback _callback;
Consumer()
{
DLOG("new Consumer:%p",this);
}
Consumer(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb)
: _cid(cid)
, _qname(qname)
, _auto_ack(auto_ack)
, _callback(std::move(cb))
{
DLOG("new Consumer:%p",this);
}
~Consumer()
{
DLOG("del Consumer:%p",this);
}
};
在构造函数中,ConsumerCallback(是一个 std::function)内部可能有复杂对象(比如 Lambda、绑定的资源等),如果直接写 _callback(cb),会调用拷贝构造,可能涉及较多内存分配、资源拷贝,用 std::move(cb),可以让 ConsumerCallback 的内部资源被移动到 _callback,更高效。
8.2、异步工作线程模块
用户虽然是通过信道进行网络通信的,但是网络通信的本质还是需要进行IO事件的监控的,这就要通过IO监控线程来进行整,不能在当前线程进行IO事件的监控,这样的话就会在当前线程进行阻塞住了.下面通过表格的方式进行说明
模块 | loopthread |
pool |
---|---|---|
订阅客户端 | 负责监听服务器消息推送(socket 读事件) | - 接收到消息后,异步将业务处理放到 pool 中去执行,因为收到的消息可能需要进行处理会耗时,防止主线程阻塞,无法进行监听服务器消息的推送 |
发布客户端 | 负责监听向服务器发送消息后的 socket 可写事件方便继续向服务器进行发送数据(或者响应 ack 等) | - 应用层调用发布接口时,耗时操作(如序列化、日志记录)在 pool 中处理 |
class AsyncWorker
{
public:
using ptr=std::shared_ptr<AsyncWorker>;
muduo::net::EventLoopThread loopthread;
threadpool pool;
};
8.3、连接模块
这其实就是我们在Demo模块利用muduo库进行搭建的客户端,这里其实就是换了一层皮,称为连接模块。
class Connection
{
public:
using ptr=std::shared_ptr<Connection>;
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
Connection(const std::string &sip, int sport,AsyncWorker::ptr worker)
: _latch(1)
, _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client")
, _dispatcher(std::bind(&Connection::onUnknownMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
,_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)))
,_worker(worker)
,_channel_manager(std::make_shared<ChannelManager>())
{
_dispatcher.registerMessageCallback<ys::basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<ys::basicCommonResponse>(std::bind(&Connection::basicResponse, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
_client.connect();
_latch.wait(); // 阻塞等待,直到连接建立成功
}
Channel::ptr openChannel()
{
Channel::ptr channel = _channel_manager->create(_conn,_codec);
bool ret=channel->opneChannel();
if(ret==false)
{
DLOG("打开信道失败");
return Channel::ptr();
}
return channel;
}
void closeChannel(const Channel::ptr& channel)
{
channel->closeChannel();
_channel_manager->remove(channel->cid());
}
private:
//收到基础响应
void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp)
{
//1、找到信道
Channel::ptr channel=_channel_manager->get(message->cid());
if(channel.get()==nullptr)
{
DLOG("未找到信道消息");
return;
}
//2、将得到的响应对象进行添加到信道的基础响应
channel->putBasicResponse(message);
}
//收到消息推送
void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp)
{
//1、找到信道
Channel::ptr channel=_channel_manager->get(message->cid());
if(channel.get()==nullptr)
{
DLOG("未找到信道消息");
return;
}
//2、封装异步任务(消息处理任务),抛入线程池
_worker->pool.push([channel,message]()
{
channel->consume(message);
});
}
void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
{
LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
conn->shutdown();
}
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
_latch.countDown(); // 唤醒主线程中的阻塞
_conn = conn;
}
else
{
// 连接关闭时的操作
_conn.reset();
}
}
private:
muduo::CountDownLatch _latch; // 实现同步的
//muduo::net::EventLoopThread _loopthread; // 异步循环处理线程
AsyncWorker::ptr _worker;
muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接
muduo::net::TcpClient _client; // 客户端
ProtobufDispatcher _dispatcher; // 请求分发器
ProtobufCodecPtr _codec; // 协议处理器
ChannelManager::ptr _channel_manager;
};
在连接模块这里是有一个极易容易进行掉进坑里的陷阱 :当发布客户端进行向服务器进行发送建立连接请求的时候,由于TCP是有发送缓冲区和接收缓冲区的,当请求被发送到发送缓存区的时候,就会默认连接建立成功,但是此时的连接是没有被建立成功的,此时发布客户端误以为连接是建立成功的,就会执行后续操作,向服务器进行发送消息,此时就出现了问题。同样订阅客户端也类似。
因此在onConnection 函数中需要进行判断是否是真正的建立连接成功。
consumeResponse中
当连接收到消息推送后,需要_consumer 进行参与,因为只有consumer中有回调函数,知道进行收到消息推送时如何进行处理,这个接口到时候收到消息之后和消息一起进行封装成一个任务,把这个任务放到线程池中,并不在当前的主执行流中进行执行。
8.4、信道管理模块
信道模块的定位与主要职责
信道不仅仅是数据的通道,还承载着客户端的业务接口,因此这个模块不仅要进行信道结构的设计,还需要进行提供对应的业务逻辑。信道类可以理解为客户端在和消息服务器交互时的一条逻辑通道。它并不是单纯的数据结构,而是抽象出与服务器交互(各种请求/响应、状态维护等)的一套完整业务流程封装。
换句话说,其实可以将信道模块进行理解成将订阅者模块、异步线程模块、和连接模块进行统一封装管理。
客户端信道模块和客户端其他模块之间的交互关系
模块 | 交互内容 |
---|---|
连接模块 ( muduo::net::TcpConnection ) |
- Channel 直接持有 TCP 连接 _conn - 通过 _conn 发送请求消息给服务器- 服务器响应也通过该连接返回 |
订阅者模块 ( Consumer ) |
- Channel 通过 basicConsume() 创建订阅者对象 _consumer - 收到推送消息时, Channel::consume() 回调订阅者的处理逻辑 |
异步线程模块 (muduo 的 IO 线程) |
- 服务器响应由 IO 线程收到 - 触发 Channel::putBasicResponse() ,将响应加入 _basic_resp - 触发 Channel::consume() ,调用用户回调 |
8.2.1、信道管理的信息
- 信道ID
- 信道关联的网络通信连接的对象
- protobuf 协议处理对象
- 信道关联的消费者
- 请求对应的响应信息队列(这里队列使用<请求ID,响应>hash表,一遍进行查找指定的响应)
- 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要进行等到响应才能继续,但是muduo库的通信是异步的,因此需要我们子啊收到响应后,通过判断是否是等待的响应来进行同步)。
8.2.2、信道管理的操作
- 创建信道的操作
- 提供删除信道操作
- 提供声明交换机操作(强断言-有则OK,没有则创建)
- 提供删除交换机
- 提供创建队列操作(强断言-有则OK,没有则创建)
- 提供删除队列操作
- 提供交换机-队列绑定操作
- 提供交换机-队列解除绑定操作
- 提供添加订阅操作
- 提供取消订阅操作
- 提供发布消息操作
- 提供确认消息操作信道模块进行管理
using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;
using basicCommonResponsePtr=std::shared_ptr<ys::basicCommonResponse>;
using basicConsumeResponsePtr=std::shared_ptr<ys::basicConsumeResponse>;
class Channel
{
public:
using ptr=std::shared_ptr<Channel>;
Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec)
:_cid(UUIDHelper::uuid())
,_conn(conn)
,_codec(codec)
{
}
~Channel()
{
basicCancel();
}
bool opneChannel()
{
std::string rid=UUIDHelper::uuid();
ys::openChannelRequest req;
req.set_rid(rid);
req.set_cid(_cid);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
//返回
return resp->ok();
}
void closeChannel()
{
std::string rid=UUIDHelper::uuid();
ys::closeChannelRequest req;
req.set_rid(rid);
req.set_cid(_cid);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
waitResponse(rid);
//返回
return;
}
bool declareExchange(const std::string& name,
ys::ExchangeType type,
bool durable,
bool auto_delete,
google::protobuf::Map<std::string,std::string>& args)
{
//构造一个声明虚拟机的请求对象
ys::declareExchangeRequest req;
std::string rid=UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(name);
req.set_exchange_type(type);
req.set_durable(durable);
req.set_auto_delete(auto_delete);
req.mutable_args()->swap(args);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
//返回
return resp->ok();
}
void deleteExchange(const std::string& name)
{
ys::deleteExchangeRequest req;
std::string rid=UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(name);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
return;
}
bool declareQueue(const std::string& qname,
bool qdurable,
bool qexclusive,
bool qauto_delete,
google::protobuf::Map<std::string,std::string> qargs)
{
ys::declareQueueRequest req;
std::string rid=UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(qname);
req.set_durable(qdurable);
req.set_exclusive(qexclusive);
req.set_auto_delete(qauto_delete);
req.mutable_args()->swap(qargs);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
//返回
return resp->ok();
}
void deleteQueue(const std::string& qname)
{
ys::deleteQueueRequest req;
std::string rid=UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(qname);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
waitResponse(rid);
//返回
return;
}
bool queueBind(const std::string& ename,const std::string& qname,
const std::string& key)
{
ys::queueBindRequest req;
std::string rid=UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(ename);
req.set_queue_name(qname);
req.set_binding_key(key);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
//返回
return resp->ok();
}
void queueUnBind(const std::string& ename,const std::string& qname)
{
ys::queueUnBindRequest req;
std::string rid=UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_exchange_name(ename);
req.set_queue_name(qname);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
waitResponse(rid);
//返回
return;
}
bool basicPublish(const std::string &ename, ys::BasicProperties *bp,
const std::string &body)
{
std::string rid=UUIDHelper::uuid();
ys::basicPublishRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_body(body);
req.set_exchange_name(ename);
if(bp!=nullptr)
{
req.mutable_properties()->set_id(bp->id());
req.mutable_properties()->set_delivery_mode(bp->delivery_mode());
req.mutable_properties()->set_routing_key(bp->routing_key());
}
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
//返回
return resp->ok();
}
void basicAck(const std::string &msgid)//删除参数qname,用户知道消费了哪个队列
{
if(_consumer.get()==nullptr)
{
DLOG("消息确认时,找不到消费者的消息");
return;
}
std::string rid=UUIDHelper::uuid();
ys::basicAckRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(_consumer->_qname);
req.set_message_id(msgid);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
waitResponse(rid);
//返回
return;
}
bool basicConsume(const std::string consume_tag,const std::string queue_name,bool auto_ack,const ConsumerCallback &cb)
{
if(_consumer.get()!=nullptr)
{
DLOG("当前信道已订阅其他队列消息");
return false;
}
std::string rid=UUIDHelper::uuid();
ys::basicConsumeRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_consume_tag(consume_tag);
req.set_queue_name(queue_name);
req.set_auto_ack(auto_ack);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
basicCommonResponsePtr resp=waitResponse(rid);
if(resp->ok()==false)
{
DLOG("添加订阅失败!");
return false;
}
_consumer=std::make_shared<Consumer>(consume_tag,queue_name,auto_ack,cb);
//返回
return resp->ok();
}
void basicCancel()
{
//不一定是消费者
if(_consumer.get()==nullptr)
{
DLOG("取消订阅时,找不到消费者信息");
return;
}
std::string rid=UUIDHelper::uuid();
ys::basicCancelRequest req;
req.set_rid(rid);
req.set_cid(_cid);
req.set_consume_tag(_consumer->_cid);
//向服务器进行发送请求
_codec->send(_conn,req);
//等待服务器
waitResponse(rid);
_consumer.reset();
//返回
return;
}
std::string cid()
{
return _cid;
}
public:
//连接收到基础响应后,向hash_map中进行添加响应
void putBasicResponse(const basicCommonResponsePtr& resp)
{
std::unique_lock<std::mutex> lock(_mutex);
_basic_resp.insert(std::make_pair(resp->rid(),resp));
_cv.notify_all();
}
//连接收到消息推送后,需要通过信道进行找到对应的消费对象,通过回调函数进行消息处理
void consume(const basicConsumeResponsePtr& resp)
{
if(_consumer.get()==nullptr)
{
DLOG("消息处理时,未找到订阅者消息");
return;
}
if(_consumer->_cid!=resp->consume_tag())
{
DLOG("收到的推送消息中的消费者标识,与当前信道的消费者标识不一致!");
return;
}
_consumer->_callback(resp->consume_tag(),resp->mutable_properties(),resp->body());
return;
}
private:
//等待请求的响应
basicCommonResponsePtr waitResponse(const std::string& rid)
{
std::unique_lock<std::mutex> lock(_mutex);
//while(condition()) _cv.wait();
_cv.wait(lock,[&rid,this]()
{
return _basic_resp.find(rid)!=_basic_resp.end();
});
basicCommonResponsePtr basic_resp=_basic_resp[rid];
_basic_resp.erase(rid);
return basic_resp;
}
private:
std::string _cid;
muduo::net::TcpConnectionPtr _conn;
ProtobufCodecPtr _codec;
Consumer::ptr _consumer;
std::mutex _mutex;
std::condition_variable _cv;
std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;
};
该模块的注意事项
_consumer 这个成员是不需要在构造函数时进行初始化,当前这个信道要进行订阅某个消息的时候,才能确定这个角色是一个消费者角色,此时在进去构建,要是再构造函数的过程中就进行去构建,万一这个信道的角色是发布客户端,就造成了资源的浪费
在移除信道的时候要是消费者需要进行取消订阅一下,因此添加一个析构函数
8.2.4、对提供创建信道操作
信道的增删查
class ChannelManager
{
public:
using ptr=std::shared_ptr<ChannelManager>;
ChannelManager()
{
}
Channel::ptr create(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec)
{
std::unique_lock<std::mutex> lock(_mutex);
auto channel =std::make_shared<Channel>(conn,codec);
_channels.insert(std::make_pair(channel->cid(),channel));
return channel;
}
void remove(const std::string& cid)
{
//进行删除的时候还需要进行考虑是否为消费者(是需要进行取消订阅)
std::unique_lock<std::mutex> lock(_mutex);
_channels.erase(cid);
}
Channel::ptr get(const std::string& cid)
{
auto pos=_channels.find(cid);
if(pos==_channels.end())
{
return Channel::ptr();
}
return pos->second;
}
private:
std::mutex _mutex;
std::unordered_map<std::string,Channel::ptr> _channels;
};
九、功能联调
9.1、联调的思想
- 必须有一个生产者客户端
- 声明一个交换机exchange1
- 声明两个队列queue1(其中binding_key=queue1)、queue2(binding_key=new.music.#)
- 将这两个队列和交换机进行绑定起来
- 搭建两个消费者客户端,分别进行各自订阅一个队列的消息
- 第一次,将交换机的类型进行定义为广播交换模式:理论结果是两个消费者客户端都能拿到消息
- 第二次,将交换机的类型进行定义为直接交换模式:routing_key=queue1,理论是只有queue1能拿到消息
- 第三次,将交换机的类型进行定义成主题交换模式:routing_key=news.music.pop,理论是只有queue2能拿到结果
9.2、搭建发布客户端
以广播模式下的测试为例子
int main()
{
//广播交换下进行测试
//直接交换下进行测试
//主题交换下进行测试
//1、实例化异步工作线程对象
brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();
//2、实例化连接对象
brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);
//3、通过连接进行创建信道
brush::Channel::ptr channel=conn->openChannel();
//4、通过信道提供的服务完成所需
//4.1、声明一个交换机exchange1,交换机的类型为广播模式
google::protobuf::Map<std::string, std::string> temp_args;
channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);
//4.2、声明两个队列queue1和queue2
channel->declareQueue("queue1",true,false,false,temp_args);
channel->declareQueue("queue2",true,false,false,temp_args);
//4.3、绑定queue1-exchange1,且binding_key设置成queue1
// 绑定queue2-exchange1,且binding_key设置成news.music.#
channel->queueBind("exchange1","queue1","queue1");
channel->queueBind("exchange1","queue2","news.music.#");
//5、循环向交换机进行发布信息
//广播交换
// for(int i=1;i<10;i++)
// {
// std::string msg="Hello world-"+std::to_string(i);
// channel->basicPublish("exchange1",nullptr,msg);
// DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());
// }
//直接交换
// for(int i=0;i<10;i++)
// {
// ys::BasicProperties bp;
// bp.set_id(brush::UUIDHelper::uuid());
// bp.set_delivery_mode(ys::DeliveryMode::DURABLE);
// bp.set_routing_key("queue1");
// std::string msg="Hello world-"+std::to_string(i);
// channel->basicPublish("exchange1",&bp,msg);
// DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());
// }
//主题交换
for(int i=0;i<10;i++)
{
ys::BasicProperties bp;
bp.set_id(brush::UUIDHelper::uuid());
bp.set_delivery_mode(ys::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.pop");
std::string msg="Hello world-"+std::to_string(i);
channel->basicPublish("exchange1",&bp,msg);
DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());
}
ys::BasicProperties bp;
bp.set_id(brush::UUIDHelper::uuid());
bp.set_delivery_mode(ys::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.sport");
std::string msg="Hello brush-";
channel->basicPublish("exchange1",&bp,msg);
DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());
//6、关闭信道
//channel->closeChannel();
conn->closeChannel(channel);
}
9.3、搭建消费者客户端
同样是以广播模式下的测试为例子
//需要进行增加传参
void cb(brush::Channel::ptr& channel, const std::string& consumer_tag, const ys::BasicProperties *bp, const std::string&body)
{
DLOG("%s - 消费了消息: %s",consumer_tag.c_str(),body.c_str());
std::cout<<"body:"<<body<<std::endl;
channel->basicAck(bp->id());
}
int main(int argc,char*argv[])
{
if(argc!=2)
{
DLOG("usage: ./consumer_client queue1");
return -1;
}
//1、实例化异步工作线程对象
brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();
//2、实例化连接对象
brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);
//DLOG("实例化连接成功");
//3、通过连接进行创建信道
brush::Channel::ptr channel=conn->openChannel();
//DLOG("打开信道成功");
//4、通过信道提供的服务完成所需
//4.1、声明一个交换机exchange1,交换机的类型为广播模式
google::protobuf::Map<std::string, std::string> temp_args;
channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);
//4.2、声明两个队列queue1和queue2
channel->declareQueue("queue1",true,false,false,temp_args);
channel->declareQueue("queue2",true,false,false,temp_args);
//4.3、绑定queue1-exchange1,且binding_key设置成queue1
// 绑定queue2-exchange1,且binding_key设置成news.music.#
channel->queueBind("exchange1","queue1","queue1");
channel->queueBind("exchange1","queue2","news.music.#");
//5、进行订阅队列的消息(回调函数对消息进行确认)
//auto functor=std::bind(cb,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel->basicConsume("consumer1",argv[1],false,functor);
while(1) std::this_thread::sleep_for(std::chrono::seconds(3));
//6、关闭信道
conn->closeChannel(channel);
return 0;
}
十、项目的扩展
- 我们项目中只实现了一个虚拟机的版本,实际上是可以有多个虚拟机的
- 我们是通过代码进行搭建客户端进行访问服务器的,可以进行拓展成管理接口,然后通过可视化的界面进行客户端的搭建
- 交换机/队列的独占模式和自动删除
- 发送方式的确认(broker 给生产者进行确认应答)功能也可以进行拓展实现