# NOTE: upload-page scrape residue preserved as a comment (was invalid Python):
#   iLOVE2D's picture / Upload 2846 files / 5374a2d verified
import asyncio
import inspect
from pydantic import Field
from typing import Type, Optional, Union, Tuple, List, Any, Coroutine
from ..core.module import BaseModule
from ..core.module_utils import generate_id
from ..core.message import Message, MessageType
from ..core.registry import MODEL_REGISTRY
from ..models.model_configs import LLMConfig
from ..models.base_model import BaseLLM
from ..memory.memory import ShortTermMemory
from ..memory.long_term_memory import LongTermMemory
from ..memory.memory_manager import MemoryManager
from ..storages.base import StorageHandler
from ..actions.action import Action
from ..actions.action import ContextExtraction
class Agent(BaseModule):
    """
    Base class for all agents.

    Attributes:
        name (str): Unique identifier for the agent.
        description (str): Human-readable description of the agent's purpose.
        llm_config (Optional[LLMConfig]): Configuration for the language model. If provided, a new LLM
            instance will be created. Otherwise, the existing LLM instance specified in the `llm`
            field will be used.
        llm (Optional[BaseLLM]): Language model instance. If provided, the existing LLM instance will be used.
        agent_id (Optional[str]): Unique ID for the agent, auto-generated if not provided.
        system_prompt (Optional[str]): System prompt for the Agent.
        actions (Optional[List[Action]]): List of available actions (normalized to [] in init_module).
        n (Optional[int]): Number of latest messages used to provide context for action execution.
            It uses all the messages in short term memory by default.
        is_human (bool): Whether this agent represents a human user.
        version (int): Version number of the agent, default is 0.
    """
    name: str  # should be unique
    description: str
    llm_config: Optional[LLMConfig] = None
    llm: Optional[BaseLLM] = None
    agent_id: Optional[str] = Field(default_factory=generate_id)
    system_prompt: Optional[str] = None
    short_term_memory: Optional[ShortTermMemory] = Field(default_factory=ShortTermMemory)  # store short term memory for a single workflow.
    use_long_term_memory: Optional[bool] = False
    storage_handler: Optional[StorageHandler] = None
    long_term_memory: Optional[LongTermMemory] = None
    long_term_memory_manager: Optional[MemoryManager] = None
    # Annotated Optional because the default is None; init_module() normalizes to [].
    actions: Optional[List[Action]] = Field(default=None)
    # Annotated Optional because the default is None (meaning "use all messages").
    n: Optional[int] = Field(default=None, description="number of latest messages used to provide context for action execution. It uses all the messages in short term memory by default.")
    is_human: bool = Field(default=False)
    version: int = 0

    def init_module(self):
        """Post-construction initialization: LLM, memories, action registry."""
        if not self.is_human:
            self.init_llm()
        if self.use_long_term_memory:
            self.init_long_term_memory()
        # Normalize `actions` to a list and build the name -> action lookup map.
        self.actions = [] if self.actions is None else self.actions
        self._action_map = {action.name: action for action in self.actions} if self.actions else dict()
        # Fields excluded from serialization by save_module (LLM holds live state/credentials).
        self._save_ignore_fields = ["llm", "llm_config"]
        self.init_context_extractor()

    def __call__(self, *args: Any, **kwargs: Any) -> Union[Message, Tuple[Message, dict], Coroutine[Any, Any, Any]]:
        """Make the agent callable, choosing between sync and async execution.

        Returns a coroutine (from ``async_execute``) when called inside a running
        event loop, otherwise executes synchronously via ``execute``.
        """
        try:
            # Safe way to check if we're inside an async environment.
            asyncio.get_running_loop()
            return self.async_execute(*args, **kwargs)
        except RuntimeError:
            # No running loop — likely in sync context or worker thread.
            return self.execute(*args, **kwargs)

    def _prepare_execution(
        self,
        action_name: str,
        msgs: Optional[List[Message]] = None,
        action_input_data: Optional[dict] = None,
        **kwargs
    ) -> Tuple[Action, dict]:
        """Prepare for action execution by updating memory and getting inputs.

        Helper method used by both execute and async_execute methods.

        Args:
            action_name: The name of the action to execute
            msgs: Optional list of messages providing context for the action
            action_input_data: Optional pre-extracted input data for the action
            **kwargs: Additional workflow parameters

        Returns:
            Tuple containing the action object and input data

        Raises:
            AssertionError: If neither msgs nor action_input_data is provided
        """
        assert msgs is not None or action_input_data is not None, "must provide either `msgs` or `action_input_data`"
        action = self.get_action(action_name=action_name)
        # update short-term memory
        if msgs is not None:
            # directly add messages to short-term memory
            self.short_term_memory.add_messages(msgs)
        if action_input_data is not None:
            # create a message from action_input_data and add it to short-term memory
            input_message = Message(
                content=action_input_data,
                next_actions=[action_name],
                msg_type=MessageType.INPUT,
                wf_goal=kwargs.get("wf_goal", None),
                wf_task=kwargs.get("wf_task", None),
                wf_task_desc=kwargs.get("wf_task_desc", None)
            )
            self.short_term_memory.add_message(input_message)
        # obtain action input data from short term memory if not provided
        action_input_data = action_input_data or self.get_action_inputs(action=action)
        return action, action_input_data

    def _create_output_message(
        self,
        action_output,
        prompt: str,
        action_name: str,
        return_msg_type: Optional[MessageType] = MessageType.UNKNOWN,
        **kwargs
    ) -> Message:
        """Create a message from execution results and update memory.

        Helper method used by both execute and async_execute methods.

        Args:
            action_output: The output from action execution
            prompt: The prompt used for execution
            action_name: The name of the executed action
            return_msg_type: Message type for the return message
            **kwargs: Additional workflow parameters

        Returns:
            Message object containing execution results
        """
        # formulate a message
        message = Message(
            content=action_output,
            agent=self.name,
            action=action_name,
            prompt=prompt,
            msg_type=return_msg_type,
            wf_goal=kwargs.get("wf_goal", None),
            wf_task=kwargs.get("wf_task", None),
            wf_task_desc=kwargs.get("wf_task_desc", None)
        )
        # update short-term memory
        self.short_term_memory.add_message(message)
        return message

    async def async_execute(
        self,
        action_name: str,
        msgs: Optional[List[Message]] = None,
        action_input_data: Optional[dict] = None,
        return_msg_type: Optional[MessageType] = MessageType.UNKNOWN,
        return_action_input_data: Optional[bool] = False,
        **kwargs
    ) -> Union[Message, Tuple[Message, dict]]:
        """Execute an action asynchronously with the given context and return results.

        This is the async version of the execute method, allowing it to perform actions
        based on the current conversation context.

        Args:
            action_name: The name of the action to execute
            msgs: Optional list of messages providing context for the action
            action_input_data: Optional pre-extracted input data for the action
            return_msg_type: Message type for the return message
            return_action_input_data: If True, also return the resolved input data
            **kwargs (Any): Additional parameters, may include workflow information

        Returns:
            Message: A message containing the execution results (optionally with the input data)
        """
        action, action_input_data = self._prepare_execution(
            action_name=action_name,
            msgs=msgs,
            action_input_data=action_input_data,
            **kwargs
        )
        # Heuristic: detect whether the action actually implements async_execute by
        # looking for `NotImplementedError` in its source. `getsource` can fail for
        # compiled/dynamically-created callables; assume an async impl in that case.
        try:
            async_execute_source = inspect.getsource(action.async_execute)
            has_async_impl = "NotImplementedError" not in async_execute_source
        except (OSError, TypeError):
            has_async_impl = True
        if not has_async_impl:
            # async_execute is not implemented; fall back to the sync execute method
            execution_results = action.execute(
                llm=self.llm,
                inputs=action_input_data,
                sys_msg=self.system_prompt,
                return_prompt=True,
                **kwargs
            )
        else:
            execution_results = await action.async_execute(
                llm=self.llm,
                inputs=action_input_data,
                sys_msg=self.system_prompt,
                return_prompt=True,
                **kwargs
            )
        action_output, prompt = execution_results
        message = self._create_output_message(
            action_output=action_output,
            prompt=prompt,
            action_name=action_name,
            return_msg_type=return_msg_type,
            **kwargs
        )
        if return_action_input_data:
            return message, action_input_data
        return message

    def execute(
        self,
        action_name: str,
        msgs: Optional[List[Message]] = None,
        action_input_data: Optional[dict] = None,
        return_msg_type: Optional[MessageType] = MessageType.UNKNOWN,
        return_action_input_data: Optional[bool] = False,
        **kwargs
    ) -> Union[Message, Tuple[Message, dict]]:
        """Execute an action with the given context and return results.

        This is the core method for agent functionality, allowing it to perform actions
        based on the current conversation context.

        Args:
            action_name: The name of the action to execute
            msgs: Optional list of messages providing context for the action
            action_input_data: Optional pre-extracted input data for the action
            return_msg_type: Message type for the return message
            return_action_input_data: If True, also return the resolved input data
            **kwargs (Any): Additional parameters, may include workflow information

        Returns:
            Message: A message containing the execution results (optionally with the input data)
        """
        action, action_input_data = self._prepare_execution(
            action_name=action_name,
            msgs=msgs,
            action_input_data=action_input_data,
            **kwargs
        )
        # execute action
        execution_results = action.execute(
            llm=self.llm,
            inputs=action_input_data,
            sys_msg=self.system_prompt,
            return_prompt=True,
            **kwargs
        )
        action_output, prompt = execution_results
        message = self._create_output_message(
            action_output=action_output,
            prompt=prompt,
            action_name=action_name,
            return_msg_type=return_msg_type,
            **kwargs
        )
        if return_action_input_data:
            return message, action_input_data
        return message

    def init_llm(self):
        """
        Initialize the language model for the agent.

        No-op for human agents. Otherwise builds an LLM instance from `llm_config`
        (when no instance was supplied) and keeps `llm_config` in sync with the
        instance's config.

        Raises:
            ValueError: If neither `llm_config` nor `llm` is provided for a non-human agent.
        """
        # Human agents never get an LLM; self.llm remains None.
        if self.is_human:
            return
        if not self.llm_config and not self.llm:
            raise ValueError("must provide `llm_config` or `llm` when `is_human` is False")
        if self.llm_config and not self.llm:
            # Instantiate the model class registered for this config's llm_type.
            llm_cls = MODEL_REGISTRY.get_model(self.llm_config.llm_type)
            self.llm = llm_cls(config=self.llm_config)
        if self.llm:
            # Keep llm_config consistent with the actual instance in use.
            self.llm_config = self.llm.config

    def init_long_term_memory(self):
        """
        Initialize long-term memory components.

        Raises:
            AssertionError: If no `storage_handler` is configured.
        """
        assert self.storage_handler is not None, "must provide ``storage_handler`` when use_long_term_memory=True"
        # TODO revise the initialisation of long_term_memory and long_term_memory_manager
        if not self.long_term_memory:
            self.long_term_memory = LongTermMemory()
        if not self.long_term_memory_manager:
            self.long_term_memory_manager = MemoryManager(
                storage_handler=self.storage_handler,
                memory=self.long_term_memory
            )

    def init_context_extractor(self):
        """
        Initialize the context extraction action and register it on this agent.
        """
        cext_action = ContextExtraction()
        self.cext_action_name = cext_action.name
        self.add_action(cext_action)

    def add_action(self, action: Action):
        """
        Add a new action to the agent's available actions.

        Adding an action whose name is already registered is a no-op.

        Args:
            action: The action instance to add
        """
        action_name = action.name
        if action_name in self._action_map:
            return
        self.actions.append(action)
        self._action_map[action_name] = action

    def check_action_name(self, action_name: str):
        """
        Check if an action name is valid for this agent.

        Args:
            action_name: Name of the action to check

        Raises:
            KeyError: If the action name is not registered on this agent.
        """
        if action_name not in self._action_map:
            raise KeyError(f"'{action_name}' is an invalid action for {self.name}! Available action names: {list(self._action_map.keys())}")

    def get_action(self, action_name: str) -> Action:
        """
        Retrieves the Action instance associated with the given name.

        Args:
            action_name: Name of the action to retrieve

        Returns:
            The Action instance with the specified name

        Raises:
            KeyError: If the action name is not registered on this agent.
        """
        self.check_action_name(action_name=action_name)
        return self._action_map[action_name]

    def get_action_name(self, action_cls: Type[Action]) -> str:
        """
        Searches through the agent's actions to find one matching the specified type.

        Args:
            action_cls: The Action class type to search for

        Returns:
            The name of the matching action

        Raises:
            ValueError: If no registered action is an instance of `action_cls`.
        """
        for name, action in self._action_map.items():
            if isinstance(action, action_cls):
                return name
        raise ValueError(f"Couldn't find an action that matches Type '{action_cls.__name__}'")

    def get_action_inputs(self, action: Action) -> Union[dict, None]:
        """
        Uses the context extraction action to determine appropriate inputs
        for the specified action based on the conversation history.

        Args:
            action: The action for which to extract inputs

        Returns:
            Dictionary of extracted input data, or None if extraction fails
        """
        # Use the latest `n` messages as context (all messages when n is None).
        context = self.short_term_memory.get(n=self.n)
        cext_action = self.get_action(self.cext_action_name)
        action_inputs = cext_action.execute(llm=self.llm, action=action, context=context)
        return action_inputs

    def get_all_actions(self) -> List[Action]:
        """Get all actions except the context extraction action.

        Returns:
            List of Action instances available for execution
        """
        actions = [action for action in self.actions if action.name != self.cext_action_name]
        return actions

    def get_agent_profile(self, action_names: Optional[List[str]] = None) -> str:
        """Generate a human-readable profile of the agent and its capabilities.

        Args:
            action_names: Optional list of action names to include in the profile.
                If None, all actions are included.

        Returns:
            A formatted string containing the agent profile
        """
        all_actions = self.get_all_actions()
        if action_names is None:
            # if `action_names` is None, return description of all actions
            action_descriptions = "\n".join([f" - {action.name}: {action.description}" for action in all_actions])
        else:
            # otherwise, only return description of actions that matches `action_names`
            action_descriptions = "\n".join([f" - {action.name}: {action.description}" for action in all_actions if action.name in action_names])
        profile = f"Agent Name: {self.name}\nDescription: {self.description}\nAvailable Actions:\n{action_descriptions}"
        return profile

    def clear_short_term_memory(self):
        """
        Remove all content from the agent's short-term memory.
        """
        # Replace the buffer with a fresh instance (mirrors the field's default_factory).
        self.short_term_memory = ShortTermMemory()

    def __eq__(self, other: object) -> bool:
        """Agents are equal iff they share the same agent_id."""
        if not isinstance(other, Agent):
            # Let Python fall back to the other operand's comparison.
            return NotImplemented
        return self.agent_id == other.agent_id

    def __hash__(self) -> int:
        # __hash__ must return an int; agent_id is a string, so hash it.
        return hash(self.agent_id)

    def get_prompts(self) -> dict:
        """
        Get all the prompts of the agent.

        Returns:
            dict: A dictionary keyed by action name, each value containing the
                agent's system_prompt and the action's prompt.
        """
        prompts = {}
        for action in self.get_all_actions():
            prompts[action.name] = {
                "system_prompt": self.system_prompt,
                "prompt": action.prompt
            }
        return prompts

    def set_prompt(self, action_name: str, prompt: str, system_prompt: Optional[str] = None) -> bool:
        """
        Set the prompt for a specific action of this agent.

        Args:
            action_name: Name of the action whose prompt should be updated
            prompt: New prompt text to set for the action
            system_prompt: Optional new system prompt to set for the agent

        Returns:
            bool: True if the prompt was successfully updated

        Raises:
            KeyError: If the action_name does not exist for this agent
        """
        try:
            action = self.get_action(action_name)
            action.prompt = prompt
            if system_prompt is not None:
                self.system_prompt = system_prompt
            return True
        except KeyError:
            raise KeyError(f"Action '{action_name}' not found in agent '{self.name}'")

    def set_prompts(self, prompts: dict) -> bool:
        """
        Set the prompts for all actions of this agent.

        Args:
            prompts: A dictionary keyed by action name, each value a dict with a
                required 'prompt' key and an optional 'system_prompt' key.

        Returns:
            bool: True if the prompts were successfully updated

        Raises:
            ValueError: If a value is not a dict or lacks the 'prompt' key.
            KeyError: If an action name does not exist for this agent.
        """
        for action_name, prompt_data in prompts.items():
            if not isinstance(prompt_data, dict):
                raise ValueError(f"Invalid prompt data for action '{action_name}'. Expected a dictionary with 'prompt' and 'system_prompt' (optional) keys.")
            if "prompt" not in prompt_data:
                raise ValueError(f"Missing 'prompt' key in prompt data for action '{action_name}'.")
            self.set_prompt(action_name, prompt_data["prompt"], prompt_data.get("system_prompt", None))
        return True

    def save_module(self, path: str, ignore: Optional[List[str]] = None, **kwargs) -> str:
        """Save the agent to persistent storage.

        Args:
            path: Path where the agent should be saved
            ignore: List of field names to exclude from serialization
            **kwargs (Any): Additional parameters for the save operation

        Returns:
            The path where the agent was saved
        """
        # Avoid a mutable default and always exclude the runtime LLM fields.
        ignore_fields = self._save_ignore_fields + (ignore or [])
        # Propagate the saved path to the caller (was previously dropped).
        return super().save_module(path=path, ignore=ignore_fields, **kwargs)

    @classmethod
    def load_module(cls, path: str, llm_config: Optional[LLMConfig] = None, **kwargs) -> "Agent":
        """
        Load the agent from local storage. Must provide `llm_config` when loading
        the agent from local storage.

        Args:
            path: The path of the file
            llm_config: The LLMConfig instance

        Returns:
            Agent: The loaded agent instance
        """
        agent = super().load_module(path=path, **kwargs)
        if llm_config is not None:
            # NOTE(review): the loaded object is indexed like a dict here — the
            # superclass appears to return raw data rather than an Agent instance.
            agent["llm_config"] = llm_config.to_dict()
        return agent

    def get_config(self) -> dict:
        """
        Get a dictionary containing all necessary configuration to recreate this agent.

        Returns:
            dict: A configuration dictionary that can be used to initialize a new Agent
                instance with the same properties as this one.
        """
        config = self.to_dict()
        return config