基于scala使用flink将读取到的数据写入到kafka
这段代码展示了如何使用 Apache Flink 将数据流写入 Kafka,并提供了两种不同的 Kafka Sink 实现方式。以下是对代码的详细解析和说明:
代码结构
包声明:
package sink
定义了代码所在的包。导入依赖:
导入了必要的 Flink 和 Kafka 相关类库,包括:org.apache.flink.api.common.serialization.SimpleStringSchema:用于将数据序列化为字符串。org.apache.flink.connector.kafka.sink:Flink 的 Kafka Sink 相关类。org.apache.flink.streaming.api.scala._:Flink 流处理 API。org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer:旧版的 Kafka Sink 实现。
sinkToKafka对象:
主程序入口,包含 Flink 流处理逻辑和 Kafka Sink 的配置。
package sinkimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 赵嘉盟-HONOR* @data: 2023-11-19 23:46* @DESCRIPTION**/
object sinkToKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))val kafkaSink=KafkaSink.builder().setBootstrapServers("").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build()data.map(_.toString).sinkTo(kafkaSink)env.execute("sinkKafka")}
}
基于scala使用flink将读取到的数据写入到kafka
val kafkaSink=KafkaSink.builder():这行代码创建了一个KafkaSink对象的构建器。.setBootstrapServers(""):这行代码设置了Kafka集群的地址,这里为空字符串,表示没有指定具体的Kafka集群地址。.setRecordSerializer(KafkaRecordSerializationSchema.builder():这行代码创建了一个KafkaRecordSerializationSchema对象的构建器,用于序列化要发送到Kafka的数据。.setTopic(""):这行代码设置了要发送数据的Kafka主题,这里为空字符串,表示没有指定具体的Kafka主题。.setValueSerializationSchema(new SimpleStringSchema()):这行代码设置了要发送数据的序列化方式,这里使用了SimpleStringSchema,表示将数据直接转换为字符串进行发送。.build():这行代码完成了KafkaRecordSerializationSchema对象的构建。.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)是一个设置消息传递保证的代码片段。它指定了消息传递的可靠性,即至少传递一次。在分布式系统中,消息传递是一种常见的通信方式,用于在不同的节点之间传递数据或指令。为了保证消息传递的可靠性,通常会使用一些机制来确保消息至少被传递一次。
在这个代码片段中,
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)是设置消息传递保证的方法调用。其中,DeliveryGuarantee是一个枚举类型,表示不同的消息传递保证级别。AT_LEAST_ONCE是其中一个级别,表示至少传递一次。通过将
DeliveryGuarantee.AT_LEAST_ONCE作为参数传递给.setDeliverGuarantee()方法,可以确保消息至少被传递一次,即使中间出现了故障或其他问题。这样可以提高系统的可靠性和容错能力。
代码解析
(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)- 创建 Flink 流处理环境
StreamExecutionEnvironment。 - 设置并行度为 4。
(2) 定义数据流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)- 使用
fromElements方法生成一个包含 4 个Event对象的流。
(3) 使用旧版 Kafka Sink
data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))- 将
Event对象转换为字符串。 - 使用
FlinkKafkaProducer将数据写入 Kafka。- 参数说明:
- 第一个参数:Kafka 的
bootstrap.servers(未填写)。 - 第二个参数:Kafka 的
topic(未填写)。 - 第三个参数:序列化器
SimpleStringSchema。
- 第一个参数:Kafka 的
- 参数说明:
(4) 使用新版 Kafka Sink
val kafkaSink = KafkaSink.builder().setBootstrapServers("") // Kafka 服务器地址.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("") // Kafka topic.setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器.build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证.build()- 使用
KafkaSink.builder()创建一个 Kafka Sink:setBootstrapServers:设置 Kafka 服务器地址(未填写)。setRecordSerializer:配置记录序列化器:setTopic:设置 Kafka topic(未填写)。setValueSerializationSchema:设置值序列化器为SimpleStringSchema。
setDeliverGuarantee:设置交付保证为AT_LEAST_ONCE(至少一次)。
(5) 将数据写入 Kafka
data.map(_.toString).sinkTo(kafkaSink)- 将
Event对象转换为字符串。 - 使用
sinkTo方法将数据写入 Kafka。
(6) 执行任务
env.execute("sinkKafka")- 启动 Flink 流处理任务,任务名称为
sinkKafka。
代码优化
交付保证
DeliveryGuarantee.AT_LEAST_ONCE是默认的交付保证,如果需要更高的可靠性,可以设置为EXACTLY_ONCE:.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
异常处理
- 在 Sink 中添加异常处理逻辑,避免程序因 Kafka 写入失败而崩溃:
data.map(_.toString).sinkTo(kafkaSink).setParallelism(1).name("KafkaSink")
动态 Topic
- 如果需要根据数据动态选择 Kafka topic,可以实现
KafkaRecordSerializationSchema。
优化后的代码
以下是优化后的完整代码:
package sinkimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._object sinkToKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))val kafkaSink = KafkaSink.builder().setBootstrapServers("localhost:9092") // Kafka 服务器地址.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("test-topic") // Kafka topic.setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器.build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证.build()data.map(_.toString).sinkTo(kafkaSink)env.execute("sinkKafka")}
}