File size: 10,194 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import json
from typing import Optional, List
from pydantic import Field, PositiveInt 

import time
from ..core.logging import logger
from ..core.module import BaseModule
# from ..core.base_config import Parameter
from ..core.message import Message, MessageType
from ..models.base_model import BaseLLM
from ..agents.agent import Agent
from ..agents.task_planner import TaskPlanner
from ..agents.agent_generator import AgentGenerator
from ..agents.workflow_reviewer import WorkFlowReviewer
from ..actions.task_planning import TaskPlanningOutput
from ..actions.agent_generation import AgentGenerationOutput
from ..workflow.workflow_graph import WorkFlowGraph, WorkFlowNode, WorkFlowEdge
from ..tools.tool import Toolkit

class WorkFlowGenerator(BaseModule):
    """
    Automated workflow generation system based on high-level goals.
    
    The WorkFlowGenerator is responsible for creating complete workflow graphs
    from high-level goals or task descriptions. It breaks down the goal into
    subtasks, creates the necessary dependency connections between tasks,
    and assigns or generates appropriate agents for each task.
    
    Attributes:
        llm: Language model used for generation and planning
        task_planner: Component responsible for breaking down goals into subtasks
        agent_generator: Component responsible for agent assignment or creation
        workflow_reviewer: Component for reviewing and improving workflows
        num_turns: Number of refinement iterations for the workflow
    """
    llm: Optional[BaseLLM] = None
    task_planner: Optional[TaskPlanner] = Field(default=None, description="Responsible for breaking down the high-level task into manageable sub-tasks.")
    agent_generator: Optional[AgentGenerator] = Field(default=None, description="Assigns or generates the appropriate agent(s) to handle each sub-task.")
    workflow_reviewer: Optional[WorkFlowReviewer] = Field(default=None, description="Provides feedback and reflections to improve the generated workflow.")
    num_turns: Optional[PositiveInt] = Field(default=0, description="Specifies the number of refinement iterations for the generated workflow.")
    tools: Optional[List[Toolkit]] = Field(default=None, description="A list of tools that can be used in the workflow.")
    
    def init_module(self):
        if self.task_planner is None:
            if self.llm is None:
                raise ValueError("Must provide `llm` when `task_planner` is None")
            self.task_planner = TaskPlanner(llm=self.llm)
        
        if self.agent_generator is None:
            if self.llm is None:
                raise ValueError("Must provide `llm` when `agent_generator` is None")
            self.agent_generator = AgentGenerator(llm=self.llm, tools=self.tools)
        
        # TODO add WorkFlowReviewer
        # if self.workflow_reviewer is None:
        #     if self.llm is None:
        #         raise ValueError(f"Must provide `llm` when `workflow_reviewer` is None")
        #     self.workflow_reviewer = WorkFlowReviewer(llm=self.llm)

    def get_tool_info(self):
        self.tool_info =[
            {
                tool.name: [
                    s["function"]["description"] for s in tool.get_tool_schemas()
                ],
            }
            for tool in self.tools
        ]

    def _execute_with_retry(self, operation_name: str, operation, retries_left: int = 1, **kwargs):
        """Helper method to execute operations with retry logic.
        
        Args:
            operation_name: Name of the operation for logging
            operation: Callable that performs the operation
            retries_left: Number of retry attempts remaining
            **kwargs: Additional arguments to pass to the operation
            
        Returns:
            Tuple of (operation_result, number_of_retries_used)
            
        Raises:
            ValueError: If operation fails after all retries are exhausted
        """
        cur_retries = 0

        while cur_retries <= retries_left:  # Changed < to <= to include the initial try
            try:
                logger.info(f"{operation_name} (attempt {cur_retries + 1}/{retries_left + 1}) ...")
                result = operation(**kwargs)
                return result, cur_retries
            except Exception as e:
                if cur_retries == retries_left:
                    raise ValueError(f"Failed to {operation_name} after {cur_retries + 1} attempts.\nError: {e}")
                sleep_time = 2 ** cur_retries
                logger.error(f"Failed to {operation_name} in {cur_retries + 1} attempts. Retry after {sleep_time} seconds.\nError: {e}")
                time.sleep(sleep_time)
                cur_retries += 1

    def generate_workflow(self, goal: str, existing_agents: Optional[List[Agent]] = None, retry: int = 1, **kwargs) -> WorkFlowGraph:
        # Validate input
        if not goal or len(goal.strip()) < 10:
            raise ValueError("Goal must be at least 10 characters and descriptive")

        plan_history, plan_suggestion = "", ""

        # Generate the initial workflow plan
        cur_retries = 0
        plan, added_retries = self._execute_with_retry(
            operation_name="Generating a workflow plan",
            operation=self.generate_plan,
            retries_left=retry,
            goal=goal,
            history=plan_history,
            suggestion=plan_suggestion
        )
        cur_retries += added_retries

        # Build workflow from plan
        workflow, added_retries = self._execute_with_retry(
            operation_name="Building workflow from plan",
            operation=self.build_workflow_from_plan,
            retries_left=retry - cur_retries,
            goal=goal,
            plan=plan
        )
        cur_retries += added_retries

        # Validate initial workflow structure
        logger.info("Validating initial workflow structure...")
        workflow._validate_workflow_structure()
        logger.info(f"Successfully generate the following workflow:\n{workflow.get_workflow_description()}")

        # generate / assigns the initial agents
        logger.info("Generating agents for the workflow ...")
        workflow, added_retries = self._execute_with_retry(
            operation_name="Generating agents for the workflow",
            operation=self.generate_agents,
            retries_left=retry - cur_retries,
            goal=goal,
            workflow=workflow,
            existing_agents=existing_agents
        )

        # Validate workflow after agent generation
        logger.info("Validating workflow after agent generation...")
        workflow._validate_workflow_structure()
        # Validate that all nodes have agents
        for node in workflow.nodes:
            if not node.agents:
                raise ValueError(f"Node {node.name} has no agents assigned after agent generation")

        return workflow

    def generate_plan(self, goal: str, history: Optional[str] = None, suggestion: Optional[str] = None) -> TaskPlanningOutput:
        history = "" if history is None else history
        suggestion = "" if suggestion is None else suggestion
        task_planner: TaskPlanner = self.task_planner
        task_planning_action_data = {"goal": goal, "history": history, "suggestion": suggestion}
        task_planning_action_name = task_planner.task_planning_action_name
        message: Message = task_planner.execute(
            action_name=task_planning_action_name,
            action_input_data=task_planning_action_data,
            return_msg_type=MessageType.REQUEST
        )
        return message.content
    
    def generate_agents(
        self, 
        goal: str, 
        workflow: WorkFlowGraph,
        existing_agents: Optional[List[Agent]] = None,
        # history: Optional[str] = None, 
        # suggestion: Optional[str] = None
    ) -> WorkFlowGraph:
        
        agent_generator: AgentGenerator = self.agent_generator
        workflow_desc = workflow.get_workflow_description()
        agent_generation_action_name = agent_generator.agent_generation_action_name
        for subtask in workflow.nodes:
            subtask_fields = ["name", "description", "reason", "inputs", "outputs"]
            subtask_data = {key: value for key, value in subtask.to_dict(ignore=["class_name"]).items() if key in subtask_fields}
            subtask_desc = json.dumps(subtask_data, indent=4)
            agent_generation_action_data = {"goal": goal, "workflow": workflow_desc, "task": subtask_desc}
            logger.info(f"Generating agents for subtask: {subtask_data['name']}")
            agents: AgentGenerationOutput = agent_generator.execute(
                action_name=agent_generation_action_name, 
                action_input_data=agent_generation_action_data,
                return_msg_type=MessageType.RESPONSE
            ).content
            # todo I only handle generated agents
            generated_agents = []
            for agent in agents.generated_agents:
                agent_dict = agent.to_dict(ignore=["class_name"])
                # agent_dict["llm_config"] = self.llm.config.to_dict()
                generated_agents.append(agent_dict)
            subtask.set_agents(agents=generated_agents)
        return workflow
    
    # def review_plan(self, goal: str, )
    def build_workflow_from_plan(self, goal: str, plan: TaskPlanningOutput) -> WorkFlowGraph:
        nodes: List[WorkFlowNode] = plan.sub_tasks
        # infer edges from sub-tasks' inputs and outputs
        edges: List[WorkFlowEdge] = []
        for node in nodes:
            for another_node in nodes:
                if node.name == another_node.name:
                    continue
                node_output_params = [param.name for param in node.outputs]
                another_node_input_params = [param.name for param in another_node.inputs]
                if any([param in another_node_input_params for param in node_output_params]):
                    edges.append(WorkFlowEdge(edge_tuple=(node.name, another_node.name)))
        workflow = WorkFlowGraph(goal=goal, nodes=nodes, edges=edges)
        return workflow