| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import re |
| import uuid |
| from collections import defaultdict |
| from typing import ( |
| TYPE_CHECKING, |
| Any, |
| Callable, |
| Dict, |
| List, |
| Optional, |
| Tuple, |
| Type, |
| Union, |
| ) |
|
|
| from openai.types.chat import ChatCompletionMessageToolCall |
| from openai.types.chat.chat_completion_message_tool_call import Function |
| from pydantic import BaseModel, ValidationError |
|
|
| from camel.agents.base import BaseAgent |
| from camel.memories import ( |
| AgentMemory, |
| ChatHistoryMemory, |
| MemoryRecord, |
| ScoreBasedContextCreator, |
| ) |
| from camel.messages import BaseMessage, FunctionCallingMessage, OpenAIMessage |
| from camel.models import ( |
| BaseModelBackend, |
| ModelFactory, |
| ModelManager, |
| ModelProcessingError, |
| ) |
| from camel.responses import ChatAgentResponse |
| from camel.types import ( |
| ChatCompletion, |
| ChatCompletionChunk, |
| ModelPlatformType, |
| ModelType, |
| OpenAIBackendRole, |
| RoleType, |
| ) |
| from camel.utils import ( |
| func_string_to_callable, |
| generate_prompt_for_structured_output, |
| get_model_encoding, |
| get_pydantic_object_schema, |
| json_to_function_code, |
| ) |
|
|
| if TYPE_CHECKING: |
| from openai import Stream |
|
|
| from camel.terminators import ResponseTerminator |
| from camel.toolkits import FunctionTool |
|
|
|
|
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)

# Choose which `track_agent` decorator to use: the AgentOps implementation is
# imported only when the AGENTOPS_API_KEY environment variable is set.
# Otherwise, or when the agentops import fails, the one exported by
# `camel.utils` is used instead.
try:
    import os

    if os.getenv("AGENTOPS_API_KEY") is not None:
        from agentops import track_agent
    else:
        # Raised on purpose to fall through to the fallback import below.
        raise ImportError
except (ImportError, AttributeError):
    from camel.utils import track_agent
|
|
|
|
class FunctionCallingRecord(BaseModel):
    r"""A single tool invocation that took place during the conversation.

    Attributes:
        func_name (str): Name of the function that was invoked.
        args (Dict[str, Any]): Arguments that were passed to the function.
        result (Any): Value produced by executing the function.
        tool_call_id (str): Identifier of the tool call this record
            belongs to.
    """

    func_name: str
    args: Dict[str, Any]
    result: Any
    tool_call_id: str

    def __str__(self) -> str:
        r"""Render the record as a short multi-line summary.

        Returns:
            str: The function name, its arguments and its result, one per
                line, each line terminated by a newline.
        """
        header = f"Function Execution: {self.func_name}\n"
        arguments = f"\tArgs: {self.args}\n"
        outcome = f"\tResult: {self.result}\n"
        return header + arguments + outcome

    def as_dict(self) -> dict[str, Any]:
        r"""Convert the record into a plain dictionary.

        Returns:
            dict[str, Any]: The fields of this record as produced by
                :obj:`model_dump`.
        """
        dumped: dict[str, Any] = self.model_dump()
        return dumped
