启动服务#
工作流需要在 Takler 服务中才能自动运行,Takler 可以将工作流定义和 Takler 服务启动集成到同一个脚本文件中。
Takler 服务以异步方式运行网络监听服务和调度器:
网络监听服务 (
NetworkService) 用于响应从客户端发送的请求,执行相应操作调度器 (
Scheduler) 用于定时检查工作流任务状态,运行符合条件的任务
基本步骤#
使用 Takler 运行单个工作流的基本步骤:
创建工作流
创建 Takler 服务对象,添加工作流到服务
启动服务
修改 Python 脚本#
修改 test.py 文件:
1import asyncio
2import sys
3from pathlib import Path
4
5from takler.core import Flow
6from takler.server import TaklerServer
7from takler.tasks.shell import ShellScriptTask
8from takler.visitor import pre_order_travel, PrintVisitor
9
10
11TAKLER_HOME = Path(__file__).parent
12
13
14def create_flow():
15 flow = Flow("test")
16 flow.add_parameter("TAKLER_HOME", TAKLER_HOME)
17 task1 = flow.add_task(ShellScriptTask("t1"))
18 task1.add_parameter("TAKLER_SCRIPT", Path(TAKLER_HOME, "test/task1.takler"))
19 return flow
20
21
22async def run_takler(server):
23 await server.start()
24 await server.run()
25
26
27def main():
28 flow = create_flow()
29 pre_order_travel(flow, PrintVisitor(sys.stdout))
30
31 server = TaklerServer()
32 server.bunch.add_flow(flow)
33 asyncio.run(run_takler(server))
34
35
36if __name__ == "__main__":
37 main()
逐行介绍新增代码:
1:导入 Python 异步包 asyncio
6:导入 Takler 服务类
TaklerServer22:异步函数
run_takler用于启动 Takler 服务并运行调度器23:启动 Takler 服务
24:运行 Takler 服务,直到服务被终止
31:创建 Takler 服务对象
server32:将工作流
flow添加到 Takler 服务server33:运行异步函数
run_takler,直到函数运行结束,即 Takler 服务被终止
运行 Takler 服务#
执行 test.py 代码,启动并运行 Takler 服务
python test.py
脚本输出日志如下:
|- test [unknown]
|- t1 [unknown]
2022-07-05 03:09:52.853 | INFO | takler.server:start:35 - start server...
2022-07-05 03:09:52.902 | INFO | takler.server.network_service:start:51 - service started: [::]:33083
2022-07-05 03:09:52.902 | INFO | takler.server:start:38 - start server...done
2022-07-05 03:09:52.902 | INFO | takler.server.scheduler:main_loop:56 - main loop...
2022-07-05 03:10:02.911 | INFO | takler.server.scheduler:main_loop:56 - main loop...
2022-07-05 03:10:12.939 | INFO | takler.server.scheduler:main_loop:56 - main loop...
Takler 服务监听 [::]:33083 端口。
Takler 的调度器每隔 10 秒检查工作流状态,自动提交符合触发器条件的任务。
默认设置下,工作流处于 unknown 状态,任务不会被自动提交。
练习#
修改 test.py
运行 test.py,检查命令行输出