目录
3. 重新理解 read, write, recv, send 以及 TCP 为什么支持全双工
1. 应用层自定义协议概述
应用层自定义协议是指在应用层根据特定业务需求自行设计的通信规则,用于实现不同系统、设备或程序之间的数据交互。它不依赖于 HTTP、FTP 等标准协议,而是针对场景(如物联网、实时通信、工业控制等)定制,具有灵活性高、针对性强、效率可控的特点。
应用层自定义协议是解决特定场景通信需求的有效手段,其核心价值在于按需定制和性能优化。设计时需平衡功能完整性与实现复杂度,优先考虑业务需求、兼容性和可维护性。对于通用场景,建议优先使用成熟的标准协议(如 MQTT、HTTP),以降低开发成本和对接难度。
在之前的网络传输中,读写数据时都是按字符串的方式来发送接收的,那么如果要传输一些结构化的数据应该怎么办呢?其实,之前的 UDP 和 TCP 通信都是在传输层,我们可以在应用层规定协议,而在应用层的协议就是双方约定好的结构化的数据。
在应用层通过序列化将结构化数据转化为字符串给到传输层进行通信,而传输层接收到数据通过反序列化成为结构化数据给到应用层。
2. 序列化和反序列化
序列化(Serialization):将内存中对象/数据结构转换为字节流(二进制或文本格式),便于存储到文件 / 数据库或通过网络传输。让数据脱离内存环境后仍能被“复原”。
反序列化(Deserialization):将字节流重新转换为内存中的对象 / 数据结构,是序列化的逆过程。
上述由 message,time,nickname,组成的结构体就叫做自定义协议,将结构体转化为字符串成为序列化,将字符串转化为结构体成为反序列化。
知识点1:
为什么不直接传结构体数据?
传结构体数据理论上是可以的,但是在 C/C++ 中结构体存在内存对齐问题,每个操作系统的对齐方式可以不一样,所以传输的结构体大小可能和接收的结构体大小不一样。其次如果服务器和客户端用不同的语言进行编写,在 C++ 中有 class 类,但是在 python 中就不是 C++ 中的 class 类的,所以无法使用统一的结构体作为协议进行通信。
3. 重新理解 read, write, recv, send 以及 TCP 为什么支持全双工
(1)write 和 read 函数只是将数据从上层拷贝到 TCP 套接字的发送缓冲区以及从 TCP 套接字的接收缓冲区拷贝到上层,而 TCP 自主决定数据什么时候发送,发多少以及出错了怎么办。
(2)主机间通信的本质:把发送方的发送缓冲区的数据拷贝到接收方的接收缓冲区。
(3)因为 TCP 有发送缓冲区和接收缓冲区,发送和接收互不影响,所以 TCP 在通信的时候就会建立一条双向可靠通道(类似双向车道),同一时刻可以进行发送和接收。
4.Jsoncpp
Jsoncpp 是一个用于处理 JSON 数据的 C++库。它提供了将 JSON 数据序列化为字符串以及从字符串反序列化为 JSON 数据的功能。
在 Ubuntu 中使用一下命令进行安装:
sudo apt-get install libjsoncpp-dev
4.1 Jsoncpp 的特性
(1)简单易用:Jsoncpp 提供了直观的 API,使得处理 JSON 数据变得简单。
(2)高性能:Jsoncpp 的性能经过优化,能够高效地处理大量 JSON 数据。
(3)全面支持:支持 JSON 标准中的所有数据类型,包括对象、数组、字符串、数字、布尔值和 null。
(4)错误处理:在解析 JSON 数据时,Jsoncpp 提供了详细的错误信息和位置,方便开发者调试。
4.2 使用 Jsoncpp 的序列化和反序列化
序列化指的是将数据结构或对象转换为一种格式,以便在网络上传输或存储到文件中。下面介绍 Jsoncpp 中一种序列化的方式 Json::Fastwrite。
#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>
int main()
{
Json::Value root;
root["name"] = "joe";
root["sex"] = "男";
Json::FastWriter writer;
std::string s = writer.write(root);
std::cout << s << std::endl;
return 0;
}
$ ./test.exe
{"name":"joe","sex":"男"}
反序列化是序列化的逆过程。使用 Json::Reader 进行反序列化。
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
int main()
{
// JSON 字符串
std::string json_string = "{\"name\":\"张三\",
\"age\":30, \"city\":\"北京\"}";
// 解析 JSON 字符串
Json::Reader reader;
Json::Value root;
// 从字符串中读取 JSON 数据
bool parsingSuccessful = reader.parse(json_string,
root);
if (!parsingSuccessful) {
// 解析失败,输出错误信息
std::cout << "Failed to parse JSON: " <<
reader.getFormattedErrorMessages() << std::endl;
return 1;
}
// 访问 JSON 数据
std::string name = root["name"].asString();
int age = root["age"].asInt();
std::string city = root["city"].asString();
// 输出结果
std::cout << "Name: " << name << std::endl;
std::cout << "Age: " << age << std::endl;
std::cout << "City: " << city << std::endl;
return 0;
}
$ ./test.exe
Name: 张三
Age: 30
City: 北京
4.3 Json::Value 的介绍
Json::Value 是 Jsoncpp 库中的一个重要类,用于表示和操作 JSON 数据结构。以下是一些常用的 Json::Value 操作列表。
4.3.1 构造函数
(1)Json::Value():默认构造函数,创建一个空的 Json::Value 对象。
(2)Json::Value(ValueType type, bool allocated = false):根据给定的ValueType(如 nullValue, intValue, stringValue 等)创建一个 Json::Value 对象。
4.3.2 访问元素
(1)Json::Value& operator[](const char* key):通过键(字符串)访问对象中的元素。如果键不存在,则创建一个新的元素。
(2)Json::Value& operator[](const std::string& key):同上,但使用std::string 类型的键。
(3)Json::Value& operator[](ArrayIndex index):通过索引访问数组中的元素。如果索引超出范围,则创建一个新的元素。
(4)Json::Value& at(const char* key):通过键访问对象中的元素,如果键不存在则抛出异常。
(5)Json::Value& at(const std::string& key):同上,但使用 std::string类型的键。
4.3.3 类型检查
(1)bool isNull():检查值是否为 null。
(2)bool isBool():检查值是否为布尔类型。
(3)bool isInt():检查值是否为整数类型。
(4)bool isInt64():检查值是否为 64 位整数类型。
(5)bool isUInt():检查值是否为无符号整数类型。
(6)bool isUInt64():检查值是否为 64 位无符号整数类型。
(7)bool isIntegral():检查值是否为整数或可转换为整数的浮点数。
(8)bool isDouble():检查值是否为双精度浮点数。
(9)bool isNumeric():检查值是否为数字(整数或浮点数)。
(10)bool isString():检查值是否为字符串。
(11)bool isArray():检查值是否为数组。
(12)bool isObject():检查值是否为对象(即键值对的集合)。
4.3.4 赋值和类型转换
- Json::Value& operator=(bool value):将布尔值赋给 Json::Value 对象。
- Json::Value& operator=(int value):将整数赋给 Json::Value 对象。
- Json::Value& operator=(unsigned int value):将无符号整数赋给 Json::Value 对象。
- Json::Value& operator=(Int64 value):将 64 位整数赋给 Json::Value对象。
- Json::Value& operator=(UInt64 value):将 64 位无符号整数赋给 Json::Value 对象。
- Json::Value& operator=(double value):将双精度浮点数赋给 Json::Value 对象。
- Json::Value& operator=(const char* value):将 C 字符串赋给 Json::Value 对象。
- Json::Value& operator=(const std::string& value):将 std::string 赋给 Json::Value 对象。
- bool asBool():将值转换为布尔类型(如果可能)。
- int asInt():将值转换为整数类型(如果可能)。
- Int64 asInt64():将值转换为 64 位整数类型(如果可能)。
- unsigned int asUInt():将值转换为无符号整数类型(如果可能)。
- UInt64 asUInt64():将值转换为 64 位无符号整数类型(如果可能)。
- double asDouble():将值转换为双精度浮点数类型(如果可能)。
- std::string asString():将值转换为字符串类型(如果可能)。
5. 代码 demo -- 网络版计算器
为了更好的进行日志信息的查看,先给出需要使用的互斥锁的封装模块 Mutex.hpp 和线程安全的日志模块 Log.hpp,参考Linux 的 UDP 网络编程 -- 回显服务器,翻译服务器。
Common.hpp 该源文件中包含了整个项目所使用的通用的头文件,宏定义,结构体。
// Common.hpp
#pragma once
#include <iostream>
#include <memory>
#include <functional>
#include <unistd.h>
#include <string>
#include <cstring>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"
using namespace LogModule;
// 强转 struct sockaddr_in * 为 struct sockaddr * 的宏
#define CONV(addr) ((struct sockaddr*)&addr)
// 将各种错误的错误码用一个枚举类型表示
enum EixtCode
{
OK,
USAGE_ERR,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR,
CONNECT_ERR,
FORK_ERR,
OPEN_ERR
};
// 没有拷贝构造和赋值重载的基类
class NoCopy
{
public:
NoCopy(){}
~NoCopy(){}
NoCopy(const NoCopy &) = delete;
const NoCopy &operator=(const NoCopy&) = delete;
};
InetAddr.hpp 源文件对其网络地址进行封装,并且提供网络地址序列和主机地址序列相互转换的方法。
// InetAddr.hpp
#pragma once
#include "Common.hpp"
class InetAddr
{
public:
InetAddr() {};
// 使用套接字创建对象的构造函数
InetAddr(struct sockaddr_in &addr)
{
SetAddr(addr);
}
// 使用主机序列创建的构造函数
InetAddr(std::string &ip, uint16_t port) : _ip(ip), _port(port)
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
_addr.sin_port = htons(_port);
inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);
}
// 仅使用端口号创建,ip 设为 INADDR_ANY
InetAddr(uint16_t port) : _port(port), _ip()
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
_addr.sin_port = htons(_port);
_addr.sin_addr.s_addr = INADDR_ANY;
}
void SetAddr(struct sockaddr_in &addr)
{
_addr = addr;
_port = ntohs(_addr.sin_port);
char ipbuffer[64];
inet_ntop(AF_INET, &_addr.sin_addr, ipbuffer, sizeof(_addr));
_ip = ipbuffer;
}
uint16_t Port() { return _port; }
std::string Ip() { return _ip; }
const struct sockaddr_in &NetAddr() { return _addr; }
const struct sockaddr *NetAddrPtr() { return CONV(_addr); }
socklen_t NetAddrLen() { return sizeof(_addr); }
bool operator==(const InetAddr &addr) { return addr._ip == _ip && addr._port == _port; }
std::string StringAddr() { return _ip + ":" + std::to_string(_port); }
~InetAddr() {}
private:
struct sockaddr_in _addr;
std::string _ip;
uint16_t _port;
};
5.1 Socket 封装
// Socket.hpp
#pragma once
#include "Common.hpp"
#include "InetAddr.hpp"
namespace SocketModule
{
const static int gbacklog = 16;
// 模板方法模式
// 套接字基类
class Socket
{
public:
virtual ~Socket() {}
virtual void SocketOrDie() = 0;
virtual void Bind(uint16_t port) = 0;
virtual void Listen(int backlog) = 0;
virtual std::shared_ptr<Socket> Accept(InetAddr *client) = 0;
virtual void Close() = 0;
virtual int Recv(std::string *out) = 0;
virtual int Send(const std::string &message) = 0;
virtual int Connect(std::string &server_ip, uint16_t server_port) = 0;
// 使用该函数进行 TCP 服务器套接字的创建和初始化
void BuildTcpSocketMethod(uint16_t port, int backlog = gbacklog)
{
SocketOrDie();
Bind(port);
Listen(backlog);
}
// 使用该函数进行 TCP 客户端套接字的创建
void BuildTcpClientSocketMethod()
{
SocketOrDie();
}
};
const static int defaultfd = -1;
// TCP 套接字子类
class TcpSocket : public Socket
{
public:
TcpSocket(int fd = defaultfd) : _sockfd(fd) {}
~TcpSocket() {}
void SocketOrDie() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(LogLevel::FATAL) << "socket error";
exit(SOCKET_ERR);
}
LOG(LogLevel::INFO) << "socket success";
}
void Bind(uint16_t port) override
{
InetAddr localAddr(port);
int n = ::bind(_sockfd, localAddr.NetAddrPtr(), localAddr.NetAddrLen());
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind error";
exit(BIND_ERR);
}
LOG(LogLevel::INFO) << "bind success";
}
void Listen(int backlog) override
{
int n = ::listen(_sockfd, backlog);
if (n < 0)
{
LOG(LogLevel::FATAL) << "listen error";
exit(LISTEN_ERR);
}
LOG(LogLevel::INFO) << "listen success";
}
std::shared_ptr<Socket> Accept(InetAddr *client) override
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int fd = ::accept(_sockfd, CONV(peer), &len);
if (fd < 0)
{
LOG(LogLevel::WARNING) << "accept warning...";
return nullptr;
}
client->SetAddr(peer);
LOG(LogLevel::INFO) << "accept success, " << client->StringAddr() << " sockfd: " << fd;
return std::make_shared<TcpSocket>(fd);
}
int Connect(std::string &server_ip, uint16_t server_port) override
{
InetAddr server(server_ip, server_port);
return ::connect(_sockfd, server.NetAddrPtr(), server.NetAddrLen());
}
void Close() override
{
if (_sockfd > 0)
::close(_sockfd);
}
int Recv(std::string *out) override
{
char buffer[1024];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer;
}
return n;
}
int Send(const std::string &message) override
{
return send(_sockfd, message.c_str(), message.size(), 0);
}
private:
int _sockfd;
};
}
5.2 应用层协议定制
使用 Json 进行序列化和反序列化。一个通过网络传输的报文如下图:
// Protocol.hpp
#pragma once
#include "Socket.hpp"
#include <jsoncpp/json/json.h>
using namespace SocketModule;
// 使用 Json 实现序列化和反序列化
// client->server
class Request
{
public:
Request() {}
~Request() {}
Request(int x, int y, char oper)
: _x(x),
_y(y),
_oper(oper)
{
}
// 序列化
std::string Serialize()
{
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter write;
std::string s = write.write(root);
return s;
}
// 反序列化
bool Deserialize(std::string &in)
{
Json::Value root;
Json::Reader reader;
bool ok = reader.parse(in, root);
if (ok)
{
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
}
return ok;
}
int X() { return _x; }
int Y() { return _y; }
char Oper() { return _oper; }
private:
int _x;
int _y;
char _oper;
};
// server->client
class Response
{
public:
Response() {}
~Response() {}
Response(int result, int code)
: _result(result),
_code(code)
{
}
std::string Serialize()
{
Json::Value root;
root["result"] = _result;
root["code"] = _code;
Json::FastWriter writer;
std::string s = writer.write(root);
return s;
}
bool Deserialize(std::string &in)
{
Json::Value root;
Json::Reader reader;
bool ok = reader.parse(in, root);
if (ok)
{
_result = root["result"].asInt();
_code = root["code"].asInt();
}
return ok;
}
void SetResult(int res)
{
_result = res;
}
void SetCode(int code)
{
_code = code;
}
void ShowResult()
{
std::cout << "计算结果: " << _result << "[" << _code << "]" << std::endl;
}
private:
int _result;
int _code;
};
const std::string sep = "\r\n";
using func_t = std::function<Response(Request &req)>;
class Protocol
{
public:
Protocol() {}
~Protocol() {}
Protocol(func_t func) : _func(func) {}
// 应用层封装报头
std::string Encode(std::string &jsonstr)
{
std::string len = std::to_string(jsonstr.size());
return len + sep + jsonstr + sep;
}
// 应用层解析报文
// 1. 判断报文完整性
// 2. 如果包含至少一个有效报文,提取并从字符串中移除
bool Decode(std::string &buffer, std::string *package)
{
ssize_t pos = buffer.find(sep);
if (pos == std::string::npos)
return false;
std::string package_len_str = buffer.substr(0, pos);
int package_len_int = std::stoi(package_len_str);
int target_len = package_len_str.size() + package_len_int + 2 * sep.size();
if (buffer.size() < target_len)
{
return false;
}
*package = buffer.substr(pos + sep.size(), package_len_int);
buffer.erase(0, target_len);
return true;
}
// 服务端获取请求,处理后并返回应答
void GetRequest(std::shared_ptr<Socket> &sock, InetAddr &client)
{
std::string buffer_queue;
while (true)
{
// 1. 从网络中读取数据
int n = sock->Recv(&buffer_queue);
if (n > 0)
{
std::string json_package;
// 2. 解析报文,提取一个完整报文,如果不完整,服务器继续读取
while(Decode(buffer_queue, &json_package))
{
// 解析成功
LOG(LogLevel::DEBUG) << client.StringAddr() << " 请求: " << json_package;
// 3. 请求 Json 串反序列化
Request req;
bool ok = req.Deserialize(json_package);
if (!ok)
continue;
// 4. 通过回调函数处理业务
Response resp = _func(req);
// 5. 对应答结构体进行序列化
std::string json_str = resp.Serialize();
// 6. 添加应用层报头构成完整报文
std::string send_str = Encode(json_str);
// 7. 将应答发送到网络中
sock->Send(send_str);
}
}
else if (n == 0)
{
LOG(LogLevel::INFO) << "client: " << client.StringAddr() << " quit!";
break;
}
else
{
LOG(LogLevel::WARNING) << "client: " << client.StringAddr() << ", recv error";
break;
}
}
}
// 客户端获取应答
bool GetResponse(std::shared_ptr<Socket> &client, std::string &resp_buf, Response *resp)
{
while (true)
{
int n = client->Recv(&resp_buf);
if (n > 0)
{
std::string json_package;
while(Decode(resp_buf, &json_package))
{
resp->Deserialize(json_package);
}
return true;
}
else if (n == 0)
{
std::cout << "server quit! " << std::endl;
return false;
}
else
{
std::cout << "recv error!" << std::endl;
return false;
}
}
}
private:
func_t _func;
};
5.3 网络计算器的上层业务封装
#pragma once
#include <iostream>
#include "Protocol.hpp"
class Cal
{
public:
Response Execute(Request &req)
{
Response resp(0, 0);
switch (req.Oper())
{
case '+':
resp.SetResult(req.X() + req.Y());
break;
case '-':
resp.SetResult(req.X() - req.Y());
break;
case '*':
resp.SetResult(req.X() * req.Y());
case '/':
if (req.Y() != 0)
resp.SetResult(req.X() / req.Y());
else
resp.SetCode(1);
break;
case '%':
if (req.Y() != 0)
resp.SetResult(req.X() % req.Y());
else
resp.SetCode(2);
break;
default:
resp.SetCode(3);
break;
}
return resp;
}
};
5.4 服务器端
// TcpServer.hpp
#pragma once
#include "Socket.hpp"
using namespace SocketModule;
using ioservice_t = std::function<void(std::shared_ptr<Socket> &, InetAddr &)>;
class TcpServer
{
public:
TcpServer() {}
TcpServer(uint16_t port, ioservice_t service)
: _port(port),
_service(service),
_isrunning(false),
_listensocketptr(std::make_unique<TcpSocket>())
{
_listensocketptr->BuildTcpSocketMethod(_port);
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
InetAddr client;
auto sock = _listensocketptr->Accept(&client);
if (sock == nullptr)
continue;
pid_t id = fork();
if (id < 0)
{
LOG(LogLevel::FATAL) << "fork error";
exit(FORK_ERR);
}
else if (id == 0)
{
// 子进程
_listensocketptr->Close();
if (fork() > 0)
exit(OK);
// 孙子进程
_service(sock, client);
exit(OK);
}
else
{
// 父进程
sock->Close();
pid_t rid = ::waitpid(id, nullptr, 0);
(void)rid;
}
}
_isrunning = false;
}
~TcpServer() {}
private:
uint16_t _port;
std::unique_ptr<Socket> _listensocketptr;
bool _isrunning;
ioservice_t _service;
};
// TcpServer.cc
#include "TcpServer.hpp"
#include "NetCal.hpp"
void Usage(std::string proc)
{
std::cerr << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = std::stoi(argv[1]);
// Enable_File_Log_Strategy();
Enable_Console_Log_Strategy();
// 1. 应用层业务
std::unique_ptr<Cal>
cal = std::make_unique<Cal>();
// 2. 协议层
std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>([&cal](Request &req) -> Response
{ return cal->Execute(req); });
// 3. 服务器层
std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port, [&protocol](std::shared_ptr<Socket> &sock, InetAddr &client)
{ protocol->GetRequest(sock, client); });
tsvr->Start();
return 0;
}
5.5 客户端
// TcpClient.cc
#include "NetCal.hpp"
#include "Socket.hpp"
void Usage(std::string proc)
{
std::cerr << "Usage: " << proc << " server_ip server_port" << std::endl;
}
void GetDataFromStdin(int *x, int *y, char *oper)
{
std::cout << "Please Enter x: ";
std::cin >> *x;
std::cout << "Please Enter y: ";
std::cin >> *y;
std::cout << "Please Enter oper: ";
std::cin >> *oper;
}
// ./tcpclient server_ip server_port
int main(int argc, char *argv[])
{
// 进行命名行参数检测,并获取服务器 ip 和 port
if (argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
std::string server_ip = argv[1];
uint16_t server_port = std::stoi(argv[2]);
// 2. 创建客户端套接字并连接服务器
std::shared_ptr<Socket> client = std::make_unique<TcpSocket>();
client->BuildTcpClientSocketMethod();
if (client->Connect(server_ip, server_port) < 0)
{
// 连接失败
std::cerr << "connect error" << std::endl;
exit(CONNECT_ERR);
}
// 3. 从标准输入获取数据并发送给服务器并接收应答显示结果
std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>();
std::string resp_buffer;
while (true)
{
// 3.1. 从标准输入中获取数据
int x, y;
char oper;
GetDataFromStdin(&x, &y, &oper);
// 3.2. 构建一个请求
Request req(x, y, oper);
std::string req_json_str = req.Serialize();
std::string req_str = protocol->Encode(req_json_str);
// 3.3. 发送请求
client->Send(req_str);
// 3.4. 获取应答
Response resp;
bool res = protocol->GetResponse(client, resp_buffer, &resp);
if (res == false)
break;
// 3.5. 显示结果
resp.ShowResult();
}
client->Close();
return 0;
}
5.6 代码结果
客户端结果,"[]" 中如果不为 0,则表示结果异常,为 1 表示除 0 错误,为 2 表示模 0 错误,为 3 表示非法符号。
服务端结果: