[Hands-on 12] Flink Versioned Tables
What a versioned table is
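If a table is read from a source that produces a changelog with operation types (insert/update/delete), such as the upsert-kafka connector or the debezium-json and canal-json formats, Flink implicitly treats it as a versioned table, i.e. a table whose earlier versions can be looked up by time. No extra configuration is needed beyond the requirements below.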
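For example, a minimal sketch of such a source, assuming a hypothetical topic `rates_upsert` on a local broker with JSON-encoded keys and values (the table and topic names are illustrative). upsert-kafka itself requires the primary key, and the watermark supplies the event-time attribute:

```sql
-- Hypothetical upsert-kafka table: records with the same key `currency`
-- are read as INSERT then UPDATE; a record with a null value is read as DELETE
CREATE TABLE rates_upsert (
  currency    STRING,
  rate        DECIMAL(32, 10),
  update_time TIMESTAMP(3),
  PRIMARY KEY (currency) NOT ENFORCED,      -- mandatory for upsert-kafka
  WATERMARK FOR update_time AS update_time  -- event-time attribute
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'rates_upsert',                 -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```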
Requirements
- Declare a primary key
- Declare an event-time attribute
CREATE TABLE products (
  product_id   STRING,
  product_name STRING,
  price        DECIMAL(32, 2),
  update_time  TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY (product_id) NOT ENFORCED,
  WATERMARK FOR update_time AS update_time
) WITH (...);
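The usual consumer of a versioned table is an event-time temporal join. A minimal sketch, assuming a hypothetical append-only `orders` table (its columns, topic and group id are illustrative, not from the original) and that the elided `WITH (...)` of `products` is filled in. Each order is matched with the product row that was valid at its `order_time`:

```sql
-- Hypothetical append-only orders stream (topic, columns, group id are assumptions)
CREATE TABLE orders (
  order_id   STRING,
  product_id STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time  -- event time of the probe side
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orders-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Event-time temporal join: the join key must be the primary key of `products`;
-- each order sees the product name and price valid at o.order_time
SELECT
  o.order_id,
  o.order_time,
  p.product_name,
  p.price
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
  ON o.product_id = p.product_id;
```

With `LEFT JOIN`, an order whose `order_time` precedes every version of its product is still emitted, with NULL product columns.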
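How do you turn an append-only stream into a changelog stream?
The result must still meet both requirements: a primary key and an event-time attribute. The append-only table below declares only the event-time attribute; the primary key is inferred later from a deduplication query.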
CREATE TABLE currency_rates (
  currency    STRING,
  rate        DECIMAL(32, 10),
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time  -- event-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = 'rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
Sample data in this table (every row is an INSERT):
(changelog kind) update_time   currency  rate
================ ============= ========= ====
+(INSERT)        09:00:00      Yen       102
+(INSERT)        09:00:00      Euro      114
+(INSERT)        09:00:00      USD       1
+(INSERT)        11:15:00      Euro      119
+(INSERT)        11:45:00      Pounds    107
+(INSERT)        11:49:00      Pounds    108
- Convert the append-only stream into a changelog stream by keeping only the latest row per key
-- Define a versioned view
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time            -- (1) `update_time` keeps the event time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY currency                   -- (2) the inferred unique key `currency` can be a primary key
      ORDER BY update_time DESC) AS rownum
  FROM currency_rates)
WHERE rownum = 1;
The view `versioned_rates` produces the following changelog; the later Euro and Pounds rates now arrive as updates to the earlier rows with the same key:
(changelog kind) update_time   currency  rate
================ ============= ========= ====
+(INSERT)        09:00:00      Yen       102
+(INSERT)        09:00:00      Euro      114
+(INSERT)        09:00:00      USD       1
+(UPDATE_AFTER)  11:15:00      Euro      119
+(INSERT)        11:45:00      Pounds    107
+(UPDATE_AFTER)  11:49:00      Pounds    108
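Because `currency` is inferred as the view's key and `update_time` is preserved, `versioned_rates` can sit on the versioned side of an event-time temporal join. A sketch, assuming a hypothetical append-only `currency_orders` table (its name, columns, topic and group id are illustrative):

```sql
-- Hypothetical append-only orders priced in various currencies (all names assumed)
CREATE TABLE currency_orders (
  order_id   STRING,
  price      DECIMAL(32, 2),
  currency   STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'currency_orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'rates-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Each order is joined with the rate that was valid at its order_time
SELECT
  o.order_id,
  o.price,
  o.currency,
  r.rate AS conversion_rate,
  o.order_time
FROM currency_orders AS o
LEFT JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;
```

Against the changelog above, a Euro order at 10:00:00 would get rate 114 and one at 11:30:00 would get 119, while a Pounds order at 10:00:00 finds no version yet and, with `LEFT JOIN`, gets a NULL rate.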