[muduo] TcpConnection | 回调交互

发布于:2025-06-23 ⋅ 阅读:(17) ⋅ 点赞:(0)

第六章:TcpConnection

在前几章中,我们已经构建了Muduo网络库的基础。

  • 我们理解EventLoop第一章:EventLoop)是单线程的事件引擎,负责管理事件。

  • 我们学习了muduo::Thread第二章:Thread)如何帮助在专用线程中运行EventLoop以实现并发

  • 我们看到了Channel第三章:Channel)如何将特定的文件描述符(如套接字)连接到EventLoop并保存该FD的事件回调

  • 我们了解了Poller第四章:Poller)如何高效地等待多个Channel的事件

  • 以及TimerQueue第五章:TimerQueue)如何处理基于时间的事件

现在我们将这些组件整合起来处理实际的网络通信。

  • 当建立TCP连接时——无论是服务器接受的连接还是客户端发起的连接——都会获得一个准备好发送和接收数据的套接字文件描述符。

    手动管理每个套接字的读写、缓冲和连接状态非常复杂。

    这正是muduo::net::TcpConnection类的职责所在。

TcpConnection解决了什么问题?

假设服务器接受了一个新的客户端连接。操作系统会提供一个全新的套接字文件描述符。数据可能随时到达这个套接字。需要:

  • 从套接字读取传入数据
  • 缓冲传入数据直到形成完整消息
  • 处理完整消息
  • 向客户端发送返回数据
  • 在网络繁忙时缓冲出站数据
  • 优雅地处理错误和连接关闭
  • 跟踪连接状态(连接中/已连接/断开中/已断开)

直接使用底层套接字调用并管理可能数千个连接的缓冲是项艰巨的任务。

TcpConnection类为单个已建立的TCP连接提供了高级的面向对象抽象。

它封装了套接字文件描述符,处理底层I/O操作(读/写),管理输入输出缓冲区,并通过回调提供了清晰的接口供插入应用逻辑。

TcpConnection视为客户端的专属"会话管理器"。一旦客户端连接,就会创建TcpConnection对象来处理与该特定客户端的所有通信直到连接关闭

TcpConnection:会话管理器

muduo::net::TcpConnection对象代表与对等端的一个活动连接。无论是连接到服务器的客户端,还是客户端连接远程服务器,都使用TcpConnection对象。

以下是muduo::net::TcpConnection的核心概念:

  1. 每连接专属每个已建立的TCP连接对应一个TcpConnection对象
  2. 单EventLoop归属:与ChannelPoller类似,TcpConnection对象属于单个EventLoop,其方法必须在该EventLoop线程中调用
  3. 管理Socket和Channel拥有底层Socket对象(封装文件描述符)和与该套接字FD关联的Channel对象Channel将FD注册到EventLoopPoller以监视读/写事件
  4. 输入输出缓冲区:包含inputBuffer_muduo::net::Buffer,后续章节详述)用于存储从套接字读取的输入数据,以及outputBuffer_用于暂存尚未完全发送的输出数据
  5. 状态机:连接经历不同状态:kConnecting(建立时短暂存在)、kConnected(准备就绪)、kDisconnecting(本地关闭中)、kDisconnected(已断开)
  6. 回调机制:应用开发者通过回调函数与TcpConnection交互。通常通过创建它的TcpServerTcpClient设置以下回调:
  • ConnectionCallback:连接状态变化时触发
  • MessageCallback:新数据到达并读入inputBuffer_触发
  • WriteCompleteCallback:输出缓冲区排空后触发
  • HighWaterMarkCallback:输出缓冲区超过阈值时触发
  • CloseCallback:内部用于管理对象生命周期

使用TcpConnection:通过回调交互

通常不直接创建TcpConnection对象

而是由TcpServer(接受新连接时)或TcpClient(连接服务器成功时)自动创建。

开发者主要通过设置回调函数TcpConnection交互。以下是muduo/net/Callbacks.h中定义的回调类型:

#include <functional>
#include <memory>

namespace muduo {
namespace net {

class Buffer;
class TcpConnection;
class Timestamp;

typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;

typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&,
                            Buffer*,
                            Timestamp)> MessageCallback;
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;
typedef std::function<void (const TcpConnectionPtr&)> CloseCallback;

} // namespace net
} // namespace muduo

