当前位置: 首页 > news >正文

小记Vert.x的Pipe都做了什么

注意: 本文内容于 2025-06-08 01:41:22 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:小记Vert.x的Pipe都做了什么。感谢您的关注与支持!

一、背景

最近我在思考一个问题。

在长连接的使用场景中,为了及时释放空闲资源,通常会配置空闲超时机制。

这种机制应用于单个连接(比如一个 TCP 或 HTTP 连接)时,自然没问题。然而,如果放在一整条通信链路中,链路上的各个节点分别配置了不同的空闲超时参数,会发生什么情况呢?

我在一次实施中就遇到了类似的情况:当请求在发送后一段时间(大约 1 分钟)再次发起时,系统就会报错。由于我负责的是链路最下游的部分,无法直接查看上游节点的配置,只能推测问题可能是由于链路中各节点的空闲超时设置不一致所致。最终,我尝试将我这边的 idleTimeout 参数调大,问题随之消失。

虽然问题得以解决,但具体成因仍然只是我的猜测,也没有权力知道全貌。因此为了验证这个猜想,我决定基于 Java 的 Vert.x 框架,模拟并分析这类链路中因空闲超时不一致而导致的问题。

首先了解TCP通信的三次握手、四次挥手。我在下面简单画一下。如何进行抓包可以参考TCP状态以及CLOSE_WAIT问题排查 - 言成言成啊

三次握手

主动建立方                                              被动建立方|                                                    || ------------------ SYN --------------------------> ||                                                    || <--------------- SYN + ACK ----------------------- ||                                                    || ------------------ ACK --------------------------> ||                                                    |
连接建立成功                                     连接建立成功

四次挥手

主动关闭方                                             被动关闭方|                                                    || ------------------ FIN --------------------------> ||                                                    || <------------------ ACK -------------------------- ||                                                    ||              等待服务器准备关闭                     ||                                                    || <----------------- FIN + ACK --------------------- ||                                                    || ------------------ ACK --------------------------> ||                                                    |
连接关闭成功                                     连接关闭成功

不过在实际使用时,主动关闭方会发送FIN+ACK给被动关闭方。这也是符合规范的。

二、实践

2.1 实现

本文代码meethigher/bug-test at vertx-network-disconnect

网络链路user --[a conn]-- proxyServer/proxyClient --[b conn]-- backendServer,我现在有三台机器,分别用来模拟链路中的三个角色。

  • backendServer: 192.168.1.223
    • 永不超时
  • proxyServer/proxyClient: 192.168.1.103
    • proxyServer: 永不超时
    • proxyClient: 5秒空闲超时
  • user: 192.168.1.105
    • 永不超时
    • 随便使用局域网一台设备即可,只需要有telnet命令。执行telnet 192.168.1.103 8080,观察5秒之后,连接是否会被断开

backendServer源码

NetServer backendServer = vertx.createNetServer();
backendServer.connectHandler(socket -> socket.write(String.valueOf(System.currentTimeMillis()))).listen(8888).onFailure(e -> System.exit(1));

下面记录使用Vertx中NetSocket的两种api来双向传输数据,以及超时导致的问题。

2.1.1 handler…write

proxyServer/proxyClient关键代码

NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions().setIdleTimeoutUnit(TimeUnit.SECONDS).setIdleTimeout(5));
proxyServer.connectHandler(a -> {a.pause();proxyClient.connect(8888, "192.168.1.223").onFailure(e -> System.exit(1)).onSuccess(b -> {b.pause();a.handler(b::write);b.handler(a::write);a.resume();b.resume();});
}).listen(8080).onFailure(e -> System.exit(1));

现象:b连接断开,a连接保持

tcp抓包日志截图如下,会发现proxyServer/proxyClientbackendServer发送了FIN,所以b连接断开,但是并没有向user发送,所以a连接仍然保持。

2.1.2 pipeTo

proxyServer/proxyClient关键代码

NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions().setIdleTimeoutUnit(TimeUnit.SECONDS).setIdleTimeout(5));
proxyServer.connectHandler(a -> {a.pause();proxyClient.connect(8888, "192.168.1.103").onFailure(e -> System.exit(1)).onSuccess(b -> {b.pause();a.pipeTo(b);b.pipeTo(a);a.resume();b.resume();});
}).listen(8080).onFailure(e -> System.exit(1));

现象:b连接断开,a连接也断开

tcp抓包日志截图如下,会发现proxyServer/proxyClientbackendServer发送了FIN,所以b连接断开,也向user发送FIN,所以a连接也断开。

2.2 思考

2.2.1 handler…write与pipeTo的区别

