当前位置: 首页 > news >正文

当 PyIceberg 和 DuckDB 遇见 AWS S3 Tables:打造 Serverless 数据湖“开源梦幻组合”

引言

在一些大数据分析场景比如电商大数据营销中,我们需要快速分析存储海量用户行为数据(如浏览、加购、下单),以进行用户行为分析,优化营销策略。传统方法依赖 Spark/Presto 集群或 Redshift 查询 S3 上的 Parquet/ORC 文件,这对于需要快速迭代、按需执行的分析来说,成本高、运维复杂且响应不够敏捷。

本文将介绍一种现代化的 Serverless 解决方案:利用 S3 Tables(内置优化的 Apache Iceberg 支持)作为存储基础,并结合 PyIceberg 的便捷性与 DuckDB 的高性能嵌入式分析能力,直接在 AWS Lambda 等环境中实现对 S3 数据的低成本、高效率即时查询,彻底摆脱集群运维的负担,加速您的用户行为分析。关键工具及技术点:

  1. S3 Tables:在 S3 上为表格数据(采用内建优化的 Apache Iceberg 格式)设计的、具备自动性能优化的智能对象存储。
  2. Lambda:提供按需运行代码的无服务器计算能力
  3. PyIceberg:Iceberg 官方开源项目,提供简洁的 Python API 来操作 Iceberg 表
  4. DuckDB:高性能嵌入式分析引擎,支持 Iceberg rest catalog 接口

核心实践

使用 PyIceberg 创建和插入 S3 Tables

首先,安装 python 依赖

pip install pyiceberg[s3fs, pyarrow]

创建表和插入表的核心代码如下,通过 pyiceberg 对接 S3 Tables 的 rest catalog api 来实现 catalog 的获取,从而实现表的创建、列出,以及数据的插入等操作。

