20250330 PyFlink with Paimon
1. Data Lake
2. Install PyFlink and Paimon locally
- Python 3.11 must be installed
- Install with pip:
python -m pip install apache-flink==1.20.1
- These two JARs need to be added manually (see the sketch below for one way to locate where they go)
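The two JAR file names are not listed here; for Paimon on Flink they are typically the paimon-flink connector JAR plus a Hadoop bundle JAR. A minimal sketch (the JAR names in the comments are placeholders, not the exact files) for finding the local PyFlink lib directory where such JARs can be dropped:
import os
import pyflink

# PyFlink picks up any JARs placed in <pyflink package>/lib,
# so copying connector JARs there avoids extra configuration.
# Alternatively, JAR paths can be passed via the 'pipeline.jars' config.
lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
print(lib_dir)

# Hypothetical file names -- use the versions you actually downloaded:
#   paimon-flink-1.20-<version>.jar            -> copy into lib_dir
#   flink-shaded-hadoop-2-uber-<version>.jar   -> copy into lib_dir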
Test code:
import logging
import sys

from pyflink.table import EnvironmentSettings, TableEnvironment
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(message)s")
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
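# register a CSV file as a source table in the default catalog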
my_source_ddl = """
create table source (
word STRING
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{}'
)
""".format("D:/PyCharmWorkspace/PaimonLakeP02/src/basic/words.csv")
print(t_env.execute_sql(my_source_ddl))
print(t_env.execute_sql("""
-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='D:/PyCharmWorkspace/PaimonLakeP02/src/basic/paimon'
);
"""))
print(t_env.execute_sql("""
USE CATALOG paimon_catalog;
"""))
print(t_env.execute_sql("""
-- create a word count table
CREATE TABLE IF NOT EXISTS word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);
"""))
# r=t_env.sql_query("select word from source").execute()
# r.print()
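# the CSV source is read once (no monitoring interval configured), so the insert job below finishes on its own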
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql("""
insert into word_count
select word, count(1) as cnt
from default_catalog.default_database.source
group by word
""")
stmt_set.execute().wait()
# print sink: in streaming mode this reads the Paimon table continuously, so it keeps printing updates until the job is stopped
t_env.sql_query("select 'another print', * from word_count").execute().print()
print("===========end==============")
Started successfully:
Paimon's local data files:
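To see what Paimon actually wrote to disk, a small sketch that walks the warehouse path used in the test code above (the snapshot/manifest/bucket layout it prints is whatever Paimon produces by default):
import os

warehouse = "D:/PyCharmWorkspace/PaimonLakeP02/src/basic/paimon"

# print every file Paimon created under the warehouse directory
for root, _dirs, files in os.walk(warehouse):
    for name in files:
        print(os.path.join(root, name))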
References
Installation guide: Quick Start | Apache Paimon
Download packages: Downloads | Apache Flink
Appendix: Issues encountered
1. Flink 2.0 + Paimon
There is no matching Paimon connector for Flink 2.0 yet; running against it throws a sink mismatch exception.
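A quick way to confirm which Flink version is installed before picking a Paimon connector JAR (a sketch using only the standard library; the printed version should match the connector's Flink line, e.g. 1.20.x for paimon-flink-1.20):
from importlib.metadata import version

# the distribution name on PyPI is "apache-flink"
print(version("apache-flink"))  # e.g. 1.20.1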