Flink数据流高效写入MySQL实战
这段代码展示了如何使用 Apache Flink 将数据流写入 MySQL 数据库,并使用了 JdbcSink
来实现自定义的 Sink 逻辑。以下是对代码的详细解析和说明:
代码结构
包声明:
package sink
定义了代码所在的包。导入依赖:
导入了必要的 Flink 和 JDBC 相关类库,包括:java.sql.PreparedStatement
:用于执行 SQL 语句。org.apache.flink.connector.jdbc
:Flink 的 JDBC 连接器相关类。org.apache.flink.streaming.api.scala._
:Flink 流处理 API。
sinkToMysql
对象:
主程序入口,包含 Flink 流处理逻辑和 MySQL Sink 的配置。
package sinkimport java.sql.PreparedStatementimport org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 赵嘉盟-HONOR* @data: 2023-11-20 15:23* @DESCRIPTION**/
object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink( JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1,u.user)t.setString(2,u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()))env.execute("sinkRedis")}
}
基于scala使用flink将读取到的数据写入到Mysql
data.addSink( JdbcSink.sink(...) )
:这行代码将一个JdbcSink添加到Flink的数据流中,用于将数据写入到数据库中。"insert into clicks values(?,?)"
:这是SQL语句,表示将用户和URL插入到名为clicks的表中。new JdbcStatementBuilder[Event] {...}
:这是一个匿名内部类,用于构建PreparedStatement对象。在这个类中,我们重写了accept
方法,该方法接受一个PreparedStatement对象和一个Event对象,然后将Event对象的user和url属性设置到PreparedStatement对象中。new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()...
:这是一个JdbcConnectionOptionsBuilder对象,用于构建数据库连接选项。在这个对象中,我们设置了数据库的URL、驱动名称、用户名和密码。.build()
:这是JdbcConnectionOptionsBuilder对象的一个方法,用于构建JdbcConnectionOptions对象。
代码解析
(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment
- 创建 Flink 流处理环境
StreamExecutionEnvironment
。
(2) 定义数据流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)
- 使用
fromElements
方法生成一个包含 4 个Event
对象的流。
(3) 自定义 MySQL Sink
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()
))
- 使用
JdbcSink.sink
方法将数据写入 MySQL:- SQL 语句:
insert into clicks values(?,?)
,插入user
和url
字段。 JdbcStatementBuilder
:用于将Event
对象映射到 SQL 语句的参数。JdbcConnectionOptions
:配置 MySQL 连接信息,包括 URL、驱动名称、用户名和密码。
- SQL 语句:
(4) 执行任务
env.execute("sinkRedis")
- 启动 Flink 流处理任务,任务名称为
sinkRedis
。
优化版本
异常处理
- 在 Sink 中添加异常处理逻辑,避免程序因 MySQL 写入失败而崩溃:
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build() ))
批量写入
- 如果需要提高写入性能,可以启用批量写入功能:
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 设置批量大小.build() ))
优化后的代码
以下是优化后的完整代码:
package sinkimport java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 启用批量写入.build()))env.execute("sinkToMysql")}
}