|
|
|
|
| @track_agent(name="ChatAgent") |
| class ChatAgent(BaseAgent): |
| r"""Class for managing conversations of CAMEL Chat Agents. |
| |
| Args: |
| system_message (Union[BaseMessage, str], optional): The system message |
| for the chat agent. |
| model (BaseModelBackend, optional): The model backend to use for |
| generating responses. (default: :obj:`ModelPlatformType.DEFAULT` |
| with `ModelType.DEFAULT`) |
| memory (AgentMemory, optional): The agent memory for managing chat |
| messages. If `None`, a :obj:`ChatHistoryMemory` will be used. |
| (default: :obj:`None`) |
| message_window_size (int, optional): The maximum number of previous |
| messages to include in the context window. If `None`, no windowing |
| is performed. (default: :obj:`None`) |
| token_limit (int, optional): The maximum number of tokens in a context. |
| The context will be automatically pruned to fulfill the limitation. |
| If `None`, it will be set according to the backend model. |
| (default: :obj:`None`) |
| output_language (str, optional): The language to be output by the |
| agent. (default: :obj:`None`) |
| tools (Optional[List[Union[FunctionTool, Callable]]], optional): List |
| of available :obj:`FunctionTool` or :obj:`Callable`. (default: |
| :obj:`None`) |
| external_tools (Optional[List[Union[FunctionTool, Callable]]], |
| optional): List of external tools (:obj:`FunctionTool` or |
| :obj:`Callable`) bind to one chat agent. When these tools are |
| called, the agent will directly return the request instead of |
| processing it. (default: :obj:`None`) |
| response_terminators (List[ResponseTerminator], optional): List of |
| :obj:`ResponseTerminator` bind to one chat agent. |
| (default: :obj:`None`) |
| scheduling_strategy (str): name of function that defines how to select |
| the next model in ModelManager. (default: :str:`round_robin`) |
| single_iteration (bool): Whether to let the agent perform only one |
| model calling at each step. (default: :obj:`False`) |
| """ |
|
|
| def __init__( |
| self, |
| system_message: Optional[Union[BaseMessage, str]] = None, |
| model: Optional[ |
| Union[BaseModelBackend, List[BaseModelBackend]] |
| ] = None, |
| memory: Optional[AgentMemory] = None, |
| message_window_size: Optional[int] = None, |
| token_limit: Optional[int] = None, |
| output_language: Optional[str] = None, |
| tools: Optional[List[Union[FunctionTool, Callable]]] = None, |
| external_tools: Optional[List[Union[FunctionTool, Callable]]] = None, |
| response_terminators: Optional[List[ResponseTerminator]] = None, |
| scheduling_strategy: str = "round_robin", |
| single_iteration: bool = False, |
| ) -> None: |
| |
| if isinstance(system_message, str): |
| system_message = BaseMessage.make_assistant_message( |
| role_name='Assistant', content=system_message |
| ) |
|
|
| self.orig_sys_message: Optional[BaseMessage] = system_message |
| self._system_message: Optional[BaseMessage] = system_message |
| self.role_name: str = ( |
| getattr(system_message, 'role_name', None) or "assistant" |
| ) |
| self.role_type: RoleType = ( |
| getattr(system_message, 'role_type', None) or RoleType.ASSISTANT |
| ) |
| self.model_backend = ModelManager( |
| model |
| if model is not None |
| else ModelFactory.create( |
| model_platform=ModelPlatformType.DEFAULT, |
| model_type=ModelType.DEFAULT, |
| ), |
| scheduling_strategy=scheduling_strategy, |
| ) |
| self.model_type = self.model_backend.model_type |
|
|
| |
| self.tools: List[FunctionTool] = ( |
| self._initialize_tools(tools) if tools else [] |
| ) |
| self.external_tools: List[FunctionTool] = ( |
| self._initialize_tools(external_tools) if external_tools else [] |
| ) |
| self.external_tool_names: List[str] = [ |
| tool.get_function_name() for tool in self.external_tools |
| ] |
| self.all_tools = self.tools + self.external_tools or [] |
|
|
| |
| self.tool_dict = { |
| tool.get_function_name(): tool for tool in self.all_tools |
| } |
|
|
| |
| |
| if self.all_tools: |
| logger.warning( |
| "Overriding the configured tools in `BaseModelBackend` with the tools from `ChatAgent`." |
| ) |
| tool_schema_list = [ |
| tool.get_openai_tool_schema() for tool in self.all_tools |
| ] |
| self.model_backend.model_config_dict['tools'] = tool_schema_list |
|
|
| self.model_token_limit = token_limit or self.model_backend.token_limit |
| context_creator = ScoreBasedContextCreator( |
| self.model_backend.token_counter, |
| self.model_token_limit, |
| ) |
| self.memory: AgentMemory = memory or ChatHistoryMemory( |
| context_creator, window_size=message_window_size |
| ) |
|
|
| self.output_language: Optional[str] = output_language |
| if self.output_language is not None: |
| self.set_output_language(self.output_language) |
|
|
| self.terminated: bool = False |
| self.response_terminators = response_terminators or [] |
| self.init_messages() |
| self.tool_prompt_added = False |
| self.single_iteration = single_iteration |
|
|
| def _initialize_tools( |
| self, tools: List[Union[FunctionTool, Callable]] |
| ) -> List[FunctionTool]: |
| r"""Helper method to initialize tools as FunctionTool instances.""" |
| from camel.toolkits import FunctionTool |
|
|
| func_tools = [] |
| for tool in tools: |
| if not isinstance(tool, FunctionTool): |
| tool = FunctionTool(tool) |
| func_tools.append(tool) |
| return func_tools |
|
|
| def add_tool( |
| self, tool: Union[FunctionTool, Callable], is_external: bool = False |
| ) -> None: |
| r"""Add a tool to the agent, specifying if it's an external tool.""" |
| |
| initialized_tool = self._initialize_tools([tool]) |
|
|
| |
| if is_external: |
| self.external_tools = self.external_tools + initialized_tool |
| self.external_tool_names.extend( |
| tool.get_function_name() for tool in initialized_tool |
| ) |
| else: |
| self.tools = self.tools + initialized_tool |
|
|
| |
| self.all_tools = self.tools + self.external_tools |
| self.tool_dict = { |
| tool.get_function_name(): tool for tool in self.all_tools |
| } |
|
|
| tool_schema_list = [ |
| tool.get_openai_tool_schema() for tool in self.all_tools |
| ] |
| self.model_backend.model_config_dict['tools'] = tool_schema_list |
|
|
| def remove_tool(self, tool_name: str, is_external: bool = False) -> bool: |
| r"""Remove a tool by name, specifying if it's an external tool.""" |
| tool_list = self.external_tools if is_external else self.tools |
| if not tool_list: |
| return False |
|
|
| for tool in tool_list: |
| if tool.get_function_name() == tool_name: |
| tool_list.remove(tool) |
| if is_external: |
| self.external_tool_names.remove(tool_name) |
| |
| self.all_tools = (self.tools or []) + ( |
| self.external_tools or [] |
| ) |
| self.tool_dict = { |
| tool.get_function_name(): tool for tool in self.all_tools |
| } |
| tool_schema_list = [ |
| tool.get_openai_tool_schema() for tool in self.all_tools |
| ] |
| self.model_backend.model_config_dict['tools'] = ( |
| tool_schema_list |
| ) |
| return True |
| return False |
|
|
| def list_tools(self) -> dict: |
| r"""List all tools, separated into normal and external tools.""" |
| normal_tools = [ |
| tool.get_function_name() for tool in (self.tools or []) |
| ] |
| external_tools = [ |
| tool.get_function_name() for tool in (self.external_tools or []) |
| ] |
|
|
| return {"normal_tools": normal_tools, "external_tools": external_tools} |
|
|
| |
    def _generate_tool_prompt(self, tool_schema_list: List[Dict]) -> str:
        r"""Generates a tool prompt based on the provided tool schema list.

        Each schema contributes one paragraph made of the function name, its
        description and the JSON dump of its ``function`` entry. The
        paragraphs are then embedded into a fixed instruction text that tells
        the model to answer with a ``<function=...>`` tag when it wants to
        call a tool.

        Args:
            tool_schema_list (List[Dict]): A list of dictionaries, each
                containing a tool schema. Every schema is read through its
                ``'function'`` key, which must provide ``'name'`` and
                ``'description'``.

        Returns:
            str: A string representing the tool prompt.
        """
        tool_prompts = []

        for tool in tool_schema_list:
            tool_info = tool['function']
            tool_name = tool_info['name']
            tool_description = tool_info['description']
            # The whole function schema is shown to the model as JSON.
            tool_json = json.dumps(tool_info, indent=4)

            prompt = f"Use the function '{tool_name}' to '{tool_description}':\n{tool_json}\n"
            tool_prompts.append(prompt)

        tool_prompt_str = "\n".join(tool_prompts)

        # The tag format requested below is the one `_parse_tool_response`
        # looks for. Doubled braces are literal braces inside the f-string.
        # NOTE(review): the leading whitespace of the lines inside this
        # literal is part of the prompt and could not be recovered from the
        # flattened source - confirm against the canonical file.
        final_prompt = f"""
You have access to the following functions:

{tool_prompt_str}

If you choose to call a function ONLY reply in the following format with no
prefix or suffix:

<function=example_function_name>{{"example_name": "example_value"}}</function>

Reminder:
- Function calls MUST follow the specified format, start with <function= and end with </function>
- Required parameters MUST be specified
- Only call one function at a time
- Put the entire function call reply on one line
- If there is no function call available, answer the question like normal
with your current knowledge and do not tell the user about function calls
"""
        return final_prompt
