当前位置: 首页 > news >正文

mongodbcdc脚本开发

调试的结论

  • 多个集合设置从历史的时间开始读取数据不起作用
  • 使用mongodb3.0以上版本,会报错guagu的版本问题
  • 可以用正则表达式来表示多个集合,然后使用快照加增量的读取方式来同步数据
  • 核心代码如下:
        MongoDBSource<String> mongoDBSource = MongoDBSource.<String>builder().hosts("xxx:27017,xxx:27018,xxx:27019").username("name").password("password").databaseList("exchange_kline").collectionList("exchange_kline.futu.*")
//                .collectionList("exchange_kline.futu","exchange_kline.futu_1m","exchange_kline.futu_5m","exchange_kline.futu_15m"
//                ,"exchange_kline.futu_hour","exchange_kline.futu_week","exchange_kline.futu_month").startupOptions(StartupOptions.timestamp(localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli() - 8*60*60*1000) )
//                .startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();
  • pom文件
    本次测试基于flink是1.17.1和mongodb-cdc2.4.1
 <flink.version>1.17.1</flink.version><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.4.1</version><exclusions><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency>

示例数据

{"_id": "{"_id": {"_id": {"$oid": "686cd2dd781d31ce821c9a56"}}}","operationType": "insert","fullDocument": "{"_id": {"$oid": "686cd2dd781d31ce821c9a56"}, "do": "2025-07-07 10:24:00", "d": "2025-07-07 14:24:00", "t": {"$numberLong": "1751898240000"}, "p": "US.MSW", "n": "明成集团", "o": 4.85, "c": 4.85, "h": 4.85, "l": 4.85, "v": 0, "u": {"$numberLong": "1751962333000"}}","source": {"ts_ms": 1751962334000,"snapshot": "false"},"ts_ms": 1752016662101,"ns": {"db": "exchange_kline","coll": "futu_1m"},"to": null,"documentKey": "{"_id": {"$oid": "686cd2dd781d31ce821c9a56"}}","updateDescription": null,"clusterTime": "{"$timestamp": {"t": 1751962334, "i": 6157}}","txnNumber": null,"lsid": null
}

官方文档flinkcdc3.4

FAQ
在这里插入图片描述

http://www.dtcms.com/a/272431.html

相关文章:

  • 书生大模型实战营——1. 大语言模型原理与书生大模型提示词工程实践
  • 大数据学习7:Azkaban调度器
  • 记一次Android Studio编译报错:Execution failed for task ‘:app:compileDebugAidl‘
  • Redis数据类型之hash
  • Android 网络开发核心知识点
  • ICML 2025|快手提出了基于残差的超低码率图像压缩方法ResULIC
  • 【Bluedroid】蓝牙协议栈控制器能力解析与核心功能配置机制(decode_controller_support)
  • git中的fork指令解释
  • Linux - firewall 防火墙
  • 强缓存和协商缓存详解
  • 机器学习核心算法:PCA与K-Means解析
  • Java从入门到精通!第三天(数组)
  • Shell 中的重定向
  • C++实习面试题
  • 如何看待java开发和AI的关系?
  • GO启动一个视频下载接口 前端可以边下边放
  • 【PyTorch】PyTorch中的数据预处理操作
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | DoubleVerticalSlider(双垂直滑块)
  • 图解LeetCode:79递归实现单词搜索
  • Django+DRF 实战:自定义异常处理流程
  • 20.4 量子安全加密算法
  • 案例分享--福建洋柄水库大桥智慧桥梁安全监测(二)之数字孪生和系统平台
  • 机器学习13——支持向量机下
  • TCP传输控制层协议深入理解
  • 当CCLinkIE撞上Modbus TCP:照明控制系统的“方言战争”终结术
  • VIP可读
  • 线性回归与正则化
  • Django专家成长路线知识点——AI教你学Django
  • 【PTA数据结构 | C语言版】顺序栈的3个操作
  • 【深度学习系列--经典论文解读】Gradient-Based Learning Applied to Document Recognition