1. 客户端TCP管理类
聊天服务需要维持一个长链接,方便服务端和客户端双向通信,需要TCPMgr管理TCP连接
.h
#ifndef TCPMGR_H
#define TCPMGR_H
#include <QObject>
#include <QTcpSocket>
#include "Struct.h"
#include "Enum.h"
#include "singletion.h"
#include <functional>
#include "Enum.h"
using CallBack = std::function<void(ReqID id, int len, QByteArray data)>;
class TCPMgr : public QObject, public Singleton<TCPMgr>
{
Q_OBJECT
public:
friend class Singleton<TCPMgr>;
public:
~TCPMgr();
public slots:
void SlotTcpConnect(ServerInfo serverInfo);
void SlotSendData(ReqID reqId, QString data);
signals:
void SigConnectSuccess(bool bSuccess);
void SigSendData(ReqID reqId, QString data);
void SigLoginFailed(int);
void SigSwitchChatWnd();
private:
TCPMgr(QObject* parent = 0);
void BindSlots();
void InitHandlers();
void HandleMsg(ReqID id, int len, QByteArray data);
private:
QTcpSocket* m_TcpSocket = nullptr;
QString m_Host;
int m_Port;
QByteArray m_RecvData;
quint16 m_MessageID;
quint16 m_MessageLen;
QMap<ReqID, CallBack> _handlers;
};
#endif // TCPMGR_H
.cpp
#include "TCPMgr.h"
#include <QDebug>
#include "UserMgr.h"
#include <QJsonDocument>
#include <QJsonObject>
TCPMgr::TCPMgr(QObject* parent)
: m_Host(""), m_Port(0), m_MessageID(0), m_MessageLen(0), QObject(parent)
{
BindSlots();
}
void TCPMgr::BindSlots()
{
/* 连接成功时 */
connect(m_TcpSocket, &QTcpSocket::connected, [this]()
{
emit SigConnectSuccess(true);
});
/* 服务器发来数据时 */
connect(m_TcpSocket, &QTcpSocket::readyRead, [this]()
{
if (m_TcpSocket->bytesAvailable() <= 0)
{
qDebug() << QString::fromLocal8Bit("接受缓冲区为空!!!");
return;
}
m_RecvData.append(m_TcpSocket->readAll());
QDataStream in(&m_RecvData, QIODevice::ReadOnly);
in.setVersion(QDataStream::Qt_5_0);
while (1)
{
// 先判断头部长度是否合理
if (m_RecvData.size() < static_cast<int>(sizeof(quint16) * 2))
{
qDebug() << QString::fromLocal8Bit("数据小于一个消息头!!!");
return;
}
// 判断接受的数据是否完整
in >> m_MessageID >> m_MessageLen;
qDebug() << QString::fromLocal8Bit("收到消息ID:") << m_MessageID << QString::fromLocal8Bit("长度:") << m_MessageLen;
if (m_RecvData.size() < m_MessageLen)
{
qDebug() << QString::fromLocal8Bit("数据不完整!!!");
return;
}
// 解析消息体
QByteArray messageBody = m_RecvData.mid(2 * sizeof(quint16), m_MessageLen);
m_RecvData.remove(0, m_MessageLen + 2 * sizeof(quint16));
HandleMsg((ReqID)m_MessageID, m_MessageLen, messageBody);
}
});
/* 处理错误 */
connect(m_TcpSocket, static_cast<void (QTcpSocket::*)(QTcpSocket::SocketError)>(&QTcpSocket::error),
[this]()
{
qDebug() << "Error: " << m_TcpSocket->errorString();
switch (m_TcpSocket->error())
{
case QTcpSocket::ConnectionRefusedError:
qDebug() << QString::fromLocal8Bit("连接被拒绝!!!");
emit SigConnectSuccess(false);
break;
case QTcpSocket::RemoteHostClosedError:
qDebug() << QString::fromLocal8Bit("远程主机关闭连接!!!");
break;
case QTcpSocket::HostNotFoundError:
qDebug() << QString::fromLocal8Bit("主机未找到!!!");
break;
case QTcpSocket::NetworkError:
qDebug() << QString::fromLocal8Bit("网络错误!!!");
break;
case QTcpSocket::SocketTimeoutError:
qDebug() << QString::fromLocal8Bit("套接字超时错误!!!");
emit SigConnectSuccess(false);
break;
default:
qDebug() << QString::fromLocal8Bit("未知错误!!!");
break;
}
});
/* 连接断开 */
connect(m_TcpSocket, &QTcpSocket::disconnected, [this]()
{
qDebug() << QString::fromLocal8Bit("连接断开!!!");
});
/* 发送数据 */
connect(this, &TCPMgr::SigSendData, this, &TCPMgr::SlotSendData);
}
void TCPMgr::InitHandlers()
{
_handlers.insert(ReqID::ID_CHAT_LOGIN_RESPONSE, [this](ReqID id, int len, QByteArray data)
{
Q_UNUSED(id);
qDebug() << "handle id is" << id << "len is" << len << "data is" << data;
QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
if (jsonDoc.isNull())
{
qDebug() << QString::fromLocal8Bit("Json解析失败!!!");
return;
}
QJsonObject jsonObj = jsonDoc.object();
if (!jsonObj.contains("error"))
{
int error = ErrorCodes::ERROR_JSON;
qDebug() << QString::fromLocal8Bit("Json中没有error字段!!!");
emit SigLoginFailed(error);
return;
}
int error = jsonObj["error"].toInt();
if (error != ErrorCodes::SUCCESS)
{
qDebug() << QString::fromLocal8Bit("登录失败!!!") << error;
emit SigLoginFailed(error);
return;
}
UserMgr::Instance()->SetUid(jsonObj["uid"].toInt());
UserMgr::Instance()->SetName(jsonObj["name"].toString());
UserMgr::Instance()->SetToken(jsonObj["token"].toString());
emit SigSwitchChatWnd();
});
}
void TCPMgr::HandleMsg(ReqID id, int len, QByteArray data)
{
auto it = _handlers.find(id);
if (it != _handlers.end())
{
it.value()(id, len, data);
}
}
TCPMgr::~TCPMgr()
{
}
/* 登录成功时连接到服务器 */
void TCPMgr::SlotTcpConnect(ServerInfo serverInfo)
{
qDebug() << QString::fromLocal8Bit("开始连接服务器...");
m_Host = serverInfo.Host;
m_Port = serverInfo.Port.toInt();
m_TcpSocket->connectToHost(m_Host, m_Port);
}
/* 在多线程的情况下使用单例,进行写操作,需要加锁,如果不加锁则采用信号槽来自动排队,保障线程安全 */
void TCPMgr::SlotSendData(ReqID reqId, QString data)
{
uint16_t ID = reqId;
// 将字符数组转换成UTF-8编码的字节数组
QByteArray message = data.toUtf8();
// 计算消息体的长度
quint16 len = static_cast<quint16>(message.size());
// 创建一个QByteArray用于存储要发送的所有数据
QByteArray sendData;
QDataStream out(&sendData, QIODevice::WriteOnly);
// 设置数据流使用网络字节序
out.setByteOrder(QDataStream::BigEndian);
out << ID << len;
sendData.append(message);
// 发送数据
m_TcpSocket->write(sendData);
}
2. 连接到聊天服务器
当登录成功时,会接收到服务器发来的登录回复,里面包含聊天服务器的IP,然后向聊天服务器发起连接
1. 服务器向客户端发送登录请求的响应报文
2. 客户端处理服务器发送的响应报文
3. 客户端接收到服务器发来的聊天服务器的IP,向聊天服务器发送TCP连接
连接成功时,调用连接成功的槽函数,并向聊天服务器发送登录请求
根据消息类型发送数据
3. 聊天服务器
一个TCP服务器必然会有连接的接受,维持,收发数据等逻辑
3.1 main.cpp
#include <iostream>
#include <csignal>
#include <thread>
#include <mutex>
#include <boost/beast/http.hpp>
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include "AsioIOServicePool.h"
#include "LogicSystem.h"
#include "CServer.h"
#include "ServerStatic.h"
using namespace std;
bool bStop = false;
condition_variable cond_quit;
mutex mtx_quit;
int main()
{
auto pool = AsioIOServicePool::GetInstance();
boost::asio::io_context io_ct;
boost::asio::signal_set signals(io_ct, SIGINT, SIGTERM);
signals.async_wait([&io_ct, pool](auto, auto)
{
io_ct.stop();
pool->Stop();
});
const int port = get<int>(ServerStatic::ParseConfig("SelfServer", "Port"));
unsigned short port_ushort = static_cast<unsigned short>(port);
CServer server(io_ct, port_ushort); // 创建监听TCP连接的服务端
io_ct.run();
return 0;
}
配置文件
{
"GateServer": {
"Port": 8080
},
"VerifyServer": {
"Port": 50051
},
"StatusServer": {
"Port": 50052
},
"SelfServer": {
"Host": "0.0.0.0",
"Port": 8090
},
"Redis": {
"Host": "127.0.0.1",
"Port": 6380,
"Password": "123456"
},
"Mysql": {
"Host": "127.0.0.1",
"Port": 3306,
"Username": "root",
"Password": "123456",
"Database": "YjjChat"
}
}
3.2 监听TCP的服务端CServer
#ifndef __CSERVER_H__
#define __CSERVER_H__
#include "GlobalHead.h"
class CSession;
/*
CServer类主要负责监听端口是否有客户端发起连接,如果有将该连接转交给CSesssion类处理
*/
class CServer
{
public:
CServer(boost::asio::io_context& ioc, unsigned short& port);
void CleanSession(string& sessionID); // 清理掉已经断开连接的会话对象
private:
void HandleAccept(CSession* session, const boost::system::error_code& error);
void StartAccept();
private:
tcp::acceptor _acceptor; // 绑定端口并监听是否有TCP客户端连接进来
net::io_context& _ioc; // _ioc.run() 是一个事件循环,本身并不监听任何端口
short _port; // 监听的端口号
map<string, CSession*> _sessionMap; // 保存所有连接过的客户端的会话对象
mutex _mutex;
};
#endif // __CSERVER_H__
#include "CServer.h"
#include "AsioIOServicePool.h"
#include "CSession.h"
using namespace std;
CServer::CServer(boost::asio::io_context& ioc, unsigned short& port)
:_ioc(ioc), _port(port), _acceptor(ioc, tcp::endpoint(tcp::v4(), port))
{
StartAccept();
}
void CServer::CleanSession(string& sessionID)
{
_sessionMap.erase(sessionID);
}
void CServer::HandleAccept(CSession* session, const boost::system::error_code& error)
{
if (!error)
{
session->Start(); // 监听客户端发来的数据
lock_guard<mutex> lock(_mutex);
_sessionMap[session->GetSessionID()] = session;
}
else
{
cout << "CServer::HandleAccept error: " << error.message() << "\n";
}
StartAccept(); // 继续监听客户端的连接事件
}
/* 从事件循环池中获取一个IO服务对象,并注册监听事件,监听端口是否有新的TCP连接,如果有将该连接转移到会话对象的socket下进行监听客户端的读写事件 */
void CServer::StartAccept()
{
auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();
CSession* session = new CSession(io_context, this);
_acceptor.async_accept(session->GetSocket(), bind(&CServer::HandleAccept, this, session, placeholders::_1));
}
3.3 与客户端通信的会话对象CSession
1. 当开始从套接字中监听数据时,即Start时
void CSession::Start()
{
AsyncReadHead(HEAD_TOTAL_LEN);
}
宏定义
#ifndef MACRO_H
#define MACRO_H
// 最大数据长度
#define MAX_LENGTH 1024*2
// 头部总长度
#define HEAD_TOTAL_LEN 4
//头部id长度
#define HEAD_ID_LEN 2
//头部数据长度
#define HEAD_DATA_LEN 2
#define MAX_RECVQUEUE 10000
#define MAX_SENDQUEUE 1000
#define LOCK_PREFIX "lock_"
#endif // MACRO_H
2. 监听客户端发来的消息头
将该长度的数据全部读取出来
void CSession::AsyncReadFull(size_t maxLength, function<void(const boost::system::error_code&, size_t)> handler)
{
::memset(_recvBuffer, 0, sizeof(_recvBuffer));
AsyncReadLen(0, maxLength, handler);
}
异步读取指定长度的数据
// read_len: 已经读取的长度
// total_len: 总长度
void AsyncReadLen(size_t read_len, size_t total_len, function<void(const boost::system::error_code&, size_t)> handler);
消息可能一次性不能读取完整,避免无法完整的读取消息,每次读取消息时都会调用回调函数检查,这里进行了回调函数的嵌套,在原先的回调函数上又嵌套了一个回调函数,只有当数据读取完整时才会去处理数据
/* 异步读取指定长度的字节,数据读取完毕后,调用handler回调函数 */
void CSession::AsyncReadLen(size_t read_len, size_t total_len, function<void(const boost::system::error_code&, size_t)> handler)
{
_socket.async_read_some(boost::asio::buffer(_recvBuffer + read_len, total_len - read_len),
[read_len, total_len, handler, this](const boost::system::error_code& ec, size_t bytesTransferred)
{
if (ec)
{
handler(ec, bytesTransferred);
return;
}
// bytesTransferred 实际读取的字节数
// 如果实际读取的字节数 + 已读取的字节数 >= 总字节数
if (read_len + bytesTransferred >= total_len)
{
handler(ec, bytesTransferred);
return;
}
// 递归读取,直到读取完指定的字节数为止
AsyncReadLen(read_len + bytesTransferred, total_len, handler);
});
}
处理数据读取成功的回调函数,这里是完整的读取了头部字节的回调函数,取出消息的ID和消息的长度,创建接受消息的容器(数据接受单元), 读取消息体,并将该消息体存储到容器中,消息ID和消息长度用成员变量的方式进行存储
AsyncReadFull(HEAD_TOTAL_LEN, [this](const boost::system::error_code& ec, size_t bytes_transferred)
{
if (ec)
{
cout << "handle read failed, error is: " << ec.what() << "\n";
Close();
DealExceptionSession();
_server->CleanSession(_sessionID);
return;
}
if (bytes_transferred < HEAD_TOTAL_LEN)
{
cout << "read length not match, read [" << bytes_transferred << "] bytes, but need [" << HEAD_TOTAL_LEN << "] bytes\n";
Close();
_server->CleanSession(_sessionID);
return;
}
_recvHeadNode->Clear();
memcpy(_recvHeadNode->_data, _recvBuffer, bytes_transferred);
// 获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recvHeadNode->_data, sizeof(short));
// 网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
cout << "msg_id is: " << msg_id << "\n";
// 检查ID是否合法
if (msg_id > MAX_LENGTH)
{
cout << "msg_id is not valid, msg_id is: " << msg_id << "\n";
_server->CleanSession(_sessionID);
return;
}
short msg_len = 0;
memcpy(&msg_len, _recvHeadNode->_data + sizeof(short), sizeof(short));
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
cout << "msg_len is: " << msg_len << "\n";
// 检查长度是否合法
if (msg_len > MAX_LENGTH)
{
cout << "msg_len is not valid, msg_len is: " << msg_len << "\n";
_server->CleanSession(_sessionID);
return;
}
_recvMsgNode = new RecvNode(msg_len, msg_id);
AsyncReadBody(msg_len);
});
处理数据读取成功后的回调函数,处理读取消息体的回调函数(该回调函数只会调用一次),数据一旦读取出错就断开连接,然后将数据存放到_recvMsgNode->_data中,最后将该数据接收结点,由逻辑处理类专门从队列中去处理,在接着读取下一条消息的头部,实现对客户端的请求的监听
void CSession::AsyncReadBody(int total_len)
{
AsyncReadFull(total_len, [this, total_len](const boost::system::error_code& ec, size_t bytes_transferred)
{
if (ec)
{
cout << "handle read failed, error is: " << ec.what() << "\n";
Close();
DealExceptionSession();
_server->CleanSession(_sessionID);
return;
}
if (bytes_transferred < total_len)
{
cout << "read length not match, read [" << bytes_transferred << "] bytes, but need [" << total_len << "] bytes\n";
Close();
_server->CleanSession(_sessionID);
return;
}
memcpy(_recvMsgNode->_data, _recvBuffer, bytes_transferred);
_recvMsgNode->_curLen += bytes_transferred;
cout << "recv msg is: " << _recvMsgNode->_data << "\n";
// 将数据添加到登录逻辑的消息队列中
LogicNode* ln = new LogicNode(this, _recvMsgNode);
LogicSystem::GetInstance()->PostMsgToQueue(ln);
AsyncReadHead(HEAD_TOTAL_LEN);
});
}
3.4 逻辑处理类LogicSystem
这个类应该被每个连接所独有才对,因为并发量一旦上来的话所有的客户端逻辑的处理全部都是在这个类中
1. 处理客户端发来的请求
void PostMsgToQueue(LogicNode* logicNode);
void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{
unique_lock<mutex> lock(_mutex);
_msgQueue.push(logicNode);
if (_msgQueue.size() >= 1)
{
lock.unlock();
_consumer.notify_one();
}
}
2. 创建一个线程专门处理数据,根据消息头部的ID来取出对应的回调函数,这个回调函数的权限很大,有客户端通道的权限,这样就没必要使用信号槽
如果是我写,我可能会在数据接受到时,直接调用该单例的DealMsg去处理数据,这样就需要CSession类和LogicSystem类高度耦合,造成需要LogicSystem需要返回处理结果交给CSession来进行数据传输,并且需要等待这个数据处理完毕才能够继续读取余下的数据。真正实现了每个类做自己的事情;
此时CSession在自己的线程中会接受来自客户端的请求,LogicSystem在处理数据时会发送数据给客户端,同一个指针被多一个线程调用;
对象创建在内存中,而线程运行在CPU上,当一个对象被创建时,在内存中会有确定的位置,线程是执行路径,可以访问内存中的对象,多个线程可以访问同一个对象,前提是不会造成数据竞争;
所以不仅不同职责的线程会出现资源竞争,同一种线程也会出现资源竞争,比如多个发送线程同时执行一个函数,就需要进行排队
LogicSystem::LogicSystem()
:bStop(false)
{
RegisterCallBack();
_workThread = thread(&LogicSystem::DealMsg, this);
}
void LogicSystem::DealMsg()
{
while (true)
{
unique_lock<mutex> lock(_mutex);
_consumer.wait(lock, [this] {return!_msgQueue.empty(); });
if (bStop)
{
while (!_msgQueue.empty())
{
ConsumerFunc();
}
break;
}
ConsumerFunc();
}
}
void LogicSystem::ConsumerFunc()
{
LogicNode* logicNode = _msgQueue.front();
short id = logicNode->_rn->_msg_id;
cout << "logNode ID : " << id << "\n";
auto it = _funMap.find(id);
if (it != _funMap.end())
{
it->second(logicNode->_cs, id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));
}
_msgQueue.pop();
delete logicNode;
}
3. 处理登录请求
void LogicSystem::RegisterCallBack()
{
_funMap[MSG_CHAT_LOGIN] = bind(&LoginHandler,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
void LogicSystem::RegisterCallBack()
{
_funMap[MSG_CHAT_LOGIN] = bind(&LoginHandler,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
void LogicSystem::LoginHandler(CSession* session, const short& msg_id, const string& msg_data)
{
// 1.解析json数据
Json::Reader reader;
Json::Value jsonResult;
Json::Value jsonReturn;
reader.parse(msg_data, jsonResult);
// 2.向状态服务器请求登录
auto uid = jsonResult["uid"].asInt();
auto TokenStr = jsonResult["token"].asString();
cout << "user uid = " << uid << "Token = " << TokenStr << "\n";
auto response = StatusGrpcClient::GetInstance()->Login(uid, TokenStr);
// 3.当所有信息都处理完时,返回登录结果
ConnectionRAII conn([&jsonReturn, session]() {
string jsonReturnStr = jsonReturn.toStyledString();
session->Send(jsonReturnStr, MSG_CHAT_LOGIN_RESPONSE);
});
// 4.检查登录结果
jsonReturn["error"] = response.error();
if (response.error() != ErrorCodes::SUCCESS)
return;
// 5.从MySQL中获取用户信息
UserInfo* userInfo = nullptr;
auto it = _userMaps.find(uid);
if (it == _userMaps.end())
{
userInfo = MySqlManage::GetInstance()->GetUser(uid);
if (userInfo == nullptr)
{
jsonReturn["error"] = ErrorCodes::UID_INVALID;
return;
}
_userMaps[uid] = userInfo; // 存储所有的用户
}
else
{
userInfo = it->second;
}
jsonReturn["uid"] = uid;
jsonReturn["token"] = TokenStr;
jsonReturn["name"] = userInfo->name;
}
4. 客户端聊天界面
5. 总体逻辑的梳理
5.1 客户端发送登录请求
5.2 GateServer服务处理登录请求
5.3 客户端处理登录回应
5.4 聊天服务器处理客户端连接请求
消费者处理消息
如果是登录请求,向状态服务器请求用户信息
5.5 状态服务器处理聊天服务器的登录查询
5.6 聊天获取状态服务器的回应后
如果返回的响应状态是SUCCESS,说明查询成功,从数据库中获取用户的信息,并返回给客户端
5.7 客户端处理聊天服务器返回的登录响应
6. Bind和Lambda的用法
1. Bind利用已知的参数将函数绑定成指定的类型
#include <functional>
#include <iostream>
using namespace std;
using Func = function<void(bool)>;
void TestBind(int Num, Func func)
{
cout << "TestBind: " << Num << endl;
func(true);
}
void TestFunc(int Num, bool b)
{
if (b)
{
cout << "TestFunc: true, Num = " << Num << endl;
}
else
{
cout << "TestFunc: false, Num = " << Num << endl;
}
}
void TestFuncB(bool b, int Num)
{
if (b)
{
cout << "TestFuncB: true, Num = " << Num << endl;
}
else
{
cout << "TestFuncB: false, Num = " << Num << endl;
}
}
int main()
{
TestBind(1, bind(TestFunc, 5,placeholders::_1));
TestBind(2, bind(TestFuncB, placeholders::_1, 5));
return 0;
}
2. Lambda
编辑器为每个lambda表达式生成一个唯一的匿名类,这个类重载了operator()运算符;
捕获的变量成为该类的成员变量;
lambda的函数体成为该类的operator()的函数体;
在lambda定义处,创建一个该类的对象(闭包对象),跟函数对象的效果等同;
该闭包对象可以像函数一样被调用,也可以像普通对象一样被拷贝,移动;
// 编译器生成独特的闭包类型名
class __CServer_lambda_1 {
public:
// 构造函数初始化捕获变量
__CServer_lambda_1(CServer* this_capture, CSession* session_capture)
: __this(this_capture), __session(session_capture) {}
// 函数调用运算符重载(核心)
template<typename T>
void operator()(T&& ec) const {
__this->HandleAccept(__session, std::forward<T>(ec));
}
private:
CServer* __this; // 捕获的this指针
CSession* __session; // 捕获的session指针
};