|
|
| def _parse_tool_response(self, response: str): |
| r"""Parses the tool response to extract the function name and |
| arguments. |
| |
| Args: |
| response (str): The response from the model containing the |
| function call. |
| |
| Returns: |
| Optional[Dict[str, Any]]: The parsed function name and arguments |
| if found, otherwise :obj:`None`. |
| """ |
| function_regex = r"<function=(\w+)>(.*?)</function>" |
| match = re.search(function_regex, response) |
|
|
| if match: |
| function_name, args_string = match.groups() |
| try: |
| args = json.loads(args_string) |
| return {"function": function_name, "arguments": args} |
| except json.JSONDecodeError as error: |
| logger.error(f"Error parsing function arguments: {error}") |
| return None |
| return None |
|
|
| def reset(self): |
| r"""Resets the :obj:`ChatAgent` to its initial state.""" |
| self.terminated = False |
| self.init_messages() |
| for terminator in self.response_terminators: |
| terminator.reset() |
|
|
| @property |
| def system_message(self) -> Optional[BaseMessage]: |
| r"""The getter method for the property :obj:`system_message`. |
| |
| Returns: |
| Optional[BaseMessage]: The system message of this agent if set, |
| else :obj:`None`. |
| """ |
| return self._system_message |
|
|
| @system_message.setter |
| def system_message(self, message: BaseMessage) -> None: |
| r"""The setter method for the property :obj:`system_message`. |
| |
| Args: |
| message (BaseMessage): The message to be set as the |
| new system message of this agent. |
| """ |
| self._system_message = message |
|
|
| def is_tools_added(self) -> bool: |
| r"""Whether tool calling is enabled for this agent. |
| |
| Returns: |
| bool: Whether tool calling is enabled for this agent, determined |
| by whether the dictionary of tools is empty. |
| """ |
| return len(self.tool_dict) > 0 |
|
|
| def update_memory( |
| self, message: BaseMessage, role: OpenAIBackendRole |
| ) -> None: |
| r"""Updates the agent memory with a new message. |
| |
| Args: |
| message (BaseMessage): The new message to add to the stored |
| messages. |
| role (OpenAIBackendRole): The backend role type. |
| """ |
| self.memory.write_record( |
| MemoryRecord(message=message, role_at_backend=role) |
| ) |
|
|
| def set_output_language(self, output_language: str) -> BaseMessage: |
| r"""Sets the output language for the system message. This method |
| updates the output language for the system message. The output |
| language determines the language in which the output text should be |
| generated. |
| |
| Args: |
| output_language (str): The desired output language. |
| |
| Returns: |
| BaseMessage: The updated system message object. |
| """ |
| self.output_language = output_language |
| language_prompt = ( |
| "\nRegardless of the input language, " |
| f"you must output text in {output_language}." |
| ) |
| if self.orig_sys_message is not None: |
| content = self.orig_sys_message.content + language_prompt |
| self._system_message = self.orig_sys_message.create_new_instance( |
| content |
| ) |
| else: |
| self._system_message = BaseMessage.make_assistant_message( |
| role_name="Assistant", |
| content=language_prompt, |
| ) |
|
|
| system_record = MemoryRecord( |
| message=self._system_message, |
| role_at_backend=OpenAIBackendRole.SYSTEM, |
| ) |
| self.memory.clear() |
| self.memory.write_record(system_record) |
| return self._system_message |
|
|
| def get_info( |
| self, |
| session_id: Optional[str], |
| usage: Optional[Dict[str, int]], |
| termination_reasons: List[str], |
| num_tokens: int, |
| tool_calls: List[FunctionCallingRecord], |
| external_tool_request: Optional[ChatCompletionMessageToolCall] = None, |
| ) -> Dict[str, Any]: |
| r"""Returns a dictionary containing information about the chat session. |
| |
| Args: |
| session_id (str, optional): The ID of the chat session. |
| usage (Dict[str, int], optional): Information about the usage of |
| the LLM. |
| termination_reasons (List[str]): The reasons for the termination |
| of the chat session. |
| num_tokens (int): The number of tokens used in the chat session. |
| tool_calls (List[FunctionCallingRecord]): The list of function |
| calling records, containing the information of called tools. |
| external_tool_request |
| (Optional[ChatCompletionMessageToolCall], optional): |
| The tool calling request of external tools from the model. |
| These requests are directly returned to the user instead of |
| being processed by the agent automatically. |
| (default: :obj:`None`) |
| |
| Returns: |
| Dict[str, Any]: The chat session information. |
| """ |
| return { |
| "id": session_id, |
| "usage": usage, |
| "termination_reasons": termination_reasons, |
| "num_tokens": num_tokens, |
| "tool_calls": tool_calls, |
| "external_tool_request": external_tool_request, |
| } |
|
|
| def init_messages(self) -> None: |
| r"""Initializes the stored messages list with the current system |
| message. |
| """ |
| if self._system_message is not None: |
| system_record = MemoryRecord( |
| message=self._system_message, |
| role_at_backend=OpenAIBackendRole.SYSTEM, |
| ) |
| self.memory.clear() |
| self.memory.write_record(system_record) |
| else: |
| self.memory.clear() |
|
|
| def record_message(self, message: BaseMessage) -> None: |
| r"""Records the externally provided message into the agent memory as if |
| it were an answer of the :obj:`ChatAgent` from the backend. Currently, |
| the choice of the critic is submitted with this method. |
| |
| Args: |
| message (BaseMessage): An external message to be recorded in the |
| memory. |
| """ |
| self.update_memory(message, OpenAIBackendRole.ASSISTANT) |
|
|
| def step( |
| self, |
| input_message: Union[BaseMessage, str], |
| response_format: Optional[Type[BaseModel]] = None, |
| ) -> ChatAgentResponse: |
| r"""Executes a single step in the chat session, generating a response |
| to the input message. |
| |
| Args: |
| input_message (Union[BaseMessage, str]): The input message for the |
| agent. If provided as a BaseMessage, the `role` is adjusted to |
| `user` to indicate an external message. |
| response_format (Optional[Type[BaseModel]], optional): A Pydantic |
| model defining the expected structure of the response. Used to |
| generate a structured response if provided. (default: |
| :obj:`None`) |
| |
| Returns: |
| ChatAgentResponse: Contains output messages, a termination status |
| flag, and session information. |
| """ |
|
|
| if ( |
| self.model_backend.model_config_dict.get("response_format") |
| and response_format |
| ): |
| raise ValueError( |
| "The `response_format` parameter cannot be set both in " |
| "the model configuration and in the ChatAgent step." |
| ) |
|
|
| self.original_model_dict = self.model_backend.model_config_dict |
| model_response_format_modified = False |
| if ( |
| response_format |
| and self.model_type.support_native_structured_output |
| ): |
| self.model_backend.model_config_dict = ( |
| self.original_model_dict.copy() |
| ) |
| self.model_backend.model_config_dict["response_format"] = ( |
| response_format |
| ) |
| model_response_format_modified = True |
|
|
| |
| if isinstance(input_message, str): |
| input_message = BaseMessage.make_user_message( |
| role_name='User', content=input_message |
| ) |
|
|
| |
| if ( |
| self.is_tools_added() |
| and not self.model_type.support_native_tool_calling |
| and not self.tool_prompt_added |
| ): |
| self._inject_tool_prompt() |
|
|
| |
| self.update_memory(input_message, OpenAIBackendRole.USER) |
|
|
| try: |
| return self._handle_step(response_format, self.single_iteration) |
| finally: |
| if model_response_format_modified: |
| |
| self.model_backend.model_config_dict = self.original_model_dict |
|
|
| def _inject_tool_prompt(self) -> None: |
| r"""Generate and add the tool prompt to memory.""" |
| tool_prompt = self._generate_tool_prompt( |
| self.model_backend.model_config_dict["tools"] |
| ) |
| tool_msg = BaseMessage.make_assistant_message( |
| role_name="Assistant", content=tool_prompt |
| ) |
| self.update_memory(tool_msg, OpenAIBackendRole.SYSTEM) |
| self.tool_prompt_added = True |
|
|
| def _handle_step( |
| self, |
| response_format: Optional[Type[BaseModel]], |
| single_step: bool, |
| ) -> ChatAgentResponse: |
| r"""Handles a single or multi-step interaction.""" |
|
|
| if ( |
| self.model_backend.model_config_dict.get("tool_choice") |
| == "required" |
| and not single_step |
| ): |
| raise ValueError( |
| "`tool_choice` cannot be set to `required` for multi-step" |
| " mode. To proceed, set `single_iteration` to `True`." |
| ) |
|
|
| |
| tool_call_records: List[FunctionCallingRecord] = [] |
|
|
| external_tool_request = None |
|
|
| while True: |
| try: |
| openai_messages, num_tokens = self.memory.get_context() |
| except RuntimeError as e: |
| self.model_backend.model_config_dict = self.original_model_dict |
| return self._step_token_exceed( |
| e.args[1], tool_call_records, "max_tokens_exceeded" |
| ) |
|
|
| |
| inject_prompt_for_structured_output = ( |
| response_format |
| and not self.model_type.support_native_structured_output |
| ) |
|
|
| if inject_prompt_for_structured_output: |
| |
| usr_msg = openai_messages.pop() |
| usr_msg["content"] = generate_prompt_for_structured_output( |
| response_format, |
| usr_msg["content"], |
| ) |
| openai_messages.append(usr_msg) |
|
|
| |
| ( |
| response, |
| output_messages, |
| finish_reasons, |
| usage_dict, |
| response_id, |
| ) = self._step_model_response(openai_messages, num_tokens) |
|
|
| |
| if inject_prompt_for_structured_output and isinstance( |
| response, ChatCompletion |
| ): |
| content = response.choices[0].message.content |
| try: |
| json_content = json.loads(str(content)) |
| output_messages[0].parsed = response_format(**json_content) |
| except json.JSONDecodeError as e: |
| logger.error( |
| f"Failed in parsing the output into JSON: {e}" |
| ) |
| output_messages[0].parsed = None |
| except ValidationError as e: |
| logger.warning( |
| "Successfully generating JSON response, " |
| "but failed in parsing it into Pydantic object :" |
| f"{e}, return the JSON response in parsed field" |
| ) |
| output_messages[0].parsed = json_content |
|
|
| |
| if self._is_standard_response(response): |
| break |
|
|
| |
| tool_request = self._extract_tool_call(response) |
| if isinstance(response, ChatCompletion) and tool_request: |
| response.choices[0].message.tool_calls = [tool_request] |
| tool_call_records.append( |
| self._step_tool_call_and_update(response) |
| ) |
|
|
| if tool_request.function.name in self.external_tool_names: |
| external_tool_request = tool_request |
| info = self._step_get_info( |
| output_messages, |
| finish_reasons, |
| usage_dict, |
| response_id, |
| tool_call_records, |
| num_tokens, |
| tool_request, |
| ) |
| self._log_final_output(output_messages) |
| self.model_backend.model_config_dict = ( |
| self.original_model_dict |
| ) |
| return ChatAgentResponse( |
| msgs=output_messages, |
| terminated=self.terminated, |
| info=info, |
| ) |
|
|
| |
| if single_step: |
| break |
|
|
| |
| if ( |
| response_format |
| and not inject_prompt_for_structured_output |
| and self.model_type |
| not in { |
| "gpt-4o", |
| "gpt-4o-mini", |
| } |
| ): |
| ( |
| output_messages, |
| finish_reasons, |
| usage_dict, |
| response_id, |
| tool_call, |
| num_tokens, |
| ) = self._structure_output_with_function(response_format) |
| tool_call_records.append(tool_call) |
|
|
| |
| info = self._step_get_info( |
| output_messages, |
| finish_reasons, |
| usage_dict, |
| response_id, |
| tool_call_records, |
| num_tokens, |
| external_tool_request, |
| ) |
| self._log_final_output(output_messages) |
| self.model_backend.model_config_dict = self.original_model_dict |
| return ChatAgentResponse( |
| msgs=output_messages, terminated=self.terminated, info=info |
| ) |
|
|
| def _extract_tool_call( |
| self, response: Any |
| ) -> Optional[ChatCompletionMessageToolCall]: |
| r"""Extract the tool call from the model response, if present. |
| |
| Args: |
| response (Any): The model's response object. |
| |
| Returns: |
| Optional[ChatCompletionMessageToolCall]: The parsed tool call if |
| present, otherwise None. |
| """ |
| |
| if ( |
| self.is_tools_added() |
| and not self.model_type.support_native_tool_calling |
| and "</function>" in response.choices[0].message.content |
| ): |
| parsed_content = self._parse_tool_response( |
| response.choices[0].message.content |
| ) |
| if parsed_content: |
| return ChatCompletionMessageToolCall( |
| id=str(uuid.uuid4()), |
| function=Function( |
| arguments=str(parsed_content["arguments"]).replace( |
| "'", '"' |
| ), |
| name=str(parsed_content["function"]), |
| ), |
| type="function", |
| ) |
| elif ( |
| self.is_tools_added() |
| and self.model_type.support_native_tool_calling |
| and response.choices[0].message.tool_calls |
| ): |
| return response.choices[0].message.tool_calls[0] |
|
|
| |
| return None |
|
|
| def _is_standard_response(self, response: Any) -> bool: |
| r"""Determine if the provided response is a standard reply without |
| tool calls. |
| |
| Args: |
| response (Any): The response object to evaluate. |
| |
| Returns: |
| bool: `True` if the response is a standard reply, `False` |
| otherwise. |
| """ |
| if not self.is_tools_added(): |
| return True |
|
|
| if not isinstance(response, ChatCompletion): |
| return True |
|
|
| if self.model_type.support_native_tool_calling: |
| return not response.choices[0].message.tool_calls |
|
|
| return "</function>" not in str( |
| response.choices[0].message.content or "" |
| ) |
|
|
| def _log_final_output(self, output_messages: List[BaseMessage]) -> None: |
| r"""Log final messages or warnings about multiple responses.""" |
| if len(output_messages) == 1: |
| self.record_message(output_messages[0]) |
| else: |
| logger.warning( |
| "Multiple messages returned in `step()`. Record " |
| "selected message manually using `record_message()`." |
| ) |
|
|
    async def step_async(
        self,
        input_message: Union[BaseMessage, str],
        response_format: Optional[Type[BaseModel]] = None,
    ) -> ChatAgentResponse:
        r"""Performs a single step in the chat session by generating a response
        to the input message. This agent step can call async function calls.

        Args:
            input_message (Union[BaseMessage, str]): The input message to the
                agent. For BaseMessage input, its `role` field that specifies
                the role at backend may be either `user` or `assistant` but it
                will be set to `user` anyway since for the self agent any
                incoming message is external. For str input, the `role_name`
                would be `User`.
            response_format (Optional[Type[BaseModel]], optional): A pydantic
                model class that includes value types and field descriptions
                used to generate a structured response by LLM. This schema
                helps in defining the expected output format. (default:
                :obj:`None`)

        Returns:
            ChatAgentResponse: A struct containing the output messages,
                a boolean indicating whether the chat session has terminated,
                and information about the chat session.
        """
        # NOTE(review): unlike `step()`, this method neither injects the
        # tool prompt for models without native tool calling nor touches
        # `response_format` in the model configuration - confirm whether
        # that difference is intended.
        if isinstance(input_message, str):
            input_message = BaseMessage.make_user_message(
                role_name='User', content=input_message
            )

        self.update_memory(input_message, OpenAIBackendRole.USER)

        # Records of the function calls made during this step.
        tool_call_records: List[FunctionCallingRecord] = []
        while True:
            try:
                openai_messages, num_tokens = self.memory.get_context()
            except RuntimeError as e:
                # NOTE(review): assumes the second argument of the error
                # carries the token count - confirm.
                return self._step_token_exceed(
                    e.args[1], tool_call_records, "max_tokens_exceeded"
                )

            # The model call itself is synchronous (it is not awaited).
            (
                response,
                output_messages,
                finish_reasons,
                usage_dict,
                response_id,
            ) = self._step_model_response(openai_messages, num_tokens)

            # Leave the loop once the response carries no tool call.
            if (
                not self.is_tools_added()
                or not isinstance(response, ChatCompletion)
                or not response.choices[0].message.tool_calls
            ):
                break

            # Only the first tool call of the response is considered.
            external_tool_request = response.choices[0].message.tool_calls[0]
            if external_tool_request.function.name in self.external_tool_names:
                # An external tool is not executed; the request is handed
                # back in the info dictionary. The output message is not
                # recorded in memory on this path.
                info = self._step_get_info(
                    output_messages,
                    finish_reasons,
                    usage_dict,
                    response_id,
                    tool_call_records,
                    num_tokens,
                    external_tool_request,
                )
                return ChatAgentResponse(
                    msgs=output_messages, terminated=self.terminated, info=info
                )

            # Execute the tool, store the call and its result in memory,
            # then query the model again.
            tool_call_records.append(
                await self._step_tool_call_and_update_async(response)
            )

        # Structure the final output through a function call when a
        # response format was requested and the model supports native tool
        # calling.
        if (
            response_format is not None
            and self.model_type.support_native_tool_calling
        ):
            (
                output_messages,
                finish_reasons,
                usage_dict,
                response_id,
                tool_call_record,
                num_tokens,
            ) = self._structure_output_with_function(response_format)
            tool_call_records.append(tool_call_record)

        info = self._step_get_info(
            output_messages,
            finish_reasons,
            usage_dict,
            response_id,
            tool_call_records,
            num_tokens,
        )

        if len(output_messages) == 1:
            # A single output message is recorded automatically.
            self.record_message(output_messages[0])
        else:
            logger.warning(
                "Multiple messages returned in `step()`, message won't be "
                "recorded automatically. Please call `record_message()` to "
                "record the selected message manually."
            )

        return ChatAgentResponse(
            msgs=output_messages, terminated=self.terminated, info=info
        )
