当前位置: 首页 > wzjs >正文

宛城区网站建设nba最新消息球员交易

宛城区网站建设,nba最新消息球员交易,网站的建设时间表,wordpress解压子目录下在 Flink CDC 中为 Source 数据流配置事件时间需要结合时间语义设置、时间戳分配和水位线生成三个核心步骤。以下是具体配置方法及注意事项: 1. 设置时间语义 Flink 默认使用处理时间(Processing Time),需显式指定事件时间语义&a…

在 Flink CDC 中为 Source 数据流配置事件时间需要结合时间语义设置时间戳分配水位线生成三个核心步骤。以下是具体配置方法及注意事项:


1. 设置时间语义

Flink 默认使用处理时间(Processing Time),需显式指定事件时间语义:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 设置为事件时间

若使用 Flink 1.12+ 版本,事件时间已是默认语义,但仍建议显式设置以避免混淆。


2. 分配时间戳

(1) 从 CDC 数据中提取时间戳

CDC 数据(如 MySQL Binlog)通常包含变更时间字段(如 update_time),需通过 TimestampAssigner 提取:

DataStream<ChangeEvent> cdcStream = env.addSource(MySqlSource.create(...));DataStream<ChangeEvent> timedStream = cdcStream.assignTimestampsAndWatermarks(WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp() // 从事件中提取时间戳(毫秒))
);

关键点

  • 字段选择:优先使用业务字段(如订单创建时间)或数据库的 update_time 作为事件时间戳。
  • 类型转换:若时间戳为字符串(如 "2023-10-01 12:00:00"),需先转换为毫秒值。

(2) 通过 DDL 定义时间属性(Table API)

若使用 Flink SQL/Table API,可在 DDL 中直接定义时间属性:

CREATE TABLE orders (id INT,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'mysql-cdc',...
);

此方式通过 WATERMARK 语句隐式分配时间戳并生成水位线。


3. 生成水位线

水位线用于处理乱序事件,需根据业务容忍的延迟设置策略:

(1) 固定延迟策略(BoundedOutOfOrderness)

WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(...);

此策略允许最大 5 秒的乱序延迟,适用于大多数业务场景。

(2) 单调递增策略(MonotonousTimestamps)

WatermarkStrategy.<ChangeEvent>forMonotonousTimestamps();

若数据严格有序(如 Kafka 分区有序),可直接使用此策略。

(3) 自定义水位线生成器

对于复杂逻辑(如动态调整延迟),需实现 WatermarkGenerator 接口:

public class CustomWatermarkStrategy implements WatermarkGenerator<ChangeEvent> {@Overridepublic void onEvent(ChangeEvent event, long eventTimestamp, WatermarkOutput output) {// 动态计算最大事件时间maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - 5000)); // 延迟5秒}
}

4. CDC 源的特殊处理

(1) MySQL CDC 的时间戳提取

MySQL Binlog 中的 ts_sec 字段表示事务提交时间,可将其作为事件时间戳:

.withTimestampAssigner((event, recordTimestamp) -> event.getSource().get("ts_sec") // 提取Binlog中的时间戳字段
)

(2) 处理无时间戳的 CDC 数据

若 CDC 数据无时间戳字段,可回退到处理时间或摄取时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 切换为处理时间

5. 注意事项

  1. 水位线生成位置:尽量在 Source 后第一个算子分配时间戳,避免因并行度变化导致乱序。
  2. 水位线间隔调整:默认 200ms 生成一次,可通过 env.getConfig().setAutoWatermarkInterval(1000) 调整为 1 秒。
  3. 状态 TTL:若 CDC 数据量极大,需设置状态 TTL 防止 OOM:
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
    

完整示例(DataStream API)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 定义 MySQL CDC Source
MySqlSource<ChangeEvent> source = MySqlSource.<ChangeEvent>builder().hostname("localhost").port(3306).databaseList("mydb").tableList("mydb.orders").username("user").password("pass").deserializer(new JsonDebeziumDeserializationSchema()).build();// 分配时间戳与水位线
DataStream<ChangeEvent> stream = env.fromSource(source,WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) -> event.getUpdateTime()),"MySQL Source"
);// 后续窗口处理
stream.keyBy(event -> event.getOrderId()).window(TumblingEventTimeWindows.of(Time.minutes(5))).aggregate(...);

通过以上配置,Flink CDC 数据流即可正确使用事件时间语义,处理乱序数据并触发窗口计算。具体策略需根据业务延迟容忍度和数据特征调整。

http://www.dtcms.com/wzjs/145308.html

相关文章:

  • 大学生网页设计作业成品下载seo搜狗
  • 站长推荐入口自动跳转广告公司
  • 女装东莞网站建设建设网站公司
  • 家乐福网上商城客服下载优化大师并安装
  • 去哪找网站建设公司百度网盟推广官方网站
  • 站外营销有哪几种主流方式关键词林俊杰mp3免费下载
  • 中国城乡建设厅网站流氓网站
  • 网站公司怎么做的好如何进行搜索引擎优化
  • 怎么让百度快速收录网站最新国际新闻大事件
  • 网站建设安全保密协议百度网盘客服24小时电话人工服务
  • 网站后台管理员做链接成都网站seo收费标准
  • 做新闻类网站如何盈利百度注册新账号
  • 正规免费发布信息网站深圳百度快速排名提升
  • py怎么做网站站长网站工具
  • 自己动手创建一个公司网站枫树seo
  • 北京办理营业执照多少钱贵州萝岗seo整站优化
  • 网页设计作业制作与江东seo做关键词优化
  • 温州网站推广优化怎么在百度上发广告
  • 1级a做爰免费网站天津做网站的公司
  • 织梦做的网站织梦修改网页百度竞价怎么开户
  • 武汉疾控最新提醒巩义网站优化公司
  • 做的新网站网上搜不到合肥头条今日头条新闻最新消息
  • 首尔面积影响关键词优化的因素
  • 做市场的逛的网站百度惠生活怎么优化排名
  • 广州网站建设模板制作厦门seo网站管理
  • php网站开发师招聘百度推广竞价排名技巧
  • 音响 东莞网站建设技术支持seo百度百科
  • 衡阳市住房和城乡建设局网站外贸网络推广怎么做
  • 免费建设公司网站新东方烹饪学校
  • 建设网站需要多少钱济南兴田德润地址关键词优化排名的步骤