当前位置: 首页 > news >正文

Apache NiFi 完全入门与实战教程:从零构建数据流水线

 目录

  1. 什么是 Apache NiFi?
  2. NiFi 的核心特性
  3. 安装与启动 NiFi
  4. NiFi 核心概念详解
  5. 实战 1:读取文件 → 输出日志
  6. 实战 2:HTTP 接口接收 JSON → 写入文件
  7. 实战 3:定时拉取数据库 → 发送到 Kafka
  8. 高级功能:表达式语言、路由、转换
  9. 性能调优与监控
  10. 最佳实践与常见问题
  11. 处理器参考手册

1. 什么是 Apache NiFi?

Apache NiFi 是一个基于 流式数据处理 的开源数据集成工具,最初由美国国家安全局(NSA)开发,后捐赠给 Apache 基金会。

它提供了一个可视化、拖拽式界面,允许用户通过图形化方式设计、部署和监控数据流(DataFlow),实现:

  • 数据采集(日志、文件、数据库、API)
  • 数据转换(清洗、格式化、丰富)
  • 数据路由(条件判断、分流)
  • 数据分发(Kafka、HDFS、S3、Elasticsearch 等)

一句话总结:NiFi 是一个“数据管道的可视化操作系统”。


2. NiFi 的核心特性

特性说明
🔧 可视化界面拖拽组件,无需写代码即可构建复杂数据流
⚡ 高吞吐、低延迟支持每秒数万条消息处理
🔁 数据溯源(Provenance)记录每条数据的来源、去向、处理过程,便于审计与调试
🔐 安全传输支持 SSL/TLS、Kerberos、LDAP 认证
🔄 背压(Backpressure)自动控制流量,防止下游系统过载
📦 丰富的处理器(Processor)超过 300+ 内置组件,支持扩展
🌐 集群支持可部署为集群,实现高可用与负载均衡

3. 安装与启动 NiFi

3.1 环境要求

  • Java 21
  • Linux / macOS / Windows
  • 至少 4GB 内存

3.2 下载与安装

# 下载最新稳定版(以 1.26.0 为例)
wget https://mirrors.tuna.tsinghua.edu.cn/apache/nifi/2.6.0/nifi-2.6.0-bin.zip# 解压
unzip  nifi-2.6.0-bin.zip
cd nifi-1.26.0

3.3 启动 NiFi

# 启动
bin/nifi.sh start# 查看状态
bin/nifi.sh status# 停止
bin/nifi.sh stop

3.4 访问 Web UI

打开浏览器访问:

http://localhost:8080/nifi

首次启动可能需要等待 1-2 分钟。


4. NiFi 核心概念详解

4.1 FlowFile(数据文件)

NiFi 中的最小数据单元,包含:

  • 内容(Content):实际数据(如 JSON、日志、二进制)
  • 属性(Attributes):元数据(如 filename、path、uuid、自定义属性)

📌 类比:FlowFile = 信封(属性) + 信件内容(内容)

4.2 Processor(处理器)

执行具体操作的组件,如:

  • GetFile:读取文件
  • PutKafka:写入 Kafka
  • UpdateAttribute:修改属性
  • LogAttribute:打印属性(调试用)

4.3 Connection(连接)

连接两个 Processor,形成数据流。

  • 可设置队列(Queue),支持背压
  • 可配置最大队列大小(如 10,000 条)

4.4 Process Group(处理组)

逻辑容器,用于组织多个 Processor,支持嵌套。

类比:文件夹,用于模块化管理数据流。

4.5 Controller Services(控制器服务)

共享资源,如:

  • DBCPConnectionPool:数据库连接池
  • KafkaControllerService:Kafka 配置
  • SSLContextService:SSL 配置

4.6 Data Provenance(数据溯源)

NiFi 自动记录每条 FlowFile 的:

  • 何时被创建
  • 经过哪些 Processor
  • 属性如何变化
  • 耗时、大小等

可在 UI 中点击 Provenance 标签查看。


5. 实战 1:读取文件 → 输出日志

目标

读取 /tmp/input/ 目录下的文件,打印其内容和属性。

步骤

  1. 添加 GetFile 处理器

    • 配置 Input Directory/tmp/input
    • Keep Source Filefalse(处理后删除)
    • Polling Interval5 sec
  2. 添加 LogAttribute 处理器

    • 连接 GetFile → LogAttribute
    • 配置 Log Levelinfo
    • Attributes to Log:留空(记录所有)
  3. 启动处理器

    • 右键点击处理器 → Start
  4. 测试

    echo "Hello NiFi!" > /tmp/input/test.txt

    查看 NiFi 日志(logs/nifi-app.log),应看到属性和内容输出。


6. 实战 2:HTTP 接口接收请求 → 写入文件

目标

通过 HTTP 接收 JSON 数据,保存为本地文件。

步骤

  1. 添加 HandleHttpRequest

    • Listening Port8081
    • HTTP Context Path/data
  2. 添加 HandleHttpResponse

    • 连接在最后,用于返回响应
  3. 添加 PutFile

    • Directory/tmp/output
    • 文件名可设置为 ${filename}-${uuid}.json
  4. 连接流程

    HandleHttpRequest → PutFile → HandleHttpResponse
  5. 测试

    curl -X POST http://localhost:8081/data \-H "Content-Type: application/json" \-d '{"name": "张三", "age": 30}'

    查看 /tmp/output/ 目录,应生成 JSON 文件。

  6. 测试图


