from __future__ import annotations import copy import json from typing import TYPE_CHECKING, Any, Dict, List, Optional from openspace.agents.base import BaseAgent from openspace.grounding.core.types import BackendType, ToolResult from openspace.platforms.screenshot import ScreenshotClient from openspace.prompts import GroundingAgentPrompts from openspace.utils.logging import Logger if TYPE_CHECKING: from openspace.llm import LLMClient from openspace.grounding.core.grounding_client import GroundingClient from openspace.recording import RecordingManager from openspace.skill_engine import SkillRegistry logger = Logger.get_logger(__name__) class GroundingAgent(BaseAgent): def __init__( self, name: str = "GroundingAgent", backend_scope: Optional[List[str]] = None, llm_client: Optional[LLMClient] = None, grounding_client: Optional[GroundingClient] = None, recording_manager: Optional[RecordingManager] = None, system_prompt: Optional[str] = None, max_iterations: int = 15, visual_analysis_timeout: float = 30.0, tool_retrieval_llm: Optional[LLMClient] = None, visual_analysis_model: Optional[str] = None, ) -> None: """ Initialize the Grounding Agent. Args: name: Agent name backend_scope: List of backends this agent can access (None = all available) llm_client: LLM client for reasoning grounding_client: GroundingClient for tool execution recording_manager: RecordingManager for recording execution system_prompt: Custom system prompt max_iterations: Maximum LLM reasoning iterations for self-correction visual_analysis_timeout: Timeout for visual analysis LLM calls in seconds tool_retrieval_llm: LLM client for tool retrieval filter (None = use llm_client) visual_analysis_model: Model name for visual analysis (None = use llm_client.model) """ super().__init__( name=name, backend_scope=backend_scope or ["gui", "shell", "mcp", "web", "system"], llm_client=llm_client, grounding_client=grounding_client, recording_manager=recording_manager ) self._system_prompt = system_prompt or self._default_system_prompt() self._max_iterations = max_iterations self._visual_analysis_timeout = visual_analysis_timeout self._tool_retrieval_llm = tool_retrieval_llm self._visual_analysis_model = visual_analysis_model # Skill context injection (set externally before process()) self._skill_context: Optional[str] = None self._active_skill_ids: List[str] = [] # Skill registry for mid-iteration retrieve_skill tool self._skill_registry: Optional["SkillRegistry"] = None # Tools from the last execution (available for post-execution analysis) self._last_tools: List = [] logger.info(f"Grounding Agent initialized: {name}") logger.info(f"Backend scope: {self._backend_scope}") logger.info(f"Max iterations: {self._max_iterations}") logger.info(f"Visual analysis timeout: {self._visual_analysis_timeout}s") if tool_retrieval_llm: logger.info(f"Tool retrieval model: {tool_retrieval_llm.model}") if visual_analysis_model: logger.info(f"Visual analysis model: {visual_analysis_model}") def set_skill_context( self, context: str, skill_ids: Optional[List[str]] = None, ) -> None: """Inject skill guidance into the agent's system prompt. Called by ``OpenSpace.execute()`` before ``process()`` when skills are matched. The context is a formatted string built by ``SkillRegistry.build_context_injection()``. Args: context: Formatted skill content for system prompt injection. skill_ids: skill_id values of injected skills. """ self._skill_context = context if context else None self._active_skill_ids = skill_ids or [] if self._skill_context: logger.info(f"Skill context set: {', '.join(self._active_skill_ids) or '(unnamed)'}") def clear_skill_context(self) -> None: """Remove skill guidance (used before fallback execution).""" if self._skill_context: logger.info(f"Skill context cleared (was: {', '.join(self._active_skill_ids)})") self._skill_context = None self._active_skill_ids = [] @property def has_skill_context(self) -> bool: return self._skill_context is not None def set_skill_registry(self, registry: Optional["SkillRegistry"]) -> None: """Attach a SkillRegistry so the agent can offer ``retrieve_skill`` as a tool.""" self._skill_registry = registry if registry: count = len(registry.list_skills()) logger.info(f"Skill registry attached ({count} skill(s) available for mid-iteration retrieval)") _MAX_SINGLE_CONTENT_CHARS = 30_000 @classmethod def _cap_message_content(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Truncate oversized individual message contents in-place. Targets tool-result messages and assistant messages that can carry enormous file contents (read_file on large CSVs/scripts). System messages and the first user instruction are never touched. """ cap = cls._MAX_SINGLE_CONTENT_CHARS trimmed = 0 for msg in messages: content = msg.get("content") if not isinstance(content, str) or len(content) <= cap: continue if msg.get("role") == "system": continue original_len = len(content) msg["content"] = ( content[: cap // 2] + f"\n\n... [truncated {original_len - cap:,} chars] ...\n\n" + content[-(cap // 2):] ) trimmed += 1 if trimmed: logger.info(f"Capped {trimmed} oversized message(s) to {cap:,} chars each") return messages def _truncate_messages( self, messages: List[Dict[str, Any]], keep_recent: int = 8, max_tokens_estimate: int = 120000 ) -> List[Dict[str, Any]]: # First: cap any single oversized message to prevent one huge # tool-result from dominating the context window. messages = self._cap_message_content(messages) if len(messages) <= keep_recent + 2: # +2 for system and initial user return messages total_text = json.dumps(messages, ensure_ascii=False) estimated_tokens = len(total_text) // 4 if estimated_tokens < max_tokens_estimate: return messages logger.info(f"Truncating message history: {len(messages)} messages, " f"~{estimated_tokens:,} tokens -> keeping recent {keep_recent} rounds") system_messages = [] user_instruction = None conversation_messages = [] for msg in messages: role = msg.get("role") if role == "system": system_messages.append(msg) elif role == "user" and user_instruction is None: user_instruction = msg else: conversation_messages.append(msg) recent_messages = conversation_messages[-(keep_recent * 2):] if conversation_messages else [] truncated = system_messages.copy() if user_instruction: truncated.append(user_instruction) truncated.extend(recent_messages) logger.info(f"After truncation: {len(truncated)} messages, " f"~{len(json.dumps(truncated, ensure_ascii=False))//4:,} tokens (estimated)") return truncated async def process(self, context: Dict[str, Any]) -> Dict[str, Any]: """ Process a task execution request with multi-round iteration control. """ instruction = context.get("instruction", "") if not instruction: logger.error("Grounding Agent: No instruction provided") return {"error": "No instruction provided", "status": "error"} # Store current instruction for visual analysis context self._current_instruction = instruction logger.info(f"Grounding Agent: Processing instruction at step {self.step}") # Exist workspace files check workspace_info = await self._check_workspace_artifacts(context) if workspace_info["has_files"]: context["workspace_artifacts"] = workspace_info logger.info(f"Workspace has {len(workspace_info['files'])} existing files: {workspace_info['files']}") # Get available tools (auto-search with cap) tools = await self._get_available_tools(instruction) self._last_tools = tools # expose for post-execution analysis # Get search debug info (similarity scores, LLM selections) search_debug_info = None if self.grounding_client: search_debug_info = self.grounding_client.get_last_search_debug_info() # Build retrieved tools list for return value retrieved_tools_list = [] for tool in tools: tool_info = { "name": getattr(tool, "name", str(tool)), "description": getattr(tool, "description", ""), } # Prefer runtime_info.backend # over backend_type (may be NOT_SET for cached RemoteTools) runtime_info = getattr(tool, "_runtime_info", None) if runtime_info and hasattr(runtime_info, "backend"): tool_info["backend"] = runtime_info.backend.value if hasattr(runtime_info.backend, "value") else str(runtime_info.backend) tool_info["server_name"] = runtime_info.server_name elif hasattr(tool, "backend_type"): tool_info["backend"] = tool.backend_type.value if hasattr(tool.backend_type, "value") else str(tool.backend_type) # Add similarity score if available if search_debug_info and search_debug_info.get("tool_scores"): for score_info in search_debug_info["tool_scores"]: if score_info["name"] == tool_info["name"]: tool_info["similarity_score"] = score_info["score"] break retrieved_tools_list.append(tool_info) # Record retrieved tools if self._recording_manager: from openspace.recording import RecordingManager await RecordingManager.record_retrieved_tools( task_instruction=instruction, tools=tools, search_debug_info=search_debug_info, ) # Initialize iteration state max_iterations = context.get("max_iterations", self._max_iterations) current_iteration = 0 all_tool_results = [] iteration_contexts = [] consecutive_empty_responses = 0 # Track consecutive empty LLM responses MAX_CONSECUTIVE_EMPTY = 5 # Exit after this many empty responses # Build initial messages messages = self.construct_messages(context) # Record initial conversation setup once (system prompts + user instruction + tool definitions) from openspace.recording import RecordingManager await RecordingManager.record_conversation_setup( setup_messages=copy.deepcopy(messages), tools=tools, ) try: while current_iteration < max_iterations: current_iteration += 1 logger.info(f"Grounding Agent: Iteration {current_iteration}/{max_iterations}") # Strip skill context after the first iteration to save prompt tokens. # Skills only need to guide the first LLM call; subsequent iterations # already have the plan and tool results in context. if current_iteration == 2 and self._skill_context: skill_ctx = self._skill_context messages = [ m for m in messages if not (m.get("role") == "system" and m.get("content") == skill_ctx) ] logger.info("Skill context removed from messages after first iteration") # Cap oversized individual messages every iteration to prevent # a single huge tool result from ballooning all subsequent calls. if current_iteration >= 2: messages = self._cap_message_content(messages) # Truncate message history to prevent context length issues # Start truncating after 5 iterations to keep context manageable if current_iteration >= 5: messages = self._truncate_messages( messages, keep_recent=8, max_tokens_estimate=120000 ) messages_input_snapshot = copy.deepcopy(messages) # [DISABLED] Iteration summary generation # Tool results (including visual analysis) are already in context, # LLM can make decisions directly without separate summary. # To re-enable, uncomment below and pass iteration_summary_prompt to complete() # iteration_summary_prompt = GroundingAgentPrompts.iteration_summary( # instruction=instruction, # iteration=current_iteration, # max_iterations=max_iterations # ) if context.get("auto_execute", True) else None # Call LLMClient for single round # LLM will decide whether to call tools or finish with llm_response = await self._llm_client.complete( messages=messages, tools=tools if context.get("auto_execute", True) else None, execute_tools=context.get("auto_execute", True), summary_prompt=None, # Disabled tool_result_callback=self._visual_analysis_callback ) # Update messages with LLM response messages = llm_response["messages"] # Collect tool results tool_results_this_iteration = llm_response.get("tool_results", []) if tool_results_this_iteration: all_tool_results.extend(tool_results_this_iteration) # [DISABLED] Iteration summary logging # llm_summary = llm_response.get("iteration_summary") # if llm_summary: # logger.info(f"Iteration {current_iteration} summary: {llm_summary[:150]}...") assistant_message = llm_response.get("message", {}) assistant_content = assistant_message.get("content", "") has_tool_calls = llm_response.get('has_tool_calls', False) logger.info(f"Iteration {current_iteration} - Has tool calls: {has_tool_calls}, " f"Tool results: {len(tool_results_this_iteration)}, " f"Content length: {len(assistant_content)} chars") if len(assistant_content) > 0: logger.info(f"Iteration {current_iteration} - Assistant content preview: {repr(assistant_content[:300])}") consecutive_empty_responses = 0 # Reset counter on valid response else: if not has_tool_calls: consecutive_empty_responses += 1 logger.warning(f"Iteration {current_iteration} - NO tool calls and NO content " f"(empty response {consecutive_empty_responses}/{MAX_CONSECUTIVE_EMPTY})") if consecutive_empty_responses >= MAX_CONSECUTIVE_EMPTY: logger.error(f"Exiting due to {MAX_CONSECUTIVE_EMPTY} consecutive empty LLM responses. " "This may indicate API issues, rate limiting, or context too long.") break else: consecutive_empty_responses = 0 # Reset if we have tool calls # Snapshot messages after LLM call (accumulated context) messages_output_snapshot = copy.deepcopy(messages) # Delta messages: only the messages produced in this iteration # (avoids repeating system prompts / initial user instruction each time) delta_messages = messages[len(messages_input_snapshot):] # Response metadata (lightweight; full content lives in delta_messages) response_metadata = { "has_tool_calls": has_tool_calls, "tool_calls_count": len(tool_results_this_iteration), } iteration_context = { "iteration": current_iteration, "messages_input": messages_input_snapshot, "messages_output": messages_output_snapshot, "response_metadata": response_metadata, } iteration_contexts.append(iteration_context) # Real-time save to conversations.jsonl (delta only, no redundancy) await RecordingManager.record_iteration_context( iteration=current_iteration, delta_messages=copy.deepcopy(delta_messages), response_metadata=response_metadata, ) # Check for completion token in assistant content # [DISABLED] Also check in iteration summary when enabled # is_complete = ( # GroundingAgentPrompts.TASK_COMPLETE in assistant_content or # (llm_summary and GroundingAgentPrompts.TASK_COMPLETE in llm_summary) # ) is_complete = GroundingAgentPrompts.TASK_COMPLETE in assistant_content if is_complete: # Task is complete - LLM generated completion token logger.info(f"Task completed at iteration {current_iteration} (found {GroundingAgentPrompts.TASK_COMPLETE})") break else: # LLM didn't generate , continue to next iteration if tool_results_this_iteration: logger.debug(f"Task in progress, LLM called {len(tool_results_this_iteration)} tools") else: logger.debug(f"Task in progress, LLM did not generate ") # Remove previous iteration guidance to avoid accumulation messages = [ msg for msg in messages if not (msg.get("role") == "system" and "Iteration" in msg.get("content", "") and "complete" in msg.get("content", "")) ] guidance_msg = { "role": "system", "content": f"Iteration {current_iteration} complete. " f"Check if task is finished - if yes, output {GroundingAgentPrompts.TASK_COMPLETE}. " f"If not, continue with next action." } messages.append(guidance_msg) # [DISABLED] Full iteration feedback with summary # self._remove_previous_guidance(messages) # feedback_msg = self._build_iteration_feedback( # iteration=current_iteration, # llm_summary=llm_summary, # add_guidance=True # ) # if feedback_msg: # messages.append(feedback_msg) # logger.debug(f"Added iteration {current_iteration} feedback with guidance") continue # Build final result result = await self._build_final_result( instruction=instruction, messages=messages, all_tool_results=all_tool_results, iterations=current_iteration, max_iterations=max_iterations, iteration_contexts=iteration_contexts, retrieved_tools_list=retrieved_tools_list, search_debug_info=search_debug_info, ) # Record agent action to recording manager if self._recording_manager: await self._record_agent_execution(result, instruction) # Increment step self.increment_step() logger.info(f"Grounding Agent: Execution completed with status: {result.get('status')}") return result except Exception as e: logger.error(f"Grounding Agent: Execution failed: {e}") result = { "error": str(e), "status": "error", "instruction": instruction, "iteration": current_iteration } self.increment_step() return result def _default_system_prompt(self) -> str: """Default system prompt tailored to the agent's actual backend scope.""" return GroundingAgentPrompts.build_system_prompt(self._backend_scope) def construct_messages( self, context: Dict[str, Any] ) -> List[Dict[str, Any]]: messages = [{"role": "system", "content": self._system_prompt}] # Get instruction from context instruction = context.get("instruction", "") if not instruction: raise ValueError("context must contain 'instruction' field") # Add workspace directory workspace_dir = context.get("workspace_dir") if workspace_dir: messages.append({ "role": "system", "content": GroundingAgentPrompts.workspace_directory(workspace_dir) }) # Add workspace artifacts information workspace_artifacts = context.get("workspace_artifacts") if workspace_artifacts and workspace_artifacts.get("has_files"): files = workspace_artifacts.get("files", []) matching_files = workspace_artifacts.get("matching_files", []) recent_files = workspace_artifacts.get("recent_files", []) if matching_files: artifact_msg = GroundingAgentPrompts.workspace_matching_files(matching_files) elif len(recent_files) >= 2: artifact_msg = GroundingAgentPrompts.workspace_recent_files( total_files=len(files), recent_files=recent_files ) else: artifact_msg = GroundingAgentPrompts.workspace_file_list(files) messages.append({ "role": "system", "content": artifact_msg }) # Skill injection — only active (selected) skills, full content if self._skill_context: messages.append({ "role": "system", "content": self._skill_context }) logger.info(f"Injected active skill context ({len(self._active_skill_ids)} skill(s))") # User instruction messages.append({"role": "user", "content": instruction}) return messages async def _get_available_tools(self, task_description: Optional[str]) -> List: """ Retrieve tools for the current execution phase. Both skill-augmented and normal modes use the same ``get_tools_with_auto_search`` pipeline: - Non-MCP tools (shell, gui, web, system) are always included. - MCP tools are filtered by relevance only when their count exceeds ``max_tools``. When skills are active, the shell backend is guaranteed to be in scope (skills commonly reference ``shell_agent``). Falls back to returning all tools if anything fails. """ grounding_client = self.grounding_client if not grounding_client: return [] backends = [BackendType(name) for name in self._backend_scope] # Ensure shell backend is available when skills are active # (skills commonly reference shell_agent, read_file, etc.) if self.has_skill_context: shell_bt = BackendType.SHELL if shell_bt not in backends: backends = list(backends) + [shell_bt] logger.info("Added Shell backend to scope for skill file I/O") try: retrieval_llm = self._tool_retrieval_llm or self._llm_client tools = await grounding_client.get_tools_with_auto_search( task_description=task_description, backend=backends, use_cache=True, llm_callable=retrieval_llm, ) logger.info( f"GroundingAgent selected {len(tools)} tools (auto-search) " f"from {len(backends)} backends" + (f" [skill-augmented]" if self.has_skill_context else "") ) except Exception as e: logger.warning(f"Auto-search tools failed, falling back to full list: {e}") tools = await self._load_all_tools(grounding_client) # Append retrieve_skill tool when skill registry is available if self._skill_registry and self._skill_registry.list_skills(): from openspace.skill_engine.retrieve_tool import RetrieveSkillTool retrieve_llm = self._tool_retrieval_llm or self._llm_client retrieve_tool = RetrieveSkillTool( self._skill_registry, backends=[b.value for b in backends], llm_client=retrieve_llm, skill_store=getattr(self, "_skill_store", None), ) retrieve_tool.bind_runtime_info( backend=BackendType.SYSTEM, session_name="internal", ) tools.append(retrieve_tool) logger.info("Added retrieve_skill tool for mid-iteration skill retrieval") return tools async def _load_all_tools(self, grounding_client: "GroundingClient") -> List: """Fallback: load all tools from all backends without search.""" all_tools = [] for backend_name in self._backend_scope: try: backend_type = BackendType(backend_name) tools = await grounding_client.list_tools(backend=backend_type) all_tools.extend(tools) logger.debug(f"Retrieved {len(tools)} tools from backend: {backend_name}") except Exception as e: logger.debug(f"Could not get tools from {backend_name}: {e}") logger.info( f"GroundingAgent fallback retrieved {len(all_tools)} tools " f"from {len(self._backend_scope)} backends" ) return all_tools async def _visual_analysis_callback( self, result: ToolResult, tool_name: str, tool_call: Dict, backend: str ) -> ToolResult: """ Callback for LLMClient to handle visual analysis after tool execution. """ # 1. Check if LLM requested to skip visual analysis skip_visual_analysis = False try: arguments = tool_call.function.arguments if isinstance(arguments, str): args = json.loads(arguments.strip() or "{}") else: args = arguments if isinstance(args, dict) and args.get("skip_visual_analysis"): skip_visual_analysis = True logger.info(f"Visual analysis skipped for {tool_name} (meta-parameter set by LLM)") except Exception as e: logger.debug(f"Could not parse tool arguments: {e}") # 2. If skip requested, return original result if skip_visual_analysis: return result # 3. Check if this backend needs visual analysis if backend != "gui": return result # 4. Check if tool has visual data metadata = getattr(result, 'metadata', None) has_screenshots = metadata and (metadata.get("screenshot") or metadata.get("screenshots")) # 5. If no visual data, try to capture a screenshot if not has_screenshots: try: logger.info(f"No visual data from {tool_name}, capturing screenshot...") screenshot_client = ScreenshotClient() screenshot_bytes = await screenshot_client.capture() if screenshot_bytes: # Add screenshot to result metadata if metadata is None: result.metadata = {} metadata = result.metadata metadata["screenshot"] = screenshot_bytes has_screenshots = True logger.info(f"Screenshot captured for visual analysis") else: logger.warning("Failed to capture screenshot") except Exception as e: logger.warning(f"Error capturing screenshot: {e}") # 6. If still no screenshots, return original result if not has_screenshots: logger.debug(f"No visual data available for {tool_name}") return result # 7. Perform visual analysis return await self._enhance_result_with_visual_context(result, tool_name) async def _enhance_result_with_visual_context( self, result: ToolResult, tool_name: str ) -> ToolResult: """ Enhance tool result with visual analysis for grounding agent workflows. """ import asyncio import base64 import litellm try: metadata = getattr(result, 'metadata', None) if not metadata: return result # Collect all screenshots screenshots_bytes = [] # Check for multiple screenshots first if metadata.get("screenshots"): screenshots_list = metadata["screenshots"] if isinstance(screenshots_list, list): screenshots_bytes = [s for s in screenshots_list if s] # Fall back to single screenshot elif metadata.get("screenshot"): screenshots_bytes = [metadata["screenshot"]] if not screenshots_bytes: return result # Select key screenshots if there are too many selected_screenshots = self._select_key_screenshots(screenshots_bytes, max_count=3) # Convert to base64 visual_b64_list = [] for visual_data in selected_screenshots: if isinstance(visual_data, bytes): visual_b64_list.append(base64.b64encode(visual_data).decode('utf-8')) else: visual_b64_list.append(visual_data) # Already base64 # Build prompt based on number of screenshots num_screenshots = len(visual_b64_list) prompt = GroundingAgentPrompts.visual_analysis( tool_name=tool_name, num_screenshots=num_screenshots, task_description=getattr(self, '_current_instruction', '') ) # Build content with text prompt + all images content = [{"type": "text", "text": prompt}] for visual_b64 in visual_b64_list: content.append({ "type": "image_url", "image_url": { "url": f"data:image/png;base64,{visual_b64}" } }) # Use dedicated visual analysis model if configured, otherwise use main LLM model visual_model = self._visual_analysis_model or (self._llm_client.model if self._llm_client else "openrouter/anthropic/claude-sonnet-4.5") response = await asyncio.wait_for( litellm.acompletion( model=visual_model, messages=[{ "role": "user", "content": content }], timeout=self._visual_analysis_timeout ), timeout=self._visual_analysis_timeout + 5 ) analysis = response.choices[0].message.content.strip() # Inject visual analysis into content original_content = result.content or "(no text output)" enhanced_content = f"{original_content}\n\n**Visual content**: {analysis}" # Create enhanced result enhanced_result = ToolResult( status=result.status, content=enhanced_content, error=result.error, metadata={**metadata, "visual_analyzed": True, "visual_analysis": analysis}, execution_time=result.execution_time ) logger.info(f"Enhanced {tool_name} result with visual analysis ({num_screenshots} screenshot(s))") return enhanced_result except asyncio.TimeoutError: logger.warning(f"Visual analysis timed out for {tool_name}, returning original result") return result except Exception as e: logger.warning(f"Failed to analyze visual content for {tool_name}: {e}") return result def _select_key_screenshots( self, screenshots: List[bytes], max_count: int = 3 ) -> List[bytes]: """ Select key screenshots if there are too many. """ if len(screenshots) <= max_count: return screenshots selected_indices = set() # Always include last (final state) selected_indices.add(len(screenshots) - 1) # If room, include first (initial state) if max_count >= 2: selected_indices.add(0) # Fill remaining slots with evenly spaced middle screenshots remaining_slots = max_count - len(selected_indices) if remaining_slots > 0: # Calculate spacing available_indices = [ i for i in range(1, len(screenshots) - 1) if i not in selected_indices ] if available_indices: step = max(1, len(available_indices) // (remaining_slots + 1)) for i in range(remaining_slots): idx = min((i + 1) * step, len(available_indices) - 1) if idx < len(available_indices): selected_indices.add(available_indices[idx]) # Return screenshots in original order selected = [screenshots[i] for i in sorted(selected_indices)] logger.debug( f"Selected {len(selected)} screenshots at indices {sorted(selected_indices)} " f"from total of {len(screenshots)}" ) return selected def _get_workspace_path(self, context: Dict[str, Any]) -> Optional[str]: """ Get workspace directory path from context. """ return context.get("workspace_dir") def _scan_workspace_files( self, workspace_path: str, recent_threshold: int = 600 # seconds ) -> Dict[str, Any]: """ Scan workspace directory and collect file information. Args: workspace_path: Path to workspace directory recent_threshold: Threshold in seconds for recent files Returns: Dictionary with file information: - files: List of all filenames - file_details: Dict mapping filename to file info (size, modified, age_seconds) - recent_files: List of recently modified filenames """ import os import time result = { "files": [], "file_details": {}, "recent_files": [] } if not workspace_path or not os.path.exists(workspace_path): return result # Recording system files to exclude from workspace scanning excluded_files = {"metadata.json", "traj.jsonl"} try: current_time = time.time() for filename in os.listdir(workspace_path): filepath = os.path.join(workspace_path, filename) if os.path.isfile(filepath) and filename not in excluded_files: result["files"].append(filename) # Get file stats stat = os.stat(filepath) file_info = { "size": stat.st_size, "modified": stat.st_mtime, "age_seconds": current_time - stat.st_mtime } result["file_details"][filename] = file_info # Track recently created/modified files if file_info["age_seconds"] < recent_threshold: result["recent_files"].append(filename) result["files"] = sorted(result["files"]) except Exception as e: logger.debug(f"Error scanning workspace files: {e}") return result async def _check_workspace_artifacts(self, context: Dict[str, Any]) -> Dict[str, Any]: """ Check workspace directory for existing artifacts that might be relevant to the task. Enhanced to detect if task might already be completed. """ import re workspace_info = {"has_files": False, "files": [], "file_details": {}, "recent_files": []} try: # Get workspace path workspace_path = self._get_workspace_path(context) # Scan workspace files scan_result = self._scan_workspace_files(workspace_path, recent_threshold=600) if scan_result["files"]: workspace_info["has_files"] = True workspace_info["files"] = scan_result["files"] workspace_info["file_details"] = scan_result["file_details"] workspace_info["recent_files"] = scan_result["recent_files"] logger.info(f"Grounding Agent: Found {len(scan_result['files'])} existing files in workspace " f"({len(scan_result['recent_files'])} recent)") # Check if instruction mentions specific filenames instruction = context.get("instruction", "") if instruction: # Look for potential file references in instruction potential_outputs = [] # Match common file patterns: filename.ext, "filename", 'filename' file_patterns = re.findall(r'["\']?([a-zA-Z0-9_\-]+\.[a-zA-Z0-9]+)["\']?', instruction) for pattern in file_patterns: if pattern in scan_result["files"]: potential_outputs.append(pattern) if potential_outputs: workspace_info["matching_files"] = potential_outputs logger.info(f"Grounding Agent: Found {len(potential_outputs)} files matching task: {potential_outputs}") except Exception as e: logger.debug(f"Could not check workspace artifacts: {e}") return workspace_info def _build_iteration_feedback( self, iteration: int, llm_summary: Optional[str] = None, add_guidance: bool = True ) -> Optional[Dict[str, str]]: """ Build feedback message to add to next iteration. """ if not llm_summary: return None feedback_content = GroundingAgentPrompts.iteration_feedback( iteration=iteration, llm_summary=llm_summary, add_guidance=add_guidance ) return { "role": "system", "content": feedback_content } def _remove_previous_guidance(self, messages: List[Dict[str, Any]]) -> None: """ Remove guidance section from previous iteration feedback messages. """ for msg in messages: if msg.get("role") == "system": content = msg.get("content", "") # Check if this is an iteration feedback message with guidance if "## Iteration" in content and "Summary" in content and "---" in content: # Remove everything from "---" onwards (the guidance part) summary_only = content.split("---")[0].strip() msg["content"] = summary_only async def _generate_final_summary( self, instruction: str, messages: List[Dict], iterations: int ) -> tuple[str, bool, List[Dict]]: """ Generate final summary across all iterations for reporting to upper layer. Returns: tuple[str, bool, List[Dict]]: (summary_text, success_flag, context_used) - summary_text: The generated summary or error message - success_flag: True if summary was generated successfully, False otherwise - context_used: The cleaned messages used for generating summary """ final_summary_prompt = { "role": "user", "content": GroundingAgentPrompts.final_summary( instruction=instruction, iterations=iterations ) } clean_messages = [] for msg in messages: # Skip tool result messages if msg.get("role") == "tool": continue # Copy message and remove tool_calls if present clean_msg = msg.copy() if "tool_calls" in clean_msg: del clean_msg["tool_calls"] clean_messages.append(clean_msg) clean_messages.append(final_summary_prompt) # Save context for return context_for_return = copy.deepcopy(clean_messages) try: # Call LLMClient to generate final summary (without tools) summary_response = await self._llm_client.complete( messages=clean_messages, tools=None, execute_tools=False ) final_summary = summary_response.get("message", {}).get("content", "") if final_summary: logger.info(f"Generated final summary: {final_summary[:200]}...") return final_summary, True, context_for_return else: logger.warning("LLM returned empty final summary") return f"Task completed after {iterations} iteration(s). Check execution history for details.", True, context_for_return except Exception as e: logger.error(f"Error generating final summary: {e}") return f"Task completed after {iterations} iteration(s), but failed to generate summary: {str(e)}", False, context_for_return async def _build_final_result( self, instruction: str, messages: List[Dict], all_tool_results: List[Dict], iterations: int, max_iterations: int, iteration_contexts: List[Dict] = None, retrieved_tools_list: List[Dict] = None, search_debug_info: Dict[str, Any] = None, ) -> Dict[str, Any]: """ Build final execution result. Args: instruction: Original instruction messages: Complete conversation history (including all iteration summaries) all_tool_results: All tool execution results iterations: Number of iterations performed max_iterations: Maximum allowed iterations iteration_contexts: Context snapshots for each iteration retrieved_tools_list: List of tools retrieved for this task search_debug_info: Debug info from tool search (similarity scores, LLM selections) """ is_complete = self._check_task_completion(messages) tool_executions = self._format_tool_executions(all_tool_results) result = { "instruction": instruction, "step": self.step, "iterations": iterations, "tool_executions": tool_executions, "messages": messages, "iteration_contexts": iteration_contexts or [], "retrieved_tools_list": retrieved_tools_list or [], "search_debug_info": search_debug_info, "active_skills": list(self._active_skill_ids), "keep_session": True } if is_complete: logger.info("Task completed with marker") # Use LLM's own completion response directly (no extra LLM call needed) # LLM already generates a summary before outputting last_response = self._extract_last_assistant_message(messages) # Remove the token from response for cleaner output result["response"] = last_response.replace(GroundingAgentPrompts.TASK_COMPLETE, "").strip() result["status"] = "success" # [DISABLED] Extra LLM call to generate final summary # final_summary, summary_success, final_summary_context = await self._generate_final_summary( # instruction=instruction, # messages=messages, # iterations=iterations # ) # result["response"] = final_summary # result["final_summary_context"] = final_summary_context else: result["response"] = self._extract_last_assistant_message(messages) result["status"] = "incomplete" result["warning"] = ( f"Task reached max iterations ({max_iterations}) without completion. " f"This may indicate the task needs more steps or clarification." ) return result def _format_tool_executions(self, all_tool_results: List[Dict]) -> List[Dict]: executions = [] for tr in all_tool_results: tool_result_obj = tr.get("result") tool_call = tr.get("tool_call") status = "unknown" if hasattr(tool_result_obj, 'status'): status_obj = tool_result_obj.status status = getattr(status_obj, 'value', status_obj) # Extract tool_name and arguments from tool_call object (litellm format) tool_name = "unknown" arguments = {} if tool_call is not None: if hasattr(tool_call, 'function'): # tool_call is an object with .function attribute tool_name = getattr(tool_call.function, 'name', 'unknown') args_raw = getattr(tool_call.function, 'arguments', '{}') if isinstance(args_raw, str): try: arguments = json.loads(args_raw) if args_raw.strip() else {} except json.JSONDecodeError: arguments = {} else: arguments = args_raw if isinstance(args_raw, dict) else {} elif isinstance(tool_call, dict): # Fallback: tool_call is a dict func = tool_call.get("function", {}) tool_name = func.get("name", "unknown") args_raw = func.get("arguments", "{}") if isinstance(args_raw, str): try: arguments = json.loads(args_raw) if args_raw.strip() else {} except json.JSONDecodeError: arguments = {} else: arguments = args_raw if isinstance(args_raw, dict) else {} executions.append({ "tool_name": tool_name, "arguments": arguments, "backend": tr.get("backend"), "server_name": tr.get("server_name"), "status": status, "content": tool_result_obj.content if hasattr(tool_result_obj, 'content') else None, "error": tool_result_obj.error if hasattr(tool_result_obj, 'error') else None, "execution_time": tool_result_obj.execution_time if hasattr(tool_result_obj, 'execution_time') else None, "metadata": tool_result_obj.metadata if hasattr(tool_result_obj, 'metadata') else {}, }) return executions def _check_task_completion(self, messages: List[Dict]) -> bool: for msg in reversed(messages): if msg.get("role") == "assistant": content = msg.get("content", "") return GroundingAgentPrompts.TASK_COMPLETE in content return False def _extract_last_assistant_message(self, messages: List[Dict]) -> str: for msg in reversed(messages): if msg.get("role") == "assistant": return msg.get("content", "") return "" async def _record_agent_execution( self, result: Dict[str, Any], instruction: str ) -> None: """ Record agent execution to recording manager. Args: result: Execution result instruction: Original instruction """ if not self._recording_manager: return # Extract tool execution summary tool_summary = [] if result.get("tool_executions"): for exec_info in result["tool_executions"]: tool_summary.append({ "tool": exec_info.get("tool_name", "unknown"), "backend": exec_info.get("backend", "unknown"), "status": exec_info.get("status", "unknown"), }) await self._recording_manager.record_agent_action( agent_name=self.name, action_type="execute", input_data={"instruction": instruction}, reasoning={ "response": result.get("response", ""), "tools_selected": tool_summary, }, output_data={ "status": result.get("status", "unknown"), "iterations": result.get("iterations", 0), "num_tool_executions": len(result.get("tool_executions", [])), }, metadata={ "step": self.step, "instruction": instruction, } )