File size: 24,175 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
from pydantic import Field
from itertools import chain
from collections import defaultdict
from typing import Union, Optional, Tuple, Dict, List

from ..core.module import BaseModule
# from ..core.base_config import Parameter
from ..core.message import Message, MessageType
from ..models.base_model import BaseLLM, LLMOutputParser
# from ..agents.agent import Agent
from ..actions.action import Action
from ..agents.agent_manager import AgentManager
from .action_graph import ActionGraph
from .environment import Environment, TrajectoryState
from .workflow_graph import WorkFlowNode, WorkFlowGraph
from ..prompts.workflow.workflow_manager import (
    DEFAULT_TASK_SCHEDULER, 
    DEFAULT_ACTION_SCHEDULER, 
    OUTPUT_EXTRACTION_PROMPT
)


class Scheduler(Action):
    """
    Base interface for workflow schedulers.
    
    Provides a common interface for all scheduler types within the workflow
    system. Schedulers are responsible for making decisions about what to 
    execute next in a workflow, whether at the task or action level.
    
    Inherits from Action to leverage the common action interface and functionality.
    """
    pass


class TaskSchedulerOutput(LLMOutputParser):
    
    decision: str = Field(description="The decision made by the scheduler, whether to re-execute, iterate or forward a certain task.")
    task_name: str = Field(description="The name of the scheduled task.")
    reason: str = Field(description="The rationale behind the scheduling decision, explaining why the task was scheduled.")

    def to_str(self, **kwargs) -> str:
        return f"Based on the workflow execution results, the next subtask to be executed is '{self.task_name}' because {self.reason}"
    

