目录
Protocol.hpp 协议 序列化反序列化 结构化数据格式规定
引子:
我们在上一篇文章中以及提到,write,read这些接口本质上起到的是一个拷贝的作用。
而以发送为例,发送缓冲区什么时候发,发多少,出错了该怎么办,这些都是由tcp决定的,这也是为什么tcp被称为传输控制协议。
tcp是操作系统网络模块的部分,是被写到源代码中的。我们把数据交给tcp就相当于把数据交给操作系统。整体上的逻辑和文件操作是非常类似的。
但是对于接收方来讲,我们不能保证每次接收的数据都是 一个完整的报文,有可能一次有非常多的报文被读上来,也有可能有的报文只被读上来部分,因此这就需要我们在应用层定制协议以及序列反序列化了。
我们程序员写的一个个解决我们实际问题, 满足我们日常需求的网络程序, 都是在应用层
一、再谈 "协议"
协议是一种 "约定". socket api的接口, 在读写数据时, 都是按 "字符串" 的方式来发送接收的. 如果我们要传输一些"结构化的数据" 怎么办呢?
二、自定义协议与网络版计算器
例如, 我们需要实现一个服务器版的加法器. 我们需要客户端把要计算的两个加数发过去, 然后由服务器进行计算, 最后再把结果返回给客户端
1.约定方案一:
客户端发送一个形如"1+1"的字符串;
这个字符串中有两个操作数, 都是整形;
两个数字之间会有一个字符是运算符, 运算符只能是 + ;
数字和运算符之间没有空格;
2.约定方案二:
定义结构体来表示我们需要交互的信息;
发送数据时将这个结构体按照一个规则转换成字符串, 接收到数据的时候再按照相同的规则把字符串转化回结构体;
这个过程叫做 "序列化" 和 反序列化
但是我们现在不推荐直接发送结构体对象,例如同一个结构体在不同的编译器编译,其大小有可能不一样(结构体内存对齐)
不过Linux内核定义协议的时候用的就是互传结构体对象来做的,但是也做了非常多的后续工作来保证这种办法可行。
我们网络版本的计算器大致思路如下,我们输入两个数以及符号,对方经过计算返回结果和标记位,标记位用于记录结果是否有效,如果无效则它会提供错误信息给我们
这里的两个结构体对象就是属于发送方和接收方的约定内容。
我们再举一个例子,例如这里群聊信息的发送,双方约定好一个结构体对象,我们输入相应的内容进入结构体,然后将其转为 一个字符串进行发送,发送完成后再将这个字符串进行解析,还原为结构体,然后通过结构体对象来取得结果。
其中,结构体中的多个字符串转化为一个字符串被称为序列化,而一个字符串还原成多个字符串被称为反序列化。序列和反序列化主要是方便网络进行收发。
涉及到协议的定制,我们只需要知道定制怎么样的结构化数据就可以,然后再想办法把这个结构化数据整合成一个大字符串。但是接收方又怎么知道收到的字符串是否是完整的一个报文呢?
那么就需要我们自行加上一些将报文分隔的分隔符了。
3.我们采用的协议
同一个数字内是不可能出现空格的,因此我们把空格作为分隔数字与字符的标志。同时我们再以“\n”分隔不同的报文。这样其实就满足我们的需求了,但是这里我们还想添加一个长度的报头(这个长度是报文不包含\n的长度)。添加长度报头和添加“\n”都能满足我们的协议需求,但是为了后面调试方便,我们都采用。有了报文长度后,报文后的那个“\n”其实是可以不用添加的,但是添加之后我们调试时现象更明显,打印的时候会分行
以及更进一步我们可以添加protocal select报头,我们可以根据不同的protocal select采取不同的协议,1是整数,2是浮点数等等,这里先不采用,只用长度报文。
注意,本协议中采取“\n”作为分隔的标志,是因为我们计算器中的xopy形式是不可能存在“\n”的,因此可行。
三、网络计算器代码
Log.hpp 日志
#pragma once
#include <iostream>
#include<time.h>
#include<stdarg.h>
#include <fcntl.h>
#include<unistd.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch(level)
{
case Info: return "Info";
case Debug: return "Debug";
case Warning: return "Warning";
case Error :return "Error";
case Fatal :return "Fatal";
default: return "None";
}
}
// void logmessage(int level,const char *format, ...)
// {
// time_t t = time(nullptr);//时间戳
// struct tm *ctime = localtime(&t);//用时间戳得到一个结构体,可以从里面取年月日时分秒
// char leftbuffer[SIZE];
// snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
// ctime->tm_year+1900, ctime->tm_mon+1,ctime->tm_mday,
// ctime->tm_hour, ctime->tm_min, ctime->tm_sec);//把字符串存到leftbuffer里面
// va_list s;
// va_start(s, format);
// char rightbuffer[SIZE];
// vsnprintf(rightbuffer, sizeof(rightbuffer),format, s);//用这个库函数我们就不用自己作字符串解析了
// //格式 默认部分(左)+自定义部分(右)
// char logtxt[SIZE*2];
// snprintf(logtxt, sizeof(logtxt),"%s %s\n", leftbuffer, rightbuffer);
// printLog(level, logtxt);//暂时打印
// }
void printLog(int level, std::string logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std:: endl;
break;
case Onefile:
printOneFile("LogFile" ,logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
default:
break;
}
}
void printOneFile(const std:: string logname, const std::string logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0666);//LogFile
if(fd < 0)return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string logtxt)
{
std::string filename = "LogFile";
filename += ".";
filename += levelToString(level);//LogFile.Debug/Warning/Fatal
printOneFile(filename, logtxt);
}
~Log()//这里析构只是为了让类看起来完整
{
}
void operator()(int level,const char *format, ...)
{
time_t t = time(nullptr);//时间戳
struct tm *ctime = localtime(&t);//用时间戳得到一个结构体,可以从里面取年月日时分秒
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year+1900, ctime->tm_mon+1,ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);//把字符串存到leftbuffer里面
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer),format, s);//用这个库函数我们就不用自己作字符串解析了
//格式 默认部分(左)+自定义部分(右)
char logtxt[SIZE*2];
snprintf(logtxt, sizeof(logtxt),"%s %s", leftbuffer, rightbuffer);
printLog(level, logtxt);
}
private:
int printMethod;
std :: string path;
};
Log lg;
Makefile
.PHONY:all
all:servercal clientcal
servercal:ServerCal.cc
g++ -o $@ $^ -std=c++11
clientcal:ClientCal.cc
g++ -o $@ $^ -std=c++11 -g
.PHONY: clean
clean:
rm -f clientcal servercal
Socket.hpp 套接字封装
#pragma
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include "Log.hpp"
enum
{
SocketErr = 2,
BindErr,
ListenErr
};
const int backlog = 10;
class Sock
{
public:
Sock()
{
}
~Sock()
{
}
public:
void Socket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd_ < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
}
void Listen()
{
if (listen(sockfd_, backlog) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(sockfd_, (struct sockaddr*)& peer, &len);
if(newfd < 0)
{
lg(Warning, "listen error, %s: %d", strerror(errno), errno);
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
int Connect(const std::string &ip, const uint16_t &port)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
peer.sin_family = AF_INET;
peer.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));
int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));
if(n == -1)
{
std::cerr << "connect to " << ip << ";" << port << "error" << std::endl;
return false;
}
return true;
}
void Close()
{
close(sockfd_);
}
int Fd()
{
return sockfd_;
}
private:
int sockfd_;
};
Protocol.hpp 协议 序列化反序列化 结构化数据格式规定
#pragma
#include <iostream>
const std::string blank_space_sep = " ";
const std::string protocol_sep = "\n";
std::string Encode(std::string &content)
{
std::string package = std::to_string(content.size());
package += protocol_sep;
package += content;
package += protocol_sep;
return package;
}
bool Decode(std::string &package, std::string *content)
{
std::size_t pos = package.find(protocol_sep);
if(pos == std::string::npos)return false;
std::string len_str = package.substr(0, pos);
std::size_t len = std::stoi(len_str);
//package = len_str + content_str + 2
std::size_t total_len = len_str.size() + len + 2;
if(package.size() < total_len)return false;
*content = package.substr(pos+1, len);
//接收到报文后将其移除
package.erase(0,total_len);
return true;
}
class Request
{
public:
Request()
{}
Request(int data1, int data2, char oper):x(data1),y(data2),op(oper)
{}
public:
bool Serialize(std::string *out)
{
//struct => string "x op y" 一个数字内部不可能有空格,因此使用空格作为分隔符
//构建报文有效载荷
std::string s = std::to_string(x);
s += blank_space_sep;
s += op;
s += blank_space_sep;
s += std::to_string(y);
*out = s;
return true;
}
bool Deserialize(const std::string & in)//"x op y"
{
std::size_t left = in.find(blank_space_sep);
if(left == std::string::npos)return false;
std::string part_x = in.substr(0, left);
std::size_t right = in.rfind(blank_space_sep);
if(right == std::string::npos)return false;
std::string part_y = in.substr(right + 1);
if(left + 2 != right)return false;
op = in[left+1];
x = std::stoi(part_x);
y = std::stoi(part_y);
return true;
}
void DebugPrint()
{
std:: cout << "新请求构建完成 " << x << op << y << "= ?" << std::endl;
}
public:
//x op y
int x;
int y;
char op;
};
class Response
{
public:
Response(int res, int c):result(res), code(c)
{
}
Response()
{
}
public:
bool Serialize(std::string *out)
{
//"result code"
//构建报文的有效载荷
std::string s = std::to_string(result);
s += blank_space_sep;
s += std::to_string(code);
*out = s;
return true;
}
bool Deserialize(const std::string & in)
{
std::size_t pos = in.find(blank_space_sep);
if(pos == std::string::npos)
return false;
std::string part_left = in.substr(0,pos);
std::string part_right = in.substr(pos+1);
result = std::stoi(part_left);
code = std::stoi(part_right);
return true;
}
void DebugPrint()
{
std:: cout << "结果响应完成 " << "result: " << result << "code: " << code << std::endl;
}
public:
int result;
int code;//0 可信,否则!0 具体是几表明对应的错误原因
};
TcpServer.hpp 服务器主体代码封装
#pragma
#include <functional>
#include <string>
#include "Log.hpp"
#include "Socket.hpp"
#include <signal.h>
#include "Log.hpp"
using func_t = std::function<std::string(std::string &package)>;
class TcpServer
{
public:
TcpServer(uint16_t port, func_t callback):port_(port), callback_(callback)
{
}
bool InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
lg(Info, "init server .... done");
return true;
}
void Start()
{
signal(SIGCHLD, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
while(true)
{
std::string clientip;
uint16_t clientport;
int sockfd = listensock_.Accept(&clientip, &clientport);
if(sockfd < 0)continue;
lg(Info,"accept a new link, sockfd: %d, clientip: %s,clientport: %d", sockfd, clientip.c_str(), clientport);
//提供服务
if(fork() == 0)
{
listensock_.Close();
std::string inbuffer_stream;
//数据计算
while(true)
{
char buffer[128];
ssize_t n = read(sockfd, buffer, sizeof(buffer));
if(n > 0)
{
buffer[n] = 0;
inbuffer_stream += buffer;
lg(Debug,"debug:\n %s", inbuffer_stream.c_str());
//std::string info = callback_(inbuffer_stream);
//if(info.empty())continue;//不处理,直到客户端把发来的信息填写正确
//write(sockfd, info.c_str(),info.size());
while(true)//这个while循环,在一次读只有一个报文以及不足一个报文的时候效果同上面的代码一样
//而当一次读了好几个报文的时候 它也可以循环处理
{
std::string info = callback_(inbuffer_stream);
if(info.empty())break;
lg(Debug, "debug, response: \n%s", info.c_str());//这里的两行代码是测试服务器一次收到多个报文是不是能够处理
lg(Debug, "debug:\n %s", inbuffer_stream.c_str());// 我们在这里可以看到每次我们拿到一个response的时候 inbufferstream都会变短一些
write(sockfd, info.c_str(),info.size());
}
}
else if(n == 0)break;
else break;
}
exit(0);
}
close(sockfd);
}
}
~TcpServer()
{
}
private:
uint16_t port_;
Sock listensock_;
func_t callback_;
};
ServerCal.hpp 计算器逻辑
// #pragma <iostream>
// #include <iostream>
// #include"Protocol.hpp"
// enum
// {
// Div_ZERO = 1,
// MOD_ZERO,
// Other_Oper
// };
// class ServerCal
// {
// public:
// ServerCal()
// {}
// Response CalculatorHelper(const Request &req)
// {
// Response resp(0, 0);
// switch(req.op)
// {
// case '+':
// resp.result = req.x + req.y;
// break;
// case '-':
// resp.result = req.x - req.y;
// case '*':
// resp.result = req.x * req.y;
// case '/':
// {
// if(req.y == 0)
// resp.code = Div_ZERO;
// else resp.result = req.x / req.y;
// }
// break;
// case '%':
// {
// if(req.y == 0)
// resp.code = MOD_ZERO;
// else
// resp.result = req.x % req.y;
// }
// break;
// default:
// resp.code = Other_Oper;
// break;
// }
// return resp;
// }
// std::string Calculator(std::string &package)
// {
// std::string content;
// bool r = Decode(package, &content);//"len"\n"10 + 20"\n
// if(!r)return "";
// Request req;
// r = req.Deserialize(content);//"10 + 20" ->x=10 op=+ y=20
// if(!r)return "";
// //解析不出来 或者序列化失败我们就return
// //并没有对报文进行修改
// //让它继续读,所以在TcpServer.hpp中是加等 即inbuffer_stream += buffer;
// content = "";
// Response resp = CalculatorHelper(req);//result=30 code=0
// resp.Serialize(&content);//"30 0"
// content = Encode(content);//"len"\n"30 0"\n
// return content;
// }
// };
#pragma once
#include <iostream>
#include "Protocol.hpp"
enum
{
Div_Zero = 1,
Mod_Zero,
Other_Oper
};
class ServerCal
{
public:
ServerCal()
{
}
Response CalculatorHelper(const Request &req)
{
Response resp(0, 0);
switch (req.op)
{
case '+':
resp.result = req.x + req.y;
break;
case '-':
resp.result = req.x - req.y;
break;
case '*':
resp.result = req.x * req.y;
break;
case '/':
{
if (req.y == 0)
resp.code = Div_Zero;
else
resp.result = req.x / req.y;
}
break;
case '%':
{
if (req.y == 0)
resp.code = Mod_Zero;
else
resp.result = req.x % req.y;
}
break;
default:
resp.code = Other_Oper;
break;
}
return resp;
}
// "len"\n"10 + 20"\n
std::string Calculator(std::string &package)
{
std::string content;
bool r = Decode(package, &content); // "len"\n"10 + 20"\n
if (!r)
return "";
// "10 + 20"
Request req;
r = req.Deserialize(content); // "10 + 20" ->x=10 op=+ y=20
if (!r)
return "";
content = ""; //
Response resp = CalculatorHelper(req); // result=30 code=0;
resp.Serialize(&content); // "30 0"
content = Encode(content); // "len"\n"30 0"
return content;
}
~ServerCal()
{
}
};
ServerCal.cc 服务器主函数
#include "TcpServer.hpp"
#include "ServerCal.hpp"
static void Usage(const std::string &proc)
{
std::cout << "\nUsage: " << proc << " port\n\n" << std::endl;
}
int main(int argc, char *argv[])
{
if(argc != 2 )
{
Usage(argv[0]);
exit(0);
}
uint16_t port = std::stoi(argv[1]);
ServerCal cal;
TcpServer *tsvp = new TcpServer(port,std::bind(&ServerCal::Calculator, &cal, std::placeholders::_1));
tsvp->InitServer();
tsvp->Start();
// Response resp(1000,0);
// std::string content;
// resp.Serialize(&content);
// std::cout << content << std::endl;
// std::string package = Encode(content);
// std::cout << package;
// content = "";
// bool r = Decode(package, &content);
// std::cout << content << std::endl;
// Response temp;
// temp.Deserialize(content);
// std::cout << temp.result << std::endl;
// std::cout << temp.code <<std::endl;
// Request req(123, 456, '+');
// std::string s;
// req.Serialize(&s);
// s = Encode(s);
// std::cout << s;
// std::string content;
// bool r = Decode(s, &content);
// std::cout << content << std::endl;
// Request temp;
// temp.Deserialize(content);
// std::cout << temp.x << std::endl;
// std::cout << temp.op << std::endl;
// std::cout << temp.y << std::endl;
return 0;
}
ClientCal.cc 客户端主函数 代码较少未作拆分
#include "Socket.hpp"
#include <string>
#include <iostream>
#include <time.h>
#include "Protocol.hpp"
#include <unistd.h>
#include<assert.h>
static void Usage(const std::string &proc)
{
std::cout << "\nUsage:" << proc << " serverip serverport\n" << std::endl;
}
//./client ip port
int main(int argc, char *argv[])
{
if(argc != 3)
{
Usage(argv[0]);
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
Sock sockfd;
sockfd.Socket();
bool r = sockfd.Connect(serverip, serverport);
if(!r)return 1;
srand(time(nullptr) ^ getpid());
int cnt = 1;
const std::string opers = "+-*/%&^";
std::string inbuffer_stream;
while(cnt <= 10)
{
std::cout <<"===================第" << cnt << "次测试..." << std::endl;
int x = rand() % 100 + 1;
usleep(123);
int y = rand() % 100;
usleep(321);
char oper = opers[rand()%opers.size()];
Request req(x, y, oper);
req.DebugPrint();
std::string package;
req.Serialize(&package);
package = Encode(package);
std:: cout << "这是最新的发出请求: \n" << package;
write(sockfd.Fd(), package.c_str(), package.size());
std:: cout << "这是最新的发出请求: \n" << package;
write(sockfd.Fd(), package.c_str(), package.size());
std:: cout << "这是最新的发出请求: \n" << package;
write(sockfd.Fd(), package.c_str(), package.size());
std:: cout << "这是最新的发出请求: \n" << package;
write(sockfd.Fd(), package.c_str(), package.size());
//char buffer[1280];
// ssize_t n = read(sockfd.Fd(), buffer, sizeof(buffer));//我们也无法保证我们能读到一个完整的报文
// if(n > 0)
// {
// buffer[n] = 0;
// inbuffer_stream += buffer;
// std::string content;
// bool r = Decode(inbuffer_stream, &content);
// assert(r);//服务器那边已经做过类似处理 这里就直接用assert判断了 注意,带assert的话需要在编译的时候加-g
// Response resp;
// r = resp.Deserialize(content);
// assert(r);
// resp.DebugPrint();
// }
sleep(1);
std::cout <<"=====================================" << std::endl;
cnt++;
}
sockfd.Close();
return 0;
}
四、结果显示
1.telnet测试
2.客户端测试
3.一次多个请求 客户端不显示结果
4.显示服务器处理数据过程
可以看到拿走最后一个后,重新获取了报文
五、自动序列反序列化方法
我们现有json 和protobuf 方案来进行便捷的序列反序列化
json:可视化的序列反序列化,方便调试
protobuf:二进制流的序列反序列化方式,注重效率
json
里面大多数是key value的格式,一个json串实际上也就是一个字符串
要使用它我们需要安装第三方库 sudo yum install -y jsoncpp-devel