|
|
import json |
|
|
from typing import Optional, List |
|
|
from pydantic import Field, PositiveInt |
|
|
|
|
|
import time |
|
|
from ..core.logging import logger |
|
|
from ..core.module import BaseModule |
|
|
|
|
|
from ..core.message import Message, MessageType |
|
|
from ..models.base_model import BaseLLM |
|
|
from ..agents.agent import Agent |
|
|
from ..agents.task_planner import TaskPlanner |
|
|
from ..agents.agent_generator import AgentGenerator |
|
|
from ..agents.workflow_reviewer import WorkFlowReviewer |
|
|
from ..actions.task_planning import TaskPlanningOutput |
|
|
from ..actions.agent_generation import AgentGenerationOutput |
|
|
from ..workflow.workflow_graph import WorkFlowGraph, WorkFlowNode, WorkFlowEdge |
|
|
from ..tools.tool import Toolkit |
|
|
|
|
|
class WorkFlowGenerator(BaseModule):
    """
    Automated workflow generation system based on high-level goals.

    The WorkFlowGenerator is responsible for creating complete workflow graphs
    from high-level goals or task descriptions. It breaks down the goal into
    subtasks, creates the necessary dependency connections between tasks,
    and assigns or generates appropriate agents for each task.

    Attributes:
        llm: Language model used for generation and planning
        task_planner: Component responsible for breaking down goals into subtasks
        agent_generator: Component responsible for agent assignment or creation
        workflow_reviewer: Component for reviewing and improving workflows
        num_turns: Number of refinement iterations for the workflow
        tools: Optional list of toolkits that can be used in the workflow
    """

    llm: Optional[BaseLLM] = None
    task_planner: Optional[TaskPlanner] = Field(default=None, description="Responsible for breaking down the high-level task into manageable sub-tasks.")
    agent_generator: Optional[AgentGenerator] = Field(default=None, description="Assigns or generates the appropriate agent(s) to handle each sub-task.")
    workflow_reviewer: Optional[WorkFlowReviewer] = Field(default=None, description="Provides feedback and reflections to improve the generated workflow.")
    # NOTE(review): the default 0 does not itself satisfy PositiveInt; this presumably
    # only works because the default is not validated -- confirm before tightening.
    num_turns: Optional[PositiveInt] = Field(default=0, description="Specifies the number of refinement iterations for the generated workflow.")
    tools: Optional[List[Toolkit]] = Field(default=None, description="A list of tools that can be used in the workflow.")

    def init_module(self):
        """Create the default task planner and agent generator when none were supplied.

        Both defaults are built from `self.llm`; the agent generator also receives
        `self.tools`. `workflow_reviewer` is not created here.

        Raises:
            ValueError: If a component is missing and `llm` is None.
        """
        if self.task_planner is None:
            if self.llm is None:
                raise ValueError("Must provide `llm` when `task_planner` is None")
            self.task_planner = TaskPlanner(llm=self.llm)

        if self.agent_generator is None:
            if self.llm is None:
                raise ValueError("Must provide `llm` when `agent_generator` is None")
            self.agent_generator = AgentGenerator(llm=self.llm, tools=self.tools)

    def get_tool_info(self):
        """Store a summary of the configured toolkits in `self.tool_info`.

        The summary is a list with one single-key dict per toolkit, mapping the
        toolkit's `name` to the `["function"]["description"]` value of each schema
        returned by `get_tool_schemas()`. When `tools` is None the summary is an
        empty list.
        """
        # NOTE(review): `tool_info` is not among the fields declared on this class;
        # confirm that BaseModule permits assigning it.
        # `tools` defaults to None, so fall back to an empty list instead of
        # failing with a TypeError while iterating.
        self.tool_info = [
            {
                tool.name: [
                    schema["function"]["description"] for schema in tool.get_tool_schemas()
                ],
            }
            for tool in (self.tools or [])
        ]

    def _execute_with_retry(self, operation_name: str, operation, retries_left: int = 1, **kwargs):
        """Helper method to execute operations with retry logic.

        The operation is attempted once and then retried up to `retries_left`
        times. Before the k-th retry (counting from 0) the method sleeps for
        `2 ** k` seconds.

        Args:
            operation_name: Name of the operation for logging
            operation: Callable that performs the operation
            retries_left: Number of retry attempts remaining; negative values
                are treated as 0 so that one attempt is always made
            **kwargs: Additional arguments to pass to the operation

        Returns:
            Tuple of (operation_result, number_of_retries_used)

        Raises:
            ValueError: If operation fails after all retries are exhausted
        """
        # A negative budget would otherwise skip every attempt and return None,
        # which callers cannot unpack into (result, retries).
        retries_left = max(retries_left, 0)
        cur_retries = 0

        while True:
            try:
                logger.info(f"{operation_name} (attempt {cur_retries + 1}/{retries_left + 1}) ...")
                return operation(**kwargs), cur_retries
            except Exception as e:
                if cur_retries >= retries_left:
                    # Chain the cause so the original traceback is preserved.
                    raise ValueError(f"Failed to {operation_name} after {cur_retries + 1} attempts.\nError: {e}") from e
                sleep_time = 2 ** cur_retries
                logger.error(f"Failed to {operation_name} in {cur_retries + 1} attempts. Retry after {sleep_time} seconds.\nError: {e}")
                time.sleep(sleep_time)
                cur_retries += 1

    def generate_workflow(self, goal: str, existing_agents: Optional[List[Agent]] = None, retry: int = 1, **kwargs) -> WorkFlowGraph:
        """Generate a workflow graph, including agents, for a high-level goal.

        Runs three stages in order -- plan generation, graph construction and
        agent generation -- each through `_execute_with_retry`. The stages share
        one retry budget: retries consumed by an earlier stage are subtracted
        from what the following stages may use.

        Args:
            goal: Description of the goal; must be at least 10 characters long
                after stripping surrounding whitespace
            existing_agents: Forwarded to `generate_agents`
            retry: Total number of retries shared by all stages
            **kwargs: Accepted for interface compatibility; not used here

        Returns:
            The generated workflow graph, in which every node has agents

        Raises:
            ValueError: If the goal is too short, a stage fails after its
                retries are exhausted, or a node has no agents after agent
                generation. Errors raised by the workflow's structure
                validation are propagated unchanged.
        """
        if not goal or len(goal.strip()) < 10:
            raise ValueError("Goal must be at least 10 characters and descriptive")

        cur_retries = 0

        # Stage 1: break the goal down into a plan of sub-tasks.
        plan, added_retries = self._execute_with_retry(
            operation_name="Generating a workflow plan",
            operation=self.generate_plan,
            retries_left=retry,
            goal=goal,
            history="",
            suggestion="",
        )
        cur_retries += added_retries

        # Stage 2: turn the plan into a graph, using whatever budget is left.
        workflow, added_retries = self._execute_with_retry(
            operation_name="Building workflow from plan",
            operation=self.build_workflow_from_plan,
            retries_left=retry - cur_retries,
            goal=goal,
            plan=plan,
        )
        cur_retries += added_retries

        logger.info("Validating initial workflow structure...")
        workflow._validate_workflow_structure()
        logger.info(f"Successfully generate the following workflow:\n{workflow.get_workflow_description()}")

        # Stage 3: attach agents to every node of the graph.
        logger.info("Generating agents for the workflow ...")
        workflow, _ = self._execute_with_retry(
            operation_name="Generating agents for the workflow",
            operation=self.generate_agents,
            retries_left=retry - cur_retries,
            goal=goal,
            workflow=workflow,
            existing_agents=existing_agents,
        )

        logger.info("Validating workflow after agent generation...")
        workflow._validate_workflow_structure()

        for node in workflow.nodes:
            if not node.agents:
                raise ValueError(f"Node {node.name} has no agents assigned after agent generation")

        return workflow

    def generate_plan(self, goal: str, history: Optional[str] = None, suggestion: Optional[str] = None) -> TaskPlanningOutput:
        """Ask the task planner to produce a plan for the goal.

        Args:
            goal: Description of the goal to plan for
            history: Previous planning history; None is sent as an empty string
            suggestion: Suggestions for the planner; None is sent as an empty string

        Returns:
            The `content` of the message returned by the task planner
        """
        history = "" if history is None else history
        suggestion = "" if suggestion is None else suggestion
        task_planner: TaskPlanner = self.task_planner
        message: Message = task_planner.execute(
            action_name=task_planner.task_planning_action_name,
            action_input_data={"goal": goal, "history": history, "suggestion": suggestion},
            return_msg_type=MessageType.REQUEST,
        )
        return message.content

    def generate_agents(
        self,
        goal: str,
        workflow: WorkFlowGraph,
        existing_agents: Optional[List[Agent]] = None,
    ) -> WorkFlowGraph:
        """Generate agents for every node of the workflow and attach them.

        For each node the agent generator receives the goal, the workflow
        description and a JSON description of the node. The generated agents
        are converted to dicts and set on the node.

        Args:
            goal: Description of the overall goal
            workflow: Workflow whose nodes should receive agents; modified in place
            existing_agents: Currently not used by this method

        Returns:
            The same workflow object that was passed in
        """
        agent_generator: AgentGenerator = self.agent_generator
        workflow_desc = workflow.get_workflow_description()
        agent_generation_action_name = agent_generator.agent_generation_action_name
        # Only these node fields are shown to the agent generator.
        subtask_fields = ["name", "description", "reason", "inputs", "outputs"]

        for subtask in workflow.nodes:
            subtask_data = {
                key: value
                for key, value in subtask.to_dict(ignore=["class_name"]).items()
                if key in subtask_fields
            }
            subtask_desc = json.dumps(subtask_data, indent=4)
            agent_generation_action_data = {"goal": goal, "workflow": workflow_desc, "task": subtask_desc}
            logger.info(f"Generating agents for subtask: {subtask_data['name']}")
            agents: AgentGenerationOutput = agent_generator.execute(
                action_name=agent_generation_action_name,
                action_input_data=agent_generation_action_data,
                return_msg_type=MessageType.RESPONSE,
            ).content

            generated_agents = [
                agent.to_dict(ignore=["class_name"]) for agent in agents.generated_agents
            ]
            subtask.set_agents(agents=generated_agents)

        return workflow

    def build_workflow_from_plan(self, goal: str, plan: TaskPlanningOutput) -> WorkFlowGraph:
        """Build a workflow graph from the sub-tasks of a plan.

        An edge is added from node A to node B (with a different name) whenever
        at least one output parameter name of A is also an input parameter name
        of B.

        Args:
            goal: Description of the goal, stored on the graph
            plan: Planning output whose `sub_tasks` become the graph nodes

        Returns:
            A workflow graph containing the nodes and the derived edges
        """
        nodes: List[WorkFlowNode] = plan.sub_tasks

        # Collect the parameter names once per node instead of once per node pair.
        # Assumes parameter names are hashable (presumably strings).
        output_names = [{param.name for param in (node.outputs or ())} for node in nodes]
        input_names = [{param.name for param in (node.inputs or ())} for node in nodes]

        edges: List[WorkFlowEdge] = []
        for node, produced in zip(nodes, output_names):
            for another_node, consumed in zip(nodes, input_names):
                if node.name == another_node.name:
                    continue
                if not produced.isdisjoint(consumed):
                    edges.append(WorkFlowEdge(edge_tuple=(node.name, another_node.name)))

        return WorkFlowGraph(goal=goal, nodes=nodes, edges=edges)
|
|
|
|
|
|