selfevolveagent / evoagentx /actions /customize_action.py
iLOVE2D's picture
Upload 2846 files
5374a2d verified
from pydantic import Field
from typing import Optional, Any, Callable, List, Union
import re
import json
import asyncio
import inspect
import concurrent.futures
from ..core.logging import logger
from ..models.base_model import BaseLLM
from .action import Action
from ..core.message import Message
from ..prompts.template import StringTemplate, ChatTemplate
from ..prompts.tool_calling import OUTPUT_EXTRACTION_PROMPT, TOOL_CALLING_TEMPLATE, TOOL_CALLING_HISTORY_PROMPT, TOOL_CALLING_RETRY_PROMPT
from ..tools.tool import Toolkit
from ..core.registry import MODULE_REGISTRY
from ..models.base_model import LLMOutputParser
from ..core.module_utils import parse_json_from_llm_output, parse_json_from_text
class CustomizeAction(Action):
parse_mode: Optional[str] = Field(default="title", description="the parse mode of the action, must be one of: ['title', 'str', 'json', 'xml', 'custom']")
parse_func: Optional[Callable] = Field(default=None, exclude=True, description="the function to parse the LLM output. It receives the LLM output and returns a dict.")
title_format: Optional[str] = Field(default="## {title}", exclude=True, description="the format of the title. It is used when the `parse_mode` is 'title'.")
custom_output_format: Optional[str] = Field(default=None, exclude=True, description="the format of the output. It is used when the `prompt_template` is provided.")
tools: Optional[List[Toolkit]] = Field(default=None, description="The tools that the action can use")
conversation: Optional[Message] = Field(default=None, description="Current conversation state")
max_tool_try: int = Field(default=2, description="Maximum number of tool calling attempts allowed")
def __init__(self, **kwargs):
name = kwargs.pop("name", "CustomizeAction")
description = kwargs.pop("description", "Customized action that can use tools to accomplish its task")
super().__init__(name=name, description=description, **kwargs)
# Validate that at least one of prompt or prompt_template is provided
if not self.prompt and not self.prompt_template:
raise ValueError("`prompt` or `prompt_template` is required when creating CustomizeAction action")
# Prioritize template and give warning if both are provided
if self.prompt and self.prompt_template:
logger.warning("Both `prompt` and `prompt_template` are provided for CustomizeAction action. Prioritizing `prompt_template` and ignoring `prompt`.")
if self.tools:
self.tools_caller = {}
self.add_tools(self.tools)
def prepare_action_prompt(
self,
inputs: Optional[dict] = None,
system_prompt: Optional[str] = None,
**kwargs
) -> Union[str, List[dict]]:
"""Prepare prompt for action execution.
This helper function transforms the input dictionary into a formatted prompt
for the language model, handling different prompting modes.
Args:
inputs: Dictionary of input parameters
system_prompt: Optional system prompt to include
Returns:
Union[str, List[dict]]: Formatted prompt ready for LLM (string or chat messages)
Raises:
TypeError: If an input value type is not supported
ValueError: If neither prompt nor prompt_template is available
"""
# Process inputs into prompt parameter values
if inputs is None:
inputs = {}
prompt_params_names = self.inputs_format.get_attrs()
prompt_params_values = {}
for param in prompt_params_names:
value = inputs.get(param, "")
if isinstance(value, str):
prompt_params_values[param] = value
elif isinstance(value, (dict, list)):
prompt_params_values[param] = json.dumps(value, indent=4)
else:
raise TypeError(f"The input type {type(value)} is invalid! Valid types: [str, dict, list].")
if self.prompt:
prompt = self.prompt.format(**prompt_params_values) if prompt_params_values else self.prompt
if self.tools:
tools_schemas = [j["function"] for i in [tool.get_tool_schemas() for tool in self.tools] for j in i]
prompt += "\n\n" + TOOL_CALLING_TEMPLATE.format(tools_description = tools_schemas)
return prompt
else:
# Use goal-based tool calling mode
if self.tools:
self.prompt_template.set_tools(self.tools)
return self.prompt_template.format(
system_prompt=system_prompt,
values=prompt_params_values,
inputs_format=self.inputs_format,
outputs_format=self.outputs_format,
parse_mode=self.parse_mode,
title_format=self.title_format,
custom_output_format=self.custom_output_format,
tools=self.tools
)
def prepare_extraction_prompt(self, llm_output_content: str) -> str:
"""Prepare extraction prompt for fallback extraction when parsing fails.
Args:
self: The action instance
llm_output_content: Raw output content from LLM
Returns:
str: Formatted extraction prompt
"""
attr_descriptions: dict = self.outputs_format.get_attr_descriptions()
output_description_list = []
for i, (name, desc) in enumerate(attr_descriptions.items()):
output_description_list.append(f"{i+1}. {name}\nDescription: {desc}")
output_description = "\n\n".join(output_description_list)
return OUTPUT_EXTRACTION_PROMPT.format(text=llm_output_content, output_description=output_description)
def _get_unique_class_name(self, candidate_name: str) -> str:
"""
Get a unique class name by checking if it already exists in the registry.
If it does, append "Vx" to make it unique.
"""
if not MODULE_REGISTRY.has_module(candidate_name):
return candidate_name
i = 1
while True:
unique_name = f"{candidate_name}V{i}"
if not MODULE_REGISTRY.has_module(unique_name):
break
i += 1
return unique_name
def add_tools(self, tools: Union[Toolkit, List[Toolkit]]):
if not tools:
return
if isinstance(tools,Toolkit):
tools = [tools]
if not all(isinstance(tool, Toolkit) for tool in tools):
raise TypeError("`tools` must be a Toolkit or list of Toolkit instances.")
if not self.tools:
self.tools_caller = {}
self.tools = []
# self.tools += tools
# tools_callers = [tool.get_tools() for tool in tools]
# tools_callers = [j for i in tools_callers for j in i]
# for tool_caller in tools_callers:
# self.tools_caller[tool_caller.name] = tool_caller
# avoid duplication & type checks
for toolkit in tools:
try:
tool_callers = toolkit.get_tools()
if not isinstance(tool_callers, list):
logger.warning(f"Expected list of tool functions from '{toolkit.name}.get_tools()', got {type(tool_callers)}.")
continue
# add tool callers to the tools_caller dictionary
valid_tools_count = 0
valid_tools_names, valid_tool_callers = [], []
for tool_caller in tool_callers:
tool_caller_name = getattr(tool_caller, "name", None)
if not tool_caller_name or not callable(tool_caller):
logger.warning(f"Invalid tool function in '{toolkit.name}': missing name or not callable.")
continue
if tool_caller_name in self.tools_caller:
logger.warning(f"Duplicate tool function '{tool_caller_name}' detected. Overwriting previous function.")
# self.tools_caller[tool_caller_name] = tool_caller
valid_tools_count += 1
valid_tools_names.append(tool_caller_name)
valid_tool_callers.append(tool_caller)
if valid_tools_count == 0:
logger.info(f"No valid tools found in toolkit '{toolkit.name}'. Skipping.")
continue
if valid_tools_count > 0 and all(name in self.tools_caller for name in valid_tools_names):
logger.info(f"All tools from toolkit '{toolkit.name}' are already added. Skipping.")
continue
if valid_tools_count > 0:
self.tools_caller.update({name: caller for name, caller in zip(valid_tools_names, valid_tool_callers)})
# only add toolkit if at least one valid tool is added and toolkit is not already added
existing_toolkit_names = {tkt.name for tkt in self.tools}
if valid_tools_count > 0 and toolkit.name not in existing_toolkit_names:
self.tools.append(toolkit)
if valid_tools_count > 0:
logger.info(f"Added toolkit '{toolkit.name}' with {valid_tools_count} valid tools in {self.name}: {valid_tools_names}.")
except Exception as e:
logger.error(f"Failed to load tools from toolkit '{toolkit.name}': {e}")
def _extract_tool_calls(self, llm_output: str, llm: Optional[BaseLLM] = None) -> List[dict]:
pattern = r"<ToolCalling>\s*(.*?)\s*</ToolCalling>"
# Find all ToolCalling blocks in the output
matches = re.findall(pattern, llm_output, re.DOTALL)
if not matches:
return []
parsed_tool_calls = []
for match_content in matches:
try:
json_content = match_content.strip()
json_list = parse_json_from_text(json_content)
if not json_list:
logger.warning("No valid JSON found in ToolCalling block")
continue
# Only use the first JSON string from each block
parsed_tool_call = json.loads(json_list[0])
if isinstance(parsed_tool_call, dict):
parsed_tool_calls.append(parsed_tool_call)
elif isinstance(parsed_tool_call, list):
parsed_tool_calls.extend(parsed_tool_call)
else:
logger.warning(f"Invalid tool call format: {parsed_tool_call}")
continue
except (json.JSONDecodeError, IndexError) as e:
logger.warning(f"Failed to parse tool calls from LLM output: {e}")
if llm is not None:
retry_prompt = TOOL_CALLING_RETRY_PROMPT.format(text=match_content)
try:
fixed_output = llm.generate(prompt=retry_prompt).content.strip()
logger.info(f"Retrying tool call parse with fixed output:\n{fixed_output}")
fixed_list = parse_json_from_text(fixed_output)
if fixed_list:
parsed_tool_call = json.loads(fixed_list[0])
if isinstance(parsed_tool_call, dict):
parsed_tool_calls.append(parsed_tool_call)
elif isinstance(parsed_tool_call, list):
parsed_tool_calls.extend(parsed_tool_call)
except Exception as retry_err:
logger.error(f"Retry failed: {retry_err}")
continue
else:
continue
return parsed_tool_calls
def _extract_output(self, llm_output: Any, llm: BaseLLM = None, **kwargs):
# Get the raw output content
llm_output_content = getattr(llm_output, "content", str(llm_output))
# Check if there are any defined output fields
output_attrs = self.outputs_format.get_attrs()
# If no output fields are defined, create a simple content-only output
if not output_attrs:
# Create output with just the content field
output = self.outputs_format.parse(content=llm_output_content)
# print("Created simple content output for agent with no defined outputs:")
# print(output)
return output
# Use the action's parse_mode and parse_func for parsing
try:
# Use the outputs_format's parse method with the action's parse settings
parsed_output = self.outputs_format.parse(
content=llm_output_content,
parse_mode=self.parse_mode,
parse_func=getattr(self, 'parse_func', None),
title_format=getattr(self, 'title_format', "## {title}")
)
# print("Successfully parsed output using action's parse settings:")
# print(parsed_output)
return parsed_output
except Exception as e:
logger.info(f"Failed to parse with action's parse settings: {e}")
logger.info("Falling back to using LLM to extract outputs...")
# Fall back to extraction prompt if direct parsing fails
extraction_prompt = self.prepare_extraction_prompt(llm_output_content)
llm_extracted_output: LLMOutputParser = llm.generate(prompt=extraction_prompt)
llm_extracted_data: dict = parse_json_from_llm_output(llm_extracted_output.content)
output = self.outputs_format.from_dict(llm_extracted_data)
# print("Extracted output using fallback:")
# print(output)
return output
async def _async_extract_output(self, llm_output: Any, llm: BaseLLM = None, **kwargs):
# Get the raw output content
llm_output_content = getattr(llm_output, "content", str(llm_output))
# Check if there are any defined output fields
output_attrs = self.outputs_format.get_attrs()
# If no output fields are defined, create a simple content-only output
if not output_attrs:
# Create output with just the content field
output = self.outputs_format.parse(content=llm_output_content)
# print("Created simple content output for agent with no defined outputs:")
# print(output)
return output
# Use the action's parse_mode and parse_func for parsing
try:
# Use the outputs_format's parse method with the action's parse settings
parsed_output = self.outputs_format.parse(
content=llm_output_content,
parse_mode=self.parse_mode,
parse_func=getattr(self, 'parse_func', None),
title_format=getattr(self, 'title_format', "## {title}")
)
# print("Successfully parsed output using action's parse settings:")
# print(parsed_output)
return parsed_output
except Exception as e:
logger.info(f"Failed to parse with action's parse settings: {e}")
logger.info("Falling back to using LLM to extract outputs...")
# Fall back to extraction prompt if direct parsing fails
extraction_prompt = self.prepare_extraction_prompt(llm_output_content)
llm_extracted_output = await llm.async_generate(prompt=extraction_prompt)
llm_extracted_data: dict = parse_json_from_llm_output(llm_extracted_output.content)
output = self.outputs_format.from_dict(llm_extracted_data)
# print("Extracted output using fallback:")
# print(output)
return output
def _call_single_tool(self, function_param: dict) -> tuple:
try:
function_name = function_param.get("function_name")
function_args = function_param.get("function_args") or {}
if not function_name:
return None, "No function name provided"
callable_fn = self.tools_caller.get(function_name)
if not callable(callable_fn):
return None, f"Function '{function_name}' not found or not callable"
print("_____________________ Start Function Calling _____________________")
print(f"Executing function calling: {function_name} with parameters: {function_args}")
result = callable_fn(**function_args)
return result, None
except Exception as e:
logger.error(f"Error executing tool {function_name}: {e}")
return None, f"Error executing tool {function_name}: {str(e)}"
def _calling_tools(self, tool_call_args: List[dict]) -> dict:
## ___________ Call the tools in parallel___________
errors = []
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_tool = {executor.submit(self._call_single_tool, param): param for param in tool_call_args}
for future in concurrent.futures.as_completed(future_to_tool):
result, error = future.result()
if error:
errors.append(error)
if result is not None:
results.append(result)
return {"result": results, "error": errors}
async def _async_call_single_tool(self, function_param: dict) -> tuple:
try:
function_name = function_param.get("function_name")
function_args = function_param.get("function_args") or {}
if not function_name:
return None, "No function name provided"
callable_fn = self.tools_caller.get(function_name)
if not callable(callable_fn):
return None, f"Function '{function_name}' not found or not callable"
print("_____________________ Start Function Calling _____________________")
print(f"Executing function calling: {function_name} with parameters: {function_args}")
if inspect.iscoroutinefunction(callable_fn):
result = await callable_fn(**function_args)
else:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, lambda: callable_fn(**function_args))
return result, None
except Exception as e:
logger.error(f"Error executing tool {function_name}: {e}")
return None, f"Error executing tool {function_name}: {str(e)}"
async def _async_calling_tools(self, tool_call_args: List[dict]) -> dict:
## ___________ Call the tools concurrently ___________
tasks = [self._async_call_single_tool(param) for param in tool_call_args]
results_with_errors = await asyncio.gather(*tasks)
results = [res for res, err in results_with_errors if err is None and res is not None]
errors = [err for _, err in results_with_errors if err is not None]
return {"result": results, "error": errors}
def execute(self, llm: Optional[BaseLLM] = None, inputs: Optional[dict] = None, sys_msg: Optional[str]=None, return_prompt: bool = False, time_out = 0, **kwargs):
# Allow empty inputs if the action has no required input attributes
input_attributes: dict = self.inputs_format.get_attr_descriptions()
if not inputs and input_attributes:
logger.error("CustomizeAction action received invalid `inputs`: None or empty.")
raise ValueError('The `inputs` to CustomizeAction action is None or empty.')
# Set inputs to empty dict if None and no inputs are required
if inputs is None:
inputs = {}
final_llm_response = None
if self.prompt_template:
if isinstance(self.prompt_template, ChatTemplate):
# must determine whether prompt_template is ChatTemplate first since ChatTemplate is a subclass of StringTemplate
conversation = self.prepare_action_prompt(inputs=inputs, system_prompt=sys_msg)
elif isinstance(self.prompt_template, StringTemplate):
conversation = [{"role": "system", "content": self.prepare_action_prompt(inputs=inputs, system_prompt=sys_msg)}]
else:
raise ValueError(f"`prompt_template` must be a StringTemplate or ChatTemplate instance, but got {type(self.prompt_template)}")
else:
conversation = [{"role": "system", "content": sys_msg}, {"role": "user", "content": self.prepare_action_prompt(inputs=inputs, system_prompt=sys_msg)}]
## 1. get all the input parameters
prompt_params_values = {k: inputs.get(k, "") for k in input_attributes.keys()}
while True:
### Generate response from LLM
if time_out > self.max_tool_try:
# Get the appropriate prompt for return
current_prompt = self.prepare_action_prompt(inputs=prompt_params_values or {})
# Use the final LLM response if available, otherwise fall back to execution history
content_to_extract = final_llm_response if final_llm_response is not None else "{content}".format(content = conversation)
if return_prompt:
return self._extract_output(content_to_extract, llm = llm), current_prompt
return self._extract_output(content_to_extract, llm = llm)
time_out += 1
# Handle both string prompts and chat message lists
llm_response = llm.generate(messages=conversation)
conversation.append({"role": "assistant", "content": llm_response.content})
# Store the final LLM response
final_llm_response = llm_response
tool_call_args = self._extract_tool_calls(llm_response.content)
if not tool_call_args:
break
logger.info("Extracted tool call args:")
logger.info(json.dumps(tool_call_args, indent=4))
results = self._calling_tools(tool_call_args)
logger.info("Tool call results:")
logger.info(json.dumps(results, indent=4))
conversation.append({"role": "assistant", "content": TOOL_CALLING_HISTORY_PROMPT.format(
iteration_number=time_out,
tool_call_args=f"{tool_call_args}",
results=f"{results}"
)})
# Get the appropriate prompt for return
current_prompt = self.prepare_action_prompt(inputs=prompt_params_values or {})
# Use the final LLM response if available, otherwise fall back to execution history
content_to_extract = final_llm_response if final_llm_response is not None else "{content}".format(content = conversation)
if return_prompt:
return self._extract_output(content_to_extract, llm = llm), current_prompt
return self._extract_output(content_to_extract, llm = llm)
async def async_execute(self, llm: Optional[BaseLLM] = None, inputs: Optional[dict] = None, sys_msg: Optional[str]=None, return_prompt: bool = False, time_out = 0, **kwargs):
# Allow empty inputs if the action has no required input attributes
input_attributes: dict = self.inputs_format.get_attr_descriptions()
if not inputs and input_attributes:
logger.error("CustomizeAction action received invalid `inputs`: None or empty.")
raise ValueError('The `inputs` to CustomizeAction action is None or empty.')
# Set inputs to empty dict if None and no inputs are required
if inputs is None:
inputs = {}
final_llm_response = None
if self.prompt_template:
if isinstance(self.prompt_template, ChatTemplate):
# must determine whether prompt_template is ChatTemplate first since ChatTemplate is a subclass of StringTemplate
conversation = self.prepare_action_prompt(inputs=inputs, system_prompt=sys_msg)
elif isinstance(self.prompt_template, StringTemplate):
conversation = [{"role": "system", "content": self.prepare_action_prompt(inputs=inputs, system_prompt=sys_msg)}]
else:
raise ValueError(f"`prompt_template` must be a StringTemplate or ChatTemplate instance, but got {type(self.prompt_template)}")
else:
conversation = [{"role": "system", "content": sys_msg}, {"role": "user", "content": self.prepare_action_prompt(inputs=inputs, system_prompt=sys_msg)}]
## 1. get all the input parameters
prompt_params_values = {k: inputs.get(k, "") for k in input_attributes.keys()}
while True:
### Generate response from LLM
if time_out > self.max_tool_try:
# Get the appropriate prompt for return
current_prompt = self.prepare_action_prompt(inputs=prompt_params_values or {})
# Use the final LLM response if available, otherwise fall back to execution history
content_to_extract = final_llm_response if final_llm_response is not None else "{content}".format(content = conversation)
if return_prompt:
return await self._async_extract_output(content_to_extract, llm = llm), current_prompt
return await self._async_extract_output(content_to_extract, llm = llm)
time_out += 1
# Handle both string prompts and chat message lists
llm_response = await llm.async_generate(messages=conversation)
conversation.append({"role": "assistant", "content": llm_response.content})
# Store the final LLM response
final_llm_response = llm_response
tool_call_args = self._extract_tool_calls(llm_response.content)
if not tool_call_args:
break
logger.info("Extracted tool call args:")
logger.info(json.dumps(tool_call_args, indent=4))
results = self._calling_tools(tool_call_args)
logger.info("Tool call results:")
try:
logger.info(json.dumps(results, indent=4))
except Exception:
logger.info(str(results))
conversation.append({"role": "assistant", "content": TOOL_CALLING_HISTORY_PROMPT.format(
iteration_number=time_out,
tool_call_args=f"{tool_call_args}",
results=f"{results}"
)})
# Get the appropriate prompt for return
current_prompt = self.prepare_action_prompt(inputs=prompt_params_values or {})
# Use the final LLM response if available, otherwise fall back to execution history
content_to_extract = final_llm_response if final_llm_response is not None else "{content}".format(content = conversation)
if return_prompt:
return await self._async_extract_output(content_to_extract, llm = llm), current_prompt
return await self._async_extract_output(content_to_extract, llm = llm)