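MongoDB CDC script development
Debugging conclusions
- Setting a historical start timestamp for multiple collections does not take effect.
- With the MongoDB CDC connector 3.0 or later, a Guava version error is reported.
- Multiple collections can instead be matched with a regular expression and synchronized in snapshot + incremental mode (existing data is read first, then the change stream).
- The core code: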
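```java
// Package names as in flink-connector-mongodb-cdc 2.4.x (incremental snapshot source)
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

MongoDBSource<String> mongoDBSource = MongoDBSource.<String>builder()
        .hosts("xxx:27017,xxx:27018,xxx:27019")
        .username("name")
        .password("password")
        .databaseList("exchange_kline")
        // Regex: every collection in exchange_kline whose name starts with "futu"
        .collectionList("exchange_kline.futu.*")
        // Listing the collections and starting from a historical timestamp did not take effect:
        // .collectionList("exchange_kline.futu", "exchange_kline.futu_1m", "exchange_kline.futu_5m", "exchange_kline.futu_15m",
        //         "exchange_kline.futu_hour", "exchange_kline.futu_week", "exchange_kline.futu_month")
        // .startupOptions(StartupOptions.timestamp(localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli() - 8 * 60 * 60 * 1000))
        // Snapshot + incremental: read the existing documents, then follow the change stream
        .startupOptions(StartupOptions.initial())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```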
- pom file
This test was run with Flink 1.17.1 and mongodb-cdc 2.4.1.
```xml
<!-- under <properties> -->
<flink.version>1.17.1</flink.version>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>2.4.1</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
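To sanity-check which collections the regex passed to collectionList selects, the sketch below applies it with java.util.regex. It assumes the connector requires the pattern to match the whole "db.collection" name (as Matcher.matches does); the connector's actual matching rules may differ. The collections exchange_kline.futures and exchange_kline.other, the stricter pattern, and the class name are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Sketch: which "db.collection" names a collectionList regex would select.
 * Assumption: the pattern must match the whole fully-qualified name.
 */
public class CollectionRegexCheck {

    // Returns the names from `candidates` that the regex matches in full.
    public static List<String> selected(String regex, List<String> candidates) {
        Pattern pattern = Pattern.compile(regex);
        return candidates.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "exchange_kline.futu", "exchange_kline.futu_1m", "exchange_kline.futu_5m",
                "exchange_kline.futu_15m", "exchange_kline.futu_hour", "exchange_kline.futu_week",
                "exchange_kline.futu_month",
                // Hypothetical collections that are not in the original list
                "exchange_kline.futures", "exchange_kline.other");

        // Pattern from the core code: '.' means "any character", so "futures" is selected too
        System.out.println(selected("exchange_kline.futu.*", names));

        // Hypothetical stricter pattern: escaped dot, only "futu" or "futu_<suffix>"
        System.out.println(selected("exchange_kline\\.futu(_\\w+)?", names));
    }
}
```

Because an unescaped `.` matches any character, `exchange_kline.futu.*` also picks up any collection whose name merely starts with `futu`; escaping the dot and constraining the suffix narrows the selection to the intended collections.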
Sample data
```json
{
  "_id": "{\"_id\": {\"_id\": {\"$oid\": \"686cd2dd781d31ce821c9a56\"}}}",
  "operationType": "insert",
  "fullDocument": "{\"_id\": {\"$oid\": \"686cd2dd781d31ce821c9a56\"}, \"do\": \"2025-07-07 10:24:00\", \"d\": \"2025-07-07 14:24:00\", \"t\": {\"$numberLong\": \"1751898240000\"}, \"p\": \"US.MSW\", \"n\": \"明成集团\", \"o\": 4.85, \"c\": 4.85, \"h\": 4.85, \"l\": 4.85, \"v\": 0, \"u\": {\"$numberLong\": \"1751962333000\"}}",
  "source": {"ts_ms": 1751962334000, "snapshot": "false"},
  "ts_ms": 1752016662101,
  "ns": {"db": "exchange_kline", "coll": "futu_1m"},
  "to": null,
  "documentKey": "{\"_id\": {\"$oid\": \"686cd2dd781d31ce821c9a56\"}}",
  "updateDescription": null,
  "clusterTime": "{\"$timestamp\": {\"t\": 1751962334, \"i\": 6157}}",
  "txnNumber": null,
  "lsid": null
}
```
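The sample record carries several epoch-millisecond values (`t`, `u`, `source.ts_ms`). The java.time sketch below decodes them and, assuming the commented-out startup time was meant as Beijing time (UTC+8), shows that the `toInstant(ZoneOffset.UTC) - 8h` arithmetic from the core code equals `toInstant(ZoneOffset.ofHours(8))`. For example, 2025-07-08 16:12:14 at UTC+8 is 1751962334000, the `source.ts_ms` of the sample. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/**
 * Sketch: decode epoch-millisecond fields and build a startup timestamp
 * from a LocalDateTime assumed to be Beijing time (UTC+8).
 */
public class CdcTimestamps {

    // Arithmetic from the commented-out code: treat the local time as UTC, then subtract 8 hours.
    public static long startupMillisOriginal(LocalDateTime beijingTime) {
        return beijingTime.toInstant(ZoneOffset.UTC).toEpochMilli() - 8 * 60 * 60 * 1000;
    }

    // Equivalent and more direct: state the +08:00 offset explicitly.
    public static long startupMillis(LocalDateTime beijingTime) {
        return beijingTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
    }

    // Render an epoch-millisecond value as wall-clock time in the given zone.
    public static LocalDateTime toLocal(long epochMillis, String zoneId) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        long t = 1751898240000L; // "t" field of the sample fullDocument
        System.out.println(toLocal(t, "UTC"));              // lines up with the "d" field
        System.out.println(toLocal(t, "America/New_York")); // lines up with the "do" field

        LocalDateTime start = LocalDateTime.of(2025, 7, 8, 16, 12, 14);
        System.out.println(startupMillisOriginal(start) == startupMillis(start));
    }
}
```

Stating the offset explicitly avoids the hard-coded 8-hour constant; `ZoneId.of("Asia/Shanghai")` would work equally well, since China does not observe daylight saving time.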
Official documentation: Flink CDC 3.4
FAQ