当前位置: 首页 > news >正文

使用 libpq 的 COPY 协议维护自定义 PG 到 PG 连接

1. 引言

最近在我的开发工作中,需要在主节点上的 PG 后端与备节点上的其他 PG 后端之间维护一个自定义连接,以传输除现有 walsender/walreceiver 连接所传输的 WAL 数据之外的自定义数据。当然,我可以简单地创建一个新的独立后端并自己维护一个 socket 连接来传输自定义数据。从技术上讲,这是可行的,但也会带来一些问题。自定义连接的持久性、用户安全性、数据加密都需要额外处理。那么,为什么不直接使用 libpq 来为我们处理这些问题呢?本文将分享基于 PG14 使用 libpq COPY 协议维护自定义数据连接的经验。

2. 在 libpqwalreceiver.c 中创建新例程

该文件位于 src/backend/replication/libpqwalreceiver,并被编译为一个共享库 (.so),其中包含与 libpq 库相关的例程,这些例程允许 PG 后端在不将 libpq 编译到后端代码中的情况下使用 libpq。当 PG 后端进程需要使用 libpq 时,它需要先使用 load_file() 调用加载共享库。

我的需求很简单,我需要的新例程是 connectsendrecv,类似于普通的 socket 交互。我这里没有定义 close 函数,因为我希望只要主节点和备节点在运行,连接就保持持久。当其中一个节点退出时,连接会在检测到对端断开后自动终止。

3. 连接例程

与用于复制的 libpqrcv_connect 例程不同,我的情况简单得多。我只需要让备节点连接到主节点,因此我可以简单地重用备节点的 primary_conninfo 配置参数来建立连接。这将触发主节点派生一个新的后端进程来服务这个连接。代码片段如下所示:

在这里插入图片描述

我还将 libpq socket 连接设置为使用阻塞 socket,并将 asyncStatus 设置为 PGASYNC_COPY_BOTH,以表示我将进行双向数据通信。

4. 发送例程

我的发送例程与用于复制的 libpqrcv_send 例程完全相同。两者都使用 PQputCopyData 来向主节点发送数据流。为了保持一致性,我对其进行了重命名。代码片段如下:

在这里插入图片描述

5. 接收例程

与用于复制的 libpqrcv_recv 例程也非常相似,代码几乎完全相同。只是根据我的需求,连接需要是同步连接。这意味着我的备节点在等待主节点响应时会阻塞。为了使接收同步,我必须将 PQgetCopyData 的第三个参数传递为 0。因此,如果你对异步连接没有问题,这个例程的代码也可以与 libpqrcv_recv 完全相同。

在这里插入图片描述

6. 让备节点发送自定义数据

现在我们已经为自己的目的创建了 libpq 包装例程,我们可以让备节点向主节点发送一些自定义数据并等待响应。注意,我发送了一个字母‘N’,后面跟着三个示例自定义数据:100、200、300。Libpq COPY 使用字母 d 来表示 COPY 命令,而我们在这里做的是在 d 命令中包装我们自己的命令。

StringInfoData buf_blk_request;WalReceiverConn *wrconn;int len;load_file("libpqwalreceiver", false);wrconn = netbuf_connect("dbname=postgres  host=127.0.0.1 port=5550");initStringInfo(&buf_blk_request);pq_sendbyte(&buf_blk_request, 'N');pq_sendint32(&buf_blk_request, 100);pq_sendint32(&buf_blk_request, 200);pq_sendint32(&buf_blk_request, 300);pq_flush();/* Send it */netbuf_send(wrconn, buf_blk_request.data, buf_blk_request.len);/* Read the data */len = netbuf_recv(wrconn, &tmp, &fd);if (len > 0){/** Something was received from primary*/}

7. 让主节点接收自定义数据

当我们使用上述方法发送数据时,主节点的 postmaster 主循环将接收数据并决定如何处理。因为我们使用的是 COPY 协议,第一个字符是 d,而 src/backend/tcop/postgres.c 中已经有一个针对该字符的处理程序。因此,我们需要在 postgres.c 中的 d 处理程序下添加额外代码,以接收和处理备节点发送的数据,并在需要时提供响应。

case 'd':           /* copy data */elog(DEBUG2, "copy data request received");int op;op = pq_getmsgbyte(&input_message);if (op == 'N'){StringInfoData buf_blk_reply;int data1, data2, data3;/* receive custom data here */data1 = pq_getmsgint(&input_message, 4);data2 = pq_getmsgint(&input_message, 4);data3 = pq_getmsgint(&input_message, 4);pq_getmsgend(&input_message);/* send another custom data back to standby here */pq_beginmessage(&buf_blk_reply, 'd');pq_sendint32(&buf_blk_request, 400);pq_sendint32(&buf_blk_request, 500);pq_sendint32(&buf_blk_request, 600);pq_endmessage(&buf_blk_reply);pq_flush();}break;

8. 总结

基于 libpq COPY,我创建了一个主节点与备节点之间的独立通信通道,可用于传输自定义数据,类似于通常处理普通 socket 的方式。这一切都基于 libpq 已支持的 COPY 协议,在该协议中,我们包装了自己的数据。在上述示例中,当备节点向主节点发送 100、200、300 时,主节点能够接收这些数据并响应 400、500、600。这个简单的示例可以扩展以支持您开发中可能需要的其他功能。使用 COPY 来实现我自己的目的可能不是最优雅的方式,但它对我来说是有效的。

http://www.dtcms.com/a/329273.html

相关文章:

  • 飞算JavaAI的中间件风暴:Redis + Kafka 全链路实战
  • WMware的安装以及Ubuntu22的安装
  • 自动驾驶中安全相关机器学习功能的可靠性定义方法
  • VirtualBox中的Ubuntu共享Windows的文件夹
  • 【Excel】被保护的文档如何显示隐藏的行或列
  • 厚铜PCB在百安级电流与高温环境中的关键作用
  • 普通电脑与云电脑的区别有哪些?全面科普
  • C++ 错误记录模块实现与解析
  • Redis:是什么、能做什么?
  • uniapp跨端性能优化方案
  • 各种排序算法(一)
  • Highcharts 图表示例|面积图与堆叠图(Area Stacked Chart)——让数据趋势更有层次感
  • SODA自然美颜相机(甜盐相机国际版) v9.3.0
  • LangChain是如何实现RAG多轮问答的
  • 【算法岗面试】手撕Self-Attention、Multi-head Attention
  • 比特币持有者结构性转变 XBIT分析BTC最新价格行情市场重构
  • 微店商品数据API接口的应用||电商API接口的应用
  • 数据结构与算法-选择题
  • 公司项目用户密码加密方案推荐(兼顾安全、可靠与通用性)
  • Chaos Vantage 2.8.1 发布:实时探索与材质工作流的全新突破
  • CacheBlend:结合缓存知识融合的快速RAG大语言模型推理服务
  • 大模型推理框架vLLM 中的Prompt缓存实现原理
  • 性能优化之通俗易懂学习requestAnimationFrame和使用场景举例
  • 来伊份×养馋记:社区零售4.0模式加速渗透上海市场
  • 四、深入剖析Java程序逻辑控制:从字节码到性能优化
  • MySQL事务原理分析以及隔离与锁
  • 从人机协作到情感共鸣:智能销售机器人如何重塑零售体验
  • 基于RTSP|RTMP低延迟视频链路的多模态情绪识别系统构建与实现
  • C++ 类和对象详解(1)
  • 飞算JavaAI实现数据库交互:JPA/Hibernate + MyBatis Plus基础功能学习