当前位置: 首页 > news >正文

flink api-datastream api-source算子

Flink源算子API是构建数据处理程序的输入端组件,主要分为预定义源和自定义源两类实现方式。以下是核心分类与使用方式:

预定义数据源

‌1.集合数据源

通过fromCollection()方法从内存集合创建DataStream,支持List、Iterator等集合类型,需指定元素类型信息

// 从List创建
DataStream<String> stream = env.fromCollection(Arrays.asList("a", "b"));
// 从Iterator创建(需指定类型)
DataStream<Long> numbers = env.fromCollection(new NumberSequenceIterator(1L, 10L), BasicTypeInfo.LONG_TYPE_INFO
);

2.文件数据源

支持读取文本文件(readTextFile)或按行解析文件(readFile),可配置文件监控模式

DataStream<String> logStream = env.readTextFile("hdfs://path/to/logs");

3.Socket数据源

通过socketTextStream从网络套接字读取实时流数据,需指定主机和端口

DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

4.Kafka数据源

官方推荐使用KafkaSource构建器,需配置bootstrap服务器和反序列化器

KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("kafka:9092").setTopics("input-topic").setDeserializer(new SimpleStringSchema()).build();
DataStream<String> kafkaStream = env.fromSource(source...);

自定义数据源

需实现SourceFunction接口(无并行)或ParallelSourceFunction接口(支持并行),重写run()方法定义数据生成逻辑,通过SourceContext.collect()发射数据

public class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) {while (isRunning) {ctx.collect(new Event(...)); // 发射数据Thread.sleep(1000);}}@Overridepublic void cancel() { isRunning = false; }
}
// 使用方式
DataStream<Event> customStream = env.addSource(new CustomSource());

API选择建议

**‌测试场景‌:**优先使用fromCollection或socketTextStream快速验证逻辑
**‌生产环境‌:**推荐fromSource配合Kafka/Pulsar等连接器,内置更完善的容错机制
**‌自定义扩展‌:**通过实现Source接口(新版本)替代旧版SourceFunction,支持更细粒度的水位线生成

http://www.dtcms.com/a/422945.html

相关文章:

  • 基于数据挖掘的在线游戏行为分析预测系统
  • 无极领域付费网站做外贸要访问国外的网站怎么办
  • 本地项目上传到Git仓库
  • 首批CCF教学案例大赛资源上线:涵盖控制仿真、算法与机器人等9大方向
  • Java外功精要(2)——Spring IoCDI
  • Git简单理解
  • 机器人的“神经网络”:以太网技术如何重塑机器人内部通信?【技术类】
  • k8s-pod的资源限制
  • 【附源码】基于Vue的网上约课系统的设计与实现
  • 元宇宙的操作系统:虚拟世界的管理平台
  • 软考 系统架构设计师系列知识点之杂项集萃(161)
  • Python爬虫实战:获取中国检察网公开的案件信息与数据分析
  • 北大软件数字统战解决方案:用智能化技术破解基层治理难题、提升政务服务效能
  • Vue三元表达式
  • 吉林做网站公司wordpress手机仪表盘
  • seo案例网站建设哪家好WordPress用户名怎么泄露的
  • 狄利克雷先验:贝叶斯分析中的多面手与它的学术传承
  • 第三章、信息系统治理
  • 欧姆龙 CP1H PLC借助以太网通讯处理器实现在检测生产线上的应用案例
  • 【C++】继承:菱形继承
  • 【Rust GUI开发入门】编写一个本地音乐播放器(4. 绘制按钮组件)
  • Django小说个性化推荐系统 双算法(基于用户+物品) 评论收藏 书架管理 协同过滤推荐算法(源码+文档)✅
  • 微调数据格式详解:适配任务、模型与生态的最佳实践
  • 黑帽seo是什么做模板网站乐云seo效果好
  • 怎么编辑自己的网站企业展示型网站程序
  • java所有线程都是通过Callable和Runnable和Thread实现的
  • 0.7 秒实现精准图像编辑!VAREdit 让 AI 图像编辑告别“拖沓与失控,代码模型已开源。
  • 计算机软件包含网站开发购物网站开发设计类图
  • 【避坑实战】C# WinForm 上位机开发:解决串口粘包+LiveCharts卡顿+InfluxDB存储(免费代码+仿真工具)
  • 开源 C# 快速开发(十二)进程监控