Spaces:
Sleeping
Sleeping
| # coding: utf-8 | |
| # Copyright (c) 2025 inclusionAI. | |
| import copy | |
| import inspect | |
| import os.path | |
| from typing import Dict, Any, List, Union | |
| from aworld.core.context.base import Context | |
| from aworld.logs.util import logger | |
| from aworld.models.qwen_tokenizer import qwen_tokenizer | |
| from aworld.models.openai_tokenizer import openai_tokenizer | |
| from aworld.utils import import_package | |
def usage_process(usage: Dict[str, Union[int, Dict[str, int]]] = None, context: Context = None):
    """Record LLM token usage on the context, bucketed by the calling agent.

    Walks the call stack looking for the `call_llm_model` utility frame in
    `llm.py`; the frame *above* it (its caller) supplies `self._name`, which is
    used as the per-category key for the usage snapshot.

    Args:
        usage: Token usage counters to record. Defaults to a fresh empty dict
            per call.
        context: Target context; falls back to the global `Context.instance()`.
    """
    # BUG FIX: the original signature used a mutable default (`usage={}`).
    # That dict is created once at definition time, is mutated below via
    # `usage[name] = ...`, and is handed to `context.add_token`, so counts
    # leaked between unrelated calls. Use the None-sentinel idiom instead.
    if usage is None:
        usage = {}
    if not context:
        context = Context.instance()
    stacks = inspect.stack()
    index = 0
    for idx, stack in enumerate(stacks):
        # `index` points one frame above the match, i.e. the caller of
        # `call_llm_model` (typically the agent instance doing the call).
        index = idx + 1
        file = os.path.basename(stack.filename)
        # supported use `llm.py` utility function only
        if 'call_llm_model' in stack.function and file == 'llm.py':
            break
    if index >= len(stacks):
        logger.warning("not category usage find to count")
    else:
        instance = stacks[index].frame.f_locals.get('self')
        name = getattr(instance, "_name", "unknown")
        # Store a shallow snapshot of the counters under the agent's name.
        usage[name] = copy.copy(usage)
    # total usage
    context.add_token(usage)
def num_tokens_from_messages(messages, model="gpt-4o"):
    """Return the number of tokens used by a list of messages."""
    import_package("tiktoken")
    import tiktoken

    # Pick the tokenizer: project wrappers for qwen/openai, tiktoken otherwise.
    lowered = model.lower()
    if lowered == "qwen":
        encoding = qwen_tokenizer
    elif lowered == "openai":
        encoding = openai_tokenizer
    else:
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            logger.warning(f"{model} model not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")

    # Fixed per-message / per-name overheads of the chat format.
    per_message_overhead = 3
    per_name_overhead = 1

    total = 0
    for message in messages:
        total += per_message_overhead
        if isinstance(message, str):
            total += len(encoding.encode(message))
            continue
        for field, content in message.items():
            total += len(encoding.encode(str(content)))
            if field == "name":
                total += per_name_overhead

    # Every reply is primed with a fixed 3-token assistant preamble.
    total += 3
    return total
def _truncate_with_encoding(encoding, messages, max_tokens, keep_both_sides):
    """Drop whole messages until the remainder fits within `max_tokens`.

    Newest messages are preferred. With `keep_both_sides` the first and last
    messages are always retained and the middle is trimmed, dropping the
    oldest middle messages first.
    """
    def _count(msg):
        # Token cost of one message: a raw string, or a dict of field values.
        if isinstance(msg, str):
            return len(encoding.encode(msg))
        return sum(len(encoding.encode(str(v))) for v in msg.values())

    if not messages or sum(_count(m) for m in messages) <= max_tokens:
        return messages

    if keep_both_sides and len(messages) > 1:
        head, tail = messages[0], messages[-1]
        budget = max_tokens - _count(head) - _count(tail)
        middle = []
        # Keep the most recent middle messages that still fit the budget.
        for msg in reversed(messages[1:-1]):
            cost = _count(msg)
            if cost > budget:
                break
            budget -= cost
            middle.append(msg)
        return [head] + middle[::-1] + [tail]

    kept = []
    budget = max_tokens
    for msg in reversed(messages):
        cost = _count(msg)
        if cost > budget:
            break
        budget -= cost
        kept.append(msg)
    return kept[::-1]


