小记Vert.x的Pipe都做了什么

发布于:2025-06-12 ⋅ 阅读:(17) ⋅ 点赞:(0)

注意: 本文内容于 2025-06-08 01:41:22 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:小记Vert.x的Pipe都做了什么。感谢您的关注与支持!

一、背景

最近我在思考一个问题。

在长连接的使用场景中,为了及时释放空闲资源,通常会配置空闲超时机制。

这种机制应用于单个连接(比如一个 TCP 或 HTTP 连接)时,自然没问题。然而,如果放在一整条通信链路中,链路上的各个节点分别配置了不同的空闲超时参数,会发生什么情况呢?

我在一次实施中就遇到了类似的情况:当请求在发送后一段时间(大约 1 分钟)再次发起时,系统就会报错。由于我负责的是链路最下游的部分,无法直接查看上游节点的配置,只能推测问题可能是由于链路中各节点的空闲超时设置不一致所致。最终,我尝试将我这边的 idleTimeout 参数调大,问题随之消失。

虽然问题得以解决,但具体成因仍然只是我的猜测,也没有权力知道全貌。因此为了验证这个猜想,我决定基于 Java 的 Vert.x 框架,模拟并分析这类链路中因空闲超时不一致而导致的问题。

首先了解TCP通信的三次握手、四次挥手。我在下面简单画一下。如何进行抓包可以参考TCP状态以及CLOSE_WAIT问题排查 - 言成言成啊

三次握手

主动建立方                                              被动建立方
   |                                                    |
   | ------------------ SYN --------------------------> |
   |                                                    |
   | <--------------- SYN + ACK ----------------------- |
   |                                                    |
   | ------------------ ACK --------------------------> |
   |                                                    |
连接建立成功                                     连接建立成功

四次挥手

主动关闭方                                             被动关闭方
   |                                                    |
   | ------------------ FIN --------------------------> |
   |                                                    |
   | <------------------ ACK -------------------------- |
   |                                                    |
   |              等待服务器准备关闭                     |
   |                                                    |
   | <----------------- FIN + ACK --------------------- |
   |                                                    |
   | ------------------ ACK --------------------------> |
   |                                                    |
连接关闭成功                                     连接关闭成功

不过在实际使用时,主动关闭方会发送FIN+ACK给被动关闭方。这也是符合规范的。

二、实践

2.1 实现

本文代码meethigher/bug-test at vertx-network-disconnect

网络链路user --[a conn]-- proxyServer/proxyClient --[b conn]-- backendServer,我现在有三台机器,分别用来模拟链路中的三个角色。

  • backendServer: 192.168.1.223
    • 永不超时
  • proxyServer/proxyClient: 192.168.1.103
    • proxyServer: 永不超时
    • proxyClient: 5秒空闲超时
  • user: 192.168.1.105
    • 永不超时
    • 随便使用局域网一台设备即可,只需要有telnet命令。执行telnet 192.168.1.103 8080,观察5秒之后,连接是否会被断开

backendServer源码

NetServer backendServer = vertx.createNetServer();
backendServer.connectHandler(socket -> socket.write(String.valueOf(System.currentTimeMillis())))
        .listen(8888)
        .onFailure(e -> System.exit(1));

下面记录使用Vertx中NetSocket的两种api来双向传输数据,以及超时导致的问题。

2.1.1 handler…write

proxyServer/proxyClient关键代码

NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions()
        .setIdleTimeoutUnit(TimeUnit.SECONDS)
        .setIdleTimeout(5));
proxyServer.connectHandler(a -> {
    a.pause();
    proxyClient.connect(8888, "192.168.1.223").onFailure(e -> System.exit(1))
            .onSuccess(b -> {
                b.pause();
        
                a.handler(b::write);
                b.handler(a::write);
        
                a.resume();
                b.resume();
            });
}).listen(8080).onFailure(e -> System.exit(1));

现象:b连接断开,a连接保持

