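This walkthrough builds a complete Spring Boot project that uses Flink CDC 3.0 to read the MySQL binlog and sync the data to Elasticsearch.

Project overview

The project will:

  1. Use Flink CDC to connect to MySQL and read the binlog
  2. Process data changes (inserts, updates, deletes)
  3. Sync the data to Elasticsearch
  4. Expose a REST API for managing the sync job

Project structure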

src/main/java/
├── com/example/cdc/
│   ├── config/
│   │   ├── FlinkConfig.java
│   │   └── ElasticsearchConfig.java
│   ├── model/
│   │   └── User.java
│   ├── service/
│   │   ├── SyncService.java
│   │   └── JobManager.java
│   ├── controller/
│   │   └── SyncController.java
│   └── FlinkCdcApplication.java

1. Add dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>FlinkCDC</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>FlinkCDC</name>
    <description>FlinkCDC</description>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.7.6</spring-boot.version>
        <flink.version>1.16.0</flink.version>
        <flink-cdc.version>3.0.1</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Flink CDC -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <!-- Flink Connector Elasticsearch -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink JSON -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Flink Clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.cdc.FlinkCdcApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Data model (User.java)

package com.example.cdc.model;

import lombok.Data;

@Data
public class User {
    private Long id;
    private String name;
    private String email;
    private Long createdAt;
    private Long updatedAt;
    private Boolean deleted;
}

3. Flink configuration (FlinkConfig.java)

package com.example.cdc.config;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkConfig {

    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }
}

4. Elasticsearch configuration (ElasticsearchConfig.java)

package com.example.cdc.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost(host, port, "http")));
    }
}

5. Sync service (SyncService.java)

package com.example.cdc.service;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class SyncService {

    @Autowired
    private StreamExecutionEnvironment env;

    @Value("${mysql.host:localhost}")
    private String mysqlHost;

    @Value("${mysql.port:3306}")
    private int mysqlPort;

    @Value("${mysql.username:root}")
    private String mysqlUsername;

    @Value("${mysql.password:password}")
    private String mysqlPassword;

    @Value("${mysql.database:test}")
    private String mysqlDatabase;

    @Value("${mysql.table:users}")
    private String mysqlTable;

    @Value("${elasticsearch.host:localhost}")
    private String esHost;

    @Value("${elasticsearch.port:9200}")
    private int esPort;

    @Value("${elasticsearch.index:users}")
    private String esIndex;

    public void startSync() throws Exception {
        // Create the MySQL CDC source; each change event is emitted as a Debezium JSON string
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(mysqlHost)
                .port(mysqlPort)
                .databaseList(mysqlDatabase)
                .tableList(mysqlDatabase + "." + mysqlTable)
                .username(mysqlUsername)
                .password(mysqlPassword)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // Create the data stream
        DataStream<String> stream = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL Source");

        // Transform and process the data
        DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Custom processing logic can be added here
                return value;
            }
        });

        // Configure the Elasticsearch sink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(esHost, esPort, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                (element, ctx, indexer) -> {
                    // Index the raw change event JSON as a new document
                    IndexRequest request = Requests.indexRequest()
                            .index(esIndex)
                            .source(element, XContentType.JSON);
                    indexer.add(request);
                });

        // Bulk settings: flush after every single action
        esSinkBuilder.setBulkFlushMaxActions(1);

        // Send the data to Elasticsearch (this sink is attached with addSink, not sinkTo)
        processedStream.addSink(esSinkBuilder.build());

        // Start the job; this call blocks while the streaming job runs
        env.execute("MySQL to Elasticsearch Sync");
    }
}
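
The sink above indexes each raw change event as a new document without an id, so an update or delete in MySQL does not overwrite or remove anything in Elasticsearch. One possible way to cover all three change types is to branch on the Debezium `op` field (`c` create, `r` snapshot read, `u` update, `d` delete) and use the row's primary key as the document id. The sketch below shows only that decision logic in plain Java. It assumes the event JSON has already been parsed into a `Map` and that the primary key column is named `id`; `ChangeEventRouter`, `EsAction`, and `route` are hypothetical names for illustration, not part of Flink CDC.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: decides what to do in Elasticsearch for one change event. */
final class ChangeEventRouter {

    enum Kind { INDEX, DELETE }

    /** The chosen action: document id plus the row to index (null for deletes). */
    static final class EsAction {
        final Kind kind;
        final String docId;
        final Map<String, Object> document;

        EsAction(Kind kind, String docId, Map<String, Object> document) {
            this.kind = kind;
            this.docId = docId;
            this.document = document;
        }
    }

    /**
     * Maps a parsed Debezium-style event (keys "op", "before", "after") to an action.
     * Assumption: the primary key column is called "id".
     */
    @SuppressWarnings("unchecked")
    static EsAction route(Map<String, Object> event) {
        String op = String.valueOf(event.get("op"));
        switch (op) {
            case "c": // insert
            case "r": // row read during the initial snapshot
            case "u": { // update: re-index under the same id so the old version is replaced
                Map<String, Object> after = (Map<String, Object>) event.get("after");
                return new EsAction(Kind.INDEX, String.valueOf(after.get("id")), after);
            }
            case "d": { // delete: only the "before" image carries the key
                Map<String, Object> before = (Map<String, Object>) event.get("before");
                return new EsAction(Kind.DELETE, String.valueOf(before.get("id")), null);
            }
            default:
                throw new IllegalArgumentException("Unsupported op: " + op);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 42L);
        row.put("name", "alice");

        Map<String, Object> update = new HashMap<>();
        update.put("op", "u");
        update.put("after", row);

        EsAction action = route(update);
        System.out.println(action.kind + " " + action.docId); // prints "INDEX 42"
    }
}
```

Inside the sink function, an `INDEX` action would presumably become an `IndexRequest` with `.id(docId)` set, and a `DELETE` action a `DeleteRequest` for the same id.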

6. Job manager (JobManager.java)

package com.example.cdc.service;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

@Component
public class JobManager {

    @Autowired
    private StreamExecutionEnvironment env;

    @Autowired
    private SyncService syncService;

    private Thread jobThread;

    public void startJob() {
        // env.execute() blocks, so run the sync on its own thread
        jobThread = new Thread(() -> {
            try {
                syncService.startSync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        jobThread.start();
    }

    public void stopJob() {
        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (jobThread != null && jobThread.isAlive()) {
            jobThread.interrupt();
        }
    }

    @PreDestroy
    public void onDestroy() {
        stopJob();
    }
}
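
`startJob()` above creates a new thread on every call, so two requests to `/api/sync/start` would submit the job twice. A minimal sketch of a guard against that, using only the JDK, follows. `SingleJobGuard` and its methods are hypothetical names, and the `Runnable` stands in for the call to `syncService.startSync()`; stopping still relies on thread interruption, as in the original `stopJob()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical guard that allows at most one running job thread at a time. */
final class SingleJobGuard {

    private final AtomicReference<Thread> current = new AtomicReference<>();

    /** Starts the task on a new thread unless one is already registered. */
    boolean start(Runnable task) {
        Thread t = new Thread(() -> {
            try {
                task.run();
            } finally {
                // Free the slot when the job ends on its own (no-op if stop() already cleared it)
                current.compareAndSet(Thread.currentThread(), null);
            }
        }, "sync-job");
        if (!current.compareAndSet(null, t)) {
            return false; // a job is already running
        }
        t.start();
        return true;
    }

    /** Interrupts the running job thread, if any. */
    boolean stop() {
        Thread t = current.getAndSet(null);
        if (t == null) {
            return false;
        }
        t.interrupt();
        return true;
    }

    boolean isRunning() {
        return current.get() != null;
    }

    public static void main(String[] args) {
        SingleJobGuard guard = new SingleJobGuard();
        CountDownLatch never = new CountDownLatch(1);
        // Stand-in for a long-running job: blocks until interrupted
        Runnable job = () -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(guard.start(job)); // first start succeeds
        System.out.println(guard.start(job)); // second start is rejected
        System.out.println(guard.stop());     // stop interrupts the job thread
    }
}
```

With a guard like this, the controller could return a different message when `start` reports that a job is already running instead of always reporting success.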

7. REST controller (SyncController.java)

package com.example.cdc.controller;

import com.example.cdc.service.JobManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/sync")
public class SyncController {

    @Autowired
    private JobManager jobManager;

    @PostMapping("/start")
    public String startSync() {
        try {
            jobManager.startJob();
            return "Sync job started successfully";
        } catch (Exception e) {
            return "Failed to start sync job: " + e.getMessage();
        }
    }

    @PostMapping("/stop")
    public String stopSync() {
        try {
            jobManager.stopJob();
            return "Sync job stopped successfully";
        } catch (Exception e) {
            return "Failed to stop sync job: " + e.getMessage();
        }
    }
}

8. Application main class (FlinkCdcApplication.java)

package com.example.cdc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }
}

9. Configuration file (application.yml)

server:
  port: 8080

spring:
  application:
    name: mysql-cdc-to-es

mysql:
  host: localhost
  port: 3306
  username: root
  password: your_mysql_password
  database: your_database
  table: your_table

elasticsearch:
  host: localhost
  port: 9200
  index: your_es_index

flink:
  parallelism: 1

Usage

  1. Make sure the MySQL binlog is enabled:

    SHOW VARIABLES LIKE 'log_bin';

    If it is not enabled, add the following to the MySQL configuration file and restart MySQL:

    [mysqld]
    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog_row_image=full

  2. Create a MySQL user with replication privileges, and set mysql.username and mysql.password in application.yml to this account:

    CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
    FLUSH PRIVILEGES;

  3. Start the application:

    mvn spring-boot:run

  4. Start the sync job through the REST API:

    POST http://localhost:8080/api/sync/start
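
The binlog prerequisites from step 1 can also be checked programmatically before the job is started. The sketch below is one possible check in plain Java. It assumes the variable names and values have already been read from MySQL (for example with `SHOW VARIABLES`) into a `Map`; `BinlogSettingsCheck` and `problems` are hypothetical names, and the expected values simply mirror the `[mysqld]` settings listed in step 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for the binlog settings listed in step 1. */
final class BinlogSettingsCheck {

    /** Expected values, mirroring the [mysqld] settings from the usage notes. */
    private static final Map<String, String> EXPECTED = new LinkedHashMap<>();
    static {
        EXPECTED.put("log_bin", "ON");
        EXPECTED.put("binlog_format", "ROW");
        EXPECTED.put("binlog_row_image", "FULL");
    }

    /** Returns one message per setting that is missing or has an unexpected value. */
    static List<String> problems(Map<String, String> actual) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : EXPECTED.entrySet()) {
            String value = actual.get(e.getKey());
            // MySQL reports these values in varying case, so compare case-insensitively
            if (value == null || !value.equalsIgnoreCase(e.getValue())) {
                out.add(e.getKey() + " should be " + e.getValue() + " but is " + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample values standing in for the result of SHOW VARIABLES
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "MIXED");
        vars.put("binlog_row_image", "FULL");

        for (String problem : problems(vars)) {
            System.out.println(problem);
        }
    }
}
```

An empty list means the three settings match; any message in the list points at the setting to fix before calling the start endpoint.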
    