定义新工作流#

本节介绍如何定义只有一个任务的工作流。

创建工作流文件#

创建一个 Python 文件 test.py

本教程在目录 ${TAKLER_HOME} (即 /g6/wangdp/project/course/takler/tutorial) 中创建该文件:

 1import sys
 2from pathlib import Path
 3
 4from takler.core import Flow, Bunch
 5from takler.tasks.shell import ShellScriptTask
 6from takler.visitor import pre_order_travel, PrintVisitor
 7
 8
 9TAKLER_HOME = Path(__file__).parent
10
11
12def create_flow():
13    flow = Flow("test")
14    flow.add_parameter("TAKLER_HOME", str(TAKLER_HOME))
15    task1 = flow.add_task(ShellScriptTask("t1"))
16    task1.add_parameter("TAKLER_SCRIPT", str(Path(TAKLER_HOME, "test/task1.takler")))
17    return flow
18
19
20if __name__ == "__main__":
21    test_flow = create_flow()
22    bunch = Bunch()
23    bunch.add_flow(test_flow)
24    pre_order_travel(test_flow, PrintVisitor(sys.stdout))

上述代码定义了一个叫做 test 的工作流,包含一个叫做 t1 的任务。

逐行解释上述代码:

1-2:导入 Python 自带包

4-6:导入 takler 包中对象

  • Flow:工作流类

  • Bunch:工作流集合类,可以包含多个工作流 (Flow) 对象

  • ShellScriptTask:Shell 脚本任务类

  • pre_order_travel() 函数:前序遍历工作流

  • PrintVisitor 类:打印节点信息

9:create_flow() 函数创建只有一个任务 (task1) 的工作流 (flow1)

13:定义工作流 flow

14:为 flow 定义变量 TAKLER_HOME,该变量定义工作流生成的作业文件的保存目录,本例中是脚本当前目录

15:定义 Shell 脚本任务 task1

16:为 task1 定义变量 TAKLER_SCRIPT,该变量定义 Shell 脚本任务对应的脚本目录,本例中为

/g6/wangdp/project/course/takler/tutorial/test/task1.takler

17:create_flow() 函数返回工作流 flow 对象

20:定义直接运行脚本会执行的代码

21:创建工作流 flow

22:创建工作流集合对象 bunchBunch 对象用于保存 Takler 服务中的工作流。

23:将工作流 flow 添加到 bunch

24:pre_order_travelPrintVisitor 结合会打印工作流的树形结构

运行上述脚本

python test.py

会打印工作流结构

|- test [unknown]
  |- t1 [unknown]

另一种定义方式#

为了表现工作流节点的层次关系,可以使用 Python 的 with 语句创建工作流,上述代码中的 create_flow() 函数可以修改为:

def create_flow():
   with Flow("test") as flow
       flow.add_parameter("TAKLER_HOME", TAKLER_HOME)
       with flow.add_task(ShellScriptTask("t1")) as task1:
           task1.add_parameter("TAKLER_SCRIPT", Path(TAKLER_HOME, "test/task1.takler"))
   return flow

备注

在大型工作流中可以使用 with 方式合理缩进代码,方便后续修改维护。

练习#

  1. 创建工作流定义文件 test.py

  2. 运行脚本