目录
一、实现目标:
实现基于TCP套接字实现的网络通信和UDP套接字还是有些许相同的地方的,本次也是实现服务端和客户端,客户端想服务端发送消息后,服务端发送回客户端,接下来看看:
实现原理也就是都从sockfd这个文件描述符中同时进行读写操作
二、实现代码:
1、服务端代码解析:
对于服务端,我们本次也是将服务端进行封装,然后在main.cc函数中进行使用,此时的main.cc函数和udp套接字那里是差不多的,在本次,我们实现4个版本:
#include "tcpserver.hpp"
#include <memory>
void Usage(const std::string proc)
{
std::cout<<"\n\tUsage: "<<proc<<" port[1024+]\n"<<std::endl;
}
int main(int argc,char* argv[])
{
if(argc != 2)
{
Usage(argv[0]);
exit(Usage_ERR);
}
uint16_t port = stoi(argv[1]);
std::unique_ptr<TcpServer> tcp_svr(new TcpServer(port));
tcp_svr->Init();
tcp_svr->Start();
return 0;
}
接下来实现服务端代码:
#pragma once
#include <iostream>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
enum
{
Usage_ERR = 1,
SOCK_ERR,
BIND_ERR,
LISTEN_ERR
};
std::string defaultip = "0.0.0.0";
class TcpServer
{
public:
TcpServer(const uint16_t& port,const std::string& ip = defaultip)
:_listensockfd(0),_port(port),_ip(ip)
{}
void Init()
{
}
void Start()
{
}
void Service(int& sockfd,const std::string& clientip,const uint16_t& clientport)
{
}
~TcpServer()
{
if(_listensockfd > 0) close(_listensockfd);
}
private:
int _listensockfd;
uint16_t _port;
std::string _ip;
};
主体思路和udp套接字也差不多,就是创建,初始化,启动服务端,其中成员函数和udp的也是一样的,但是这里的套接字是_listensockfd套接字,这个作用和udp的_sockfd就不一样了,在后面出现了sockfd会讲的
init函数:
init函数主体和udp的没啥区别,就是创建sockfd,填充sockaddr_in,bind绑定
在socket创建_listensockfd套接字的时候,参数是SOCK_STREAM,这是面向字节流的
需要设置监听状态:
和udp套接字不同的是TCP需要进行监听状态,这是因为TCP是面向连接的,在客户端在向TCP服务端发送数据的时候,需要提前与TCP服务端进行connect连接,但是服务端又不向客户端发送请求,而是客户端向服务端发送请求,所以服务端是比较被动的,那么服务端需要一直处于一种等待连接到来的状态,就需要是监听状态,监听客户端有没有到来
void Init()
{
_listensockfd = socket(AF_INET,SOCK_STREAM,0);
if(_listensockfd < 0)
{
lg(FATAL,"_listensockfd create false,_listensockfd:%d",_listensockfd);
exit(SOCK_ERR);
}
lg(INFO,"_listensockfd create success,_listensockfd:%d",_listensockfd);
// bind绑定需要struct sockaddr结构体
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(_port);
inet_aton(_ip.c_str(),&local.sin_addr);
// 进行绑定
int n = bind(_listensockfd,(struct sockaddr*)&local,sizeof(local));
if(n < 0)
{
lg(FATAL,"bind false,errno:%d,errstring:%s",errno,strerror(errno));
exit(BIND_ERR);
}
lg(INFO,"bind success,errno:%d,errstring:%s",errno,strerror(errno));
//TCP是面向连接的,服务器一般是比较“被动”的,服务器一直处于一种,一直等待连接到来的状态
int m = listen(_listensockfd,backlog);
if(m < 0)
{
lg(FATAL,"listen false,error:%d,errstring:%s",errno,strerror(errno));
exit(LISTEN_ERR);
}
lg(INFO,"listen success,error:%d,errstring:%s",errno,strerror(errno));
}
Start函数:
在Start函数这里实现了四个版本的代码:
version1-单进程:
在服务端和客户端的网络通信前,服务端需要收到客户端的链接请求,这里使用accept:
参数解析:
sockfd:监听套接字描述符,是之前通过socket,bind,listen创建并配置的
addr:指向sockaddr结构的指针,用于存储客户端的地址信息
addrlen:指向socklen_t类型的指针,表示addr结构的长度
返回值:
- 成功时返回一个新的套接字描述符,用于与客户端通信
- 失败时返回-1,错误码被设置
accept的返回值也是一个文件描述符,那么什么时候用这个文件描述符,什么时候用_listensockfd文件描述符呢?这两者有什么区别呢?
监听套接字(_listensockfd):用于获取客户端发来的连接请求,accept函数会不断从监听套接字当中获取新连接
accept函数返回的套接字:用于为本次accept获取到的连接提供服务。监听套接字的任务只是不断获取新连接,而真正为这些连接提供服务的套接字是accept函数返回的套接字,而不是监听套接字
accept接收后可以将客户端的端口号,IP地址进行网路到本地的字节序的转换进行存储起来,接着通过Service函数进行处理
void Start()
{
lg(INFO,"tcpserver is running ...");
for(;;)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 获取新链接 accept收到客户端上的数据的时候要把网络序列转主机序列
int sockfd = accept(_listensockfd,(struct sockaddr*)&client,&len);
if(sockfd < 0)
{
lg(WARNING,"accept error,errno:%d,errstring:%s",errno,strerror(errno));
continue;
}
// lg(INFO,"accept success,errno:%d,errstring:%s",errno,strerror(errno));
// 拿到客户端的端口和IP
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET,&client,clientip,sizeof(clientip));
lg(INFO,"get a new link,_sockfd:%d,clientport:%d,clientip:%s",sockfd,clientport,clientip);
// 根据新链接进行通信
// // version1 -- 单进程版本
Service(sockfd,clientip,clientport); // 服务端做处理
close(sockfd);
}
}
Service函数:
这个函数就是直接在sockfd套接字中进行读取,read从sockfd中进行读,然后读到数据在处理完后,通过write向sockfd中进行写入
void Service(int& sockfd,const std::string& clientip,const uint16_t& clientport)
{
char buffer[1024];
while(true)
{
memset(buffer, 0, sizeof(buffer));
// 首先从sockfd中读数据
int n = read(sockfd,buffer,sizeof(buffer));
// 如果读到了数据就直接进行处理
if(n > 0)
{
// 进行处理数据
std::cout<<"client say#"<<buffer<<std::endl;
std::string echostr = "server#";
echostr += buffer;
// 再将处理后的数据写入sockfd中让客户端读取
write(sockfd,echostr.c_str(),echostr.size());
}
else if (n == 0)
{
lg(INFO, "%s:%d quit, server close sockfd: %d", clientip.c_str(), clientport, sockfd);
break;
}
else
{
lg(WARNING, "read error, sockfd: %d, client ip: %s, client port: %d", sockfd, clientip.c_str(), clientport);
break;
}
}
}
2、客户端代码解析:
对于客户端,总体思路和udp也是一样的,
- 首先通过socket创建套接字,注意这里的第二个参数是SOCK_STREAM面向字节流,
- 然后就是bind,但是不需要手动bind,客户端连接服务端时系统会自动指定一个端口号给客户端,并且主要是保证各个客户端之间的端口号不一样即可
接着是connect连接:
参数解析:
- sockfd:由上面socket()函数创建的套接字描述符
- addr:指向目标服务器地址结构的指针
- addrlen:addr的地址结构的长度
返回值:
- 成功时返回 0
- 失败时返回 - 1,错误码被设置
当链接后,并且服务端那边也accept接收后,就可以进行网络通信了
#include <iostream>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
void Usage(const std::string proc)
{
std::cout << "\n\tUsage: " << proc << " port[1024+]\n"
<< std::endl;
}
// ./tcpclient.cc serverip serverport
int main(int argc, char *argv[])
{
// 使用手册
if (argc != 3)
{
Usage(argv[0]);
exit(1);
}
// 拿到命令行中的IP地址和端口号
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
// 初始化
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
socklen_t len = sizeof(server);
while (true)
{
// sockfd
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cerr << "socket error" << std::endl;
exit(2);
}
// bind问题:这里和udp套接字一样,不需要手动绑定,是OS自动给我们绑定,客户端这里只要保证端口号不一样即可
// connect链接
int n = connect(sockfd, (struct sockaddr *)&server, len);
if (n < 0)
{
std::cerr << "connect error,cnt:" << cnt << std::endl;
exit(2);
}
std::cerr << "connect success" << std::endl;
std::string message;
std::cout << "Please enter#";
// 获取输入
getline(std::cin, message);
// 向sockfd中写入
int m = write(sockfd, message.c_str(), message.size());
if (m < 0)
{
std::cerr << "write error..." << std::endl;
continue;
}
// 从sockfd中读取
char buffer[4096];
int n = read(sockfd, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer << std::endl;
}
// 关闭sockfd
close(sockfd);
}
return 0;
}
3、服务端的多个版本:
version2-多进程:
单进程版本的服务器有弊端,当一个客户端正在和服务端进行通信的时候,另一个客户端即使能够链接服务端,但是不能够进行通信的,只有当第一个客户端退出后,第二个客户端才能够和服务端之间进行通信
这是因为单进程版本是只有一个执行流的,此时一个服务端只能够为一个客户端提供服务
那么就需要引入多进程版本的服务端了:
需要修改的服务端代码只是Start中增加:
fork创建子进程后,子进程会继承父进程的文件描述符,也就能够看到在父进程中创建的sockfd,子进程向这个文件描述符中进行Service服务
父进程记得等待子进程,不然会有僵尸进程,进而造成内存泄漏
还有一件事:
父进程等待子进程的时候是阻塞等待的,那么此时就和单进程没什么区别了,那么怎么办呢?-----让子进程在进行创建子进程,然后子进程break出来,这样子进程就被父进程等待到了,然后父进程继续循环创建子进程,而孙子进程就会成为孤儿进程,然后被系统回收
void Start()
{
// ThreadPool<Task>::GetInstance()->Start(); // 启动线程池
lg(INFO,"tcpserver is running ...");
for(;;)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 获取新链接 accept收到客户端上的数据的时候要把网络序列转主机序列
int sockfd = accept(_listensockfd,(struct sockaddr*)&client,&len);
if(sockfd < 0)
{
lg(WARNING,"accept error,errno:%d,errstring:%s",errno,strerror(errno));
continue;
}
// lg(INFO,"accept success,errno:%d,errstring:%s",errno,strerror(errno));
// 拿到客户端的端口和IP
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET,&client,clientip,sizeof(clientip));
lg(INFO,"get a new link,_sockfd:%d,clientport:%d,clientip:%s",sockfd,clientport,clientip);
// 根据新链接进行通信
se(sockfd);
// version2 -- 多进程版本
// fork线程,在子进程中关闭listen文件描述符,创建孙子进程,让孙子进程去进行Service,
// 父进程进行等待,但是创建一个进程的成本太高了,所以这个版本也不建议使用
pid_t id = fork();
if(id == 0)
{
// 子进程
close(_listensockfd);
if (fork() > 0) break;
Service(sockfd,clientip,clientport);
close(sockfd);
exit(0);
}
close(sockfd);
waitpid(id,nullptr,0);
}
}
version3-多线程:
但是创建进程的成本是很高的,需要页表,PCB,进程地址空间等等,但是创建线程的成本就没有那么高了,可以引入线程来写一个多线程版本的服务端
修改代码:
增加一个结构体:这个结构体里面记录者进行操作的文件描述符,还有客户端的端口和IP地址
这是因为Service是需要传入这三个参数的,但是pthread_create只有一个void*,那么就需要用结构体传入,然后在里面进行强制类型转换就可以拿到这个结构体里面的数据了
为什么Routine要设计成静态的
因为pthread_create的函数原型是
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
所以他的第三个函数指针的类型为: void* (*) (void*)
那么就必须要求,Routine的类型就是void* (*) (void*)类型
那么当Routine不是静态成员函数,他就会有个隐藏的this指针作为第一个形参,与咱们void* (*) (void*)这个类型不匹配
此时Routine是静态函数,不能够直接调用类内成员函数Service,所以就需要this指针,所以就需要在结构体中有一个TcpServer*,然后将TcpServer的this指针传到Routine,通过this指针调用Service函数
class ThreadDate
{
public:
ThreadDate(int sockfd,std::string clientip,uint16_t clientport,TcpServer* t)
:_sockfd(sockfd),_clientip(clientip),_clientport(clientport),_tsvr(t)
{}
public:
int _sockfd;
std::string _clientip;
uint16_t _clientport;
TcpServer* _tsvr;
};
直接进行创建线程,将结构体td传入
最后就是Routine函数:
这里要进行线程分离,这里的原因和进程那里是差不多的
然后就是强制类型转换后拿到数据进行传参
static void* Routine(void* args)
{
pthread_detach(pthread_self());
ThreadDate* td = static_cast<ThreadDate*>(args);
td->_tsvr->Service(td->_sockfd,td->_clientip,td->_clientport);
delete td;
return nullptr;
}
void Start()
{
// ThreadPool<Task>::GetInstance()->Start(); // 启动线程池
lg(INFO,"tcpserver is running ...");
for(;;)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 获取新链接 accept收到客户端上的数据的时候要把网络序列转主机序列
int sockfd = accept(_listensockfd,(struct sockaddr*)&client,&len);
if(sockfd < 0)
{
lg(WARNING,"accept error,errno:%d,errstring:%s",errno,strerror(errno));
continue;
}
// lg(INFO,"accept success,errno:%d,errstring:%s",errno,strerror(errno));
// 拿到客户端的端口和IP
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET,&client,clientip,sizeof(clientip));
lg(INFO,"get a new link,_sockfd:%d,clientport:%d,clientip:%s",sockfd,clientport,clientip);
// 根据新链接进行通信
// version3 -- 多线程版本
// 创建好一个ThreadDate类,其中成员变量为sockfd,客户端的端口号和IP
// 创建线程这里需要将上述类传入(pthread_create)
// 在routine里面进行线程分离,强转,调用server,删除,返回
// 最后有一个类内static问题
ThreadDate* td = new ThreadDate(sockfd,clientip,clientport,this);
pthread_t tid;
pthread_create(&tid,nullptr,Routine,td);
}
}
version4-线程池版本:
多线程版本的局限性:
- 每当有新连接到来时,服务端的主线程都会重新为该客户端创建为其提供服务的新线程,而当服务结束后又会将该新线程销毁。这样做不仅麻烦,而且效率低下,每当连接到来的时候服务端才创建对应提供服务的线程
- 如果有大量的客户端连接请求,此时服务端要为每一个客户端创建对应的服务线程。计算机当中的线程越多,CPU的压力就越大,此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也迟迟得不到应答
那么就需要使用线程池版本,提前创建好一批线程,当有客户端来了的时候直接让这个线程去服务
线程池代码:
这个在之前的学习中有写过,这里直接拿过来用
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
struct ThreadInfo
{
pthread_t _tid;
std::string ThreadName;
};
static int defaultnum = 5;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void UnLock()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (auto &it : _threads)
{
if (it._tid == tid)
{
return it.ThreadName;
}
}
return "NONE";
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
public:
static void *myhander(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsEmpty())
{
tp->ThreadSleep();
}
T ret = tp->Pop();
tp->UnLock();
ret();
}
}
T Pop()
{
T t = _task.front();
_task.pop();
return t;
}
void Push(const T &t)
{
Lock();
_task.push(t);
Wakeup();
UnLock();
}
void Start()
{
int num = _threads.size();
for (int i = 0; i < num; i++)
{
_threads[i].ThreadName = "thread-" + std::to_string(i + 1);
pthread_create(&(_threads[i]._tid), nullptr, myhander, this);
}
}
//懒汉模式
static ThreadPool<T>* GetInstance()
{
if(_tp == nullptr) //我们发现,只有第一个线程进入的时候_tp才会为空,这样的话后面如果多次进行加锁判断释放锁会降低效率
{ //所以这里得二次判断,保证后面不为空的时候直接返回,并且即使第一次有多个线程进入了235行,但是
//依然是只会有一个线程成功申请锁并进入if,new空间,其余线程即使后来申请到锁了,但是if判断失效,所以就
//会释放锁,然后后来的线程就会在第一个if那里都进不去,进而增加效率
pthread_mutex_lock(&_lock);
if(_tp == nullptr)
{
_tp = new ThreadPool<T>;
}
pthread_mutex_unlock(&_lock);
}
return _tp;
}
private:
ThreadPool(int num = defaultnum)
: _threads(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
ThreadPool(const ThreadPool<T>& copy) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& copy) = delete;
private:
std::vector<ThreadInfo> _threads;
std::queue<T> _task;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T>* _tp;
// static ThreadPool<T>* _tp = nullptr;
static pthread_mutex_t _lock;
};
template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
引入Task任务
线程池是要执行一个个任务的,这里就不用Service了,在task类中的run接口中进行服务,网络通信和数据处理
#pragma once
#include <iostream>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
Log lg;
class Task
{
public:
Task(int sockfd, std::string clientip, uint16_t clientport)
: _sockfd(sockfd), _clientip(clientip), _clientport(clientport)
{}
void run()
{
char buffer[1024];
while (true)
{
memset(buffer, 0, sizeof(buffer));
// 首先从sockfd中读数据
int n = read(_sockfd, buffer, sizeof(buffer));
// 如果读到了数据就直接进行处理
if (n > 0)
{
// 进行处理数据
std::cout << "client say#" << buffer << std::endl;
std::string echostr = "server#";
echostr += buffer;
// 再将处理后的数据写入sockfd中让客户端读取
int m = write(_sockfd, echostr.c_str(), echostr.size());
if (m < 0)
{
lg(WARNING, "write error,errno:%d,errnostring:%s", errno, strerror(errno));
}
}
else if (n == 0)
{
lg(INFO, "%s:%d quit, server close sockfd: %d", _clientip.c_str(), _clientport, _sockfd);
break;
}
else
{
lg(WARNING, "read error, sockfd: %d, client ip: %s, client port: %d", _sockfd, _clientip.c_str(), _clientport);
break;
}
}
}
void operator()()
{
run();
}
~Task()
{
}
private:
int _sockfd;
std::string _clientip;
uint16_t _clientport;
};
最后是Start代码:
在最开始记得启动线程池,增加如下三行代码即可
这样,当服务器启动的时候,就会创建出一批线程,一个主线程和五个子线程
while :; do ps -aL|head -1&&ps -aL|grep tcpserver;echo "####################";sleep 1;done
三、小拓展:
模拟一下客户端掉线后,重连到服务端
这个动作是客户端在自己做的,在本次中,我们让服务端突然挂掉,客户端链接失败就会进行重连
核心代码如下:
while (true)
{
int cnt = 5; // 进行重连几次
bool isreconnect = false; // 检测是否重连成功
// sockfd
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cerr << "socket error" << std::endl;
exit(2);
}
// std::cerr << "socket success" << std::endl;
// bind问题:这里和udp套接字一样,不需要手动绑定,是OS自动给我们绑定,客户端这里只要保证端口号不一样即可
do
{
// connect链接
int n = connect(sockfd, (struct sockaddr *)&server, len);
if (n < 0) // 连接失败
{
isreconnect = true;
cnt--;
std::cerr << "connect error,cnt:" << cnt << std::endl;
sleep(2);
}
else // 连接成功
{
break;
}
} while (cnt && isreconnect);
if (cnt == 0)
{
std::cerr << "user offonline" << std::endl;
break;
}
cnt记录的是重连几次,isreconnect是检测是否重连成功
观察现象:
首先将客户端服务端连接好
接着让服务端退出,此时客户端是不知道服务端退了的
如下,当客户端再次发送消息,就会进行重连,如果服务端不再次启动就会链接失败,用户下线
但是如果服务端再次启动,客户端就会链接成功再次上线
四、整体代码:
main.cc
#include "tcpserver.hpp"
#include <memory>
void Usage(const std::string proc)
{
std::cout<<"\n\tUsage: "<<proc<<" port[1024+]\n"<<std::endl;
}
int main(int argc,char* argv[])
{
if(argc != 2)
{
Usage(argv[0]);
exit(Usage_ERR);
}
uint16_t port = stoi(argv[1]);
std::unique_ptr<TcpServer> tcp_svr(new TcpServer(port));
tcp_svr->Init();
tcp_svr->Start();
return 0;
}
tcpserver.hpp
#pragma once
#include <iostream>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
enum
{
Usage_ERR = 1,
SOCK_ERR,
BIND_ERR,
LISTEN_ERR
};
std::string defaultip = "0.0.0.0";
const int backlog = 10;
class TcpServer;
class ThreadDate
{
public:
ThreadDate(int sockfd,std::string clientip,uint16_t clientport,TcpServer* t)
:_sockfd(sockfd),_clientip(clientip),_clientport(clientport),_tsvr(t)
{}
public:
int _sockfd;
std::string _clientip;
uint16_t _clientport;
TcpServer* _tsvr;
};
class TcpServer
{
public:
TcpServer(const uint16_t& port,const std::string& ip = defaultip)
:_listensockfd(0),_port(port),_ip(ip)
{}
void Init()
{
_listensockfd = socket(AF_INET,SOCK_STREAM,0);
if(_listensockfd < 0)
{
lg(FATAL,"_listensockfd create false,_listensockfd:%d",_listensockfd);
exit(SOCK_ERR);
}
lg(INFO,"_listensockfd create success,_listensockfd:%d",_listensockfd);
// bind绑定需要struct sockaddr结构体
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(_port);
inet_aton(_ip.c_str(),&local.sin_addr);
// 进行绑定
int n = bind(_listensockfd,(struct sockaddr*)&local,sizeof(local));
if(n < 0)
{
lg(FATAL,"bind false,errno:%d,errstring:%s",errno,strerror(errno));
exit(BIND_ERR);
}
lg(INFO,"bind success,errno:%d,errstring:%s",errno,strerror(errno));
//TCP是面向连接的,服务器一般是比较“被动”的,服务器一直处于一种,一直等待连接到来的状态
int m = listen(_listensockfd,backlog);
if(m < 0)
{
lg(FATAL,"listen false,error:%d,errstring:%s",errno,strerror(errno));
exit(LISTEN_ERR);
}
lg(INFO,"listen success,error:%d,errstring:%s",errno,strerror(errno));
}
static void* Routine(void* args)
{
pthread_detach(pthread_self());
ThreadDate* td = static_cast<ThreadDate*>(args);
td->_tsvr->Service(td->_sockfd,td->_clientip,td->_clientport);
delete td;
return nullptr;
}
void Start()
{
ThreadPool<Task>::GetInstance()->Start(); // 启动线程池
lg(INFO,"tcpserver is running ...");
for(;;)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 获取新链接 accept收到客户端上的数据的时候要把网络序列转主机序列
int sockfd = accept(_listensockfd,(struct sockaddr*)&client,&len);
if(sockfd < 0)
{
lg(WARNING,"accept error,errno:%d,errstring:%s",errno,strerror(errno));
continue;
}
// lg(INFO,"accept success,errno:%d,errstring:%s",errno,strerror(errno));
// 拿到客户端的端口和IP
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET,&client,clientip,sizeof(clientip));
lg(INFO,"get a new link,_sockfd:%d,clientport:%d,clientip:%s",sockfd,clientport,clientip);
// 根据新链接进行通信
// // // version1 -- 单进程版本
// Service(sockfd,clientip,clientport); // 服务端做处理
// close(sockfd);
// version2 -- 多进程版本
// fork线程,在子进程中关闭listen文件描述符,创建孙子进程,让孙子进程去进行Service,
// 父进程进行等待,但是创建一个进程的成本太高了,所以这个版本也不建议使用
// pid_t id = fork();
// if(id == 0)
// {
// // 子进程
// close(_listensockfd);
// if (fork() > 0) break;
// Service(sockfd,clientip,clientport);
// close(sockfd);
// exit(0);
// }
// close(sockfd);
// waitpid(id,nullptr,0);
// version3 -- 多线程版本
// 创建好一个ThreadDate类,其中成员变量为sockfd,客户端的端口号和IP
// 创建线程这里需要将上述类传入(pthread_create)
// 在routine里面进行线程分离,强转,调用server,删除,返回
// 最后有一个类内static问题
// ThreadDate* td = new ThreadDate(sockfd,clientip,clientport,this);
// pthread_t tid;
// pthread_create(&tid,nullptr,Routine,td);
// version4 -- 线程池版本
// 任务
// 线程池Getinstance->push,记得在最开始的时候Start
Task task(sockfd,clientip,clientport);
ThreadPool<Task>::GetInstance()->Push(task);
}
}
void Service(int& sockfd,const std::string& clientip,const uint16_t& clientport)
{
char buffer[1024];
while(true)
{
memset(buffer, 0, sizeof(buffer));
// 首先从sockfd中读数据
int n = read(sockfd,buffer,sizeof(buffer));
// 如果读到了数据就直接进行处理
if(n > 0)
{
// 进行处理数据
std::cout<<"client say#"<<buffer<<std::endl;
std::string echostr = "server#";
echostr += buffer;
// 再将处理后的数据写入sockfd中让客户端读取
write(sockfd,echostr.c_str(),echostr.size());
}
else if (n == 0)
{
lg(INFO, "%s:%d quit, server close sockfd: %d", clientip.c_str(), clientport, sockfd);
break;
}
else
{
lg(WARNING, "read error, sockfd: %d, client ip: %s, client port: %d", sockfd, clientip.c_str(), clientport);
break;
}
}
}
~TcpServer()
{
if(_listensockfd > 0) close(_listensockfd);
}
private:
int _listensockfd;
uint16_t _port;
std::string _ip;
};
tcpclient.cc
#include <iostream>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
void Usage(const std::string proc)
{
std::cout << "\n\tUsage: " << proc << " port[1024+]\n"
<< std::endl;
}
// ./tcpclient.cc serverip serverport
int main(int argc, char *argv[])
{
// 使用手册
if (argc != 3)
{
Usage(argv[0]);
exit(1);
}
// 拿到命令行中的IP地址和端口号
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
// 初始化
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
socklen_t len = sizeof(server);
while (true)
{
int cnt = 5; // 进行重连几次
bool isreconnect = false; // 检测是否重连成功
// sockfd
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cerr << "socket error" << std::endl;
exit(2);
}
// std::cerr << "socket success" << std::endl;
// bind问题:这里和udp套接字一样,不需要手动绑定,是OS自动给我们绑定,客户端这里只要保证端口号不一样即可
do
{
// connect链接
int n = connect(sockfd, (struct sockaddr *)&server, len);
if (n < 0) // 连接失败
{
isreconnect = true;
cnt--;
std::cerr << "connect error,cnt:" << cnt << std::endl;
sleep(2);
}
else // 连接成功
{
break;
}
} while (cnt && isreconnect);
if (cnt == 0)
{
std::cerr << "user offonline" << std::endl;
break;
}
// std::cerr << "connect success" << std::endl;
std::string message;
std::cout << "Please enter#";
// 获取输入
getline(std::cin, message);
// 向sockfd中写入
int m = write(sockfd, message.c_str(), message.size());
if (m < 0)
{
std::cerr << "write error..." << std::endl;
continue;
}
// 从sockfd中读取
char buffer[4096];
int n = read(sockfd, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer << std::endl;
}
// 关闭sockfd
close(sockfd);
}
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
Log lg;
class Task
{
public:
Task(int sockfd, std::string clientip, uint16_t clientport)
: _sockfd(sockfd), _clientip(clientip), _clientport(clientport)
{
}
void run()
{
char buffer[1024];
memset(buffer, 0, sizeof(buffer));
// 首先从sockfd中读数据
int n = read(_sockfd, buffer, sizeof(buffer));
// 如果读到了数据就直接进行处理
if (n > 0)
{
// 进行处理数据
std::cout << "client say#" << buffer << std::endl;
std::string echostr = "server#";
echostr += buffer;
// 再将处理后的数据写入sockfd中让客户端读取
int m = write(_sockfd, echostr.c_str(), echostr.size());
if (m < 0)
{
lg(WARNING, "write error,errno:%d,errnostring:%s", errno, strerror(errno));
}
}
else if (n == 0)
{
lg(INFO, "%s:%d quit, server close sockfd: %d", _clientip.c_str(), _clientport, _sockfd);
}
else
{
lg(WARNING, "read error, sockfd: %d, client ip: %s, client port: %d", _sockfd, _clientip.c_str(), _clientport);
}
close(_sockfd);
}
void operator()()
{
run();
}
~Task()
{
}
private:
int _sockfd;
std::string _clientip;
uint16_t _clientport;
};
ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
struct ThreadInfo
{
pthread_t _tid;
std::string ThreadName;
};
static int defaultnum = 5;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void UnLock()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (auto &it : _threads)
{
if (it._tid == tid)
{
return it.ThreadName;
}
}
return "NONE";
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
public:
static void *myhander(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsEmpty())
{
tp->ThreadSleep();
}
T ret = tp->Pop();
tp->UnLock();
ret();
}
}
T Pop()
{
T t = _task.front();
_task.pop();
return t;
}
void Push(const T &t)
{
Lock();
_task.push(t);
Wakeup();
UnLock();
}
void Start()
{
int num = _threads.size();
for (int i = 0; i < num; i++)
{
_threads[i].ThreadName = "thread-" + std::to_string(i + 1);
pthread_create(&(_threads[i]._tid), nullptr, myhander, this);
}
}
//懒汉模式
static ThreadPool<T>* GetInstance()
{
if(_tp == nullptr) //我们发现,只有第一个线程进入的时候_tp才会为空,这样的话后面如果多次进行加锁判断释放锁会降低效率
{ //所以这里得二次判断,保证后面不为空的时候直接返回,并且即使第一次有多个线程进入了235行,但是
//依然是只会有一个线程成功申请锁并进入if,new空间,其余线程即使后来申请到锁了,但是if判断失效,所以就
//会释放锁,然后后来的线程就会在第一个if那里都进不去,进而增加效率
pthread_mutex_lock(&_lock);
if(_tp == nullptr)
{
_tp = new ThreadPool<T>;
}
pthread_mutex_unlock(&_lock);
}
return _tp;
}
private:
ThreadPool(int num = defaultnum)
: _threads(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
ThreadPool(const ThreadPool<T>& copy) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& copy) = delete;
private:
std::vector<ThreadInfo> _threads;
std::queue<T> _task;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T>* _tp;
// static ThreadPool<T>* _tp = nullptr;
static pthread_mutex_t _lock;
};
template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;