当前位置: 首页 > news >正文

使用 Apache Flink CDC 3.0 实现 MySQL 到 Elasticsearch 的数据同步

下面我将创建一个完整的 Spring Boot 项目,使用 Flink CDC 3.0 基于 MySQL 的 binlog 实现数据同步到 Elasticsearch。

项目概述

这个项目将:

  1. 使用 Flink CDC 连接 MySQL 并读取 binlog
  2. 处理数据变化(插入、更新、删除)
  3. 将数据同步到 Elasticsearch
  4. 提供 REST API 管理同步任务

项目结构

src/main/java/
├── com/example/cdc/
│   ├── config/
│   │   ├── FlinkConfig.java
│   │   └── ElasticsearchConfig.java
│   ├── model/
│   │   └── User.java
│   ├── service/
│   │   ├── SyncService.java
│   │   └── JobManager.java
│   ├── controller/
│   │   └── SyncController.java
│   └── FlinkCdcApplication.java

1. 添加依赖 (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Spring Boot + Flink CDC -> Elasticsearch sync app. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>FlinkCDC</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>FlinkCDC</name>
    <description>FlinkCDC</description>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.7.6</spring-boot.version>
        <flink.version>1.16.0</flink.version>
        <flink-cdc.version>3.0.1</flink-cdc.version>
        <!-- Since Flink 1.16 the Elasticsearch connector is released separately
             from Flink core and is versioned as <connector>-<flink>; there is
             no 1.16.0 release of flink-connector-elasticsearch7. -->
        <flink-connector-es.version>3.0.1-1.16</flink-connector-es.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Flink CDC: MySQL binlog source -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <!-- Flink Connector Elasticsearch 7 (externalized connector,
             own version scheme - see property comment above) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>${flink-connector-es.version}</version>
        </dependency>

        <!-- Flink JSON format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink streaming API.
             NOTE(review): scope "provided" keeps it out of the fat jar for
             cluster submission, but "mvn spring-boot:run" will then miss it at
             runtime - confirm the intended deployment mode. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink client / local mini-cluster execution -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.36</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.cdc.FlinkCdcApplication</mainClass>
                    <!-- NOTE(review): skip=true disables the repackage execution
                         declared below - remove it if a runnable fat jar is wanted. -->
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. 数据模型 (User.java)

package com.example.cdc.model;

import lombok.Data;

/**
 * Row model for the synchronized MySQL user table.
 * Lombok's {@code @Data} generates getters, setters, equals/hashCode and toString.
 */
@Data
public class User {

    private Long id;
    private String name;
    private String email;

    // Numeric timestamps - presumably epoch values; confirm the unit
    // (seconds vs. milliseconds) against the table schema.
    private Long createdAt;
    private Long updatedAt;

    // NOTE(review): looks like a soft-delete marker - verify against the schema.
    private Boolean deleted;
}

3. Flink 配置 (FlinkConfig.java)

package com.example.cdc.config;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Exposes the Flink stream execution environment as a singleton Spring bean
 * so services can build pipelines against it via injection.
 */
@Configuration
public class FlinkConfig {

    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        // Resolves to a local environment when run standalone, or to the
        // cluster environment when submitted to a Flink cluster.
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }
}

4. Elasticsearch 配置 (ElasticsearchConfig.java)

package com.example.cdc.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Provides an Elasticsearch high-level REST client bean, configured from the
 * {@code elasticsearch.host}/{@code elasticsearch.port} application properties.
 */
@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    // destroyMethod = "close" lets Spring release the client's connections
    // when the context shuts down.
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        HttpHost endpoint = new HttpHost(host, port, "http");
        return new RestHighLevelClient(RestClient.builder(endpoint));
    }
}

5. 同步服务 (SyncService.java)