class TaskScheduler(Action):

    """
    Determines the next task to execute in a workflow.
    """
    def __init__(self, **kwargs):
        name = kwargs.pop("name", None) if "name" in kwargs else DEFAULT_TASK_SCHEDULER["name"]
        description = kwargs.pop("description", None) if "description" in kwargs else DEFAULT_TASK_SCHEDULER["description"]
        prompt = kwargs.pop("prompt", None) if "prompt" in kwargs else DEFAULT_TASK_SCHEDULER["prompt"]
        super().__init__(name=name, description=description, prompt=prompt, outputs_format=TaskSchedulerOutput, **kwargs)
        self.max_num_turns = kwargs.get("max_num_turns", DEFAULT_TASK_SCHEDULER["max_num_turns"])

    def get_predecessor_tasks(self, graph: WorkFlowGraph, tasks: List[WorkFlowNode]) -> List[str]:
        predecessors = [] 
        for task in tasks:
            candidates = graph.get_node_predecessors(node=task)
            for candidate in candidates:
                if candidate not in predecessors:
                    predecessors.append(candidate)
        return predecessors
    
    def _handle_edge_cases(self, candidate_tasks: List[WorkFlowNode]) -> Union[TaskSchedulerOutput, None]:
        """
        Handle edge cases for task scheduling: Only one candidate task
        
        Args:
            candidate_tasks (List[WorkFlowNode]): List of candidate tasks to schedule      
            
        Returns:
            Either a TaskSchedulerOutput if a direct return is possible, or None if normal processing should continue
        """
        
        # Only one candidate task
        if len(candidate_tasks) == 1:
            task_name = candidate_tasks[0].name
            scheduled_task = TaskSchedulerOutput(
                decision="forward", 
                task_name=task_name,
                reason = f"Only one candidate task '{task_name}' is available."
            )
            return scheduled_task
        
        # Multiple candidate tasks, need normal processing
        return None
    
    def _prepare_execution(self, graph: WorkFlowGraph, env: Environment, candidate_tasks: List[WorkFlowNode]) -> Tuple[dict, str]:
        """
        Prepares common execution logic for both sync and async execute methods.
        This is only called when edge cases have been handled and we need to generate a prompt.
        
        Args:
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment.
            candidate_tasks (List[WorkFlowNode]): List of candidate tasks to schedule
            
        Returns:
            A tuple with prompt_inputs and prompt for LLM processing.
        """

        # Process multiple candidate tasks by preparing the LLM prompt
        workflow_graph_representation = graph.get_workflow_description()
        execution_history = " -> ".join(env.task_execution_history)
        # in execution_ouputs only consider the predecessors of candidate tasks
        predecessor_tasks = self.get_predecessor_tasks(graph=graph, tasks=candidate_tasks)
        execution_outputs = "\n\n".join([str(msg) for msg in env.get_task_messages(tasks=predecessor_tasks)])
        candidate_tasks_info = "\n\n".join([task.get_task_info() for task in candidate_tasks])
        prompt_inputs = {
            "workflow_graph_representation": workflow_graph_representation, 
            "execution_history": execution_history,
            "execution_outputs": execution_outputs, 
            "candidate_tasks": candidate_tasks_info,
            "max_num_turns": self.max_num_turns
        }
        prompt = self.prompt.format(**prompt_inputs)
        return prompt_inputs, prompt
    
    def execute(self, llm: Optional[BaseLLM] = None, graph: WorkFlowGraph = None, env: Environment = None, sys_msg: Optional[str] = None, return_prompt: bool=False, **kwargs) -> Union[TaskSchedulerOutput, Tuple[TaskSchedulerOutput, str]]:
        """
        Determine the next executable tasks.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment. 
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.
        
        Returns:
            Union[TaskSchedulerOutput, Tuple[TaskSchedulerOutput, str]]: The scheduled task and optionally the prompt.
        """
        assert graph is not None and env is not None, "must provide 'graph' and 'env' when executing TaskScheduler"

        # obtain candidate tasks 
        candidate_tasks: List[WorkFlowNode] = graph.next() 
        if not candidate_tasks:
            return None 

        # First handle edge cases (only one candidate task)
        edge_case_result = self._handle_edge_cases(candidate_tasks)
        if edge_case_result is not None:
            return (edge_case_result, None) if return_prompt else edge_case_result
        
        # Handle LLM generation case
        _, prompt = self._prepare_execution(graph, env, candidate_tasks)
        scheduled_task = llm.generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)
        
        if return_prompt:
            return scheduled_task, prompt
        return scheduled_task
    
    async def async_execute(self, llm: Optional[BaseLLM] = None, graph: WorkFlowGraph = None, env: Environment = None, sys_msg: Optional[str] = None, return_prompt: bool=False, **kwargs) -> Union[TaskSchedulerOutput, Tuple[TaskSchedulerOutput, str]]:
        """
        Asynchronously determine the next executable tasks.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment. 
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.
        
        Returns:
            Union[TaskSchedulerOutput, Tuple[TaskSchedulerOutput, str]]: The scheduled task and optionally the prompt.
        """
        assert graph is not None and env is not None, "must provide 'graph' and 'env' when executing TaskScheduler"

        # obtain candidate tasks 
        candidate_tasks: List[WorkFlowNode] = graph.next()
        if not candidate_tasks:
            return None 

        # First handle edge cases
        edge_case_result = self._handle_edge_cases(candidate_tasks)
        if edge_case_result is not None:
            return (edge_case_result, None) if return_prompt else edge_case_result
        
        # Handle async LLM generation case
        _, prompt = self._prepare_execution(graph, env, candidate_tasks)
        scheduled_task = await llm.async_generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)
        
        if return_prompt:
            return scheduled_task, prompt
        return scheduled_task


class NextAction(LLMOutputParser):

    agent: Optional[str] = Field(default=None, description="The name of the selected agent responsible for executing the next action in the workflow.")
    action: Optional[str] = Field(default=None, description="The name of the action that the selected agent will execute to continue progressing the subtask.")
    reason: Optional[str] = Field(default=None, description= "The justification for selecting this agent and action, explaining how it contributes to subtask execution based on workflow requirements and execution history.")
    action_graph: Optional[ActionGraph] = Field(default=None, description="The predefined action graph to be executed.")

    def to_str(self, **kwargs) -> str:
        if self.agent is not None and self.action is not None:
            return f"Based on the tasks' execution results, the next action to be executed is the '{self.action}' action of '{self.agent}' agent."
        elif self.action_graph is not None:
            return f"The predefined action graph '{type(self.action_graph).__name__}' will be executed."
        else:
            raise ValueError("must provide either both agent (str) and action (str), or action_graph (ActionGraph).")


