当前位置: 首页 > news >正文

【实战-12】flink版本表

版本表的概念

只要你的表使用了像 upsert Kafka source ,debezium-json 或 canal-json 这种带操作类型(insert/update/delete)的数据格式,Flink 就会自动把这个表当成“可以查历史版本”的表,无需额外配置。

要求

  1. 设置主键
  2. 设置 event-time attribute
CREATE TABLE products (product_id    STRING,product_name  STRING,price         DECIMAL(32, 2),update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,PRIMARY KEY (product_id) NOT ENFORCED,WATERMARK FOR update_time AS update_time
) WITH (...);

如何将append stream转成changelog 流呢

首先要满足:主键和event-time attrbute设置

CREATE TABLE currency_rates (currency      STRING,rate          DECIMAL(32, 10),update_time   TIMESTAMP(3),WATERMARK FOR update_time AS update_time //时间属性
) WITH ('connector' = 'kafka','topic'	    = 'rates','properties.bootstrap.servers' = 'localhost:9092','format'    = 'json'
);
上述表对应的数据为:
(changelog kind) update_time   currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      Yen        102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      USD        1
+(INSERT)        11:15:00      Euro       119
+(INSERT)        11:45:00      Pounds     107
+(INSERT)        11:49:00      Pounds     108
  • 将apend stream 转成changelog stream
-- Define a versioned view
CREATE VIEW versioned_rates AS              
SELECT currency, rate, update_time              -- (1) `update_time` keeps the event timeFROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key `currency` can be a primary keyORDER BY update_time DESC) AS rownum FROM currency_rates)
WHERE rownum = 1; -- the view `versioned_rates` will produce a changelog as the following.
(changelog kind) update_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      Yen        102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      USD        1
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:45:00      Pounds     107
+(UPDATE_AFTER)  11:49:00      Pounds     108
http://www.dtcms.com/a/486259.html

相关文章:

  • ‌MyBatis-Plus 的 LambdaQueryWrapper 可以实现 OR 条件查询‌
  • 带你了解STM32:SPI通信(硬件部分)
  • CentOS下安装配置JDK24和tomcat11
  • springboot mybatisplus 配置SQL日志,但是没有日志输出
  • Windows下安装配置JDK24和tomcat11
  • 建个大型网站要多少钱房产信息网网站
  • 贵阳建站公司做的不错的h5高端网站
  • 实践 3:Vim 编辑器的使用
  • UG(NX)转换为3DXML全流程技术指南,附迪威模型网在线方案,适用于技术人员与学生
  • Python爬虫第4课:XPath与lxml高级解析技术
  • 使用 EasyExcel 封装通用 Excel 导出工具类
  • asp.net做网站的流程百度标注平台怎么加入
  • 怎么做同学录的网站电子商务公司简介模板
  • Redis(63)Redis的Lua脚本如何使用?
  • 鸿蒙NEXT输入设备开发指南:从触摸屏到游戏手柄的完整解决方案
  • 鸿蒙Harmony实战开发教学Day2-鸿蒙新项目创建+目录配置!(新手入门指南)
  • Lua中,表、元表、对象、类的解析
  • 在易语言里面做网站做二手物资哪个网站好
  • excel和word文件默认用office打开而不是用wps
  • 万网上传网站企业信用信息查询网官网
  • python学习之路(二)
  • IDEA弹框 Server‘s certificate is not trusted /服务器的证书不可信如何解决
  • ​rxn_yields 仓库介绍(https://rxn4chemistry.github.io/rxn_yields/)​
  • 前端视频课程添加水印,全屏不消失解决方法
  • 湖州网站建设哪家好google云平台 wordpress
  • Spring Boot性能优化详解
  • leetcode 329 矩阵中的最长递增路径
  • 生成模型实战 | 实时任意风格迁移
  • C++ --- 模版初阶
  • 外贸家具网站.net网站开发简介