package com.example.cdc.service;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class SyncService {@Autowiredprivate StreamExecutionEnvironment env;@Value("${mysql.host:localhost}")private String mysqlHost;@Value("${mysql.port:3306}")private int mysqlPort;@Value("${mysql.username:root}")private String mysqlUsername;@Value("${mysql.password:password}")private String mysqlPassword;@Value("${mysql.database:test}")private String mysqlDatabase;@Value("${mysql.table:users}")private String mysqlTable;@Value("${elasticsearch.host:localhost}")private String esHost;@Value("${elasticsearch.port:9200}")private int esPort;@Value("${elasticsearch.index:users}")private String esIndex;public void startSync() throws Exception {// 创建 MySQL CDC SourceMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(mysqlHost).port(mysqlPort).databaseList(mysqlDatabase).tableList(mysqlDatabase + "." + mysqlTable).username(mysqlUsername).password(mysqlPassword).deserializer(new JsonDebeziumDeserializationSchema()).build();// 创建数据流DataStream<String> stream = env.fromSource(mySqlSource,org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),"MySQL Source");// 转换和处理数据DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {// 这里可以添加自定义的数据处理逻辑return value;}});// 配置 Elasticsearch SinkList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost(esHost, esPort, "http"));ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,(element, ctx, indexer) -> {IndexRequest request = Requests.indexRequest().index(esIndex).source(element, XContentType.JSON);indexer.add(request);});// 配置批量请求esSinkBuilder.setBulkFlushMaxActions(1);// 将数据发送到 Elasticsearch - 使用 addSink 而不是 sinkToprocessedStream.addSink(esSinkBuilder.build());// 启动任务env.execute("MySQL to Elasticsearch Sync");}
}

6. 任务管理器 (JobManager.java)

package com.example.cdc.service;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

/**
 * Starts and stops the Flink sync job on a dedicated background thread,
 * since {@code StreamExecutionEnvironment#execute} blocks until the job ends.
 */
@Component
public class JobManager {

    @Autowired
    private StreamExecutionEnvironment env;

    @Autowired
    private SyncService syncService;

    private Thread jobThread;

    /**
     * Launches the sync pipeline on a background thread.
     * No-op while a previous job thread is still alive, so repeated calls to
     * the REST endpoint cannot spawn concurrent pipelines.
     */
    public synchronized void startJob() {
        if (jobThread != null && jobThread.isAlive()) {
            return;
        }
        jobThread = new Thread(() -> {
            try {
                syncService.startSync();
            } catch (InterruptedException e) {
                // Stop request: restore the interrupt flag and exit quietly.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "flink-sync-job");
        jobThread.start();
    }

    /**
     * Stops the running job by closing the execution environment, then
     * interrupting the worker thread if it is still alive.
     */
    public synchronized void stopJob() {
        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (jobThread != null && jobThread.isAlive()) {
            jobThread.interrupt();
        }
    }

    /** Ensures the job is shut down together with the Spring context. */
    @PreDestroy
    public void onDestroy() {
        stopJob();
    }
}

7. REST 控制器 (SyncController.java)

package com.example.cdc.controller;

import com.example.cdc.service.JobManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/** REST endpoints for starting and stopping the CDC sync job. */
@RestController
@RequestMapping("/api/sync")
public class SyncController {

    @Autowired
    private JobManager jobManager;

    /** POST /api/sync/start - launch the pipeline. */
    @PostMapping("/start")
    public String startSync() {
        try {
            jobManager.startJob();
            return "Sync job started successfully";
        } catch (Exception e) {
            return "Failed to start sync job: " + e.getMessage();
        }
    }

    /** POST /api/sync/stop - stop the running pipeline. */
    @PostMapping("/stop")
    public String stopSync() {
        try {
            jobManager.stopJob();
            return "Sync job stopped successfully";
        } catch (Exception e) {
            return "Failed to stop sync job: " + e.getMessage();
        }
    }
}

8. 应用主类 (FlinkCdcApplication.java)

package com.example.cdc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/** Spring Boot entry point for the MySQL-to-Elasticsearch CDC service. */
@SpringBootApplication
public class FlinkCdcApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }
}

9. 配置文件 (application.yml)

server:
  port: 8080

spring:
  application:
    name: mysql-cdc-to-es

# MySQL connection used by the CDC source (replace the placeholder values).
mysql:
  host: localhost
  port: 3306
  username: root
  password: your_mysql_password
  database: your_database
  table: your_table

# Target Elasticsearch cluster and index.
elasticsearch:
  host: localhost
  port: 9200
  index: your_es_index

flink:
  parallelism: 1

使用说明

  1. 确保 MySQL 已开启 binlog:

    SHOW VARIABLES LIKE 'log_bin';
    

    如果未开启,需要在 MySQL 配置文件中添加:

    [mysqld]
    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog_row_image=full
    
  2. 创建具有复制权限的 MySQL 用户:

    CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
    FLUSH PRIVILEGES;
    
  3. 启动应用程序:

    mvn spring-boot:run
    
  4. 通过 REST API 启动同步任务:

    POST http://localhost:8080/api/sync/start
    
http://www.dtcms.com/a/343691.html

相关文章:

  • 回归测试的重要性与实践指南
  • 十年磨一剑!Apache Hive 性能优化演进全史(2013 - )
  • Ubuntu部署K8S集群
  • unistd.h 常用函数速查表
  • 论文精读(三)|智能合约漏洞检测技术综述
  • 《WINDOWS 环境下32位汇编语言程序设计》第7章 图形操作(1)
  • Redis内存架构解析与性能优化实战
  • 通用的嵌入式 Linux 系统镜像制作流程
  • STM32F103RC的USB上拉电阻1.5K
  • MongoDB 从入门到实践:全面掌握文档型 NoSQL 数据库核心操作
  • 基于Node.js服务端的社区报修管理系统/基于express的在线报修管理系统
  • (论文速读)RandAR:突破传统限制的随机顺序图像自回归生成模型
  • 基于C#的宠物医院管理系统/基于asp.net的宠物医院管理系统
  • 开源 python 应用 开发(十)音频压缩
  • AI时代的“双刃剑”:效率革命与人文焦虑的碰撞
  • week3-[二维数组]小方块
  • 靶机 - SAR
  • UVa1472/LA4980 Hanging Hats
  • C++的指针和引用:
  • C++部署Yolov5模型流程记录
  • flutter geolocator Android国内定位失败问题解决
  • Redis事务全解析:从秒杀案例看原子操作实现
  • C#_接口设计:角色与契约的分离
  • 【C语言强化训练16天】--从基础到进阶的蜕变之旅:Day10
  • 树莓派采集、计算机推理:基于GStreamer的YOLOv5实现方案
  • Codeforces Round 1043 (Div.3)
  • AI生成技术报告:GaussDB与openGauss的HTAP功能全面对比
  • Vue 插槽(Slots)全解析2
  • 大数据毕业设计推荐:基于Hadoop+Spark的手机信息分析系统完整方案
  • 使用GMail API 发送邮箱