Flink Stream Processing in Practice: Real-Time Reconciliation and Connecting Two Streams
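Contents
1. Environment setup
2. Creating two streams
3. Using connect and CoMapFunction
4. Real-time reconciliation scenario
5. Reconciling with KeyedCoProcessFunction
6. Running the job
Further notes
Summary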
package transformplus

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, KeyedCoProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @PROJECT_NAME: flink1.13
 * @PACKAGE_NAME: transformplus
 * @author: 赵嘉盟-HONOR
 * @data: 2025-05-05 10:54
 * @DESCRIPTION
 */
object Connect {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1 = env.fromElements(1, 2, 3)
    val stream2 = env.fromElements(1.1, 2.2, 3.3)

    stream1.connect(stream2).map(new CoMapFunction[Int, Double, String] {
      override def map1(in1: Int): String = s"Int: $in1"
      override def map2(in2: Double): String = s"Double: $in2"
    })

    // TODO: real-time reconciliation
    // 1. Payment log stream from the APP: (order_id, status, timestamp)
    val appStream = env.fromElements(
      ("order_1", "success", 1000L),
      ("order_2", "success", 2000L)
    ).assignAscendingTimestamps(_._3)

    // 2. Log stream from the third-party payment platform: (order_id, status, platform_id, timestamp)
    val thirdpartyStream = env.fromElements(
      ("order_1", "success", "wechat", 3000L),
      ("order_3", "success", "alipay", 4000L)
    ).assignAscendingTimestamps(_._4)

    appStream.connect(thirdpartyStream)
      .keyBy(_._1, _._1)
      .process(new KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String] {
        // Per-key state: the event from each side that is still waiting for its counterpart
        var appEvent: ValueState[(String, String, Long)] = _
        var thirdpartyEvent: ValueState[(String, String, String, Long)] = _

        override def open(parameters: Configuration): Unit = {
          appEvent = getRuntimeContext.getState(
            new ValueStateDescriptor[(String, String, Long)]("app-event", classOf[(String, String, Long)]))
          thirdpartyEvent = getRuntimeContext.getState(
            new ValueStateDescriptor[(String, String, String, Long)]("thirdparty-event", classOf[(String, String, String, Long)]))
        }

        override def processElement1(
            in1: (String, String, Long),
            context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context,
            collector: Collector[String]): Unit = {
          if (thirdpartyEvent.value() != null) {
            collector.collect(s"${in1._1} reconciliation succeeded")
            thirdpartyEvent.clear()
          } else {
            context.timerService().registerEventTimeTimer(in1._3 + 5000L)
            appEvent.update(in1)
          }
        }

        override def processElement2(
            in2: (String, String, String, Long),
            context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context,
            collector: Collector[String]): Unit = {
          if (appEvent.value() != null) {
            collector.collect(s"${in2._1} reconciliation succeeded")
            appEvent.clear()
          } else {
            context.timerService().registerEventTimeTimer(in2._4 + 5000L)
            thirdpartyEvent.update(in2)
          }
        }

        override def onTimer(
            timestamp: Long,
            ctx: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#OnTimerContext,
            out: Collector[String]): Unit = {
          // If a state is still non-empty, the matching event from the other stream never arrived
          if (appEvent.value() != null)
            out.collect(s"${appEvent.value()._1} reconciliation failed: third-party payment event did not arrive")
          if (thirdpartyEvent.value() != null)
            out.collect(s"${thirdpartyEvent.value()._1} reconciliation failed: app payment event did not arrive")
          appEvent.clear()
          thirdpartyEvent.clear()
        }
      })
      .print()

    env.execute("Connect")
  }
}
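This code is an Apache Flink stream-processing example. It shows how to connect two streams with the connect operator and process them with CoMapFunction and KeyedCoProcessFunction. A detailed walkthrough of the code and some further notes follow.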
1. Environment setup
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
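StreamExecutionEnvironment is the entry point of a Flink streaming program. setParallelism(1) sets the default parallelism to 1, so every operator runs as a single parallel subtask. This is typically used for debugging and testing.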
2. Creating two streams
val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(1.1, 2.2, 3.3)
fromElements creates a stream from the given elements. stream1 is a stream of integers and stream2 is a stream of double-precision floating-point numbers.
3. Using connect and CoMapFunction
stream1.connect(stream2).map(new CoMapFunction[Int, Double, String] {
  override def map1(in1: Int): String = s"Int: $in1"
  override def map2(in2: Double): String = s"Double: $in2"
})
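- connect links two streams without merging them, so their element types may differ. It returns a ConnectedStreams object.
- CoMapFunction is a function interface for mapping over the two connected streams. map1 handles elements of the first stream and map2 handles elements of the second; both must return the same output type.
- In this example, map1 converts integers to strings and map2 converts doubles to strings. Note that the listing attaches no sink to this result, so nothing is shown for it unless a call such as print() is added.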
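To make the two-callback shape of CoMapFunction concrete, here is a minimal plain-Scala sketch that does not use Flink at all. It models a connected stream as a sequence of Either values; the object name CoMapSketch and the helper coMap are hypothetical and exist only for illustration.

```scala
object CoMapSketch {
  // Hypothetical stand-in for a connected stream: each element is tagged
  // with the input it came from (Left = first stream, Right = second stream).
  def coMap[A, B, R](events: Seq[Either[A, B]])(map1: A => R, map2: B => R): Seq[R] =
    events.map {
      case Left(a)  => map1(a) // plays the role of CoMapFunction.map1
      case Right(b) => map2(b) // plays the role of CoMapFunction.map2
    }

  def main(args: Array[String]): Unit = {
    // One possible interleaving; a real job does not guarantee any order across the two inputs.
    val interleaved: Seq[Either[Int, Double]] = Seq(Left(1), Right(1.1), Left(2), Right(2.2))
    coMap(interleaved)(i => s"Int: $i", d => s"Double: $d").foreach(println)
  }
}
```

The point of the sketch is that the two inputs keep their own types and their own handler, while the output has a single type.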
4. Real-time reconciliation scenario
val appStream = env.fromElements(
  ("order_1", "success", 1000L),
  ("order_2", "success", 2000L)
).assignAscendingTimestamps(_._3)

val thirdpartyStream = env.fromElements(
  ("order_1", "success", "wechat", 3000L),
  ("order_3", "success", "alipay", 4000L)
).assignAscendingTimestamps(_._4)
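appStream and thirdpartyStream represent the payment log streams from the APP and from the third-party payment platform. assignAscendingTimestamps assigns an event timestamp to each element (here the last tuple field, in milliseconds) and generates watermarks on the assumption that timestamps never decrease, so that later time-based operations such as windows and timers work correctly.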
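The relationship between ascending timestamps, watermarks, and timers can be sketched in plain Scala. This is a simplified model, not Flink code: it assumes the watermark trails the highest timestamp seen by 1 ms and that an event-time timer fires once the watermark reaches the timer's timestamp. The names WatermarkSketch, watermarkAfter, and timerFires are hypothetical. In a real job watermarks are emitted periodically, so intermediate values may never be observed for a tiny bounded input.

```scala
object WatermarkSketch {
  // Simplified model: with ascending timestamps the watermark is the
  // highest timestamp seen so far minus 1 ms.
  def watermarkAfter(timestamps: Seq[Long]): Long = timestamps.max - 1

  // An event-time timer fires once the watermark has reached its timestamp.
  def timerFires(timerTs: Long, watermark: Long): Boolean = watermark >= timerTs

  def main(args: Array[String]): Unit = {
    val wm = watermarkAfter(Seq(1000L, 2000L))
    println(wm)                                // 1999
    // Timer registered for order_1 at 1000 + 5000 = 6000
    println(timerFires(6000L, wm))             // false: the watermark is still too low
    // A bounded source emits a final watermark of Long.MaxValue when it finishes
    println(timerFires(6000L, Long.MaxValue))  // true
  }
}
```

Under this model, the timers in the example job only fire because the bounded fromElements sources finish and push the watermark to its maximum value.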
5. Reconciling with KeyedCoProcessFunction
appStream.connect(thirdpartyStream)
  .keyBy(_._1, _._1)
  .process(new KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String] {
    // Per-key state: the event from each side that is still waiting for its counterpart
    var appEvent: ValueState[(String, String, Long)] = _
    var thirdpartyEvent: ValueState[(String, String, String, Long)] = _

    override def open(parameters: Configuration): Unit = {
      appEvent = getRuntimeContext.getState(
        new ValueStateDescriptor[(String, String, Long)]("app-event", classOf[(String, String, Long)]))
      thirdpartyEvent = getRuntimeContext.getState(
        new ValueStateDescriptor[(String, String, String, Long)]("thirdparty-event", classOf[(String, String, String, Long)]))
    }

    override def processElement1(
        in1: (String, String, Long),
        context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context,
        collector: Collector[String]): Unit = {
      if (thirdpartyEvent.value() != null) {
        collector.collect(s"${in1._1} reconciliation succeeded")
        thirdpartyEvent.clear()
      } else {
        context.timerService().registerEventTimeTimer(in1._3 + 5000L)
        appEvent.update(in1)
      }
    }

    override def processElement2(
        in2: (String, String, String, Long),
        context: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#Context,
        collector: Collector[String]): Unit = {
      if (appEvent.value() != null) {
        collector.collect(s"${in2._1} reconciliation succeeded")
        appEvent.clear()
      } else {
        context.timerService().registerEventTimeTimer(in2._4 + 5000L)
        thirdpartyEvent.update(in2)
      }
    }

    override def onTimer(
        timestamp: Long,
        ctx: KeyedCoProcessFunction[String, (String, String, Long), (String, String, String, Long), String]#OnTimerContext,
        out: Collector[String]): Unit = {
      // If a state is still non-empty, the matching event from the other stream never arrived
      if (appEvent.value() != null)
        out.collect(s"${appEvent.value()._1} reconciliation failed: third-party payment event did not arrive")
      if (thirdpartyEvent.value() != null)
        out.collect(s"${thirdpartyEvent.value()._1} reconciliation failed: app payment event did not arrive")
      appEvent.clear()
      thirdpartyEvent.clear()
    }
  })
  .print()
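- keyBy(_._1, _._1) keys both streams by order ID, so events for the same order are routed to the same parallel instance of the function and share the same keyed state.
- KeyedCoProcessFunction processes two keyed streams. It lets you handle the elements of each stream separately and gives access to Flink's state management and timers.
- processElement1 and processElement2 handle elements from appStream and thirdpartyStream respectively. If the event from the other stream has already arrived, the function emits a success message and clears that state. Otherwise it stores the element in state and registers an event-time timer for 5 seconds (5000 ms) after the element's timestamp.
- onTimer runs when a timer fires and handles the failure case: any event still held in state at that point never received its counterpart.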
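The matching logic above can be traced without a cluster using a small plain-Scala simulation. This is a sketch under stated assumptions, not the Flink job itself: events are replayed in timestamp order, the watermark is assumed to sit 1 ms behind the current event, and a single map stands in for the two ValueState fields. All names here (ReconcileSketch, AppPay, ThirdPartyPay, reconcile) are hypothetical.

```scala
import scala.collection.mutable

object ReconcileSketch {
  sealed trait Event { def orderId: String; def ts: Long }
  final case class AppPay(orderId: String, ts: Long) extends Event
  final case class ThirdPartyPay(orderId: String, ts: Long) extends Event

  // Replays events in timestamp order, mimicking per-key state and event-time timers.
  def reconcile(events: Seq[Event], timeoutMs: Long = 5000L): Seq[String] = {
    // orderId -> (event waiting for its counterpart, time at which its timer fires)
    val pending = mutable.LinkedHashMap.empty[String, (Event, Long)]
    val out = mutable.ArrayBuffer.empty[String]

    // Fire every timer whose timestamp the watermark has reached.
    def fireTimers(watermark: Long): Unit = {
      val expired = pending.filter { case (_, (_, timerTs)) => timerTs <= watermark }.toSeq
      expired.foreach { case (orderId, (event, _)) =>
        event match {
          case _: AppPay        => out += s"$orderId reconciliation failed: third-party payment event did not arrive"
          case _: ThirdPartyPay => out += s"$orderId reconciliation failed: app payment event did not arrive"
        }
        pending.remove(orderId)
      }
    }

    events.sortBy(_.ts).foreach { event =>
      fireTimers(event.ts - 1) // assumed watermark: just behind the current event
      pending.get(event.orderId) match {
        case Some((waiting, _)) if waiting.getClass != event.getClass =>
          out += s"${event.orderId} reconciliation succeeded"
          pending.remove(event.orderId)
        case _ =>
          pending(event.orderId) = (event, event.ts + timeoutMs)
      }
    }
    fireTimers(Long.MaxValue) // end of bounded input: all remaining timers fire
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      AppPay("order_1", 1000L), AppPay("order_2", 2000L),
      ThirdPartyPay("order_1", 3000L), ThirdPartyPay("order_3", 4000L))
    reconcile(events).foreach(println)
  }
}
```

With the sample data from the article, the simulation reports order_1 as reconciled and order_2 and order_3 as failed. In the real job the relative order of the printed lines may differ, because Flink does not guarantee an ordering between the two inputs.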
6. Running the job
env.execute("Connect")
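The execute method submits the Flink job and starts its execution. Everything before this call only builds the dataflow graph.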
Further notes
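- State management: ValueState stores and retrieves keyed state. In this example, appEvent and thirdpartyEvent hold the events from the APP and from the third-party payment platform respectively.
- Timers: registerEventTimeTimer registers an event-time timer. The timer fires at a future point in event time and is used to handle timeouts or late events.
- Reconciliation logic: this example shows a simple reconciliation. Real applications may need more complex logic, such as handling partial success or retries.
- Parallelism: in production, parallelism is usually set to a value greater than 1 to make full use of cluster resources.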
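As one illustration of the "more complex logic" mentioned above, the sketch below compares the status fields of two matched events instead of treating any pair as a success. It is plain Scala and only an assumption about how such a check might look; the Outcome type, its cases, and the compare function are hypothetical and not part of the original job.

```scala
object StatusCheckSketch {
  type AppEvent = (String, String, Long)                // (order_id, status, timestamp)
  type ThirdPartyEvent = (String, String, String, Long) // (order_id, status, platform_id, timestamp)

  // Hypothetical result type for a matched pair of events.
  sealed trait Outcome
  case object Matched extends Outcome
  final case class StatusMismatch(appStatus: String, thirdPartyStatus: String) extends Outcome

  // Both events must already belong to the same order (as keyBy guarantees in the job).
  def compare(app: AppEvent, third: ThirdPartyEvent): Outcome = {
    require(app._1 == third._1, "events must belong to the same order")
    if (app._2 == third._2) Matched else StatusMismatch(app._2, third._2)
  }

  def main(args: Array[String]): Unit = {
    println(compare(("order_1", "success", 1000L), ("order_1", "success", "wechat", 3000L)))
    println(compare(("order_1", "success", 1000L), ("order_1", "failed", "wechat", 3000L)))
  }
}
```

A check like this could be called from processElement1 and processElement2 at the point where the counterpart is found, so that a status mismatch is reported separately from a clean match.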
Summary
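This code shows how to process two streams with Flink and implement real-time reconciliation using state management and timers. With connect and KeyedCoProcessFunction, events from different streams can be handled flexibly to implement complex business logic.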