from pyiceberg.catalog import load_catalog
import pyarrow as pa
rest_catalog = load_catalog("catalog_name",**{"type": "rest","warehouse": "arn:aws:s3tables:us-west-2:${awsAccountId}:bucket/testtable","uri": "https://s3tables.us-west-2.amazonaws.com/iceberg","rest.sigv4-enabled": "true","rest.signing-name": "s3tables","rest.signing-region": "us-west-2","py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}
)
# 新建namespace
rest_catalog.create_namespace("namespace_example")
# 新建表
rest_catalog.create_table("namespace_example.test_table",schema=pa.schema([("id", pa.int32()),("data", pa.string()),])
)
# 打印表列表
tables_list = rest_catalog.list_tables("namespace_example")
print(tables_list)
# 获取表对象
table = rest_catalog.load_table("namespace_example.test_table")
df = pa.Table.from_pylist([{"id": 303, "data": 'test insert icb'}], schema=table.schema().as_arrow()
)
#插入表
table.append(df)
# 读取表并打印
for row in table.scan().to_arrow().to_pylist():
print(row)

可以先通过本地配置 AWS CLI 权限然后运行代码进行测试,然后通过 docker 的方式部署 Lambda。

参考 Dockerfile:

FROM public.ecr.aws/lambda/python:3.12
COPY requirements.txt ${LAMBDA_TASK_ROOT}
RUN pip install -r requirements.txt
COPY lambda_function.py ${LAMBDA_TASK_ROOT}
CMD [ "lambda_function.handler" ]

使用 DuckDB 在 S3 Tables 进行复杂数据分析查询

这里使用 1.2.1 版本的 DuckDB,通过 pip install duckdb==1.2.1 来安装,DuckDB 最新的夜间版本插件支持了 Apache Iceberg REST 目录,而 S3 Tables 也有 REST 目录接口。可以通过在 Lambda 上部署 DuckDB 来读取查询分析 S3 Tables 里面的数据。也可以把 DuckDB 嵌入到您的应用程序中直接查询 S3 Tables。

DuckDB 的 Lambda 实现代码如下,结合了 boto3 的 S3 Tables 客户端,通过 api 把 S3 Tables 里面的桶加载到 DuckDB 的 catalog 中,后续就可以直接通过 sql 来进行查询了。Lambda 的入口函数接收 sql,然后返回 sql 的执行结果,示例 sql: Select * from bucketname.namespace.tablename 就可以直接查询出对应桶里面的表的数据了,需要注意的是在 DuckDB 里面一般通过 DETACH 和 ATTACH 来获取最新的 catalog 表元数据。

import os
import duckdb
import boto3
os.environ['HOME'] = '/tmp'
con = duckdb.connect(database=':memory:', config={'memory_limit': '9GB','worker_threads': 5,'temp_directory':'/tmp/file/overmem'})
# 验证设置
con.execute("""
FORCE INSTALL aws FROM core_nightly;
FORCE INSTALL httpfs FROM core_nightly;
FORCE INSTALL iceberg FROM core_nightly;
CREATE SECRET (TYPE s3,PROVIDER credential_chain
);
""")
s3tables = boto3.client('s3tables')
table_buckets = s3tables.list_table_buckets(maxBuckets=1000)['tableBuckets']
def handler(event, context):for table_bucket in table_buckets:name = table_bucket['name']arn = table_bucket['arn']try:con.execute(f"DETACH {name};")except:passcon.execute(f"""ATTACH '{arn}' AS {name} (TYPE iceberg,ENDPOINT_TYPE s3_tables);""")sql = event.get("sql")try:result = con.execute(sql).fetchall()return {"statusCode": 200,"result": result}except Exception as e:return {"statusCode": 500,"error": str(e)}

Dockerfile 可以参考插入部分的 Dockerfile,通过镜像部署到 Lambda,并设置好对应的 IAM 角色权限以及 Lambda 的超时以及内存设置。这里代码通过 duckdb.connect(database=’:memory:’, config={‘memory_limit’: ‘9GB’,’worker_threads’: 5,’temp_directory’:’/tmp/file/overmem’})来设置最大内存的使用和工作的线程数,这个可以根据实际的需要来调整

数据分析实践

测试数据集:电商用户行为数据,总量 13 亿数据,字段如下:

user_id               STRING       ‘用户ID(非真实ID),经抽样&字段脱敏处理后得到’

item_id               STRING       ‘商品ID(非真实ID),经抽样&字段脱敏处理后得到’

item_category    STRING       ‘商品类别ID(非真实ID),经抽样&字段脱敏处理后得到’

behavior_type    STRING       ‘用户对商品的行为类型,包括浏览、收藏、加购物车、购买,pv,fav,cart,buy)’

behavior_time    STRING       ‘行为时间,精确到小时级别’

测试 sql:用户行为数据漏斗分析

WITH user_behavior_counts AS (SELECTuser_id,SUM(CASE WHEN behavior_type = 'pv' THEN 1 ELSE 0 END) AS view_count,SUM(CASE WHEN behavior_type = 'fav' THEN 1 ELSE 0 END) AS favorite_count,SUM(CASE WHEN behavior_type = 'cart' THEN 1 ELSE 0 END) AS cart_count,SUM(CASE WHEN behavior_type = 'buy' THEN 1 ELSE 0 END) AS purchase_countFROM testtable.testdb.commerce_shoppingGROUP BY user_id
),
funnel_stages AS (SELECTCOUNT(DISTINCT user_id) AS total_users,COUNT(DISTINCT CASE WHEN view_count > 0 THEN user_id END) AS users_with_views,COUNT(DISTINCT CASE WHEN favorite_count > 0 THEN user_id END) AS users_with_favorites,COUNT(DISTINCT CASE WHEN cart_count > 0 THEN user_id END) AS users_with_cart_adds,COUNT(DISTINCT CASE WHEN purchase_count > 0 THEN user_id END) AS users_with_purchasesFROM user_behavior_counts
)
SELECTtotal_users,users_with_views,users_with_favorites,users_with_cart_adds,users_with_purchases,ROUND(100.0 * users_with_views / total_users, 2) AS view_rate,ROUND(100.0 * users_with_favorites / users_with_views, 2) AS view_to_favorite_rate,ROUND(100.0 * users_with_cart_adds / users_with_favorites, 2) AS favorite_to_cart_rate,ROUND(100.0 * users_with_purchases / users_with_cart_adds, 2) AS cart_to_purchase_rate,ROUND(100.0 * users_with_purchases / total_users, 2) AS overall_conversion_rate
FROM funnel_stages;

