小记Vert.x的Pipe都做了什么
注意: 本文内容于 2025-06-08 01:41:22 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:小记Vert.x的Pipe都做了什么。感谢您的关注与支持!
一、背景
最近我在思考一个问题。
在长连接的使用场景中,为了及时释放空闲资源,通常会配置空闲超时机制。
这种机制应用于单个连接(比如一个 TCP 或 HTTP 连接)时,自然没问题。然而,如果放在一整条通信链路中,链路上的各个节点分别配置了不同的空闲超时参数,会发生什么情况呢?
我在一次实施中就遇到了类似的情况:当请求在发送后一段时间(大约 1 分钟)再次发起时,系统就会报错。由于我负责的是链路最下游的部分,无法直接查看上游节点的配置,只能推测问题可能是由于链路中各节点的空闲超时设置不一致所致。最终,我尝试将我这边的 idleTimeout
参数调大,问题随之消失。
虽然问题得以解决,但具体成因仍然只是我的猜测,也没有权力知道全貌。因此为了验证这个猜想,我决定基于 Java 的 Vert.x 框架,模拟并分析这类链路中因空闲超时不一致而导致的问题。
首先了解TCP通信的三次握手、四次挥手。我在下面简单画一下。如何进行抓包可以参考TCP状态以及CLOSE_WAIT问题排查 - 言成言成啊
三次握手
主动建立方 被动建立方| || ------------------ SYN --------------------------> || || <--------------- SYN + ACK ----------------------- || || ------------------ ACK --------------------------> || |
连接建立成功 连接建立成功
四次挥手
主动关闭方 被动关闭方| || ------------------ FIN --------------------------> || || <------------------ ACK -------------------------- || || 等待服务器准备关闭 || || <----------------- FIN + ACK --------------------- || || ------------------ ACK --------------------------> || |
连接关闭成功 连接关闭成功
不过在实际使用时,主动关闭方会发送
FIN+ACK
给被动关闭方。这也是符合规范的。
二、实践
2.1 实现
本文代码meethigher/bug-test at vertx-network-disconnect
网络链路user --[a conn]-- proxyServer/proxyClient --[b conn]-- backendServer
,我现在有三台机器,分别用来模拟链路中的三个角色。
backendServer
: 192.168.1.223- 永不超时
proxyServer/proxyClient
: 192.168.1.103proxyServer
: 永不超时proxyClient
: 5秒空闲超时
user
: 192.168.1.105- 永不超时
- 随便使用局域网一台设备即可,只需要有
telnet
命令。执行telnet 192.168.1.103 8080
,观察5秒
之后,连接是否会被断开
backendServer
源码
NetServer backendServer = vertx.createNetServer();
backendServer.connectHandler(socket -> socket.write(String.valueOf(System.currentTimeMillis()))).listen(8888).onFailure(e -> System.exit(1));
下面记录使用Vertx中NetSocket的两种api来双向传输数据,以及超时导致的问题。
2.1.1 handler…write
proxyServer/proxyClient
关键代码
NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions().setIdleTimeoutUnit(TimeUnit.SECONDS).setIdleTimeout(5));
proxyServer.connectHandler(a -> {a.pause();proxyClient.connect(8888, "192.168.1.223").onFailure(e -> System.exit(1)).onSuccess(b -> {b.pause();a.handler(b::write);b.handler(a::write);a.resume();b.resume();});
}).listen(8080).onFailure(e -> System.exit(1));
现象:b连接
断开,a连接
保持
tcp抓包日志截图如下,会发现proxyServer/proxyClient
向backendServer
发送了FIN
,所以b连接
断开,但是并没有向user
发送,所以a连接
仍然保持。
2.1.2 pipeTo
proxyServer/proxyClient
关键代码
NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions().setIdleTimeoutUnit(TimeUnit.SECONDS).setIdleTimeout(5));
proxyServer.connectHandler(a -> {a.pause();proxyClient.connect(8888, "192.168.1.103").onFailure(e -> System.exit(1)).onSuccess(b -> {b.pause();a.pipeTo(b);b.pipeTo(a);a.resume();b.resume();});
}).listen(8080).onFailure(e -> System.exit(1));
现象:b连接
断开,a连接
也断开
tcp抓包日志截图如下,会发现proxyServer/proxyClient
向backendServer
发送了FIN
,所以b连接
断开,也向user
发送FIN
,所以a连接
也断开。
2.2 思考
2.2.1 handler…write与pipeTo的区别
为啥handler..write
与pipeTo
的结果不同呢?这就需要跟一下pipeTo
源码了。
原因在于pipeTo
内部给源头连接注册了endHandler
和exceptionHandler
,当监听到如上事件时,会默认将对端连接也进行end()
。
由于
io.vertx.core.streams.Pipe
的实现类io.vertx.core.streams.impl.PipeImpl
逻辑不复杂,跟别的模块代码也并没有强耦合,因此我们可以自己复制一份DiyPipe
出来,以供自己调试。
那么pipeTo
到底做了哪些东西呢?这个可以将其使用handler..write
来实现出来。a.pipeTo(b).onComplete(completion)
就相当于如下代码
a.resume();
a.handler(buf -> {b.write(buf);if (b.writeQueueFull()) {a.pause();b.drainHandler(t -> a.resume());}
});
a.endHandler(v -> {a.handler(null);a.endHandler(null);a.exceptionHandler(null);b.end().onComplete(completion);
});
a.exceptionHandler(e -> {a.handler(null);a.endHandler(null);a.exceptionHandler(null);b.end().onComplete(v -> completion.handle(Future.failedFuture(e)));
});
在此也提一个插曲,之前发现了一个tcp反向代理的bug
- TCP反向代理在反代HTTP短连接服务时,出现io.netty.channel.StacklessClosedChannelException · Issue #6 · meethigher/tcp-reverse-proxy
- meethigher/bug-test at vertx-tcp-proxy-closed
这个问题其实挺傻逼的,用了pipeTo这个api,连接的生命周期已经双向绑定了,而我又进行了再次绑定,进而导致关了又关的问题。
2.2.2 endHandler()/closeHandler()区别
在Vertx中,end和close主要用于区分半关闭和全关闭的状态。
以NetSocket为例,end底层调用了close,因此调用end()和调用close()的作用是一致的。
但是endHandler()和closeHandler()是严格不一样的。可以通过源码查看对应的触发时机,明显是endHandler()会比closeHandler()触发更靠前。
2.3 Promise用法示例
常规使用示例
import io.vertx.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;public class PromiseUsage {private static final Vertx vertx = Vertx.vertx();private static final Logger log = LoggerFactory.getLogger(PromiseUsage.class);public static Future<String> getFuture() {// Promise用法final Promise<String> promise = Promise.promise();vertx.setTimer(5000, id -> {if (ThreadLocalRandom.current().nextBoolean()) {promise.complete("succeed");} else {promise.fail("failed");}});return promise.future();}public static void main(String[] args) {Handler<AsyncResult<String>> completion = ar -> {if (ar.succeeded()) {log.info("test succeed");} else {log.error("test failed", ar.cause());}};getFuture().onComplete(completion);getFuture().onComplete(v -> {completion.handle(Future.failedFuture(new RuntimeException("hh")));});for (int i = 0; i < 10; i++) {getFuture().onComplete(ar -> {if (ar.succeeded()) {log.info("future completed");} else {log.error("future failed");}});}LockSupport.park();}
}