目录
1、Callbacks.h
回调操作
不用依赖boost库,全部翻译成对C++11的依赖,与多线程相关,也翻译成C++11,与Linux底层的API的关联性减少,移植性更好
#pragma once
#include <memory>
#include <functional>
class Buffer;
class TcpConnection;
using TcpConnectionPtr=std::shared_ptr<TcpConnection>;
using ConnectionCallback=std::function<void(const TcpConnectionPtr&)>;
using CloseCallback=std::function<void(const TcpConnectionPtr&)>;
using WriteCompleteCallback=std::function<void(const TcpConnectionPtr&)>;
using MessageCallback=std::function<void(const TcpConnectionPtr&,
Buffer*,
Timestamp)>;
2、TcpServer.h
#pragma once
/**
* 用户使用muduo编写服务器程序
* 在这里加上头文件,用户使用的时候就不用加了
*/
#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "noncopyable.h"
#include "EventLoopThreadPool.h"
#include "Callbacks.h"
#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>
//对外的服务器编程使用的类
class TcpServer:noncopyable
{
public:
using ThreadInitCallback=std::function<void(EventLoop*)>;
enum Option//预制两个选项来表示端口是否可重用
{
kNoReusePort,//不可重用
kReusePort,//可重用
};
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option=kNoReusePort);
~TcpServer();
void setThreadInitcallback(const ThreadInitCallback& cb){threadInitCallback_=cb;}
void setConnectionCallback(const ConnectionCallback& cb){connectionCallback_=cb;}
void setMessageCallback(const MessageCallback& cb){messageCallback_=cb;}
void setWriteCompleteCallback(const WriteCompleteCallback& cb){writeCompleteCallback_=cb;}
//设置底层subloop的个数
void setThreadNum(int numThreads);
//开启服务器监听
void start();
private:
void newConnection(int sockfd,const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);
void removeConnectionInLoop(const TcpConnectionPtr& conn);
using ConnectionMap=std::unordered_map<std::string,TcpConnectionPtr>;
EventLoop* loop_;//baseloop 用户定义的loop
const std::string ipPort_;//保存服务器相关的ip地址,端口号
const std::string name_;//保存服务器名称
std::unique_ptr<Acceptor> acceptor_;//运行在mainloop,任务就是监听新连接事件
std::shared_ptr<EventLoopThreadPool> threadPool_;//one loop per thread
ConnectionCallback connectionCallback_;//有新连接时的回调
MessageCallback messageCallback_;//有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_;//消息发送完成以后的回调
ThreadInitCallback threadInitCallback_;//loop线程初始化的回调
std::atomic_int started_;
int nextConnId_;
ConnectionMap connections_;//保存所有的连接
};
3、TcpServer.cc
mainLoop相当于reactor模型的reactor反应堆的角色
poller相当于是多路分发器的角色,掌控epoll的所有操作
Acceptor创建listenfd,封装成acceptchannel,通过enableReading向poller上注册读事件,并将listenfd添加到poller中,poller监听acceptChannel上的事件,有事件发生时执行一个读事件的回调(因为之前约定的是对读事件感兴趣),读事件的回调绑定的是handleRead,handleRead中accept函数返回一个跟客户端通信的connfd,然后去执行相应的回调
IOLoop运行在主线程中,想要把当前connfd封装的channel发送给subloop1,但是subloop1还没有被唤醒,我是在主线程中操作IOLoop的,很明显IOLoop和subLoop1不在一个线程,所以执行queueinloop唤醒subloop1,相当于在subloop1中的wakeupfd中写了一个数字,然后就把这个新的TcpConnection注册到subloop1中,操作到其它subloop上也是一样的过程。
重写代码:
#include "TcpServer.h"
#include "Logger.h"
#include "TcpConnection.h"
#include <functional>
#include <strings.h> //bzero()
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
if(loop==nullptr)
{
LOG_FATAL("%s:%s:%d mainLoop is null!\n",__FILE__,__FUNCTION__,__LINE__);
}
return loop;
}
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option)
:loop_(CheckLoopNotNull(loop))
,ipPort_(listenAddr.toIpPort())
,name_(nameArg)
,acceptor_(new Acceptor(loop,listenAddr,option==kReusePort))
,threadPool_(new EventLoopThreadPool(loop,name_))
,connectionCallback_()
,messageCallback_()
,nextConnId_(1)
{
//当有用户连接时,会执行TcpServer::newConnection回调
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection,this,
std::placeholders::_1,std::placeholders::_2));
}
TcpServer::~TcpServer()
{
for(auto& item:connections_)
{
//TcpConnectionPtr指向Tcpconnection的强智能指针
TcpConnectionPtr conn(item.second);//这个局部的shared_ptr智能指针对象,出右括号,可以自动释放new出来的TcpConnection对象资源
item.second.reset();
//销毁连接
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed,conn)
);
}
}
// 设置底层subloop的个数
void TcpServer::setThreadNum(int numThreads)
{
threadPool_->setThreadNum(numThreads);
}
// 开启服务器监听 loop.loop()
void TcpServer::start()
{
if(started_++==0)//防止一个TcpServer对象被start多次
{
threadPool_->start(threadInitCallback_);//启动底层的loop线程池
loop_->runInLoop(std::bind(&Acceptor::listen,acceptor_.get()));
}
}
//有一个新的客户端的连接 acceptor会执行这个回调操作
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
//轮询算法,选择一个subLoop,来管理channel
EventLoop* ioLoop=threadPool_->getNextLoop();
char buf[64]={0};
snprintf(buf,sizeof buf,"-%s#%d",ipPort_.c_str(),nextConnId_);
++nextConnId_;//整形,没有定义成原子变量,因为newconnection只在mainloop中处理,只在一个线程中处理,不涉及线程安全问题
std::string connName=name_+buf;//connection的name
LOG_INFO("TcpServe::newConnection[%s]-new connection[%s] from %s\n",
name_.c_str(),connName.c_str(),peerAddr.toIpPort().c_str());
//通过sockfd获取其绑定的本机的ip地址和端口信息
sockaddr_in local;
::bzero(&local,sizeof local);
socklen_t addrlen=sizeof local;
if(::getsockname(sockfd,(sockaddr*)&local,&addrlen)<0)
{
LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);
//根据连接成功的sockfd,创建TcpConnection连接对象
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd, //Socket Channel
localAddr,
peerAddr));
connections_[connName]=conn;
//下面的回调都是用户设置给TcpServer=》(设置给)TcpConnection=》(设置给)channel=》(注册到)poller=》notify(通知)channel调用回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
//设置了如何关闭连接的回调 conn->shutdown()用户调用shutdown方法
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection,this,std::placeholders::_1)
);
//直接调用TcpConnection::connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished,conn));
}
void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{
loop_->runInLoop(
std::bind(&TcpServer::removeConnectionInLoop,this,conn)
);
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
LOG_INFO("TcpServer::removeConnectionInLoop[%s]-connection %s\n",
name_.c_str(),conn->name().c_str());
connections_.erase(conn->name());//把相应的connection信息从TcpServer中的connectionMap中删掉
EventLoop* ioLoop=conn->getLoop();//拿到连接对应的loop
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed,conn)//执行这条连接的connectDestroyed
);
}
TcpServer中绑定了一个回调