当前位置: 首页 > news >正文

Flink CDC实时数据同步与小时级统计方案(Flink 1.13.5 + MySQL 8.0)

Flink CDC 实时数据同步与小时级统计方案(Flink 1.13.5 + MySQL 8.0)

一、方案概述

本方案基于 Flink 1.13.5MySQL 8.0,通过 Flink CDC 监听 MySQL 源表的新增数据,实时同步至 Kafka,并对数据按小时进行聚合统计,最终将统计结果写入 MySQL 统计表。

  • 核心流程:MySQL 数据变更 → Flink CDC 捕获 → Kafka 存储(原始数据)→ Flink 实时聚合(每小时)→ MySQL 统计表(结果存储)
  • Kafka 主题名称mysql_cdc_user_behavior(以用户行为数据为例,可根据业务调整)
二、环境准备
1. 软件版本
  • Flink:1.13.5(需提前部署集群或使用本地模式)
  • MySQL:8.0(源数据库 + 统计结果数据库,可同一实例或分库)
  • Kafka:2.8.1(用于存储 CDC 原始数据,需提前部署)
  • JDK:1.8
  • Maven:3.6+
2. 依赖配置(pom.xml)
<dependencies><!-- Flink 核心依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.13.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.13.5</version></dependency><!-- Flink CDC 依赖(MySQL 8.0 兼容版本) --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.1</version> <!-- 与 Flink 1.13.5 兼容 --></dependency><!-- Kafka 连接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.13.5</version></dependency><!-- MySQL JDBC 驱动(用于写入统计结果) --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><!-- Flink JDBC 连接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.13.5</version></dependency>
</dependencies>
3. 数据库配置
(1)源数据库(MySQL 8.0)配置

需开启 binlog 并授权 CDC 用户:

  1. 修改 MySQL 配置文件(my.cnfmy.ini):
    [mysqld]
    server-id = 100  # 唯一标识,避免与其他实例冲突
    log_bin = /var/lib/mysql/mysql-bin  # binlog 存储路径
    binlog_format = ROW  # 必须为 ROW 格式
    binlog_row_image = FULL  # 记录完整行数据
    expire_logs_days = 7  # binlog 保留 7 天(避免全量同步时丢失)
    
  2. 重启 MySQL 并验证配置:
    SHOW VARIABLES LIKE 'log_bin';  -- 结果为 ON
    SHOW VARIABLES LIKE 'binlog_format';  -- 结果为 ROW
    
  3. 创建 CDC 专用用户并授权:
    CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'Cdc@123456';  -- 密码需符合 MySQL 8.0 复杂度要求
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
    FLUSH PRIVILEGES;
    
  4. 创建源表(以用户行为表为例):
    CREATE DATABASE IF NOT EXISTS user_db;
    USE user_db;
    CREATE TABLE user_behavior (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id BIGINT NOT NULL,action_type VARCHAR(20) NOT NULL,  -- 行为类型:click, view, purchase 等action_time DATETIME NOT NULL,     -- 行为时间product_id BIGINT NOT NULL         -- 商品 ID
    );
    
(2)统计结果数据库配置

创建统计结果表(可与源库同实例,也可单独部署):

