flink api-datastream api-source算子
Flink源算子API是构建数据处理程序的输入端组件,主要分为预定义源和自定义源两类实现方式。以下是核心分类与使用方式:
预定义数据源
1.集合数据源
通过fromCollection()方法从内存集合创建DataStream,支持List、Iterator等集合类型,需指定元素类型信息
// 从List创建
DataStream<String> stream = env.fromCollection(Arrays.asList("a", "b"));
// 从Iterator创建(需指定类型)
DataStream<Long> numbers = env.fromCollection(new NumberSequenceIterator(1L, 10L), BasicTypeInfo.LONG_TYPE_INFO
);
2.文件数据源
支持读取文本文件(readTextFile)或按行解析文件(readFile),可配置文件监控模式
DataStream<String> logStream = env.readTextFile("hdfs://path/to/logs");
3.Socket数据源
通过socketTextStream从网络套接字读取实时流数据,需指定主机和端口
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
4.Kafka数据源
官方推荐使用KafkaSource构建器,需配置bootstrap服务器和反序列化器
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("kafka:9092").setTopics("input-topic").setDeserializer(new SimpleStringSchema()).build();
DataStream<String> kafkaStream = env.fromSource(source...);
自定义数据源
需实现SourceFunction接口(无并行)或ParallelSourceFunction接口(支持并行),重写run()方法定义数据生成逻辑,通过SourceContext.collect()发射数据
public class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) {while (isRunning) {ctx.collect(new Event(...)); // 发射数据Thread.sleep(1000);}}@Overridepublic void cancel() { isRunning = false; }
}
// 使用方式
DataStream<Event> customStream = env.addSource(new CustomSource());
API选择建议
**测试场景:**优先使用fromCollection或socketTextStream快速验证逻辑
**生产环境:**推荐fromSource配合Kafka/Pulsar等连接器,内置更完善的容错机制
**自定义扩展:**通过实现Source接口(新版本)替代旧版SourceFunction,支持更细粒度的水位线生成