为啥handler..writepipeTo的结果不同呢?这就需要跟一下pipeTo源码了。

原因在于pipeTo内部给源头连接注册了endHandlerexceptionHandler,当监听到如上事件时,会默认将对端连接也进行end()

由于io.vertx.core.streams.Pipe的实现类io.vertx.core.streams.impl.PipeImpl逻辑不复杂,跟别的模块代码也并没有强耦合,因此我们可以自己复制一份DiyPipe出来,以供自己调试。

那么pipeTo到底做了哪些东西呢?这个可以将其使用handler..write来实现出来。a.pipeTo(b).onComplete(completion)就相当于如下代码

a.resume();
a.handler(buf -> {b.write(buf);if (b.writeQueueFull()) {a.pause();b.drainHandler(t -> a.resume());}
});
a.endHandler(v -> {a.handler(null);a.endHandler(null);a.exceptionHandler(null);b.end().onComplete(completion);
});
a.exceptionHandler(e -> {a.handler(null);a.endHandler(null);a.exceptionHandler(null);b.end().onComplete(v -> completion.handle(Future.failedFuture(e)));
});

在此也提一个插曲,之前发现了一个tcp反向代理的bug

  • TCP反向代理在反代HTTP短连接服务时,出现io.netty.channel.StacklessClosedChannelException · Issue #6 · meethigher/tcp-reverse-proxy
  • meethigher/bug-test at vertx-tcp-proxy-closed

这个问题其实挺傻逼的,用了pipeTo这个api,连接的生命周期已经双向绑定了,而我又进行了再次绑定,进而导致关了又关的问题。

2.2.2 endHandler()/closeHandler()区别

在Vertx中,end和close主要用于区分半关闭和全关闭的状态。

以NetSocket为例,end底层调用了close,因此调用end()和调用close()的作用是一致的。

但是endHandler()和closeHandler()是严格不一样的。可以通过源码查看对应的触发时机,明显是endHandler()会比closeHandler()触发更靠前。

2.3 Promise用法示例

常规使用示例

import io.vertx.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;public class PromiseUsage {private static final Vertx vertx = Vertx.vertx();private static final Logger log = LoggerFactory.getLogger(PromiseUsage.class);public static Future<String> getFuture() {// Promise用法final Promise<String> promise = Promise.promise();vertx.setTimer(5000, id -> {if (ThreadLocalRandom.current().nextBoolean()) {promise.complete("succeed");} else {promise.fail("failed");}});return promise.future();}public static void main(String[] args) {Handler<AsyncResult<String>> completion = ar -> {if (ar.succeeded()) {log.info("test succeed");} else {log.error("test failed", ar.cause());}};getFuture().onComplete(completion);getFuture().onComplete(v -> {completion.handle(Future.failedFuture(new RuntimeException("hh")));});for (int i = 0; i < 10; i++) {getFuture().onComplete(ar -> {if (ar.succeeded()) {log.info("future completed");} else {log.error("future failed");}});}LockSupport.park();}
}

相关文章:

  • 【GPT模型训练】第二课:张量与秩:从数学本质到深度学习的基础概念解析
  • 自动化立体仓库堆垛机控制系统STEP7 OB1功能块
  • 贝叶斯定理与医学分析(t检验场景)
  • 【证书】2025公益课,人工智能训练师-高级,知识点与题库(橙点同学)
  • Redis持久化策略:RDB与AOF详解
  • 【刷题模板】链表、堆栈
  • 【Vue3】(三)vue3中的pinia状态管理、组件通信
  • 【教学类-53-02】20250607自助餐餐盘教学版(配餐+自助餐)
  • 【razor】x264 在 的intra-refresh和IDR插帧
  • c++对halcon的动态链接库dll封装及调用(细细讲)
  • LLMs 系列科普文(3)
  • 深入探索CDC:实时数据同步利器
  • 227.2018年蓝桥杯国赛 - 交换次数(中等)- 贪心
  • 手动实现C#ArrayList容器
  • yaklang 中的各种 fuzztag 标签及其用法
  • SOC-ESP32S3部分:36-适配自己的板卡
  • 【python深度学习】Day 48 PyTorch基本数据类型与操作
  • MySql读写分离部署(一主一从,双主双从,Mycat)
  • 用于机器学习的 Podman 简介:简化 MLOps 工作流程
  • javaSE复习(7)
  • 网站规划的原则有哪些/b2b网站排名
  • 互联网网站建设公司/百度seo点击排名优化
  • 建设企业网站电话是多少/正规网站建设服务
  • 网站建设副业/优化网站的软件下载
  • linux php网站部署/深圳网络seo推广
  • 描写做网站专业的句子/泰安seo网络公司