当前位置: 首页 > news >正文

【Flink快速入门-7.Flink 状态管理】

Flink 状态管理

实验介绍

在批计算中,我们对某个特定 Batch 的数据通过一系列计算之后输出一个最终结果,你会发现我们并没有提到过数据的状态,或者说我们对数据状态并不关心。但是在流计算中,有状态的计算是流处理框架中的重要功能之一,因为很多复杂的业务场景都会涉及到数据前后状态。Flink 本身也是带状态的流处理引擎,如果你在此之前有看过 Flink 的官方文档,应该有注意到Stateful这个关键词。本节实验我们就重点学习 Flink 中的状态(State)相关知识点。

知识点
  • State 分类
    • Keyed State
    • Operator State
  • Checkpoint
  • StateBackend

State 分类

在 Flink 中,State 总的来说可以分为两种类型,Keyed State(键控状态)和 Operator State(算子状态)。

  • Operator State:Operator State 可以作用在所有算子上,每个算子中并行的 Task 都可以共享一个状态,或者说同⼀个算⼦中的多个 Task 的状态是相同的。但是请注意,算子状态不能由相同或不同算子的另一个实例访问。Operator State 支持三种基本数据结构,分别是:

    • ListState:存储列表类型的状态。
    • UnionListState:存储列表类型的状态。和 ListState 的区别是,如果发生故障,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
    • BroadcastState:用于广播的算子状态。如果一个算子有多项任务,并且它的每项任务状态又都相同,这种情况就可以使用广播状态。
  • Keyed State:Keyed State 是作用在 KeyedStream 上的。从名称中就可以看出来,它的特点是和 Key 强相关的。在任务处理中,Flink 为每个 Key 维护一个状态实例,而且相同 Key 所对应的数据都会被分配到同一个任务中执行。Keyed State 支持五种基本数据结构,分别是:

    • ValueState:保存单个 Value,可以针对该 Value 进行 get/set 操作。
    • ListState:保存一个列表,列表中可以存储多个 Value。可以针对列表进行 add、get、update 操作。
    • MapState:保存 Key-Value 类型的值。可以针对其进行 get、put、remove 操作,还可以使用 contains 判断某个 key 是否存在。
    • ReducingState:保存一个单一值,该值是添加到状态的所有值聚合的结果。
    • AggregatingState:保存一个单一值,该值是添加到状态的所有值聚合的结果。与 ReducingState 有些不同,聚合类型可能不同于添加到状态的元素的类型。接口和 ListState 相同,但是使用 add(IN)添加的元素本质是通过使用指定的 AggregateFunction 进行聚合。

接下来我们以 KeyedState 为例进行实验。假设某网站的用户访问日志样例如下:

20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006

从左往右依次表示用户访问时间、用户触发的 action(登录、登出等)、城市、IP 地址、用户 ID。在实验代码中用 UserLog 样例类来表示该日志:

case class UserLog(time: Long, action: String, city: String, ip: String, user_id: String)

现在的业务需求是:现在的业务需求是:统计同城市两个⽤户登录的时间差。特殊的是第⼀个⽤户登录的时 候,它前⾯没有⽤户登录,我们需要在代码⾥特殊处理。在 com.vlab.state 包下创建 KeyedState 的 object。完整代码如下:

package com.vlab.state

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * @projectName FlinkLearning  
 * @package com.vlab.state  
 * @className com.vlab.state.KeyedState  
 * @description ${description}  
 * @author pblh123
 * @date 2025/2/8 13:15
 * @version 1.0
 *
 */
    
object KeyedState {

  /**
   * 用户日志案例类
   * 用于表示用户在特定时间点进行的特定操作
   * 主要解决的问题是封装用户日志数据,以便于日志的处理和分析
   *
   * @param time 日志记录的时间戳,表示日志发生的精确时间
   * @param action 用户的具体操作,如登录、浏览商品等
   * @param city 用户进行操作时所在的城市,用于地理位置分析
   * @param ip 用户的IP地址,用于追踪用户来源和进行安全检查
   * @param userid 用户的唯一标识符,用于关联用户的所有操作
   */
  case class UserLog(time: Long, action: String, city: String, ip:String, userid:String)

