当前位置: 首页 > news >正文

自己做微信电影网站怎么做搜索引擎如何找到网站

自己做微信电影网站怎么做,搜索引擎如何找到网站,网络推广网站优化,wordpress栏目加密这段代码展示了如何使用 Apache Flink 将数据流写入 MySQL 数据库,并使用了 JdbcSink 来实现自定义的 Sink 逻辑。以下是对代码的详细解析和说明:代码结构包声明:package sink定义了代码所在的包。导入依赖:导入了必要的 Flink 和…

这段代码展示了如何使用 Apache Flink 将数据流写入 MySQL 数据库,并使用了 JdbcSink 来实现自定义的 Sink 逻辑。以下是对代码的详细解析和说明:

代码结构

  • 包声明package sink
    定义了代码所在的包。

  • 导入依赖
    导入了必要的 Flink 和 JDBC 相关类库,包括:

    • java.sql.PreparedStatement:用于执行 SQL 语句。
    • org.apache.flink.connector.jdbc:Flink 的 JDBC 连接器相关类。
    • org.apache.flink.streaming.api.scala._:Flink 流处理 API。
  • sinkToMysql 对象
    主程序入口,包含 Flink 流处理逻辑和 MySQL Sink 的配置。

package sinkimport java.sql.PreparedStatementimport org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 赵嘉盟-HONOR* @data: 2023-11-20 15:23* @DESCRIPTION**/
object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink( JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1,u.user)t.setString(2,u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()))env.execute("sinkRedis")}
}

基于scala使用flink将读取到的数据写入到Mysql

  1. data.addSink( JdbcSink.sink(...) ):这行代码将一个JdbcSink添加到Flink的数据流中,用于将数据写入到数据库中。

  2. "insert into clicks values(?,?)":这是SQL语句,表示将用户和URL插入到名为clicks的表中。

  3. new JdbcStatementBuilder[Event] {...}:这是一个匿名内部类,用于构建PreparedStatement对象。在这个类中,我们重写了accept方法,该方法接受一个PreparedStatement对象和一个Event对象,然后将Event对象的user和url属性设置到PreparedStatement对象中。

  4. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()...:这是一个JdbcConnectionOptionsBuilder对象,用于构建数据库连接选项。在这个对象中,我们设置了数据库的URL、驱动名称、用户名和密码。

  5. .build():这是JdbcConnectionOptionsBuilder对象的一个方法,用于构建JdbcConnectionOptions对象。

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment
  • 创建 Flink 流处理环境 StreamExecutionEnvironment
(2) 定义数据流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)
  • 使用 fromElements 方法生成一个包含 4 个 Event 对象的流。
(3) 自定义 MySQL Sink
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()
))
  • 使用 JdbcSink.sink 方法将数据写入 MySQL:
    • SQL 语句insert into clicks values(?,?),插入 user 和 url 字段。
    • JdbcStatementBuilder:用于将 Event 对象映射到 SQL 语句的参数。
    • JdbcConnectionOptions:配置 MySQL 连接信息,包括 URL、驱动名称、用户名和密码。
(4) 执行任务
env.execute("sinkRedis")
  • 启动 Flink 流处理任务,任务名称为 sinkRedis

优化版本

异常处理
  • 在 Sink 中添加异常处理逻辑,避免程序因 MySQL 写入失败而崩溃:
    data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build()
    ))
批量写入
  • 如果需要提高写入性能,可以启用批量写入功能:
    data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 设置批量大小.build()
    ))

优化后的代码

以下是优化后的完整代码:

package sinkimport java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 启用批量写入.build()))env.execute("sinkToMysql")}
}
http://www.dtcms.com/a/572063.html

相关文章:

  • 心悦俱乐部官方网站怎么做任务微信小程序怎么关闭防沉迷
  • 固镇网站建设哪家好seo刷排名软件
  • 如何腾讯云二级域名做网站房地产信息网新楼盘
  • 介绍一个电影的网站模板网站建设功能文案
  • 湖北住房和城乡建设厅官方网站网站分成推广怎么做
  • 潍坊市企业型网站建设简单的房源展示网站开发
  • 西部数码网站正在建设中是什么意思建设网站要注意哪些
  • 做网站用框架好吗工信部官网备案查询系统
  • 站长工具seo综合查询权重网站建设市场调查报告
  • 做商品网站数据库有哪些seo及网络推广
  • 做网站工资多少钱企查查 天眼查
  • html个人网站制作互联网做什么比较赚钱
  • 一个网站每年维护费用长沙网络营销顾问
  • 怎么自己开个免费网站王也道长高清头像黑白
  • 提供企业网站建设wordpress 管理员
  • 河南省住房和城乡建设局网站网站建设开票内容是什么
  • 手机网站用二级目录做的弊端怎么做夜场网站
  • 网站制作的评价指标中影视公司需要的许可证
  • 建设工程质量检测机构主管网站wordpress使用百度分享插件下载
  • 深圳营销型网站建设电话外包开发app需要多少钱
  • 网站设计服务商长沙官网网站推广优化
  • erp办公系统软件网站怎样做有利于seo
  • 用django怎么做网站郑州医疗网站建设
  • 晋城做推广的网站排行厂房装修多少钱一个平方米
  • 优化企业网站标题内销常用网站
  • 公司网站制作排名网站是先备案还是先做网站
  • wordpress网站制作教程知名网站制作
  • 汉口江岸区城市建设局网站音乐网站还可以做
  • 网站建设科技有限公司手机网站开源模板
  • 厦门网站建设报设计制作费税率是多少