[TcpConnection]

发布于:2025-06-30 ⋅ 阅读:(22) ⋅ 点赞:(0)

`TcpConnection`类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。
 

成员变量

private:
	
	EventLoop*loop_;    //这里绝对不是Mainloop,因为tcpconnection都是在subloop里边管理的
	const string name_;
	atomic<int> state_;
	bool reading;
	
	unique_ptr<Socket> socket_;
	unique_ptr<Channle> channel_;
	
	const InetAddress localAddr_;
	const InetAddress peerAddr_;
	
	ConnectionCallback connectionCallback_;       // 有新连接回调                void(const TcpConnectionPtr&)
    MessageCallback messageCallback_;             //有读写消息回调   void(constTcpConnectionPtr&,Buffer*,Timestamp)
    WriteCompleteCallback writeCompleteCallback_  // 消息发送完成以后的回调       void(const TcpConnectionPtr&)
    CloseCallback closeCallbacl_;                 //void(const TcpConnectionPtr&)
    HighWaterMarkCallback highWaterMarkCallback_; //void(const TcpConnectionPtr&,size_t)
    size_t highWaterMark_;
    
	Buffer inputBuffer_;                          //接受缓冲区
    Buffer outputBuffer_;                         //发送缓冲区

	enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting}; 
  •  `loop_`该`Tcp`连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • `highWaterMark_` 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
  • `inputBuffer_  outputBuffer_`  输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为`Tcp`发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到`Tcp`发送缓冲区,而是暂存在这个`outputBuffer_`中,等TCP发送缓冲区有空间了,触发可写事件了,再把`outputBuffer_`中的数据拷贝到`Tcp`发送缓冲区中。

重要成员函数

void TcpConnection::send(const std::string &buf) //直接引用buffer
{
    if(state_ == kConnected)
    {
        if(loop_->isInLoopThread())
        {
            //string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。
            sendInLoop(buf.c_str(),buf.size());
        }
        else
        {
            loop_->runInLoop(std::bind(&TcpConnection::sendInLoop
                                , this
                                , buf.c_str()
                                , buf.size()
            ));
        }
    }
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{
    ssize_t nwrote = 0;
    size_t remaining = len; //未发送的数据
    bool faultError = false; //记录是否产生错误

    //之前调用过connection的shutdown 不能在发送了
    if(state_ == kDisconnected)
    {
        LOG_ERROR("disconnected,give up writing!");
        return ;
    }

    //channel 第一次开始写数据,且缓冲区没有待发送数据
    if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = ::write(channel_->fd(),data,len);
        if(nwrote >= 0)
        {
            remaining = len - nwrote;
            if(remaining == 0 && writeCompleteCallback_)
            {
                loop_->queueInLoop(
                    std::bind(writeCompleteCallback_,shared_from_this()));
            }
        }
        else
        {
            nwrote = 0;
            if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写
            {
                LOG_ERROR("TcpConnection::sendInLoop");
                if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET
                {
                    faultError = true;
                }
            }
        }
    }
    if(!faultError && remaining > 0) 
    {
        //目前发送缓冲区剩余的待发送数据的长度
        size_t oldlen = outputBuffer_.readableBytes();
        if(oldlen + remaining >= highWaterMark_ 
            && oldlen < highWaterMark_
            && highWaterMark_)
            {
                loop_->queueInLoop(
                    std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining)
                );
            }
            outputBuffer_.append((char*)data + nwrote,remaining);
            if(!channel_->isWriting())
            {
                channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout
            }
    }
}
  1. 发送数据要发送的数据长度是`len`,如果在`loop_`在当前的线程里面,就调用`sendInLoop`,`sendInLoop`内部实际上是调用了系统的`write`,如果一次性发送完了,就设置`writeCompleteCallback_`,表明不要再给channel设置`epollout`事件了
  2. 如果没有写完,先计算一下`oldlen`目前发送缓冲区剩余的待发送数据的长度。满足:`if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)`   就会触发高水位回调
  3.  不满足以上的话,直接写入`outputBuffer_`
  4. 剩余的数据保存到缓冲区当中,要给给`channel`注册`epollout`事件(**切记,一定要注册channel的写事件,否则`poller`不会向channel通知`epollout`**),这样`poller`发现`tcp`发送缓冲区有空间,会通知相应的`socket-channel`调用相应的`writeCallback()`回调方法,也就是调用`TcpConnection::handleWrite`,把发送缓冲区中数据全部发送出去。