def truncate_tokens_from_messages(messages: List[Dict[str, Any]], max_tokens: int, keep_both_sides: bool = False, model: str = "gpt-4o"):
    """Truncate `messages` so their total token count fits within `max_tokens`.

    Args:
        messages: Chat messages (dicts, or raw strings) to truncate.
        max_tokens: Maximum total token budget.
        keep_both_sides: Keep the first and last messages and trim the middle.
        model: Tokenizer selector: "qwen", "openai", or a tiktoken model name.

    Returns:
        The (possibly shortened) list of messages.
    """
    import_package("tiktoken")
    import tiktoken
    if model.lower() == "qwen":
        return qwen_tokenizer.truncate(messages, max_tokens, keep_both_sides)
    elif model.lower() == "openai":
        return openai_tokenizer.truncate(messages, max_tokens, keep_both_sides)
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        logger.warning(f"{model} model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")
    # BUG FIX: tiktoken `Encoding` objects expose encode/decode but have no
    # `truncate` method, so the original `encoding.truncate(...)` raised
    # AttributeError for every non-qwen/openai model. Truncate explicitly
    # using the encoding's token counts instead.
    return _truncate_with_encoding(encoding, messages, max_tokens, keep_both_sides)
def agent_desc_transform(agent_dict: Dict[str, Any],
                         agents: List[str] = None,
                         provider: str = 'openai',
                         strategy: str = 'min') -> List[Dict[str, Any]]:
    """Transform framework-standard agent descriptions into OpenAI tool schemas.

    Args:
        agent_dict: Dict of descriptions of agents that are registered in the agent factory.
        agents: Description of special agents to use.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special agents are provided, `min` indicates no content returned,
            `max` means get all agents' descriptions.

    Returns:
        A list of OpenAI-format function-tool dicts, one per agent ability.
    """
    results: List[Dict[str, Any]] = []
    # `min` with no explicit agent list means: expose nothing.
    if not agents and strategy == 'min':
        return results
    # Only the openai description format is handled here.
    if not (provider and 'openai' in provider):
        return results

    for agent_name, agent_info in agent_dict.items():
        if agents and agent_name not in agents:
            logger.debug(f"{agent_name} can not supported in {agents}, you can set `tools` params to support it.")
            continue
        for ability in agent_info["abilities"]:
            # Build the JSON-schema parameter block for this ability.
            properties = {
                param: {
                    "description": info["desc"],
                    "type": "string" if info["type"] == "str" else info["type"],
                }
                for param, info in ability["params"].items()
            }
            required = [param for param, info in ability["params"].items()
                        if info.get("required", False)]
            results.append({
                "type": "function",
                "function": {
                    "name": f'{agent_name}__{ability["name"]}',
                    "description": ability["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required,
                    },
                },
            })
    return results
def tool_desc_transform(tool_dict: Dict[str, Any],
                        tools: List[str] = None,
                        black_tool_actions: Dict[str, List[str]] = None,
                        provider: str = 'openai',
                        strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of tool description.

    Args:
        tool_dict: Dict of descriptions of tools that are registered in the agent factory.
        tools: Description of special tools to use.
        black_tool_actions: Mapping of tool name -> action names to exclude from the result.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special tools are provided, `min` indicates no content returned,
            `max` means get all tools' descriptions.

    Returns:
        A list of OpenAI-format function-tool dicts, one per non-blacklisted action.
    """
    openai_tools = []
    if not tools and strategy == 'min':
        return openai_tools
    # BUG FIX: the default was a shared mutable dict (`black_tool_actions={}`),
    # the classic mutable-default-argument pitfall. The None-sentinel default
    # plus this existing guard is the safe equivalent.
    if black_tool_actions is None:
        black_tool_actions = {}
    if provider and 'openai' in provider:
        for tool_name, tool_info in tool_dict.items():
            if tools and tool_name not in tools:
                logger.debug(f"{tool_name} can not supported in {tools}, you can set `tools` params to support it.")
                continue
            black_actions = black_tool_actions.get(tool_name, [])
            for action in tool_info["actions"]:
                # Skip actions explicitly blacklisted for this tool.
                if action['name'] in black_actions:
                    continue
                # Build parameter properties
                properties = {}
                required = []
                for param_name, param_info in action["params"].items():
                    properties[param_name] = {
                        "description": param_info["desc"],
                        # Framework "str" maps to JSON-schema "string".
                        "type": param_info["type"] if param_info["type"] != "str" else "string"
                    }
                    if param_info.get("required", False):
                        required.append(param_name)
                openai_function_schema = {
                    "name": f'{tool_name}__{action["name"]}',
                    "description": action["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required
                    }
                }
                openai_tools.append({
                    "type": "function",
                    "function": openai_function_schema
                })
    return openai_tools