|
|
| def _step_tool_call_and_update( |
| self, response: ChatCompletion |
| ) -> FunctionCallingRecord: |
| r"""Processes a function call within the chat completion response, |
| records the function call in the provided list of tool calls and |
| updates the memory of the current agent. |
| |
| Args: |
| response (ChatCompletion): The response object from the chat |
| completion. |
| |
| Returns: |
| FunctionCallingRecord: The record of calling the function. |
| """ |
|
|
| |
| func_assistant_msg, func_result_msg, tool_call_record = ( |
| self._step_tool_call(response) |
| ) |
|
|
| |
| self.update_memory(func_assistant_msg, OpenAIBackendRole.ASSISTANT) |
| self.update_memory(func_result_msg, OpenAIBackendRole.FUNCTION) |
|
|
| return tool_call_record |
|
|
| async def _step_tool_call_and_update_async( |
| self, response: ChatCompletion |
| ) -> FunctionCallingRecord: |
| ( |
| func_assistant_msg, |
| func_result_msg, |
| func_record, |
| ) = await self.step_tool_call_async(response) |
|
|
| self.update_memory(func_assistant_msg, OpenAIBackendRole.ASSISTANT) |
| self.update_memory(func_result_msg, OpenAIBackendRole.FUNCTION) |
|
|
| return func_record |
|
|
def _structure_output_with_function(
    self, response_format: Type[BaseModel]
) -> Tuple[
    List[BaseMessage],
    List[str],
    Dict[str, int],
    str,
    FunctionCallingRecord,
    int,
]:
    r"""Internal function of structuring the output of the agent based on
    the given output schema.

    The schema is converted into a synthetic tool, and the model is forced
    to call it (``tool_choice="required"``). The tool's result is then
    serialized to JSON and written into the content of every output
    message.

    Args:
        response_format (Type[BaseModel]): The output schema to use for
            structuring the output.

    Returns:
        Tuple[List[BaseMessage], List[str], Dict[str, int], str,
            FunctionCallingRecord, int]:
            A tuple containing the output messages, finish reasons, usage
            dictionary, response ID, function calling record, and number of
            tokens.

    Raises:
        ValueError: If the model returns a stream response, which cannot
            be used for structured output.
    """
    from camel.toolkits import FunctionTool

    schema_json = get_pydantic_object_schema(response_format)
    func_str = json_to_function_code(schema_json)
    func_callable = func_string_to_callable(func_str)
    func = FunctionTool(func_callable)

    # Remember the agent's real tools and model config so they can be put
    # back no matter how this method exits.
    original_tool_dict = self.tool_dict
    original_model_dict = self.model_backend.model_config_dict

    # Temporarily expose only the structuring function and force the model
    # to call it. The config is copied so the original is never mutated.
    structured_config = original_model_dict.copy()
    structured_config["tools"] = [func.get_openai_tool_schema()]
    structured_config["tool_choice"] = "required"

    self.tool_dict = {func.get_function_name(): func}
    self.model_backend.model_config_dict = structured_config

    try:
        openai_messages, num_tokens = self.memory.get_context()
        (
            response,
            output_messages,
            finish_reasons,
            usage_dict,
            response_id,
        ) = self._step_model_response(openai_messages, num_tokens)

        if not isinstance(response, ChatCompletion):
            raise ValueError(
                "Structured output is not supported for stream responses."
            )
        tool_call_record = self._step_tool_call_and_update(response)

        # Every output message carries the same structured payload, so
        # serialize it once.
        structured_content = json.dumps(tool_call_record.result)
        for base_message_item in output_messages:
            base_message_item.content = structured_content
    finally:
        # Restore on success AND on failure; otherwise a failed call would
        # leave the agent permanently forced into the structuring tool.
        self.tool_dict = original_tool_dict
        self.model_backend.model_config_dict = original_model_dict

    return (
        output_messages,
        finish_reasons,
        usage_dict,
        response_id,
        tool_call_record,
        num_tokens,
    )