tcp抓包日志截图如下,会发现proxyServer/proxyClientbackendServer发送了FIN,所以b连接断开,但是并没有向user发送,所以a连接仍然保持。

2.1.2 pipeTo

proxyServer/proxyClient关键代码

NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions()
        .setIdleTimeoutUnit(TimeUnit.SECONDS)
        .setIdleTimeout(5));
proxyServer.connectHandler(a -> {
    a.pause();
    proxyClient.connect(8888, "192.168.1.103").onFailure(e -> System.exit(1))
            .onSuccess(b -> {
                b.pause();
        
                a.pipeTo(b);
                b.pipeTo(a);


                a.resume();
                b.resume();
            });
}).listen(8080).onFailure(e -> System.exit(1));

现象:b连接断开,a连接也断开

tcp抓包日志截图如下,会发现proxyServer/proxyClientbackendServer发送了FIN,所以b连接断开,也向user发送FIN,所以a连接也断开。

2.2 思考

2.2.1 handler…write与pipeTo的区别

为啥handler..writepipeTo的结果不同呢?这就需要跟一下pipeTo源码了。

原因在于pipeTo内部给源头连接注册了endHandlerexceptionHandler,当监听到如上事件时,会默认将对端连接也进行end()

由于io.vertx.core.streams.Pipe的实现类io.vertx.core.streams.impl.PipeImpl逻辑不复杂,跟别的模块代码也并没有强耦合,因此我们可以自己复制一份DiyPipe出来,以供自己调试。

那么pipeTo到底做了哪些东西呢?这个可以将其使用handler..write来实现出来。a.pipeTo(b).onComplete(completion)就相当于如下代码

a.resume();
a.handler(buf -> {
    b.write(buf);
    if (b.writeQueueFull()) {
        a.pause();
        b.drainHandler(t -> a.resume());
    }
});
a.endHandler(v -> {
    a.handler(null);
    a.endHandler(null);
    a.exceptionHandler(null);
    b.end().onComplete(completion);
});
a.exceptionHandler(e -> {
    a.handler(null);
    a.endHandler(null);
    a.exceptionHandler(null);
    b.end().onComplete(v -> completion.handle(Future.failedFuture(e)));
});

在此也提一个插曲,之前发现了一个tcp反向代理的bug

这个问题其实挺傻逼的,用了pipeTo这个api,连接的生命周期已经双向绑定了,而我又进行了再次绑定,进而导致关了又关的问题。

2.2.2 endHandler()/closeHandler()区别

在Vertx中,end和close主要用于区分半关闭和全关闭的状态。

以NetSocket为例,end底层调用了close,因此调用end()和调用close()的作用是一致的。

但是endHandler()和closeHandler()是严格不一样的。可以通过源码查看对应的触发时机,明显是endHandler()会比closeHandler()触发更靠前。

2.3 Promise用法示例

常规使用示例

import io.vertx.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;

public class PromiseUsage {

    private static final Vertx vertx = Vertx.vertx();
    private static final Logger log = LoggerFactory.getLogger(PromiseUsage.class);

    public static Future<String> getFuture() {
        // Promise用法
        final Promise<String> promise = Promise.promise();
        vertx.setTimer(5000, id -> {
            if (ThreadLocalRandom.current().nextBoolean()) {
                promise.complete("succeed");
            } else {
                promise.fail("failed");
            }
        });
        return promise.future();
    }

    public static void main(String[] args) {
        Handler<AsyncResult<String>> completion = ar -> {
            if (ar.succeeded()) {
                log.info("test succeed");
            } else {
                log.error("test failed", ar.cause());
            }
        };
        getFuture().onComplete(completion);
        getFuture().onComplete(v -> {
            completion.handle(Future.failedFuture(new RuntimeException("hh")));
        });

        for (int i = 0; i < 10; i++) {
            getFuture().onComplete(ar -> {
                if (ar.succeeded()) {
                    log.info("future completed");
                } else {
                    log.error("future failed");
                }
            });
        }

        LockSupport.park();
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到