File size: 54,226 Bytes
5374a2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 |
import json
import inspect
import threading
from enum import Enum
import networkx as nx
from copy import deepcopy
from networkx import MultiDiGraph
from collections import defaultdict
from pydantic import Field, field_validator, model_validator
from typing import Union, Optional, Tuple, Callable, Dict, List
from functools import wraps
from ..core.logging import logger
from ..core.module import BaseModule
from ..core.base_config import Parameter
from .action_graph import ActionGraph
from ..agents.agent import Agent
from ..utils.utils import generate_dynamic_class_name, make_parent_folder
from ..prompts.workflow.sew_workflow import SEW_WORKFLOW
from ..prompts.workflow.structure_workflow import STRUCTURE_WORKFLOW
from ..prompts.utils import DEFAULT_SYSTEM_PROMPT
# from ..tools.tool import Toolkit, Tool
class WorkFlowNodeState(str, Enum):
"""
Enumeration of possible states for workflow nodes.
This enum defines the lifecycle states of a workflow node:
- PENDING: The node is waiting to be executed
- RUNNING: The node is currently being executed
- COMPLETED: The node has been successfully executed
- FAILED: The node execution has failed
"""
PENDING="pending"
RUNNING="running"
COMPLETED = "completed"
FAILED = "failed"
class WorkFlowNode(BaseModule):
"""
Represents a node in a workflow graph.
A workflow node represents a specific task in the workflow with its
inputs, outputs, and execution metadata. It can have associated agents
that execute the task and track its execution status.
Attributes:
name: A unique identifier for the task within a workflow
description: Detailed description of what the task does
inputs: List of input parameters required by the task
outputs: List of output parameters produced by the task
reason: Optional justification for this task's existence
agents: Optional list of agents that can execute this task
action_graph: Optional graph of actions to execute this task
status: Current execution state of the task
"""
name: str # A short name of the task. Should be unique in a single workflow
description: str # A detailed description of the task
inputs: List[Parameter] # inputs for the task
outputs: List[Parameter] # outputs of the task
reason: Optional[str] = None
agents: Optional[List[Union[str, dict]]] = None
action_graph: Optional[ActionGraph] = None
status: Optional[WorkFlowNodeState] = WorkFlowNodeState.PENDING
@field_validator('agents', mode="before")
@classmethod
def check_agent_format(cls, agents: List[Union[str, dict, Agent]]):
if agents is None:
return None
validated_agents = []
for agent in agents:
if isinstance(agent, str):
validated_agents.append(agent)
elif isinstance(agent, Agent):
validated_agents.append(agent.get_config())
elif isinstance(agent, dict):
assert "name" in agent and "description" in agent, \
"must provide the name and description of an agent when specifying an agent with a dict."
validated_agents.append(agent)
return validated_agents
@model_validator(mode="after")
@classmethod
def check_action_graph(cls, instance: "WorkFlowNode"):
"""
Validates that:
1. All required parameters of execute/async_execute methods are included in inputs
2. The execute/async_execute methods return dictionaries
3. All output parameters are present in the returned dictionaries
"""
if instance.action_graph is None:
return instance
# Get input parameter names from the node's input parameters
input_param_names = {param.name for param in instance.inputs if param.required}
output_param_names = {param.name for param in instance.outputs if param.required}
def check_method_signature(method, method_name):
"""Helper function to check method signature against input parameters"""
method_source = inspect.getsource(method)
if "NotImplementedError" in method_source:
return
# Get method signature
method_sig = inspect.signature(method)
# Only consider parameters other than self, *args, and **kwargs as required
required_params = []
for name, param in method_sig.parameters.items():
if name != 'self' and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
if param.default == param.empty: # This is a required parameter
required_params.append(name)
# Check if all required parameters are in inputs
missing_inputs = set(required_params) - input_param_names
if missing_inputs:
raise ValueError(f"`{method_name}` method requires parameters that are not in `inputs`: {missing_inputs}")
# Check execute method
check_method_signature(instance.action_graph.execute, "execute")
# Check async_execute method if it exists
check_method_signature(instance.action_graph.async_execute, "async_execute")
# Monkey-patch execute and async_execute to check returns at runtime
original_execute = instance.action_graph.execute
original_async_execute = instance.action_graph.async_execute
def check_method_return(method_name, result):
if not isinstance(result, dict):
raise TypeError(f"{method_name} must return a dictionary, got {type(result)}")
# Check if all output keys are in the result
missing_outputs = output_param_names - set(result.keys())
if missing_outputs:
raise ValueError(f"{method_name} return value is missing required outputs: {missing_outputs}")
return result
@wraps(original_execute)
def patched_execute(*args, **kwargs):
result = original_execute(*args, **kwargs)
return check_method_return("execute", result)
@wraps(original_async_execute)
async def patched_async_execute(*args, **kwargs):
result = await original_async_execute(*args, **kwargs)
return check_method_return("async_execute", result)
# Replace the methods with our patched versions
instance.action_graph.execute = patched_execute
instance.action_graph.async_execute = patched_async_execute
return instance
def to_dict(self, exclude_none: bool = True, ignore: List[str] = [], **kwargs) -> dict:
data = super().to_dict(exclude_none=exclude_none, ignore=ignore, **kwargs)
for agent in data.get("agents", []):
# for CustomizeAgent
if isinstance(agent, dict) and "parse_func" in agent and isinstance(agent["parse_func"], Callable):
agent["parse_func"] = agent["parse_func"].__name__
return data
def get_agents(self) -> List[str]:
"""
Return the names of all agents associated with this node.
"""
agent_names = []
if not self.agents:
return []
for agent in self.agents:
if isinstance(agent, str):
agent_names.append(agent)
elif isinstance(agent, dict):
agent_names.append(agent["name"])
else:
raise TypeError(f"{type(agent)} is an unknown agent type!")
return agent_names
def set_agents(self, agents: List[Union[str, dict]]):
self.agents = agents
def get_status(self) -> WorkFlowNodeState:
return self.status
def set_status(self, state: WorkFlowNodeState):
self.status = state
@property
def is_complete(self) -> bool:
return self.status == WorkFlowNodeState.COMPLETED
def get_task_info(self) -> str:
def format_parameters(params: List[Parameter]) -> str:
if not params:
return "None"
return "\n".join(f" - {param.name} ({param.type}): {param.description}" for param in params)
desc = (
f"Name: {self.name}\n"
f"Description: {self.description}\n"
f"Inputs:\n{format_parameters(self.inputs)}\n"
f"Outputs:\n{format_parameters(self.outputs)}\n"
)
return desc
def get_input_names(self, required: bool = False) -> List[str]:
if required:
return [param.name for param in self.inputs if param.required]
else:
return [param.name for param in self.inputs]
def get_output_names(self, required: bool = False) -> List[str]:
if required:
return [param.name for param in self.outputs if param.required]
else:
return [param.name for param in self.outputs]
class WorkFlowEdge(BaseModule):
"""
Represents a directed edge in a workflow graph.
Workflow edges connect tasks (nodes) in the workflow graph, establishing
execution dependencies and data flow relationships. Each edge has a source
node, target node, and optional priority to influence execution order.
Attributes:
source: Name of the source node (where the edge starts)
target: Name of the target node (where the edge ends)
priority: Numeric priority value for this edge (higher means higher priority)
"""
source: str
target: str
priority: int = 0
def __init__(self, edge_tuple: Optional[tuple]=(), **kwargs):
"""
Initialize a WorkFlowEdge instance with either a tuple or keyword arguments.
Parameters:
----------
edge_tuple (tuple): a tuple containing the edge attributes in the format: (source, target, priority[optional]).
- source (str): the source of the edge.
- target (str): the target of the edge.
- priority (int, optional): The priority of the edge. Defaults to 0 if not provided.
kwargs (dict): Key-value pairs specifying the edge attributes. These values will override those provided in `args` if both are supplied.
Notes:
----------
- Attributes provided via `kwargs` take precedence over those from the `args` tuple.
- If `args` is empty or not provided, only `kwargs` will be used to initialize the instance.
"""
data = self.init_from_tuple(edge_tuple)
data.update(kwargs)
super().__init__(**data)
def init_from_tuple(self, edge_tuple: tuple) -> dict:
if not edge_tuple:
return {}
keys = ["source", "target", "priority"]
data = {k: v for k, v in zip(keys, edge_tuple)}
return data
def compare_attrs(self):
return (self.source, self.target, self.priority)
def __eq__(self, other: "WorkFlowEdge"):
if not isinstance(other, WorkFlowEdge):
return NotImplemented
self_compare_attrs = self.compare_attrs()
other_compare_attrs = other.compare_attrs()
return all(self_attr==other_attr for self_attr, other_attr in zip(self_compare_attrs, other_compare_attrs))
def __hash__(self):
return hash(self.compare_attrs())
class WorkFlowGraph(BaseModule):
"""
Represents a complete workflow as a directed graph.
WorkFlowGraph models a workflow as a directed graph where nodes represent tasks
and edges represent dependencies and data flow between tasks. It provides
methods for constructing, validating, traversing, and executing workflows.
The graph structure supports advanced features like detecting and handling loops,
determining execution order, and tracking execution state.
Attributes:
goal: The high-level objective of this workflow
nodes: List of WorkFlowNode instances representing tasks
edges: List of WorkFlowEdge instances representing dependencies
graph: Internal NetworkX MultiDiGraph or another WorkFlowGraph
"""
goal: str
nodes: Optional[List[WorkFlowNode]] = []
edges: Optional[List[WorkFlowEdge]] = []
graph: Optional[Union[MultiDiGraph, "WorkFlowGraph"]] = Field(default=None, exclude=True)
def init_module(self):
self._lock = threading.Lock()
if not self.graph:
self._init_from_nodes_and_edges(self.nodes, self.edges)
elif isinstance(self.graph, MultiDiGraph):
self._init_from_multidigraph(self.graph, self.nodes, self.edges)
elif isinstance(self.graph, WorkFlowGraph):
self._init_from_workflowgraph(self.graph, self.nodes, self.edges)
else:
raise TypeError(f"{type(self.graph)} is an unknown type for graph. Supported types: [MultiDiGraph, WorkFlowGraph]")
self._validate_workflow_structure()
self.update_graph()
def update_graph(self):
# call this function when modifying nodes or edges!
self._loops = self._find_all_loops()
def _init_from_nodes_and_edges(self, nodes: List[WorkFlowNode] = [], edges: List[WorkFlowEdge] = []):
"""
Initialize the WorkFlowGraph from a set of nodes and edges.
"""
if edges and not nodes:
raise ValueError("edges cannot be passed without nodes or a graph")
self.nodes = []
self.edges = []
self.graph = MultiDiGraph()
self.add_nodes(*nodes, update_graph=False)
self.add_edges(*edges, update_graph=False)
def _init_from_multidigraph(self, graph: MultiDiGraph, nodes: List[WorkFlowNode] = [], edges: List[WorkFlowEdge] = []):
graph_nodes = [deepcopy(node_attrs["ref"]) for _, node_attrs in graph.nodes(data=True)]
graph_edges = [deepcopy(edge_attrs["ref"]) for *_, edge_attrs in graph.edges(data=True)]
graph_nodes = self.merge_nodes(graph_nodes, nodes)
graph_edges = self.merge_edges(graph_edges, edges)
self._init_from_nodes_and_edges(nodes=graph_nodes, edges=graph_edges)
def _init_from_workflowgraph(self, graph: "WorkFlowGraph", nodes: List[WorkFlowNode] = [], edges: List[WorkFlowEdge] = []):
graph_nodes = deepcopy(graph.nodes)
graph_edges = deepcopy(graph.edges)
graph_nodes = self.merge_nodes(graph_nodes, nodes)
graph_edges = self.merge_edges(graph_edges, edges)
self._init_from_nodes_and_edges(nodes=graph_nodes, edges=graph_edges)
def _validate_workflow_structure(self):
isolated_nodes = list(nx.isolates(self.graph))
if len(self.graph.nodes) > 1 and isolated_nodes:
logger.warning(f"The workflow contains isolated nodes: {isolated_nodes}")
initial_nodes = self.find_initial_nodes()
if len(self.graph.nodes) > 1 and not initial_nodes:
error_message = "There are no initial nodes in the workflow!"
logger.error(error_message)
raise ValueError(error_message)
end_nodes = self.find_end_nodes()
if len(self.graph.nodes) > 1 and not end_nodes:
logger.warning("There are no end nodes in the workflow")
def find_initial_nodes(self) -> List[str]:
initial_nodes = [node for node, in_degree in self.graph.in_degree() if in_degree==0]
return initial_nodes
def find_end_nodes(self) -> List[str]:
end_nodes = [node for node, out_degree in self.graph.out_degree() if out_degree==0]
return end_nodes
def _find_loops(self, start_node: Union[str, WorkFlowNode]) -> Dict[str, list]:
if isinstance(start_node, str):
start_node = self.get_node(node_name=start_node)
start_node_name = start_node.name
loops = defaultdict(list)
def dfs(current_node_name: str, path: List[str]):
if current_node_name in path:
# a loop exists
loops[current_node_name].append(path[path.index(current_node_name):])
return
path.append(current_node_name)
children = self.get_node_children(current_node_name)
if children:
for child in children:
dfs(child, path)
path.pop()
dfs(start_node_name, [])
return loops
def _find_all_loops(self) -> Dict[str, list]:
initial_nodes = self.find_initial_nodes()
if not initial_nodes:
return {}
def contain_loop(loops: List[List[str]], new_loop: List[str]):
if not loops:
return False
return frozenset(new_loop) in [frozenset(loop) for loop in loops]
# merge loops from different nodes
all_loops = defaultdict(list)
for initial_node in initial_nodes:
loops_from_init_node = self._find_loops(initial_node)
for start_node, loops in loops_from_init_node.items():
for loop in loops:
if not contain_loop(all_loops[start_node], loop):
# 合并从相同的start_node开始的环
all_loops[start_node].append(loop)
if len(all_loops) <= 1:
return all_loops
# merge same loops with different starts (因为同一个环可能在之前的遍历中有不同的start_node)
loop_to_start_nodes = defaultdict(dict)
for start_node, loops in all_loops.items():
for loop in loops:
normalized_loop = frozenset(loop)
loop_to_start_nodes[normalized_loop][start_node] = loop
all_paths: List[List[str]] = []
# 用深度遍历来判断一个环中的开始节点
for initial_node in initial_nodes:
all_paths.extend(self.get_all_paths_from_node(initial_node))
def rank_nodes(nodes: List[str]):
if len(nodes) == 1:
return nodes[0]
path_contain_nodes = None
for path in all_paths:
if all(node in path for node in nodes):
path_contain_nodes = path
break
if path_contain_nodes is None:
raise ValueError(f"Couldn't find a path that contain nodes: {nodes}")
node_indices = [path.index(node) for node in nodes]
return nodes[node_indices.index(min(node_indices))]
all_loops = defaultdict(list)
for start_node_loop in loop_to_start_nodes.values():
first_node = rank_nodes(list(start_node_loop.keys()))
all_loops[first_node].append(start_node_loop[first_node])
return all_loops
def add_node(self, node: WorkFlowNode, update_graph: bool = True, **kwargs):
if not isinstance(node, WorkFlowNode):
raise ValueError(f"{node} is not a valid WorkFlowNode instance!")
if self.node_exists(node.name):
raise ValueError(f"Duplicate node names are not allowed! Found duplicate node name: {node.name}")
self.nodes.append(node)
self.graph.add_node(node.name, ref=node)
if update_graph:
self.update_graph()
def add_edge(self, edge: WorkFlowEdge, update_graph: bool = True, **kwargs):
if not isinstance(edge, WorkFlowEdge):
raise ValueError(f"{edge} is not a valid WorkFlowEdge instance!")
for attr, node_name in zip(["source", "target"], [edge.source, edge.target]):
if not self.node_exists(node_name):
raise ValueError(f"{attr} node {node_name} does not exists!")
if self.edge_exists(edge):
raise ValueError(f"Duplicate edges are not allowed! Found duplicate edges: {edge}")
# check the inputs and outputs of the edge
source_node = self.get_node(edge.source)
target_node = self.get_node(edge.target)
source_output_names = set(param.name for param in source_node.outputs)
target_input_names = set(param.name for param in target_node.inputs)
if len(source_output_names & target_input_names) == 0:
logger.warning(f"The edge ({edge.source}, {edge.target}) has no matching inputs and outputs! You may need to check the inputs and outputs of the nodes to ensure that at least one input of the target node is the output of the source node.")
self.edges.append(edge)
self.graph.add_edge(edge.source, edge.target, ref=edge)
if update_graph:
self.update_graph()
def add_nodes(self, *nodes: WorkFlowNode, update_graph: bool = True, **kwargs):
nodes: list = list(nodes)
nodes.extend([kwargs.pop(var) for var in ["node", "nodes"] if var in kwargs])
for node in nodes:
if isinstance(node, (tuple, list)):
for n in node:
self.add_node(n, update_graph=update_graph, **kwargs)
else:
self.add_node(node, update_graph=update_graph, **kwargs)
def add_edges(self, *edges: WorkFlowEdge, update_graph: bool = True, **kwargs):
edges: list = list(edges)
edges.extend([kwargs.pop(var) for var in ["edge", "edges"] if var in kwargs])
for edge in edges:
if isinstance(edge, (tuple, list)):
for e in edge:
self.add_edge(e, update_graph=update_graph, **kwargs)
else:
self.add_edge(edge, update_graph=update_graph, **kwargs)
def node_exists(self, node: Union[str, WorkFlowNode]) -> bool:
if isinstance(node, str):
return node in self.graph.nodes
elif isinstance(node, WorkFlowNode):
return node.name in self.graph.nodes
else:
raise TypeError("node must be a str or WorkFlowNode instance")
def _edge_exists(self, source: str, target: str, **attr_filters) -> bool:
if not self.graph.has_edge(source, target):
return False
if attr_filters:
for key, value in attr_filters.items():
if key not in self.graph[source][target] or self.graph[source][target][key] != value:
return False
return True
def edge_exists(self, edge: Union[Tuple[str, str], WorkFlowEdge], **attr_filters) -> bool:
"""
Check whether an edge exists in the workflow graph. The input `edge` can either be a tuple or a WorkFlowEdge instance.
1. If a tuple is passed, it should be (source, target). The function will only determin whether there is an edge between the source node and the target node.
If attr_filters is passed, they will also be used to match the edge attributes.
2. If a WorkFlowEdge is passed, it will use the __eq__ method in WorkFlowEdge to determine
Parameters:
----------
edge (Union[Tuple[str, str], WorkFlowEdge]):
- If a tuple is provided, it should be in the format `(source, target)`.
The method will check whether there is an edge between the source and target nodes.
If `attr_filters` are provided, they will be used to match edge attributes.
- If a WorkFlowEdge instance is provided, the method will use the `__eq__` method in WorkFlowEdge
to determine whether the edge exists.
attr_filters (dict, optional):
Additional attributes to filter edges when `edge` is a tuple.
Returns:
-------
bool: True if the edge exists and matches the filters (if provided); False otherwise.
"""
if isinstance(edge, tuple):
assert len(edge) == 2, "edge must be a tuple (source, target) or WorkFlowEdge instance"
source, target = edge
return self._edge_exists(source, target, **attr_filters)
elif isinstance(edge, WorkFlowEdge):
return edge in self.edges
else:
raise TypeError("edge must be a tuple (source, target) or WorkFlowEdge instance")
def is_loop_start(self, node: Union[str, WorkFlowNode]) -> bool:
if len(self._loops) == 0:
return False
node_name = node if isinstance(node, str) else node.name
return node_name in self._loops
def is_loop_end(self, node: Union[str, WorkFlowNode]) -> bool:
if len(self._loops) == 0:
return False
loop_end_nodes = set()
node_name = node if isinstance(node, str) else node.name
for loops in self._loops.values():
loop_end_nodes.update([loop[-1] for loop in loops])
return node_name in loop_end_nodes
def find_loops_with_start_and_end(self, start_node: Union[str, WorkFlowNode], end_node: Union[str, WorkFlowNode]) -> List[List[str]]:
if len(self._loops) == 0:
return []
start_node_name = start_node if isinstance(start_node, str) else start_node.name
end_node_name = end_node if isinstance(end_node, str) else end_node.name
if start_node_name not in self._loops:
return []
target = []
for loop in self._loops[start_node_name]:
if loop[-1] == end_node_name:
target.append(loop)
return target
def merge_nodes(self, nodes: List[WorkFlowNode], new_nodes: List[WorkFlowNode]):
node_names = {node.name for node in nodes}
for node in new_nodes:
if node.name in node_names:
continue
nodes.append(node)
return nodes
def merge_edges(self, edges: List[WorkFlowEdge], new_edges: List[WorkFlowEdge]):
for edge in new_edges:
if edge in edges:
continue
edges.append(edge)
return edges
def list_nodes(self) -> List[str]:
"""
return the names of all nodes
"""
return [node.name for node in self.nodes]
def get_node(self, node_name: str) -> WorkFlowNode:
"""
return a WorkFlowNode instance based on its name.
"""
if not self.node_exists(node=node_name):
raise KeyError(f"{node_name} is an invalid node name. Currently available node names: {self.list_nodes()}")
return self.graph.nodes[node_name]["ref"]
def get_node_status(self, node: Union[str, WorkFlowNode]) -> WorkFlowNodeState:
if isinstance(node, str):
node = self.get_node(node_name=node)
return node.get_status()
@property
def is_complete(self):
# node_complete_list = [node.is_complete for node in self.nodes]
leaf_nodes = [self.get_node(name) for name in self.find_end_nodes()]
node_complete_list = [node.is_complete for node in leaf_nodes]
if len(node_complete_list) == 0:
return True
if all(node_complete_list):
return True
return False
def reset_graph(self):
"""
set the status of all nodes to pending
"""
for node in self.nodes:
node.set_status(WorkFlowNodeState.PENDING)
def set_node_status(self, node: Union[str, WorkFlowNode], new_state: WorkFlowNodeState) -> bool:
"""
Update the state of a specific node.
Args:
node (Union[str, WorkFlowNode]): The name of a node or the node instance.
new_state (WorkFlowNodeState): The new state to set.
Returns:
bool: True if the state was updated successfully, False otherwise.
"""
flag = False
try:
if isinstance(node, str):
node = self.get_node(node_name=node)
node.set_status(new_state)
flag = True
except Exception as e:
raise ValueError(f"An error occurs when setting node status: {e}")
return flag
def pending(self, node: Union[str, WorkFlowNode]) -> bool:
return self.set_node_status(node=node, new_state=WorkFlowNodeState.PENDING)
def running(self, node: Union[str, WorkFlowNode]) -> bool:
return self.set_node_status(node=node, new_state=WorkFlowNodeState.RUNNING)
def completed(self, node: Union[str, WorkFlowNode]) -> bool:
return self.set_node_status(node=node, new_state=WorkFlowNodeState.COMPLETED)
def failed(self, node: Union[str, WorkFlowNode]) -> bool:
return self.set_node_status(node=node, new_state=WorkFlowNodeState.FAILED)
def get_node_children(self, node: Union[str, WorkFlowNode]) -> List[str]:
node_name = node if isinstance(node, str) else node.name
if not self.node_exists(node=node):
raise ValueError(f"Node `{node_name}` does not exists!")
children = list(self.graph.successors(node_name))
return children
def get_node_predecessors(self, node: Union[str, WorkFlowNode]) -> List[str]:
node_name = node if isinstance(node, str) else node.name
if not self.node_exists(node=node):
raise ValueError(f"Node `{node_name}` does not exists!")
predecessors = list(self.graph.predecessors(node_name))
return predecessors
def get_uncomplete_initial_nodes(self) -> List[str]:
initial_nodes = self.find_initial_nodes()
are_initial_nodes_complete = [self.get_node(node_name).is_complete for node_name in initial_nodes]
uncomplete_initial_nodes = []
for node_name, is_complete in zip(initial_nodes, are_initial_nodes_complete):
if not is_complete:
uncomplete_initial_nodes.append(node_name)
return uncomplete_initial_nodes
def get_all_paths_from_node(self, start_node: Union[str, WorkFlowNode]) -> List[List[str]]:
if isinstance(start_node, str):
start_node = self.get_node(node_name=start_node)
start_node_name = start_node.name
all_paths = []
visited = set() # handle loop in the graph
def dfs(current_node_name: str, path: List[str]):
if current_node_name in visited:
# 如果一个loop的end node只有指向loop的start node的边,那么添加这条路径
if path and len(self.get_node_children(path[-1])) == 1:
all_paths.append(path.copy())
return
path.append(current_node_name)
visited.add(current_node_name)
children = self.get_node_children(current_node_name)
if not children:
all_paths.append(path.copy())
else:
for child in children:
dfs(child, path)
path.pop()
visited.remove(current_node_name)
dfs(start_node_name, [])
return all_paths
def find_completed_leaf_nodes(self, start_node: Union[str, WorkFlowNode]) -> List[str]:
if isinstance(start_node, str):
start_node = self.get_node(node_name=start_node)
start_node_name = start_node.name
paths_starting_from_node = self.get_all_paths_from_node(start_node=start_node_name)
last_completed_nodes = []
for path in paths_starting_from_node:
if not path:
continue
completed_node = None
for path_node in path:
if self.get_node(path_node).is_complete:
completed_node = path_node
else:
break
if completed_node and completed_node not in last_completed_nodes:
last_completed_nodes.append(completed_node)
last_completed_nodes = last_completed_nodes[::-1]
return last_completed_nodes
def find_completed_leaf_nodes_start_from_initial_nodes(self) -> List[str]:
initial_nodes = self.find_initial_nodes()
completed_leaf_nodes = []
for initial_node in initial_nodes:
for complete_node in self.find_completed_leaf_nodes(start_node=initial_node):
if complete_node not in completed_leaf_nodes:
completed_leaf_nodes.append(complete_node)
return completed_leaf_nodes
def get_all_children_nodes(self, nodes: List[Union[str, WorkFlowNode]]) -> List[str]:
node_names = [node if isinstance(node, str) else node.name for node in nodes]
children_nodes = []
for node_name in node_names:
for child in self.get_node_children(node_name):
if child not in children_nodes:
children_nodes.append(child)
return children_nodes
def filter_completed_nodes(self, nodes: List[Union[str, WorkFlowNode]]) -> List[str]:
"""
remove completed nodes from `nodes`
"""
node_names = [node if isinstance(node, str) else node.name for node in nodes]
uncompleted_nodes = []
for node_name in node_names:
if self.get_node(node_name).is_complete:
continue
uncompleted_nodes.append(node_name)
return uncompleted_nodes
def get_candidate_children_nodes(self, completed_nodes: List[Union[str, WorkFlowNode]]) -> List[str]:
"""
Return the next set of possible tasks to execute. If there are no loops in the graph, consider only the uncompleted children.
If there exists loops, also consider the previous completed tasks.
Args:
completed_nodes (List[Union[str, WorkFlowNode]]): A list of completed nodes.
Returns:
List[str]: List of node names that are candidates for execution.
"""
node_names = [node if isinstance(node, str) else node.name for node in completed_nodes]
has_loop = (len(self._loops) > 0)
if has_loop:
# if there exists loops, we need to check the completed nodes and their children nodes
uncompleted_children_nodes = []
for node_name in node_names:
children_nodes = self.get_all_children_nodes(nodes=[node_name])
if self.is_loop_end(node=node_name):
current_uncompleted_children_nodes = []
for child in children_nodes:
if self.is_loop_start(node=child):
# node_name是一个环的结束的时候,如果它的子节点是环的开始,那么无论它是否completed,都添加到下一步可执行的操作
current_uncompleted_children_nodes.append(child)
else:
# node_name是环的结束,但是子节点不是环的开始时,需要检查child是否已经completed,只添加未完成的任务
current_uncompleted_children_nodes.extend(self.filter_completed_nodes(nodes=[child]))
else:
current_uncompleted_children_nodes = self.filter_completed_nodes(nodes=children_nodes)
for child in current_uncompleted_children_nodes:
if child not in uncompleted_children_nodes:
uncompleted_children_nodes.append(child)
else:
# 不存在环的时候直接得到所有的子节点,并去掉其中已完成的部分
children_nodes = self.get_all_children_nodes(nodes=node_names)
uncompleted_children_nodes = self.filter_completed_nodes(nodes=children_nodes)
return uncompleted_children_nodes
def are_dependencies_complete(self, node_name: str) -> bool:
"""
Check if all predecessors for a node are complete.
Args:
node_name (str): The name of the task/node to check.
Returns:
bool: True if all predecessors are complete, False otherwise.
"""
has_loop = (len(self._loops) > 0)
predecessors = self.get_node_predecessors(node=node_name)
if has_loop and self.is_loop_start(node=node_name):
flag = True
for pre in predecessors:
if self.is_loop_end(pre):
pass
else:
flag &= self.get_node(pre).is_complete
else:
flag = all(self.get_node(pre).is_complete for pre in predecessors)
return flag
def filter_nodes_with_uncompleted_predecessors(self, nodes: List[Union[str, WorkFlowNode]]) -> List[str]:
node_names = [node if isinstance(node, str) else node.name for node in nodes]
nodes_with_completed_predecessors = []
for node_name in node_names:
if self.are_dependencies_complete(node_name=node_name):
nodes_with_completed_predecessors.append(node_name)
return nodes_with_completed_predecessors
def get_next_candidate_nodes(self) -> List[str]:
uncomplete_initial_nodes = self.get_uncomplete_initial_nodes()
if len(uncomplete_initial_nodes) > 0:
return uncomplete_initial_nodes
# find the last completed nodes in all paths starting from initial nodes.
completed_leaf_nodes = self.find_completed_leaf_nodes_start_from_initial_nodes()
# obtain children nodes of last completed nodes which are uncompleted (consider previous completed tasks if there exists loops)
# children_nodes = self.get_all_children_nodes(nodes=completed_leaf_nodes)
# uncompleted_children_nodes = self.filter_completed_nodes(nodes=children_nodes)
candidate_children_nodes = self.get_candidate_children_nodes(completed_nodes=completed_leaf_nodes)
# check whether all the predecessors are completed
# children_nodes_with_complete_predecessors = self.filter_nodes_with_uncompleted_predecessors(uncompleted_children_nodes)
children_nodes_with_complete_predecessors = self.filter_nodes_with_uncompleted_predecessors(candidate_children_nodes)
return children_nodes_with_complete_predecessors
def next(self) -> List[WorkFlowNode]:
if self.is_complete:
return []
candidate_node_names = self.get_next_candidate_nodes()
candidate_tasks = [self.get_node(node_name=node_name) for node_name in candidate_node_names]
return candidate_tasks
def step(self, source_node: Union[str, WorkFlowNode], target_node: Union[str, WorkFlowNode]):
if source_node is None:
self.running(target_node)
return
source_node_name = source_node if isinstance(source_node, str) else source_node.name
target_node_name = target_node if isinstance(target_node, str) else target_node.name
source_node_status = self.get_node_status(source_node_name)
if source_node_status != WorkFlowNodeState.COMPLETED:
raise ValueError(f"The state of `source_node` should be WorkFlowNodeState.COMPLETED, but found {source_node_status}")
# set the state of `target_node` to WorkFlowNodeState.RUNNING
if self.is_loop_end(source_node_name) and self.is_loop_start(target_node_name):
loops = self.find_loops_with_start_and_end(
start_node=target_node_name, end_node=source_node_name
)
loop_nodes = set(sum(loops, []))
for loop_node in loop_nodes:
self.pending(node=loop_node)
if not self.edge_exists(edge=(source_node_name, target_node_name)):
# the execution doesn't follow an edge means re-executing a previous subtask due to errors or incomplete output
# find a path that contains both source_node and target node and set them as "pending"
all_paths = self.get_all_paths_from_node(start_node=target_node_name)
for path in all_paths:
if source_node_name in path:
for node_name in path:
self.pending(node=node_name)
self.running(node=target_node_name)
def get_node_description(self, node: Union[str, WorkFlowNode]) -> str:
if isinstance(node, str):
node = self.get_node(node_name=node)
def format_parameters(params: List[Parameter]) -> str:
if not params:
return " - None"
# return "\n".join(f" - {param.name} ({param.type}): {param.description}" for param in params)
return "\n".join(f" - {param.name} ({param.type})" for param in params)
def format_agents(agent_names: List[str]) -> str:
if not agent_names:
return "None"
return "\n".join(f" - {name}" for name in agent_names)
def format_action_graph(action_graph: ActionGraph) -> str:
if action_graph is None:
return " - None"
return type(action_graph).__name__
desc = (
f"Name: {node.name}\n"
# f"Description: {node.description}\n"
f"Inputs:\n{format_parameters(node.inputs)}\n"
f"Outputs:\n{format_parameters(node.outputs)}\n"
f"Agents:\n{format_agents(node.get_agents())}\n"
f"Action Graph:\n{format_action_graph(node.action_graph)}"
)
return desc
def display(self):
"""
Display the workflow graph with node and edge attributes.
Nodes are colored based on their status.
"""
import matplotlib.pyplot as plt
# Define colors for node statuses
status_colors = {
WorkFlowNodeState.PENDING: 'lightgray',
WorkFlowNodeState.RUNNING: 'orange',
WorkFlowNodeState.COMPLETED: 'green',
WorkFlowNodeState.FAILED: 'red'
}
if not self.graph.nodes:
print("Graph is empty. No nodes to display.")
return
# Get node colors based on their statuses
node_colors = [status_colors.get(self.get_node_status(node), 'lightgray') for node in self.graph.nodes]
# Prepare node labels with additional information
node_labels = {node: self.get_node_description(data["ref"]) for node, data in self.graph.nodes(data=True)}
# Draw the graph
# pos = nx.shell_layout(self.graph)
if len(self.graph.nodes) == 1:
single_node = list(self.graph.nodes)[0]
pos = {single_node: (0, 0)} # Place the single node at the center
else:
pos = nx.shell_layout(self.graph)
plt.figure(figsize=(12, 8))
nx.draw(
self.graph, pos, with_labels=False, node_color=node_colors, edge_color='black',
node_size=1500, font_size=8, font_color='black', font_weight='bold'
)
if len(self.graph.nodes) == 1:
for node, (x, y) in pos.items():
plt.text(x+0.005, y, node_labels[node], ha='left', va='center', fontsize=9, bbox=dict(facecolor='white', alpha=0.7))
else:
# Draw node labels next to the nodes (left-aligned)
# text_offsets = {node: (pos[node][0]-0.2, pos[node][1]-0.22) for node in self.graph.nodes}
y_positions = [y for _, y in pos.values()]
y_min, y_max = min(y_positions), max(y_positions)
lower_third_boundary = y_min + (y_max - y_min) / 3
# Adjust text offsets based on node position in the graph
text_offsets = {}
for node, (x, y) in pos.items():
if y < lower_third_boundary: # If in the lower third, display label above the node
text_offsets[node] = (x-0.2, y + 0.23)
else: # Otherwise, display label below the node
text_offsets[node] = (x-0.2, y - 0.23)
for node, (x, y) in text_offsets.items():
plt.text(x, y, node_labels[node], ha='left', va='center', fontsize=9, bbox=dict(facecolor='white', alpha=0.7))
# Draw edge labels for priorities
edge_labels = nx.get_edge_attributes(self.graph, 'priority')
nx.draw_networkx_edge_labels(self.graph, pos, edge_labels=edge_labels)
# Add a legend to show node status colors
legend_elements = [
plt.Line2D([0], [0], marker='o', color='w', label=status.name, markersize=10, markerfacecolor=color)
for status, color in status_colors.items()
]
plt.legend(handles=legend_elements, title="Workflow Node Status", loc='upper left', fontsize='medium')
plt.title("Workflow Graph")
plt.show()
def get_workflow_description(self) -> str:
def format_param_requirement(required: bool):
return "required" if required else "optional"
def format_parameters(params: List[Parameter]) -> str:
if not params:
return "None"
return "\n".join(
f" - {param.name} ({param.type}, {format_param_requirement(param.required)}): "
f"{param.description}" for param in params
)
subtask_texts = []
for node in self.nodes:
text = (
f"Task Name: {node.name}\n"
f"Description: {node.description}\n"
f"Inputs:\n{format_parameters(node.inputs)}\n"
f"Outputs:\n{format_parameters(node.outputs)}"
)
subtask_texts.append(text)
workflow_desc = "\n\n".join(subtask_texts)
return workflow_desc
def _infer_edges_from_nodes(self, nodes: List[WorkFlowNode]) -> List[WorkFlowEdge]:
if not nodes:
return []
edges: List[WorkFlowEdge] = []
for node in nodes:
for another_node in nodes:
if node.name == another_node.name:
continue
node_output_params = [param.name for param in node.outputs]
another_node_input_params = [param.name for param in another_node.inputs]
if any([param in another_node_input_params for param in node_output_params]):
edges.append(WorkFlowEdge(edge_tuple=(node.name, another_node.name)))
return edges
def get_config(self) -> dict:
"""
Get a dictionary containing all necessary configuration to recreate this workflow graph.
Returns:
dict: A configuration dictionary that can be used to initialize a new WorkFlowGraph instance
with the same properties as this one.
"""
config = self.to_dict()
config.pop("graph", None)
return config
class SequentialWorkFlowGraph(WorkFlowGraph):
"""
A linear workflow graph with a single path from start to end.
Args:
goal (str): The goal of the workflow.
tasks (List[dict]): A list of tasks with their descriptions and inputs. Each task should have the following format:
{
"name": str,
"description": str,
"inputs": [{"name": str, "type": str, "required": bool, "description": str}, ...],
"outputs": [{"name": str, "type": str, "required": bool, "description": str}, ...],
"prompt": str,
"prompt_template": PromptTemplate,
"system_prompt" (optional): str, default is DEFAULT_SYSTEM_PROMPT,
"output_parser" (optional): Type[ActionOutput],
"parse_mode" (optional): str, default is "str"
"parse_func" (optional): Callable,
"parse_title" (optional): str ,
"tool_names" (optional): List[str]
}
"""
def __init__(self, goal: str, tasks: List[dict], **kwargs):
nodes = self._infer_nodes_from_tasks(tasks=tasks)
edges = self._infer_edges_from_nodes(nodes=nodes)
super().__init__(goal=goal, nodes=nodes, edges=edges, **kwargs)
def _infer_nodes_from_tasks(self, tasks: List[dict]) -> List[WorkFlowNode]:
nodes = [self._infer_node_from_task(task=task) for task in tasks]
return nodes
def _infer_node_from_task(self, task: dict) -> WorkFlowNode:
node_name = task.get("name", None)
if not node_name:
raise ValueError("The `name` for the following task is required: {}".format(task))
node_description = task.get("description", None)
if not node_description:
raise ValueError("The `description` for the following task is required: {}".format(task))
agent_prompt = task.get("prompt", None)
agent_prompt_template = task.get("prompt_template", None)
if not agent_prompt and not agent_prompt_template:
raise ValueError("The `prompt` or `prompt_template` for the following task is required: {}".format(task))
inputs = task.get("inputs", [])
outputs = task.get("outputs", [])
agent_name = generate_dynamic_class_name(node_name+" Agent")
agent_description = node_description # .replace("task", "agent")
agent_system_prompt = task.get("system_prompt", DEFAULT_SYSTEM_PROMPT)
agent_output_parser = task.get("output_parser", None)
agent_parse_mode = task.get("parse_mode", "str")
agent_parse_func = task.get("parse_func", None)
agent_parse_title = task.get("parse_title", None)
tool_names = task.get("tool_names", None)
tools = task.get("tools",None)
# tools = task.get("tools", [])
# tool_names = []
# if tools:
# for tool in tools:
# if isinstance(tool,Toolkit):
# tool_names.append(tool.name)
# elif isinstance(tool, Tool):
# tool_names.append(tool.name)
# else:
# tool_names.append(tool)
node = WorkFlowNode.from_dict(
{
"name": node_name,
"description": node_description,
"inputs": inputs,
"outputs": outputs,
"agents": [
{
"name": agent_name,
"description": agent_description,
"prompt": agent_prompt,
"prompt_template": agent_prompt_template,
"system_prompt": agent_system_prompt,
"inputs": inputs,
"outputs": outputs,
"output_parser": agent_output_parser,
"parse_mode": agent_parse_mode,
"parse_func": agent_parse_func,
"parse_title": agent_parse_title,
"tool_names": tool_names,
"tools":tools
}
],
}
)
return node
def get_graph_info(self, **kwargs) -> dict:
"""
Get the information of the workflow graph.
"""
config = {
"class_name": self.__class__.__name__,
"goal": self.goal,
"tasks": [
{
"name": node.name,
"description": node.description,
"inputs": [param.to_dict(ignore=["class_name"]) for param in node.inputs],
"outputs": [param.to_dict(ignore=["class_name"]) for param in node.outputs],
"prompt": node.agents[0].get("prompt", None),
"prompt_template": node.agents[0].get("prompt_template", None).to_dict() if node.agents[0].get("prompt_template", None) else None,
"system_prompt": node.agents[0].get("system_prompt", None),
"parse_mode": node.agents[0].get("parse_mode", "str"),
"parse_func": node.agents[0].get("parse_func", None).__name__ if node.agents[0].get("parse_func", None) else None,
"parse_title": node.agents[0].get("parse_title", None),
"tool_names": node.agents[0].get("tool_names", None),
"tools": node.agents[0].get("tools", None)
}
for node in self.nodes
]
}
return config
def save_module(self, path: str, ignore: List[str] = [], **kwargs):
"""
Save the workflow graph to a module file.
"""
logger.info("Saving {} to {}", self.__class__.__name__, path)
config = self.get_graph_info()
for ignore_key in ignore:
config.pop(ignore_key, None)
make_parent_folder(path)
with open(path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4)
return path
def get_config(self) -> Dict:
"""
Get a dictionary containing all necessary configuration to recreate this workflow graph.
Returns:
dict: A configuration dictionary that can be used to initialize a new SequentialWorkFlowGraph instance
with the same properties as this one.
"""
return self.get_graph_info()
class SEWWorkFlowGraph(SequentialWorkFlowGraph):
def __init__(self, **kwargs):
goal = kwargs.pop("goal", SEW_WORKFLOW["goal"])
tasks = kwargs.pop("tasks", SEW_WORKFLOW["tasks"])
super().__init__(goal=goal, tasks=tasks, **kwargs)
class STRUCTUREWorkFlowGraph(SequentialWorkFlowGraph):
def __init__(self, **kwargs):
goal = kwargs.pop("goal", STRUCTURE_WORKFLOW["goal"])
tasks = kwargs.pop("tasks", STRUCTURE_WORKFLOW["tasks"])
super().__init__(goal=goal, tasks=tasks, **kwargs)
class QASTRUCTUREWorkFlowGraph(SequentialWorkFlowGraph):
def __init__(self, **kwargs):
goal = kwargs.pop("goal", STRUCTURE_WORKFLOW["goal"])
tasks = kwargs.pop("tasks", STRUCTURE_WORKFLOW["tasks"])
super().__init__(goal=goal, tasks=tasks, **kwargs) |