当前位置: 首页 > news >正文

利用postgres_proto和pgproto测试postgres协议访问duckdb

postgres_proto是一个python编写的模拟接收postgres协议请求的服务器。
pgproto是一个c编写的发送postgres协议请求的客户端。

参考examples中csv_db.py例子,改出一个duck.py。其实并没有真正把sql语句传进去,而是输出select version()的结果,因为query是SQL解析后的字符串(如代码底部注释所示),并不是原始SQL,不能传给DuckDB。
而且不能在__init__函数中用self.connection取得duckdb连接,因为它没有execute方法。客户端会收到类似的错误消息。

from postgres_proto.socket_handler import PostgresRequestHandler
from postgres_proto.flow import PostgresError, catch_all_as_postgres_error_context
import duckdbclass DuckDBRequestHandler(PostgresRequestHandler):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)#self.connection = duckdb.connect('demo.ddb')def query_tables(self, stmt_info):with catch_all_as_postgres_error_context():query = str(stmt_info)connection = duckdb.connect('demo.ddb')result = connection.execute("select version()")columns = [desc[0] for desc in result.description]rows_data = result.fetchall()# 转换为JSON格式的字典列表rows = []for row in rows_data:row_dict = {}for i, value in enumerate(row):row_dict[columns[i]] = valuerows.append(row_dict)print(rows, columns)return rows, columnsdef list_tables(self):with catch_all_as_postgres_error_context():result = self.connection.execute("SHOW TABLES")return [row[0] for row in result.fetchall()]def describe_table(self, table_name):with catch_all_as_postgres_error_context():result = self.connection.execute(f"DESCRIBE {table_name}")return [row[0] for row in result.fetchall()]if __name__ == '__main__':from postgres_proto.server import start_server, cli_arg_parserstart_server(DuckDBRequestHandler, **vars(cli_arg_parser.parse_args()))#Serving on 127.0.0.1:55432
#'Q'	"SELECT * FROM foods"
#====query======= SelectStmt(columns=[SelectColumnExpr(name='*', alias=None)], tables=[FromTableExpr(name='foods', schema=None, alias=None)])#pgproto -h 127.0.0.1 -p 55432 -f /par/pgduckpo.txt
#FE=> Query (query="SELECT * FROM foods")
#<= BE ErrorResponse(S ERROR C 0 M 'socket' object has no attribute 'execute' )
#<= BE ReadyForQuery(I)

在read.c中添加如下函数,并在数据描述行的处理中添加调用,这个函数是将postgresql协议中的规格说明传给DeepSeek生成后修改的, 对于这种明确的需求,他给出的程序还能保证正确性。
有一个小问题,他给的示例程序用数组保存消息,所以用了parse_row_description(sample_data, sizeof(sample_data), &row_desc);,而我的buf是指针,sizeof(buf)等于8,就不能读取正确的值了。要改为parse_row_description(buf, len - sizeof(int), &row_desc);

