当前位置: 首页 > news >正文

flink1.18下游配置多个sink

方法一:使用 addSink() 多次

DataStream<String> input = ...;// 第一个 Sink
input.addSink(new FileSink<String>(...));// 第二个 Sink  
input.addSink(new KafkaSink<String>(...));// 第三个 Sink
input.addSink(new JdbcSink(...));

方法二:使用 Side Outputs(侧输出流)
如果需要根据条件将数据路由到不同的 Sink:

DataStream<String> input = ...;// 定义输出标签
OutputTag<String> infoTag = new OutputTag<String>("info") {};
OutputTag<String> errorTag = new OutputTag<String>("error") {};
OutputTag<String> debugTag = new OutputTag<String>("debug") {};// 处理主流和侧输出流
SingleOutputStreamOperator<String> processedStream = input.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) {if (value.contains("INFO")) {ctx.output(infoTag, value);} else if (value.contains("ERROR")) {ctx.output(errorTag, value);} else if (value.contains("DEBUG")) {ctx.output(debugTag, value);} else {out.collect(value); // 主流}}});// 主流输出
processedStream.addSink(new FileSink<String>(...));// 侧输出流
processedStream.getSideOutput(infoTag).addSink(new KafkaSink<String>(...));
processedStream.getSideOutput(errorTag).addSink(new JdbcSink(...));
processedStream.getSideOutput(debugTag).addSink(new ElasticsearchSink(...));

方法三:使用 RichSinkFunction 封装多个 Sink
自定义一个 Sink 函数来管理多个输出:

public class MultiSinkFunction extends RichSinkFunction<String> {private transient FileSink<String> fileSink;private transient KafkaSink<String> kafkaSink;private transient JdbcSink jdbcSink;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化各个 SinkfileSink = FileSink.forRowFormat(new Path("output/file"),new SimpleStringEncoder<String>("UTF-8")).build();kafkaSink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("topic-name").setValueSerializationSchema(new SimpleStringSchema()).build()).build();jdbcSink = JdbcSink.sink("INSERT INTO table_name (value) VALUES (?)",(statement, value) -> statement.setString(1, value),JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/db").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("username").withPassword("password").build());}@Overridepublic void invoke(String value, Context context) throws Exception {// 写入文件fileSink.invoke(value, context);// 写入 KafkakafkaSink.invoke(value, context);// 写入数据库jdbcSink.invoke(value, context);}@Overridepublic void close() throws Exception {if (fileSink != null) fileSink.close();if (kafkaSink != null) kafkaSink.close();if (jdbcSink != null) jdbcSink.close();}
}// 使用自定义 MultiSink
input.addSink(new MultiSinkFunction());

方法四:使用 Broadcast Stream 进行动态路由

DataStream<String> mainStream = ...;
DataStream<RoutingRule> ruleStream = ...;// 广播规则流
MapStateDescriptor<String, RoutingRule> ruleDescriptor = new MapStateDescriptor<>("Rules", String.class, RoutingRule.class);
BroadcastStream<RoutingRule> broadcastRules = ruleStream.broadcast(ruleDescriptor);// 连接主流和广播流
BroadcastConnectedStream<String, RoutingRule> connectedStream = mainStream.connect(broadcastRules);// 处理连接后的流
DataStream<Tuple2<String, String>> routedStream = connectedStream.process(new RoutingProcessFunction());// 根据路由结果输出到不同 Sink
routedStream.filter(value -> "kafka".equals(value.f1)).map(value -> value.f0).addSink(new KafkaSink<String>(...));routedStream.filter(value -> "file".equals(value.f1)).map(value -> value.f0).addSink(new FileSink<String>(...));

方法五:使用 Flink CDC 的多表同步

public class MultiSinkJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> input = env.socketTextStream("localhost", 9999);// 1. 输出到文件FileSink<String> fileSink = FileSink.forRowFormat(new Path("/output/file"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(15)).withInactivityInterval(Duration.ofMinutes(5)).withMaxPartSize(MemorySize.ofMebiBytes(1024)).build()).build();// 2. 输出到 KafkaKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("flink-output").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 3. 输出到 JDBCJdbcSink<String> jdbcSink = JdbcSink.sink("INSERT INTO user_actions (action_time, action_data) VALUES (?, ?)",(statement, value) -> {statement.setTimestamp(1, new Timestamp(System.currentTimeMillis()));statement.setString(2, value);},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/flink_db").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("flinkuser").withPassword("password").build());// 添加多个 Sinkinput.addSink(fileSink).name("FileSink");input.addSink(kafkaSink).name("KafkaSink");input.addSink(jdbcSink).name("JdbcSink");env.execute("Multi-Sink Example Job");}
}
http://www.dtcms.com/a/394879.html

相关文章:

  • 如何删除 MySQL 数据库中的所有数据表 ?
  • win10加域后,控制面板中的,internet 时间就没有了
  • Unity移动平台笔记
  • 【图像算法 - 27】基于YOLOv12与OpenCV的无人机智能检测系统
  • html css js网页制作成品——圣罗兰护肤html+css+js 4页附源码
  • 21届-3年-Java面经-华为od
  • 计算机视觉(opencv)实战三十一——CascadeClassifier 详解与实战人脸检测
  • 计算机视觉:基于YOLOv11 实例分割与OpenCV 在 Java 中的实现图像实例分割
  • 【实战】Spring Boot 3.x整合Redis:注解式缓存与分布式锁最佳实践
  • 密钥耳语-一个轻量、易备份、支持命令行与图形界面的口令派生加密工具 具有 CLI 和 GUI 的轻量级密码衍生加密工具
  • AI重塑流量背后,微软广告打造下一代广告生态
  • 低代码数字化时代的开发新范式
  • 微信小程序“无损去水印精灵”技术解析大纲
  • 少儿舞蹈小程序(18)订单确认
  • Uniapp X 打包抖音小程序教程
  • uni-app中实现在input的placeholder中添加图标
  • vue面试题集锦
  • 基于Springboot+UniApp+Ai实现模拟面试小工具十二:模拟面试功能实现
  • 基于Springboot+UniApp+Ai实现模拟面试小工具十一:主页功能及简历上传功能实现
  • BGP选路“十一步”法则
  • MITRE ATLAS 对抗威胁矩阵与 LLM 安全
  • 第5章:技术深度与广度:构筑你的核心壁垒(1)
  • 洞察未来:Temporal.io 如何赋能复杂模拟引擎的韧性与智能
  • Android 实例 - Android 圆形蒙版(Android 圆形蒙版实现、圆形蒙版解读)
  • PyCharm 在 Linux 上的安装指南
  • Linux 入门:开启开源世界的大门
  • ​​[硬件电路-321]:数字电路的两大家族CMOS(来源于MOS三极管管)与TTL(来源于BJT三极管)
  • 【GitHub每日速递 250922】开源 AI 搜索引擎 Perplexica:本地大模型 + 多模式搜索,免费又强大!
  • CCF-CSP-S 2021 初赛解析
  • 现在如何使用docker下载