  def main(args: Array[String]): Unit = {
    // 参数数量判断,输入IP
    if (args.length != 1) {
      System.err.println("Usage: KeyedState <input ip>")
      System.exit(5)
    }
    val inputIp = args(0)
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.socketTextStream(inputIp, 9999)

    import org.apache.flink.api.scala._

    // 数据映射为UserLog案例类
    val datauserlog = data.map(line => {
      val fields = line.split(",")
      UserLog(fields(0).toLong, fields(1), fields(2), fields(3), fields(4))
    })

    // 进行 keyBy 操作,基于 city 来分组
    datauserlog.keyBy(_.city)
      .mapWithState[(String, Long), UserLog] {
        case (curr: UserLog, None) =>
          // 没有前一个状态,返回当前城市和初始时间差(0)
          // 输出格式 (城市名, 0),// 保存当前的 UserLog
          ((curr.city, 0L),Some(curr))

        case (curr: UserLog, last: Some[UserLog]) =>
          // 有前一个状态,计算时间差
          val diff = curr.time - last.get.time
          // 返回城市名和时间差 // 更新状态
          ((curr.city, Math.abs(diff)) ,Some(curr))
      }
      .print() // 打印输出

    env.execute("KeyedState")
  }
}

上面代码的 main 函数中,我们监听 localhost 的 9999 端口并将接收到的日志通过 map 算子转换为 UserLog 类型。重点是接下来的这段代码:

    // 进行 keyBy 操作,基于 city 来分组
    datauserlog.keyBy(_.city)
      .mapWithState[(String, Long), UserLog] {
        case (curr: UserLog, None) =>
          // 没有前一个状态,返回当前城市和初始时间差(0)
          // 输出格式 (城市名, 0),// 保存当前的 UserLog
          ((curr.city, 0L),Some(curr))

        case (curr: UserLog, last: Some[UserLog]) =>
          // 有前一个状态,计算时间差
          val diff = curr.time - last.get.time
          // 返回城市名和时间差 // 更新状态
          ((curr.city, Math.abs(diff)) ,Some(curr))
      }
      .print() // 打印输出

在根据城市 keyBy 之后,使用了 mapWithState 方法,mapWithState 算子是 map 算子对应的带状态的方法。Flink 中基本上所有的基础算子都有一个对应的带状态的算子,且以 WithState 结尾。在 mapWithState 中,curr 变量和 last 变量分别表示当前登录用户和前一个登录用户,但是由于第一个用户登录的时候,它前面没有用户登录,所以用 case (curr:UserLog, None) => ((curr.city, 0), Some(curr)) 来处理,注意第二个输入参数为 None。由于数据在传输过程中可能存在乱序的情况(输出的顺序和输入的顺序不一致),为了避免时间差为负值的情况,所以用了 Math.abs() 取绝对值。

在终端中执行 nc -l -p 9999,运行以上代码,并在终端中输入以下日志:

20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006

输出如下:
在这里插入图片描述

Checkpoint

在开始认识 Checkpoint 之前,我们先认识三个分布式中表示状态一致性的术语,分别是 at-most-once、at-least-once、exactly-once。

  • At-most-once:最多一次。数据最多被处理一次,这意味着当程序发生故障的时候,有可能会造成数据丢失。
  • At-least-once:最少一次。数据至少会被处理一次,这意味着当程序发生故障的时候,可能会存在同一条数据被重复多次计算的情况。
  • Exactly-once:精准一次。数据不多不少刚好被使用了一次,即使程序发生故障,但是任务恢复之后会和发生故障之前的状态保持一致,已经处理过的数据不会被重复处理,没有被处理的数据也不会丢失。

这三个术语并不是 Flink 中特有的,在其它分布式框架中也会涉及到,特别是在 Kafka 中尤为频繁。

既然 Flink 提供了 State 的支持,那么在程序出现问题或者机器宕机的时候,如何保证整个程序都能够顺利恢复到正确的 State?或者说如何保证 Exactly-Once 语义?为了解决这个问题,Flink 引入了 Checkpoint(检查点)机制。Flink Checkpoint 是一种容错恢复机制,这种机制保证了实时程序运行时,即使突然遇到异常也可以自我进行恢复。Checkpoint 示意图如下所示,其中一个非常重要的角色就是 barrier(栅栏,可以理解为一个个标记)。在数据流入 Flink 的过程中,会从源头开始就往数据流中注入 barrier。作为数据流数据的一部分,barrier 不会干扰正常的数据流,一个 barrier 会把数据分割成两个部分,一部分进入当前快照,另一部分进入下一个快照。每个 barrier 都带有快照的 id,并且 barrier 之前的数据都进入了此快照。多个 barrier 会出现在数据流中,也就是会产生多个快照。当 barrier 在 source 源头被注入时,系统就会记录当前快照的位置。
在这里插入图片描述

