当前位置: 首页 > news >正文

Flink流处理实战:实时对账与双流连接

目录

1. 环境设置

2. 创建两个流

3. 使用 connect 和 CoMapFunction

4. 实时对账场景

5. 使用 KeyedCoProcessFunction 进行对账

6. 执行任务

拓展

总结


package transformplusimport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, KeyedCoProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: transformplus* @author: 赵嘉盟-HONOR* @data: 2025-05-05 10:54* @DESCRIPTION**/
object Connect {def main(args: Array[String]): Unit = {val env=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream1=env.fromElements(1,2,3)val stream2=env.fromElements(1.1,2.2,3.3)stream1.connect(stream2).map(new CoMapFunction[Int,Double,String] {override def map1(in1: Int): String = s"Int: $in1"override def map2(in2: Double): String = s"Double: $in2"})//TODO 实时对账//1、来自APP的支付日志流(order_id,status,timestamp)val appStream=env.fromElements(("order_1","success",1000L),("order_2","success",2000L)).assignAscendingTimestamps(_._3)//2、来自第三方支付平台的日志流(order_id,status,platform_id,timestamp)val thirdpartyStream = env.fromElements(("order_1", "success","wechat", 3000L),("order_3", "success","alipay", 4000L)).assignAscendingTimestamps(_._4)appStream.connect(thirdpartyStream).keyBy(_._1,_._1).process(new KeyedCoProcessFunction[String, (String,String,Long), (String,String,String,Long), String] {var appEvent:ValueState[(String,String,Long)]=_var thirdpartyEvent:ValueState[(String,String,String,Long)]=_override def open(parameters: Configuration): Unit = {appEvent=getRuntimeContext.getState(new ValueStateDescriptor[(String,String,Long)]("app-event",classOf[(String,String,Long)]))thirdpartyEvent=getRuntimeContext.getState(new ValueStateDescriptor[(String,String,String,Long)]("thirdparty-event",classOf[(String,String,String,Long)]))}override def processElement1(in1: (String, String, Long), context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context, collector: Collector[String]): Unit = {if(thirdpartyEvent.value() != null){collector.collect(s"${in1._1} 对账成功")thirdpartyEvent.clear()} else {context.timerService().registerEventTimeTimer(in1._3+5000L)appEvent.update(in1)}}override def processElement2(in2: (String, String, String, Long), context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context, collector: Collector[String]): Unit = {if (appEvent.value() != null) {collector.collect(s"${in2._1} 对账成功")appEvent.clear()} else {context.timerService().registerEventTimeTimer(in2._4 + 5000L)thirdpartyEvent.update(in2)}}override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#OnTimerContext, out: Collector[String]): Unit = {//判断状态是否为空,如果不为空,说明另一条流对应数据没来if(appEvent.value() != null) out.collect(s"${appEvent.value()._1} 对账失败,第三方支付事件未到")if(thirdpartyEvent.value() != null) out.collect(s"${thirdpartyEvent.value()._1} 对账失败,app支付事件未到")appEvent.clear()thirdpartyEvent.clear()}}).print()env.execute("Connect")}
}

这段代码是一个使用 Apache Flink 进行流处理的示例,主要展示了如何使用 connect 操作符将两个流连接起来,并通过 CoMapFunction 和 KeyedCoProcessFunction 对这两个流进行处理。下面是对代码的详细解释和拓展:

1. 环境设置

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
  • StreamExecutionEnvironment 是 Flink 流处理程序的入口。
  • setParallelism(1) 设置并行度为 1,意味着所有的操作都会在单个线程中执行,通常用于调试和测试。

2. 创建两个流

val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(1.1, 2.2, 3.3)
  • fromElements 方法用于从给定的元素集合中创建一个流。
  • stream1 是一个包含整数的流,stream2 是一个包含双精度浮点数的流。

3. 使用 connect 和 CoMapFunction

stream1.connect(stream2).map(new CoMapFunction[Int, Double, String] {override def map1(in1: Int): String = s"Int: $in1"override def map2(in2: Double): String = s"Double: $in2"
})
  • connect 操作符将两个流连接在一起,但不会合并它们。它返回一个 ConnectedStreams 对象。
  • CoMapFunction 是一个函数接口,用于对连接的两个流进行映射操作。map1 处理第一个流的元素,map2 处理第二个流的元素。
  • 在这个例子中,map1 将整数转换为字符串,map2 将双精度浮点数转换为字符串。

4. 实时对账场景

val appStream = env.fromElements(("order_1", "success", 1000L),("order_2", "success", 2000L)
).assignAscendingTimestamps(_._3)val thirdpartyStream = env.fromElements(("order_1", "success", "wechat", 3000L),("order_3", "success", "alipay", 4000L)
).assignAscendingTimestamps(_._4)
  • appStream 和 thirdpartyStream 分别表示来自 APP 和第三方支付平台的支付日志流。
  • assignAscendingTimestamps 方法用于为流中的元素分配时间戳,以便后续基于时间的操作(如窗口、定时器等)能够正确执行。

5. 使用 KeyedCoProcessFunction 进行对账

appStream.connect(thirdpartyStream).keyBy(_._1, _._1).process(new KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String] {var appEvent: ValueState[(String, String, Long)] = _var thirdpartyEvent: ValueState[(String, String, String, Long)] = _override def open(parameters: Configuration): Unit = {appEvent = getRuntimeContext.getState(new ValueStateDescriptor[(String, String, Long)]("app-event", classOf[(String, String, Long)]))thirdpartyEvent = getRuntimeContext.getState(new ValueStateDescriptor[(String, String, String, Long)]("thirdparty-event", classOf[(String, String, String, Long)]))}override def processElement1(in1: (String, String, Long), context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context, collector: Collector[String]): Unit = {if (thirdpartyEvent.value() != null) {collector.collect(s"${in1._1} 对账成功")thirdpartyEvent.clear()} else {context.timerService().registerEventTimeTimer(in1._3 + 5000L)appEvent.update(in1)}}override def processElement2(in2: (String, String, String, Long), context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context, collector: Collector[String]): Unit = {if (appEvent.value() != null) {collector.collect(s"${in2._1} 对账成功")appEvent.clear()} else {context.timerService().registerEventTimeTimer(in2._4 + 5000L)thirdpartyEvent.update(in2)}}override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#OnTimerContext, out: Collector[String]): Unit = {if (appEvent.value() != null) out.collect(s"${appEvent.value()._1} 对账失败,第三方支付事件未到")if (thirdpartyEvent.value() != null) out.collect(s"${thirdpartyEvent.value()._1} 对账失败,app支付事件未到")appEvent.clear()thirdpartyEvent.clear()}}).print()
  • keyBy(_._1, _._1) 根据订单 ID 对两个流进行分组,确保相同订单的事件会被发送到同一个处理函数中。
  • KeyedCoProcessFunction 是一个用于处理两个键控流的函数。它允许你分别处理来自两个流的元素,并且可以访问 Flink 的状态管理和定时器功能。
  • processElement1 和 processElement2 分别处理来自 appStream 和 thirdpartyStream 的元素。如果另一个流的事件已经到达,则输出对账成功;否则,注册一个定时器,并在 5 秒后检查是否对账成功。
  • onTimer 方法在定时器触发时执行,用于处理对账失败的情况。

6. 执行任务

env.execute("Connect")
  • execute 方法启动 Flink 作业的执行。

拓展

  1. 状态管理ValueState 用于存储和访问键控状态。在这个例子中,appEvent 和 thirdpartyEvent 分别存储来自 APP 和第三方支付平台的事件。
  2. 定时器registerEventTimeTimer 用于注册基于事件时间的定时器。定时器可以在未来的某个时间点触发,用于处理超时或延迟的事件。
  3. 对账逻辑:这个例子展示了一个简单的对账逻辑,实际应用中可能需要更复杂的逻辑,如处理部分成功、重试机制等。
  4. 并行度:在实际生产环境中,通常会将并行度设置为大于 1 的值,以充分利用集群资源。

总结

这段代码展示了如何使用 Flink 处理两个流,并通过状态管理和定时器实现实时对账功能。通过 connect 和 KeyedCoProcessFunction,可以灵活地处理来自不同流的事件,并实现复杂的业务逻辑。

相关文章:

  • inode 与 block 概念详解
  • 常见算法题目3 -反转字符串
  • 《P3435 [POI 2006] OKR-Periods of Words》
  • Tabular Editor为PowerBI度量值建立一级或多级文件夹的两种方法
  • SDC命令详解:使用set_logic_dc命令进行约束
  • OKX交易平台有哪些优势引领行业变革?——XBIT平台动态追踪
  • 探索C++对象模型:流插入提取探讨、const修饰、初始化列表(下篇)
  • 图像去雾算法研究报告
  • 记录python在excel中添加一列新的列
  • 【场景分析】基于概率距离快速削减法的风光场景生成与削减方法
  • 构建安全AI风险识别大模型:CoT、训练集与Agent vs. Fine-Tuning对比
  • Maven:在原了解基础上对pom.xml文件进行详细解读
  • 【MySQL系列】 MySQL 中的 TINYINT 类型
  • 如何用事件风暴构建领域模型?
  • 梯度优化提示词:精准引导AI分类
  • 【Redis】分布式缓存的一系列问题(持久化,主从集群,哨兵,分片集群)
  • CMake基础:CMakeLists.txt 文件结构和语法
  • 深入理解 Redis 哨兵模式
  • 理论篇五:如何优化Webpack的打包速度
  • React从基础入门到高级实战:React 基础入门 - React Hooks 入门
  • web网站开发源代码/seo教学免费课程霸屏
  • 多品牌网站建设/做网站建设公司
  • 免费网站代理访问/色盲测试图片
  • 百度怎么做网站广告/西安自动seo
  • 多语言网站难做么/重庆网站
  • 襄樊建设网站/网站推广网