结构#
树形结构#
- class takler.core.Bunch(name='', host=None, port=None)#
- find_generated_parameter(name)#
Find generated
parameteronly in this node.- 返回类型:
Optional[Parameter]
- find_node(a_path)#
Find node from path string.
- 参数:
a_path (
str) -- node path starting with "/".- 返回类型:
Optional[Node]
- find_path(a_path)#
Find node or node's variable from path string.
- generated_parameters_only()#
Return a dict containerd generated parameters in this Node
- 返回类型:
Dict[str,Parameter]
备注
Return value is dict of reference of ``Parameter``s, so it's value will be updated automatically.
- get_node_status()#
Calculate node status from all flows in bunch.
- 返回类型:
- class takler.core.Flow(name)#
- find_generated_parameter(name)#
Find generated
parameteronly in this node.- 返回类型:
Optional[Parameter]
- generated_parameters_only()#
Return a dict containerd generated parameters in this Node
- 返回类型:
Dict[str,Parameter]
备注
Return value is dict of reference of ``Parameter``s, so it's value will be updated automatically.
- requeue(reset_repeat=True)#
Requeue the node and all its child nodes.
Call
Node.requeuefirst and then requeue its children.
- requeue_calendar()#
Requeue calendar with current time.
- update_calendar(time)#
Update calendar using given time. Used in scheduler's main loop.
After generated parameters are updated,
calendar_changedis called to update time attributes.- 参数:
time (
datetime)
- update_generated_parameters()#
Update generated parameters for this node.
- class takler.core.NodeContainer(name)#
- add_container(container)#
Add a
NodeContainer. Ifcontaineris a string, a newNodeContainerwill be created.- 参数:
container (
Union[str,NodeContainer])- 返回类型:
- add_task(task)#
Add a
Task.Should use a real
Taskinstead of baseTaskobject.
- calendar_changed(calendar)#
When Flow's calendar is changed, call this method to update time attributes.
- 参数:
calendar (
Calendar) -- The calendar of a Flow.
- computed_status(immediate)#
Compute node_status of a node from its children. Won't change anything in
node.If
immediateis True, use children's node_status. Ifimmediateis False, compute each child's node_status- 参数:
immediate (
bool)- 返回类型:
- requeue(reset_repeat=True)#
Requeue the node and all its child nodes.
Call
Node.requeuefirst and then requeue its children.
- resolve_dependencies()#
Check all dependencies in the Node, and return True if all dependencies are satisfied.
- 返回:
True if node should be run, or False.
- 返回类型:
bool
- sink_status_change(node_status)#
Apply the node_status change to all its descendants with side effects.
- sink_status_change_only(node_status)#
Apply the node_status change to all its descendants without doing anything.
Sink status down. This method can only be called in set_state and itself.
- class takler.core.Task(name)#
- after_run()#
If task has been successfully run, change node status to
NodeStatus.submittedand handle status change.
- before_run()#
Increment try no before actually run the task.
- computed_status(immediate)#
The status of a task is its own status.
- 返回类型:
- do_run()#
Run the task, actually.
Subclasses of
Taskshould reimplement this method to do the real run operation and deal with errors.- 返回类型:
bool
- classmethod fill_from_dict(d, node, method=SerializationType.Status)#
Fill a
Nodebased object from dictionary.Subclasses of
Nodeshould override this method and callNode.file_from_dictin the override method.
- find_generated_parameter(name)#
Find generated
parameteronly in this node.- 返回类型:
Optional[Parameter]
- generated_parameters_only()#
Return a dict containerd generated parameters in this Node
- 返回类型:
Dict[str,Parameter]
备注
Return value is dict of reference of ``Parameter``s, so it's value will be updated automatically.
- requeue(reset_repeat=True)#
Requeue the node itself, don't affect children nodes.
Requeue operation sets node's status to
NodeStatus.queued
- resolve_dependencies()#
Check dependencies and run the task if all dependencies are met.
- 返回:
True if all dependencies are met and the task is run, False otherwise.
- 返回类型:
bool
- run()#
Run the task, set status to
NodeStatus.submittedand handle status change.
- update_generated_parameters()#
Update generated parameters for this node.
- update_limits()#
Update limits according task status, only used when task status is changed.
- class takler.core.node.Node(name)#
- name#
- state#
- parent#
- children#
- user_parameters#
- trigger_expression#
触发器表达式,一个节点只有一个触发器表达式。 触发器在添加时默认不解析。在评估触发器时,会自动解析没有被解析的触发器,触发器只会被解析一次。
- events#
- meters#
- limits#
- in_limit_manager#
- repeat#
- times#
- add_in_limit(limit_name, node_path=None, tokens=1)#
Add InLimit to this node's InLimitManager.
- 参数:
limit_name (
str) -- Limit's namenode_path (
Optional[str]) -- Limit's reference node path, default is Nonetokens (
int) -- tokens for this node, default is 1.
- 返回类型:
- add_limit(name, limit)#
Add a Limit to node. Limits in one Node should not have duplicate names.
- 参数:
name (
str) -- limit name.limit (
int) -- total tokens for Limit.
- 返回类型:
- add_parameter(param, value=None)#
- 返回类型:
Optional[Parameter]
Add ``Parameter``(s) to this node.
If
param` is a dict or list, ``valuemust be None. Ifparamis a str,valuemust not be None.TODO: add_parameter([dict])
- add_time(time)#
Add a
TimeAttributeto Node. Node can have multiply time attributes.- 参数:
time (
Union[time,str]) -- When to "run" the node.- 返回类型:
- add_trigger(trigger, parse=False)#
Add trigger to node.
- 参数:
trigger (
Union[str,Expression])parse (
bool) -- If set, trigger is parsed to create an AST immediately. If not set, trigger is just store as a string in expression and is not parsed.
- calendar_changed(calendar)#
When Flow's calendar is changed, call this method to update time attributes.
- 参数:
calendar (
Calendar) -- The calendar of a Flow.
- check_in_limit_up()#
Check if all
InLimithave enough tokens up along the tree.- 返回:
If all
InLimithave enough tokens, return True.- 返回类型:
bool
- computed_status(immediate)#
Compute node_status of a node from its children. Won't change anything in
node.If
immediateis True, use children's node_status. Ifimmediateis False, compute each child's node_status- 参数:
immediate (
bool)- 返回类型:
- decrement_in_limit(limit_set)#
Release all InLimit up the node tree.
- 参数:
limit_set (
Set[Limit]) -- A set to save changedLimit, to make sure one Limit is decremented only once.
- evaluate_trigger()#
Evaluate trigger expression of this node.
If trigger is not parsed, an AST is created first before the expression is evaluated.
- 返回:
true if trigger is satisfied.
- 返回类型:
bool
备注
该方法仅检查节点自己的触发器是否满足条件,节点是否满足运行条件还取决于父节点是否满足。
resolve_dependencies方法会自顶向下检查各节点触发器。
- classmethod fill_from_dict(d, node, method=SerializationType.Status)#
Fill a
Nodebased object from dictionary.Subclasses of
Nodeshould override this method and callNode.file_from_dictin the override method.
- find_generated_parameter(name)#
Find generated
parameteronly in this node.- 返回类型:
Optional[Parameter]
- find_limit_up(name)#
Find Limit up along the node tree.
- 参数:
name (
str) -- Limit's name- 返回类型:
Optional[Limit]
- find_node(a_path)#
use node path to find a node.
- 参数:
a_path (
str) --type of node path
node3: relative to currently level
/flow1/node1/node2
node1/node2
./node3
../node1/node2
- 返回:
node with node_path or None if not found
- 返回类型:
Optional[Node]
- find_parameter(name)#
Find a
parameteronly in this node, first in user parameters, then in generated parameters.- 返回类型:
Optional[Parameter]
- find_variable(name)#
Find variable (event, meter, parameter) by name.
- free_dependencies(dep_type=None)#
Ignore some type of dependencies in Node. Including:
time
trigger
- 参数:
dep_type (
Optional[Literal['all','time','trigger']]) --dependency type:
all
time
trigger
- classmethod from_dict(d, method=SerializationType.Status)#
Create
Nodebased object from dictionary. Used["class_type"]to determine which class is to be created.- 参数:
d (
Dict)method (
SerializationType)
- 返回:
A
Nodebased object created from dictionary.- 返回类型:
- generated_parameters_only()#
Return a dict containerd generated parameters in this Node
- 返回类型:
Dict[str,Parameter]
备注
Return value is dict of reference of ``Parameter``s, so it's value will be updated automatically.
- increment_in_limit(limit_set)#
Occupy all InLimit up the node tree.
- 参数:
limit_set (
Set[Limit]) -- A set to save changedLimit, to make sure one Limit is incremented only once.
- property node_path: str#
full path from root node, starts with "/" and split each level with "/"
- Type:
str
- parameters()#
Return all parameters accessible to this Node up the node tree.
- 返回类型:
dict[str,Parameter]
- parameters_only()#
Return all parameters only in this Node.
If generated parameter has same name as user parameter, user parameter will be used.
- 返回类型:
Dict[str,Parameter]
- requeue(reset_repeat=True)#
Requeue the node itself, don't affect children nodes.
Requeue operation sets node's status to
NodeStatus.queued
- resolve_dependencies()#
Check all dependencies in the Node, and return True if all dependencies are satisfied.
- 返回:
True if node should be run, or False.
- 返回类型:
bool
- resolve_time_dependencies()#
Check if there has one time dependency which is satisfied.
- 返回类型:
bool
- resume()#
Resume the node.
- set_node_status(node_status)#
Set node status to some status with side effect.
- 参数:
node_status (
NodeStatus) -- Node status, just an enum without any additional data.
- set_node_status_only(node_status)#
Set node status to some status without any side effect.
- 参数:
node_status (
NodeStatus) -- Node status, just an enum without any additional data.
- sink_status_change(node_status)#
Apply the node_status change to all its descendants with side effects.
- sink_status_change_only(node_status)#
Apply the node_status change to all its descendants without doing anything.
Sink status down. This method can only be called in set_state and itself.
- suspend()#
Suspend the node.
Suspended nodes does not run automatically, see
Node.resolve_dependenciesmethod.
- swim_status_change()#
Compute current node's node_status, and swim status change up the tree.
Swim current status up. This method can only be called in handle_status_change and itself.
- update_generated_parameters()#
Update generated parameters for this node.
访问工具#
- class takler.visitor.NodeVisitor#
- class takler.visitor.SimplePrintVisitor#
Print node tree with state.
- class takler.visitor.PrintVisitor(stream, show_parameter=False, show_user_parameter=None, show_generated_parameter=None, show_trigger=False, show_limit=True, show_event=True, show_meter=True, show_repeat=True)#
Print node tree with attributes. Use
show_*options to choose printed attrs.- show_parameter#
print parameters
- show_trigger#
print trigger expressing
- show_limit#
print limits
- show_event#
print events
- show_meter#
print meters
- show_repeat#
print repeat
- takler.visitor.pre_order_travel(root_node, visitor)#