void TcpConnection::handleRead(TimeStamp receiveTime)
{
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
    if(n > 0)
    {      
         messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
    }
    else if(n==0) //客户端断开
    {
        handleClose();
    } 
    else
    {
        errno = savedErrno;
        LOG_ERROR("TcpConnection::hanleRead");
        handleError();
    }

}

负责处理`Tcp`连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中`inputBuffer_`,然后再调用`connectionCallback_`保存的连接建立后的处理函数。

void TcpConnection::handleWrite()
{
    if(channel_->isWriting())
    {
        int savedErrno = 0;
        ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
        if(n > 0)
        {
            outputBuffer_.retrieve(n); //处理了n个
            if(outputBuffer_.readableBytes() == 0) //发送完成
            {
                channel_->disableWriting(); //不可写了
                if(writeCompleteCallback_)
                {
                    //唤醒loop对应的thread线程,执行回调
                    loop_->queueInLoop(
                        std::bind(writeCompleteCallback_,shared_from_this())
                    );
                }
                if(state_ == kDisconnecting)
                {
                    shutdownInLoop();// 在当前loop中删除TcpConnection
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleWrite");
        }
    }
    else
    {
        LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
    }
}
  1. 如果有可写,通过fd发送数据,直到发送完成
  2. 设置不可写,如果`writeCompleteCallback_`,唤醒`loop`对应的`thread`线程,执行回调
  3. 当前`tcp`正在断开连接,调用`shutdownInLoop`,在当前`loop`中删除`TcpConnection`
void TcpConnection::handleClose()
{
    LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr connPtr(shared_from_this());
    connectionCallback_(connPtr); //执行连接关闭的回调
    closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}

处理逻辑就是将这个`TcpConnection`对象中的channel_从事件监听器中移除。然后调用`connectionCallback_`和`closeCallback_`保存的回调函数。`closeCallback_`在`TcpServer::newConnection()`为新连接新建`TcpConnection`时,已设为`TcpServer::removeConnection()`,而`removeConnection()`最终会调用`TcpConnection::connectDestroyed()`来销毁连接资源。

void TcpConnection::connectDestroyed()
{
    if(state_ == kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉
        connectionCallback_(shared_from_this());
    }
    channel_->remove();//把channel从poller中删除掉
}
 //关闭连接
void TcpConnection::shutdown()
{
    if(state_ == kConnected)
    {
        setState(kDisconnecting);
        loop_->runInLoop(
            std::bind(&TcpConnection::shutdownInLoop,this)
        );
    }
}

void TcpConnection::shutdownInLoop()
{
    if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成
    {
        socket_->shutdowmWrite(); // 关闭写端

    }
}

为什么只关闭了写端?

Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。

TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。

用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。

等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。被动关闭(muduo 在 read() 返回 0 的时候会回调 connection callback,这样客户代码就知道对方断开连接了。)

Muduo 这种关闭连接的方式对对方也有要求,那就是对方 read() 到 0 字节之后会主动关闭连接(无论 shutdownWrite() 还是 close()),一般的网络程序都会这样,不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不不关,那么 muduo 的连接就一直半开着,消耗系统资源。

完整的流程是:我们发完了数据,于是 shutdownWrite,发送 TCP FIN 分节,对方会读到 0 字节,然后对方通常会关闭连接,这样 muduo 会读到 0 字节,然后 muduo 关闭连接。

另外,如果有必要,对方可以在 read() 返回 0 之后继续发送数据,这是直接利用了 half-close TCP 连接。muduo 会收到这些数据,通过 message callback 通知客户代码。

那么 muduo 什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection 持有一个 Socket 对象,Socket 是一个 RAII handler,它的析构函数会 close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从 /proc/pid/fd/ 就能找到没有关闭的文件描述符,便于查错。

muduo 在 read() 返回 0 的时候会回调 connection callback,然后把 TcpConnection 的引用计数减一,如果 TcpConnection 的引用计数降到零,它就会析构了。


网站公告

今日签到

点亮在社区的每一天
去签到