前文实现了单服务器与多客户端之间的通信以及聊天业务,同时为了增大并发量利用nginx实现多服务器的集群负载均衡,但是一个关键的问题是要实现多服务器之间的通信,这里采用Redis的发布订阅消息队列实现。
不同客户端可能连接在不同服务器上,服务器可以向Redis发起订阅和发布请求,当有消息需要发送时以发布的形式写入Redis消息队列,当有接收的消息时,在订阅的前提下Redis负责将消息通知给服务器。
同步redis发布-订阅封装的代码主要提供以下方法:
1.connect:连接redis服务器生成了两个redisContext对象,一个用于发布消息(_publish_context),一个用于订阅通道(_subcribe_context)。
2.publish:在_publish_context上发布消息,id+msg,阻塞接收redis server的响应。
3.subscribe:在_subcribe_context上订阅通道,id,不接收redis server的响应。
4.unsubscribe:在_subcribe_context上取消订阅通道,id,不接收redis server的响应。
5.由于订阅/取消订阅接收响应都是阻塞型的,所以单独开辟线程thread:
通过observer_channel_message函数调用redisGetReply循环阻塞方式接收订阅通道中的消息,回调通知上层应用(id+msg)。
实现源代码如下:
#include"redis.hpp"
#include<iostream>
#include<thread>
using namespace std;
//构造函数,将订阅和发布的两个对象指针置空
Redis::Redis():_publish_context(nullptr),_subcribe_context(nullptr)
{
}
//析构函数,释放资源
Redis::~Redis(){
if(_publish_context!=nullptr){
redisFree(_publish_context);
}
if(_subcribe_context!=nullptr){
redisFree(_subcribe_context);
}
}
//连接redis服务器
bool Redis::connect(){
//两个对象连接redis服务器,redis默认ip+port=127.0.0.1:6379
_publish_context = redisConnect("127.0.0.1", 6379);
if(nullptr == _publish_context){
cerr<<"connect redis failed!"<<endl;
return false;
}
_subcribe_context = redisConnect("127.0.0.1", 6379);
if(nullptr == _subcribe_context){
cerr<<"connect redis failed!"<<endl;
return false;
}
//在单独的线程中,监听通道上的事件,有消息给业务层进行上报
//因为上报和订阅都是阻塞的,需要单独开辟一个线程进行消息上报,否则服务器无法进行其他业务
thread t([&](){
observer_channel_message();
});
t.detach();
cout<<"connect redis-server success!"<<endl;
return true;
}
//向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message){
//相当于命令行 publish channel message
//redisCommand是同步操作,阻塞,相当于redisAppendCommand+redisBufferWrite+redisGetReply,pubilsh是一致性马上回复,所以可以阻塞等待
//redisAppendCommand是将命令组装好后放到本地缓存
//redisBufferWrite是将本地缓存的命令发送到redis服务器
//redisGetReply是从redis服务器获取返回的结果(阻塞型)
redisReply* reply = (redisReply*)redisCommand(this->_publish_context, "PUBLISH %d %s", channel, message.c_str());
if(nullptr == reply){
cerr<<"publish message failed!"<<endl;
return false;
}
freeReplyObject(reply);
return true;
}
//向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel){
//subscribe命令本身会造成线程阻塞等待通道里发生消息,这里之作订阅通道,不接受通道消息
//通道消息的接收专门在observer_channel_message函数中的独立线程中进行
//只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
if(REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){
cerr<<"subscribe channel failed!"<<endl;
return false;
}
//redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while(!done){
if(REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
cerr<<"subscribe channel failed!"<<endl;
return false;
}
}
return true;
}
//向redis指定的通道unsubscribe取消订阅消息(用户下线,无需订阅)
bool Redis::unsubscribe(int channel){
if(REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){
cerr<<"subscribe channel failed!"<<endl;
return false;
}
//redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while(!done){
if(REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
cerr<<"subscribe channel failed!"<<endl;
return false;
}
}
return true;
}
//在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){
redisReply* reply = nullptr;
//以循环阻塞的方式等待通道发生的消息
while(REDIS_OK == redisGetReply(this->_subcribe_context, (void**)&reply)){
//订阅收到的消息是一个redisReply对象,根据不同的消息类型进行处理
//订阅收到的消息是一个带三元素的数组
if(reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){
//回调通知上层应用,收到订阅的消息
_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
}
//释放redisReply对象
freeReplyObject(reply);
}
cerr<<"exit observer_channel_message"<<endl;
}
//初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn)
{
this->_notify_message_handler = fn;
}
/*
同步redis发布-订阅封装的代码主要提供以下方法:
1.connect:连接redis服务器生成了两个redisContext对象,一个用于发布消息(_publish_context),一个用于订阅通道(_subcribe_context)。
2.publish:在_publish_context上发布消息,id+msg,阻塞接收redis server的响应。
3.subscribe:在_subcribe_context上订阅通道,id,不接收redis server的响应。
4.unsubscribe:在_subcribe_context上取消订阅通道,id,不接收redis server的响应。
5.由于订阅/取消订阅接收响应都是阻塞型的,所以单独开辟线程thread:
通过observer_channel_message函数调用redisGetReply循环阻塞方式接收订阅通道中的消息,回调通知上层应用(id+msg)。
*/
在业务层面,由单机模式变为集群模式,因此逻辑需要发生变化:
1. 登录成功后,向Redis订阅channel(id)
//id用户登录成功后,向redis订阅channel(id)
_redis.subscribe(id)tuichu
2. 退出登录或者客户端异常退出后,取消订阅Redis channel
//用户下线,取消订阅redis channel
_redis.unsubscribe(user.getId());
3. 一对一聊天或者群聊业务时:
(1)先检查本服务器是否存在目标用户(检查userConnMap),若存在则直接转发消息
(2)如果userConnMap没有找到,则检查User数据库用户是否在线,若在线则说明在其他服务器,需要向该channel(id)发布消息
(3)若不在线,则直接写入离线消息数据库即可
//群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn,json &js,Timestamp time)
{
int userid = js["id"].get<int>();
int groupid = js["groupid"].get<int>();
//获取群组中所有其他用户id
vector<int> useridVec = _groupModel.queryGroupUsers(userid,groupid);
//加锁,防止_userConnMap中的用户在发送消息时候上线或者下线,C++中map的操作本身是无法保证线程安全的
lock_guard<mutex> lock(_connMutex);
for(int id:useridVec){
auto it = _userConnMap.find(id);
//用户在线,并且在本台服务器,直接转发消息
if(it!=_userConnMap.end()){
it->second->send(js.dump());
}
//用户在其他服务器上或者不在线
else{
//查询其他用户是否在线,查询数据库
User user = _userModel.query(id);
if(user.getState()=="online"){
//用户在线,但是不在本服务器上,转发消息到redis消息队列
_redis.publish(id,js.dump());
}
else{
//存储离线消息
_offlineMsgModel.insert(id,js.dump());
}
}
}
}
Redis连接后开辟单独的线程监听订阅的消息,当收到发布的消息后回调通知上层的应用。上层服务器收到通知后执行回调操作进行消息转发。
redis.cpp 监听通道中的消息:
//在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){
redisReply* reply = nullptr;
//以循环阻塞的方式等待通道发生的消息
while(REDIS_OK == redisGetReply(this->_subcribe_context, (void**)&reply)){
//订阅收到的消息是一个redisReply对象,根据不同的消息类型进行处理
//订阅收到的消息是一个带三元素的数组
if(reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){
//回调通知上层应用,收到订阅的消息
_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
}
//释放redisReply对象
freeReplyObject(reply);
}
cerr<<"exit observer_channel_message"<<endl;
}
chatservice.cpp 执行回调操作,转发通道消息:
//连接redis服务器
if(_redis.connect()){
//设置上报消息的回调
_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage,this,_1,_2));
}
//从redis消息队列中获取订阅的消息,将msg转发给对应的userid
void ChatService::handleRedisSubscribeMessage(int userid,string msg)
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(userid);
if(it!=_userConnMap.end()){
it->second->send(msg);
return;
}
//存储离线消息,这里主要考虑在上报和调用回调过程中用户突然下线的情况
_offlineMsgModel.insert(userid,msg);
}
测试结果:
两台客户端分别连接两台服务器,并且可以实现通信。