「Flink」Flink项目搭建方法介绍
1.背景
做数据分析处理,平时主要使用DataWorks进行数据的离线处理,通常处理的是昨天及以前的数据,但业务上有些数据实时的数据价值更大,需要进行数据流的实时获取、处理和展示,那这时候使用Flink进行实时数据流处理是个很好的技术方案
2.Fink介绍
Flink是Apache Flink的简称。Flink是一款开源的流处理框架,专注于处理无界和有界数据流,具有高吞吐、低延迟、精准状态一致性等核心特性。它支持事件时间处理、精确一次状态保证,并适用于实时分析、事件驱动应用、数据管道构建等场景,是大数据生态中的重要工具。
主要特性有:
1.高性能与低延迟:每秒可处理数百万事件,毫秒级延迟响应,满足实时计算需求。
2.事件事件处理与状态一致性:
支持事件时间(Event-Time)语义,能正确处理乱序数据流,确保计算结果准确性
3.高可用性与扩展性:原生支持与 Kubernetes、YARN 等资源管理工具集成,故障恢复快,支持动态扩缩容,保障 7×24 小时稳定运行
主要应用的场景:
-
事件驱动型应用:通过实时处理事件流触发计算逻辑,如实时推荐系统等
-
流批一体化分析:支持连续查询(无界流)与传统批处理(有界数据流),如实时日志分析,历史数据统计
-
数据管道与ETL:在不同存储系统间高效转换和迁移数据,替代部分传统批量ETL工具
3.项目搭建
Apache Flink官网
Apache Flink开发文档
可以通过Flink的开发文档中的介绍方式进行导入到本地进行开发,目前我这边都使用的方式是通过Maven导入Flink Jar包的方式在IDEA中进行开发。
开发环境:
Java 8
Flink 1.17.0
Maven文件
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><!-- JSON解析 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.aliyun.openservices</groupId><artifactId>flink-log-connector</artifactId><version>0.1.38</version></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.39</version></dependency></dependencies>
我这里除了依赖了Flink,还依赖了Flink CDC,和一些Flink Connect用于与数据库进行连接,便于数据流的输入和输出
4.项目开发
根据Flink的开发文档中介绍,Flink的项目主要为这几步骤
public static void main(String[] args) throws Exception {// 1.获取Flink环境//`StreamExecutionEnvironment` 是所有 Flink 程序的基础。//如何使用IDEA进行运行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.指定数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///path/to/file")).build();// 3.将数据源进行一定的业务操作fileSource.process();// 4.(可选)将数据源进行输出stream.sinkTo(FileSink.forRowFormat(new Path("outputPath"), new SimpleStringEncoder<>()).build());// 5.本地执行环境 env.execute("Window WordCount");}
接下来将对于重点的2、3、4步骤点进行分别介绍,省流版:可以直接跳到5.完整项目举例进行查看完整项目代码
指定输入数据源
这里就是输入原始数据流的,连接数据库,将需要处理的数据输入至项目中。这里读取的是数据库的binglog,所以需要将数据库的binglog模式进行打开。这里以连接本地Mysql数据库进行举例说明
1.打开Mysql binglog模式
在 [mysqld]
部分添加以下配置:
[mysqld]
# 启用 binlog
server-id = 1
log_bin = /opt/homebrew/var/mysql/mysql-bin.log
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
max_binlog_size = 100M# 启用 GTID (推荐)
gtid_mode = ON
enforce_gtid_consistency = ON# 必须设置的事务隔离级别
transaction_isolation = READ-COMMITTED
2.验证Mysql binglog是否打开
SHOW VARIABLES LIKE 'log_bin'; -- 应为 ON
SHOW VARIABLES LIKE 'binlog_format'; -- 应为 ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- 应为 FULL
SHOW VARIABLES LIKE 'gtid_mode'; -- 应为 ON
3.Flink CDC链接本地Mysql数据库
这里链接的是我本地Mysql数据库,需要读取的Scheme是local,读取的库是flink_input,然后读取的数据是更新的数据StartupOptions.latest(),如果需要读取全部数据则设置为StartupOptions.initial(),然后也添加了JDBC的一些配置。
Flink CDC中自带许多系统的Source,如KafKaSource、MysqlSource,还可以通过一些读取本地文件的方式进行Source输入。
此外,有时候需要两个输入源进行数据比较,或者数据合并的时候,就可以设置两个Source进行分别输入。
MySqlSource<DataChangeInfo> inputSource = MySqlSource.<DataChangeInfo>builder().hostname("localhost").port(3306).databaseList("local").tableList("local.flink_input").username("root").password("********").deserializer(new MysqlDeserialization()).startupOptions(StartupOptions.latest()).jdbcProperties(createJdbcProperties()).build();private static Properties createJdbcProperties() {Properties props = new Properties();props.setProperty("useUnicode", "true");props.setProperty("characterEncoding", "UTF-8");props.setProperty("zeroDateTimeBehavior", "convertToNull");props.setProperty("serverTimezone", "Asia/Shanghai");props.setProperty("autoReconnect", "true");props.setProperty("allowMultiQueries", "true");return props;
}
将数据源进行业务处理
Flink最主要的功能就是将数据进行一定的处理,首先在Flink中分为流处理和批处理,然后Flink处理数据的时间分为以系统时间戳时间进行处理和以事件时间进行处理,并且设定窗口,将窗口中数据最后交给算子进行处理。下面我将依次介绍
1.流处理/批处理
流处理还是批处理,这主要是依据数据流是有界的还是无界的,这两者具体的区别如下
-
流处理 (Stream Processing) :
- 核心概念: 处理持续不断、理论上永无止境的数据流。
- 数据特性: 处理的是无界数据流 (Unbounded Data Stream) 。数据没有明确的终点,会持续不断地产生和到达(例如,来自 Kafka 的实时交易数据、传感器读数、用户活动日志)。
- 执行模型: 增量计算。数据到达一条处理一条(或微批次),计算结果持续产生和更新。需要管理状态(State)、处理时间(Time)和乱序数据(Watermark)。
- API: 主要使用
DataStream API
。
-
批处理 (Batch Processing) :
- 核心概念: 处理有限、完整的数据集。
- 数据特性: 处理的是有界数据流 (Bounded Data Stream) 或有界数据集 (Bounded Dataset) 。数据是完整的、已知的、不会改变的(例如,存储在 HDFS 上的昨天的日志文件、关系数据库中的特定表快照)。
- 执行模型: 有界计算。整个数据集在开始处理前是已知的(或可视为已知的)。Flink 会一次性加载所有(或大部分)数据,执行计算,产生最终结果后作业结束。状态管理相对简单,有序数据(通常不需要处理乱序时间)。
- API: 主要使用
DataSet API
(旧版,逐步被 Table API / SQL 和 DataStream 有界流取代)或DataStream API
配合有界数据源(推荐方式,实现流批一体)。
2.时间标定:以系统时间处理数据/以事件时间处理数据
这个是Flink中标定是以哪个时间为准进行处理相对应的数据流
如果以系统时间进行处理,那么基本上就是处理有序数据,默认在这批数据是按照系统时间来进行
如果以事件时间处理,那么需要指定数据的具体的字段作为处理时间的标定,Flink为此也提出了一个Watermark水位线的概念,这个水位线是专门给流处理数据进行处理的,通常处理的是乱序数据
3.窗口
通常就是设定一个时间段,在这个时间段内的数据将进行数据处理,时间的标定是以上面系统时间/事件事件为准的
举例说明事件标定和窗口
//系统时间
dataStream.map(new MapFunction<DataChangeInfo, OrderInfo>() {@Overridepublic OrderInfo map(DataChangeInfo dataChangeInfo) {//初步处理}})// 设置了以系统时间的20秒窗口期.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))//事件时间
//水位线,这里是以数据的create_time这个字段作为水位线,并设置了10秒的允许迟到时间
WatermarkStrategy<OrderInfo> strategy = WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((o, t) -> o.getCreate_time());//设置了20秒的窗口期
dataStream.map(new MapFunction<DataChangeInfo, OrderInfo>() {@Overridepublic OrderInfo map(DataChangeInfo dataChangeInfo) {//初步处理}})// 水位线.assignTimestampsAndWatermarks(strategy)// 窗口期.window(TumblingEventTimeWindows.of(Time.seconds(20)))
重点讲解水位线和延迟时间
(以20秒窗口、10秒延迟时间举例)
流入数据:{"create_time":"2025-06-14 08:50:01","id":29,"status":1} 流水线 01
流入数据:{"create_time":"2025-06-14 08:50:10","id":30,"status":1} 流水线 10
流入数据:{"create_time":"2025-06-14 08:50:20","id":31,"status":1} 流水线 20
流入数据:{"create_time":"2025-06-14 08:50:15","id":32,"status":1} 流水线 20
流入数据:{"create_time":"2025-06-14 08:50:25","id":33,"status":1} 流水线 25
流入数据:{"create_time":"2025-06-14 08:50:30","id":34,"status":1} 流水线 30
================================================================= 触发窗口
窗口2025-06-14 16:50:00---2025-06-14 16:50:20
窗口流出数据:OrderInfo(id=29, status=1, create_time=1749891000000)
窗口流出数据:OrderInfo(id=30, status=1, create_time=1749891010000)
窗口流出数据:OrderInfo(id=32, status=1, create_time=1749891015000)
=================================================================
流入数据:{"create_time":"2025-06-14T08:50:50Z","id":35,"status":1} 流水线 50
================================================================= 触发窗口
窗口2025-06-14 16:50:20---2025-06-14 16:50:40
窗口流出数据:OrderInfo(id=31, `status`=1, create_time=1749891020000)
窗口流出数据:OrderInfo(id=33, status=1, create_time=1749891025000)
窗口流出数据:OrderInfo(id=34, status=1, create_time=1749891030000)
=================================================================
1.前两条数据就是正常输入,流水线正常往下推进;
2.到第三条数据,流水线其实已经来到了20秒,但是未触发窗口,是因为还有10秒的允许迟到时间,需要等到流水线推进到30时,才会触发窗口关闭;
3.第四条数据时间其实为15s,但是由于流水线不会回退,所以仍为20,这条数据也就是为迟到数据,仍会归到窗口内数据;
4.直到第六条数据,流水线来到了30,所以触发[00,20)的窗口关闭,处理数据,所以打印出29、30、32这三条数据;
5.然后输入第七条数据,流水线到了50,触发了第二个窗口的关闭,40窗口结束时间+10允许延迟时间,所以第二个窗口关闭,输入第二个窗口的数据,31、33、34
根据多次数据打印得出,窗口期通常会以整数形式出现和结束,无论第一个事件时间是否为整数,窗口都为整数进行开始和结束
4.算子
算子就为flink的自带的各种数据流的处理,包括流的合并,流的连接,如join、connect、cogroup、process等等,这部分内容比较多,后续单独再写一篇文章主要讲解常用的算子
5.将数据源进行输出
就是将处理好的最终数据,以sink的方式,输出到目标库中。这里我继承了官方的sink,然后进行自定义重写方法,主要目的为了将写入数据库失败可以以异常的形式日志打印出来,目前官方的sink使用下来无法知道写入数据库的异常日志。
自定义MysqlSink
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MysqlSink extends RichSinkFunction<OrderInfo> {private transient Connection connection;private transient PreparedStatement ps;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/local?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&autoReconnect=true&allowMultiQueries=true","root", "*******");ps = connection.prepareStatement("insert into flink_output (id, record_time) VALUES (?, ?);");}@Overridepublic void invoke(OrderInfo value, Context context) throws Exception {try {System.out.println("窗口流出数据:" + value);ps.setString(1, value.getId().toString());ps.setString(2, DateFormatUtils.format(value.getCreate_time(), "yyyy-MM-dd HH:mm:ss"));ps.execute();} catch (SQLException e) {System.out.println("写入数据库失败:" + e.getMessage());e.printStackTrace();}}@Overridepublic void close() throws Exception {if (ps != null) {ps.close();}if (connection != null) {connection.close();}}
}
主函数调用sink
MysqlSink mysqlSink = new MysqlSink();process.addSink(mysqlSink).name("Mysql Sink").setParallelism(1);
5.完整项目举例
主工程
package com.flink;import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Properties;public class Main {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);MySqlSource<DataChangeInfo> inputSource = MySqlSource.<DataChangeInfo>builder().hostname("localhost").port(3306).databaseList("local").tableList("local.flink_input").username("root").password("62983335").deserializer(new MysqlDeserialization()).startupOptions(StartupOptions.latest()).jdbcProperties(createJdbcProperties()).build();MysqlSink mysqlSink = new MysqlSink();DataStream<DataChangeInfo> dataStream = env.fromSource(inputSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1);WatermarkStrategy<OrderInfo> strategy = WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((o, t) -> o.getCreate_time());SingleOutputStreamOperator<OrderInfo> process = dataStream.filter(d -> StringUtils.isNoneBlank(d.getAfterData())).map(new MapFunction<DataChangeInfo, OrderInfo>() {@Overridepublic OrderInfo map(DataChangeInfo dataChangeInfo) {try {System.out.println("原始数据:" + dataChangeInfo.getAfterData());return JSONObject.parseObject(dataChangeInfo.getAfterData(), OrderInfo.class);} catch (Exception e) {System.err.println("Failed to parse JSON: " + dataChangeInfo.getAfterData());System.err.println("Exception message: " + e.getMessage());e.printStackTrace();return null;}}}).assignTimestampsAndWatermarks(strategy).keyBy(OrderInfo::getStatus).window(TumblingEventTimeWindows.of(Time.seconds(20)))
// .window(TumblingEventTimeWindows.of(Time.seconds(20))).process(new ProcessWindowFunction<OrderInfo, OrderInfo, Integer, TimeWindow>() {@Overridepublic void process(Integer id, ProcessWindowFunction<OrderInfo, OrderInfo, Integer, TimeWindow>.Context context, Iterable<OrderInfo> iterable, Collector<OrderInfo> output) throws Exception {long startTimeStamp = context.window().getStart();long endTimeStamp = context.window().getEnd();String startTime = DateFormatUtils.format(startTimeStamp, "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(endTimeStamp, "yyyy-MM-dd HH:mm:ss");System.out.println("窗口" + startTime + "---" + endTime);iterable.forEach(orderInfo -> {if (orderInfo.getStatus().equals(1)) {output.collect(orderInfo);
// output.collect(orderInfo.getId() + "-----" + orderInfo.getCreate_time());}});}});process.print();process.addSink(mysqlSink).name("Mysql Sink").setParallelism(1);env.execute("start job");}private static Properties createJdbcProperties() {Properties props = new Properties();props.setProperty("useUnicode", "true");props.setProperty("characterEncoding", "UTF-8");props.setProperty("zeroDateTimeBehavior", "convertToNull");props.setProperty("serverTimezone", "Asia/Shanghai");props.setProperty("autoReconnect", "true");props.setProperty("allowMultiQueries", "true");return props;}
}
MysqlDeserialization类(用于Mysql数据库binglog数据解析)
package com.flink;import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;
import java.util.Map;
import java.util.Optional;
@Slf4j
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";/*** 获取操作类型 READ CREATE UPDATE DELETE TRUNCATE;* 变更类型: 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作*/private static final Map<String, Integer> OPERATION_MAP = ImmutableMap.of("READ", 0,"CREATE", 1,"UPDATE", 2,"DELETE", 3,"TRUNCATE", 4);@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {String topic = sourceRecord.topic();String[] fields = topic.split("\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();// 获取操作类型 READ CREATE UPDATE DELETE TRUNCATE;Envelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = OPERATION_MAP.get(type);// fixme 一般情况是无需关心其之前之后数据的,直接获取最新的数据即可,但这里为了演示,都进行输出dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());if (eventType == 3) {dataChangeInfo.setData(getJsonObject(struct, BEFORE).toJSONString());} else {dataChangeInfo.setData(getJsonObject(struct, AFTER).toJSONString());}dataChangeInfo.setOperatorType(eventType);dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));// 输出数据collector.collect(dataChangeInfo);}/*** 从元素数据获取出变更之前或之后的数据** @param value value* @param fieldElement fieldElement* @return JSONObject*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);}}
DataChangeInfo类
import lombok.Data;@Data
public class DataChangeInfo {/*** 变更类型: 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作*/private Integer operatorType;/*** 变更前数据*/private String beforeData;/*** 变更后数据*/private String afterData;/*** 操作的数据*/private String data;/*** binlog文件名*/private String fileName;/*** binlog当前读取点位*/private Integer filePos;/*** 数据库名*/private String database;/*** 表名*/private String tableName;/*** 变更时间*/private Long operatorTime;
}