# OpenSpace / openspace/agents/grounding_agent.py
# Uploaded by darkfire514 ("Upload 160 files", revision 399b80c, verified)
from __future__ import annotations
import copy
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from openspace.agents.base import BaseAgent
from openspace.grounding.core.types import BackendType, ToolResult
from openspace.platforms.screenshot import ScreenshotClient
from openspace.prompts import GroundingAgentPrompts
from openspace.utils.logging import Logger
if TYPE_CHECKING:
from openspace.llm import LLMClient
from openspace.grounding.core.grounding_client import GroundingClient
from openspace.recording import RecordingManager
from openspace.skill_engine import SkillRegistry
logger = Logger.get_logger(__name__)
class GroundingAgent(BaseAgent):
def __init__(
    self,
    name: str = "GroundingAgent",
    backend_scope: Optional[List[str]] = None,
    llm_client: Optional[LLMClient] = None,
    grounding_client: Optional[GroundingClient] = None,
    recording_manager: Optional[RecordingManager] = None,
    system_prompt: Optional[str] = None,
    max_iterations: int = 15,
    visual_analysis_timeout: float = 30.0,
    tool_retrieval_llm: Optional[LLMClient] = None,
    visual_analysis_model: Optional[str] = None,
) -> None:
    """
    Initialize the Grounding Agent.

    Args:
        name: Agent name
        backend_scope: List of backends this agent can access
            (None/empty = gui, shell, mcp, web, system)
        llm_client: LLM client for reasoning
        grounding_client: GroundingClient for tool execution
        recording_manager: RecordingManager for recording execution
        system_prompt: Custom system prompt (None/empty = built from the backend scope)
        max_iterations: Maximum LLM reasoning iterations for self-correction
        visual_analysis_timeout: Timeout for visual analysis LLM calls in seconds
        tool_retrieval_llm: LLM client for tool retrieval filter (None = use llm_client)
        visual_analysis_model: Model name for visual analysis (None = use llm_client.model)
    """
    super().__init__(
        name=name,
        backend_scope=backend_scope or ["gui", "shell", "mcp", "web", "system"],
        llm_client=llm_client,
        grounding_client=grounding_client,
        recording_manager=recording_manager,
    )

    # The default prompt is derived from self._backend_scope, so it must be
    # built after the base-class initializer has run.
    self._system_prompt = system_prompt or self._default_system_prompt()
    self._max_iterations = max_iterations
    self._visual_analysis_timeout = visual_analysis_timeout
    self._tool_retrieval_llm = tool_retrieval_llm
    self._visual_analysis_model = visual_analysis_model

    # Skill context injection (set externally before process())
    self._skill_context: Optional[str] = None
    self._active_skill_ids: List[str] = []

    # Skill registry for mid-iteration retrieve_skill tool
    self._skill_registry: Optional["SkillRegistry"] = None

    # Tools from the last execution (available for post-execution analysis)
    self._last_tools: List = []

    # Instruction of the task currently being processed. process() overwrites
    # it; initialising it here means the visual-analysis path always finds the
    # attribute, even when invoked before the first process() call.
    self._current_instruction: str = ""

    logger.info(f"Grounding Agent initialized: {name}")
    logger.info(f"Backend scope: {self._backend_scope}")
    logger.info(f"Max iterations: {self._max_iterations}")
    logger.info(f"Visual analysis timeout: {self._visual_analysis_timeout}s")
    if tool_retrieval_llm:
        logger.info(f"Tool retrieval model: {tool_retrieval_llm.model}")
    if visual_analysis_model:
        logger.info(f"Visual analysis model: {visual_analysis_model}")
