当前位置: 首页 > news >正文

巧用输出变量,提升Dolphinscheduler工作流灵活性和可维护性

输出变量是 DolphinScheduler 任务调度中实现数据流动与任务协作的核心机制,通过显式定义和传递参数,解决了跨节点数据共享、优先级冲突等问题,同时支持复杂流程编排(如子流程、条件分支)。合理使用输出变量能显著提升工作流的灵活性和可维护性。本文将介绍 DolphinScheduler 中重要的输出变量及其使用方法。

1、Shell 脚本中,单引号 (')、双引号 (") 和反引号 (`)使用

在 Shell 脚本中,单引号 (')、双引号 (") 和反引号 (`) 各自有不同的作用和用法。理解它们的区别和用法对于编写和调试 Shell 脚本非常重要

1.1、单引号 (')

  • 用途:完全引用,用于保护字符串中的所有字符,不进行变量替换或命令替换
  • 特性:引号内的任何字符都将原样输出,不会进行任何解释
VAR="world"
echo 'Hello, $VAR'  # 输出:Hello, $VAR

1.2、双引号 (")

  • 用途:部分引用,用于保护字符串中的大部分字符,但允许变量替换和命令替换
  • 特性:引号内的变量和命令会被解释,其余字符原样输出
VAR="world"
echo "Hello, $VAR"  # 输出:Hello, world

1.3、反引号 (`)

  • 用途:命令替换,用于执行引号内的命令,并将命令的输出作为结果返回
  • 特性:引号内的命令会被执行,结果会替换反引号中的内容
  • 注意:反引号是旧的命令替换方式,更推荐使用 $()
DATE=`date`
echo "Current date and time: $DATE"  # 输出当前日期和时间

1.4、推荐使用 $() 进行命令替换

  • 更现代和可读性更好
  • 可以嵌套使用,而反引号嵌套使用比较困难
DATE=$(date)
echo "Current date and time: $DATE"  # 输出当前日期和时间

# 嵌套命令替换示例
OUTER=$(echo "Outer $(echo "Inner")")
echo $OUTER  # 输出:Outer Inner

1.5、使用场景

  • 单引号:当你不希望字符串中的任何内容被解释时使用,例如正则表达式、特殊字符等
  • 双引号:当你希望字符串中包含变量或命令替换时使用
  • 反引号/$():当你需要执行命令并使用其输出时使用

2、Dolphinscheduler中的输出变量和文件传递

2.1、Shell输出变量

  • 2.1.1、流程如下
    在这里插入图片描述

  • 2.1.2、任务配置
    taskA
    在这里插入图片描述

echo 'taskA'
echo "#{setValue(linesNum=${lines_num})}"
echo '${setValue(words=20)}'

注意 : 这里${lines_num}其实直接就是通过Worker进行变量进行替换的

taskB
在这里插入图片描述

echo 'taskB'
echo ${linesNum}
echo ${words}
  • 2.1.3、结果输出
    主要看taskB的输出
}
[INFO] 2024-07-05 10:09:54.539 +0800 - Success initialized task plugin instance successfully
[INFO] 2024-07-05 10:09:54.539 +0800 - Set taskVarPool: [{"prop":"linesNum","direct":"IN","type":"VARCHAR","value":"100"},{"prop":"words","direct":"IN","type":"VARCHAR","value":"20"}] successfully
[INFO] 2024-07-05 10:09:54.539 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 10:09:54.539 +0800 - *********************************  Execute task instance  *************************************
[INFO] 2024-07-05 10:09:54.539 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 10:09:54.540 +0800 - Final Shell file is: 
[INFO] 2024-07-05 10:09:54.540 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 10:09:54.540 +0800 - #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile
export HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}
export FLINK_HOME=/home/flink-1.18.1
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=/opt/software/seatunnel
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}

export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
echo 'taskB'
echo 100
echo 20
[INFO] 2024-07-05 10:09:54.540 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 10:09:54.540 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14171397340576_3/1961/1454/1961_1454.sh
[INFO] 2024-07-05 10:09:54.544 +0800 - process start, process id is: 588336
[INFO] 2024-07-05 10:09:56.544 +0800 -  -> 
    taskB
    100
    20
[INFO] 2024-07-05 10:09:56.546 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14171397340576_3/1961/1454, processId:588336 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0

2.2、Shell文件传递

  • 2.2.1、流程如下
    在这里插入图片描述

  • 2.2.2、任务配置
    fileUploadTask
    在这里插入图片描述

echo 'fileUploadTask'

mkdir -p data/test1 data/test2
echo "test1 message" >> data/test1/text.txt
echo "test2 message" >> data/test2/text.txt

tree.

fileDownloadTask
在这里插入图片描述

echo 'fileDownloadTask'

cat input_dir/test1/text.txt
cat input_dir/test2/text.txt
  • 2.2.3、结果输出
    fileDownloadTask
[INFO] 2024-07-05 11:11:08.160 +0800 - Success initialized task plugin instance successfully
[INFO] 2024-07-05 11:11:08.160 +0800 - Set taskVarPool: [{"prop":"fileUploadTask.file-text","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240705/14171986797856/2_1962/fileUploadTask_1455_text.txt"},{"prop":"fileUploadTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240705/14171986797856/2_1962/fileUploadTask_1455_data_ds_pack.zip"}] successfully
[INFO] 2024-07-05 11:11:08.160 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - *********************************  Execute task instance  *************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - Final Shell file is: 
[INFO] 2024-07-05 11:11:08.160 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile
export HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}
export FLINK_HOME=/home/flink-1.18.1
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=/opt/software/seatunnel
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}

export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
echo 'fileDownloadTask'

cat input_dir/test1/text.txt
cat input_dir/test2/text.txt
[INFO] 2024-07-05 11:11:08.161 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:11:08.161 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14171986797856_2/1962/1456/1962_1456.sh
[INFO] 2024-07-05 11:11:08.164 +0800 - process start, process id is: 590323
[INFO] 2024-07-05 11:11:10.164 +0800 -  -> 
    fileDownloadTask
    test1 message
    test2 message
[INFO] 2024-07-05 11:11:10.166 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14171986797856_2/1962/1456, processId:590323 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0

2.3、SQL任务输出变量

模式从SqlTask任务进行结果的输出,ShellTask对结果使用

  • 2.3.1、流程如下
    在这里插入图片描述

  • 2.3.2、任务配置
    sqlOutVarTask
    在这里插入图片描述

select user_name as userNameList from t_ds_user

readOutVarTask
在这里插入图片描述

echo 'readOutVarTask'
echo ${userNameList}
  • 2.3.3、结果输出
    readOutVarTask
[INFO] 2024-07-05 11:19:00.294 +0800 - Success initialized task plugin instance successfully
[INFO] 2024-07-05 11:19:00.294 +0800 - Set taskVarPool: [{"prop":"userNameList","direct":"IN","type":"LIST","value":"[\"admin\",\"qiaozhanwei\",\"test\"]"}] successfully
[INFO] 2024-07-05 11:19:00.294 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:19:00.294 +0800 - *********************************  Execute task instance  *************************************
[INFO] 2024-07-05 11:19:00.294 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:19:00.295 +0800 - Final Shell file is: 
[INFO] 2024-07-05 11:19:00.295 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:19:00.295 +0800 - #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile
export HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}
export FLINK_HOME=/home/flink-1.18.1
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=/opt/software/seatunnel
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}

export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
echo 'readOutVarTask'
echo ["admin","qiaozhanwei","test"]
[INFO] 2024-07-05 11:19:00.295 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:19:00.295 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14172048617888_1/1963/1458/1963_1458.sh
[INFO] 2024-07-05 11:19:00.299 +0800 - process start, process id is: 590781
[INFO] 2024-07-05 11:19:02.299 +0800 -  -> 
    readOutVarTask
    [admin,qiaozhanwei,test]
[INFO] 2024-07-05 11:19:02.301 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14172048617888_1/1963/1458, processId:590781 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0

以上就是今天的全部内容了,希望对大家更好地使用DolphinScheduler有所帮助。

转载自Journey
原文链接:https://segmentfault.com/a/1190000045035406

相关文章:

  • 【多线程-第四天-自己模拟SDWebImage的下载图片功能-自定义block和传递参数 Objective-C语言】
  • 技术引领未来创新发展引擎
  • 库存扣减解决方案
  • 南京审计大学:《 面向工程审计行业的DeepSeek大模型应用指南》.pdf(免费下载)
  • 7. 【Vue实战--孢子记账--Web 版开发】-- 收支分类设置
  • MySQL 调优:查询慢除了索引还能因为什么?
  • 设计模式之责任链模式:原理、实现与应用
  • 各软件快捷键
  • 【CXX-Qt】2.5 继承
  • 基于认证的 Harbor 容器镜像仓库
  • 基于koajsAdmin+mongodb的后台管理快速开发框架安装运行记录
  • 深度学习-151-Dify工具之创建一个生成财务报表的智能体Agent
  • 【容器运维】docker搭建私有仓库
  • 【MySQL篇】复合查询
  • 数学爱好者写的编程系列文章
  • Linux | make和Makefile命令详细篇
  • 深度学习:让机器学会“思考”的魔法
  • webpack使用详细步骤
  • SpringBootAdmin-clinet自定义监控CPU、内存、磁盘等health
  • Linux:xxx is not in the sudoers file. This incident will be reported.
  • 前4个月我国货物贸易进出口同比增长2.4%,增速较一季度加快1.1个百分点
  • 顾家家居:拟定增募资近20亿元,用于家居产品生产线的改造和扩建等
  • 大四本科生已发14篇SCI论文?学校工作人员:已记录汇报
  • 戴维·珀杜宣誓就任美国驻华大使
  • 巴基斯坦军方:印度袭击已致巴方31人死亡
  • 中国经济新动能|警惕数字时代下经济的“四大极化”效应