主要用户接口方法:

class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection> {
public:
  void send(const void* message, int len);
  void send(const StringPiece& message);
  void send(Buffer* message);

  void shutdown();
  void forceClose();
  void forceCloseWithDelay(double seconds);

  void startRead();
  void stopRead();
  bool isReading() const;

  Buffer* inputBuffer();
  Buffer* outputBuffer();

  void setContext(const boost::any& context);
  const boost::any& getContext() const;
  boost::any* getMutableContext();

  void setConnectionCallback(const ConnectionCallback& cb);
  void setMessageCallback(const MessageCallback& cb);
  void setWriteCompleteCallback(const WriteCompleteCallback& cb);
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark);
};

典型回调使用示例:

void onConnection(const TcpConnectionPtr& conn) {
  if (conn->connected()) {
    LOG_INFO << "Connection UP: " << conn->name();
    conn->send("Hello!\n");
  } else {
    LOG_INFO << "Connection DOWN: " << conn->name();
  }
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime) {
  LOG_INFO << "Received " << buf->readableBytes() << " bytes";
  conn->send(buf);
}

void onWriteComplete(const TcpConnectionPtr& conn) {
    LOG_INFO << "Write complete";
}

void onHighWaterMark(const TcpConnectionPtr& conn, size_t bytes) {
    LOG_INFO << "High water mark: " << bytes;
    conn->stopRead();
}

回调

回调就像点外卖时留电话号码:
你告诉外卖平台"餐到了就打电话通知我",而不是一直盯着手机看。

这里的电话号码就是回调函数,外卖平台就是调用者。

体现

onConnection函数:
当网络连接状态变化时(比如连接成功或断开),系统会自动调用这个函数,就像外卖到了自动给你打电话。

onMessage函数:
当收到网络消息时自动触发,相当于"有快递包裹到了就通知我"的设定。

onWriteComplete函数:
当数据发送完成时自动执行,类似"外卖小哥离开后给我发短信确认"。

onHighWaterMark函数:
当数据堆积过多时触发,相当于"快递柜快满了就提醒我取件"。

特点

  1. 预先写好处理函数(回调函数
  2. 系统在特定事件发生时自动调用(外卖到了)
  3. 不需要主动去查询状态(会通过回调函数打电话)
  4. 处理逻辑完全由你定义

这种机制避免了程序不断轮询检查状态,就像不用每隔5分钟就给外卖平台打电话问"餐到了没"。


TcpConnection内部机制

通过序列图展示数据接收流程:

在这里插入图片描述

核心内部方法实现逻辑:

  1. 构造函数:绑定套接字FD,创建Channel,设置事件回调
  2. connectEstablished:完成连接建立,启用读监控
  3. handleRead:处理读事件,填充输入缓冲区,触发消息回调
  4. sendInLoop:实现高效数据发送,处理缓冲区管理
  5. handleWrite:处理写事件,管理输出缓冲区
  6. handleClose:处理连接关闭,触发相关回调

总结

功能 描述 优势
每连接对象 代表单个TCP连接 提供高级会话管理接口
EventLoop归属 严格单线程操作 简化并发控制,无需锁机制
Socket & Channel 封装套接字和事件通道 深度集成事件循环机制
双缓冲系统 inputBuffer_存储输入,outputBuffer_管理输出 高效处理流量控制和不完整数据包
状态机 明确的状态迁移(kConnecting→kConnected→kDisconnecting→kDisconnected) 清晰管理连接生命周期
回调体系 五类核心回调覆盖连接全生命周期 灵活对接业务逻辑
智能发送方法 send()系列方法自动处理缓冲和流量控制 简化应用层开发
优雅关闭机制 shutdown()保证数据完整性,forceClose()立即终止 满足不同场景需求

TcpConnection作为Muduo网络库的核心组件

通过封装底层套接字操作、管理双缓冲系统、维护状态机,并结合事件循环机制,为TCP连接管理提供了高效可靠的解决方案。

回调驱动的设计模式使得业务逻辑与网络层完美解耦,是构建高性能网络应用的基石。

第七章:Buffer将深入解析Muduo的缓冲区实现机制。


网站公告

今日签到

点亮在社区的每一天
去签到