|
|
def _step_model_response(
    self,
    openai_messages: List[OpenAIMessage],
    num_tokens: int,
) -> Tuple[
    Union[ChatCompletion, Stream],
    List[BaseMessage],
    List[str],
    Dict[str, int],
    str,
]:
    r"""Internal function for agent step model response.

    Runs the model backend on the given messages, making at most one
    attempt per entry in ``self.model_backend.models``, and post-processes
    the first successful response.

    Args:
        openai_messages (List[OpenAIMessage]): The messages to send to the
            model.
        num_tokens (int): Number of tokens in the messages; used to build
            the usage dictionary for stream responses.

    Returns:
        tuple: The raw response, the output messages, the finish reasons,
            the usage dictionary, and the response id.

    Raises:
        ModelProcessingError: If no attempt succeeds. The last underlying
            exception, if any, is attached as the cause.
    """
    response = None
    last_error: Optional[Exception] = None

    for _ in range(len(self.model_backend.models)):
        try:
            response = self.model_backend.run(openai_messages)
            break
        except Exception as exc:
            # Keep the failure so it can be chained if every attempt fails.
            last_error = exc
            logger.error(
                "An error occurred while running model %s, index: %s",
                self.model_backend.model_type,
                self.model_backend.current_model_index,
                exc_info=exc,
            )

    # Explicit None check: only "no attempt succeeded" is an error here.
    if response is None:
        raise ModelProcessingError(
            "Unable to process messages: none of the provided models "
            "run succesfully."
        ) from last_error

    # Lazy %-formatting avoids rendering the full message list when INFO
    # logging is disabled.
    logger.info(
        "Model %s, index %s, processed these messages: %s",
        self.model_backend.model_type,
        self.model_backend.current_model_index,
        openai_messages,
    )

    if isinstance(response, ChatCompletion):
        output_messages, finish_reasons, usage_dict, response_id = (
            self.handle_batch_response(response)
        )
    else:
        output_messages, finish_reasons, usage_dict, response_id = (
            self.handle_stream_response(response, num_tokens)
        )
    return (
        response,
        output_messages,
        finish_reasons,
        usage_dict,
        response_id,
    )
