selfevolveagent / evoagentx /workflow /workflow_generator.py
iLOVE2D's picture
Upload 2846 files
5374a2d verified
import json
from typing import Optional, List
from pydantic import Field, PositiveInt
import time
from ..core.logging import logger
from ..core.module import BaseModule
# from ..core.base_config import Parameter
from ..core.message import Message, MessageType
from ..models.base_model import BaseLLM
from ..agents.agent import Agent
from ..agents.task_planner import TaskPlanner
from ..agents.agent_generator import AgentGenerator
from ..agents.workflow_reviewer import WorkFlowReviewer
from ..actions.task_planning import TaskPlanningOutput
from ..actions.agent_generation import AgentGenerationOutput
from ..workflow.workflow_graph import WorkFlowGraph, WorkFlowNode, WorkFlowEdge
from ..tools.tool import Toolkit
class WorkFlowGenerator(BaseModule):
"""
Automated workflow generation system based on high-level goals.
The WorkFlowGenerator is responsible for creating complete workflow graphs
from high-level goals or task descriptions. It breaks down the goal into
subtasks, creates the necessary dependency connections between tasks,
and assigns or generates appropriate agents for each task.
Attributes:
llm: Language model used for generation and planning
task_planner: Component responsible for breaking down goals into subtasks
agent_generator: Component responsible for agent assignment or creation
workflow_reviewer: Component for reviewing and improving workflows
num_turns: Number of refinement iterations for the workflow
"""
llm: Optional[BaseLLM] = None
task_planner: Optional[TaskPlanner] = Field(default=None, description="Responsible for breaking down the high-level task into manageable sub-tasks.")
agent_generator: Optional[AgentGenerator] = Field(default=None, description="Assigns or generates the appropriate agent(s) to handle each sub-task.")
workflow_reviewer: Optional[WorkFlowReviewer] = Field(default=None, description="Provides feedback and reflections to improve the generated workflow.")
num_turns: Optional[PositiveInt] = Field(default=0, description="Specifies the number of refinement iterations for the generated workflow.")
tools: Optional[List[Toolkit]] = Field(default=None, description="A list of tools that can be used in the workflow.")
def init_module(self):
if self.task_planner is None:
if self.llm is None:
raise ValueError("Must provide `llm` when `task_planner` is None")
self.task_planner = TaskPlanner(llm=self.llm)
if self.agent_generator is None:
if self.llm is None:
raise ValueError("Must provide `llm` when `agent_generator` is None")
self.agent_generator = AgentGenerator(llm=self.llm, tools=self.tools)
# TODO add WorkFlowReviewer
# if self.workflow_reviewer is None:
# if self.llm is None:
# raise ValueError(f"Must provide `llm` when `workflow_reviewer` is None")
# self.workflow_reviewer = WorkFlowReviewer(llm=self.llm)
def get_tool_info(self):
self.tool_info =[
{
tool.name: [
s["function"]["description"] for s in tool.get_tool_schemas()
],
}
for tool in self.tools
]
def _execute_with_retry(self, operation_name: str, operation, retries_left: int = 1, **kwargs):
"""Helper method to execute operations with retry logic.
Args:
operation_name: Name of the operation for logging
operation: Callable that performs the operation
retries_left: Number of retry attempts remaining
**kwargs: Additional arguments to pass to the operation
Returns:
Tuple of (operation_result, number_of_retries_used)
Raises:
ValueError: If operation fails after all retries are exhausted
"""
cur_retries = 0
while cur_retries <= retries_left: # Changed < to <= to include the initial try
try:
logger.info(f"{operation_name} (attempt {cur_retries + 1}/{retries_left + 1}) ...")
result = operation(**kwargs)
return result, cur_retries
except Exception as e:
if cur_retries == retries_left:
raise ValueError(f"Failed to {operation_name} after {cur_retries + 1} attempts.\nError: {e}")
sleep_time = 2 ** cur_retries
logger.error(f"Failed to {operation_name} in {cur_retries + 1} attempts. Retry after {sleep_time} seconds.\nError: {e}")
time.sleep(sleep_time)
cur_retries += 1
def generate_workflow(self, goal: str, existing_agents: Optional[List[Agent]] = None, retry: int = 1, **kwargs) -> WorkFlowGraph:
# Validate input
if not goal or len(goal.strip()) < 10:
raise ValueError("Goal must be at least 10 characters and descriptive")
plan_history, plan_suggestion = "", ""
# Generate the initial workflow plan
cur_retries = 0
plan, added_retries = self._execute_with_retry(
operation_name="Generating a workflow plan",
operation=self.generate_plan,
retries_left=retry,
goal=goal,
history=plan_history,
suggestion=plan_suggestion
)
cur_retries += added_retries
# Build workflow from plan
workflow, added_retries = self._execute_with_retry(
operation_name="Building workflow from plan",
operation=self.build_workflow_from_plan,
retries_left=retry - cur_retries,
goal=goal,
plan=plan
)
cur_retries += added_retries
# Validate initial workflow structure
logger.info("Validating initial workflow structure...")
workflow._validate_workflow_structure()
logger.info(f"Successfully generate the following workflow:\n{workflow.get_workflow_description()}")
# generate / assigns the initial agents
logger.info("Generating agents for the workflow ...")
workflow, added_retries = self._execute_with_retry(
operation_name="Generating agents for the workflow",
operation=self.generate_agents,
retries_left=retry - cur_retries,
goal=goal,
workflow=workflow,
existing_agents=existing_agents
)
# Validate workflow after agent generation
logger.info("Validating workflow after agent generation...")
workflow._validate_workflow_structure()
# Validate that all nodes have agents
for node in workflow.nodes:
if not node.agents:
raise ValueError(f"Node {node.name} has no agents assigned after agent generation")
return workflow
def generate_plan(self, goal: str, history: Optional[str] = None, suggestion: Optional[str] = None) -> TaskPlanningOutput:
history = "" if history is None else history
suggestion = "" if suggestion is None else suggestion
task_planner: TaskPlanner = self.task_planner
task_planning_action_data = {"goal": goal, "history": history, "suggestion": suggestion}
task_planning_action_name = task_planner.task_planning_action_name
message: Message = task_planner.execute(
action_name=task_planning_action_name,
action_input_data=task_planning_action_data,
return_msg_type=MessageType.REQUEST
)
return message.content
def generate_agents(
self,
goal: str,
workflow: WorkFlowGraph,
existing_agents: Optional[List[Agent]] = None,
# history: Optional[str] = None,
# suggestion: Optional[str] = None
) -> WorkFlowGraph:
agent_generator: AgentGenerator = self.agent_generator
workflow_desc = workflow.get_workflow_description()
agent_generation_action_name = agent_generator.agent_generation_action_name
for subtask in workflow.nodes:
subtask_fields = ["name", "description", "reason", "inputs", "outputs"]
subtask_data = {key: value for key, value in subtask.to_dict(ignore=["class_name"]).items() if key in subtask_fields}
subtask_desc = json.dumps(subtask_data, indent=4)
agent_generation_action_data = {"goal": goal, "workflow": workflow_desc, "task": subtask_desc}
logger.info(f"Generating agents for subtask: {subtask_data['name']}")
agents: AgentGenerationOutput = agent_generator.execute(
action_name=agent_generation_action_name,
action_input_data=agent_generation_action_data,
return_msg_type=MessageType.RESPONSE
).content
# todo I only handle generated agents
generated_agents = []
for agent in agents.generated_agents:
agent_dict = agent.to_dict(ignore=["class_name"])
# agent_dict["llm_config"] = self.llm.config.to_dict()
generated_agents.append(agent_dict)
subtask.set_agents(agents=generated_agents)
return workflow
# def review_plan(self, goal: str, )
def build_workflow_from_plan(self, goal: str, plan: TaskPlanningOutput) -> WorkFlowGraph:
nodes: List[WorkFlowNode] = plan.sub_tasks
# infer edges from sub-tasks' inputs and outputs
edges: List[WorkFlowEdge] = []
for node in nodes:
for another_node in nodes:
if node.name == another_node.name:
continue
node_output_params = [param.name for param in node.outputs]
another_node_input_params = [param.name for param in another_node.inputs]
if any([param in another_node_input_params for param in node_output_params]):
edges.append(WorkFlowEdge(edge_tuple=(node.name, another_node.name)))
workflow = WorkFlowGraph(goal=goal, nodes=nodes, edges=edges)
return workflow