当前位置: 首页 > news >正文

Flink Python API 提交 Socket 数据源的 WordCount 作业

下面详细讲解使用 Flink Python API 提交 Socket 数据源的 WordCount 作业的完整流程:

一、环境准备

  1. 安装 Flink

    • 下载并解压 Flink 安装包(建议 1.14 + 版本,对 Python 支持更完善)
    • 确保环境变量FLINK_HOME已配置
  2. 安装 PyFlink

    pip install apache-flink
    
  3. 准备工具

    • 需要netcat工具用于创建 Socket 数据源
    • 确保 Java 8 + 已安装(Flink 运行依赖)

二、编写 WordCount 代码

创建wordcount_socket.py文件,实现从 Socket 读取数据并进行词频统计:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, Socket, Formatdef word_count():# 1. 创建执行环境env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)  # 设置并行度为1,方便测试t_env = StreamTableEnvironment.create(env)# 2. 定义Socket数据源t_env.connect(Socket().version("1.0").host("localhost").port(9999)).with_format(Format().type("csv").field_delimiter("\n")  # 按行分割.field("line", DataTypes.STRING())).with_schema(Schema().field("line", DataTypes.STRING())).create_temporary_table("socket_source")# 3. 注册结果表(打印到控制台)t_env.execute_sql("""CREATE TEMPORARY TABLE result_sink (word STRING,count BIGINT) WITH ('connector' = 'print')""")# 4. 执行WordCount逻辑t_env.execute_sql("""INSERT INTO result_sinkSELECT word, COUNT(1) as countFROM (SELECT explode(split(line, ' ')) as wordFROM socket_source)GROUP BY word""")if __name__ == '__main__':word_count()

三、启动必要服务

  1. 启动 Flink 集群

    $FLINK_HOME/bin/start-cluster.sh
    
     

    访问 http://localhost:8081 可查看 Flink Web UI

  2. 启动 Socket 服务器
    打开新终端,使用 netcat 启动 Socket 服务:

    nc -lk 9999
    
     

    (-l:监听模式,-k:保持连接,9999:端口号)

四、提交 Flink Python 作业

使用 Flink 的 Python 提交脚本提交作业:

$FLINK_HOME/bin/flink run -py wordcount_socket.py

提交成功后,会显示作业 ID,同时在 Flink Web UI 上能看到运行中的作业。

五、测试作业

  1. 在启动 netcat 的终端中输入文本,例如:

    hello flink
    hello python
    flink python
    hello world
    
  2. 查看 Flink 作业输出

    • 方式 1:在 Flink Web UI 中,进入对应作业的 Task Managers 页面,查看 Stdout
    • 方式 2:查看 Flink 的日志文件:$FLINK_HOME/log/flink-*-taskexecutor-*.out

    输出结果会类似:

    +I[hello, 1]
    +I[flink, 1]
    +I[hello, 2]
    +I[python, 1]
    +I[flink, 2]
    +I[python, 2]
    +I[hello, 3]
    +I[world, 1]
    

六、停止服务

  1. 停止作业:在 Flink Web UI 中点击作业,选择 "Cancel"
  2. 停止 Socket 服务器:按Ctrl+C
  3. 停止 Flink 集群:
    $FLINK_HOME/bin/stop-cluster.sh
    

关键说明

  1. Socket 数据源特性:Socket 是流式数据源,会持续监听指定端口接收数据
  2. 动态结果:WordCount 会对输入的单词进行持续计数,相同单词会更新计数
  3. 并行度设置:示例中设置并行度为 1,实际生产可根据集群资源调整
  4. 异常处理:如果 Socket 服务中断,作业会失败,需要重启 Socket 服务和作业

通过以上步骤,你可以完整体验 Flink Python 作业从编写、提交到运行的整个流程。

http://www.dtcms.com/a/327459.html

相关文章:

  • TDengine 可观测性最佳实践
  • 荣耀手机无法连接win11电脑,错误消息:“无法在此设备上加载驱动程序 (hn_usbccgpfilter.sys)。”解决方案
  • Flink运行时的实现细节
  • 嵌入式Linux进程管理面试题大全(含详细解析)
  • 基于热成像摄像头检测蚊子的可行性研究
  • iOS 签名证书全生命周期实战,从开发到上架的多阶段应用
  • 《Qwen2.5-VL 》论文精读笔记
  • 网络协议之TCP和UDP
  • 【iOS】Block基础知识和底层探索
  • Model Context Protocol (MCP)标准化应用程序向大型语言模型 (LLM) 提供上下文协议
  • 如何通过 5 种方法轻松格式化 USB 驱动器
  • Kubernetes 资源管理全解析:从基础到企业级实践
  • MyBatis-Plus——SQL注入器
  • 华清远见25072班C语言学习day7
  • 《算法导论》第 21 章-用于不相交集合的数据结构
  • 01-Ansible 自动化介绍与使用
  • 【数据结构】二叉树结构与相关实现
  • .NET MAUI框架编译Android应用流程
  • 服务降级方式
  • Python实现Amazon Redshift数据库元数据提取类
  • 分布式事务Seata使用不当引发的全局锁问题
  • 解锁Java线程池:从原理到实战的深度剖析
  • 无人机三维路径规划
  • 前端基础知识NodeJS系列 - 06( Node 中的 Stream 的理解?应用场景?)
  • 如何实现PostgreSQL的高可用性,包括主流的复制方案、负载均衡方法以及故障转移流程?
  • TensorBoard的使用 小土堆pytorch记录
  • 数据类型 list
  • 小白挑战一周上架元服务——ArkUI04
  • 前端最新Vue2+Vue3基础入门到实战项目全套教程,自学前端vue就选黑马程序员,一套全通关!笔记
  • Java面试宝典:G1垃圾收集器上