class ActionScheduler(Action):

    """
    Determines the next action(s) to execute for a given task using an LLM.
    """
    def __init__(self, **kwargs):
        name = kwargs.pop("name", None) if "name" in kwargs else DEFAULT_ACTION_SCHEDULER["name"]
        description = kwargs.pop("description", None) if "description" in kwargs else DEFAULT_ACTION_SCHEDULER["description"]
        prompt = kwargs.pop("prompt", None) if "prompt" in kwargs else DEFAULT_ACTION_SCHEDULER["prompt"]
        super().__init__(name=name, description=description, prompt=prompt, outputs_format=NextAction, **kwargs)

    def format_task_input_data(self, data: dict) -> str:
        info_list = [] 
        for key, value in data.items():
            info_list.append("## {}\n{}".format(key, value))
        return "\n\n".join(info_list)
    
    def check_candidate_action(self, task_name: str, actions: List[str], agent_actions_map: Dict[str, List[str]]):
        unknown_actions = []
        merged_actions = set(chain.from_iterable(agent_actions_map.values()))
        for action in actions:
            if action not in merged_actions:
                unknown_actions.append(action)
        if unknown_actions:
            raise ValueError(f"Unknown actions: {unknown_actions} specified in the `next_actions`. All available actions defined for the task ({task_name}) are {merged_actions}.")
    
    def get_agent_action_pairs(self, action: str, agent_actions_map: Dict[str, List[str]]) -> List[Tuple[str, str]]:
        pairs = [] 
        for agent, actions in agent_actions_map.items():
            if action in actions:
                pairs.append((agent, action))
        return pairs

    def _prepare_action_execution(
        self, 
        task: WorkFlowNode, 
        agent_manager: AgentManager, 
        env: Environment
    ) -> Union[Tuple[NextAction, None], Tuple[None, dict, str]]:
        """
        Prepares common execution logic for both sync and async execute methods.
        
        Args:
            task (WorkFlowNode): The task for which to schedule an action.
            agent_manager (AgentManager): The agent manager providing the agents.
            env (Environment): The execution environment.
            
        Returns:
            Either a tuple with a scheduled action and None if a direct return is possible,
            or a tuple with None, prompt_inputs, and prompt if LLM processing is needed.
        """
        # the task has a action_graph, directly return the action_graph for execution 
        if task.action_graph is not None:
            next_action = NextAction(action_graph=task.action_graph)
            return next_action, None
        
        # Otherwise, schedule an agent to execute the task.
        task_agent_names = task.get_agents()
        if not task_agent_names:
            raise ValueError(f"The task '{task.name}' does not provide any agents for execution!")
        
        task_agents = [agent_manager.get_agent(name) for name in task_agent_names]
        task_agent_actions_map = {agent.name: [action.name for action in agent.get_all_actions()] for agent in task_agents}
        
        next_action = None
        candidate_agent_actions = defaultdict(set)

        # if a previous message has specified next_actions, select from these actions
        task_execution_messages = env.get_task_messages(task.name)
        if task_execution_messages and task_execution_messages[-1].next_actions:
            predefined_next_actions = task_execution_messages[-1].next_actions
            # check whether all the predefined_next_actions are present in the actions of task_agents
            self.check_candidate_action(task.name, predefined_next_actions, task_agent_actions_map)
            if len(predefined_next_actions) == 1:
                predefined_next_action = predefined_next_actions[0]
                agent_action_pairs = self.get_agent_action_pairs(predefined_next_action, task_agent_actions_map)
                if len(agent_action_pairs) == 1:
                    next_action = NextAction(
                        agent=agent_action_pairs[0][0], 
                        action=agent_action_pairs[0][1],
                        reason=f"Selected because task history indicates a single predefined next action: {predefined_next_action}"
                    )
                else:
                    for agent, action in agent_action_pairs:
                        candidate_agent_actions[agent].add(action)
            else:
                for predefined_next_action in predefined_next_actions:
                    agent_action_pairs = self.get_agent_action_pairs(predefined_next_action, task_agent_actions_map)
                    for agent, action in agent_action_pairs:
                        candidate_agent_actions[agent].add(action)
        
        # if there are only one agent and one action, directly return the action
        if not next_action and len(task_agent_names) == 1 and len(task_agent_actions_map[task_agent_names[0]]) == 1:
            task_agent_name = task_agent_names[0]
            task_action_name = task_agent_actions_map[task_agent_name][0]
            next_action = NextAction(
                agent=task_agent_name, 
                action=task_action_name, 
                reason=f"Only one agent ('{task_agent_name}') is available, and it has only one action ('{task_action_name}'), making it the obvious choice."
            )
        
        if next_action is not None:
            return next_action, None

        # prepare candidate agent & action information 
        # agent_actions_info = "\n\n".join([agent.get_agent_profile() for agent in task_agents])
        candidate_agent_actions = candidate_agent_actions or task_agent_actions_map
        agent_actions_info = "\n\n".join(
            [
                agent.get_agent_profile(action_names=candidate_agent_actions[agent.name]) \
                    for agent in task_agents if agent.name in candidate_agent_actions
            ]
        )

        # prepare task and execution information
        task_info = task.get_task_info()
        task_input_names = [param.name for param in task.inputs]
        task_input_data: dict = env.get_execution_data(task_input_names)
        task_input_data_info = self.format_task_input_data(data=task_input_data)
        task_execution_history = "\n\n".join([str(msg) for msg in task_execution_messages])

        prompt_inputs = {
            "task_info": task_info, 
            "task_inputs": task_input_data_info, 
            "task_execution_history": task_execution_history, 
            "agent_action_list": agent_actions_info,
        }
        prompt = self.prompt.format(**prompt_inputs)
        return None, prompt_inputs, prompt
        
    def execute(
        self, 
        llm: Optional[BaseLLM] = None, 
        task: WorkFlowNode = None, 
        agent_manager: AgentManager = None, 
        env: Environment = None, 
        sys_msg: Optional[str] = None, 
        return_prompt: bool=True, 
        **kwargs
    ) -> Union[NextAction, Tuple[NextAction, str]]:
        """
        Determine the next actions to take for the given task. 
        If the last message stored in ``next_actions`` specifies the ``next_actions``, choose an action from these actions to execute.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            task (WorkFlowNode): The task for which to schedule an action.
            agent_manager (AgentManager): The agent manager providing the agents.
            env (Environment): The execution environment.
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.
            
        Returns:
            Union[NextAction, Tuple[NextAction, str]]: The scheduled action and optionally the prompt.
        """
        result = self._prepare_action_execution(task=task, agent_manager=agent_manager, env=env)
        if result[0] is not None:
            # Handle direct return case
            next_action, _ = result
            return (next_action, None) if return_prompt else next_action
        
        # Handle LLM generation case
        _, _, prompt = result
        next_action = llm.generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)
        
        if return_prompt:
            return next_action, prompt
        return next_action
    
    async def async_execute(
        self, 
        llm: Optional[BaseLLM] = None, 
        task: WorkFlowNode = None, 
        agent_manager: AgentManager = None, 
        env: Environment = None, 
        sys_msg: Optional[str] = None, 
        return_prompt: bool=True, 
        **kwargs
    ) -> Union[NextAction, Tuple[NextAction, str]]:
        """
        Asynchronously determine the next actions to take for the given task.
        If the last message stored in ``next_actions`` specifies the ``next_actions``, choose an action from these actions to execute.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            task (WorkFlowNode): The task for which to schedule an action.
            agent_manager (AgentManager): The agent manager providing the agents.
            env (Environment): The execution environment.
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.
            
        Returns:
            Union[NextAction, Tuple[NextAction, str]]: The scheduled action and optionally the prompt.
        """
        result = self._prepare_action_execution(task=task, agent_manager=agent_manager, env=env)
        if result[0] is not None:
            # Handle direct return case
            next_action, _ = result
            return (next_action, None) if return_prompt else next_action
        
        # Handle async LLM generation case
        _, _, prompt = result
        next_action = await llm.async_generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)
        
        if return_prompt:
            return next_action, prompt
        return next_action