def set_skill_context(
self,
context: str,
skill_ids: Optional[List[str]] = None,
) -> None:
"""Inject skill guidance into the agent's system prompt.
Called by ``OpenSpace.execute()`` before ``process()`` when skills
are matched. The context is a formatted string built by
``SkillRegistry.build_context_injection()``.
Args:
context: Formatted skill content for system prompt injection.
skill_ids: skill_id values of injected skills.
"""
self._skill_context = context if context else None
self._active_skill_ids = skill_ids or []
if self._skill_context:
logger.info(f"Skill context set: {', '.join(self._active_skill_ids) or '(unnamed)'}")
def clear_skill_context(self) -> None:
"""Remove skill guidance (used before fallback execution)."""
if self._skill_context:
logger.info(f"Skill context cleared (was: {', '.join(self._active_skill_ids)})")
self._skill_context = None
self._active_skill_ids = []
@property
def has_skill_context(self) -> bool:
return self._skill_context is not None
def set_skill_registry(self, registry: Optional["SkillRegistry"]) -> None:
"""Attach a SkillRegistry so the agent can offer ``retrieve_skill`` as a tool."""
self._skill_registry = registry
if registry:
count = len(registry.list_skills())
logger.info(f"Skill registry attached ({count} skill(s) available for mid-iteration retrieval)")
_MAX_SINGLE_CONTENT_CHARS = 30_000
@classmethod
def _cap_message_content(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Truncate oversized individual message contents in-place.
Targets tool-result messages and assistant messages that can
carry enormous file contents (read_file on large CSVs/scripts).
System messages and the first user instruction are never touched.
"""
cap = cls._MAX_SINGLE_CONTENT_CHARS
trimmed = 0
for msg in messages:
content = msg.get("content")
if not isinstance(content, str) or len(content) <= cap:
continue
if msg.get("role") == "system":
continue
original_len = len(content)
msg["content"] = (
content[: cap // 2]
+ f"\n\n... [truncated {original_len - cap:,} chars] ...\n\n"
+ content[-(cap // 2):]
)
trimmed += 1
if trimmed:
logger.info(f"Capped {trimmed} oversized message(s) to {cap:,} chars each")
return messages
def _truncate_messages(
    self,
    messages: List[Dict[str, Any]],
    keep_recent: int = 8,
    max_tokens_estimate: int = 120000
) -> List[Dict[str, Any]]:
    """Shrink the conversation when its estimated size exceeds a token budget.

    Oversized individual messages are capped first (in place). If the
    history is still estimated (serialized length // 4) at or above
    ``max_tokens_estimate``, only the system messages, the first user
    message and the last ``keep_recent * 2`` other messages are kept.

    Args:
        messages: Full conversation history.
        keep_recent: Number of recent rounds to keep (two messages per round).
        max_tokens_estimate: Estimated-token threshold that triggers truncation.

    Returns:
        ``messages`` itself when no truncation is needed, otherwise a new list.
    """
    # First: cap any single oversized message to prevent one huge
    # tool-result from dominating the context window.
    messages = self._cap_message_content(messages)

    if len(messages) <= keep_recent + 2:  # +2 for system and initial user
        return messages

    # default=str keeps the size estimate from raising on values json cannot
    # encode natively; the text is only measured, never parsed back.
    total_text = json.dumps(messages, ensure_ascii=False, default=str)
    estimated_tokens = len(total_text) // 4
    if estimated_tokens < max_tokens_estimate:
        return messages

    logger.info(f"Truncating message history: {len(messages)} messages, "
                f"~{estimated_tokens:,} tokens -> keeping recent {keep_recent} rounds")

    system_messages = []
    user_instruction = None
    conversation_messages = []
    for msg in messages:
        role = msg.get("role")
        if role == "system":
            system_messages.append(msg)
        elif role == "user" and user_instruction is None:
            user_instruction = msg
        else:
            conversation_messages.append(msg)

    # A window of 0 must keep nothing; a bare [-0:] slice would keep everything.
    window = keep_recent * 2
    recent_messages = conversation_messages[-window:] if window > 0 else []

    # The cut can land between an assistant tool-call message and its tool
    # results. Leading "tool" messages no longer have the assistant message
    # they answer, so drop them rather than send an inconsistent history.
    first_kept = 0
    while first_kept < len(recent_messages) and recent_messages[first_kept].get("role") == "tool":
        first_kept += 1
    if first_kept:
        logger.info(f"Dropped {first_kept} orphaned tool result message(s) at truncation boundary")
        recent_messages = recent_messages[first_kept:]

    truncated = system_messages.copy()
    if user_instruction:
        truncated.append(user_instruction)
    truncated.extend(recent_messages)

    logger.info(f"After truncation: {len(truncated)} messages, "
                f"~{len(json.dumps(truncated, ensure_ascii=False, default=str))//4:,} tokens (estimated)")
    return truncated
async def process(self, context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process a task execution request with multi-round iteration control.

    Each iteration makes one LLM call (with tool execution when
    ``auto_execute`` is on). The loop stops when the assistant output
    contains ``GroundingAgentPrompts.TASK_COMPLETE``, when the iteration
    limit is reached, or after too many consecutive empty responses.

    Args:
        context: Request data. Reads ``instruction`` (required),
            ``max_iterations`` and ``auto_execute`` (default True); the
            workspace helpers also read ``workspace_dir``. When workspace
            files are found, ``workspace_artifacts`` is written back.

    Returns:
        The dict from ``_build_final_result``, or an error dict with
        ``error`` and ``status`` ("error") - plus ``instruction`` and
        ``iteration`` when the failure happened inside the loop.
    """
    instruction = context.get("instruction", "")
    if not instruction:
        logger.error("Grounding Agent: No instruction provided")
        return {"error": "No instruction provided", "status": "error"}

    # Imported locally; at module level RecordingManager is a typing-only import.
    from openspace.recording import RecordingManager

    # Store current instruction for visual analysis context
    self._current_instruction = instruction
    logger.info(f"Grounding Agent: Processing instruction at step {self.step}")

    # Check for files that already exist in the workspace
    workspace_info = await self._check_workspace_artifacts(context)
    if workspace_info["has_files"]:
        context["workspace_artifacts"] = workspace_info
        logger.info(f"Workspace has {len(workspace_info['files'])} existing files: {workspace_info['files']}")

    # Get available tools (auto-search with cap)
    tools = await self._get_available_tools(instruction)
    self._last_tools = tools  # expose for post-execution analysis

    # Get search debug info (similarity scores, LLM selections)
    search_debug_info = None
    if self.grounding_client:
        search_debug_info = self.grounding_client.get_last_search_debug_info()

    # Build retrieved tools list for return value
    tool_scores = (search_debug_info.get("tool_scores") or []) if search_debug_info else []
    retrieved_tools_list = []
    for tool in tools:
        tool_info = {
            "name": getattr(tool, "name", str(tool)),
            "description": getattr(tool, "description", ""),
        }
        # Prefer runtime_info.backend over backend_type
        # (may be NOT_SET for cached RemoteTools)
        runtime_info = getattr(tool, "_runtime_info", None)
        if runtime_info and hasattr(runtime_info, "backend"):
            backend = runtime_info.backend
            tool_info["backend"] = backend.value if hasattr(backend, "value") else str(backend)
            # getattr: a missing server_name must not abort the whole task
            # (this code runs outside the try block below).
            tool_info["server_name"] = getattr(runtime_info, "server_name", None)
        elif hasattr(tool, "backend_type"):
            backend_type = tool.backend_type
            tool_info["backend"] = backend_type.value if hasattr(backend_type, "value") else str(backend_type)

        # Add similarity score if available (first matching entry wins)
        for score_info in tool_scores:
            if score_info["name"] == tool_info["name"]:
                tool_info["similarity_score"] = score_info["score"]
                break

        retrieved_tools_list.append(tool_info)

    # Record retrieved tools
    if self._recording_manager:
        await RecordingManager.record_retrieved_tools(
            task_instruction=instruction,
            tools=tools,
            search_debug_info=search_debug_info,
        )

    # Initialize iteration state
    max_iterations = context.get("max_iterations", self._max_iterations)
    current_iteration = 0
    all_tool_results = []
    iteration_contexts = []
    consecutive_empty_responses = 0  # Track consecutive empty LLM responses
    MAX_CONSECUTIVE_EMPTY = 5  # Exit after this many empty responses

    # Build initial messages
    messages = self.construct_messages(context)

    # Record initial conversation setup once (system prompts + user instruction + tool definitions)
    await RecordingManager.record_conversation_setup(
        setup_messages=copy.deepcopy(messages),
        tools=tools,
    )

    try:
        while current_iteration < max_iterations:
            current_iteration += 1
            logger.info(f"Grounding Agent: Iteration {current_iteration}/{max_iterations}")

            # Strip skill context after the first iteration to save prompt tokens.
            # Skills only need to guide the first LLM call; subsequent iterations
            # already have the plan and tool results in context.
            if current_iteration == 2 and self._skill_context:
                skill_ctx = self._skill_context
                messages = [
                    m for m in messages
                    if not (m.get("role") == "system" and m.get("content") == skill_ctx)
                ]
                logger.info("Skill context removed from messages after first iteration")

            # Cap oversized individual messages every iteration to prevent
            # a single huge tool result from ballooning all subsequent calls.
            if current_iteration >= 2:
                messages = self._cap_message_content(messages)

            # Truncate message history to prevent context length issues.
            # Start truncating after 5 iterations to keep context manageable.
            if current_iteration >= 5:
                messages = self._truncate_messages(
                    messages,
                    keep_recent=8,
                    max_tokens_estimate=120000
                )

            messages_input_snapshot = copy.deepcopy(messages)

            # NOTE: per-iteration summary generation is disabled (summary_prompt=None).
            # Tool results, including visual analysis, are already in context.
            # To re-enable, pass GroundingAgentPrompts.iteration_summary(...) as
            # summary_prompt and feed the summary through _build_iteration_feedback.
            auto_execute = context.get("auto_execute", True)
            llm_response = await self._llm_client.complete(
                messages=messages,
                tools=tools if auto_execute else None,
                execute_tools=auto_execute,
                summary_prompt=None,  # Disabled
                tool_result_callback=self._visual_analysis_callback
            )

            # Update messages with LLM response
            messages = llm_response["messages"]

            # Collect tool results
            tool_results_this_iteration = llm_response.get("tool_results", [])
            if tool_results_this_iteration:
                all_tool_results.extend(tool_results_this_iteration)

            assistant_message = llm_response.get("message") or {}
            # Tool-call-only responses may carry content=None; normalise it so
            # the length and completion-token checks below cannot raise.
            assistant_content = assistant_message.get("content") or ""
            has_tool_calls = llm_response.get('has_tool_calls', False)

            logger.info(f"Iteration {current_iteration} - Has tool calls: {has_tool_calls}, "
                        f"Tool results: {len(tool_results_this_iteration)}, "
                        f"Content length: {len(assistant_content)} chars")

            if assistant_content:
                logger.info(f"Iteration {current_iteration} - Assistant content preview: {repr(assistant_content[:300])}")
                consecutive_empty_responses = 0  # Reset counter on valid response
            elif has_tool_calls:
                consecutive_empty_responses = 0  # Reset if we have tool calls
            else:
                consecutive_empty_responses += 1
                logger.warning(f"Iteration {current_iteration} - NO tool calls and NO content "
                               f"(empty response {consecutive_empty_responses}/{MAX_CONSECUTIVE_EMPTY})")
                if consecutive_empty_responses >= MAX_CONSECUTIVE_EMPTY:
                    logger.error(f"Exiting due to {MAX_CONSECUTIVE_EMPTY} consecutive empty LLM responses. "
                                 "This may indicate API issues, rate limiting, or context too long.")
                    break

            # Snapshot messages after LLM call (accumulated context)
            messages_output_snapshot = copy.deepcopy(messages)

            # Delta messages: only the messages produced in this iteration
            # (avoids repeating system prompts / initial user instruction each time)
            delta_messages = messages[len(messages_input_snapshot):]

            # Response metadata (lightweight; full content lives in delta_messages)
            response_metadata = {
                "has_tool_calls": has_tool_calls,
                "tool_calls_count": len(tool_results_this_iteration),
            }

            iteration_contexts.append({
                "iteration": current_iteration,
                "messages_input": messages_input_snapshot,
                "messages_output": messages_output_snapshot,
                "response_metadata": response_metadata,
            })

            # Real-time save to conversations.jsonl (delta only, no redundancy)
            await RecordingManager.record_iteration_context(
                iteration=current_iteration,
                delta_messages=copy.deepcopy(delta_messages),
                response_metadata=response_metadata,
            )

            # Check for completion token in assistant content
            if GroundingAgentPrompts.TASK_COMPLETE in assistant_content:
                logger.info(f"Task completed at iteration {current_iteration} (found {GroundingAgentPrompts.TASK_COMPLETE})")
                break

            # LLM didn't generate <COMPLETE>, continue to next iteration
            if tool_results_this_iteration:
                logger.debug(f"Task in progress, LLM called {len(tool_results_this_iteration)} tools")
            else:
                logger.debug(f"Task in progress, LLM did not generate <COMPLETE>")

            # Remove previous iteration guidance to avoid accumulation. Only
            # messages with the exact guidance shape are dropped, so other
            # system prompts that merely mention these words stay in place.
            messages = [msg for msg in messages if not self._is_iteration_guidance(msg)]

            messages.append({
                "role": "system",
                "content": f"Iteration {current_iteration} complete. "
                           f"Check if task is finished - if yes, output {GroundingAgentPrompts.TASK_COMPLETE}. "
                           f"If not, continue with next action."
            })

        # Build final result
        result = await self._build_final_result(
            instruction=instruction,
            messages=messages,
            all_tool_results=all_tool_results,
            iterations=current_iteration,
            max_iterations=max_iterations,
            iteration_contexts=iteration_contexts,
            retrieved_tools_list=retrieved_tools_list,
            search_debug_info=search_debug_info,
        )

        # Record agent action to recording manager
        if self._recording_manager:
            await self._record_agent_execution(result, instruction)

        # Increment step
        self.increment_step()

        logger.info(f"Grounding Agent: Execution completed with status: {result.get('status')}")
        return result

    except Exception as e:
        # Top-level boundary: report the failure to the caller as an error result.
        logger.error(f"Grounding Agent: Execution failed: {e}")
        result = {
            "error": str(e),
            "status": "error",
            "instruction": instruction,
            "iteration": current_iteration
        }
        self.increment_step()
        return result

@staticmethod
def _is_iteration_guidance(msg: Dict[str, Any]) -> bool:
    """Return True for the per-iteration guidance system message added by ``process``."""
    if msg.get("role") != "system":
        return False
    content = msg.get("content")
    return (
        isinstance(content, str)
        and content.startswith("Iteration ")
        and " complete. Check if task is finished" in content
    )
def _default_system_prompt(self) -> str:
    """Build the system prompt for this agent's backend scope via ``GroundingAgentPrompts``."""
    scope = self._backend_scope
    return GroundingAgentPrompts.build_system_prompt(scope)
def construct_messages(
self,
context: Dict[str, Any]
) -> List[Dict[str, Any]]:
messages = [{"role": "system", "content": self._system_prompt}]
# Get instruction from context
instruction = context.get("instruction", "")
if not instruction:
raise ValueError("context must contain 'instruction' field")
# Add workspace directory
workspace_dir = context.get("workspace_dir")
if workspace_dir:
messages.append({
"role": "system",
"content": GroundingAgentPrompts.workspace_directory(workspace_dir)
})
# Add workspace artifacts information
workspace_artifacts = context.get("workspace_artifacts")
if workspace_artifacts and workspace_artifacts.get("has_files"):
files = workspace_artifacts.get("files", [])
matching_files = workspace_artifacts.get("matching_files", [])
recent_files = workspace_artifacts.get("recent_files", [])
if matching_files:
artifact_msg = GroundingAgentPrompts.workspace_matching_files(matching_files)
elif len(recent_files) >= 2:
artifact_msg = GroundingAgentPrompts.workspace_recent_files(
total_files=len(files),
recent_files=recent_files
)
else:
artifact_msg = GroundingAgentPrompts.workspace_file_list(files)
messages.append({
"role": "system",
"content": artifact_msg
})
# Skill injection — only active (selected) skills, full content
if self._skill_context:
messages.append({
"role": "system",
"content": self._skill_context
})
logger.info(f"Injected active skill context ({len(self._active_skill_ids)} skill(s))")
# User instruction
messages.append({"role": "user", "content": instruction})
return messages
async def _get_available_tools(self, task_description: Optional[str]) -> List:
    """
    Retrieve tools for the current execution phase.

    Tools come from ``grounding_client.get_tools_with_auto_search`` for the
    agent's backend scope. When a skill context is active, the shell
    backend is added to the scope if it is missing (skills commonly
    reference ``shell_agent``). If the auto-search call fails, all tools
    are loaded via ``_load_all_tools`` instead. When a skill registry with
    at least one skill is attached, a ``retrieve_skill`` tool is appended.

    Args:
        task_description: Task text passed to the tool search.

    Returns:
        A list owned by this call, so callers may mutate it freely;
        empty when no grounding client is set.
    """
    grounding_client = self.grounding_client
    if not grounding_client:
        return []

    backends = [BackendType(name) for name in self._backend_scope]

    # Ensure shell backend is available when skills are active
    # (skills commonly reference shell_agent, read_file, etc.)
    if self.has_skill_context and BackendType.SHELL not in backends:
        backends.append(BackendType.SHELL)
        logger.info("Added Shell backend to scope for skill file I/O")

    # The same LLM is used for tool retrieval and for the retrieve_skill tool.
    retrieval_llm = self._tool_retrieval_llm or self._llm_client

    try:
        tools = await grounding_client.get_tools_with_auto_search(
            task_description=task_description,
            backend=backends,
            use_cache=True,
            llm_callable=retrieval_llm,
        )
        logger.info(
            f"GroundingAgent selected {len(tools)} tools (auto-search) "
            f"from {len(backends)} backends"
            + (f" [skill-augmented]" if self.has_skill_context else "")
        )
    except Exception as e:
        logger.warning(f"Auto-search tools failed, falling back to full list: {e}")
        tools = await self._load_all_tools(grounding_client)

    # Copy before appending: the search is called with use_cache=True, so the
    # returned list may be shared with the client's cache. Appending in place
    # would then leak one retrieve_skill tool into it per call.
    tools = list(tools)

    # Append retrieve_skill tool when skill registry is available
    if self._skill_registry and self._skill_registry.list_skills():
        from openspace.skill_engine.retrieve_tool import RetrieveSkillTool

        retrieve_tool = RetrieveSkillTool(
            self._skill_registry,
            backends=[b.value for b in backends],
            llm_client=retrieval_llm,
            # _skill_store is not set in __init__; it may be attached externally.
            skill_store=getattr(self, "_skill_store", None),
        )
        retrieve_tool.bind_runtime_info(
            backend=BackendType.SYSTEM,
            session_name="internal",
        )
        tools.append(retrieve_tool)
        logger.info("Added retrieve_skill tool for mid-iteration skill retrieval")

    return tools
async def _load_all_tools(self, grounding_client: "GroundingClient") -> List:
    """Fallback loader: list the tools of every backend in scope, without search.

    A backend whose lookup fails is skipped with a debug log.

    Args:
        grounding_client: Client used to list the tools of each backend.

    Returns:
        Tools from all backends that could be queried, in scope order.
    """
    all_tools: List = []
    scope = self._backend_scope

    for backend_name in scope:
        try:
            tools = await grounding_client.list_tools(backend=BackendType(backend_name))
            all_tools += tools
            logger.debug(f"Retrieved {len(tools)} tools from backend: {backend_name}")
        except Exception as e:
            logger.debug(f"Could not get tools from {backend_name}: {e}")

    logger.info(
        f"GroundingAgent fallback retrieved {len(all_tools)} tools "
        f"from {len(self._backend_scope)} backends"
    )
    return all_tools
async def _visual_analysis_callback(
self,
result: ToolResult,
tool_name: str,
tool_call: Dict,
backend: str
) -> ToolResult:
"""
Callback for LLMClient to handle visual analysis after tool execution.
"""
# 1. Check if LLM requested to skip visual analysis
skip_visual_analysis = False
try:
arguments = tool_call.function.arguments
if isinstance(arguments, str):
args = json.loads(arguments.strip() or "{}")
else:
args = arguments
if isinstance(args, dict) and args.get("skip_visual_analysis"):
skip_visual_analysis = True
logger.info(f"Visual analysis skipped for {tool_name} (meta-parameter set by LLM)")
except Exception as e:
logger.debug(f"Could not parse tool arguments: {e}")
# 2. If skip requested, return original result
if skip_visual_analysis:
return result
# 3. Check if this backend needs visual analysis
if backend != "gui":
return result
# 4. Check if tool has visual data
metadata = getattr(result, 'metadata', None)
has_screenshots = metadata and (metadata.get("screenshot") or metadata.get("screenshots"))
# 5. If no visual data, try to capture a screenshot
if not has_screenshots:
try:
logger.info(f"No visual data from {tool_name}, capturing screenshot...")
screenshot_client = ScreenshotClient()
screenshot_bytes = await screenshot_client.capture()
if screenshot_bytes:
# Add screenshot to result metadata
if metadata is None:
result.metadata = {}
metadata = result.metadata
metadata["screenshot"] = screenshot_bytes
has_screenshots = True
logger.info(f"Screenshot captured for visual analysis")
else:
logger.warning("Failed to capture screenshot")
except Exception as e:
logger.warning(f"Error capturing screenshot: {e}")
# 6. If still no screenshots, return original result
if not has_screenshots:
logger.debug(f"No visual data available for {tool_name}")
return result
# 7. Perform visual analysis
return await self._enhance_result_with_visual_context(result, tool_name)
async def _enhance_result_with_visual_context(
    self,
    result: ToolResult,
    tool_name: str
) -> ToolResult:
    """
    Enhance tool result with visual analysis for grounding agent workflows.

    Screenshots are read from ``result.metadata`` (``"screenshots"`` list
    first, then the single ``"screenshot"``), reduced to at most three,
    and sent together with a prompt to a vision-capable model through
    ``litellm.acompletion``. The model's answer is appended to the result
    content.

    Args:
        result: Tool result whose metadata may hold screenshots.
        tool_name: Name of the executed tool (used in prompt and logs).

    Returns:
        A new ``ToolResult`` with the analysis appended, or the original
        ``result`` when there is nothing to analyse, the call times out,
        or any other error occurs.
    """
    import asyncio
    import base64
    import litellm

    try:
        metadata = getattr(result, 'metadata', None)
        if not metadata:
            return result

        # Collect all screenshots: prefer the list, and fall back to the
        # single screenshot whenever the list yields nothing usable (it may
        # be missing, not a list, or contain only empty entries).
        screenshots_bytes = []
        screenshots_list = metadata.get("screenshots")
        if isinstance(screenshots_list, list):
            screenshots_bytes = [s for s in screenshots_list if s]
        if not screenshots_bytes and metadata.get("screenshot"):
            screenshots_bytes = [metadata["screenshot"]]

        if not screenshots_bytes:
            return result

        # Select key screenshots if there are too many
        selected_screenshots = self._select_key_screenshots(screenshots_bytes, max_count=3)

        # Convert to base64; non-bytes entries are passed through unchanged
        # (presumably already base64 text - TODO confirm with producers).
        visual_b64_list = [
            base64.b64encode(visual_data).decode('utf-8')
            if isinstance(visual_data, bytes) else visual_data
            for visual_data in selected_screenshots
        ]

        # Build prompt based on number of screenshots
        num_screenshots = len(visual_b64_list)
        prompt = GroundingAgentPrompts.visual_analysis(
            tool_name=tool_name,
            num_screenshots=num_screenshots,
            task_description=getattr(self, '_current_instruction', '')
        )

        # Build content with text prompt + all images
        content = [{"type": "text", "text": prompt}]
        for visual_b64 in visual_b64_list:
            content.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{visual_b64}"
                }
            })

        # Use dedicated visual analysis model if configured, otherwise use main LLM model
        visual_model = self._visual_analysis_model or (
            self._llm_client.model if self._llm_client else "openrouter/anthropic/claude-sonnet-4.5"
        )

        # The outer wait_for adds a 5 s margin on top of the timeout passed
        # to litellm, so a hung call is still bounded.
        response = await asyncio.wait_for(
            litellm.acompletion(
                model=visual_model,
                messages=[{
                    "role": "user",
                    "content": content
                }],
                timeout=self._visual_analysis_timeout
            ),
            timeout=self._visual_analysis_timeout + 5
        )

        analysis = response.choices[0].message.content.strip()

        # Inject visual analysis into content
        original_content = result.content or "(no text output)"
        enhanced_content = f"{original_content}\n\n**Visual content**: {analysis}"

        # Create enhanced result
        enhanced_result = ToolResult(
            status=result.status,
            content=enhanced_content,
            error=result.error,
            metadata={**metadata, "visual_analyzed": True, "visual_analysis": analysis},
            execution_time=result.execution_time
        )

        logger.info(f"Enhanced {tool_name} result with visual analysis ({num_screenshots} screenshot(s))")
        return enhanced_result

    except asyncio.TimeoutError:
        logger.warning(f"Visual analysis timed out for {tool_name}, returning original result")
        return result
    except Exception as e:
        # Best effort: visual analysis must never fail the tool call itself.
        logger.warning(f"Failed to analyze visual content for {tool_name}: {e}")
        return result