CREATE DATABASE IF NOT EXISTS stat_db;
USE stat_db;
CREATE TABLE hourly_action_stats (stat_hour DATETIME PRIMARY KEY,  -- 统计小时(如 2023-10-24 10:00:00)total_actions INT NOT NULL,      -- 总行为数click_count INT NOT NULL,        -- 点击数view_count INT NOT NULL,         -- 浏览数purchase_count INT NOT NULL,     -- 购买数update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
4. Kafka 配置
  1. 确保 Kafka 集群正常运行,创建主题:
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mysql_cdc_user_behavior --partitions 3 --replication-factor 1
    
三、核心代码实现
1. 数据结构定义(POJO 类)
// 用户行为数据实体类(与源表字段对应)
public class UserBehavior {private Long id;private Long userId;private String actionType;private LocalDateTime actionTime;private Long productId;// 无参构造器(Flink 序列化要求)public UserBehavior() {}// 全参构造器、getter、setter、toString 方法// ...
}// 小时统计结果实体类(与统计表字段对应)
public class HourlyStats {private LocalDateTime statHour;  // 统计小时(精确到小时)private Integer totalActions;private Integer clickCount;private Integer viewCount;private Integer purchaseCount;// 无参构造器、全参构造器、getter、setter// ...
}
2. 主程序逻辑
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.connector.jdbc.JdbcSink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.sql.PreparedStatement;public class CdcToKafkaAndStats {public static void main(String[] args) throws Exception {// 1. 初始化 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用 Checkpoint(确保 Exactly-Once,每 5 分钟一次)env.enableCheckpointing(300000);env.getCheckpointConfig().setCheckpointTimeout(60000);// 2. 创建 MySQL CDC 源(监听 user_behavior 表的新增数据)MySqlSource<String> mysqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("user_db")  // 源数据库.tableList("user_db.user_behavior")  // 源表.username("cdc_user").password("Cdc@123456")// 反序列化:将 Debezium 数据转为 JSON 字符串.deserializationSchema(new JsonDebeziumDeserializationSchema()).build();// 3. 读取 CDC 数据,过滤出新增操作(op = 'c')DataStream<String> cdcJsonStream = env.fromSource(mysqlSource,WatermarkStrategy.noWatermarks(),"MySQL-CDC-Source").filter(jsonStr -> {ObjectMapper mapper = new ObjectMapper();try {JsonNode jsonNode = mapper.readTree(jsonStr);return "c".equals(jsonNode.get("op").asText());  // 只处理新增数据} catch (Exception e) {return false;}});// 4. 将 CDC 原始数据写入 KafkaString kafkaBroker = "localhost:9092";String topic = "mysql_cdc_user_behavior";FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(kafkaBroker,topic,new SimpleStringSchema());cdcJsonStream.addSink(kafkaSink).name("Kafka-Sink");// 5. 解析 JSON 为 UserBehavior 对象DataStream<UserBehavior> behaviorStream = cdcJsonStream.map(jsonStr -> {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(jsonStr);JsonNode afterNode = jsonNode.get("after");  // 新增数据在 after 字段DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");return new UserBehavior(afterNode.get("id").asLong(),afterNode.get("user_id").asLong(),afterNode.get("action_type").asText(),LocalDateTime.parse(afterNode.get("action_time").asText(), formatter),afterNode.get("product_id").asLong());}).name("Parse-To-UserBehavior");// 6. 按小时窗口聚合统计DataStream<HourlyStats> statsStream = behaviorStream// 按统计小时分组(将 actionTime 截断到小时).keyBy(behavior -> {LocalDateTime actionTime = behavior.getActionTime();return actionTime.withMinute(0).withSecond(0).withNano(0);  // 如 10:23:45 → 10:00:00})// 滚动窗口(1 小时).window(TumblingProcessingTimeWindows.of(Time.hours(1)))// 聚合计算.aggregate(new AggregateFunction<UserBehavior, HourlyStats, HourlyStats>() {@Overridepublic HourlyStats createAccumulator() {// 初始化累加器return new HourlyStats(null, 0, 0, 0, 0);}@Overridepublic HourlyStats add(UserBehavior behavior, HourlyStats accumulator) {// 累加逻辑LocalDateTime statHour = behavior.getActionTime().withMinute(0).withSecond(0).withNano(0);accumulator.setStatHour(statHour);accumulator.setTotalActions(accumulator.getTotalActions() + 1);// 按行为类型累加switch (behavior.getActionType()) {case "click":accumulator.setClickCount(accumulator.getClickCount() + 1);break;case "view":accumulator.setViewCount(accumulator.getViewCount() + 1);break;case "purchase":accumulator.setPurchaseCount(accumulator.getPurchaseCount() + 1);break;}return accumulator;}@Overridepublic HourlyStats getResult(HourlyStats accumulator) {return accumulator;}@Overridepublic HourlyStats merge(HourlyStats a, HourlyStats b) {// 窗口合并(多并行度时需要)a.setTotalActions(a.getTotalActions() + b.getTotalActions());a.setClickCount(a.getClickCount() + b.getClickCount());a.setViewCount(a.getViewCount() + b.getViewCount());a.setPurchaseCount(a.getPurchaseCount() + b.getPurchaseCount());return a;}}).name("Hourly-Aggregate");// 7. 将统计结果写入 MySQL 统计表(使用 UPSERT 逻辑,避免重复)String mysqlUrl = "jdbc:mysql://localhost:3306/stat_db?useSSL=false&serverTimezone=UTC";String username = "root";  // 统计库用户名(需有写入权限)String password = "Root@123456";statsStream.addSink(JdbcSink.sink("INSERT INTO hourly_action_stats " +"(stat_hour, total_actions, click_count, view_count, purchase_count) " +"VALUES (?, ?, ?, ?, ?) " +"ON DUPLICATE KEY UPDATE " +  // 主键冲突时更新"total_actions = VALUES(total_actions), " +"click_count = VALUES(click_count), " +"view_count = VALUES(view_count), " +"purchase_count = VALUES(purchase_count)",(PreparedStatement stmt, HourlyStats stats) -> {stmt.setObject(1, stats.getStatHour());stmt.setInt(2, stats.getTotalActions());stmt.setInt(3, stats.getClickCount());stmt.setInt(4, stats.getViewCount());stmt.setInt(5, stats.getPurchaseCount());},JdbcSink.DEFAULT_BATCH_SIZE,  // 批量写入JdbcExecutionOptions.builder().withBatchSize(100).withMaxRetries(3).build(),() -> {// 配置 JDBC 连接com.mysql.cj.jdbc.Driver driver = new com.mysql.cj.jdbc.Driver();java.sql.DriverManager.getConnection(mysqlUrl, username, password);return java.sql.DriverManager.getConnection(mysqlUrl, username, password);})).name("MySQL-Stats-Sink");// 8. 执行任务env.execute("CDC-To-Kafka-And-Hourly-Stats");}
}
四、部署与运行
  1. 打包程序:使用 Maven 打包为可执行 JAR(mvn clean package -DskipTests)。
  2. 提交任务:通过 Flink 集群提交 JAR:
    bin/flink run -c com.example.CdcToKafkaAndStats /path/to/your/jar/file.jar
    
  3. 验证结果
    • user_db.user_behavior 插入测试数据:
      INSERT INTO user_behavior (user_id, action_type, action_time, product_id) 
      VALUES (1001, 'click', '2023-10-24 10:15:30', 5001);
      
    • 查看 Kafka 主题数据:
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql_cdc_user_behavior
      
    • 1 小时后(或手动触发窗口计算)查看统计表:
      SELECT * FROM stat_db.hourly_action_stats;
      
五、注意事项
  1. MySQL 8.0 兼容性:Flink CDC 2.2.1 支持 MySQL 8.0,需使用 mysql-connector-java 8.x 驱动。
  2. 时区问题:确保 Flink 集群、MySQL、Kafka 时区一致(建议使用 UTC 或统一时区)。
  3. 窗口延迟:若数据存在乱序,可在 WatermarkStrategy 中设置延迟容忍时间(如 forBoundedOutOfOrderness)。
  4. Checkpoint 配置:根据集群性能调整 Checkpoint 间隔,避免影响吞吐量。
  5. Kafka 分区:根据数据量调整 Kafka 主题分区数,提高并行处理能力。

通过以上方案,可实现从 MySQL 实时捕获新增数据、存储至 Kafka,并按小时统计结果写入 MySQL 的完整流程,满足实时数据同步与离线统计的业务需求。

http://www.dtcms.com/a/526112.html

相关文章:

  • 使页面具有动态效果的网站建设技术关键词搜索推广排行榜
  • dw asp access 网站开发优点网站手机页面如何做
  • discuz网站建设教学视频做视频网站免费观看爱
  • 电子商务网站开发技术路线网上做网站兼职
  • 百度搜索网站数字选用平台介绍
  • 制作简历seo搜索引擎优化招聘
  • 网站返回首页怎么做的好看虚拟产品网站
  • 医院NTP授时母钟功能介绍和应用价值
  • 【电路基础篇】:认识电流、电压、功率
  • 专业外贸网站开发wordpress手机菜单没有反应
  • 衡阳网站建设步骤网页设计免费模板图片
  • 上海浦东网站建设公司网站开发 源代码
  • 做网站大概需要几个人手机官网制作
  • 广州制作公司网站个人怎么注册网站流程
  • 中职国示范建设网站服务器iis搭建网站
  • 做网站会被捉吗网络营销的策划流程
  • 深圳正规的保安公司网站建设加推广优化
  • 对外网站建设情况汇报中国中建设计集团有限公司网站
  • 2025年MathorCup数学应用挑战赛---大数据竞赛赛题分析
  • 商城网站建设论文怎么做自助交易网站
  • 东莞浩智网站建设公司免费制作图文的软件
  • 效率革命:蓝光三维扫描仪如何赋能汽车零部件装配孔位与全尺寸检测
  • 做设计有必要买素材网站会员中国建设报官网
  • 宗亲网站开发6网站制作成功后怎么使用
  • 网站建设明细价单网页设计与制作精品课程网站
  • 在线编程网站开发交互式网站开发技术有哪些
  • 解构全球经营复杂性:业财一体化是出海企业的财务底盘
  • std::cout打印不同颜色的字符到终端
  • 做网站杭州傲视信息网站备案 名称 不一致吗
  • 网站名查询网址如何给自己的网站做外链