当前位置: 首页 > news >正文

基于scala使用flink将读取到的数据写入到kafka

这段代码展示了如何使用 Apache Flink 将数据流写入 Kafka,并提供了两种不同的 Kafka Sink 实现方式。以下是对代码的详细解析和说明:

代码结构

  • 包声明package sink
    定义了代码所在的包。

  • 导入依赖
    导入了必要的 Flink 和 Kafka 相关类库,包括:

    • org.apache.flink.api.common.serialization.SimpleStringSchema:用于将数据序列化为字符串。
    • org.apache.flink.connector.kafka.sink:Flink 的 Kafka Sink 相关类。
    • org.apache.flink.streaming.api.scala._:Flink 流处理 API。
    • org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer:旧版的 Kafka Sink 实现。
  • sinkToKafka 对象
    主程序入口,包含 Flink 流处理逻辑和 Kafka Sink 的配置。

package sinkimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 赵嘉盟-HONOR* @data: 2023-11-19 23:46* @DESCRIPTION**/
object sinkToKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))val kafkaSink=KafkaSink.builder().setBootstrapServers("").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build()data.map(_.toString).sinkTo(kafkaSink)env.execute("sinkKafka")}
}

基于scala使用flink将读取到的数据写入到kafka

  1. val kafkaSink=KafkaSink.builder():这行代码创建了一个KafkaSink对象的构建器。

  2. .setBootstrapServers(""):这行代码设置了Kafka集群的地址,这里为空字符串,表示没有指定具体的Kafka集群地址。

  3. .setRecordSerializer(KafkaRecordSerializationSchema.builder():这行代码创建了一个KafkaRecordSerializationSchema对象的构建器,用于序列化要发送到Kafka的数据。

  4. .setTopic(""):这行代码设置了要发送数据的Kafka主题,这里为空字符串,表示没有指定具体的Kafka主题。

  5. .setValueSerializationSchema(new SimpleStringSchema()):这行代码设置了要发送数据的序列化方式,这里使用了SimpleStringSchema,表示将数据直接转换为字符串进行发送。

  6. .build():这行代码完成了KafkaRecordSerializationSchema对象的构建。

  7. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) 是一个设置消息传递保证的代码片段。它指定了消息传递的可靠性,即至少传递一次。

    在分布式系统中,消息传递是一种常见的通信方式,用于在不同的节点之间传递数据或指令。为了保证消息传递的可靠性,通常会使用一些机制来确保消息至少被传递一次。

    在这个代码片段中,.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) 是设置消息传递保证的方法调用。其中,DeliveryGuarantee 是一个枚举类型,表示不同的消息传递保证级别。AT_LEAST_ONCE 是其中一个级别,表示至少传递一次。

    通过将 DeliveryGuarantee.AT_LEAST_ONCE 作为参数传递给 .setDeliverGuarantee() 方法,可以确保消息至少被传递一次,即使中间出现了故障或其他问题。这样可以提高系统的可靠性和容错能力。

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)
  • 创建 Flink 流处理环境 StreamExecutionEnvironment
  • 设置并行度为 4。
(2) 定义数据流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)
  • 使用 fromElements 方法生成一个包含 4 个 Event 对象的流。
(3) 使用旧版 Kafka Sink
data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))
  • 将 Event 对象转换为字符串。
  • 使用 FlinkKafkaProducer 将数据写入 Kafka。
    • 参数说明:
      • 第一个参数:Kafka 的 bootstrap.servers(未填写)。
      • 第二个参数:Kafka 的 topic(未填写)。
      • 第三个参数:序列化器 SimpleStringSchema
(4) 使用新版 Kafka Sink
val kafkaSink = KafkaSink.builder().setBootstrapServers("") // Kafka 服务器地址.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("") // Kafka topic.setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器.build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证.build()
  • 使用 KafkaSink.builder() 创建一个 Kafka Sink:
    • setBootstrapServers:设置 Kafka 服务器地址(未填写)。
    • setRecordSerializer:配置记录序列化器:
      • setTopic:设置 Kafka topic(未填写)。
      • setValueSerializationSchema:设置值序列化器为 SimpleStringSchema
    • setDeliverGuarantee:设置交付保证为 AT_LEAST_ONCE(至少一次)。
(5) 将数据写入 Kafka
data.map(_.toString).sinkTo(kafkaSink)
  • 将 Event 对象转换为字符串。
  • 使用 sinkTo 方法将数据写入 Kafka。
(6) 执行任务
env.execute("sinkKafka")
  • 启动 Flink 流处理任务,任务名称为 sinkKafka

代码优化

交付保证
  • DeliveryGuarantee.AT_LEAST_ONCE 是默认的交付保证,如果需要更高的可靠性,可以设置为 EXACTLY_ONCE
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
异常处理
  • 在 Sink 中添加异常处理逻辑,避免程序因 Kafka 写入失败而崩溃:
    data.map(_.toString).sinkTo(kafkaSink).setParallelism(1).name("KafkaSink")
动态 Topic
  • 如果需要根据数据动态选择 Kafka topic,可以实现 KafkaRecordSerializationSchema

优化后的代码

以下是优化后的完整代码:

package sinkimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._object sinkToKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))val kafkaSink = KafkaSink.builder().setBootstrapServers("localhost:9092") // Kafka 服务器地址.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("test-topic") // Kafka topic.setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器.build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证.build()data.map(_.toString).sinkTo(kafkaSink)env.execute("sinkKafka")}
}
http://www.dtcms.com/a/592523.html

相关文章:

  • 跨平台OPC UA开发:.NET、Java与C++ SDK的深度对比
  • 硬盘第一关:MBR VS GPT
  • 从原理到演进:vLLM PD分离KV cache传递机制全解析
  • 如何在浏览器侧边栏中使用GPT/Gemini/Claude进行网页对话?
  • 【gpt-oss-20b】一次 20B 大模型的私有化部署评测
  • zynq的PS端ENET网口引出到EMIO的PL引脚
  • 商城网站设计策划wordpress 去除归档链接
  • 李宏毅机器学习笔记44
  • 小杰-大模型(three)——RAG与Agent设计——Langchain-OutputParser输出解析器
  • LSTM核心参数与输入输出解读
  • 【机器学习算法】面试中的ROC和AUC
  • OSPF中的cost值
  • 《场景化落地:用 Linux 共享内存解决进程间高效数据传输问题(终篇)》
  • 襄阳建设网站首页向网站服务器上传网页文件下载
  • 视频去动态水印软件HitPaw安装和使用教程
  • O2OA(翱途)开发平台 v9.5 前端框架设计|开放 · 安全 · 可控 · 信创优选
  • CMakeList 中 PUBLIC 和 PRIVATE的区别
  • langchain 环境搭建
  • 捷讯官网 网站建设中小型企业网站大全
  • 《算法闯关指南:优选算法--位运算》--36.两个整数之和,37.只出现一次的数字 ||
  • 素材网站开发做流量网站挂广告还能挣钱吗
  • 学习OPC UA,连接OPC UA服务器
  • 从零开始:构建你的第一个MCP服务器
  • 数据结构之二叉树-堆
  • BridgeVLA 对比 pi 0.5 有提升吗
  • 深度学习 :python水下海洋生物识别检测系统 Yolo模型 PyTorch框架 计算机 ✅
  • COM_QueryInterface
  • DeepSeek-OCR全面解析:技术原理、性能优势与实战指南
  • WebKit Insie: WebKit 调试(二)
  • 网站建设需求材料推广网店的途径和方法