Java 应用使用 Flink CDC 监听 MySQL 数据变动并输出到控制台
文章目录
- maven 依赖
- 自定义数据变化处理器
- flink cdc监听
- 验证
maven 依赖
<properties>
    <flink.version>1.14.0</flink.version>
    <flink-cdc.version>2.3.0</flink-cdc.version>
</properties>

<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
自定义数据变化处理器
package org.example;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class CustomSink extends RichSinkFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}@Overridepublic void invoke(String value, Context context) throws Exception {//0P字段,该字段也有4种取值。分别是C(Create ) , U(Updlate) . D(Delete ),Read 。// 对于U操作,其数据部分同时包含了Before和After.System.out.println(">>>" + value);}
}
flink cdc监听
package org.example;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MysqlSourceExample {public static void main(String[] args) throws Exception {DebeziumDeserializationSchema debeziumDeserializationSchema = new JsonDebeziumDeserializationSchema();MySqlSource<String> source = MySqlSource.builder().hostname("127.0.0.1").port(3306).databaseList("canal_manager")// set captured database.tableList("canal_manager.canal_user")// set captured table.startupOptions(StartupOptions.latest()) // 设置从最新的修改记录开始读取.username("root").password("123456").deserializer(debeziumDeserializationSchema) // converts SourceRecord to JSON string.includeSchemaChanges(true).build();//启动一个webuI。Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);//检者点间隔时间env .enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new CustomSink());env.execute();}
}
验证
启动后,可通过 http://localhost:8081/ 访问 Flink Web UI;当 MySQL 数据库 canal_manager 中的表 canal_user 的数据发生修改时,控制台会输出对应的 JSON: