目录
队列消费者/订阅者管理
客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前用户想要订阅哪一个队列的消息。
而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的意义了,因此也需要将相关的消费者信息给删除掉。
基于以上需求,因此需要对订阅者信息进行管理。
- 定义消费者信息结构
- 消费者标识。
- 订阅的队列名称。
- 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,如何消费?对于服务端来说就是调用这个个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)。
- void(const std::string&, const BasicProperties&, const std::string&)
- 是否自动应答标志。(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等待客户端确认)。
- 消费者管理--以队列为单元进行管理-队列消费者管理结构
- 操作
- 新增消费者:信道提供的服务是订阅队列消息的时候创建。
- 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除。
- 获取消费者:从队列所有的消费者中按序取出一个消费者进行消息的推送。
- 判断队列消费者是否为空。
- 判断指定消费者是否存在。
- 清理队列所有消费者。
- 元素
- 消费者管理结构:vector。
- 轮转序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者消费即可,因此采用 RR 轮转。
- 互斥锁:保证线程安全。
- 队列名称。
- 操作
- 对消费者进行统一管理结构
- 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)。
- 向指定队列新增消费者(客户端订阅指定队列消息的时候):新增完成的时候返回消费者对象。
- 从指定队列移除消费者(客户端取消订阅的时候)。
- 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理单元对象。
- 从指定队列获取一个消费者(轮询获取-消费者轮换消费起到负载均衡的作用)。
- 判断队列中消费者是否为空。
- 判断队列中指定消费者是否存在。
- 清理所有消费者。
代码实现
#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>
#include <functional>
namespace jiuqi
{
// 消费者表示, 属性, 消息
using ConsumerCallback = std::function<void(const std::string&, const BasicProperties*, const std::string&)>;
struct Consumer
{
using ptr = std::shared_ptr<Consumer>;
std::string tag; // 消费者标识
std::string qname; // 绑定的队列名称
bool auto_ack; // 是否自动应答
ConsumerCallback callback; // 回调函数
Consumer() { DEBUG("new Consumer %p", this); }
Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
: tag(ctag), qname(queue_name), auto_ack(ack), callback(cb) { DEBUG("new Consumer %p", this); }
~Consumer() { DEBUG("del Consumer %p", this); }
};
// 以队列为单元的消费者管理结构
class QueueConsumer
{
public:
using ptr = std::shared_ptr<QueueConsumer>;
QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0) {}
// 新增消费者
Consumer::ptr create(const std::string &ctag, bool ack, const ConsumerCallback &cb)
{
std::unique_lock<std::mutex> lock(_mutex);
// 判断是否重复
for (auto &consumer : _consumers)
if (consumer->tag == ctag)
return nullptr;
// 构建消费者
Consumer::ptr consumer = std::make_shared<Consumer>(ctag, _qname, ack, cb);
_consumers.push_back(consumer);
return consumer;
}
// 移除消费者
void remove(const std::string &ctag)
{
std::unique_lock<std::mutex> lock(_mutex);
for (auto it = _consumers.begin(); it != _consumers.end(); it++)
{
if ((*it)->tag == ctag)
{
_consumers.erase(it);
return;
}
}
}
// 获取消费者
Consumer::ptr choose()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_consumers.empty())
return nullptr;
int index = _rr_seq % _consumers.size();
++_rr_seq;
return _consumers[index];
}
// 是否为空
bool empty()
{
std::unique_lock<std::mutex> lock(_mutex);
return _consumers.empty();
}
// 某个消费者是否存在
bool exists(const std::string &ctag)
{
std::unique_lock<std::mutex> lock(_mutex);
for (auto it = _consumers.begin(); it != _consumers.end(); it++)
if ((*it)->tag == ctag)
return true;
return false;
}
// 清空
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_consumers.clear();
_rr_seq = 0;
}
private:
std::string _qname;
std::mutex _mutex;
uint64_t _rr_seq; // 轮转序号
std::vector<Consumer::ptr> _consumers;
};
class ConsumerManager
{
public:
using ptr = std::shared_ptr<ConsumerManager>;
ConsumerManager() {}
void initQueueConsumer(const std::string &qname)
{
std::unique_lock<std::mutex> lock_mutex;
auto it = _qconsumers.find(qname);
if (it != _qconsumers.end())
return;
QueueConsumer::ptr qc = std::make_shared<QueueConsumer>(qname);
_qconsumers.insert(std::make_pair(qname, qc));
}
void destoryQueueConsumer(const std::string &qname)
{
std::unique_lock<std::mutex> lock_mutex;
for (auto it = _qconsumers.begin(); it != _qconsumers.end(); it++)
{
if (it->first == qname)
{
_qconsumers.erase(it);
return;
}
}
}
Consumer::ptr create(const std::string &ctag, const std::string &qname, bool ack, const ConsumerCallback &cb)
{
QueueConsumer::ptr qcp;
{
std::unique_lock<std::mutex> lock_mutex;
auto it = _qconsumers.find(qname);
if (it == _qconsumers.end())
{
DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
return nullptr;
}
qcp = it->second;
}
return qcp->create(ctag, ack, cb);
}
void remove(const std::string &ctag, const std::string &qname)
{
QueueConsumer::ptr qcp;
{
std::unique_lock<std::mutex> lock_mutex;
auto it = _qconsumers.find(qname);
if (it == _qconsumers.end())
{
DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
return;
}
qcp = it->second;
}
return qcp->remove(ctag);
}
Consumer::ptr choose(const std::string &qname)
{
QueueConsumer::ptr qcp;
{
std::unique_lock<std::mutex> lock_mutex;
auto it = _qconsumers.find(qname);
if (it == _qconsumers.end())
{
DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
return nullptr;
}
qcp = it->second;
}
return qcp->choose();
}
bool empty(const std::string &qname)
{
QueueConsumer::ptr qcp;
{
std::unique_lock<std::mutex> lock_mutex;
auto it = _qconsumers.find(qname);
if (it == _qconsumers.end())
{
DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
return true;
}
qcp = it->second;
}
return qcp->empty();
}
bool exists(const std::string &ctag, const std::string &qname)
{
QueueConsumer::ptr qcp;
{
std::unique_lock<std::mutex> lock_mutex;
auto it = _qconsumers.find(qname);
if (it == _qconsumers.end())
{
DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
return false;
}
qcp = it->second;
}
return qcp->exists(ctag);
}
void clear()
{
std::unique_lock<std::mutex> lock_mutex;
_qconsumers.clear();
}
private:
std::mutex _mutex;
std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;
};
}
信道管理模块
在 AMQP 模型中,除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。而信道模块就是再次将上述模块进行整合提供服务的模块。
- 管理信息:
- 信道 ID:信道的唯一标识。
- 信道关联的消费者:用于消费者信道在关闭的时候取消订阅,删除订阅者信息。
- 信道关联的连接:用于向客户端发送数据(响应,推送的消息)。
- protobuf 协议处理句柄:网络通信前的协议处理。
- 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息。
- 虚拟机句柄:交换机/队列/绑定/消息数据管理。
- 工作线程池句柄(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)。
- 管理操作:
- 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)。
- 提供绑定&解绑队列操作。
- 提供订阅&取消订阅队列消息操作。
- 提供发布&确认消息操作。
- 信道管理
- 信道的增删查。
代码实现
#pragma once
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/threadpool.hpp"
#include "route.hpp"
#include "consumer.hpp"
#include "vhost.hpp"
#include <unordered_map>
#include <memory>
namespace jiuqi
{
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;
using queueUnbindRequestPtr = std::shared_ptr<queueUnbindRequest>;
using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;
class Channel
{
public:
using ptr = std::shared_ptr<Channel>;
Channel(const std::string &cid,
const muduo::net::TcpConnectionPtr &conn,
const ProtobufCodecPtr &codec,
const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &vhost,
const ThreadPool::ptr &pool)
: _cid(cid),
_conn(conn),
_codec(codec),
_cmp(cmp),
_vhost(vhost),
_pool(pool)
{
DEBUG("new Channel %p", this);
}
~Channel()
{
if (_consumer != nullptr)
_cmp->remove(_consumer->tag, _consumer->qname);
DEBUG("del Channel %p", this);
}
// 交换机的声明与删除
void declareExchange(const declareExchangeRequestPtr &req)
{
bool ret = _vhost->declareExchange(req->ename(), req->etype(), req->durable(), req->auto_delete(), req->args());
basicResponse(ret, req->rid(), req->cid());
}
void deleteExchange(const deleteExchangeRequestPtr &req)
{
_vhost->deleteExchange(req->ename());
basicResponse(true, req->rid(), req->cid());
}
// 队列的声明与删除
void declareQueue(const declareQueueRequestPtr &req)
{
bool ret = _vhost->declareQueue(req->qname(), req->durable(), req->exclusive(), req->auto_delete(), req->args());
if (ret)
_cmp->initQueueConsumer(req->qname());
basicResponse(ret, req->rid(), req->cid());
}
void deleteQueue(const deleteQueueRequestPtr &req)
{
_cmp->destoryQueueConsumer(req->qname());
_vhost->deleteQueue(req->qname());
basicResponse(true, req->rid(), req->cid());
}
// 队列的绑定与解绑
void queueBind(const queueBindRequestPtr &req)
{
bool ret = _vhost->bind(req->ename(), req->qname(), req->bindingkey());
basicResponse(ret, req->rid(), req->cid());
}
void queueUnbind(const queueUnbindRequestPtr &req)
{
_vhost->unBind(req->ename(), req->qname());
basicResponse(true, req->rid(), req->cid());
}
// 消息的发布
void basicPublish(const basicPublishRequestPtr &req)
{
// 判断交换机是否存在
auto ep = _vhost->selectExchange(req->ename());
if (ep == nullptr)
basicResponse(false, req->rid(), req->cid());
// 进行交换路由
QueueBindingMap qbm = _vhost->exchangeBinding(req->ename());
BasicProperties *bp = nullptr;
std::string routekey;
if (req->has_properties())
{
routekey = req->properties().routing_key();
bp = req->mutable_properties();
}
for (auto &binding : qbm)
{
if (Router::route(ep->type, routekey, binding.second->bindingKey))
{
// 将消息添加到队列中
_vhost->basicPublish(binding.first, bp, req->body());
// 向线程池中添加一个消息消费任务(向指定队列的订阅者推送消息)
auto task = std::bind(&Channel::consume, this, binding.first);
_pool->push(task);
}
}
basicResponse(true, req->rid(), req->cid());
}
// 消息的确认
void basicAck(const basicAckRequestPtr &req)
{
_vhost->basicAck(req->qname(), req->mid());
basicResponse(true, req->rid(), req->cid());
}
// 订阅队列消息
void basicConsumer(const basicConsumeRequestPtr &req)
{
// 判断队列是否存在
bool ret = _vhost->existsQueue(req->qname());
if (!ret)
return basicResponse(false, req->rid(), req->cid());
// 创建队列消费者
auto cb = std::bind(&Channel::callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
// 创建了消费者之后, 当前的channel角色就是个消费者
_consumer = _cmp->create(req->ctag(), req->qname(), req->auto_ack(), cb);
basicResponse(true, req->rid(), req->cid());
}
// 取消订阅
void basicCancel(const basicCancelRequestPtr &req)
{
_cmp->remove(req->ctag(), req->qname());
basicResponse(true, req->rid(), req->cid());
}
private:
void basicResponse(bool ok, const std::string &rid, const std::string &cid)
{
basicCommonResponse resp;
resp.set_rid(rid);
resp.set_cid(cid);
resp.set_ok(ok);
_codec->send(_conn, resp);
}
void consume(const std::string &qname)
{
MessagePtr mp = _vhost->basicConsume(qname);
if (mp == nullptr)
{
DEBUG("%s 队列无消息", qname.c_str());
return;
}
Consumer::ptr cp = _cmp->choose(qname);
if (cp == nullptr)
{
DEBUG("%s 队列无消费者", qname.c_str())
return;
}
cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());
DEBUG("消费者: %s", cp->tag.c_str());
if (cp->auto_ack)
_vhost->basicAck(qname, mp->payload().properties().id());
}
void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
{
basicConsumeResponse resp;
resp.set_cid(_cid);
resp.set_ctag(tag);
resp.set_body(body);
if (bp)
{
resp.mutable_properties()->set_id(bp->id());
resp.mutable_properties()->set_deliver_mode(bp->deliver_mode());
resp.mutable_properties()->set_routing_key(bp->routing_key());
}
_codec->send(_conn, resp);
}
private:
std::string _cid;
Consumer::ptr _consumer;
muduo::net::TcpConnectionPtr _conn;
ProtobufCodecPtr _codec;
ConsumerManager::ptr _cmp;
VirtualHost::ptr _vhost;
ThreadPool::ptr _pool;
};
class ChannelManager
{
public:
using ptr = std::shared_ptr<ChannelManager>;
ChannelManager() {}
bool openChannl(const std::string &cid,
const muduo::net::TcpConnectionPtr &conn,
const ProtobufCodecPtr &codec,
const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &vhost,
const ThreadPool::ptr &pool)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _channels.find(cid);
if (it != _channels.end()) return false;
auto channel = std::make_shared<Channel>(cid, conn, codec, cmp, vhost, pool);
_channels.insert(std::make_pair(cid, channel));
return true;
}
void closeChannel(const std::string &cid)
{
std::unique_lock<std::mutex> lock(_mutex);
_channels.erase(cid);
}
Channel::ptr getChannel(const std::string &cid)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _channels.find(cid);
if (it == _channels.end()) return nullptr;
return it->second;
}
private:
std::mutex _mutex;
std::unordered_map<std::string, Channel::ptr> _channels;
};
}
连接管理模块
向用户提供一个用于实现网络通信的 Connection 对象,从其内部可创建出粒度更轻的Channel 对象,用于与客户端进行网络通信。
- 成员信息:
- 连接关联的信道管理句柄(实现信道的增删查)。
- 连接关联的实际用于通信的 muduo::net::Connection 连接。
- protobuf 协议处理的句柄(ProtobufCodec 对象)。
- 消费者管理句柄。
- 虚拟机句柄。
- 异步工作线程池句柄。
- 连接操作:
- 提供创建 Channel 信道的操作。
- 提供删除 Channel 信道的操作。
- 连接管理:
- 连接的增删查。
代码实现
#pragma once
#include "channel.hpp"
namespace jiuqi
{
class Connection
{
public:
using ptr = std::shared_ptr<Connection>;
Connection(const muduo::net::TcpConnectionPtr &conn,
const ProtobufCodecPtr &codec,
const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &vhost,
const ThreadPool::ptr &pool)
: _conn(conn),
_codec(codec),
_cmp(cmp),
_vhost(vhost),
_pool(pool),
_channels(std::make_shared<ChannelManager>()) {}
void openChannel(const openChannelRequestPtr &req)
{
bool ret = _channels->openChannl(req->cid(), _conn, _codec, _cmp, _vhost, _pool);
basicResponse(ret, req->rid(), req->cid());
}
void closeChannel(const closeChannelRequestPtr &req)
{
_channels->closeChannel(req->cid());
basicResponse(true, req->rid(), req->cid());
}
Channel::ptr getChannel(const std::string &cid)
{
return _channels->getChannel(cid);
}
private:
void basicResponse(bool ok, const std::string &rid, const std::string &cid)
{
basicCommonResponse resp;
resp.set_rid(rid);
resp.set_cid(cid);
resp.set_ok(ok);
_codec->send(_conn, resp);
}
private:
muduo::net::TcpConnectionPtr _conn;
ProtobufCodecPtr _codec;
ConsumerManager::ptr _cmp;
VirtualHost::ptr _vhost;
ThreadPool::ptr _pool;
ChannelManager::ptr _channels;
};
class ConnectionManager
{
public:
using ptr = std::shared_ptr<ConnectionManager>;
ConnectionManager() {}
void newConnection(const muduo::net::TcpConnectionPtr &conn,
const ProtobufCodecPtr &codec,
const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &vhost,
const ThreadPool::ptr &pool)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if (it != _conns.end()) return;
auto cnp = std::make_shared<Connection>(conn, codec, cmp, vhost, pool);
_conns.insert(std::make_pair(conn, cnp));
}
void deleteConnection(const muduo::net::TcpConnectionPtr &conn)
{
std::unique_lock<std::mutex> lock(_mutex);
_conns.erase(conn);
}
Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if (it == _conns.end()) return nullptr;
return it->second;
}
private:
std::mutex _mutex;
std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns;
};
}