class WorkFlowManager(BaseModule):
    """
    Responsible for the scheduling and decision-making when executing a workflow. 

    Attributes:
        task_scheduler (TaskScheduler): Determines the next task(s) to execute based on the workflow graph and node states.
        action_scheduler (ActionScheduler): Determines the next action(s) to take for the selected task using an LLM.
    """
    llm: BaseLLM
    action_scheduler: ActionScheduler = Field(default_factory=ActionScheduler)
    task_scheduler: TaskScheduler = Field(default_factory=TaskScheduler)

    def init_module(self):
        self._save_ignore_fields = ["llm"]

    async def schedule_next_task(self, graph: WorkFlowGraph, env: Environment = None, **kwargs) -> WorkFlowNode:
        """
        Return the next task to execute asynchronously.
        """
        execution_results = await self.task_scheduler.async_execute(llm=self.llm, graph=graph, env=env, return_prompt=True, **kwargs)
        if execution_results is None:
            return None
        scheduled_task, prompt, *other = execution_results
        message = Message(
            content=scheduled_task, agent=type(self).__name__, action=self.task_scheduler.name, \
                prompt=prompt, msg_type=MessageType.COMMAND, wf_goal=graph.goal
        )
        env.update(message=message, state=TrajectoryState.COMPLETED)
        task: WorkFlowNode = graph.get_node(scheduled_task.task_name)
        return task
    
    async def schedule_next_action(self, goal: str, task: WorkFlowNode, agent_manager: AgentManager, env: Environment = None, **kwargs) -> NextAction:
        """
        Asynchronously return the next action to execute. If the task is completed, return None.
        """
        execution_results = await self.action_scheduler.async_execute(llm=self.llm, task=task, agent_manager=agent_manager, env=env, return_prompt=True, **kwargs)
        if execution_results is None:
            return None
        next_action, prompt, *_ = execution_results
        message = Message(
            content=next_action, agent=type(self).__name__, action=self.action_scheduler.name, \
                prompt=prompt, msg_type=MessageType.COMMAND, wf_goal=goal, wf_task=task.name, wf_task_desc=task.description 
        )
        env.update(message=message, state=TrajectoryState.COMPLETED)
        return next_action

    async def extract_output(self, graph: WorkFlowGraph, env: Environment, **kwargs) -> str:
        """
        Asynchronously extract output from the workflow execution.
        
        Args:
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment.
            
        Returns:
            str: The extracted output.
        """
        # obtain the output for end tasks
        end_tasks = graph.find_end_nodes()
        end_task_predecesssors = sum([graph.get_node_predecessors(node=end_task) for end_task in end_tasks], [])
        candidate_taks_with_output = list(set(end_tasks)|set(end_task_predecesssors))
        candidate_msgs_with_output = [] 
        for task in candidate_taks_with_output:
            # only task the final output of the task
            candidate_msgs_with_output.extend(env.get_task_messages(tasks=task, n=1))
        candidate_msgs_with_output = Message.sort_by_timestamp(messages=candidate_msgs_with_output)

        prompt = OUTPUT_EXTRACTION_PROMPT.format(
            goal=graph.goal, 
            workflow_graph_representation=graph.get_workflow_description(), 
            workflow_execution_results="\n\n".join([str(msg) for msg in candidate_msgs_with_output]), 
        )
        llm_output: LLMOutputParser = await self.llm.async_generate(prompt=prompt)
        return llm_output.content

    def save_module(self, path: str, ignore: List[str] = [], **kwargs)-> str:
        ignore_fields = self._save_ignore_fields + ignore
        super().save_module(path=path, ignore=ignore_fields, **kwargs)