7. 实战 3:定时拉取数据库 → 发送到 Kafka

目标

每 30 秒从 MySQL 查询数据,发送到 Kafka。

前提

  • MySQL 已安装,表 users 存在
  • Kafka 已启动,Topic user_events 存在

步骤

  1. 配置 DBCP 连接池

    • 菜单 → Controller Settings → Controller Services
    • 添加 DBCPConnectionPool
    • 配置 JDBC URL、用户名、密码、驱动 JAR(需放入 lib/ 目录)
  2. 添加 QueryDatabaseTable

    • 选择 DBCP 服务
    • Table Nameusers
    • Run Schedule30 sec
  3. 添加 ConvertAvroToJSON(可选)

    • 将 Avro 格式转为 JSON
  4. 添加 PutKafka

    • 配置 Kafka Broker、Topic 名称
    • 使用表达式 ${topic} 动态指定 Topic
  5. 连接并启动

    QueryDatabaseTable → ConvertAvroToJSON → PutKafka

8. 高级功能:表达式语言、路由、转换

8.1 表达式语言(Expression Language)

NiFi 支持强大的 EL,语法:${attributeName}

示例
  • ${filename:equals('data.txt')} → 判断文件名
  • ${path:contains('logs')} → 判断路径
  • ${now():toNumber()} → 当前时间戳
  • ${uuid} → 生成 UUID

8.2 路由(RouteOnAttribute)

根据属性值分流:

Condition: code == '200' → success
Condition: code != '200' → failure

8.3 数据转换

  • JoltTransformJSON:JSON 结构转换
  • EvaluateJsonPath:提取 JSON 字段 → 属性
  • ReplaceText:字符串替换(支持 EL)
  • SplitJson:将数组拆分为多个 FlowFile

9. 性能调优与监控

9.1 关键配置(nifi.properties

配置项建议值说明
nifi.web.http.port8080Web 端口
nifi.cluster.is.nodetrue是否集群节点
nifi.flowfile.repository.directory/data/nifi/flowfileFlowFile 存储
nifi.database.repository.directory/data/nifi/database元数据存储
nifi.bored.yield.duration10.millis空闲时 CPU 优化

9.2 监控指标

  • 队列长度:避免积压
  • Backpressure:触发时说明下游压力大
  • Provenance Events:分析处理延迟
  • JVM GC 日志:避免长时间停顿

9.3 集群模式

  • 使用 ZooKeeper 管理集群
  • 所有节点配置相同 nifi.cluster.node.address
  • 流程在集群中自动同步

10. 最佳实践与常见问题

✅ 最佳实践

  1. 模块化设计:使用 Process Group 划分功能模块
  2. 命名规范:Processor 命名为 Get-LogFilePut-Kafka-User
  3. 日志调试:善用 LogAttribute 和 LogMessage
  4. 版本控制:导出流程为 XML,存入 Git
  5. 安全配置:启用 HTTPS、LDAP 认证

11. 处理器参考手册

    http://www.dtcms.com/a/466280.html

    相关文章:

  1. xtuoj 字符串
  2. TDengine 数学函数 ACOS() 用户手册
  3. wordpress做社区网站我的微信公众号
  4. 判断和测量共模信号
  5. STM32H743-ARM例程15-RTC
  6. 顺企网贵阳网站建设怎么创建网站后台
  7. 常州酒店网站建设外贸网站做开关行业的哪个好
  8. 沈阳市建设工程质量检测中心网站内容型网站
  9. 做的好的地方网站wordpress上传思源字体
  10. leetcode 62 不同路径
  11. GitHub fork仓库同步原仓库tags(标签)的详细教程
  12. 岳阳品牌网站定制开发建站页面
  13. 网站维护的协议给一个企业做网站
  14. Servlet 调试
  15. 《大模型赋能文化遗产数字化:古籍修复与知识挖掘的技术实践》
  16. TSP问题1 NEURAL COMBINATORIAL OPTIMIZATION WITH REINFORCEMENT LEARNING
  17. 代码随想录Day46|647. 回文子串、516.最长回文子序列
  18. 钦州 网站建设全屋定制十大名牌口碑
  19. 【MySQL】认识数据库以及MySQL安装
  20. 网站建设网站软件有哪些内容金华网站建设seo
  21. 做素描的网站鲜花网站建设文档
  22. 从 PE 安装 Windows 系统全流程教程(适合U盘重装)
  23. 自动下载ICLR论文
  24. 导诊机器人如何提升三甲医院服务效能?
  25. 北京网站模板下载品牌定位的三要素
  26. 做游戏都需要什么网站微网站开发需要多少钱
  27. 存储RAM/ROM硬件笔试真题解析
  28. React.lazy 和 suspense 如何使用?
  29. 深圳购物网站建设价格引流推广app
  30. 【React】useMemo 和 useEffect 的用法