|
|
| def _step_get_info( |
| self, |
| output_messages: List[BaseMessage], |
| finish_reasons: List[str], |
| usage_dict: Dict[str, int], |
| response_id: str, |
| tool_calls: List[FunctionCallingRecord], |
| num_tokens: int, |
| external_tool_request: Optional[ChatCompletionMessageToolCall] = None, |
| ) -> Dict[str, Any]: |
| r"""Process the output of a chat step and gather information about the |
| step. |
| |
| This method checks for termination conditions, updates the agent's |
| state, and collects information about the chat step, including tool |
| calls and termination reasons. |
| |
| Args: |
| output_messages (List[BaseMessage]): The messages generated in |
| this step. |
| finish_reasons (List[str]): The reasons for finishing the |
| generation for each message. |
| usage_dict (Dict[str, int]): Dictionary containing token usage |
| information. |
| response_id (str): The ID of the response from the model. |
| tool_calls (List[FunctionCallingRecord]): Records of function calls |
| made during this step. |
| num_tokens (int): The number of tokens used in this step. |
| external_tool_request (Optional[ChatCompletionMessageToolCall]): |
| Any external tool request made during this step. |
| (default: :obj:`None`) |
| |
| Returns: |
| Dict[str, Any]: A dictionary containing information about the chat |
| step, including termination status, reasons, and tool call |
| information. |
| |
| Note: |
| This method iterates over all response terminators and checks if |
| any of them signal termination. If a terminator signals |
| termination, the agent's state is updated accordingly, and the |
| termination reason is recorded. |
| """ |
| termination = [ |
| terminator.is_terminated(output_messages) |
| for terminator in self.response_terminators |
| ] |
| |
| self.terminated, termination_reason = next( |
| ( |
| (terminated, termination_reason) |
| for terminated, termination_reason in termination |
| if terminated |
| ), |
| (False, None), |
| ) |
| |
| if self.terminated and termination_reason is not None: |
| finish_reasons = [termination_reason] * len(finish_reasons) |
|
|
| info = self.get_info( |
| response_id, |
| usage_dict, |
| finish_reasons, |
| num_tokens, |
| tool_calls, |
| external_tool_request, |
| ) |
| return info |
|
|
def handle_batch_response(
    self, response: ChatCompletion
) -> Tuple[List[BaseMessage], List[str], Dict[str, int], str]:
    r"""Convert a non-streaming model response into agent outputs.

    Args:
        response (ChatCompletion): Model response.

    Returns:
        tuple: A tuple of list of output `ChatMessage`, list of
            finish reasons, usage dictionary, and response id.
    """
    output_messages: List[BaseMessage] = []
    finish_reasons: List[str] = []

    for choice in response.choices:
        message = choice.message
        chat_message = BaseMessage(
            role_name=self.role_name,
            role_type=self.role_type,
            meta_dict=dict(),
            content=message.content or "",
            parsed=getattr(message, 'parsed', None),
        )

        # Log-probabilities are optional; attach them only when present.
        token_entries = (
            None if choice.logprobs is None else choice.logprobs.content
        )
        if token_entries is not None:
            logprobs_info = []
            for entry in token_entries:
                alternatives = [
                    (alt.token, alt.logprob) for alt in entry.top_logprobs
                ]
                logprobs_info.append(
                    {
                        "token": entry.token,
                        "logprob": entry.logprob,
                        "top_logprobs": alternatives,
                    }
                )
            if chat_message.meta_dict is None:
                chat_message.meta_dict = {}
            chat_message.meta_dict["logprobs_info"] = logprobs_info

        output_messages.append(chat_message)
        finish_reasons.append(str(choice.finish_reason))

    if response.usage is None:
        usage = {}
    else:
        usage = self._safe_model_dump(response.usage)

    return output_messages, finish_reasons, usage, response.id
