NIFI的处理器:ExecuteGroovyScript 2.4.0
ExecuteGroovyScript是常用的处理器之一,用于执行GroovyScript脚本。该脚本负责处理传入的流文件(例如传输到SUCCESS或删除)以及由该脚本创建的任何流文件。如果处理不完整或不正确,会话将被回滚。
属性值-失败处理策略 Failure strategy:如何处理未处理的异常。如果你想通过代码管理异常,那么保留默认值“rollback”。如果选择了“转移到失败”并且发生了未处理的异常,则此会话中从传入队列接收到的所有flowFiles都将转移到“失败”关系,并设置了其他属性:ERROR_MESSAGE和ERROR_STACKTRACE。如果选择了“回滚”并且发生了未处理的异常,则从传入队列接收到的所有flowFiles都将受到惩罚并返回。如果处理器没有传入连接,则此参数无效。
该脚本的性能大约是java语言的10%,性能不高,调试效率也很低,但是编写灵活,使用方便。
如下为部分代码示例:
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
// 解析输入 JSON
def flowFile = session.get();
if(!flowFile) return;
def jsonText = flowFile.read().getText('UTF-8');
def inputJson = new JsonSlurper().parseText(jsonText);
// 获取 timestamp
def timestamp = inputJson.timestamp
// 创建输出数组
def outputList = []
// 遍历 plc1 和 plc2,并提取每个设备的数据
inputJson.each { key, value ->
if (!key.equalsIgnoreCase("ct_timestamp")) {
// 克隆原始的 plc 数据
def deviceData = value.collectEntries { k, v -> [(k): v] }
// 创建输出数据结构
def outputData = [
data: deviceData,
device_name: key,
data_time: ct_timestamp
]
// 将转换后的数据添加到输出列表
outputList.add(outputData)
}
}
// 返回输出 JSON
def outputJson = new JsonBuilder(outputList).toPrettyString()
flowFile=session.write(flowFile, {outputStream ->outputStream.write(
outputJson.bytes
) } as OutputStreamCallback);
REL_SUCCESS << flowFile;