Structure#

Tree structure#

class takler.core.Bunch(name='', host=None, port=None)#
find_generated_parameter(name)#

Find generated parameter only in this node.

Return type:

Optional[Parameter]

find_node(a_path)#

Find node from path string.

Parameters:

a_path (str) -- node path starting with "/".

Return type:

Optional[Node]

find_path(a_path)#

Find node or node's variable from path string.

Parameters:

a_path (str) -- node path (/flow1/task1) or variable path (/flow1/task1:event1).

Return type:

Union[Node, Meter, Event, None]
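As a rough illustration of the two path forms accepted by find_path, the sketch below splits a path string into a node path and an optional variable name. The helper split_variable_path is hypothetical and not part of takler; it only assumes that the variable name follows the first ":" as in /flow1/task1:event1.

```python
from typing import Optional, Tuple


def split_variable_path(a_path: str) -> Tuple[str, Optional[str]]:
    """Split "/flow1/task1:event1" into ("/flow1/task1", "event1").

    Hypothetical helper for illustration only; takler's own parsing may differ.
    """
    node_path, separator, variable_name = a_path.partition(":")
    if not separator:
        # No ":" present, so the whole string is treated as a node path.
        return node_path, None
    return node_path, variable_name


print(split_variable_path("/flow1/task1"))
print(split_variable_path("/flow1/task1:event1"))
```

A lookup could then resolve the node path first and search that node's events and meters only when a variable name is present.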

generated_parameters_only()#

Return a dict containing the generated parameters in this Node.

Return type:

Dict[str, Parameter]

Note

The return value is a dict of references to ``Parameter`` objects, so their values are updated automatically.

get_node_status()#

Calculate node status from all flows in bunch.

Return type:

NodeStatus

class takler.core.Flow(name)#
find_generated_parameter(name)#

Find generated parameter only in this node.

Return type:

Optional[Parameter]

find_parent_parameter(name)#

Find a Parameter up along the node tree.

Return type:

Optional[Parameter]

generated_parameters_only()#

Return a dict containing the generated parameters in this Node.

Return type:

Dict[str, Parameter]

Note

The return value is a dict of references to ``Parameter`` objects, so their values are updated automatically.

get_bunch()#

Get the Bunch node if this node's root belongs to a bunch.

Return type:

Bunch

requeue(reset_repeat=True)#

Requeue the node and all its child nodes.

Call Node.requeue first and then requeue its children.

requeue_calendar()#

Requeue calendar with current time.

update_calendar(time)#

Update calendar using given time. Used in scheduler's main loop.

After generated parameters are updated, calendar_changed is called to update time attributes.

Parameters:

time (datetime)

update_generated_parameters()#

Update generated parameters for this node.

class takler.core.NodeContainer(name)#
add_container(container)#

Add a NodeContainer. If container is a string, a new NodeContainer will be created.

Parameters:

container (Union[str, NodeContainer])

Return type:

NodeContainer

add_task(task)#

Add a Task.

Pass a concrete Task subclass rather than the base Task object.

Parameters:

task (Union[str, Task])

Return type:

Task

calendar_changed(calendar)#

When Flow's calendar is changed, call this method to update time attributes.

Parameters:

calendar (Calendar) -- The calendar of a Flow.

computed_status(immediate)#

Compute the node_status of a node from its children without changing anything in the node.

If immediate is True, use the children's node_status. If immediate is False, compute each child's node_status.

Parameters:

immediate (bool)

Return type:

NodeStatus
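To make the role of immediate concrete, here is a minimal sketch of status aggregation. The Status enum, its precedence (the highest value wins), and the StatusNode class are assumptions made for illustration; they are not takler's NodeStatus or its actual aggregation rule. The sketch also assumes that the non-immediate mode recomputes each child from its own subtree.

```python
from enum import IntEnum
from typing import List


class Status(IntEnum):
    # Hypothetical statuses; the rule "highest value wins" is an assumption.
    complete = 1
    queued = 2
    submitted = 3
    active = 4
    aborted = 5


class StatusNode:
    """Hypothetical stand-in for a container or task node."""

    def __init__(self, name: str, status: Status = Status.queued):
        self.name = name
        self.status = status  # the status currently stored on the node
        self.children: List["StatusNode"] = []

    def computed_status(self, immediate: bool) -> Status:
        if not self.children:
            # A leaf (task) simply reports its own status.
            return self.status
        if immediate:
            # Trust the status already stored on each direct child.
            child_statuses = [child.status for child in self.children]
        else:
            # Recompute every child's status from its own subtree.
            child_statuses = [child.computed_status(False) for child in self.children]
        return max(child_statuses)


task = StatusNode("task1", Status.aborted)
family = StatusNode("family1", Status.queued)  # stored status is stale
family.children.append(task)
flow = StatusNode("flow1")
flow.children.append(family)

print(flow.computed_status(True).name, flow.computed_status(False).name)
```

In this sketch the two modes differ only when a child's stored status is out of date with respect to its descendants. Neither mode writes anything back to the nodes.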

requeue(reset_repeat=True)#

Requeue the node and all its child nodes.

Call Node.requeue first and then requeue its children.

resolve_dependencies()#

Check all dependencies in the Node, and return True if all dependencies are satisfied.

Returns:

True if the node should be run, False otherwise.

Return type:

bool

sink_status_change(node_status)#

Apply the node_status change to all its descendants with side effects.

sink_status_change_only(node_status)#

Apply the node_status change to all its descendants without any side effects.

Sink status down. This method can only be called in set_state and itself.

class takler.core.Task(name)#
after_run()#

If task has been successfully run, change node status to NodeStatus.submitted and handle status change.

before_run()#

Increment the try number before actually running the task.

computed_status(immediate)#

The status of a task is its own status.

Return type:

NodeStatus

do_run()#

Actually run the task.

Subclasses of Task should override this method to perform the real run operation and handle errors.

Return type:

bool
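The override pattern described for do_run can be sketched as follows. SketchTask and CallableTask are hypothetical classes, not takler.core.Task, and the order in which run calls the hooks is an assumption based only on the method descriptions on this page.

```python
from typing import Callable


class SketchTask:
    """Hypothetical stand-in for a task base class (not takler.core.Task)."""

    def __init__(self, name: str):
        self.name = name
        self.try_no = 0
        self.status = "queued"

    def run(self) -> bool:
        self.try_no += 1  # mirrors before_run: count the attempt
        succeeded = self.do_run()  # hook supplied by the subclass
        if succeeded:
            self.status = "submitted"  # mirrors after_run
        return succeeded

    def do_run(self) -> bool:
        raise NotImplementedError("subclasses provide the real run operation")


class CallableTask(SketchTask):
    """Runs an arbitrary callable and reports success as a bool."""

    def __init__(self, name: str, action: Callable[[], object]):
        super().__init__(name)
        self.action = action

    def do_run(self) -> bool:
        try:
            self.action()
        except Exception:
            # Handle the error here and report failure to the caller.
            return False
        return True


ok_task = CallableTask("ok", lambda: None)
bad_task = CallableTask("bad", lambda: 1 / 0)
print(ok_task.run(), ok_task.status)
print(bad_task.run(), bad_task.status)
```

Returning a bool instead of raising keeps error handling inside the subclass, which matches the description that do_run itself deals with errors.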

classmethod fill_from_dict(d, node, method=SerializationType.Status)#

Fill a Node-based object from a dictionary.

Subclasses of Node should override this method and call Node.fill_from_dict in the override method.

Parameters:
  • d (Dict)

  • node (Task)

  • method (SerializationType)

Return type:

Task

find_generated_parameter(name)#

Find generated parameter only in this node.

Return type:

Optional[Parameter]

generated_parameters_only()#

Return a dict containing the generated parameters in this Node.

Return type:

Dict[str, Parameter]

Note

The return value is a dict of references to ``Parameter`` objects, so their values are updated automatically.

requeue(reset_repeat=True)#

Requeue the node itself without affecting its child nodes.

The requeue operation sets the node's status to NodeStatus.queued.

resolve_dependencies()#

Check dependencies and run the task if all dependencies are met.

Returns:

True if all dependencies are met and the task is run, False otherwise.

Return type:

bool

run()#

Run the task, set status to NodeStatus.submitted and handle status change.

update_generated_parameters()#

Update generated parameters for this node.

update_limits()#

Update limits according to task status. Only used when the task status changes.

class takler.core.node.Node(name)#
name#
state#
parent#
children#
user_parameters#
trigger_expression#

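Trigger expression. A node has only one trigger expression. By default, a trigger is not parsed when it is added. When the trigger is evaluated, a trigger that has not yet been parsed is parsed automatically, and a trigger is parsed only once.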

events#
meters#
limits#
in_limit_manager#
repeat#
times#
add_in_limit(limit_name, node_path=None, tokens=1)#

Add InLimit to this node's InLimitManager.

Parameters:
  • limit_name (str) -- Limit's name

  • node_path (Optional[str]) -- Limit's reference node path, default is None

  • tokens (int) -- tokens for this node, default is 1.

Return type:

InLimit

add_limit(name, limit)#

Add a Limit to node. Limits in one Node should not have duplicate names.

Parameters:
  • name (str) -- limit name.

  • limit (int) -- total tokens for Limit.

Return type:

Limit

add_parameter(param, value=None)#
Add ``Parameter``(s) to this node.

If ``param`` is a dict or list, ``value`` must be None. If ``param`` is a str, ``value`` must not be None.

TODO: add_parameter([dict])

Return type:

Optional[Parameter]

add_time(time)#

Add a TimeAttribute to the Node. A Node can have multiple time attributes.

Parameters:

time (Union[time, str]) -- When to "run" the node.

Return type:

TimeAttribute

add_trigger(trigger, parse=False)#

Add trigger to node.

Parameters:
  • trigger (Union[str, Expression])

  • parse (bool) -- If set, the trigger is parsed immediately to create an AST. If not set, the trigger is only stored as a string in the expression and is not parsed.

calendar_changed(calendar)#

When Flow's calendar is changed, call this method to update time attributes.

Parameters:

calendar (Calendar) -- The calendar of a Flow.

check_in_limit_up()#

Check if all InLimit have enough tokens up along the tree.

Returns:

True if all InLimit have enough tokens.

Return type:

bool

computed_status(immediate)#

Compute the node_status of a node from its children without changing anything in the node.

If immediate is True, use the children's node_status. If immediate is False, compute each child's node_status.

Parameters:

immediate (bool)

Return type:

NodeStatus

decrement_in_limit(limit_set)#

Release all InLimit up the node tree.

Parameters:

limit_set (Set[Limit]) -- A set to save changed Limit, to make sure one Limit is decremented only once.

evaluate_trigger()#

Evaluate trigger expression of this node.

If trigger is not parsed, an AST is created first before the expression is evaluated.

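Returns:

True if the trigger is satisfied.

Return type:

bool

Note

This method only checks whether the node's own trigger is satisfied. Whether the node can actually run also depends on whether its parent nodes satisfy their conditions.

The resolve_dependencies method checks each node's trigger from the top down.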
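The parse-on-first-evaluation behaviour can be sketched as below. LazyExpression is a hypothetical class, and a plain Python expression stands in for takler's trigger syntax, which this page does not define.

```python
import ast
from typing import Any, Dict


class LazyExpression:
    """Hypothetical trigger holder: stores text and parses it only on demand."""

    def __init__(self, text: str):
        self.text = text
        self._code = None  # compiled AST, created at most once
        self.parse_count = 0

    def parse(self) -> None:
        # A Python expression is used here as a stand-in for the trigger grammar.
        tree = ast.parse(self.text, mode="eval")
        self._code = compile(tree, "<trigger>", "eval")
        self.parse_count += 1

    def evaluate(self, variables: Dict[str, Any]) -> bool:
        if self._code is None:
            # Not parsed when added, so build the AST before the first evaluation.
            self.parse()
        return bool(eval(self._code, {"__builtins__": {}}, dict(variables)))


trigger = LazyExpression("task1 == 'complete'")
print(trigger.evaluate({"task1": "queued"}))
print(trigger.evaluate({"task1": "complete"}))
print(trigger.parse_count)
```

Deferring the parse keeps adding a trigger cheap, and caching the result means repeated evaluations in a scheduler loop do not pay the parsing cost again.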

classmethod fill_from_dict(d, node, method=SerializationType.Status)#

Fill a Node-based object from a dictionary.

Subclasses of Node should override this method and call Node.fill_from_dict in the override method.

Parameters:
  • d (Dict)

  • node (Node)

  • method (SerializationType)

Return type:

Node

find_generated_parameter(name)#

Find generated parameter only in this node.

Return type:

Optional[Parameter]

find_limit(name)#

Find Limit in this node.

Parameters:

name (str) -- Limit's name

Return type:

Optional[Limit]

find_limit_up(name)#

Find Limit up along the node tree.

Parameters:

name (str) -- Limit's name

Return type:

Optional[Limit]

find_node(a_path)#

Use a node path to find a node.

Parameters:

a_path (str) --

Supported forms of node path:

  1. node3: relative to the current level

  2. /flow1/node1/node2

  3. node1/node2

  4. ./node3

  5. ../node1/node2

Returns:

The node at the given path, or None if not found.

Return type:

Optional[Node]
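The five path forms listed for find_node can be resolved with a small tree walk. The sketch below uses a hypothetical MiniNode class and assumes that a relative path starts at the node's own level, that is, among the children of its parent. This page does not confirm that interpretation, so treat it as one possible reading.

```python
from typing import Dict, Optional


class MiniNode:
    """Hypothetical tree node used only to illustrate path resolution."""

    def __init__(self, name: str, parent: Optional["MiniNode"] = None):
        self.name = name
        self.parent = parent
        self.children: Dict[str, "MiniNode"] = {}
        if parent is not None:
            parent.children[name] = self

    def get_root(self) -> "MiniNode":
        node = self
        while node.parent is not None:
            node = node.parent
        return node

    def find_node(self, a_path: str) -> Optional["MiniNode"]:
        parts = [part for part in a_path.split("/") if part]
        if a_path.startswith("/"):
            # Absolute path: the first component must name the root node.
            root = self.get_root()
            if not parts or parts[0] != root.name:
                return None
            current, parts = root, parts[1:]
        else:
            # Assumption: relative paths start at this node's own level,
            # that is, among its siblings (the children of its parent).
            current = self.parent
        for part in parts:
            if current is None:
                return None
            if part == ".":
                continue
            current = current.parent if part == ".." else current.children.get(part)
        return current


flow1 = MiniNode("flow1")
node1 = MiniNode("node1", flow1)
node2 = MiniNode("node2", node1)
node3 = MiniNode("node3", flow1)

for path in ("node3", "/flow1/node1/node2", "./node3"):
    found = node1.find_node(path)
    print(path, "->", found.name if found else None)
```

Under this reading, "node3" and "./node3" are equivalent, and "../node1/node2" evaluated from node2 first climbs to flow1 before descending again.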

find_parameter(name)#

Find a parameter only in this node, first in user parameters, then in generated parameters.

Return type:

Optional[Parameter]

find_parent_parameter(name)#

Find a Parameter up along the node tree.

Return type:

Optional[Parameter]

find_user_parameter(name)#

Find user parameter only in this node.

Return type:

Optional[Parameter]

find_variable(name)#

Find variable (event, meter, parameter) by name.

Parameters:

name (str)

Returns:

The variable found, or None if not found.

Return type:

Union[Event, Parameter, Meter, None]

free_dependencies(dep_type=None)#

Ignore dependencies of the given type in this Node. Supported types include:

  • time

  • trigger

Parameters:

dep_type (Optional[Literal['all', 'time', 'trigger']]) --

dependency type:

  • all

  • time

  • trigger

classmethod from_dict(d, method=SerializationType.Status)#

Create a Node-based object from a dictionary. Use d["class_type"] to determine which class is to be created.

Parameters:
  • d (Dict)

  • method (SerializationType)

Returns:

A Node-based object created from the dictionary.

Return type:

Node
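One way to picture how from_dict and fill_from_dict cooperate is a class registry keyed by class_type. Everything in the sketch below is hypothetical: the DemoNode and DemoTask classes, the registry, and every dictionary key other than "class_type" are assumptions, and the method argument is left out for brevity.

```python
from typing import Dict, Type


class DemoNode:
    """Hypothetical base class showing class_type dispatch."""

    registry: Dict[str, Type["DemoNode"]] = {}

    def __init__(self, name: str):
        self.name = name
        self.status = "queued"

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register every subclass under its class name.
        DemoNode.registry[cls.__name__] = cls

    @classmethod
    def from_dict(cls, d: dict) -> "DemoNode":
        node_class = DemoNode.registry[d["class_type"]]
        node = node_class(d["name"])  # the "name" key is an assumption
        return node_class.fill_from_dict(d, node)

    @classmethod
    def fill_from_dict(cls, d: dict, node: "DemoNode") -> "DemoNode":
        node.status = d.get("status", "queued")
        return node


class DemoTask(DemoNode):
    def __init__(self, name: str):
        super().__init__(name)
        self.try_no = 0

    @classmethod
    def fill_from_dict(cls, d: dict, node: "DemoNode") -> "DemoNode":
        # Fill the shared fields first, then the subclass-specific ones.
        DemoNode.fill_from_dict(d, node)
        node.try_no = d.get("try_no", 0)
        return node


restored = DemoNode.from_dict(
    {"class_type": "DemoTask", "name": "task1", "status": "complete", "try_no": 2}
)
print(type(restored).__name__, restored.name, restored.status, restored.try_no)
```

Calling the base fill_from_dict from the override keeps the shared fields in one place, which is the pattern the docstring asks subclasses to follow.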

generated_parameters_only()#

Return a dict containing the generated parameters in this Node.

Return type:

Dict[str, Parameter]

Note

The return value is a dict of references to ``Parameter`` objects, so their values are updated automatically.

get_bunch()#

Get the Bunch node if this node's root belongs to a bunch.

Return type:

Optional[Bunch]

get_root()#

Get the root node, which is usually a Flow.

Return type:

Node

increment_in_limit(limit_set)#

Occupy all InLimit up the node tree.

Parameters:

limit_set (Set[Limit]) -- A set to save changed Limit, to make sure one Limit is incremented only once.
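The purpose of limit_set is easiest to see when the same Limit is referenced at more than one level of the tree. The sketch below uses hypothetical DemoLimit and LimitNode classes, and the token bookkeeping is an assumption rather than takler's implementation.

```python
from typing import List, Optional, Set, Tuple


class DemoLimit:
    """Hypothetical limit with a fixed number of tokens."""

    def __init__(self, name: str, total: int):
        self.name = name
        self.total = total
        self.value = 0  # tokens currently in use

    def in_limit(self, tokens: int) -> bool:
        return self.value + tokens <= self.total


class LimitNode:
    """Hypothetical node holding (limit, tokens) pairs, like InLimit entries."""

    def __init__(self, name: str, parent: Optional["LimitNode"] = None):
        self.name = name
        self.parent = parent
        self.in_limits: List[Tuple[DemoLimit, int]] = []

    def check_in_limit_up(self) -> bool:
        node: Optional["LimitNode"] = self
        while node is not None:
            if not all(limit.in_limit(tokens) for limit, tokens in node.in_limits):
                return False
            node = node.parent
        return True

    def increment_in_limit(self, limit_set: Set[DemoLimit]) -> None:
        node: Optional["LimitNode"] = self
        while node is not None:
            for limit, tokens in node.in_limits:
                if limit not in limit_set:
                    # Record the limit so that an ancestor referencing the
                    # same limit does not consume tokens a second time.
                    limit_set.add(limit)
                    limit.value += tokens
            node = node.parent


shared = DemoLimit("max_jobs", total=2)
flow = LimitNode("flow1")
flow.in_limits.append((shared, 1))
task = LimitNode("task1", flow)
task.in_limits.append((shared, 1))

task.increment_in_limit(set())
print(shared.value)
```

Without the shared set, the walk from task1 to flow1 would count max_jobs twice for a single submission. A matching decrement would walk the same path with its own set.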

property node_path: str#

Full path from the root node. It starts with "/" and uses "/" to separate each level.

Type:

str

parameters()#

Return all parameters accessible to this Node up the node tree.

Return type:

dict[str, Parameter]

parameters_only()#

Return all parameters only in this Node.

If a generated parameter has the same name as a user parameter, the user parameter is used.

Return type:

Dict[str, Parameter]
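The difference between parameters_only and parameters can be sketched with plain dictionaries. ParamNode is hypothetical and stores string values instead of Parameter objects. The rule that a parameter defined nearer to the node shadows an ancestor's parameter of the same name is an assumption; only the precedence of user parameters over generated parameters is stated on this page.

```python
from typing import Dict, Optional


class ParamNode:
    """Hypothetical node with user and generated parameters as plain dicts."""

    def __init__(self, name: str, parent: Optional["ParamNode"] = None):
        self.name = name
        self.parent = parent
        self.user_parameters: Dict[str, str] = {}
        self.generated_parameters: Dict[str, str] = {}

    def parameters_only(self) -> Dict[str, str]:
        # User parameters are applied last so they win on a name clash.
        return {**self.generated_parameters, **self.user_parameters}

    def parameters(self) -> Dict[str, str]:
        # Assumption: a parameter defined closer to this node shadows
        # one with the same name defined on an ancestor.
        inherited = self.parent.parameters() if self.parent else {}
        return {**inherited, **self.parameters_only()}


flow = ParamNode("flow1")
flow.generated_parameters["FLOW"] = "flow1"
flow.user_parameters["HOST"] = "login_a"

task = ParamNode("task1", flow)
task.generated_parameters["TASK"] = "task1"
task.user_parameters["HOST"] = "login_b"

print(task.parameters_only())
print(task.parameters())
```

Here parameters_only sees only TASK and the task-level HOST, while parameters additionally picks up FLOW from the parent.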

requeue(reset_repeat=True)#

Requeue the node itself without affecting its child nodes.

The requeue operation sets the node's status to NodeStatus.queued.

resolve_dependencies()#

Check all dependencies in the Node, and return True if all dependencies are satisfied.

Returns:

True if the node should be run, False otherwise.

Return type:

bool

resolve_time_dependencies()#

Check whether there is a time dependency that is satisfied.

Return type:

bool

resume()#

Resume the node.

set_node_status(node_status)#

Set node status to some status with side effect.

Parameters:

node_status (NodeStatus) -- Node status, just an enum without any additional data.

set_node_status_only(node_status)#

Set node status to some status without any side effect.

Parameters:

node_status (NodeStatus) -- Node status, just an enum without any additional data.

sink_status_change(node_status)#

Apply the node_status change to all its descendants with side effects.

sink_status_change_only(node_status)#

Apply the node_status change to all its descendants without any side effects.

Sink status down. This method can only be called in set_state and itself.

suspend()#

Suspend the node.

Suspended nodes do not run automatically; see the Node.resolve_dependencies method.

swim_status_change()#

Compute current node's node_status, and swim status change up the tree.

Swim current status up. This method can only be called in handle_status_change and itself.

update_generated_parameters()#

Update generated parameters for this node.

user_parameters_only()#

Return user defined parameters in this Node.

Return type:

Dict[str, Parameter]

Visitor tools#

class takler.visitor.NodeVisitor#
class takler.visitor.SimplePrintVisitor#

Print node tree with state.

class takler.visitor.PrintVisitor(stream, show_parameter=False, show_user_parameter=None, show_generated_parameter=None, show_trigger=False, show_limit=True, show_event=True, show_meter=True, show_repeat=True)#

Print node tree with attributes. Use show_* options to choose printed attrs.

show_parameter#

print parameters

show_trigger#

print trigger expression

show_limit#

print limits

show_event#

print events

show_meter#

print meters

show_repeat#

print repeat

takler.visitor.pre_order_travel(root_node, visitor)#
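A pre-order traversal visits a node before any of its children. The sketch below shows the idea with hypothetical TreeNode and NameCollector classes. The visitor's method name, visit, is an assumption, since this page does not list the methods of NodeVisitor.

```python
from typing import Iterable, List, Optional


class TreeNode:
    """Hypothetical tree node with an ordered list of children."""

    def __init__(self, name: str, children: Optional[Iterable["TreeNode"]] = None):
        self.name = name
        self.children: List["TreeNode"] = list(children or [])


class NameCollector:
    """Hypothetical visitor; the `visit` method name is an assumption."""

    def __init__(self) -> None:
        self.names: List[str] = []

    def visit(self, node: TreeNode) -> None:
        self.names.append(node.name)


def pre_order(root: TreeNode, visitor: NameCollector) -> None:
    visitor.visit(root)  # handle the parent first
    for child in root.children:  # then each subtree, in order
        pre_order(child, visitor)


tree = TreeNode(
    "flow1",
    [TreeNode("family1", [TreeNode("task1"), TreeNode("task2")]), TreeNode("task3")],
)
collector = NameCollector()
pre_order(tree, collector)
print(collector.names)
```

Visiting parents first suits printing visitors, because each node's line can be emitted before the lines of its descendants.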