|
|
| def _safe_model_dump(self, obj) -> dict: |
| r"""Safely dump a Pydantic model to a dictionary. |
| |
| This method attempts to use the `model_dump` method if available, |
| otherwise it falls back to the `dict` method. |
| |
| Args: |
| obj: The Pydantic model instance to be dumped. |
| |
| Returns: |
| dict: A dictionary representation of the Pydantic model. |
| """ |
| |
| if hasattr(obj, 'model_dump'): |
| return obj.model_dump() |
| |
| elif hasattr(obj, 'dict'): |
| return obj.dict() |
| else: |
| raise TypeError("The object is not a Pydantic model") |
|
|
def handle_stream_response(
    self,
    response: Stream[ChatCompletionChunk],
    prompt_tokens: int,
) -> Tuple[List[BaseMessage], List[str], Dict[str, int], str]:
    r"""Process a stream response from the model and extract the necessary
    information.

    Content deltas are accumulated per choice index while the stream is
    consumed. One output message is produced for each choice that reported
    a finish reason. Messages and finish reasons are both ordered by choice
    index, so ``output_messages[i]`` always corresponds to
    ``finish_reasons[i]``.

    Args:
        response (Stream[ChatCompletionChunk]): Model response.
        prompt_tokens (int): Number of input prompt tokens.

    Returns:
        tuple: A tuple of list of output `ChatMessage`, list of
            finish reasons, usage dictionary, and response id.
    """
    content_dict: defaultdict = defaultdict(str)
    finish_reasons_dict: Dict[int, str] = {}
    response_id: str = ""

    for chunk in response:
        response_id = chunk.id
        for choice in chunk.choices:
            index = choice.index
            delta = choice.delta
            if delta.content is not None:
                # Deltas are fragments; concatenate them per choice.
                content_dict[index] += delta.content
            if choice.finish_reason:
                finish_reasons_dict[index] = choice.finish_reason

    # Build both lists from the same sorted indices. Choices can finish in
    # any order, and deriving the lists separately could pair a message
    # with another choice's finish reason.
    finished_indices = sorted(finish_reasons_dict)
    output_messages: List[BaseMessage] = [
        BaseMessage(
            role_name=self.role_name,
            role_type=self.role_type,
            meta_dict=dict(),
            content=content_dict[index],
        )
        for index in finished_indices
    ]
    finish_reasons = [
        finish_reasons_dict[index] for index in finished_indices
    ]
    usage_dict = self.get_usage_dict(output_messages, prompt_tokens)
    return output_messages, finish_reasons, usage_dict, response_id
