
Flink 1.18: configuring multiple upstream sources and downstream sinks

Method 1: Union multiple sources, then fan out to multiple sinks

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.sql.Timestamp;
import java.time.Duration;

public class MultiSourceMultiSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Define multiple sources
        // Kafka source 1
        KafkaSource<String> kafkaSource1 = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("topic1")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Kafka source 2
        KafkaSource<String> kafkaSource2 = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("topic2")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // File source (the FileSource connector is the newer alternative to readTextFile)
        DataStream<String> fileStream = env.readTextFile("hdfs:///input/data.txt");

        // 2. Create data streams from the sources
        DataStream<String> kafkaStream1 =
                env.fromSource(kafkaSource1, WatermarkStrategy.noWatermarks(), "Kafka-Source-1");
        DataStream<String> kafkaStream2 =
                env.fromSource(kafkaSource2, WatermarkStrategy.noWatermarks(), "Kafka-Source-2");

        // 3. Union the streams (union() returns a plain DataStream, which cannot be given an operator name)
        DataStream<String> mergedStream = kafkaStream1.union(kafkaStream2, fileStream);

        // 4. Processing (optional)
        DataStream<String> processedStream = mergedStream
                .map(value -> value.toUpperCase())
                .name("Processing");

        // 5. Define multiple sinks
        // Sink 1: Kafka
        KafkaSink<String> kafkaSink1 = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic-1")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink 2: file sink
        FileSink<String> fileSink = FileSink
                .forRowFormat(new Path("hdfs:///output/files"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(Duration.ofMinutes(15))
                        .build())
                .build();

        // Sink 3: JDBC. JdbcSink.sink(...) returns a SinkFunction, so it is attached with addSink(), not sinkTo().
        SinkFunction<String> jdbcSink = JdbcSink.sink(
                "INSERT INTO results (data, processed_time) VALUES (?, ?)",
                (statement, value) -> {
                    statement.setString(1, value);
                    statement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
                },
                JdbcExecutionOptions.builder().withBatchSize(1000).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/flink_db")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("user")
                        .withPassword("password")
                        .build());

        // Sink 4: another Kafka topic
        KafkaSink<String> kafkaSink2 = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic-2")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        // 6. Fan the processed stream out to all sinks
        processedStream.sinkTo(kafkaSink1).name("Kafka-Sink-1");
        processedStream.sinkTo(fileSink).name("File-Sink");
        processedStream.addSink(jdbcSink).name("JDBC-Sink");
        processedStream.sinkTo(kafkaSink2).name("Kafka-Sink-2");

        env.execute("Multi-Source Multi-Sink Job");
    }
}
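One caveat with this pattern: union() only works when every input stream has the same element type. If the sources produce different types, they can be combined with connect() and a CoMapFunction that maps both inputs onto a common type first. Below is a minimal sketch of that alternative; the two fromElements streams are stand-ins for real sources, and the prefixes are illustrative.

// Sketch: merging two differently-typed streams with connect() + CoMapFunction,
// because union() requires identical element types.
// Assumes the same 'env' as above; CoMapFunction comes from
// org.apache.flink.streaming.api.functions.co.CoMapFunction.
DataStream<Integer> metrics = env.fromElements(1, 2, 3);                 // stand-in for a typed source
DataStream<String> logs = env.fromElements("INFO start", "ERROR boom");  // stand-in for another source

DataStream<String> unified = metrics
        .connect(logs)
        .map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) {   // applied to elements of the first stream
                return "metric:" + value;
            }

            @Override
            public String map2(String value) {    // applied to elements of the second stream
                return "log:" + value;
            }
        });
// 'unified' can then be processed and fanned out to the sinks exactly as in the example above.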

Method 2: Route different sources to different sinks

// Imports omitted for brevity (same connector classes as in Method 1,
// plus OutputTag, ProcessFunction, SingleOutputStreamOperator and Collector).
public class RoutedMultiSourceSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Output tags used to route records via side outputs
        OutputTag<String> kafkaOutputTag = new OutputTag<String>("kafka-output") {};
        OutputTag<String> jdbcOutputTag = new OutputTag<String>("jdbc-output") {};
        OutputTag<String> fileOutputTag = new OutputTag<String>("file-output") {};

        // 1. Multiple sources
        KafkaSource<String> source1 = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("sensor-data")
                .setGroupId("flink-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSource<String> source2 = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("log-data")
                .setGroupId("flink-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream1 =
                env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Sensor-Source");
        DataStream<String> stream2 =
                env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Log-Source");

        // 2. Route each stream according to its origin
        SingleOutputStreamOperator<String> processedStream1 = stream1
                .process(new RoutingProcessFunction("sensor", kafkaOutputTag, jdbcOutputTag));
        SingleOutputStreamOperator<String> processedStream2 = stream2
                .process(new RoutingProcessFunction("log", fileOutputTag, kafkaOutputTag));

        // 3. Define the sinks
        KafkaSink<String> kafkaSink = createKafkaSink("output-topic");
        FileSink<String> fileSink = createFileSink();
        SinkFunction<String> jdbcSink = createJdbcSink();

        // 4. Wire each side output to its sink
        // Sensor data
        processedStream1.getSideOutput(kafkaOutputTag)
                .sinkTo(kafkaSink).name("Sensor-Kafka-Sink");
        processedStream1.getSideOutput(jdbcOutputTag)
                .addSink(jdbcSink).name("Sensor-JDBC-Sink");

        // Log data
        processedStream2.getSideOutput(fileOutputTag)
                .sinkTo(fileSink).name("Log-File-Sink");
        processedStream2.getSideOutput(kafkaOutputTag)
                .sinkTo(kafkaSink).name("Log-Kafka-Sink");

        env.execute("Routed Multi-Source Multi-Sink Job");
    }

    // Routing function: emits every record to one of two side outputs depending on its content
    public static class RoutingProcessFunction extends ProcessFunction<String, String> {
        private final String sourceType;
        private final OutputTag<String> primaryOutput;
        private final OutputTag<String> secondaryOutput;

        public RoutingProcessFunction(String sourceType,
                                      OutputTag<String> primaryOutput,
                                      OutputTag<String> secondaryOutput) {
            this.sourceType = sourceType;
            this.primaryOutput = primaryOutput;
            this.secondaryOutput = secondaryOutput;
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if ("sensor".equals(sourceType)) {
                if (value.contains("high")) {
                    ctx.output(primaryOutput, value);   // to Kafka
                } else {
                    ctx.output(secondaryOutput, value); // to JDBC
                }
            } else if ("log".equals(sourceType)) {
                if (value.contains("ERROR")) {
                    ctx.output(primaryOutput, value);   // to file
                } else {
                    ctx.output(secondaryOutput, value); // to Kafka
                }
            }
            out.collect(value); // main output (optional)
        }
    }

    private static KafkaSink<String> createKafkaSink(String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
    }

    private static FileSink<String> createFileSink() {
        return FileSink.forRowFormat(
                        new Path("hdfs:///output/logs"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();
    }

    // JdbcSink.sink(...) returns a SinkFunction, which is attached with addSink()
    private static SinkFunction<String> createJdbcSink() {
        return JdbcSink.sink(
                "INSERT INTO sensor_data (value) VALUES (?)",
                (statement, value) -> statement.setString(1, value),
                JdbcExecutionOptions.builder().withBatchSize(1000).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/sensor_db")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("user")
                        .withPassword("password")
                        .build());
    }
}
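If every routing target happens to be a Kafka topic, the side outputs can be avoided entirely: a single KafkaSink can pick the target topic per record through the record serializer builder's setTopicSelector. A sketch of that variant follows; the topic names "alerts" and "events" and the ERROR-based condition are made up for illustration.

// Sketch: one KafkaSink that routes each record to a topic chosen at runtime.
KafkaSink<String> routingKafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                // topic is decided per record instead of being fixed with setTopic(...)
                .setTopicSelector(value -> value.contains("ERROR") ? "alerts" : "events")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();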

Method 3: Dynamic routing with broadcast state

// Imports omitted for brevity; JsonDeserializationSchema comes from the flink-json module,
// and the broadcast-state classes from org.apache.flink.streaming.api.datastream / functions.co.
public class DynamicRoutingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Main data stream (multiple sources)
        KafkaSource<String> mainSource1 = createKafkaSource("topic1");
        KafkaSource<String> mainSource2 = createKafkaSource("topic2");

        DataStream<String> mainStream =
                env.fromSource(mainSource1, WatermarkStrategy.noWatermarks(), "Main-Source-1")
                   .union(env.fromSource(mainSource2, WatermarkStrategy.noWatermarks(), "Main-Source-2"));

        // 2. Rule stream (yet another source)
        KafkaSource<RoutingRule> ruleSource = KafkaSource.<RoutingRule>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("routing-rules")
                .setGroupId("rule-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(RoutingRule.class))
                .build();

        DataStream<RoutingRule> ruleStream =
                env.fromSource(ruleSource, WatermarkStrategy.noWatermarks(), "Rule-Source");

        // 3. Broadcast the rules
        MapStateDescriptor<Void, RoutingRule> ruleDescriptor =
                new MapStateDescriptor<>("Rules", Void.class, RoutingRule.class);
        BroadcastStream<RoutingRule> broadcastRules = ruleStream.broadcast(ruleDescriptor);

        // 4. Connect the main stream with the broadcast rules
        BroadcastConnectedStream<String, RoutingRule> connectedStream =
                mainStream.connect(broadcastRules);

        // 5. Process the connected stream (the descriptor must be passed to the function)
        DataStream<RoutedMessage> routedStream = connectedStream
                .process(new DynamicRouterFunction(ruleDescriptor))
                .name("Dynamic-Router");

        // 6. Multiple sinks (factory methods analogous to those in Method 2, omitted here)
        KafkaSink<RoutedMessage> kafkaSink = createKafkaSinkForRouted();
        FileSink<RoutedMessage> fileSink = createFileSinkForRouted();
        SinkFunction<RoutedMessage> jdbcSink = createJdbcSinkForRouted();

        // 7. Dispatch by routing target
        routedStream
                .filter(message -> "kafka".equals(message.getTarget()))
                .sinkTo(kafkaSink)
                .name("Kafka-Target-Sink");

        routedStream
                .filter(message -> "file".equals(message.getTarget()))
                .sinkTo(fileSink)
                .name("File-Target-Sink");

        routedStream
                .filter(message -> "jdbc".equals(message.getTarget()))
                .addSink(jdbcSink)
                .name("JDBC-Target-Sink");

        env.execute("Dynamic Routing Multi-Source Multi-Sink Job");
    }

    // Routing rule POJO
    public static class RoutingRule {
        private String pattern;
        private String target;
        // getters and setters
    }

    // Routed message POJO
    public static class RoutedMessage {
        private String originalMessage;
        private String target;
        // getters and setters
    }

    // Dynamic routing function: every main-stream element is matched against the broadcast rule
    public static class DynamicRouterFunction
            extends BroadcastProcessFunction<String, RoutingRule, RoutedMessage> {

        private final MapStateDescriptor<Void, RoutingRule> ruleDescriptor;

        public DynamicRouterFunction(MapStateDescriptor<Void, RoutingRule> ruleDescriptor) {
            this.ruleDescriptor = ruleDescriptor;
        }

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<RoutedMessage> out) throws Exception {
            ReadOnlyBroadcastState<Void, RoutingRule> rules = ctx.getBroadcastState(ruleDescriptor);
            RoutingRule rule = rules.get(null);

            RoutedMessage message = new RoutedMessage();
            message.setOriginalMessage(value);
            if (rule != null && value.contains(rule.getPattern())) {
                message.setTarget(rule.getTarget());
            } else {
                message.setTarget("default"); // default route
            }
            out.collect(message);
        }

        @Override
        public void processBroadcastElement(RoutingRule rule, Context ctx, Collector<RoutedMessage> out) throws Exception {
            BroadcastState<Void, RoutingRule> rules = ctx.getBroadcastState(ruleDescriptor);
            rules.put(null, rule); // replace the current rule
        }
    }
}
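The RoutingRule class above elides its accessors with a placeholder comment. For the JSON deserializer and the router to work, it needs a public no-argument constructor plus getters and setters along the following lines; this sketch only fills in that elided boilerplate, and the sample rule JSON in the comment is illustrative.

// Sketch of the RoutingRule POJO with the elided accessors filled in.
// A rule message on the "routing-rules" topic would then look like:
//   {"pattern": "ERROR", "target": "kafka"}
public static class RoutingRule {
    private String pattern; // substring to look for in the incoming record
    private String target;  // e.g. "kafka", "file" or "jdbc"

    public RoutingRule() {} // required by the JSON deserializer

    public String getPattern() { return pattern; }
    public void setPattern(String pattern) { this.pattern = pattern; }
    public String getTarget() { return target; }
    public void setTarget(String target) { this.target = target; }
}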

Method 4: Multiple sources and sinks with the Table API

public class TableApiMultiSourceSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Register multiple source tables.
        //    `value` and `timestamp` are reserved words in Flink SQL, so they must be quoted with backticks.
        tableEnv.executeSql(
                "CREATE TABLE source1 (" +
                "  id STRING," +
                "  `value` INT," +
                "  ts TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'topic1'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        tableEnv.executeSql(
                "CREATE TABLE source2 (" +
                "  log_level STRING," +
                "  message STRING," +
                "  `timestamp` TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'topic2'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // 2. Register multiple sink tables
        tableEnv.executeSql(
                "CREATE TABLE sink1 (" +
                "  id STRING," +
                "  processed_value INT," +
                "  process_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'output-topic1'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");

        tableEnv.executeSql(
                "CREATE TABLE sink2 (" +
                "  level STRING," +
                "  msg STRING," +
                "  storage_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/flink_db'," +
                "  'table-name' = 'log_data'," +
                "  'username' = 'user'," +
                "  'password' = 'password'" +
                ")");

        tableEnv.executeSql(
                "CREATE TABLE sink3 (" +
                "  data STRING," +
                "  file_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs:///output/files'," +
                "  'format' = 'json'" +
                ")");

        // 3. Group the INSERT statements into one StatementSet so all three pipelines run as a single job.
        //    Calling executeSql() for each INSERT would submit three independent jobs instead.
        StatementSet statementSet = tableEnv.createStatementSet();

        statementSet.addInsertSql(
                "INSERT INTO sink1 " +
                "SELECT id, `value` * 2, CURRENT_TIMESTAMP " +
                "FROM source1 " +
                "WHERE `value` > 100");

        statementSet.addInsertSql(
                "INSERT INTO sink2 " +
                "SELECT log_level, message, CURRENT_TIMESTAMP " +
                "FROM source2 " +
                "WHERE log_level = 'ERROR'");

        statementSet.addInsertSql(
                "INSERT INTO sink3 " +
                "SELECT CONCAT(id, ':', CAST(`value` AS STRING)), CURRENT_TIMESTAMP " +
                "FROM source1 " +
                "WHERE `value` <= 100");

        statementSet.execute();
    }
}
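The Table API pipeline can also be bridged to the DataStream API when part of the logic is easier to express imperatively, or when a sink only exists as a DataStream connector. A rough sketch follows; it assumes the tables registered above and the same tableEnv, accesses the row positionally (field 0 is `id` in source1's schema), and note that executeInsert() submits its own job, separate from the StatementSet.

// Sketch: bridging between the Table API and the DataStream API in the same program.
Table source1Table = tableEnv.from("source1");

// Convert the table to a DataStream of Row for imperative processing
DataStream<Row> rows = tableEnv.toDataStream(source1Table);

DataStream<Row> filtered = rows
        .filter(row -> row.getField(0) != null); // example of custom DataStream logic on field 0 (`id`)

// Turn the processed stream back into a table and write it to one of the registered sink tables;
// the columns must line up positionally with sink1's schema.
Table processed = tableEnv.fromDataStream(filtered);
processed.executeInsert("sink1");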