typedef struct {char* field_name;uint32_t table_oid;uint16_t column_attnum;uint32_t type_oid;int16_t type_size;int32_t type_modifier;int16_t format_code;
} FieldDescription;typedef struct {uint16_t field_count;FieldDescription* fields;
} RowDescription;void parse_row_description(const unsigned char* data, size_t length, RowDescription* row_desc) {if (length < 4 + 2) return; // 至少要有消息长度和字段数量int pos = 0;// 跳过消息类型('T')和消息长度//pos += 5; // 1字节类型 + 4字节长度// 读取字段数量row_desc->field_count = (data[pos] << 8) | data[pos+1];pos += 2;// 分配字段描述内存row_desc->fields = malloc(row_desc->field_count * sizeof(FieldDescription));// 解析每个字段for (int i = 0; i < row_desc->field_count; i++) {FieldDescription* field = &row_desc->fields[i];// 读取字段名称(以\0结尾的字符串)int name_len = 0;while (pos + name_len < length && data[pos + name_len] != '\0') {name_len++;}printf("name_len=%d,length=%d",name_len,length);field->field_name = malloc(name_len + 1);memcpy(field->field_name, data + pos, name_len);field->field_name[name_len] = '\0';pos += name_len + 1; // 跳过字符串和终止符// 读取表OID(4字节)if (pos + 4 <= length) {field->table_oid = (data[pos] << 24) | (data[pos+1] << 16) | (data[pos+2] << 8) | data[pos+3];pos += 4;}// 读取列属性编号(2字节)if (pos + 2 <= length) {field->column_attnum = (data[pos] << 8) | data[pos+1];pos += 2;}// 读取数据类型OID(4字节)if (pos + 4 <= length) {field->type_oid = (data[pos] << 24) | (data[pos+1] << 16) | (data[pos+2] << 8) | data[pos+3];pos += 4;}// 读取数据类型大小(2字节)if (pos + 2 <= length) {field->type_size = (data[pos] << 8) | data[pos+1];pos += 2;}// 读取类型修饰符(4字节)if (pos + 4 <= length) {field->type_modifier = (data[pos] << 24) | (data[pos+1] << 16) | (data[pos+2] << 8) | data[pos+3];pos += 4;}// 读取格式代码(2字节)if (pos + 2 <= length) {field->format_code = (data[pos] << 8) | data[pos+1];pos += 2;}}
}void free_row_description(RowDescription* row_desc) {for (int i = 0; i < row_desc->field_count; i++) {free(row_desc->fields[i].field_name);}free(row_desc->fields);row_desc->field_count = 0;row_desc->fields = NULL;
}void print_field_names(const RowDescription* row_desc) {printf("字段数量: %d\n", row_desc->field_count);for (int i = 0; i < row_desc->field_count; i++) {printf("字段 %d: %s\n", i + 1, row_desc->fields[i].field_name);}
}...
void read_until_ready_for_query(PGconn *conn, int timeout)
{
...case 'T':	/* Row Description */fprintf(stderr, "<= BE RowDescription\n");len = read_int32(conn);buf = read_bytes(len - sizeof(int), conn);RowDescription row_desc;parse_row_description(buf, len - sizeof(int), &row_desc);printf("解析出的字段名:\n");print_field_names(&row_desc);// 清理内存free_row_description(&row_desc);pg_free(buf);								//read_and_discard(conn);break;
...

测试DuckDB中的数据
服务端

root@6ae32a5ffcde:/par/postgres-proto/examples# PYTHONPATH=.. python3 duck2.py
Serving on 127.0.0.1:55432
[{'"version"()': 'v1.4.0'}] ['"version"()']

客户端

/par/pgproto# src/pgproto -h 127.0.0.1 -p 55432 -f /par/pgpo.txt
FE=> Query (query="SELECT * FROM csv")
<= BE RowDescription
name_len=11,length=32解析出的字段名:
字段数量: 1
字段 1: "version"()
<= BE DataRow
v1.4.0
<= BE CommandComplete(SELECT)
<= BE ReadyForQuery(I)
FE=> Parse(stmt="S1", query="BEGIN")
FE=> Bind(stmt="S1", portal="")
FE=> Execute(portal="")
FE=> Close(stmt="S1")
FE=> Parse(stmt="foo", query="SELECT 1")
FE=> Bind(stmt="foo", portal="myportal")
FE=> Execute(portal="myportal")
FE=> Parse(stmt="S2", query="COMMIT")
FE=> Bind(stmt="S2", portal="")
FE=> Execute(portal="")
FE=> Close(stmt="S2")
FE=> Sync
<= BE ParseComplete
<= BE BindComplete
<= BE CommandComplete(BEGIN)
<= BE CloseComplete
<= BE ParseComplete
<= BE BindComplete
read_it: EOF detected

协议的字母含义参阅postgresql文档。上述自定义函数的结构来自以下说明。

RowDescription (B) 
字节1('T') 标识消息为行描述。Int32 消息内容(包括自身)的长度(以字节为单位)。Int16 指定一行中的字段数量(可以为零)。然后,对于每个字段,都有以下内容字符串 以\0结尾 字段名称。Int32 如果该字段可以标识为特定表的列,则为该表的对象 ID;否则为零。Int16 如果该字段可以标识为特定表的列,则为该列的属性编号;否则为零。Int32 字段数据类型的对象 ID。Int16 数据类型大小(请参阅 pg_type.typlen)。请注意,负值表示可变宽度类型。Int32 类型修饰符(请参阅 pg_attribute.atttypmod)。修饰符的含义是类型特定的。Int16 用于该字段的格式代码。目前将为零(文本)或一(二进制)。

暂时只考虑字符串类型。

postgresql的psql客户端也能访问此服务端, 注意如果选择常数select 1,它没有读取DuckDB中的数据,而select *会读取。

psql  -h 127.0.0.1 -p 55432
psql (15.13 (Debian 15.13-0+deb12u1), server 130000)
WARNING: psql major version 15, server major version 130000.Some psql features might not work.
Type "help" for help.root=> select 1 from t;1
---(1 row)root=> select * from t;"version"()
-------------v1.4.0
(1 row)

文章转载自:

http://1BbwRCci.zwgrf.cn
http://AQyuo3Pi.zwgrf.cn
http://iYm4gUXo.zwgrf.cn
http://fuBErkr7.zwgrf.cn
http://AUjtaerq.zwgrf.cn
http://MATL0Ys7.zwgrf.cn
http://MPrQ5DvA.zwgrf.cn
http://5FFzUatl.zwgrf.cn
http://UCny0rpJ.zwgrf.cn
http://v3rj99Nw.zwgrf.cn
http://TXM8OFYO.zwgrf.cn
http://pMAcXcxV.zwgrf.cn
http://oKLz87kp.zwgrf.cn
http://rMox9Lk0.zwgrf.cn
http://wUWGCjdr.zwgrf.cn
http://VAC1rVQd.zwgrf.cn
http://SFSEzYOI.zwgrf.cn
http://6gCxmTXb.zwgrf.cn
http://eFmNa0x6.zwgrf.cn
http://xXKRKDO8.zwgrf.cn
http://EvJJDTbF.zwgrf.cn
http://pm47MmPa.zwgrf.cn
http://FwpYcEzd.zwgrf.cn
http://khRQm0Ic.zwgrf.cn
http://BwnX83Ak.zwgrf.cn
http://za9SjxmO.zwgrf.cn
http://ypCP1aGq.zwgrf.cn
http://V5kY36TO.zwgrf.cn
http://v8TZIIfZ.zwgrf.cn
http://zhX1zRK2.zwgrf.cn
http://www.dtcms.com/a/386667.html

相关文章:

  • 拼多多-----anti_content逆向分析
  • 【一文了解】Unity的协程(Coroutine)与线程(Thread)
  • 贪心算法在网络入侵检测(NID)中的应用
  • 数据搬家后如何处理旧 iPhone
  • [react native招聘]
  • IDE工具RAD Studio 13 Florence重磅发布:64 位 IDE + AI 组件全面升级!
  • session存储
  • Another Redis Desktop Manager 的 SCAN 使用问题与风险分析
  • MATLAB绘制一个新颖的混沌图像(新四翼混沌系统)
  • AI起名工具
  • typeScript 装饰器
  • 【算法磨剑:用 C++ 思考的艺术・单源最短路进阶】Bellman-Ford 与 SPFA 算法模板精讲,突破负权边场景
  • 单元测试:驱动模块与桩模块在自顶向下和自底向上的策略中的作用
  • SpringBoot MVC 快速入门
  • Nature Communications 北京大学联合德国马普所在触觉传感器方面取得进展,实现机器人指尖超分辨率力感知
  • 解决一次 “Failed to load model because protobuf parsing failed”:从现象到根因与修复
  • 从ppm到ppb:全面解读浓度单位转换的诀窍
  • 贪心算法应用:霍夫曼编码详解
  • NLP Subword 之 BBPE(Byte-level BPE) 算法原理
  • 【nodejs】Windows7系统下如何安装nodejs16以上版本
  • Part05 数学
  • 每天五分钟深度学习:深层神经网络的优势
  • PCGrad解决多任务冲突
  • 第十一章:游戏玩法和屏幕特效-Gameplay and ScreenEffects《Unity Shaders and Effets Cookbook》
  • Choerodon UI V1.6.7发布!为 H-ZERO 开发注入新动能
  • 科教共融,具创未来!节卡助力第十届浦东新区机器人创新应用及技能竞赛圆满举行
  • 食品包装 AI 视觉检测技术:原理、优势与数据应用解析
  • 【深度学习计算机视觉】05:多尺度目标检测之FPN架构详解与PyTorch实战
  • 从工业革命到人工智能:深度学习的演进与核心概念解析
  • [Emacs list使用及配置]