Lambda 测试结果:消耗内存 1934M

用时:37s

关键优势

将 PyIceberg 和 DuckDB 运行在 AWS Lambda 上来访问 S3 上的 Iceberg 表,这种 Serverless 数据湖模式主要的优势如下:

  • 低门槛:主要依赖 python 和 sql,这两种是数据开发领域最常见的技能,大大降低了学习和使用的门槛,同时基础设施 0 依赖且易于部署,不需要投入基础设施运维。
  • 高性价比:Lambda 按实际计算时间付费且自动伸缩,而 S3 的存储成本也较为低廉。加上 DuckDB 高性能的特性,这意味着更短的 Lambda 执行时间,进一步降低成本。
  • 开源与灵活性:核心组件 Apache Iceberg、DuckDB 和 PyIceberg 均为广泛应用的开源项目。受益于活跃的开源社区支持,可以获得持续的功能更新、问题修复和丰富的学习资源。

典型应用场景

  • 低成本海量分析负载对于需要控制成本,但仍需进行有效数据分析的场景,如中小型企业或特定项目预算有限的情况。
  • 非频繁或突发性查询如定期的报表生成、临时的业务数据洞察、偶尔进行的数据探索等,这些场景下按需付费的 Lambda + DuckDB 极具优势。
  • 事件驱动的数据处理由 S3 事件触发 Lambda (PyIceberg) 进行数据验证、转换和加载到 Iceberg 表,后续可由另一个 Lambda (DuckDB) 进行即时查询或聚合。
  • 交互式查询接口后端通过 API Gateway 暴露一个 Lambda (DuckDB) 端点,为内部用户或应用提供一个低成本的 SQL 查询接口,用于查询特定范围的数据。
  • 快速原型验证在开发或研究阶段,快速搭建一个功能完备的数据湖查询环境,用于验证想法或进行小规模实验。

相关文章:

  • git管理忽略指定路径/临时文件
  • QT6 源(101)篇一:阅读与注释 QPlainTextEdit,其继承于QAbstractScrollArea,属性学习与测试
  • Python零基础入门到高手8.4节: 元组与列表的区别
  • 以项目的方式学QT开发C++(二)——超详细讲解(120000多字详细讲解,涵盖qt大量知识)逐步更新!
  • c++,windows,多线程编程详细介绍
  • 【歌曲结构】2:小节与歌曲结构信息整合
  • 模糊综合评价模型建立
  • salesforce如何导出所有字段
  • 人工神经网络(ANN)模型
  • ctfshow权限维持
  • FPGA: UltraScale+ bitslip实现(方案+代码)
  • AI Agent开发第67课-彻底消除RAG知识库幻觉(1)-文档分块全技巧
  • 通义灵码 2.5.4 版【**编程智能体**】初体验
  • How to initialize Linux Mint 22.1 Desktop Operating System
  • Python训练打卡Day22
  • 【Java实战】IO流(转换流,打印流,数据流,序列化流)
  • 网络原理 | 网络基础概念复习
  • 以项目的方式学QT开发C++(一)——超详细讲解(120000多字详细讲解,涵盖qt大量知识)逐步更新!
  • 第二十五天打卡
  • 晶振的核心参数
  • 4台肺癌手术,2名“90后”患者,这届年轻人的肺怎么了?
  • 内塔尼亚胡:以军将在未来几天“全力进入”加沙
  • 孙简任吉林省副省长
  • 老人将房产遗赠给外孙,三个女儿却认为遗嘱应无效,法院判了
  • 科普|“小”耳洞也会引发“大”疙瘩,如何治疗和预防?
  • 金俊峰已跨区任上海金山区委副书记