import threading
from enum import Enum
from typing import Union, Optional, Dict, List
from pydantic import Field
from copy import deepcopy

from .agent import Agent
# from .agent_generator import AgentGenerator
from .customize_agent import CustomizeAgent
from ..core.module import BaseModule
from ..core.decorators import atomic_method
from ..storages.base import StorageHandler
from ..models.model_configs import LLMConfig
from ..tools.tool import Toolkit, Tool


class AgentState(str, Enum):
    """States an agent can be in while it is tracked by ``AgentManager``."""

    AVAILABLE = "available"
    RUNNING = "running"


class AgentManager(BaseModule):
    """
    Responsible for creating and managing all Agent objects required for workflow operation.

    Attributes:
        storage_handler (StorageHandler): Used to load and save agents from/to storage.
        agents (List[Agent]): A list to keep track of all managed Agent instances.
        agent_states (Dict[str, AgentState]): A dictionary to track the state of each Agent by name.
        tools (Optional[List[Union[Toolkit, Tool]]]): Pool of tools used to resolve the
            ``tool_names`` entries of agent specifications (see ``update_tools``).
    """

    agents: List[Agent] = Field(default_factory=list)
    agent_states: Dict[str, AgentState] = Field(default_factory=dict)  # agent_name to AgentState mapping
    storage_handler: Optional[StorageHandler] = None  # used to load and save agent from storage.
    # agent_generator: Optional[AgentGenerator] = None  # used to generate agents for a specific subtask
    tools: Optional[List[Union[Toolkit, Tool]]] = None

    def init_module(self):
        """Create the runtime-only synchronisation objects and seed per-agent bookkeeping.

        Every agent supplied at construction time gets a state entry (defaulting to
        ``AgentState.AVAILABLE`` when none was given) and a ``threading.Condition``
        used by ``set_agent_state`` / ``wait_for_agent_available``.
        """
        # NOTE(review): ``_lock`` is not used directly in this class; presumably it is
        # consumed by ``atomic_method`` -- confirm against ``core.decorators``.
        self._lock = threading.Lock()
        self._state_conditions = {}
        if self.agents:
            for agent in self.agents:
                self.agent_states.setdefault(agent.name, AgentState.AVAILABLE)
                self._state_conditions.setdefault(agent.name, threading.Condition())
            self.check_agents()

    def check_agents(self):
        """Validate agent list integrity and state consistency.

        Performs thorough validation of the agent manager's internal state:
        1. Checks for duplicate agent names
        2. Ensures agent list and state dictionary sizes match
        3. Verifies that agent states exist for all agents

        Raises:
            ValueError: If any of the checks above fails.
        """
        # check that the names of self.agents should be unique
        duplicate_agent_names = self.find_duplicate_agents(self.agents)
        if duplicate_agent_names:
            raise ValueError(f"The agents should be unique. Found duplicate agent names: {duplicate_agent_names}!")
        # check agent states
        if len(self.agents) != len(self.agent_states):
            raise ValueError(f"The lengths of self.agents ({len(self.agents)}) and self.agent_states ({len(self.agent_states)}) are different!")
        missing_agents = self.find_missing_agent_states()
        if missing_agents:
            raise ValueError(f"The following agents' states were not found: {missing_agents}")

    def find_duplicate_agents(self, agents: List[Agent]) -> List[str]:
        """Return the names that occur more than once in ``agents``.

        Args:
            agents: The agents to inspect; only ``agent.name`` is read.

        Returns:
            A list of duplicated names (each listed once, in no particular order).
        """
        seen_names = set()
        duplicate_names = set()
        for agent in agents:
            if agent.name in seen_names:
                duplicate_names.add(agent.name)
            seen_names.add(agent.name)
        return list(duplicate_names)

    def find_missing_agent_states(self):
        """Return the names of managed agents that have no entry in ``self.agent_states``."""
        return [agent.name for agent in self.agents if agent.name not in self.agent_states]

    def list_agents(self) -> List[str]:
        """Return the names of all managed agents, in list order."""
        return [agent.name for agent in self.agents]

    def has_agent(self, agent_name: str) -> bool:
        """Check if an agent with the given name exists in the manager.

        Args:
            agent_name: The name of the agent to check

        Returns:
            True if an agent with the given name exists, False otherwise
        """
        return any(agent.name == agent_name for agent in self.agents)

    @property
    def size(self):
        """
        Get the total number of agents managed by this manager.
        """
        return len(self.agents)

    def load_agent(self, agent_name: str, **kwargs) -> Agent:
        """Load an agent from local storage through storage_handler.

        Retrieves agent data from storage and creates an Agent instance via
        ``create_customize_agent``. The loaded agent is returned but NOT added to
        ``self.agents``.

        Args:
            agent_name: The name of the agent to load
            **kwargs (Any): Accepted for interface compatibility; currently not
                forwarded to the storage handler or to agent creation.

        Returns:
            Agent instance with data loaded from storage

        Raises:
            ValueError: If ``self.storage_handler`` is not set.
        """
        if not self.storage_handler:
            raise ValueError("must provide ``self.storage_handler`` to use ``load_agent``")
        agent_data = self.storage_handler.load_agent(agent_name=agent_name)
        agent: Agent = self.create_customize_agent(agent_data=agent_data)
        return agent

    def load_all_agents(self, **kwargs):
        """Load all agents from storage and add them to the manager.

        NOTE: not implemented yet -- this method is currently a no-op.

        Args:
            **kwargs (Any): Additional parameters passed to storage handler
        """
        pass

    def update_tools(self, agent_data: dict) -> None:
        """
        Update agent_data (in place) with tools based on tool_names.

        Handles four scenarios:
        1. Neither tool_names nor tools exist: return directly
        2. Only tool_names exists: resolve tool_names to tools and set tools field
        3. Only tools exists: return directly (no action needed)
        4. Both exist: merge tool_names into existing tools (skip duplicates)

        Args:
            agent_data (dict): Agent configuration dictionary that may contain 'tool_names' and/or 'tools'

        Raises:
            ValueError: If tool_names exist but self.tools is None, or if requested tools are not found
        """
        tool_names = agent_data.get("tool_names", None)
        # Cases 1 and 3: nothing to resolve when no tool names are requested.
        if not tool_names:
            return

        # Cases 2 and 4: tool_names exists, need to resolve
        if self.tools is None:
            raise ValueError(
                f"Agent requires tools {tool_names}, but no tools are available in AgentManager. "
                f"Please set self.tools before creating agents with tool_names."
            )

        # Create tool mapping from available tools
        tool_mapping = {tool.name: tool for tool in self.tools}

        existing_tools = agent_data.get("tools", None) or []
        # Names already covered, either by pre-existing tools or by tools queued below.
        covered_names = {tool.name for tool in existing_tools}

        tools_to_add = []
        missing_tools = []
        for tool_name in tool_names:
            if tool_name in covered_names:
                continue
            if tool_name in tool_mapping:
                tools_to_add.append(tool_mapping[tool_name])
                # Record the name so a repeated entry in tool_names is not added twice.
                covered_names.add(tool_name)
            else:
                missing_tools.append(tool_name)

        if missing_tools:
            available_tools = list(tool_mapping.keys())
            raise ValueError(
                f"The following tools are not available: {missing_tools}. "
                f"Available tools: {available_tools}"
            )

        # Merge new tools with existing ones
        if tools_to_add:
            agent_data["tools"] = list(existing_tools) + tools_to_add

    def create_customize_agent(self, agent_data: dict, llm_config: Optional[Union[LLMConfig, dict]]=None, **kwargs) -> CustomizeAgent:
        """
        create a customized agent from the provided `agent_data`.

        The input dictionary is deep-copied first, so the caller's `agent_data` is never modified.

        Args:
            agent_data: The data used to create an Agent instance, must contain 'name', 'description' and 'prompt' keys.
            llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agent.
                It will be used as the default LLM for agents without a `llm_config` key
                (or whose `llm_config` value is empty / None).
                If not provided, the `agent_data` should contain a `llm_config` key.
                If provided and `agent_data` contains a non-empty `llm_config`, the `llm_config` in `agent_data` will be used.
            **kwargs (Any): Additional parameters for agent creation (currently unused here)

        Returns:
            Agent: the instantiated agent instance.

        Raises:
            ValueError: If the agent is not marked ``is_human`` and no LLM configuration can be determined,
                or if tool resolution in ``update_tools`` fails.
        """
        agent_data = deepcopy(agent_data)
        # ``or`` (rather than a ``get`` default) so that an explicit ``"llm_config": None``
        # still falls back to the ``llm_config`` argument.
        agent_llm_config = agent_data.get("llm_config") or llm_config
        if not agent_data.get("is_human", False) and not agent_llm_config:
            raise ValueError("`agent_data` should contain a `llm_config` key or `llm_config` should be provided.")

        if agent_llm_config:
            if isinstance(agent_llm_config, dict):
                agent_data["llm_config"] = agent_llm_config
            elif isinstance(agent_llm_config, LLMConfig):
                agent_data["llm_config"] = agent_llm_config.to_dict()

        self.update_tools(agent_data=agent_data)  # add `tools` field if needed
        return CustomizeAgent.from_dict(data=agent_data)

    def get_agent_name(self, agent: Union[str, dict, Agent]) -> str:
        """Extract agent name from different agent representations.

        Handles different ways to specify an agent (string name, dictionary, or Agent instance)
        and extracts the agent name.

        Args:
            agent: Agent specified as a string name, dictionary with 'name' key, or Agent instance

        Returns:
            The extracted agent name as a string

        Raises:
            ValueError: If ``agent`` is none of the supported types.
            KeyError: If ``agent`` is a dictionary without a 'name' key.
        """
        if isinstance(agent, str):
            return agent
        if isinstance(agent, dict):
            return agent["name"]
        if isinstance(agent, Agent):
            return agent.name
        raise ValueError(f"{type(agent)} is not a supported type for ``get_agent_name``. Supported types: [str, dict, Agent].")

    def create_agent(self, agent: Union[str, dict, Agent], llm_config: Optional[LLMConfig]=None, **kwargs) -> Agent:
        """Turn an agent specification into an Agent instance without registering it.

        Args:
            agent: The agent specification:
                - String: name of an already-managed agent (when no storage handler is set)
                  or of an agent to load from storage (when a storage handler is set)
                - Dictionary: specification used to build a CustomizeAgent
                - Agent: returned unchanged
            llm_config (Optional[LLMConfig]): Default LLM configuration, only used for dictionary input.
            **kwargs (Any): Additional parameters forwarded to ``create_customize_agent``.

        Returns:
            The resolved or newly created Agent instance.

        Raises:
            ValueError: If the input type is unsupported, a named agent cannot be resolved,
                or a dictionary for a non-human agent comes without any LLM configuration.
        """
        if isinstance(agent, str):
            if self.storage_handler is None:
                # if self.storage_handler is None, the agent (str) must exist in self.agents. Otherwise, a dictionary or an Agent instance should be provided.
                if not self.has_agent(agent_name=agent):
                    raise ValueError(f"Agent ``{agent}`` does not exist! You should provide a dictionary or an Agent instance when ``self.storage_handler`` is not provided.")
                return self.get_agent(agent_name=agent)
            else:
                # if self.storage_handler is not None, the agent (str) must exist in the storage and will be loaded from the storage.
                agent_instance = self.load_agent(agent_name=agent)
        elif isinstance(agent, dict):
            if not agent.get("is_human", False) and (llm_config is None and "llm_config" not in agent):
                raise ValueError("When providing an agent as a dictionary, you must either include 'llm_config' in the dictionary or provide it as a parameter.")
            agent_instance = self.create_customize_agent(agent_data=agent, llm_config=llm_config, **kwargs)
        elif isinstance(agent, Agent):
            agent_instance = agent
        else:
            raise ValueError(f"{type(agent)} is not a supported input type of ``create_agent``. Supported types: [str, dict, Agent].")
        return agent_instance

    @atomic_method
    def add_agent(self, agent: Union[str, dict, Agent], llm_config: Optional[LLMConfig]=None, **kwargs):
        """
        add a single agent, ignore if the agent already exists (judged by the name of an agent).

        A newly added agent starts in ``AgentState.AVAILABLE``.

        Args:
            agent: The agent to be added, specified as:
                - String: Agent name to load from storage
                - Dictionary: Agent specification to create a CustomizeAgent
                - Agent: Existing Agent instance to add directly
            llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agent. Only used when the `agent` is a dictionary, used to create a CustomizeAgent.
            **kwargs (Any): Additional parameters for agent creation
        """
        agent_name = self.get_agent_name(agent=agent)
        if self.has_agent(agent_name=agent_name):
            return
        agent_instance = self.create_agent(agent=agent, llm_config=llm_config, **kwargs)
        self.agents.append(agent_instance)
        self.agent_states[agent_instance.name] = AgentState.AVAILABLE
        # Keep an already-installed Condition: a waiter may be blocked on it.
        self._state_conditions.setdefault(agent_instance.name, threading.Condition())
        self.check_agents()

    def add_agents(self, agents: List[Union[str, dict, Agent]], llm_config: Optional[LLMConfig]=None, **kwargs):
        """
        add several agents by using self.add_agent().

        Args:
            agents: The agent specifications to add (see ``add_agent`` for supported forms).
            llm_config (Optional[LLMConfig]): The LLM configuration passed to every ``add_agent`` call.
            **kwargs (Any): Additional parameters passed to add_agent
        """
        for agent in agents:
            self.add_agent(agent=agent, llm_config=llm_config, **kwargs)

    def add_agents_from_workflow(self, workflow_graph, llm_config: Optional[LLMConfig]=None, **kwargs):
        """
        Initialize agents from the nodes of a given WorkFlowGraph and add these agents to self.agents.

        Args:
            workflow_graph (WorkFlowGraph): The workflow graph containing nodes with agents information.
            llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agents.
            **kwargs (Any): Additional parameters passed to add_agent

        Raises:
            TypeError: If ``workflow_graph`` is not a WorkFlowGraph.
        """
        # Imported locally, presumably to avoid a circular import -- TODO confirm.
        from ..workflow.workflow_graph import WorkFlowGraph
        if not isinstance(workflow_graph, WorkFlowGraph):
            raise TypeError("workflow_graph must be an instance of WorkFlowGraph")
        for node in workflow_graph.nodes:
            if node.agents:
                for agent in node.agents:
                    self.add_agent(agent=agent, llm_config=llm_config, **kwargs)

    def update_agents_from_workflow(self, workflow_graph, llm_config: Optional[LLMConfig]=None, **kwargs):
        """
        Update agents from a given WorkFlowGraph.

        Agents that are already managed are replaced via ``update_agent`` (re-using the
        existing agent's ``llm_config``); unknown agents are added via ``add_agent``.

        Args:
            workflow_graph (WorkFlowGraph): The workflow graph containing nodes with agents information.
            llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agents.
            **kwargs: Additional parameters passed to update_agent

        Raises:
            TypeError: If ``workflow_graph`` is not a WorkFlowGraph.
        """
        # Imported locally, presumably to avoid a circular import -- TODO confirm.
        from ..workflow.workflow_graph import WorkFlowGraph
        if not isinstance(workflow_graph, WorkFlowGraph):
            raise TypeError("workflow_graph must be an instance of WorkFlowGraph")
        for node in workflow_graph.nodes:
            if node.agents:
                for agent in node.agents:
                    agent_name = self.get_agent_name(agent=agent)
                    if self.has_agent(agent_name=agent_name):
                        # use the llm_config of the existing agent
                        agent_llm_config = self.get_agent(agent_name).llm_config
                        self.update_agent(agent=agent, llm_config=agent_llm_config, **kwargs)
                    else:
                        self.add_agent(agent=agent, llm_config=llm_config, **kwargs)

    def get_agent(self, agent_name: str, **kwargs) -> Agent:
        """Retrieve an agent by its name from managed agents.

        Searches the list of managed agents for an agent with the specified name.

        Args:
            agent_name: The name of the agent to retrieve
            **kwargs (Any): Additional parameters (unused)

        Returns:
            The Agent instance with the specified name

        Raises:
            ValueError: If no managed agent has that name.
        """
        for agent in self.agents:
            if agent.name == agent_name:
                return agent
        raise ValueError(f"Agent ``{agent_name}`` does not exists!")

    def update_agent(self, agent: Union[dict, Agent], llm_config: Optional[LLMConfig]=None, **kwargs):
        """
        Update an agent in the manager.

        The replacement is built BEFORE the old agent is removed, so a failure while
        creating it (e.g. missing LLM config or unknown tool names) leaves the
        currently managed agent untouched. The replaced agent's state is reset to
        ``AgentState.AVAILABLE`` by ``add_agent``.

        Args:
            agent: The agent to be updated, specified as:
                - Dictionary: Agent specification to update a CustomizeAgent
                - Agent: Existing Agent instance to update
            llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agent.
            **kwargs (Any): Additional parameters for agent creation
        """
        agent_name = self.get_agent_name(agent=agent)
        new_agent = self.create_agent(agent=agent, llm_config=llm_config, **kwargs)
        self.remove_agent(agent_name=agent_name)
        self.add_agent(agent=new_agent, llm_config=llm_config, **kwargs)

    @atomic_method
    def remove_agent(self, agent_name: str, remove_from_storage: bool=False, **kwargs):
        """
        Remove an agent from the manager and optionally from storage.

        Removing a name that is not managed is a no-op for the in-memory state.

        Args:
            agent_name: The name of the agent to remove
            remove_from_storage: If True, also remove the agent from storage
            **kwargs (Any): Additional parameters passed to storage_handler.remove_agent

        Raises:
            ValueError: If ``remove_from_storage`` is True but no storage handler is set.
        """
        # Validate up front so a missing handler cannot leave the manager half-updated.
        if remove_from_storage and self.storage_handler is None:
            raise ValueError("must provide ``self.storage_handler`` to remove an agent from storage")
        self.agents = [agent for agent in self.agents if agent.name != agent_name]
        self.agent_states.pop(agent_name, None)
        self._state_conditions.pop(agent_name, None)
        if remove_from_storage:
            self.storage_handler.remove_agent(agent_name=agent_name, **kwargs)
        self.check_agents()

    def get_agent_state(self, agent_name: str) -> AgentState:
        """
        Get the state of a specific agent by its name.

        Args:
            agent_name: The name of the agent.

        Returns:
            AgentState: The current state of the agent.

        Raises:
            KeyError: If no state is recorded for ``agent_name``.
        """
        return self.agent_states[agent_name]

    @atomic_method
    def set_agent_state(self, agent_name: str, new_state: AgentState) -> bool:
        """
        Changes an agent's state and notifies any threads waiting on that agent's state.

        Intended for coordinating multi-threaded agent execution together with
        ``wait_for_agent_available``.

        Args:
            agent_name: The name of the agent
            new_state: The new state to set

        Returns:
            True if the state was updated successfully, False otherwise
            (unknown agent name or ``new_state`` is not an ``AgentState``).
        """
        if agent_name in self.agent_states and isinstance(new_state, AgentState):
            # One dict operation instead of check-then-set, so this method and
            # ``wait_for_agent_available`` end up sharing the same Condition object.
            condition = self._state_conditions.setdefault(agent_name, threading.Condition())
            with condition:
                self.agent_states[agent_name] = new_state
                condition.notify_all()
            return True
        return False

    def get_all_agent_states(self) -> Dict[str, AgentState]:
        """Get the states of all managed agents.

        Returns:
            Dict[str, AgentState]: A dictionary mapping agent names to their states.
            This is the manager's internal dictionary, not a copy.
        """
        return self.agent_states

    @atomic_method
    def save_all_agents(self, **kwargs):
        """Save all managed agents to persistent storage.

        NOTE: not implemented yet -- this method is currently a no-op.

        Args:
            **kwargs (Any): Additional parameters passed to the storage handler
        """
        pass

    @atomic_method
    def clear_agents(self):
        """
        Remove all agents from the manager, together with their states and conditions.
        """
        self.agents = []
        self.agent_states = {}
        self._state_conditions = {}
        self.check_agents()

    def wait_for_agent_available(self, agent_name: str, timeout: Optional[float] = None) -> bool:
        """Wait for an agent to be available.

        Blocks until ``self.agent_states[agent_name]`` equals ``AgentState.AVAILABLE``.
        A name without a recorded state never satisfies that condition, so the call
        only returns once the timeout expires (or never, with ``timeout=None``)
        unless a state for that name is set in the meantime.

        Args:
            agent_name: The name of the agent to wait for
            timeout: Maximum time to wait in seconds, or None to wait indefinitely

        Returns:
            True if the agent became available, False if timed out
        """
        # One dict operation instead of check-then-set, so this waiter and
        # ``set_agent_state`` end up sharing the same Condition object.
        condition = self._state_conditions.setdefault(agent_name, threading.Condition())
        with condition:
            return condition.wait_for(
                lambda: self.agent_states.get(agent_name) == AgentState.AVAILABLE,
                timeout=timeout
            )

    def copy(self) -> "AgentManager":
        """
        Create a shallow copy of the AgentManager.

        The copy references the same Agent objects, storage handler and tools, but
        gets its own agent list. Agent states are not carried over: they are rebuilt
        by ``init_module`` and therefore start as ``AgentState.AVAILABLE``.
        """
        return AgentManager(
            agents=list(self.agents),
            storage_handler=self.storage_handler,
            tools=self.tools,
        )