Configuring multiple downstream sinks in Flink 1.18
Method 1: Attach several sinks to the same stream
A DataStream can be consumed by any number of sinks; each attachment fans the stream out to another sink operator. Note that in Flink 1.18 the unified sinks (FileSink, KafkaSink) are attached with sinkTo(), while SinkFunction-based sinks, such as the one returned by JdbcSink.sink(), still use addSink().
DataStream<String> input = ...;

// First sink: FileSink implements the unified Sink interface, so use sinkTo()
input.sinkTo(FileSink.forRowFormat(...).build());

// Second sink: KafkaSink is likewise a unified Sink
input.sinkTo(KafkaSink.<String>builder() /* ... */ .build());

// Third sink: JdbcSink.sink(...) returns a SinkFunction, attached with addSink()
input.addSink(JdbcSink.sink(...));
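Each attached sink becomes its own operator in the job graph, so it helps to give every one a stable name and uid, making it identifiable in the web UI and addressable in savepoints. A minimal sketch, reusing the builders above (path and identifiers are illustrative):

// Illustrative name/uid: sinkTo() returns a DataStreamSink that accepts both
input.sinkTo(FileSink
                .forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
                .build())
        .name("file-sink")
        .uid("file-sink");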
Method 2: Use side outputs
If records should be routed to different sinks based on a condition:
DataStream<String> input = ...;

// Define the output tags (the anonymous {} subclass preserves the generic type)
OutputTag<String> infoTag = new OutputTag<String>("info") {};
OutputTag<String> errorTag = new OutputTag<String>("error") {};
OutputTag<String> debugTag = new OutputTag<String>("debug") {};

// Split the stream into a main output and three side outputs
SingleOutputStreamOperator<String> processedStream = input.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.contains("INFO")) {
                    ctx.output(infoTag, value);
                } else if (value.contains("ERROR")) {
                    ctx.output(errorTag, value);
                } else if (value.contains("DEBUG")) {
                    ctx.output(debugTag, value);
                } else {
                    out.collect(value); // main output
                }
            }
        });
// Main output
processedStream.sinkTo(FileSink.forRowFormat(...).build());

// Side outputs
processedStream.getSideOutput(infoTag).sinkTo(KafkaSink.<String>builder() /* ... */ .build());
processedStream.getSideOutput(errorTag).addSink(JdbcSink.sink(...));
// The unified Elasticsearch connector is built via Elasticsearch7SinkBuilder and attached with sinkTo()
processedStream.getSideOutput(debugTag).sinkTo(new Elasticsearch7SinkBuilder<String>() /* ... */ .build());
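For a simple split like this, plain filter() chains are an alternative worth knowing: side outputs touch each record exactly once in a single process() pass, while filter() evaluates every record once per branch and allows a record to match several branches or none. A minimal sketch using the same predicates:

// Same routing expressed with filter(); each branch re-reads the full stream
DataStream<String> infoStream  = input.filter(v -> v.contains("INFO"));
DataStream<String> errorStream = input.filter(v -> v.contains("ERROR"));
// ...each filtered stream then gets its own sink, exactly as in Method 1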
Method 3: Wrap several outputs in one RichSinkFunction
Write a custom sink function that manages all the outputs itself. Note that FileSink and KafkaSink implement the unified Sink interface rather than SinkFunction, so they cannot be invoked from inside another sink; a function like this has to manage plain clients (a file writer, a Kafka producer, a JDBC connection) directly:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;

public class MultiSinkFunction extends RichSinkFunction<String> {

    // Plain clients instead of Flink connector sinks: FileSink/KafkaSink cannot
    // be called from inside a SinkFunction. Fields are protected so subclasses
    // (e.g. a checkpoint-aware variant) can flush them.
    protected transient BufferedWriter fileWriter;
    protected transient KafkaProducer<String, String> kafkaProducer;
    protected transient Connection jdbcConnection;
    protected transient PreparedStatement insertStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // File output
        fileWriter = new BufferedWriter(new FileWriter("output/file", true));

        // Kafka output
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducer = new KafkaProducer<>(props);

        // JDBC output
        jdbcConnection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/db", "username", "password");
        insertStatement = jdbcConnection.prepareStatement(
                "INSERT INTO table_name (value) VALUES (?)");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Write to the file
        fileWriter.write(value);
        fileWriter.newLine();

        // Write to Kafka
        kafkaProducer.send(new ProducerRecord<>("topic-name", value));

        // Write to the database
        insertStatement.setString(1, value);
        insertStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (fileWriter != null) fileWriter.close();
        if (kafkaProducer != null) kafkaProducer.close();
        if (insertStatement != null) insertStatement.close();
        if (jdbcConnection != null) jdbcConnection.close();
    }
}

// Attach the custom multi-sink
input.addSink(new MultiSinkFunction());
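One caveat: unlike the official connectors, nothing in MultiSinkFunction participates in checkpointing, so records buffered in the file writer or the Kafka producer can be lost on failure. A minimal sketch, assuming the protected fields declared above, that flushes pending output whenever a checkpoint is taken:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Flush-on-checkpoint variant of the multi-sink above
public class CheckpointAwareMultiSink extends MultiSinkFunction implements CheckpointedFunction {

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        fileWriter.flush();    // force buffered lines to disk
        kafkaProducer.flush(); // block until in-flight Kafka sends complete
        // JDBC writes are synchronous per record in invoke(), so nothing to flush
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No Flink-managed state to restore
    }
}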
Method 4: Dynamic routing with a broadcast stream
DataStream<String> mainStream = ...;
DataStream<RoutingRule> ruleStream = ...;

// Broadcast the rule stream
MapStateDescriptor<String, RoutingRule> ruleDescriptor =
        new MapStateDescriptor<>("Rules", String.class, RoutingRule.class);
BroadcastStream<RoutingRule> broadcastRules = ruleStream.broadcast(ruleDescriptor);

// Connect the main stream with the broadcast stream
BroadcastConnectedStream<String, RoutingRule> connectedStream =
        mainStream.connect(broadcastRules);

// Process the connected stream: each record is emitted as (payload, target sink)
DataStream<Tuple2<String, String>> routedStream =
        connectedStream.process(new RoutingProcessFunction());

// Route to the appropriate sink based on the second tuple field
routedStream
        .filter(value -> "kafka".equals(value.f1))
        .map(value -> value.f0)
        .sinkTo(KafkaSink.<String>builder() /* ... */ .build());
routedStream
        .filter(value -> "file".equals(value.f1))
        .map(value -> value.f0)
        .sinkTo(FileSink.forRowFormat(...).build());
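The snippet above references RoutingRule and RoutingProcessFunction without defining them. A minimal sketch of what they could look like, assuming a rule matches on a keyword and names a target sink (both field names are illustrative):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Map;

// Hypothetical rule type: maps a keyword found in the record to a target sink name
public class RoutingRule {
    public String keyword;
    public String targetSink; // e.g. "kafka" or "file"
}

public class RoutingProcessFunction
        extends BroadcastProcessFunction<String, RoutingRule, Tuple2<String, String>> {

    // Must match the descriptor used when broadcasting the rule stream
    private static final MapStateDescriptor<String, RoutingRule> RULES =
            new MapStateDescriptor<>("Rules", String.class, RoutingRule.class);

    @Override
    public void processElement(String value, ReadOnlyContext ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        // Check the record against every broadcast rule; default to "file"
        for (Map.Entry<String, RoutingRule> e :
                ctx.getBroadcastState(RULES).immutableEntries()) {
            if (value.contains(e.getValue().keyword)) {
                out.collect(Tuple2.of(value, e.getValue().targetSink));
                return;
            }
        }
        out.collect(Tuple2.of(value, "file"));
    }

    @Override
    public void processBroadcastElement(RoutingRule rule, Context ctx,
                                        Collector<Tuple2<String, String>> out) throws Exception {
        // Store or refresh the rule, keyed by its keyword
        ctx.getBroadcastState(RULES).put(rule.keyword, rule);
    }
}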
Complete example: one job writing to file, Kafka, and JDBC
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.sql.Timestamp;
import java.time.Duration;

public class MultiSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.socketTextStream("localhost", 9999);

        // 1. File sink
        FileSink<String> fileSink = FileSink
                .forRowFormat(new Path("/output/file"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(Duration.ofMinutes(15))
                        .withInactivityInterval(Duration.ofMinutes(5))
                        .withMaxPartSize(MemorySize.ofMebiBytes(1024))
                        .build())
                .build();

        // 2. Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // 3. JDBC sink: JdbcSink.sink(...) returns a SinkFunction<String>
        SinkFunction<String> jdbcSink = JdbcSink.sink(
                "INSERT INTO user_actions (action_time, action_data) VALUES (?, ?)",
                (statement, value) -> {
                    statement.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
                    statement.setString(2, value);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/flink_db")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("flinkuser")
                        .withPassword("password")
                        .build());

        // Attach all three sinks to the same stream
        input.sinkTo(fileSink).name("FileSink");
        input.sinkTo(kafkaSink).name("KafkaSink");
        input.addSink(jdbcSink).name("JdbcSink");

        env.execute("Multi-Sink Example Job");
    }
}
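One detail the job above leaves out: the KafkaSink's AT_LEAST_ONCE guarantee and the JdbcSink's flush-on-checkpoint behavior only take effect when checkpointing is enabled. A one-line sketch (the 60-second interval is an assumption, not a recommendation):

// Enable periodic checkpoints so the sinks' delivery guarantees apply
env.enableCheckpointing(60_000);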