def _select_key_screenshots(
self,
screenshots: List[bytes],
max_count: int = 3
) -> List[bytes]:
"""
Select key screenshots if there are too many.
"""
if len(screenshots) <= max_count:
return screenshots
selected_indices = set()
# Always include last (final state)
selected_indices.add(len(screenshots) - 1)
# If room, include first (initial state)
if max_count >= 2:
selected_indices.add(0)
# Fill remaining slots with evenly spaced middle screenshots
remaining_slots = max_count - len(selected_indices)
if remaining_slots > 0:
# Calculate spacing
available_indices = [
i for i in range(1, len(screenshots) - 1)
if i not in selected_indices
]
if available_indices:
step = max(1, len(available_indices) // (remaining_slots + 1))
for i in range(remaining_slots):
idx = min((i + 1) * step, len(available_indices) - 1)
if idx < len(available_indices):
selected_indices.add(available_indices[idx])
# Return screenshots in original order
selected = [screenshots[i] for i in sorted(selected_indices)]
logger.debug(
f"Selected {len(selected)} screenshots at indices {sorted(selected_indices)} "
f"from total of {len(screenshots)}"
)
return selected
def _get_workspace_path(self, context: Dict[str, Any]) -> Optional[str]:
"""
Get workspace directory path from context.
"""
return context.get("workspace_dir")
def _scan_workspace_files(
self,
workspace_path: str,
recent_threshold: int = 600 # seconds
) -> Dict[str, Any]:
"""
Scan workspace directory and collect file information.
Args:
workspace_path: Path to workspace directory
recent_threshold: Threshold in seconds for recent files
Returns:
Dictionary with file information:
- files: List of all filenames
- file_details: Dict mapping filename to file info (size, modified, age_seconds)
- recent_files: List of recently modified filenames
"""
import os
import time
result = {
"files": [],
"file_details": {},
"recent_files": []
}
if not workspace_path or not os.path.exists(workspace_path):
return result
# Recording system files to exclude from workspace scanning
excluded_files = {"metadata.json", "traj.jsonl"}
try:
current_time = time.time()
for filename in os.listdir(workspace_path):
filepath = os.path.join(workspace_path, filename)
if os.path.isfile(filepath) and filename not in excluded_files:
result["files"].append(filename)
# Get file stats
stat = os.stat(filepath)
file_info = {
"size": stat.st_size,
"modified": stat.st_mtime,
"age_seconds": current_time - stat.st_mtime
}
result["file_details"][filename] = file_info
# Track recently created/modified files
if file_info["age_seconds"] < recent_threshold:
result["recent_files"].append(filename)
result["files"] = sorted(result["files"])
except Exception as e:
logger.debug(f"Error scanning workspace files: {e}")
return result
async def _check_workspace_artifacts(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Check workspace directory for existing artifacts that might be relevant to the task.
Enhanced to detect if task might already be completed.
"""
import re
workspace_info = {"has_files": False, "files": [], "file_details": {}, "recent_files": []}
try:
# Get workspace path
workspace_path = self._get_workspace_path(context)
# Scan workspace files
scan_result = self._scan_workspace_files(workspace_path, recent_threshold=600)
if scan_result["files"]:
workspace_info["has_files"] = True
workspace_info["files"] = scan_result["files"]
workspace_info["file_details"] = scan_result["file_details"]
workspace_info["recent_files"] = scan_result["recent_files"]
logger.info(f"Grounding Agent: Found {len(scan_result['files'])} existing files in workspace "
f"({len(scan_result['recent_files'])} recent)")
# Check if instruction mentions specific filenames
instruction = context.get("instruction", "")
if instruction:
# Look for potential file references in instruction
potential_outputs = []
# Match common file patterns: filename.ext, "filename", 'filename'
file_patterns = re.findall(r'["\']?([a-zA-Z0-9_\-]+\.[a-zA-Z0-9]+)["\']?', instruction)
for pattern in file_patterns:
if pattern in scan_result["files"]:
potential_outputs.append(pattern)
if potential_outputs:
workspace_info["matching_files"] = potential_outputs
logger.info(f"Grounding Agent: Found {len(potential_outputs)} files matching task: {potential_outputs}")
except Exception as e:
logger.debug(f"Could not check workspace artifacts: {e}")
return workspace_info
def _build_iteration_feedback(
self,
iteration: int,
llm_summary: Optional[str] = None,
add_guidance: bool = True
) -> Optional[Dict[str, str]]:
"""
Build feedback message to add to next iteration.
"""
if not llm_summary:
return None
feedback_content = GroundingAgentPrompts.iteration_feedback(
iteration=iteration,
llm_summary=llm_summary,
add_guidance=add_guidance
)
return {
"role": "system",
"content": feedback_content
}
def _remove_previous_guidance(self, messages: List[Dict[str, Any]]) -> None:
"""
Remove guidance section from previous iteration feedback messages.
"""
for msg in messages:
if msg.get("role") == "system":
content = msg.get("content", "")
# Check if this is an iteration feedback message with guidance
if "## Iteration" in content and "Summary" in content and "---" in content:
# Remove everything from "---" onwards (the guidance part)
summary_only = content.split("---")[0].strip()
msg["content"] = summary_only
async def _generate_final_summary(
    self,
    instruction: str,
    messages: List[Dict],
    iterations: int
) -> tuple[str, bool, List[Dict]]:
    """
    Generate final summary across all iterations for reporting to upper layer.

    The conversation is reduced to plain text turns (tool results and
    tool-call metadata removed), a final-summary request is appended, and
    the LLM is called once without tools.

    Args:
        instruction: Original task instruction.
        messages: Full conversation history; not modified.
        iterations: Number of iterations that were executed.

    Returns:
        tuple[str, bool, List[Dict]]: (summary_text, success_flag, context_used)
        - summary_text: The generated summary, or a fallback/error message
        - success_flag: False only when the LLM call raised
        - context_used: Copy of the cleaned messages sent to the LLM
    """
    final_summary_prompt = {
        "role": "user",
        "content": GroundingAgentPrompts.final_summary(
            instruction=instruction,
            iterations=iterations
        )
    }

    clean_messages = []
    for msg in messages:
        role = msg.get("role")
        # Skip tool result messages
        if role == "tool":
            continue

        # Copy message without tool_calls
        clean_msg = {key: value for key, value in msg.items() if key != "tool_calls"}

        # An assistant turn that only issued tool calls has no content left
        # once tool_calls is stripped; such messages carry no information and
        # chat APIs may reject them, so leave them out.
        if role == "assistant" and not clean_msg.get("content"):
            continue
        clean_messages.append(clean_msg)

    clean_messages.append(final_summary_prompt)

    # Save context for return
    context_for_return = copy.deepcopy(clean_messages)

    try:
        # Call LLMClient to generate final summary (without tools)
        summary_response = await self._llm_client.complete(
            messages=clean_messages,
            tools=None,
            execute_tools=False
        )

        final_summary = (summary_response.get("message") or {}).get("content") or ""

        if final_summary:
            logger.info(f"Generated final summary: {final_summary[:200]}...")
            return final_summary, True, context_for_return

        logger.warning("LLM returned empty final summary")
        return f"Task completed after {iterations} iteration(s). Check execution history for details.", True, context_for_return

    except Exception as e:
        logger.error(f"Error generating final summary: {e}")
        return f"Task completed after {iterations} iteration(s), but failed to generate summary: {str(e)}", False, context_for_return
async def _build_final_result(
    self,
    instruction: str,
    messages: List[Dict],
    all_tool_results: List[Dict],
    iterations: int,
    max_iterations: int,
    iteration_contexts: Optional[List[Dict]] = None,
    retrieved_tools_list: Optional[List[Dict]] = None,
    search_debug_info: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Build final execution result.

    Args:
        instruction: Original instruction
        messages: Complete conversation history (including all iteration summaries)
        all_tool_results: All tool execution results
        iterations: Number of iterations performed
        max_iterations: Maximum allowed iterations
        iteration_contexts: Context snapshots for each iteration
        retrieved_tools_list: List of tools retrieved for this task
        search_debug_info: Debug info from tool search (similarity scores, LLM selections)

    Returns:
        Result dict with the keys "instruction", "step", "iterations",
        "tool_executions", "messages", "iteration_contexts",
        "retrieved_tools_list", "search_debug_info", "active_skills",
        "keep_session", "response" and "status". "status" is "success" when
        the last assistant message carries the completion marker, otherwise
        "incomplete", in which case a "warning" entry is added as well.
    """
    is_complete = self._check_task_completion(messages)
    tool_executions = self._format_tool_executions(all_tool_results)

    # An assistant message may carry ``content: None`` (e.g. a tool-call-only
    # turn); normalise to "" so the string handling below cannot fail.
    last_response = self._extract_last_assistant_message(messages) or ""

    result = {
        "instruction": instruction,
        "step": self.step,
        "iterations": iterations,
        "tool_executions": tool_executions,
        "messages": messages,
        "iteration_contexts": iteration_contexts or [],
        "retrieved_tools_list": retrieved_tools_list or [],
        "search_debug_info": search_debug_info,
        "active_skills": list(self._active_skill_ids),
        "keep_session": True
    }

    if is_complete:
        logger.info("Task completed with <COMPLETE> marker")
        # Use the LLM's own completion response directly: it already writes
        # a summary before emitting the marker, so the extra LLM call via
        # _generate_final_summary is intentionally not made here.
        # Strip the marker token from the response for cleaner output.
        result["response"] = last_response.replace(
            GroundingAgentPrompts.TASK_COMPLETE, ""
        ).strip()
        result["status"] = "success"
    else:
        result["response"] = last_response
        result["status"] = "incomplete"
        result["warning"] = (
            f"Task reached max iterations ({max_iterations}) without completion. "
            "This may indicate the task needs more steps or clarification."
        )

    return result
def _format_tool_executions(self, all_tool_results: List[Dict]) -> List[Dict]:
executions = []
for tr in all_tool_results:
tool_result_obj = tr.get("result")
tool_call = tr.get("tool_call")
status = "unknown"
if hasattr(tool_result_obj, 'status'):
status_obj = tool_result_obj.status
status = getattr(status_obj, 'value', status_obj)
# Extract tool_name and arguments from tool_call object (litellm format)
tool_name = "unknown"
arguments = {}
if tool_call is not None:
if hasattr(tool_call, 'function'):
# tool_call is an object with .function attribute
tool_name = getattr(tool_call.function, 'name', 'unknown')
args_raw = getattr(tool_call.function, 'arguments', '{}')
if isinstance(args_raw, str):
try:
arguments = json.loads(args_raw) if args_raw.strip() else {}
except json.JSONDecodeError:
arguments = {}
else:
arguments = args_raw if isinstance(args_raw, dict) else {}
elif isinstance(tool_call, dict):
# Fallback: tool_call is a dict
func = tool_call.get("function", {})
tool_name = func.get("name", "unknown")
args_raw = func.get("arguments", "{}")
if isinstance(args_raw, str):
try:
arguments = json.loads(args_raw) if args_raw.strip() else {}
except json.JSONDecodeError:
arguments = {}
else:
arguments = args_raw if isinstance(args_raw, dict) else {}
executions.append({
"tool_name": tool_name,
"arguments": arguments,
"backend": tr.get("backend"),
"server_name": tr.get("server_name"),
"status": status,
"content": tool_result_obj.content if hasattr(tool_result_obj, 'content') else None,
"error": tool_result_obj.error if hasattr(tool_result_obj, 'error') else None,
"execution_time": tool_result_obj.execution_time if hasattr(tool_result_obj, 'execution_time') else None,
"metadata": tool_result_obj.metadata if hasattr(tool_result_obj, 'metadata') else {},
})
return executions
def _check_task_completion(self, messages: List[Dict]) -> bool:
for msg in reversed(messages):
if msg.get("role") == "assistant":
content = msg.get("content", "")
return GroundingAgentPrompts.TASK_COMPLETE in content
return False
def _extract_last_assistant_message(self, messages: List[Dict]) -> str:
for msg in reversed(messages):
if msg.get("role") == "assistant":
return msg.get("content", "")
return ""
async def _record_agent_execution(
self,
result: Dict[str, Any],
instruction: str
) -> None:
"""
Record agent execution to recording manager.
Args:
result: Execution result
instruction: Original instruction
"""
if not self._recording_manager:
return
# Extract tool execution summary
tool_summary = []
if result.get("tool_executions"):
for exec_info in result["tool_executions"]:
tool_summary.append({
"tool": exec_info.get("tool_name", "unknown"),
"backend": exec_info.get("backend", "unknown"),
"status": exec_info.get("status", "unknown"),
})
await self._recording_manager.record_agent_action(
agent_name=self.name,
action_type="execute",
input_data={"instruction": instruction},
reasoning={
"response": result.get("response", ""),
"tools_selected": tool_summary,
},
output_data={
"status": result.get("status", "unknown"),
"iterations": result.get("iterations", 0),
"num_tool_executions": len(result.get("tool_executions", [])),
},
metadata={
"step": self.step,
"instruction": instruction,
}
)