Flink CDC Real-Time Data Synchronization and Hourly Statistics (Flink 1.13.5 + MySQL 8.0)
I. Solution Overview
This solution is built on Flink 1.13.5 and MySQL 8.0. Flink CDC listens for newly inserted rows in a MySQL source table, synchronizes them to Kafka in real time, aggregates the data into hourly statistics, and writes the results to a MySQL statistics table.
- Core pipeline: MySQL data change → Flink CDC capture → Kafka (raw data) → Flink real-time aggregation (hourly) → MySQL statistics table (result storage)
- Kafka topic name: mysql_cdc_user_behavior (user-behavior data serves as the running example; adjust to your business)
II. Environment Setup
1. Software Versions
- Flink: 1.13.5 (deploy a cluster in advance, or run in local mode)
- MySQL: 8.0 (source database + statistics database; same instance or separate ones)
- Kafka: 2.8.1 (stores the raw CDC data; deploy in advance)
- JDK: 1.8
- Maven: 3.6+
2. Dependencies (pom.xml)
<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.13.5</version>
    </dependency>
    <!-- Flink CDC connector (MySQL 8.0 compatible) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.1</version> <!-- compatible with Flink 1.13.5 -->
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.13.5</version>
    </dependency>
    <!-- MySQL JDBC driver (for writing the statistics results) -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
    <!-- Flink JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.13.5</version>
    </dependency>
</dependencies>
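Note that the CDC connector, the Kafka and JDBC connectors, and the MySQL driver are not shipped with a standard Flink distribution, so the job JAR must bundle them. A minimal maven-shade-plugin sketch (com.example.CdcToKafkaAndStats is an assumed package name; adjust to yours):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <transformers>
                            <!-- assumed main class; adjust to your package -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.example.CdcToKafkaAndStats</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>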
3. Database Configuration
(1) Source database (MySQL 8.0) configuration
Enable binlog and authorize a dedicated CDC user:
- Edit the MySQL configuration file (my.cnf or my.ini):

[mysqld]
server-id = 100                      # unique ID; must not clash with other instances
log_bin = /var/lib/mysql/mysql-bin   # binlog storage path
binlog_format = ROW                  # must be ROW format
binlog_row_image = FULL              # record the full row image
expire_logs_days = 7                 # keep binlogs for 7 days (avoids losing them during the initial snapshot)

- Restart MySQL and verify the settings (a JDBC-based check is sketched after this list):

SHOW VARIABLES LIKE 'log_bin';        -- should return ON
SHOW VARIABLES LIKE 'binlog_format';  -- should return ROW

- Create the dedicated CDC user and grant privileges:

CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'Cdc@123456'; -- password must meet MySQL 8.0 complexity rules
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;

- Create the source table (a user-behavior table serves as the example):

CREATE DATABASE IF NOT EXISTS user_db;
USE user_db;
CREATE TABLE user_behavior (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    action_type VARCHAR(20) NOT NULL,  -- action type: click, view, purchase, etc.
    action_time DATETIME NOT NULL,     -- action timestamp
    product_id BIGINT NOT NULL         -- product ID
);
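Before wiring up the Flink job, the binlog settings and cdc_user privileges can be verified programmatically. A minimal JDBC sketch (it assumes the localhost instance and the credentials created above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckBinlogConfig {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/user_db?useSSL=false&serverTimezone=UTC";
        try (Connection conn = DriverManager.getConnection(url, "cdc_user", "Cdc@123456");
             Statement stmt = conn.createStatement()) {
            // Each variable should match the values configured in my.cnf above
            for (String var : new String[]{"log_bin", "binlog_format", "binlog_row_image"}) {
                try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE '" + var + "'")) {
                    if (rs.next()) {
                        System.out.println(rs.getString(1) + " = " + rs.getString(2));
                    }
                }
            }
        }
    }
}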
(2) Statistics database configuration
Create the statistics result table (same instance as the source, or a separate deployment):

CREATE DATABASE IF NOT EXISTS stat_db;
USE stat_db;
CREATE TABLE hourly_action_stats (
    stat_hour DATETIME PRIMARY KEY,  -- statistics hour (e.g. 2023-10-24 10:00:00)
    total_actions INT NOT NULL,      -- total number of actions
    click_count INT NOT NULL,        -- clicks
    view_count INT NOT NULL,         -- views
    purchase_count INT NOT NULL,     -- purchases
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
4. Kafka Configuration
- Make sure the Kafka cluster is running, then create the topic (a programmatic alternative follows):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mysql_cdc_user_behavior --partitions 3 --replication-factor 1
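The topic can also be created with the standard Kafka AdminClient, which is handy for automated environment setup. A minimal sketch (it assumes the same localhost broker as above):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateCdcTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 -- same as the shell command above
            NewTopic topic = new NewTopic("mysql_cdc_user_behavior", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}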
III. Core Code Implementation
1. Data Structures (POJO classes)
// User-behavior entity (maps to the source table columns)
public class UserBehavior {
    private Long id;
    private Long userId;
    private String actionType;
    private LocalDateTime actionTime;
    private Long productId;

    // No-arg constructor (required for Flink serialization)
    public UserBehavior() {}

    // All-args constructor, getters, setters, toString
    // ...
}

// Hourly statistics entity (maps to the statistics table columns)
public class HourlyStats {
    private LocalDateTime statHour;  // statistics hour (truncated to the hour)
    private Integer totalActions;
    private Integer clickCount;
    private Integer viewCount;
    private Integer purchaseCount;

    // No-arg constructor, all-args constructor, getters, setters
    // ...
}
2. Main Program Logic
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CdcToKafkaAndStats {

    // Shared, thread-safe ObjectMapper; avoids creating one per record
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // 1. Initialize the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing every 5 minutes (required for CDC fault tolerance;
        // note the simple Kafka producer below provides at-least-once, not exactly-once)
        env.enableCheckpointing(300000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // 2. Build the MySQL CDC source (watches the user_behavior table)
        MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("user_db")              // source database
                .tableList("user_db.user_behavior")   // source table
                .username("cdc_user")
                .password("Cdc@123456")
                // Deserialize Debezium change records into JSON strings
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // 3. Read the CDC stream and keep only insert operations (op = 'c')
        DataStream<String> cdcJsonStream = env
                .fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC-Source")
                .filter(jsonStr -> {
                    try {
                        JsonNode jsonNode = MAPPER.readTree(jsonStr);
                        return "c".equals(jsonNode.get("op").asText()); // inserts only
                    } catch (Exception e) {
                        return false;
                    }
                });

        // 4. Write the raw CDC JSON to Kafka
        String kafkaBroker = "localhost:9092";
        String topic = "mysql_cdc_user_behavior";
        FlinkKafkaProducer<String> kafkaSink =
                new FlinkKafkaProducer<>(kafkaBroker, topic, new SimpleStringSchema());
        cdcJsonStream.addSink(kafkaSink).name("Kafka-Sink");

        // 5. Parse the JSON into UserBehavior objects.
        // Caveat: by default Debezium encodes a MySQL DATETIME as epoch milliseconds,
        // not a formatted string; the string parsing below assumes a matching converter
        // is configured -- otherwise read the field with asLong() and convert.
        DataStream<UserBehavior> behaviorStream = cdcJsonStream
                .map(jsonStr -> {
                    JsonNode afterNode = MAPPER.readTree(jsonStr).get("after"); // inserted row sits under "after"
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return new UserBehavior(
                            afterNode.get("id").asLong(),
                            afterNode.get("user_id").asLong(),
                            afterNode.get("action_type").asText(),
                            LocalDateTime.parse(afterNode.get("action_time").asText(), formatter),
                            afterNode.get("product_id").asLong());
                })
                .returns(UserBehavior.class) // explicit type hint needed for the lambda
                .name("Parse-To-UserBehavior");

        // 6. Aggregate per hour in a tumbling window
        DataStream<HourlyStats> statsStream = behaviorStream
                // Key by the statistics hour (actionTime truncated to the hour)
                .keyBy(behavior -> behavior.getActionTime()
                        .withMinute(0).withSecond(0).withNano(0)) // e.g. 10:23:45 -> 10:00:00
                // 1-hour tumbling window (processing time; see the event-time sketch in Section V)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .aggregate(new AggregateFunction<UserBehavior, HourlyStats, HourlyStats>() {
                    @Override
                    public HourlyStats createAccumulator() {
                        // Initialize the accumulator
                        return new HourlyStats(null, 0, 0, 0, 0);
                    }

                    @Override
                    public HourlyStats add(UserBehavior behavior, HourlyStats accumulator) {
                        LocalDateTime statHour = behavior.getActionTime()
                                .withMinute(0).withSecond(0).withNano(0);
                        accumulator.setStatHour(statHour);
                        accumulator.setTotalActions(accumulator.getTotalActions() + 1);
                        // Count per action type
                        switch (behavior.getActionType()) {
                            case "click":
                                accumulator.setClickCount(accumulator.getClickCount() + 1);
                                break;
                            case "view":
                                accumulator.setViewCount(accumulator.getViewCount() + 1);
                                break;
                            case "purchase":
                                accumulator.setPurchaseCount(accumulator.getPurchaseCount() + 1);
                                break;
                        }
                        return accumulator;
                    }

                    @Override
                    public HourlyStats getResult(HourlyStats accumulator) {
                        return accumulator;
                    }

                    @Override
                    public HourlyStats merge(HourlyStats a, HourlyStats b) {
                        // Merge partial accumulators (needed for merging windows)
                        a.setTotalActions(a.getTotalActions() + b.getTotalActions());
                        a.setClickCount(a.getClickCount() + b.getClickCount());
                        a.setViewCount(a.getViewCount() + b.getViewCount());
                        a.setPurchaseCount(a.getPurchaseCount() + b.getPurchaseCount());
                        return a;
                    }
                })
                .name("Hourly-Aggregate");
        // 7. Write the statistics to MySQL with UPSERT semantics (idempotent per stat_hour)
        String mysqlUrl = "jdbc:mysql://localhost:3306/stat_db?useSSL=false&serverTimezone=UTC";
        String username = "root";        // statistics DB user (needs write privileges)
        String password = "Root@123456";
        statsStream.addSink(JdbcSink.sink(
                "INSERT INTO hourly_action_stats " +
                        "(stat_hour, total_actions, click_count, view_count, purchase_count) " +
                        "VALUES (?, ?, ?, ?, ?) " +
                        "ON DUPLICATE KEY UPDATE " + // update on primary-key conflict
                        "total_actions = VALUES(total_actions), " +
                        "click_count = VALUES(click_count), " +
                        "view_count = VALUES(view_count), " +
                        "purchase_count = VALUES(purchase_count)",
                (PreparedStatement stmt, HourlyStats stats) -> {
                    stmt.setTimestamp(1, Timestamp.valueOf(stats.getStatHour()));
                    stmt.setInt(2, stats.getTotalActions());
                    stmt.setInt(3, stats.getClickCount());
                    stmt.setInt(4, stats.getViewCount());
                    stmt.setInt(5, stats.getPurchaseCount());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)  // batched writes
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(mysqlUrl)
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(username)
                        .withPassword(password)
                        .build()))
                .name("MySQL-Stats-Sink");

        // 8. Execute the job
        env.execute("CDC-To-Kafka-And-Hourly-Stats");
    }
}
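To make the Debezium envelope used in steps 3 and 5 concrete, here is a standalone parsing sketch. The sample payload is illustrative and simplified (in particular, action_time as a formatted string assumes a matching converter; Debezium's default encodes DATETIME as epoch milliseconds):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumEnvelopeDemo {
    public static void main(String[] args) throws Exception {
        // Simplified insert record: "op":"c" marks an insert; the new row sits under "after"
        String sample = "{\"op\":\"c\",\"before\":null,\"after\":{"
                + "\"id\":1,\"user_id\":1001,\"action_type\":\"click\","
                + "\"action_time\":\"2023-10-24 10:15:30\",\"product_id\":5001}}";
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree(sample);
        if ("c".equals(root.get("op").asText())) {
            JsonNode after = root.get("after");
            System.out.println("user=" + after.get("user_id").asLong()
                    + " action=" + after.get("action_type").asText());
        }
    }
}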
IV. Deployment and Execution
- Package the program: build an executable JAR with Maven (mvn clean package -DskipTests).
- Submit the job to the Flink cluster:

bin/flink run -c com.example.CdcToKafkaAndStats /path/to/your/jar/file.jar

- Verify the results:
  - Insert test data into user_db.user_behavior (a reusable generator is sketched below):

INSERT INTO user_behavior (user_id, action_type, action_time, product_id) VALUES (1001, 'click', '2023-10-24 10:15:30', 5001);

  - Inspect the data in the Kafka topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql_cdc_user_behavior

  - After one hour (or after manually triggering the window computation), query the statistics table:

SELECT * FROM stat_db.hourly_action_stats;
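For repeated end-to-end testing, a small JDBC generator can keep inserting rows instead of pasting INSERT statements by hand. A sketch (it assumes root access to user_db; the IDs and sleep interval are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Random;

public class BehaviorDataGenerator {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/user_db?useSSL=false&serverTimezone=UTC";
        String[] actions = {"click", "view", "purchase"};
        Random random = new Random();
        try (Connection conn = DriverManager.getConnection(url, "root", "Root@123456");
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO user_behavior (user_id, action_type, action_time, product_id) "
                             + "VALUES (?, ?, ?, ?)")) {
            for (int i = 0; i < 100; i++) {
                stmt.setLong(1, 1000 + random.nextInt(100));
                stmt.setString(2, actions[random.nextInt(actions.length)]);
                stmt.setTimestamp(3, Timestamp.valueOf(LocalDateTime.now()));
                stmt.setLong(4, 5000 + random.nextInt(50));
                stmt.executeUpdate();
                Thread.sleep(100); // spread inserts out so the CDC stream is easy to observe
            }
        }
    }
}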
V. Notes
- MySQL 8.0 compatibility: Flink CDC 2.2.1 supports MySQL 8.0; use the mysql-connector-java 8.x driver.
- Time zones: keep the Flink cluster, MySQL, and Kafka in the same time zone (UTC or another unified zone is recommended).
- Window lateness: if data arrives out of order, set a lateness tolerance in the WatermarkStrategy (e.g. forBoundedOutOfOrderness); see the sketch after this list.
- Checkpoint settings: tune the checkpoint interval to the cluster's capacity so checkpointing does not hurt throughput.
- Kafka partitions: size the topic's partition count to the data volume to increase parallel processing capacity.
With the above, the complete pipeline is in place: newly inserted MySQL rows are captured in real time, stored in Kafka, and aggregated into hourly statistics written back to MySQL, covering both real-time data synchronization and periodic statistics requirements.