|
|
def _step_token_exceed(
    self,
    num_tokens: int,
    tool_calls: List[FunctionCallingRecord],
    termination_reason: str,
) -> ChatAgentResponse:
    r"""Build the terminal response used when the token limit is exceeded.

    Marks the agent as terminated and returns a response with no messages,
    carrying only the token count, the tool calls made so far, and the
    termination reason.

    Args:
        num_tokens (int): Number of tokens in the messages.
        tool_calls (List[FunctionCallingRecord]): List of information
            objects of functions called in the current step.
        termination_reason (str): String of termination reason.

    Returns:
        ChatAgentResponse: The struct containing trivial outputs and
            information about token number and called functions.
    """
    self.terminated = True

    # No response id or usage exists because the model was never called.
    step_info = self.get_info(
        None,
        None,
        [termination_reason],
        num_tokens,
        tool_calls,
    )
    no_messages: List[BaseMessage] = []
    return ChatAgentResponse(
        msgs=no_messages,
        terminated=self.terminated,
        info=step_info,
    )
|
|
def _step_tool_call(
    self,
    response: ChatCompletion,
) -> Tuple[
    FunctionCallingMessage, FunctionCallingMessage, FunctionCallingRecord
]:
    r"""Run the tool requested by the model's first choice.

    Only the first tool call of the first choice is executed.

    Args:
        response (ChatCompletion): The response obtained by calling the
            model.

    Returns:
        tuple: A tuple consisting of two obj:`FunctionCallingMessage`,
            one about the arguments and the other about the execution
            result, and a struct for logging information about this
            function call.

    Raises:
        RuntimeError: If the first choice carries no tool calls.
    """
    requested_calls = response.choices[0].message.tool_calls
    if requested_calls is None:
        raise RuntimeError("Tool call is None")

    call = requested_calls[0]
    func_name = call.function.name
    args = self._safe_json_loads(call.function.arguments)

    result = self.tool_dict[func_name](**args)
    tool_call_id = call.id

    # Fields shared by the request-side and result-side messages.
    shared_fields = dict(
        role_name=self.role_name,
        role_type=self.role_type,
        meta_dict=None,
        content="",
        func_name=func_name,
        tool_call_id=tool_call_id,
    )
    assist_msg = FunctionCallingMessage(args=args, **shared_fields)
    func_msg = FunctionCallingMessage(result=result, **shared_fields)

    # Record the call so it can be reported in the step info.
    func_record = FunctionCallingRecord(
        func_name=func_name,
        args=args,
        result=result,
        tool_call_id=tool_call_id,
    )
    return assist_msg, func_msg, func_record
|
|
| def _safe_json_loads(self, arguments_str): |
| |
| arguments_str = arguments_str.replace("None", "null") |
| arguments_str = arguments_str.replace("True", "true") |
| arguments_str = arguments_str.replace("False", "false") |
|
|
| |
| try: |
| return json.loads(arguments_str) |
| except json.JSONDecodeError as e: |
| raise ValueError(f"Invalid JSON format: {e}") |
|
|
async def step_tool_call_async(
    self,
    response: ChatCompletion,
) -> Tuple[
    FunctionCallingMessage, FunctionCallingMessage, FunctionCallingRecord
]:
    r"""Await the tool requested by the model's first choice.

    Only the first tool call of the first choice is executed. Its
    arguments are decoded with ``json.loads``.

    Args:
        response (ChatCompletion): The response obtained by calling the
            model.

    Returns:
        tuple: A tuple consisting of two obj:`FunctionCallingMessage`,
            one about the arguments and the other about the execution
            result, and a struct for logging information about this
            function call.

    Raises:
        RuntimeError: If the first choice carries no tool calls.
    """
    requested_calls = response.choices[0].message.tool_calls
    if requested_calls is None:
        raise RuntimeError("Tool call is None")

    call = requested_calls[0]
    func_name = call.function.name
    args = json.loads(call.function.arguments)

    result = await self.tool_dict[func_name](**args)
    tool_call_id = call.id

    # Fields shared by the request-side and result-side messages.
    shared_fields = dict(
        role_name=self.role_name,
        role_type=self.role_type,
        meta_dict=None,
        content="",
        func_name=func_name,
        tool_call_id=tool_call_id,
    )
    assist_msg = FunctionCallingMessage(args=args, **shared_fields)
    func_msg = FunctionCallingMessage(result=result, **shared_fields)

    # Record the call so it can be reported in the step info.
    func_record = FunctionCallingRecord(
        func_name=func_name,
        args=args,
        result=result,
        tool_call_id=tool_call_id,
    )
    return assist_msg, func_msg, func_record
|
|
def get_usage_dict(
    self, output_messages: List[BaseMessage], prompt_tokens: int
) -> Dict[str, int]:
    r"""Compute the usage dictionary for a stream-mode response.

    Completion tokens are counted by encoding each output message's
    content with the encoding obtained for the agent's model type.

    Args:
        output_messages (list): List of output messages.
        prompt_tokens (int): Number of input prompt tokens.

    Returns:
        dict: Usage dictionary.
    """
    encoding = get_model_encoding(self.model_type.value_for_tiktoken)
    completion_tokens = sum(
        len(encoding.encode(message.content)) for message in output_messages
    )
    return {
        "completion_tokens": completion_tokens,
        "prompt_tokens": prompt_tokens,
        "total_tokens": completion_tokens + prompt_tokens,
    }
|
|
def add_model_scheduling_strategy(self, name: str, strategy_fn: Callable):
    r"""Register a user-provided scheduling strategy with the model
    backend.

    Args:
        name (str): The name of the strategy.
        strategy_fn (Callable): The scheduling strategy function.
    """
    backend = self.model_backend
    backend.add_strategy(name, strategy_fn)
|
|
| def __repr__(self) -> str: |
| r"""Returns a string representation of the :obj:`ChatAgent`. |
| |
| Returns: |
| str: The string representation of the :obj:`ChatAgent`. |
| """ |
| return ( |
| f"ChatAgent({self.role_name}, {self.role_type}, {self.model_type})" |
| ) |
|
|