import litellm
import json
import asyncio
import copy
import time
from pathlib import Path
from typing import Callable, List, Sequence, Union, Dict, Optional

from dotenv import load_dotenv
from openai.types.chat import ChatCompletionToolParam

from openspace.grounding.core.types import ToolSchema, ToolResult, ToolStatus
from openspace.grounding.core.tool import BaseTool
from openspace.utils.logging import Logger

# Load .env from the openspace package root (works regardless of CWD),
# then fall back to CWD/.env. override=False (the default) means the
# first-loaded value wins.
_PKG_ENV = Path(__file__).resolve().parent.parent / ".env"  # openspace/.env
if _PKG_ENV.is_file():
    load_dotenv(_PKG_ENV)
load_dotenv()  # also try CWD/.env for any remaining vars

# Disable LiteLLM verbose logging to prevent stdout blocking with large tool schemas
litellm.set_verbose = False
litellm.suppress_debug_info = True

logger = Logger.get_logger(__name__)


def _sanitize_schema(params: Dict) -> Dict:
    """Sanitize a tool parameter schema to comply with Claude API requirements.

    Fixes common issues:
    - Empty object schemas (no properties, no required)
    - Missing fields required for Claude compatibility
    """
    if not params:
        return {"type": "object", "properties": {}, "required": []}

    # Deep copy to avoid modifying the original
    sanitized = copy.deepcopy(params)

    # The Anthropic API requires the top-level type to be 'object'.
    # If it is not an object, wrap the schema as a property of an object.
    top_level_type = sanitized.get("type")
    if top_level_type and top_level_type != "object":
        # Wrap the non-object schema as a single required property called "value"
        logger.debug(f"[SCHEMA_SANITIZE] Wrapping non-object schema (type={top_level_type}) into object")
        sanitized = {
            "type": "object",
            "properties": {
                "value": sanitized  # the original schema becomes a property
            },
            "required": ["value"],
        }

    # If the type is object but properties/required are missing, add them
    if sanitized.get("type") == "object":
        if "properties" not in sanitized:
            sanitized["properties"] = {}
        if "required" not in sanitized:
            sanitized["required"] = []

    # Remove non-standard fields that may cause issues (like 'title')
    sanitized.pop("title", None)

    # Also strip 'title' from direct child property schemas
    # (note: one level deep, not fully recursive)
    if "properties" in sanitized and isinstance(sanitized["properties"], dict):
        for prop_schema in sanitized["properties"].values():
            if isinstance(prop_schema, dict):
                prop_schema.pop("title", None)

    return sanitized
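
# Illustrative example (hypothetical input, not from the original module): a bare
# string schema is wrapped into an object and its 'title' fields are stripped:
#   _sanitize_schema({"type": "string", "title": "Query"})
#   -> {"type": "object",
#       "properties": {"value": {"type": "string"}},
#       "required": ["value"]}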


def _schema_to_openai(schema: ToolSchema) -> ChatCompletionToolParam:
    """Convert a ToolSchema to the OpenAI ChatCompletion tool format."""
    function_def = {
        "name": schema.name,
        "description": schema.description or "",
    }

    # Sanitize and add parameters
    if schema.parameters:
        sanitized = _sanitize_schema(schema.parameters)
        function_def["parameters"] = sanitized
        # Debug: verify sanitization worked
        if "title" in schema.parameters and "title" not in sanitized:
            logger.debug(f"Sanitized tool '{schema.name}': removed title")
    else:
        # Claude requires a parameters field even if empty
        function_def["parameters"] = {"type": "object", "properties": {}, "required": []}

    return {
        "type": "function",
        "function": function_def,
    }
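
# For reference, the returned value follows the OpenAI tools wire format
# (illustrative field values):
#   {"type": "function",
#    "function": {"name": "read_file", "description": "...",
#                 "parameters": {"type": "object", "properties": {...}, "required": [...]}}}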


def _prepare_tools_for_llmclient(
    tools: List[BaseTool] | None,
    fmt: str = "openai",
) -> tuple[Sequence[Union[ToolSchema, ChatCompletionToolParam]], Dict[str, BaseTool]]:
    """Convert a BaseTool list to a format usable by LLMClient, with deduplication.

    Args:
        tools: BaseTool instances (should be obtained from GroundingClient and
            bound to runtime_info). If None or an empty list, returns empty results.
        fmt: output format; "openai" for the OpenAI tools format.
    """
    if not tools:
        return [], {}

    if fmt == "openai":
        result = []
        tool_map = {}  # llm_name -> BaseTool
        name_count = {}
        for tool in tools:
            name = tool.schema.name
            name_count[name] = name_count.get(name, 0) + 1

        _BACKEND_LABELS = {
            "mcp": "MCP",
            "shell": "Shell",
            "gui": "GUI",
            "web": "Web",
            "system": "System",
        }

        seen_names = set()
        for tool in tools:
            original_name = tool.schema.name
            if name_count[original_name] > 1:
                server_name = "unknown"
                if tool.is_bound and tool.runtime_info and tool.runtime_info.server_name:
                    server_name = tool.runtime_info.server_name
                llm_name = f"{server_name}__{original_name}"
            else:
                llm_name = original_name

            if llm_name in seen_names:
                logger.warning(f"[TOOL_DEDUP] Skipping duplicate tool: {llm_name}")
                continue
            seen_names.add(llm_name)

            tool_param = _schema_to_openai(tool.schema)
            tool_param["function"]["name"] = llm_name

            # Tag the description with the backend type so the LLM knows each
            # tool's origin (e.g. "[MCP] ...", "[Shell] ...").
            backend_type = getattr(tool.schema, "backend_type", None)
            if backend_type and backend_type.value not in ("not_set",):
                label = _BACKEND_LABELS.get(backend_type.value, backend_type.value)
                desc = tool_param["function"].get("description", "")
                tool_param["function"]["description"] = f"[{label}] {desc}"

            result.append(tool_param)
            tool_map[llm_name] = tool
            if llm_name != original_name:
                logger.info(f"[TOOL_RENAME] {original_name} -> {llm_name}")

        logger.info(f"[SCHEMA_SANITIZE] Prepared {len(result)} tools for LLM (from {len(tools)} total)")
        return result, tool_map

    tool_map = {tool.schema.name: tool for tool in tools}
    return [tool.schema for tool in tools], tool_map
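
# Example of the dedup naming scheme (illustrative server names): if two MCP
# servers both expose "read_file", the LLM-facing names become
# "serverA__read_file" and "serverB__read_file"; unique names pass through
# unchanged.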


def _infer_backend_from_tool_name(tool_name: str) -> Optional[str]:
    """Infer the backend when tool_results would otherwise have none
    (e.g. a name mismatch or unbound tools)."""
    if not tool_name or not isinstance(tool_name, str):
        return None
    name = tool_name.strip()
    # Dedup format: "server__toolname" -> use the suffix
    if "__" in name:
        name = name.split("__", 1)[-1]
    shell_tools = {"shell_agent", "read_file", "write_file", "list_dir", "run_shell"}
    if name in shell_tools:
        return "shell"
    if name in ("gui_agent",) or "gui" in name.lower():
        return "gui"
    if "mcp" in name.lower() or ("." in name and "__" not in name):
        return "mcp"
    if name in ("deep_research_agent", "deep_research"):
        return "web"
    return None
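
# Illustrative expectations (hypothetical tool names):
#   _infer_backend_from_tool_name("files__read_file")  -> "shell"
#   _infer_backend_from_tool_name("gui_agent")         -> "gui"
#   _infer_backend_from_tool_name("notion.search")     -> "mcp"
#   _infer_backend_from_tool_name("deep_research")     -> "web"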


DEFAULT_SUMMARIZE_THRESHOLD_CHARS = 200000  # ~50K tokens; lowered from 400K to prevent context overflow
MAX_TOOL_RESULT_CHARS = 200000  # Fallback truncation limit when summarization fails (~50K tokens)


async def _summarize_tool_result(
    content: str,
    tool_name: str,
    task: str = "",
    model: str = "openrouter/anthropic/claude-sonnet-4.5",
    timeout: float = 120.0,
) -> Optional[str]:
    """Use an LLM to summarize large tool results. Returns None on failure."""
    try:
        from gdpval_bench.token_tracker import set_call_source, reset_call_source
        _src_tok = set_call_source("summarizer")
    except ImportError:
        _src_tok = None

    try:
        original_len = len(content)
        logger.info(f"Summarizing tool result from '{tool_name}': {original_len:,} chars")

        # Pre-truncate if the content is too large for the model (leave room for
        # prompt + output). Assuming ~4 chars per token with a 200K-token context,
        # 8K output, and ~500 prompt tokens, the safe input limit would be roughly
        # (200K - 8K - 0.5K) * 4 ≈ 766K chars; we stay far more conservative.
        max_input_chars = 200000
        if original_len > max_input_chars:
            logger.warning(f"Pre-truncating content for summarization: {original_len:,} -> {max_input_chars:,} chars")
            content = content[:max_input_chars] + f"\n\n[TRUNCATED for summarization: original was {original_len:,} chars]"

        task_hint = f"\n\nUser's task: {task}\nSummarize with focus on information relevant to this task." if task else ""
        prompt = f"""Tool '{tool_name}' returned a large result ({original_len:,} chars). Summarize it concisely.{task_hint}

**Guidelines:**
- Structured data (coordinates, steps, etc.): Keep key summary (totals, start/end), omit repetitive details.
- Markup content (HTML, XML): Extract text and key data only, ignore tags/scripts.
- Long documents: Keep structure outline and essential sections.
- Lists/arrays: Summarize count and most relevant items.
- Always preserve: numbers, URLs, file paths, IDs, key identifiers.

Content:
{content}

Concise summary:"""

        response = await asyncio.wait_for(
            litellm.acompletion(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                timeout=timeout,
            ),
            timeout=timeout + 5,
        )
        summary = (response.choices[0].message.content or "").strip()
        result = f"[SUMMARY of {original_len:,} chars]\n{summary}"
        logger.info(f"Tool result summarized: {original_len:,} -> {len(result):,} chars")
        return result
    except Exception as e:
        logger.warning(f"Summarization failed for '{tool_name}': {e}")
        return None
    finally:
        if _src_tok is not None:
            reset_call_source(_src_tok)


async def _tool_result_to_message_async(
    result: ToolResult,
    *,
    tool_call_id: str,
    tool_name: str,
    task: str = "",
    summarize_threshold: int = DEFAULT_SUMMARIZE_THRESHOLD_CHARS,
    summarize_model: str = "openrouter/anthropic/claude-sonnet-4.5",
    enable_summarization: bool = True,
) -> Dict:
    """Convert a ToolResult to an LLMClient-usable message, using LLM summarization for large results.

    Args:
        result: Tool execution result
        tool_call_id: OpenAI tool_call ID
        tool_name: Tool name
        task: User's original task, for context-aware summarization
        summarize_threshold: If content exceeds this, use LLM summarization
        summarize_model: Model to use for summarization
        enable_summarization: Whether to enable LLM summarization

    Returns:
        OpenAI ChatCompletion tool message (text only)
    """
    if result.is_error:
        text_content = f"[ERROR] {result.error or 'unknown error'}"
    else:
        text_content = (
            result.content
            if isinstance(result.content, str)
            else json.dumps(result.content, ensure_ascii=False, default=str)
        )

    original_len = len(text_content)

    # Use LLM summarization if the content exceeds the threshold
    if original_len > summarize_threshold and enable_summarization:
        summary = await _summarize_tool_result(text_content, tool_name, task, summarize_model)
        if summary:
            text_content = summary

    # Fallback: hard-truncate if summarization was disabled or failed and the
    # content is still too large
    if len(text_content) > MAX_TOOL_RESULT_CHARS:
        truncate_msg = f"\n\n[TRUNCATED: Original content was {original_len:,} chars, showing first {MAX_TOOL_RESULT_CHARS:,}]"
        text_content = text_content[:MAX_TOOL_RESULT_CHARS - len(truncate_msg)] + truncate_msg
        logger.warning(f"Tool result truncated for '{tool_name}': {original_len:,} -> {len(text_content):,} chars")

    return {
        "role": "tool",
        "name": tool_name,
        "content": text_content,
        "tool_call_id": tool_call_id,
    }
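
# The resulting message slots straight into an OpenAI-style conversation, e.g.
# (illustrative values):
#   {"role": "tool", "name": "read_file",
#    "content": "[SUMMARY of 312,456 chars]\n...",
#    "tool_call_id": "call_abc123"}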


async def _execute_tool_call(
    tool: BaseTool,
    openai_tool_call: Dict,
) -> ToolResult:
    """Execute a tool_call returned by LLMClient.

    Args:
        tool: BaseTool instance (must be obtained from GroundingClient and
            bound to runtime_info)
        openai_tool_call: LLMClient-usable tool_call object, containing the
            id, type, function, etc. fields
    """
    if not tool.is_bound:
        raise ValueError(
            f"Tool '{tool.schema.name}' is not bound to runtime_info. "
            f"Please ensure tools are obtained from GroundingClient.list_tools() "
            f"with bind_runtime_info=True"
        )

    func = openai_tool_call["function"]
    arguments = func.get("arguments", "{}")
    if isinstance(arguments, str):
        arguments = json.loads(arguments or "{}")

    # Filter out parameters that are not in the tool's schema
    if isinstance(arguments, dict) and tool.schema.parameters:
        # Get valid parameter names from the tool schema (JSON Schema format)
        schema_params = tool.schema.parameters
        valid_params = set()
        if isinstance(schema_params, dict) and "properties" in schema_params:
            valid_params = set(schema_params["properties"].keys())

        # Collect invalid parameters
        invalid_params = []
        for param_name in list(arguments.keys()):
            # Always drop the special-cased 'skip_visual_analysis' parameter
            if param_name == "skip_visual_analysis":
                invalid_params.append(param_name)
                continue
            # Drop parameters that are not in the tool's schema
            if valid_params and param_name not in valid_params:
                invalid_params.append(param_name)

        # Remove invalid parameters
        for param in invalid_params:
            arguments.pop(param)
            logger.debug(
                f"Removed parameter '{param}' from {tool.schema.name} "
                f"(not in tool schema)"
            )

    return await tool.invoke(
        parameters=arguments,
        keep_session=True,
    )
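
# Illustrative filtering behavior (hypothetical tool whose schema declares
# only a "path" property):
#   arguments {"path": "a.txt", "skip_visual_analysis": True, "mode": "x"}
#   -> invoked with {"path": "a.txt"}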


class LLMClient:
    """Client for single-round LLM calls."""

    def __init__(
        self,
        model: str = "openrouter/anthropic/claude-sonnet-4.5",
        enable_thinking: bool = False,
        rate_limit_delay: float = 0.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        timeout: float = 120.0,
        summarize_threshold_chars: int = DEFAULT_SUMMARIZE_THRESHOLD_CHARS,
        enable_tool_result_summarization: bool = True,
        **litellm_kwargs,
    ):
        """
        Args:
            model: LLM model identifier
            enable_thinking: Whether to enable extended thinking mode
            rate_limit_delay: Minimum delay between API calls in seconds (0 = no delay)
            max_retries: Maximum number of retries on rate limit errors
            retry_delay: Initial delay between retries in seconds (exponential backoff)
            timeout: Request timeout in seconds (default: 120s)
            summarize_threshold_chars: If a tool result exceeds this threshold, use an
                LLM to summarize it (default: 200,000 chars ≈ 50K tokens)
            enable_tool_result_summarization: Whether to enable LLM-based summarization
                for large tool results (default: True)
            **litellm_kwargs: Additional litellm parameters
        """
        self.model = model
        self.enable_thinking = enable_thinking
        self.rate_limit_delay = rate_limit_delay
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.timeout = timeout
        self.summarize_threshold_chars = summarize_threshold_chars
        self.enable_tool_result_summarization = enable_tool_result_summarization
        self.litellm_kwargs = litellm_kwargs
        self._logger = Logger.get_logger(__name__)
        self._last_call_time = 0.0

    async def _rate_limit(self):
        """Apply rate limiting by adding a delay between API calls."""
        if self.rate_limit_delay > 0:
            current_time = time.time()
            time_since_last_call = current_time - self._last_call_time
            if time_since_last_call < self.rate_limit_delay:
                sleep_time = self.rate_limit_delay - time_since_last_call
                self._logger.debug(f"Rate limiting: waiting {sleep_time:.2f}s before next API call")
                await asyncio.sleep(sleep_time)
        self._last_call_time = time.time()

    async def _call_with_retry(self, **completion_kwargs):
        """Call the LLM with backoff retry on rate limit errors.

        Timeout and retry strategy:
        - Single call timeout: self.timeout (default 120s)
        - Rate limit retry delays: 60s, 90s, 120s
        - Total max time: timeout * max_retries + sum(retry_delays)
        """
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                # Add a timeout to the completion call
                response = await asyncio.wait_for(
                    litellm.acompletion(**completion_kwargs),
                    timeout=self.timeout,
                )
                return response
            except asyncio.TimeoutError:
                self._logger.error(
                    f"LLM call timed out after {self.timeout}s (attempt {attempt + 1}/{self.max_retries})"
                )
                last_exception = TimeoutError(f"LLM call timed out after {self.timeout}s")
                if attempt < self.max_retries - 1:
                    # Retry timeouts with a shorter delay
                    self._logger.info(f"Retrying after {self.retry_delay}s delay...")
                    await asyncio.sleep(self.retry_delay)
                    continue
                else:
                    raise last_exception
            except Exception as e:
                last_exception = e
                error_str = str(e).lower()

                # Check whether the error is retryable
                is_rate_limit = any(
                    keyword in error_str
                    for keyword in ['rate limit', 'rate_limit', 'too many requests', '429']
                )
                is_overloaded = any(
                    keyword in error_str
                    for keyword in ['overloaded', '500', '502', '503', '504',
                                    'internal server error', 'service unavailable']
                )
                is_connection_error = any(
                    keyword in error_str
                    for keyword in ['cannot connect', 'connection refused', 'connection reset',
                                    'connectionerror', 'timeout', 'name resolution',
                                    'temporary failure', 'network unreachable']
                )

                if attempt < self.max_retries - 1 and (is_rate_limit or is_overloaded or is_connection_error):
                    if is_rate_limit:
                        backoff_delay = 60 + (attempt * 30)  # 60s, 90s, 120s
                        error_type = "Rate limit"
                    elif is_connection_error:
                        backoff_delay = min(10 * (2 ** attempt), 60)  # 10s, 20s, 40s, capped at 60s
                        error_type = "Connection"
                    else:
                        backoff_delay = min(5 * (2 ** attempt), 60)  # 5s, 10s, 20s, capped at 60s
                        error_type = "Server overload"
                    self._logger.warning(
                        f"{error_type} error (attempt {attempt + 1}/{self.max_retries}), "
                        f"waiting {backoff_delay}s before retry..."
                    )
                    await asyncio.sleep(backoff_delay)
                    continue
                else:
                    # Not a retryable error, or max retries reached
                    if attempt >= self.max_retries - 1:
                        self._logger.error(f"Max retries ({self.max_retries}) reached, giving up")
                    raise

        raise last_exception

    async def complete(
        self,
        messages: List[Dict] | str,
        tools: List[BaseTool] | None = None,
        execute_tools: bool = True,
        summary_prompt: Optional[str] = None,
        tool_result_callback: Optional[Callable] = None,
        **kwargs,
    ) -> Dict:
        """
        Single-round LLM call with optional tool execution.

        Args:
            messages: conversation history (List[Dict] in the standard OpenAI
                format, or a plain str)
            tools: BaseTool instances (must be obtained from GroundingClient and
                bound to runtime_info); if None or empty, only converse, no tools
            execute_tools: whether to automatically execute tools when the LLM
                returns tool_calls
            summary_prompt: Optional custom prompt for requesting an iteration
                summary. If provided, a summary is requested after tool
                execution; if None, no summary is requested.
            tool_result_callback: Optional async callback to process tool results
                after execution. Signature:
                async def callback(result: ToolResult, tool_name: str,
                                   tool_call: Dict, backend: str) -> ToolResult
            **kwargs: additional parameters for the litellm completion
        """
        # 1. Process messages
        if isinstance(messages, str):
            current_messages = [{"role": "user", "content": messages}]
            user_task = messages
        elif isinstance(messages, list):
            current_messages = messages.copy()
            # Extract the first user message as the task for context-aware summarization
            user_task = next(
                (m.get("content", "") for m in messages if m.get("role") == "user"),
                ""
            )
        else:
            raise ValueError("messages must be List[Dict] or str")

        # 2. Prepare base litellm completion kwargs
        completion_kwargs = {
            "model": kwargs.get("model", self.model),
            **self.litellm_kwargs,
        }

        # Add thinking/reasoning_effort only if explicitly enabled and not using tools
        enable_thinking = kwargs.get("enable_thinking", self.enable_thinking)

        # 3. If tools are provided, add them to the request
        llm_tools = None
        tool_map = {}  # llm_name -> BaseTool
        if tools:
            llm_tools, tool_map = _prepare_tools_for_llmclient(tools, fmt="openai")
            if llm_tools:
                completion_kwargs["tools"] = llm_tools
                completion_kwargs["tool_choice"] = kwargs.get("tool_choice", "auto")
                # Disable thinking when using tools to avoid format conflicts
                enable_thinking = False
                self._logger.debug(f"Prepared {len(llm_tools)} tools for LLM")
            else:
                self._logger.warning("Tools provided but none could be prepared for LLM")

        # Add thinking parameters if enabled
        if enable_thinking:
            completion_kwargs["reasoning_effort"] = kwargs.get("reasoning_effort", "medium")

        # 4. Apply rate limiting
        await self._rate_limit()

        # 5. Call the LLM with retry (single round)
        completion_kwargs["messages"] = current_messages
        response = await self._call_with_retry(**completion_kwargs)

        if not response.choices:
            raise ValueError("LLM response has no choices")
        response_message = response.choices[0].message

        # 6. Build the assistant message
        assistant_message = {
            "role": "assistant",
            "content": response_message.content or "",
        }
        tool_calls = getattr(response_message, 'tool_calls', None)
        if tool_calls:
            assistant_message["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {
                        "name": tc.function.name,
                        "arguments": tc.function.arguments,
                    },
                }
                for tc in tool_calls
            ]

        # Add the assistant message to the conversation
        current_messages.append(assistant_message)

        # 7. Execute tools if requested
        tool_results = []
        if execute_tools and tool_calls and tools:
            self._logger.info(f"Executing {len(tool_calls)} tool calls...")
            for tool_call in tool_calls:
                tool_name = tool_call.function.name

                # Resolve the tool instance: the key might differ from the model
                # response (e.g. the API returns "read_file" while we stored
                # "server__read_file" for dedup), so fall back to schema.name
                tool_obj = tool_map.get(tool_name)
                if tool_obj is None and tool_name:
                    for _t in tool_map.values():
                        if getattr(getattr(_t, "schema", None), "name", None) == tool_name:
                            tool_obj = _t
                            break

                backend = None
                server_name = None
                if tool_obj:
                    try:
                        # Prefer runtime_info if bound
                        if getattr(tool_obj, 'is_bound', False) and getattr(tool_obj, 'runtime_info', None):
                            backend = tool_obj.runtime_info.backend.value
                            server_name = tool_obj.runtime_info.server_name
                        else:
                            bt = getattr(tool_obj, 'backend_type', None)
                            bv = getattr(bt, 'value', None) if bt is not None else None
                            if bv and bv not in ("not_set",):
                                backend = bv
                    except Exception as e:
                        self._logger.warning(f"Failed to resolve backend for tool '{tool_name}': {e}")

                # Ensure backend is set for recording: the API may return a
                # different tool name, or runtime_info/backend_type can be
                # missing or raise
                if backend is None and tool_name:
                    backend = _infer_backend_from_tool_name(tool_name)
                if backend is None:
                    self._logger.warning(
                        f"Could not resolve backend for tool '{tool_name}', "
                        f"recording will be skipped"
                    )

                # Log the tool execution
                try:
                    if isinstance(tool_call.function.arguments, str):
                        safe_args_str = tool_call.function.arguments.strip() or "{}"
                        args = json.loads(safe_args_str)
                    else:
                        args = tool_call.function.arguments
                    args_str = json.dumps(args, ensure_ascii=False)[:200]
                    self._logger.info(f"Calling {tool_name} with args: {args_str}")
                except Exception:
                    pass

                if tool_obj is None:
                    result = ToolResult(
                        status=ToolStatus.ERROR,
                        error=f"Tool '{tool_name}' not found"
                    )
                else:
                    try:
                        result = await _execute_tool_call(
                            tool=tool_obj,
                            openai_tool_call={
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.function.name,
                                    "arguments": tool_call.function.arguments,
                                },
                            },
                        )
                        # Apply the tool result callback if provided
                        if tool_result_callback and not result.is_error:
                            try:
                                result = await tool_result_callback(
                                    result=result,
                                    tool_name=tool_name,
                                    tool_call=tool_call,
                                    backend=backend,
                                )
                            except Exception as e:
                                self._logger.warning(f"Tool result callback failed for {tool_name}: {e}")
                    except Exception as e:
                        result = ToolResult(
                            status=ToolStatus.ERROR,
                            error=str(e)
                        )

                # Use the async converter with LLM summarization for large results
                tool_message = await _tool_result_to_message_async(
                    result,
                    tool_call_id=tool_call.id,
                    tool_name=tool_name,
                    task=user_task,
                    summarize_threshold=self.summarize_threshold_chars,
                    summarize_model=self.model,
                    enable_summarization=self.enable_tool_result_summarization,
                )
                current_messages.append(tool_message)

                # Store the result
                tool_results.append({
                    "tool_call": tool_call,
                    "result": result,
                    "message": tool_message,
                    "backend": backend,
                    "server_name": server_name,
                })

            self._logger.info(f"Tool execution completed, {len(tool_results)} tools executed")

        # 8. Request a summary if a prompt was provided and tools were executed
        iteration_summary = None
        if summary_prompt and tool_results:
            self._logger.debug("Requesting iteration summary from LLM")
            current_messages.append({
                "role": "system",
                "content": summary_prompt,
            })

            # Apply rate limiting before the summary call
            await self._rate_limit()

            # Call the LLM to generate the summary (without tools)
            summary_kwargs = {
                **self.litellm_kwargs,
                "model": self.model,
                "messages": current_messages,
                "tools": [],
                "tool_choice": "none",
            }
            summary_response = await self._call_with_retry(**summary_kwargs)
            if summary_response.choices:
                iteration_summary = summary_response.choices[0].message.content or ""
                # Add the summary response to the messages
                current_messages.append({
                    "role": "assistant",
                    "content": iteration_summary,
                })
                self._logger.debug(f"Generated iteration summary: {iteration_summary[:100]}...")

        # 9. Return the single-round result
        return {
            "message": assistant_message,
            "tool_results": tool_results,
            "messages": current_messages,
            "has_tool_calls": bool(tool_calls),
            "iteration_summary": iteration_summary,
        }


def format_messages_to_text(messages: List[Dict]) -> str:
    """Format conversation history as readable text (for logging/debugging)."""
    formatted = ""
    for msg in messages:
        role = msg.get("role", "unknown").upper()
        content = msg.get("content", "")
        formatted += f"[{role}]\n{content}\n\n"
    return formatted
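
# Minimal usage sketch (illustrative; assumes an OPENROUTER_API_KEY in .env and
# a `my_tools` list obtained from a GroundingClient, neither shown here):
#
#   async def main():
#       client = LLMClient(model="openrouter/anthropic/claude-sonnet-4.5")
#       result = await client.complete("List the files in the repo", tools=my_tools)
#       print(format_messages_to_text(result["messages"]))
#
#   asyncio.run(main())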