# openspace/llm/client.py
import litellm
import json
import asyncio
import time
from pathlib import Path
from typing import List, Sequence, Union, Dict, Optional, Callable
from dotenv import load_dotenv
from openai.types.chat import ChatCompletionToolParam
from openspace.grounding.core.types import ToolSchema, ToolResult, ToolStatus
from openspace.grounding.core.tool import BaseTool
from openspace.utils.logging import Logger
# Load .env from openspace package root (works regardless of CWD),
# then fall back to CWD/.env. override=False (default) means first-loaded wins.
_PKG_ENV = Path(__file__).resolve().parent.parent / ".env" # openspace/.env
if _PKG_ENV.is_file():
load_dotenv(_PKG_ENV)
load_dotenv() # also try CWD/.env for any remaining vars
# Disable LiteLLM verbose logging to prevent stdout blocking with large tool schemas
litellm.set_verbose = False
litellm.suppress_debug_info = True
logger = Logger.get_logger(__name__)
def _sanitize_schema(params: Dict) -> Dict:
"""Sanitize tool parameter schema to comply with Claude API requirements.
Fixes common issues:
- Empty object schemas (no properties, no required)
- Missing required fields for Claude compatibility
"""
if not params:
return {"type": "object", "properties": {}, "required": []}
# Deep copy to avoid modifying the original
import copy
sanitized = copy.deepcopy(params)
# Anthropic API requires top-level type to be 'object'
# If it's not an object, wrap the schema as a property of an object
top_level_type = sanitized.get("type")
if top_level_type and top_level_type != "object":
# Wrap non-object schema as a single property called "value"
logger.debug(f"[SCHEMA_SANITIZE] Wrapping non-object schema (type={top_level_type}) into object")
wrapped = {
"type": "object",
"properties": {
"value": sanitized # The original schema becomes a property
},
"required": ["value"] # Make it required
}
sanitized = wrapped
# If type is object but missing properties/required, add them
if sanitized.get("type") == "object":
if "properties" not in sanitized:
sanitized["properties"] = {}
if "required" not in sanitized:
sanitized["required"] = []
# Remove non-standard fields that may cause issues (like 'title')
sanitized.pop("title", None)
# Strip non-standard 'title' fields from direct child properties (one level deep, not fully recursive)
if "properties" in sanitized and isinstance(sanitized["properties"], dict):
for prop_name, prop_schema in list(sanitized["properties"].items()):
if isinstance(prop_schema, dict):
# Remove title from nested properties
prop_schema.pop("title", None)
return sanitized
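# Illustrative examples (comments only, not executed) of what _sanitize_schema produces;
# the input schemas below are made up:
#
#   _sanitize_schema({})
#   # -> {"type": "object", "properties": {}, "required": []}
#
#   _sanitize_schema({"type": "string", "title": "Query"})
#   # -> {"type": "object",
#   #     "properties": {"value": {"type": "string"}},   # wrapped; nested "title" stripped
#   #     "required": ["value"]}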
def _schema_to_openai(schema: ToolSchema) -> ChatCompletionToolParam:
"""Convert ToolSchema to OpenAI ChatCompletion tool format"""
function_def = {
"name": schema.name,
"description": schema.description or "",
}
# Sanitize and add parameters
if schema.parameters:
sanitized = _sanitize_schema(schema.parameters)
function_def["parameters"] = sanitized
# Debug: verify sanitization worked
if "title" in schema.parameters and "title" not in sanitized:
logger.debug(f"Sanitized tool '{schema.name}': removed title")
else:
# Claude requires parameters field even if empty
function_def["parameters"] = {"type": "object", "properties": {}, "required": []}
return {
"type": "function",
"function": function_def
}
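# For reference, the structure produced by _schema_to_openai (field values invented
# for illustration):
#
#   {
#       "type": "function",
#       "function": {
#           "name": "read_file",
#           "description": "Read a file from disk",
#           "parameters": {"type": "object", "properties": {...}, "required": [...]},
#       },
#   }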
def _prepare_tools_for_llmclient(
tools: List[BaseTool] | None,
fmt: str = "openai",
) -> tuple[Sequence[Union[ToolSchema, ChatCompletionToolParam]], Dict[str, BaseTool]]:
"""Convert BaseTool list to LLMClient usable format, with deduplication.
Args:
tools: BaseTool instance list (should be obtained from GroundingClient and bound to runtime_info)
if None or empty list, return empty list
fmt: output format, "openai" for OpenAI format
"""
if not tools:
return [], {}
if fmt == "openai":
result = []
tool_map = {} # llm_name -> BaseTool
name_count = {}
for tool in tools:
name = tool.schema.name
name_count[name] = name_count.get(name, 0) + 1
seen_names = set()
for tool in tools:
original_name = tool.schema.name
if name_count[original_name] > 1:
server_name = "unknown"
if tool.is_bound and tool.runtime_info and tool.runtime_info.server_name:
server_name = tool.runtime_info.server_name
llm_name = f"{server_name}__{original_name}"
else:
llm_name = original_name
if llm_name in seen_names:
logger.warning(f"[TOOL_DEDUP] Skipping duplicate tool: {llm_name}")
continue
seen_names.add(llm_name)
tool_param = _schema_to_openai(tool.schema)
tool_param["function"]["name"] = llm_name
# Tag the description with backend type so the LLM knows each
# tool's origin (e.g. "[MCP] ...", "[Shell] ...").
backend_type = getattr(tool.schema, "backend_type", None)
if backend_type and backend_type.value not in ("not_set",):
_BACKEND_LABELS = {
"mcp": "MCP",
"shell": "Shell",
"gui": "GUI",
"web": "Web",
"system": "System",
}
label = _BACKEND_LABELS.get(backend_type.value, backend_type.value)
desc = tool_param["function"].get("description", "")
tool_param["function"]["description"] = f"[{label}] {desc}"
result.append(tool_param)
tool_map[llm_name] = tool
if llm_name != original_name:
logger.info(f"[TOOL_RENAME] {original_name} -> {llm_name}")
logger.info(f"[SCHEMA_SANITIZE] Prepared {len(result)} tools for LLM (from {len(tools)} total)")
return result, tool_map
tool_map = {tool.schema.name: tool for tool in tools}
return [tool.schema for tool in tools], tool_map
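# Sketch of the dedup/rename behaviour above, assuming two bound tools that both expose
# the name "read_file" from servers named "fs" and "remote" (all names here are
# hypothetical):
#
#   llm_tools, tool_map = _prepare_tools_for_llmclient([fs_read_file, remote_read_file])
#   # llm_tools[0]["function"]["name"] == "fs__read_file"
#   # llm_tools[1]["function"]["name"] == "remote__read_file"
#   # tool_map == {"fs__read_file": fs_read_file, "remote__read_file": remote_read_file}
#
# Tools with unique names keep their original name, and descriptions are prefixed with a
# backend tag such as "[MCP] ..." or "[Shell] ..." when backend_type is set.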
def _infer_backend_from_tool_name(tool_name: str) -> Optional[str]:
"""Infer backend when tool_results would otherwise have no backend (name mismatch or unbound tools)."""
if not tool_name or not isinstance(tool_name, str):
return None
name = tool_name.strip()
# Dedup format: "server__toolname" -> use suffix
if "__" in name:
name = name.split("__", 1)[-1]
shell_tools = {"shell_agent", "read_file", "write_file", "list_dir", "run_shell"}
if name in shell_tools:
return "shell"
if name in ("gui_agent",) or "gui" in name.lower():
return "gui"
if "mcp" in name.lower() or ("." in name and "__" not in name):
return "mcp"
if name in ("deep_research_agent", "deep_research"):
return "web"
return None
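# Examples of the heuristic above (purely illustrative):
#
#   _infer_backend_from_tool_name("shell_agent")          # -> "shell"
#   _infer_backend_from_tool_name("fs__read_file")        # -> "shell" (dedup prefix stripped)
#   _infer_backend_from_tool_name("gui_agent")            # -> "gui"
#   _infer_backend_from_tool_name("notion.search")        # -> "mcp"   (dotted name)
#   _infer_backend_from_tool_name("deep_research_agent")  # -> "web"
#   _infer_backend_from_tool_name("something_else")       # -> None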
DEFAULT_SUMMARIZE_THRESHOLD_CHARS = 200000 # ~50K tokens, lowered from 400K to prevent context overflow
MAX_TOOL_RESULT_CHARS = 200000 # Fallback truncation limit when summarization fails (~50K tokens)
async def _summarize_tool_result(
content: str,
tool_name: str,
task: str = "",
model: str = "openrouter/anthropic/claude-sonnet-4.5",
timeout: float = 120.0
) -> Optional[str]:
"""Use an LLM to summarize a large tool result. Returns the summary text, or None if summarization fails."""
try:
from gdpval_bench.token_tracker import set_call_source, reset_call_source
_src_tok = set_call_source("summarizer")
except ImportError:
_src_tok = None
try:
logger.info(f"Summarizing tool result from '{tool_name}': {len(content):,} chars")
# Pre-truncate if content is too large for the summarizer model (leave room for prompt + output).
# Assuming ~4 chars per token, a 200K-token context window, 8K output tokens, and ~500 prompt tokens,
# the raw limit would be about (200K - 8K - 0.5K) * 4 ≈ 766K chars; we stay conservative at 200K chars.
max_input_chars = 200000
if len(content) > max_input_chars:
logger.warning(f"Pre-truncating content for summarization: {len(content):,} -> {max_input_chars:,} chars")
content = content[:max_input_chars] + f"\n\n[TRUNCATED for summarization: original was {len(content):,} chars]"
task_hint = f"\n\nUser's task: {task}\nSummarize with focus on information relevant to this task." if task else ""
prompt = f"""Tool '{tool_name}' returned a large result ({len(content):,} chars). Summarize it concisely.{task_hint}
**Guidelines:**
- Structured data (coordinates, steps, etc.): Keep key summary (totals, start/end), omit repetitive details.
- Markup content (HTML, XML): Extract text and key data only, ignore tags/scripts.
- Long documents: Keep structure outline and essential sections.
- Lists/arrays: Summarize count and most relevant items.
- Always preserve: numbers, URLs, file paths, IDs, key identifiers.
Content:
{content}
Concise summary:"""
response = await asyncio.wait_for(
litellm.acompletion(
model=model,
messages=[{"role": "user", "content": prompt}],
timeout=timeout
),
timeout=timeout + 5
)
summary = response.choices[0].message.content.strip()
result = f"[SUMMARY of {len(content):,} chars]\n{summary}"
logger.info(f"Tool result summarized: {len(content):,} -> {len(result):,} chars")
return result
except Exception as e:
logger.warning(f"Summarization failed for '{tool_name}': {e}")
return None
finally:
if _src_tok is not None:
reset_call_source(_src_tok)
async def _tool_result_to_message_async(
result: ToolResult,
*,
tool_call_id: str,
tool_name: str,
task: str = "",
summarize_threshold: int = DEFAULT_SUMMARIZE_THRESHOLD_CHARS,
summarize_model: str = "openrouter/anthropic/claude-sonnet-4.5",
enable_summarization: bool = True
) -> Dict:
"""Convert ToolResult to LLMClient usable message format with LLM summarization for large results.
Args:
result: Tool execution result
tool_call_id: OpenAI tool_call ID
tool_name: Tool name
task: User's original task for context-aware summarization
summarize_threshold: If content exceeds this, use LLM summarization
summarize_model: Model to use for summarization
enable_summarization: Whether to enable LLM summarization
Returns:
OpenAI ChatCompletion tool message (text only)
"""
if result.is_error:
text_content = f"[ERROR] {result.error or 'unknown error'}"
else:
text_content = (
result.content
if isinstance(result.content, str)
else json.dumps(result.content, ensure_ascii=False, default=str)
)
original_len = len(text_content)
# Use LLM summarization if content exceeds threshold
if original_len > summarize_threshold and enable_summarization:
summary = await _summarize_tool_result(text_content, tool_name, task, summarize_model)
if summary:
text_content = summary
elif original_len > MAX_TOOL_RESULT_CHARS:
# Fallback: truncate if summarization failed and content is too large
truncate_msg = f"\n\n[TRUNCATED: Original content was {original_len:,} chars, showing first {MAX_TOOL_RESULT_CHARS:,}]"
text_content = text_content[:MAX_TOOL_RESULT_CHARS - len(truncate_msg)] + truncate_msg
logger.warning(f"Tool result truncated for '{tool_name}': {original_len:,} -> {len(text_content):,} chars (summarization failed)")
return {
"role": "tool",
"name": tool_name,
"content": text_content,
"tool_call_id": tool_call_id,
}
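# Shape of the message returned above (values invented for illustration):
#
#   {
#       "role": "tool",
#       "name": "read_file",
#       "content": "...tool output, '[SUMMARY of N chars]...', or '[ERROR] ...'",
#       "tool_call_id": "call_abc123",
#   }
#
# Large successful results are replaced by a summary when summarization succeeds, or
# truncated with a "[TRUNCATED: ...]" suffix as a fallback.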
async def _execute_tool_call(
tool: BaseTool,
openai_tool_call: Dict,
) -> ToolResult:
"""Execute LLMClient returned tool_call
Args:
tool: BaseTool instance (must be obtained from GroundingClient and bound to runtime_info)
openai_tool_call: LLMClient usable tool_call object, contains id, type, function etc. fields
"""
if not tool.is_bound:
raise ValueError(
f"Tool '{tool.schema.name}' is not bound to runtime_info. "
f"Please ensure tools are obtained from GroundingClient.list_tools() "
f"with bind_runtime_info=True"
)
func = openai_tool_call["function"]
arguments = func.get("arguments", "{}")
if isinstance(arguments, str):
arguments = json.loads(arguments or "{}")
# Filter out parameters that are not in the tool's schema
if isinstance(arguments, dict) and tool.schema.parameters:
# Get valid parameter names from tool schema (JSON Schema format)
schema_params = tool.schema.parameters
valid_params = set()
if isinstance(schema_params, dict) and "properties" in schema_params:
valid_params = set(schema_params["properties"].keys())
# Check for invalid parameters
invalid_params = []
for param_name in list(arguments.keys()):
if param_name == "skip_visual_analysis":
invalid_params.append(param_name)
continue
# Check if parameter is in the tool's schema
if valid_params and param_name not in valid_params:
invalid_params.append(param_name)
# Remove invalid parameters
for param in invalid_params:
arguments.pop(param)
logger.debug(
f"Removed parameter '{param}' from {tool.schema.name} "
f"(not in tool schema)"
)
return await tool.invoke(
parameters=arguments,
keep_session=True
)
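# Sketch of the argument filtering above, assuming a tool whose schema declares only a
# "path" property (the tool instance and arguments are hypothetical):
#
#   await _execute_tool_call(
#       tool=read_file_tool,
#       openai_tool_call={
#           "id": "call_1",
#           "type": "function",
#           "function": {
#               "name": "read_file",
#               "arguments": '{"path": "/tmp/a.txt", "skip_visual_analysis": true, "mode": "fast"}',
#           },
#       },
#   )
#   # -> tool.invoke(parameters={"path": "/tmp/a.txt"}, keep_session=True)
#   #    ("skip_visual_analysis" is always dropped; "mode" is dropped because it is not
#   #     in the tool's schema)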
class LLMClient:
"""LLMClient class for single round call"""
def __init__(
self,
model: str = "openrouter/anthropic/claude-sonnet-4.5",
enable_thinking: bool = False,
rate_limit_delay: float = 0.0,
max_retries: int = 3,
retry_delay: float = 1.0,
timeout: float = 120.0,
summarize_threshold_chars: int = DEFAULT_SUMMARIZE_THRESHOLD_CHARS,
enable_tool_result_summarization: bool = True,
**litellm_kwargs
):
"""
Args:
model: LLM model identifier
enable_thinking: Whether to enable extended thinking mode
rate_limit_delay: Minimum delay between API calls in seconds (0 = no delay)
max_retries: Maximum number of retries on rate limit errors
retry_delay: Initial delay between retries in seconds (exponential backoff)
timeout: Request timeout in seconds (default: 120s)
summarize_threshold_chars: If a tool result exceeds this threshold, use the LLM to
summarize the result (default: 200,000 chars ≈ 50K tokens)
enable_tool_result_summarization: Whether to enable LLM-based summarization for
large tool results (default: True)
**litellm_kwargs: Additional litellm parameters
"""
self.model = model
self.enable_thinking = enable_thinking
self.rate_limit_delay = rate_limit_delay
self.max_retries = max_retries
self.retry_delay = retry_delay
self.timeout = timeout
self.summarize_threshold_chars = summarize_threshold_chars
self.enable_tool_result_summarization = enable_tool_result_summarization
self.litellm_kwargs = litellm_kwargs
self._logger = Logger.get_logger(__name__)
self._last_call_time = 0.0
async def _rate_limit(self):
"""Apply rate limiting by adding delay between API calls"""
if self.rate_limit_delay > 0:
current_time = time.time()
time_since_last_call = current_time - self._last_call_time
if time_since_last_call < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - time_since_last_call
self._logger.debug(f"Rate limiting: waiting {sleep_time:.2f}s before next API call")
await asyncio.sleep(sleep_time)
self._last_call_time = time.time()
async def _call_with_retry(self, **completion_kwargs):
"""Call LLM with backoff retry on rate limit errors
Timeout and retry strategy:
- Single call timeout: self.timeout (default 120s)
- Rate limit retry delays: 60s, 90s, 120s
- Total max time: timeout * max_retries + sum(retry_delays)
"""
last_exception = None
for attempt in range(self.max_retries):
try:
# Add timeout to the completion call
response = await asyncio.wait_for(
litellm.acompletion(**completion_kwargs),
timeout=self.timeout
)
return response
except asyncio.TimeoutError:
self._logger.error(
f"LLM call timed out after {self.timeout}s (attempt {attempt + 1}/{self.max_retries})"
)
last_exception = TimeoutError(f"LLM call timed out after {self.timeout}s")
if attempt < self.max_retries - 1:
# Retry on timeout with shorter delay
self._logger.info(f"Retrying after {self.retry_delay}s delay...")
await asyncio.sleep(self.retry_delay)
continue
else:
raise last_exception
except Exception as e:
last_exception = e
error_str = str(e).lower()
# Check if it's a retryable error
is_rate_limit = any(
keyword in error_str
for keyword in ['rate limit', 'rate_limit', 'too many requests', '429']
)
is_overloaded = any(
keyword in error_str
for keyword in ['overloaded', '500', '502', '503', '504', 'internal server error', 'service unavailable']
)
is_connection_error = any(
keyword in error_str
for keyword in ['cannot connect', 'connection refused', 'connection reset',
'connectionerror', 'timeout', 'name resolution',
'temporary failure', 'network unreachable']
)
if attempt < self.max_retries - 1 and (is_rate_limit or is_overloaded or is_connection_error):
if is_rate_limit:
backoff_delay = 60 + (attempt * 30) # 60s, 90s, 120s
error_type = "Rate limit"
elif is_connection_error:
backoff_delay = min(10 * (2 ** attempt), 60) # 10s, 20s, 40s, max 60s
error_type = "Connection"
else:
backoff_delay = min(5 * (2 ** attempt), 60) # 5s, 10s, 20s, max 60s
error_type = "Server overload"
self._logger.warning(
f"{error_type} error (attempt {attempt + 1}/{self.max_retries}), "
f"waiting {backoff_delay}s before retry..."
)
await asyncio.sleep(backoff_delay)
continue
else:
# Not a retryable error, or max retries reached
if attempt >= self.max_retries - 1:
self._logger.error(f"Max retries ({self.max_retries}) reached, giving up")
raise
raise last_exception
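# Rough retry timeline with the defaults (max_retries=3, retry_delay=1.0, timeout=120s),
# shown only to make the backoff schedule above concrete:
#
#   attempt 1 fails with 429    -> wait 60s  (60 + 0*30)
#   attempt 2 fails with 429    -> wait 90s  (60 + 1*30)
#   attempt 3 fails with 429    -> raise (no retries left)
#
#   attempt 1 times out         -> wait retry_delay (1s), then retry
#   attempt 1 hits a 503        -> wait 5s   (5 * 2**0, capped at 60s)
#   attempt 1 cannot connect    -> wait 10s  (10 * 2**0, capped at 60s)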
async def complete(
self,
messages: List[Dict] | str,
tools: List[BaseTool] | None = None,
execute_tools: bool = True,
summary_prompt: Optional[str] = None,
tool_result_callback: Optional[Callable] = None,
**kwargs
) -> Dict:
"""
Single-round LLM call with optional tool execution.
Args:
messages: conversation history (List[Dict] in standard OpenAI chat format, or a plain str treated as a single user message)
tools: BaseTool instance list (must be obtained from GroundingClient and bound to runtime_info);
if None or an empty list, only a plain conversation call is made, without tools
execute_tools: if LLM returns tool_calls, whether to automatically execute tools
summary_prompt: Optional custom prompt for requesting iteration summary.
If provided, will request summary after tool execution.
If None, no summary will be requested.
tool_result_callback: Optional async callback to process tool results after execution.
Signature: async def callback(result: ToolResult, tool_name: str, tool_call: Dict, backend: str) -> ToolResult
**kwargs: additional parameters for litellm completion
"""
# 1. Process messages
if isinstance(messages, str):
current_messages = [{"role": "user", "content": messages}]
user_task = messages
elif isinstance(messages, list):
current_messages = messages.copy()
# Extract first user message as task for context-aware summarization
user_task = next(
(m.get("content", "") for m in messages if m.get("role") == "user"),
""
)
else:
raise ValueError("messages must be List[Dict] or str")
# 2. prepare base litellm completion kwargs
completion_kwargs = {
"model": kwargs.get("model", self.model),
**self.litellm_kwargs,
}
# Add thinking/reasoning_effort only if explicitly enabled and not using tools
enable_thinking = kwargs.get("enable_thinking", self.enable_thinking)
# 3. if tools are provided, add them to the request
llm_tools = None
tool_map = {} # llm_name -> BaseTool
if tools:
llm_tools, tool_map = _prepare_tools_for_llmclient(tools, fmt="openai")
if llm_tools:
completion_kwargs["tools"] = llm_tools
completion_kwargs["tool_choice"] = kwargs.get("tool_choice", "auto")
# Disable thinking when using tools to avoid format conflicts
enable_thinking = False
self._logger.debug(f"Prepared {len(llm_tools)} tools for LLM")
else:
self._logger.warning("Tools provided but none could be prepared for LLM")
# Add thinking parameters if enabled
if enable_thinking:
completion_kwargs["reasoning_effort"] = kwargs.get("reasoning_effort", "medium")
# 4. Apply rate limiting
await self._rate_limit()
# 5. Call LLM with retry (single round)
completion_kwargs["messages"] = current_messages
response = await self._call_with_retry(**completion_kwargs)
if not response.choices:
raise ValueError("LLM response has no choices")
response_message = response.choices[0].message
# 6. Build assistant message
assistant_message = {
"role": "assistant",
"content": response_message.content or "",
}
tool_calls = getattr(response_message, 'tool_calls', None)
if tool_calls:
assistant_message["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
}
for tc in tool_calls
]
# Add assistant message to conversation
current_messages.append(assistant_message)
# 7. Execute tools if requested
tool_results = []
if execute_tools and tool_calls and tools:
self._logger.info(f"Executing {len(tool_calls)} tool calls...")
for tool_call in tool_calls:
tool_name = tool_call.function.name
# Resolve tool instance: key might differ from model response (e.g. API returns
# "read_file" while we stored "server__read_file" for dedup), so fallback by schema.name
tool_obj = tool_map.get(tool_name)
if tool_obj is None and tool_name:
for _k, _t in tool_map.items():
if getattr(getattr(_t, "schema", None), "name", None) == tool_name:
tool_obj = _t
break
backend = None
server_name = None
if tool_obj:
try:
# Prefer runtime_info if bound
if getattr(tool_obj, 'is_bound', False) and getattr(tool_obj, 'runtime_info', None):
backend = tool_obj.runtime_info.backend.value
server_name = tool_obj.runtime_info.server_name
else:
bt = getattr(tool_obj, 'backend_type', None)
bv = getattr(bt, 'value', None) if bt is not None else None
if bv and bv not in ("not_set",):
backend = bv
except Exception as e:
self._logger.warning(f"Failed to resolve backend for tool '{tool_name}': {e}")
# Ensure backend is set for recording: API may return different tool name, or
# runtime_info/backend_type can be missing or raise
if backend is None and tool_name:
backend = _infer_backend_from_tool_name(tool_name)
if backend is None:
self._logger.warning(
f"Could not resolve backend for tool '{tool_name}', "
f"recording will be skipped"
)
# Log tool execution
try:
if isinstance(tool_call.function.arguments, str):
safe_args_str = tool_call.function.arguments.strip() or "{}"
args = json.loads(safe_args_str)
else:
args = tool_call.function.arguments
args_str = json.dumps(args, ensure_ascii=False)[:200]
self._logger.info(f"Calling {tool_name} with args: {args_str}")
except Exception:
pass
# Use the resolved tool instance so dedup-renamed tools (e.g. "server__read_file") still execute
if tool_obj is None:
result = ToolResult(
status=ToolStatus.ERROR,
error=f"Tool '{tool_name}' not found"
)
else:
try:
result = await _execute_tool_call(
tool=tool_obj,
openai_tool_call={
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
}
}
)
# Apply tool result callback if provided
if tool_result_callback and not result.is_error:
try:
result = await tool_result_callback(
result=result,
tool_name=tool_name,
tool_call=tool_call,
backend=backend
)
except Exception as e:
self._logger.warning(f"Tool result callback failed for {tool_name}: {e}")
except Exception as e:
result = ToolResult(
status=ToolStatus.ERROR,
error=str(e)
)
# Use async version with LLM summarization for large results
tool_message = await _tool_result_to_message_async(
result,
tool_call_id=tool_call.id,
tool_name=tool_name,
task=user_task,
summarize_threshold=self.summarize_threshold_chars,
summarize_model=self.model,
enable_summarization=self.enable_tool_result_summarization
)
current_messages.append(tool_message)
# Store result
tool_results.append({
"tool_call": tool_call,
"result": result,
"message": tool_message,
"backend": backend,
"server_name": server_name,
})
self._logger.info(f"Tool execution completed, {len(tool_results)} tools executed")
# 8. Request summary if provided and tools were executed
iteration_summary = None
if summary_prompt and tool_results:
self._logger.debug("Requesting iteration summary from LLM")
summary_message = {
"role": "system",
"content": summary_prompt
}
current_messages.append(summary_message)
# Apply rate limiting before summary call
await self._rate_limit()
# Call LLM to generate summary (without tools)
summary_kwargs = {
**self.litellm_kwargs,
"model": self.model,
"messages": current_messages,
"tools": [],
"tool_choice": "none",
}
summary_response = await self._call_with_retry(**summary_kwargs)
if summary_response.choices:
summary_message = summary_response.choices[0].message
iteration_summary = summary_message.content or ""
# Add summary response to messages
current_messages.append({
"role": "assistant",
"content": iteration_summary
})
self._logger.debug(f"Generated iteration summary: {iteration_summary[:100]}...")
# 9. Return single-round result
return {
"message": assistant_message,
"tool_results": tool_results,
"messages": current_messages,
"has_tool_calls": bool(tool_calls),
"iteration_summary": iteration_summary
}
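# Minimal usage sketch for LLMClient.complete (illustrative only; the model name, the
# grounding_client variable, and any required API keys are assumptions of this example,
# not requirements of the class):
#
#   async def _demo():
#       client = LLMClient(model="openrouter/anthropic/claude-sonnet-4.5")
#       tools = await grounding_client.list_tools(bind_runtime_info=True)  # hypothetical
#       result = await client.complete(
#           "List the files in the current directory.",
#           tools=tools,
#           execute_tools=True,
#       )
#       print(result["message"]["content"])
#       for item in result["tool_results"]:
#           print(item["backend"], item["result"].status)
#
#   # asyncio.run(_demo())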
@staticmethod
def format_messages_to_text(messages: List[Dict]) -> str:
"""Format conversation history to readable text (for logging/debugging)"""
formatted = ""
for msg in messages:
role = msg.get("role", "unknown").upper()
content = msg.get("content", "")
formatted += f"[{role}]\n{content}\n\n"
return formatted
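# Example of the formatting produced above (messages invented for illustration):
#
#   LLMClient.format_messages_to_text([
#       {"role": "user", "content": "hi"},
#       {"role": "assistant", "content": "hello"},
#   ])
#   # -> "[USER]\nhi\n\n[ASSISTANT]\nhello\n\n"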