当前位置: 首页 > news >正文

20250330 Pyflink with Paimon

1. 数据湖

2. 本地安装Pyflink和Paimon

  • 必须安装Python 3.11

  • Pip install

python -m pip install apache-flink==1.20.1

  • 需要手动加入这两个jar

测试代码:

import argparse
import logging
import sys
import time

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(message)s")


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")


my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("D:/PyCharmWorkspace/PaimonLakeP02/src/basic/words.csv")

print(t_env.execute_sql(my_source_ddl))


print(t_env.execute_sql("""
    -- if you're trying out Paimon in a distributed environment,
    -- the warehouse path should be set to a shared file system, such as HDFS or OSS
    CREATE CATALOG paimon_catalog WITH (
        'type'='paimon',
        'warehouse'='D:/PyCharmWorkspace/PaimonLakeP02/src/basic/paimon'
    );
"""))

print(t_env.execute_sql("""
    USE CATALOG paimon_catalog;
"""))

print(t_env.execute_sql("""
    -- create a word count table
    CREATE TABLE IF NOT EXISTS word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );
"""))




# r=t_env.sql_query("select word from source").execute()
# r.print()
stmt_set = t_env.create_statement_set()
r=stmt_set.add_insert_sql("""
insert into word_count select word, count(1) as `count` from default_catalog.default_database.source group by word
""")
stmt_set.execute().wait()

# print sink
t_env.sql_query("select 'another print', * from word_count").execute().print()

print("===========end==============")

启动成功:

Paimon的本地数据文件:

参考资料

安装指引:Quick Start | Apache Paimon​​​​​​

下载包:Downloads | Apache Flink

附录:遇到的问题

1. Flink2.0 + Paimon

//没有配套的Paimon库,会报Sink不匹配异常。

相关文章:

  • rocky linux下载软件
  • [Python]代理批量检测延迟工具
  • ChatTTS:对话场景语音合成的开源新星
  • 《量子密码》
  • CSS3学习教程,从入门到精通,CSS3 图像属性知识点及案例代码(16)
  • 如何获取thinkphp的所有发行版本
  • 运行时智控:PanLang 开发者指南(一)运行时系统核心模块实现——PanLang 原型全栈设计方案与实验性探索5
  • 单链表中的递归算法
  • 编译原理——词法分析
  • GD32 ARM单片机开发规范检查清单 GD32嵌入式C代码检查清单
  • 《TypeScript 类的艺术:高效编码指南》
  • TransformersInternLM源码阅读
  • 括弧匹配检验(信息学奥赛一本通-1354)
  • Cherry Studio搭建本地知识库,结合DeepSeek实现RAG
  • AM32-MultiRotor-ESC项目固件编译和烧录方法介绍
  • 【Spring】Spring框架介绍
  • C/C++蓝桥杯算法真题打卡(Day7)
  • 生物化学笔记:医学免疫学原理03 超抗原+丝裂原+疫苗佐剂
  • BLE 4.0开发技术全景解析
  • [自动化] 【八爪鱼】使用八爪鱼实现CSDN文章自动阅读脚本
  • 车展之战:国产狂飙、外资反扑、智驾变辅助
  • 中国强镇密码丨洪泽湖畔的蒋坝,如何打破古镇刻板印象
  • 应急管理部派出工作组赴山西太原小区爆炸现场指导救援处置
  • 国新办发布《关于新冠疫情防控与病毒溯源的中方行动和立场》白皮书
  • 融创服务全面退出彰泰服务集团:约8.26亿元出售广西彰泰融创智慧80%股权
  • 连演三场,歌剧《义勇军进行曲》在上海西岸大剧院上演