当前位置: 首页 > news >正文

13 pyflink/scala 进行 csv 文件的批处理

前言

这是 最近有一个 来自于朋友的 pyflink 的使用需求  

然后 看到了 很多 pyflink 这边的和 使用 java, scala 的 api 使用上的很多差异 

这里使用的 pyflink 版本是 1.16.3 

pyflink 1.16.3 中批处理相关貌似要使用 Table API 来进行处理, datastreaming api 使用多多少少存在问题 

但是 这个如果是在 java, scala 中写一段 批处理的脚本就简单的多了 

pyflink 1.16.3 这里, 要使用 Table API 进行处理 

 

 

pyflink 1.16.3 使用 Table API 进行批处理

这里整体的过程, 也是 构建 Source, Transformation, Sink 然后进行执行 

flink-sql 会转换为 flink job 进行业务处理, sql 中就包含了 转换的处理 

from pyflink.table import EnvironmentSettings, TableEnvironmentsettings = EnvironmentSettings.new_instance().in_batch_mode().build()t_env = TableEnvironment.create(settings)
t_env.get_config().set("parallelism.default", "1")t_env.execute_sql("""CREATE TABLE mySource (country STRING,year_field STRING,sex STRING) WITH ('connector' = 'filesystem','format' = 'csv','path' = '/Users/jerry/Tmp/17_pyspark_csv/suicide_clear_3fields.csv')
""")t_env.execute_sql("""CREATE TABLE mySink (updated_country STRING,updated_year STRING,counter BIGINT) WITH ('connector' = 'filesystem','format' = 'csv','path' = '/Users/jerry/Tmp/17_pyspark_csv/output_by_flink_sql')
""")t_env.execute_sql("""INSERT INTO mySinkSELECT country as updated_country, year_field AS updated_year, count(*) as counterFROM mySourceWHERE year_field = '1987'group by country, year_field
""").wait()

 

最终执行结果如下, 实现了 数据的批处理 

 

 

使用 scala 来进行数据的批处理

可以使用大量 api, 不仅仅局限于 sql, 处理方式上面 更加抽象, 灵活一些  

可能是 程序员更加偏爱的处理方式, flink-sql 稍微简单一些, 处理的场景 也有一些局限 

package com.hx.testimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.configuration.Configuration/*** Test01WordCount** @author Jerry.X.He <970655147@qq.com>* @version 1.0* @date 2021-04-02 18:07*/
object Test04ReadCsvThenGroup {def main(args: Array[String]): Unit = {// 创建一个批处理的执行环境val conf = new Configuration()conf.setString("taskmanager.numberOfTaskSlots", "3")conf.setString("rest.bind-port", "8081")conf.setString("parallelism.default", "1")val env = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 从文件中读取数据val inputPath = "/Users/jerry/Tmp/17_pyspark_csv/suicide_clear.csv"val inputDs = env.readTextFile(inputPath)val result = inputDs.filter(line => !line.contains("year,")).map(line => {val splits = line.split("\\s*,\\s*")Person(splits(0), Integer.parseInt(splits(1)), splits(2), 1)}).filter(person => {person.year == 1987}).map(person => {(person.country, person)}).groupBy(0).reduce((v1, v2) => {v1._2.count = v1._2.count + v2._2.countv1}).map(tuple => tuple._2)// 打印输出result.print()System.in.read()}case class Person(country: String, year: Int, sex: String, var count: Int) {}}

 

 

输出结果如下 

 

 

完 

 

 

 

 

 

http://www.dtcms.com/a/499763.html

相关文章:

  • java ThreadPoolExecurtor源码解读 --- Worker
  • 20251018在ubuntu24.04下解压缩gz压缩包
  • 做赚钱的网站有哪些园林绿化
  • 静态网站开发用到的技术产品报价网
  • 【小学教辅】新版一年级上册语文第四单元课课贴 一年级语文复韵母学习资料 小学拼音考点练习电子版可下载打印|夸克网盘
  • 企业网站空间不足怎么办商标设计logo免费生成器网站
  • python 字典 列表 类比c++【python】
  • plsql developer 无法跟踪调试
  • Collections 工具类 15 个常用方法源码:sort、binarySearch、reverse、shuffle、unmodifiableXxx
  • mb与使用场景
  • 建设通网站是什么时间成立加入google广告wordpress
  • AI Coding 基础实践01 - TickTalk的MarsCode-Trae AI(Trae 插件)在Pycharm中的配置
  • [SCADE编译原理] 因果性分析原理(2001)
  • 网站建设pc指什么软件佛山新网站建设策划
  • RDEx:一种效果驱动的混合单目标优化器,自适应选择与融合多种算子与策略
  • JavaScript学习第三天:运算符
  • C++进阶之操作符重载函数operator[]:用法实例(四百三十五)
  • 《小白学随机过程》第一章:随机过程——定义和形式(附录2. 随机变量和随机过程公式解读)
  • 近代通信技术的发展
  • 实用网站的设计与实现wordpress简介
  • 如何微信做演讲视频网站Wordpress刷新CDN缓存
  • macos虚拟机-演示篇一制作可启动iso文件
  • 论坛类网站备案今天东营发生的重大新闻
  • Aspect的AOP实现
  • Orleans Stream SubscriptionId 生成机制详解
  • FMIT,一款专业的乐器调音助手
  • 医疗器械招商网站大全建一个信息 类网站
  • 不用域名推广网站开源网站后台管理系统
  • 欧司朗与日亚签署广泛的知识产权协议
  • Kotlin 与 Java 互操作中常用注解