Flink 中默认没有开启 Checkpoint,需要我们通过 env.enableCheckpointing() 方法去指定。env.enableCheckpointing() 方法接受一个 Long 类型的参数,表示设定 Checkpoint 的时间间隔,单位为毫秒。如设置时间间隔为 10 秒:

env.enableCheckpointing(10 * 1000)

StateBackends

当 Checkpoint 机制启动时,State 将在检查点中持久化来应对数据丢失以及恢复。而状态在内部是如何表示的、状态是如何持久化到检查点中以及持久化到哪里都取决于选定的 StateBackends(状态后端)。Flink 提供了三种 StateBackends,分别是 MemoryStateBackend、

FsStateBackend 和 RocksDBStateBackend。

  • MemoryStateBackend:基于内存的状态管理。特点是速度快、性能高。缺点是一旦机器发生故障,整个内存中存储的状态数据都会丢失,而且内存由于内存大小的限制,这种方式并不会被用在生产环境中。MemoryStateBackend 可以通过 env.setStateBackend(new MemoryStateBackend(maxStateSize)),maxStateSize 是一个 Int 类型的参数,用来设置保存 State 可占用的最大内存。

  • FsStateBackend:基于文件系统的状态管理。这里的文件系统可以是本地文件系统,也可以是分布式文件系统,如:HDFS、OSS、S3 等。相对于 MemoryStateBackend 来说,基于文件的方式数据安全性更加得到了保障,可以保存的 State 也不会受限于磁盘大小。FsStateBackend 可以使用 setStateBackend(new FsStateBackend(path)) 来指定,参数 path 表示存储路径。生产环境一般都用分布式文件系统进存储,如:hdfs://node1:9000/flink/checkpoint/

  • RocksDBStateBackend:用 RocksDB 来存储 State。RocksDB 是一个 Key-Value 型的数据库,这种方式大家了解即可。

实验总结

本节实验我们介绍了 Flink 中的状态管理,包括 State 分类、Checkpoint 机制和 StateBackends。其中 State 分类包括 Keyed State 和 Operator State。在 Flink 状态管理中,使用相对来说比较简单,重点是概念理解。如果你学习过 Spark,请不要用 Spark 中的 Checkpoint 来类比 Flink 中的 Checkpoint,这是两种完全不同的机制。另外,在 Keyed State 案例中的 mapWithStat 参数如果理解起来比较困难,可以查阅资料补充一下 Scala 基础知识。

相关文章:

  • c++作业
  • 数据结构-----双向链表
  • Java 大视界 -- 企业数字化转型中的 Java 大数据战略与实践(93)
  • 普通人如何主动适应AI浪潮
  • Dify +deepseek-free-api 实现API对话
  • 机器学习·数据处理
  • 你知道数字电路中的运算器不?
  • 内存泄漏是什么?
  • 【Web前端开发精品课 HTML CSS JavaScript基础教程】第二十五章课后题答案
  • 华为动态路由-OSPF-骨干区
  • 有没有其他技术可以替代本地 RAG?
  • 基于ffmpeg+openGL ES实现的视频编辑工具-环境搭建(三)
  • 负载均衡集群( LVS 相关原理与集群构建 )
  • 无人机热成像与AI跟踪技术:全天候智能应用的未来!
  • 【无标题】基于Unity写一个DelayInvoke方法
  • Vue 3:基于按钮切换动态图片展示(附Demo)
  • 让大模型帮我设计crnn网络及可运行demo,gpt4o豆包qwendeepseek-r1
  • 用idea创建vue3项目过程中遇到的问题
  • 机器学习-生命周期
  • 为什么外贸办公需要跨境专线网络?
  • 虚假认定实质性重组、高估不良债权价值,原中国华融资产重庆分公司被罚180万元
  • 援藏博士张兴堂已任西藏农牧学院党委书记、副校长
  • 保证断电、碰撞等事故中车门系统能够开启!汽车车门把手将迎来强制性国家标准
  • 体坛联播|曼联热刺会师欧联杯决赛,多哈世乒赛首日赛程出炉
  • 昆明阳宗海风景名胜区19口井违规抽取地热水,整改后用自来水代替温泉
  • 纪录片《中国》原班人马打造,《船山先生》美学再升级