TcpConnection类
TcpConnection 是 Muduo 网络库中“连接”的抽象,它封装了 TCP 连接的生命周期、I/O 操作、缓冲区管理和事件回调,是网络通信的核心枢纽。它就像一个“TCP 连接的代理”,你不再直接操作 fd,而是通过 TcpConnection 来:
- 接收数据
- 发送数据
- 关闭连接
- 设置回调
✅ 1. 连接生命周期管理
- 构造:在 accept 后创建,表示一个新连接;
- 析构:连接关闭时自动清理资源;
- 状态机:维护连接状态(kConnecting → kConnected → kDisconnected);
✅ 2. I/O 事件处理(读/写)
- 绑定到一个 Channel(封装 fd 和 epoll 事件);
- 当 epoll 通知可读时,调用 handleRead();
- 当可写时,调用 handleWrite();
- 内部使用 Buffer 高效收发数据;
✅ 3. 数据收发接口
- 提供高层 send 接口,内部自动写入 outputBuffer_;
- 支持 Buffer、string、原始指针;
- 自动处理 EAGAIN,未发送完的数据留在 outputBuffer_;
✅ 4. 回调机制(事件驱动)
- TcpConnection 是事件的分发中心,它支持多种回调:
- ConnectionCallback 连接建立/关闭时调用(如日志记录)
- MessageCallback 收到数据时调用(核心业务逻辑)
- WriteCompleteCallback 数据发送完成后调用
- HighWaterMarkCallback 发送缓冲区过大时通知
- CloseCallback 连接真正关闭前调用(资源清理)
✅ 5. 缓冲区管理
- 内置两个 Buffer:
- inputBuffer_:接收数据;
- outputBuffer_:发送数据;
- 使用 readFd/writeFd 高效 I/O;
- 支持 high water mark(高水位线),防止客户端发太快导致服务器出问题;
✅ 6. 连接关闭管理
- 支持优雅关闭(shutdown()):关闭写端,等待读完数据再关闭;
- 支持强制关闭(forceClose());
- 支持延迟关闭(在事件循环中安全关闭);
头文件
#pragma once
#include"noncopyable.h"
#include"InetAddress.h"
#include"Callbacks.h"
#include"Timestamp.h"
#include"Buffer.h"
#include<memory>
#include<atomic>
#include<string>
class Channel;
class Eventloop;
class Socket;
/*
TcpServer通过Acceptor有一个新用户链接,通过accept函数拿到connfd,TcpConnection设置回调给Channel通过它再注册
到poller上,接着有事件之后就会调用回调函数
*/
class TcpConnection:noncopyable,public std::enable_shared_from_this<TcpConnection>
{
public:
TcpConnection(Eventloop *loop,
const std::string &nameArg,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr);
~TcpConnection();
Eventloop* getloop() const {return loop_;}
const std::string& name() const {return name_;}
const InetAddress& localAddress() const {return localAddr_;}
const InetAddress& peerAddress() const {return peerAddr_;}
bool connected() const {return state_==kConnected;}
void send(const std::string &buf);
void shutdown();//关闭当前连接
void setConnectionCallback(const ConnectionCallback& cb){connectionCallback_=cb;}
void setMessageCallback(const MessageCallback &cb){messageCallback_=cb;}
void setWriteCompleteCallback(const WriteCompleteCallback& cb){writeCompleteCallback_=cb;}
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb,size_t highWaterMark)
{highWaterMarkCallback_=cb;highWaterMark_=highWaterMark;}
void setCloseback(const CloseCallback& cb){closeCallback_=cb;}
//建立连接和销毁连接
void connectEstablished();
void connectDestroyed();
private:
enum StateE{kDisconnected ,kConnecting,kConnected,kDisConnecting};//连接状态
void setState(StateE state){state_=state;}
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const void* message,size_t len);//确保所有操作在同一个线程中串行执行
void shutdownInLoop();
Eventloop *loop_;//在多线程下这里绝对不是baseloop,因为TcpConnection都是在subloop中管理的
const std::string name_;
std::atomic_int state_;
bool reading_;
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_;//有新连接时候的回调
MessageCallback messageCallback_;//有读写消息时候的回调
WriteCompleteCallback writeCompleteCallback_;//消息发送完之后的回调
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
size_t highWaterMark_;//发送缓冲区的高水位线,如64MB
Buffer inputBuffer_;//接受数据的缓冲区
Buffer outputBuffer_;//发送数据的缓冲区
};
1、智能指针
enable_shared_from_this<TcpConnection>
TcpConnection继承这个类是为了在自己的类中提供一些方法,返回当前对象的一个shared_ptr强智能指针,做参数传递使用。对这里掌握不好的可以先看这一篇文章C++智能指针。
源文件
#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "Eventloop.h"
#include "Eventloop.h"
#include <functional>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
static Eventloop *CheckLoopNotNull(Eventloop *loop)
{
if (loop == nullptr)
{
LOG_FATAL("%s:%s:%d main loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpConnection::TcpConnection(Eventloop *loop,
const std::string &nameArg,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr)
: loop_(CheckLoopNotNull(loop)), name_(nameArg), state_(kConnecting), reading_(true), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), peerAddr_(peerAddr), highWaterMark_(64 * 1024 * 1024) // 64M
{
// 下面给channel设置相应的回调,poller给channel通知感兴趣的事情发生后,channel会回调相应的操作函数
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n", name_.c_str(), sockfd);
socket_->setKeepAlive(true);
}
TcpConnection::~TcpConnection()
{
LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n",
name_.c_str(), channel_->fd(), (int)state_);
}
void TcpConnection::send(const std::string &buf)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread()) //断当前线程是否是 subloop 线程
{
sendInLoop(buf.c_str(), buf.size()); //直接调用 sendInLoo
}
else
{
//如果不是,通过 runInLoop 把任务派发到 subloop 线程
loop_->runInLoop(std::bind(
&TcpConnection::sendInLoop,
this,
buf.c_str(), buf.size()));
}
}
}
void TcpConnection::sendInLoop(const void *data, size_t len)
{
ssize_t nwrote = 0;
size_t remianing = len; // 剩余长度
bool faultError = false;
// 之前调用过connection的shutdown,就不能再发送了
if (state_ == kDisconnected)
{
LOG_ERROR("disconnected ,give up writing!");
return;
}
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remianing = len - nwrote;
if (remianing == 0 && writeCompleteCallback_)
{
// 既然在这里数据全部发送完成,说明不用再给channel设置epollout事件了
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else//nwrote<0表示出错,这里包含了remianing>0的情况
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection:;sendInLoop");
if (errno == EPIPE || errno == ECONNRESET)
{
faultError = true;
}
}
}
}
if(!faultError&&remianing>0)
{
//目前发送缓冲区中剩余的待发送的数据的长度
size_t oldLen=outputBuffer_.readableBytes();
if(oldLen+remianing>=highWaterMark_
&&oldLen<highWaterMark_
&&highWaterMarkCallback_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remianing)
);
}
outputBuffer_.append((char*)data+nwrote,remianing);
if(!channel_->isWriting())
{
channel_->enableReading();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
}
}
}
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
// 已建立连接的用户,有可读事件发生了,调用用户传入的回调函数onMessage
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0) // 断开了
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}
void TcpConnection::handleWrite()
{
if (channel_->isWriting())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) // 表示发送完成
{
channel_->disableWriting(); // 变成不可写
if (writeCompleteCallback_)
{
// 唤醒loop_对应的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisConnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down,no more writing \n", channel_->fd());
}
}
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d,state-%d \n", channel_->fd(), (int)state_);
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); // 执行连接关闭的回调
closeCallback_(connPtr); // 关闭连接的回调
}
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err = 0;
if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}
void TcpConnection::connectEstablished()
{
setState(kConnected);
//tie防止底层的channel还在运行而上层的connection已经被remove掉了,用来监视
channel_->tie(shared_from_this());
channel_->enableReading();//向poller注册channel的epollin事件
//新连接建立,执行回调
connectionCallback_(shared_from_this());
}
void TcpConnection::connectDestroyed()
{
if(state_==kConnected)
{
setState(kDisconnected);
channel_->disableAll();//把channel所有感兴趣的事件都从poller中del掉
connectionCallback_(shared_from_this());
}
channel_->remove();//把channel从poller中删除掉
}
void TcpConnection::shutdown()
{
if(state_==kConnected)
{
setState(kDisconnected);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop,this)
);
}
}
void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting())//如果没有注册过写事件,说明已经把发送缓冲区的数据发送完了,考虑到这是用户调用的,可能还没发送完成
{
socket_->shutdownWrite();//关闭写端
}
}
1、构造函数前置检查防止loop为空
static Eventloop *CheckLoopNotNull(Eventloop *loop)
{
if (loop == nullptr)
{
LOG_FATAL("%s:%s:%d main loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
static:仅在本文件使用,避免命名冲突
2、核心函数sendInLoop()函数
作用:尝试把数据发出去,如果一次发不完,就把剩下的数据存到缓冲区,并监听可写事件,等 socket 能继续写时再发。
分析:
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
这里的意思是如果当前的fd没有设置关系写事件(没有在等待写事件),并且 outputBuffer_ 是空的,即用户缓冲区中没有积压的数据。那说明:这条连接现在是“空闲”的,我可以试试直接 write。
nwrote = ::write(sockfd, data, len);
这行代码是把我们的应用层数据,从用户空间拷贝到操作系统内核的 socket 发送缓冲区中。
- 如果拷贝成功,内核会负责后续的 TCP 发送(分段、重传、确认等)
- 如果缓冲区满了,write() 就会失败(返回 -1,errno = EAGAIN/EWOULDBLOCK)
这里是为了避免不必要的缓冲区拷贝;能直接发就直接发。
remianing = len - nwrote;
if (remianing == 0 && writeCompleteCallback_)
这里是看待写入的数据还有多少,如果待写入的数据为0,表明前边调用write()已经将想要写的数据全都拷贝到操作系统内核的 socket 发送缓冲区中了,且设置了数据发送完成事件的回调函数之后,只需要将回调函数加入到subloop 的待处理事件数组中就好了,不用设置channel对写事件的关心了,系统会在合适的时候将发送缓冲区的数据发送。
这是“乐观写(optimistic write)”策略:能不进缓冲区就不进,能不注册事件就不注册。
接着对于其他情况或出错做处理
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection::sendInLoop");
if (errno == EPIPE || errno == ECONNRESET)
{
faultError = true;
}
发生错误时说明写入失败,重新将 nwrote置为0,接着判断错误
- EWOULDBLOCK:非阻塞 socket 写满,正常;
- 其他错误:如 EPIPE(管道破裂)、ECONNRESET(连接重置),标记 faultError;
if(!faultError&&remianing>0)
{
//目前发送缓冲区中剩余的待发送的数据的长度
size_t oldLen=outputBuffer_.readableBytes();
if(oldLen+remianing>=highWaterMark_
&&oldLen<highWaterMark_
&&highWaterMarkCallback_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remianing)
);
}
outputBuffer_.append((char*)data+nwrote,remianing);
if(!channel_->isWriting())
{
channel_->enableReading();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
}
}
走到这一步说明,正在等待 EPOLLOUT,说明 outputBuffer_ 有数据正在发或者用户层缓冲区已经有数据积压。
或者说在上面的代码中,在写入socket缓冲区中时,写入一部分后缓冲区满了,还剩一部分没有写入的数据。
接着
- 如果 outputBuffer_ 中之前存在的数据与这次剩余的数据之和大于等于“高水位线”,即加完新数据后,总长度 ≥ 高水位线,这次追加后,缓冲区会“超载”,这是触发回调的前提
- 如果 加之前,总长度 < 高水位线,表明之前没超,现在才超。
- 用户是否注册了高水位回调函数,防止空指针调用
这三个条件合起来判断,这次是否是首次超过高水位线,需要通知上层进行流控,即
告诉上层:“你发的数据客户端接收太慢,缓冲区积压很多了!
等到上层调用完 highWaterMarkCallback_ 的回调函数时,一般会做如下处理(我们代码中未实现)
- 暂停从 socket 读数据(比如暂停 TcpConnection::enableReading())
- 等待 outputBuffer_ 被逐渐发完
- 当 outputBuffer_.readableBytes() 降到某个“低水位线”(low water mark)时
- 触发 lowWaterMarkCallback,通知上层“可以继续读了”
接着将我们待发送的数据放在outputBuffer_缓冲区中
最后一定要注册写事件,不然poller不会给channel通知epollout。
3、handleRead()
这个函数在 TcpConnection 收到 可读事件(EPOLLIN) 时被调用,它负责从 socket 读取数据并放在用户缓冲区inputBuffer_中,并交给上层业务处理。
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);//从 socket fd 读取数据,尽可能多地读入 inputBuffer_
if (n > 0)
{
// 已建立连接的用户,有可读事件发生了,调用用户传入的回调函数onMessage
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0) // 断开了,对端正常关闭连接(发送了 FIN 包)
{
handleClose();
}
else//处理真正的系统错误(如 ECONNRESET)
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}
4、handleWrite()
在 EPOLLOUT 事件触发时被调用,负责从 outputBuffer_ 取数据,写入 socket 发送缓冲区。
void TcpConnection::handleWrite()
{
if (channel_->isWriting())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) // 表示发送完成
{
channel_->disableWriting(); // 变成不可写
if (writeCompleteCallback_)
{
// 唤醒loop_对应的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisConnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down,no more writing \n", channel_->fd());
}
}
5、handleClose()
在连接被对端关闭(read() == 0)或发生错误时被调用,它标志着一个 TCP 连接的正式结束
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d,state-%d \n", channel_->fd(), (int)state_);
setState(kDisconnected);
channel_->disableAll();//清楚所有感兴趣的事件
TcpConnectionPtr connPtr(shared_from_this());//创建一个 shared_ptr
connectionCallback_(connPtr); // 执行连接关闭的回调
closeCallback_(connPtr); // 关闭连接的回调,通知用户“连接已关闭”,可以释放资源
}
清除所有的感兴趣的事件,调用 epoll_ctl(fd, EPOLL_CTL_DEL, ...) 或 MOD 去掉所有事件,防止后续事件被触发(比如连接已关,但还有 EPOLLOUT 事件排队)。
创建一个 shared_ptr,是为了延长 TcpConnection 的生命周期,因为回调函数可能在执行过程中导致 TcpConnection 被销毁(比如用户在回调中释放资源)
这是 C++ 异步编程中的经典模式:确保回调期间对象存活
接着
- connectionCallback_:通知状态变化(“连接断了”)
- closeCallback_:执行清理工作(“请清理资源”)
6、handleError()
在 read() 或 write() 返回错误时被调用,这个接口的作用是:获取 socket 的真实错误码(SO_ERROR)
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err = 0;
if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)//调用 getsockopt() 获取 socket 的待定错误(pending error)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}
感谢阅读!