Spaces:
Running
Running
| import base64 | |
| from typing import Any, Dict | |
| from openspace.grounding.core.tool.base import BaseTool | |
| from openspace.grounding.core.types import BackendType, ToolResult, ToolStatus | |
| from .transport.connector import GUIConnector | |
| from .transport.actions import ACTION_SPACE, KEYBOARD_KEYS | |
| from openspace.utils.logging import Logger | |
| logger = Logger.get_logger(__name__) | |
class GUIAgentTool(BaseTool):
    """
    LLM-powered GUI Agent Tool.

    This tool acts as an intelligent agent that:
    - Takes a task description as input
    - Observes the desktop via screenshot
    - Uses LLM/VLM to understand and plan actions
    - Outputs action space commands
    - Executes actions through the connector
    """

    _name = "gui_agent"
    # NOTE: keep the stated default below in sync with _arun's max_steps
    # default (currently 50); the previous text incorrectly said 25.
    _description = """Vision-based GUI automation agent for tasks requiring graphical interface interaction.
Use this tool when the task involves:
- Operating desktop applications with graphical interfaces (browsers, editors, design tools, etc.)
- Tasks that require visual understanding of UI elements, layouts, or content
- Multi-step workflows that need click, drag, type, or other GUI interactions
- Scenarios where programmatic APIs or command-line tools are unavailable or insufficient
The agent observes screen state through screenshots, uses vision-language models to understand
the interface, plans appropriate actions, and executes GUI operations autonomously.
IMPORTANT - max_steps Parameter Guidelines:
- Simple tasks (1-2 actions): 15-20 steps
- Medium tasks (3-5 actions): 25-35 steps
- Complex tasks (6+ actions, like web navigation): 35-50 steps
- When uncertain, prefer larger values (35+) to avoid premature termination
- Default is 50, which already suits most multi-step workflows
Input:
- task_description: Natural language task description
- max_steps: Maximum actions (default 50)
Output: Task execution results with action history and completion status
"""
    backend_type = BackendType.GUI
| def __init__(self, connector: GUIConnector, llm_client=None, recording_manager=None, **kwargs): | |
| """ | |
| Initialize GUI Agent Tool. | |
| Args: | |
| connector: GUI connector for communication with desktop_env | |
| llm_client: LLM/VLM client for vision-based planning (optional) | |
| recording_manager: RecordingManager for recording intermediate steps (optional) | |
| **kwargs: Additional arguments for BaseTool | |
| """ | |
| super().__init__(**kwargs) | |
| self.connector = connector | |
| self.llm_client = llm_client # Will be injected later | |
| self.recording_manager = recording_manager # For recording intermediate steps | |
| self.action_history = [] # Track executed actions | |
    async def _arun(
        self,
        task_description: str,
        max_steps: int = 50,
    ) -> ToolResult:
        """
        Execute a GUI automation task using LLM planning.

        This is the main entry point that:
        1. Gets current screenshot
        2. Uses LLM to plan next action based on task and screenshot
        3. Executes the planned action
        4. Repeats until task is complete or max_steps reached

        Args:
            task_description: Natural language description of the task
            max_steps: Maximum number of actions to execute (default 50).
                Recommended values based on task complexity:
                - Simple (1-2 actions): 15-20
                - Medium (3-5 actions): 25-35
                - Complex (6+ actions, web navigation, multi-app): 35-50
                When in doubt, use higher values to avoid premature termination

        Returns:
            ToolResult with task execution status
        """
        # An empty task cannot be planned against; fail fast.
        if not task_description:
            return ToolResult(
                status=ToolStatus.ERROR,
                error="task_description is required"
            )
        logger.info(f"Starting GUI task: {task_description}")
        # Reset the per-task action log before entering the planning loop.
        self.action_history = []
        # Execute task with LLM planning loop
        try:
            result = await self._execute_task_with_planning(
                task_description=task_description,
                max_steps=max_steps,
            )
            return result
        except Exception as e:
            logger.error(f"Task execution failed: {e}")
            # Surface partial progress so callers can inspect what already ran.
            return ToolResult(
                status=ToolStatus.ERROR,
                error=str(e),
                metadata={
                    "task_description": task_description,
                    "actions_executed": len(self.action_history),
                    "action_history": self.action_history,
                }
            )
    async def _execute_task_with_planning(
        self,
        task_description: str,
        max_steps: int,
    ) -> ToolResult:
        """
        Execute task with LLM-based planning loop.

        Planning loop:
        1. Observe: Get screenshot
        2. Plan: LLM decides next action
        3. Execute: Perform the action
        4. Verify: Check if task is complete
        5. Repeat until done or max_steps

        Args:
            task_description: Task to complete
            max_steps: Maximum planning iterations

        Returns:
            ToolResult with execution details. SUCCESS only when the planner
            emits DONE; ERROR on FAIL, screenshot failure, or step exhaustion.
        """
        # Collect all screenshots for visual analysis
        # NOTE(review): raw screenshot bytes accumulate in memory and are
        # returned in metadata; this can grow large on long runs -- confirm
        # downstream consumers expect that.
        all_screenshots = []
        # Collect intermediate steps
        intermediate_steps = []
        for step in range(max_steps):
            logger.info(f"Planning step {step + 1}/{max_steps}")
            # Step 1: Observe current state
            screenshot = await self.connector.get_screenshot()
            if not screenshot:
                # Without an observation the planner cannot proceed.
                return ToolResult(
                    status=ToolStatus.ERROR,
                    error="Failed to get screenshot for planning",
                    metadata={"step": step, "action_history": self.action_history}
                )
            # Collect screenshot for visual analysis
            all_screenshots.append(screenshot)
            # Step 2: Plan next action using LLM
            planned_action = await self._plan_next_action(
                task_description=task_description,
                screenshot=screenshot,
                action_history=self.action_history,
            )
            # Check if task is complete
            if planned_action["action_type"] == "DONE":
                logger.info("Task marked as complete by LLM")
                reasoning = planned_action.get("reasoning", "Task completed successfully")
                intermediate_steps.append({
                    "step_number": step + 1,
                    "action": "DONE",
                    "reasoning": reasoning,
                    "status": "done",
                })
                return ToolResult(
                    status=ToolStatus.SUCCESS,
                    content=f"Task completed: {task_description}\n\nFinal state: {reasoning}",
                    metadata={
                        "steps_taken": step + 1,
                        "action_history": self.action_history,
                        "screenshots": all_screenshots,
                        "intermediate_steps": intermediate_steps,
                        "final_reasoning": reasoning,
                    }
                )
            # Check if task failed
            if planned_action["action_type"] == "FAIL":
                logger.warning("Task marked as failed by LLM")
                reason = planned_action.get("reason", "Task cannot be completed")
                intermediate_steps.append({
                    "step_number": step + 1,
                    "action": "FAIL",
                    # NOTE(review): FAIL actions from _plan_next_action carry
                    # "reason", not "reasoning", so this is usually "" --
                    # confirm whether "reason" should be recorded here.
                    "reasoning": planned_action.get("reasoning", ""),
                    "status": "failed",
                })
                return ToolResult(
                    status=ToolStatus.ERROR,
                    error=reason,
                    metadata={
                        "steps_taken": step + 1,
                        "action_history": self.action_history,
                        "screenshots": all_screenshots,
                        "intermediate_steps": intermediate_steps,
                    }
                )
            # Check if action is WAIT (screenshot observation, continue to next step)
            if planned_action["action_type"] == "WAIT":
                logger.info("Screenshot observation step, continuing planning loop")
                intermediate_steps.append({
                    "step_number": step + 1,
                    "action": "WAIT",
                    "reasoning": planned_action.get("reasoning", ""),
                    "status": "observation",
                })
                # WAIT consumes a step but executes no action.
                continue
            # Step 3: Execute the planned action
            execution_result = await self._execute_planned_action(planned_action)
            # Record action in history
            self.action_history.append({
                "step": step + 1,
                "planned_action": planned_action,
                "execution_result": execution_result,
            })
            intermediate_steps.append({
                "step_number": step + 1,
                "action": planned_action.get("action_type", "unknown"),
                "reasoning": planned_action.get("reasoning", ""),
                "status": execution_result.get("status", "unknown"),
            })
            # Check execution result
            if execution_result.get("status") != "success":
                logger.warning(f"Action execution failed: {execution_result.get('error')}")
                # Continue to next iteration for retry planning
        # Max steps reached
        return ToolResult(
            status=ToolStatus.ERROR,
            error=f"Task incomplete after {max_steps} steps",
            metadata={
                "task_description": task_description,
                "steps_taken": max_steps,
                "action_history": self.action_history,
                "screenshots": all_screenshots,
                "intermediate_steps": intermediate_steps,
            }
        )
    async def _plan_next_action(
        self,
        task_description: str,
        screenshot: bytes,
        action_history: list,
    ) -> Dict[str, Any]:
        """
        Use LLM/VLM to plan the next action.

        This method sends:
        - Task description
        - Current screenshot (vision input)
        - Action history (context)
        - Available ACTION_SPACE

        And gets back a structured action plan.

        Args:
            task_description: The task to accomplish
            screenshot: Current desktop screenshot (PNG/JPEG bytes)
            action_history: Previously executed actions

        Returns:
            Dict with action_type and parameters. FAIL results carry a
            "reason" key; DONE/WAIT results carry "reasoning".
        """
        if self.llm_client is None:
            # Fallback: Simple heuristic or manual mode
            logger.warning("No LLM client configured, using fallback mode")
            return {
                "action_type": "FAIL",
                "reason": "LLM client not configured"
            }
        # Check if using Anthropic client
        # Detected by type because the Anthropic client's plan_action takes a
        # different argument set than the generic client path below.
        try:
            from .anthropic_client import AnthropicGUIClient
            is_anthropic = isinstance(self.llm_client, AnthropicGUIClient)
        except ImportError:
            is_anthropic = False
        if is_anthropic:
            # Use Anthropic client
            try:
                reasoning, commands = await self.llm_client.plan_action(
                    task_description=task_description,
                    screenshot=screenshot,
                    action_history=action_history,
                )
                # Sentinel single-element command lists map to control actions.
                if commands == ["FAIL"]:
                    return {
                        "action_type": "FAIL",
                        "reason": "Anthropic planning failed"
                    }
                if commands == ["DONE"]:
                    return {
                        "action_type": "DONE",
                        "reasoning": reasoning
                    }
                if commands == ["SCREENSHOT"]:
                    # Screenshot is automatically handled by system
                    # Continue to next planning step
                    logger.info("LLM requested screenshot (observation step)")
                    return {
                        "action_type": "WAIT",
                        "reasoning": reasoning or "Observing screen state"
                    }
                # If no commands but has reasoning, task is complete
                # (Anthropic returns text-only when task is done)
                if not commands and reasoning:
                    logger.info("LLM returned text-only response, interpreting as task completion")
                    return {
                        "action_type": "DONE",
                        "reasoning": reasoning
                    }
                # No commands and no reasoning = error
                if not commands:
                    return {
                        "action_type": "FAIL",
                        "reason": "No commands generated and no completion message"
                    }
                # Return first command (Anthropic returns pyautogui commands directly)
                # NOTE(review): any commands beyond the first are dropped; the
                # loop replans from a fresh screenshot next step -- confirm
                # that is the intended behavior.
                return {
                    "action_type": "PYAUTOGUI_COMMAND",
                    "command": commands[0],
                    "reasoning": reasoning
                }
            except Exception as e:
                logger.error(f"Anthropic planning failed: {e}")
                return {
                    "action_type": "FAIL",
                    "reason": f"Planning error: {str(e)}"
                }
        # Generic LLM client (for future integration with other LLMs)
        # Encode screenshot to base64 for LLM
        screenshot_b64 = base64.b64encode(screenshot).decode('utf-8')
        # Prepare prompt for LLM
        prompt = self._build_planning_prompt(
            task_description=task_description,
            action_history=action_history,
        )
        # Call LLM with vision input
        try:
            response = await self.llm_client.plan_action(
                prompt=prompt,
                image_base64=screenshot_b64,
                action_space=ACTION_SPACE,
                keyboard_keys=KEYBOARD_KEYS,
            )
            # Parse LLM response to action dict
            action = self._parse_llm_response(response)
            logger.info(f"LLM planned action: {action['action_type']}")
            return action
        except Exception as e:
            logger.error(f"LLM planning failed: {e}")
            return {
                "action_type": "FAIL",
                "reason": f"Planning error: {str(e)}"
            }
| def _build_planning_prompt( | |
| self, | |
| task_description: str, | |
| action_history: list, | |
| ) -> str: | |
| """ | |
| Build prompt for LLM planning. | |
| Args: | |
| task_description: The task to accomplish | |
| action_history: Previously executed actions | |
| Returns: | |
| Formatted prompt string | |
| """ | |
| prompt = f"""You are a GUI automation agent. Your task is to complete the following: | |
| Task: {task_description} | |
| You can observe the current desktop state through the provided screenshot. | |
| You must plan the next action to take from the available ACTION_SPACE. | |
| Available actions: | |
| - Mouse: MOVE_TO, CLICK, RIGHT_CLICK, DOUBLE_CLICK, DRAG_TO, SCROLL | |
| - Keyboard: TYPING, PRESS, KEY_DOWN, KEY_UP, HOTKEY | |
| - Control: WAIT, DONE, FAIL | |
| """ | |
| if action_history: | |
| prompt += f"\nPrevious actions taken ({len(action_history)}):\n" | |
| for i, action in enumerate(action_history[-5:], 1): # Last 5 actions | |
| prompt += f"{i}. {action['planned_action']['action_type']}" | |
| if 'parameters' in action['planned_action']: | |
| prompt += f" - {action['planned_action']['parameters']}" | |
| prompt += "\n" | |
| prompt += """ | |
| Based on the screenshot and task, output the next action in JSON format: | |
| { | |
| "action_type": "ACTION_TYPE", | |
| "parameters": {...}, | |
| "reasoning": "Why this action is needed" | |
| } | |
| If the task is complete, output: {"action_type": "DONE"} | |
| If the task cannot be completed, output: {"action_type": "FAIL", "reason": "explanation"} | |
| """ | |
| return prompt | |
| def _parse_llm_response(self, response: str) -> Dict[str, Any]: | |
| """ | |
| Parse LLM response to extract action. | |
| Args: | |
| response: LLM response (should be JSON) | |
| Returns: | |
| Action dict with action_type and parameters | |
| """ | |
| import json | |
| try: | |
| # Try to parse as JSON | |
| action = json.loads(response) | |
| # Validate action | |
| if "action_type" not in action: | |
| raise ValueError("Missing action_type in LLM response") | |
| return action | |
| except json.JSONDecodeError: | |
| logger.error(f"Failed to parse LLM response as JSON: {response[:200]}") | |
| return { | |
| "action_type": "FAIL", | |
| "reason": "Invalid LLM response format" | |
| } | |
    async def _execute_planned_action(
        self,
        action: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute a planned action through the connector.

        Args:
            action: Action dict with action_type and parameters

        Returns:
            Execution result dict with at least a "status" key; connector
            exceptions are captured and returned as error dicts, not raised.
        """
        action_type = action["action_type"]
        # Handle Anthropic's direct pyautogui commands
        if action_type == "PYAUTOGUI_COMMAND":
            command = action.get("command", "")
            logger.info(f"Executing pyautogui command: {command}")
            try:
                result = await self.connector.execute_python_command(command)
                # A falsy connector result is treated as failure.
                return {
                    "status": "success" if result else "error",
                    "action_type": action_type,
                    "command": command,
                    "result": result
                }
            except Exception as e:
                logger.error(f"Command execution error: {e}")
                return {
                    "status": "error",
                    "action_type": action_type,
                    "error": str(e)
                }
        # Handle standard action space commands
        parameters = action.get("parameters", {})
        logger.info(f"Executing action: {action_type}")
        try:
            # The connector's result dict is passed through unchanged.
            result = await self.connector.execute_action(action_type, parameters)
            return result
        except Exception as e:
            logger.error(f"Action execution error: {e}")
            return {
                "status": "error",
                "action_type": action_type,
                "error": str(e)
            }
| # Helper methods for direct action execution | |
| async def execute_action( | |
| self, | |
| action_type: str, | |
| parameters: Dict[str, Any] | |
| ) -> ToolResult: | |
| """ | |
| Direct action execution (bypass LLM planning). | |
| Args: | |
| action_type: Action type from ACTION_SPACE | |
| parameters: Action parameters | |
| Returns: | |
| ToolResult with execution status | |
| """ | |
| result = await self.connector.execute_action(action_type, parameters) | |
| if result.get("status") == "success": | |
| return ToolResult( | |
| status=ToolStatus.SUCCESS, | |
| content=f"Executed {action_type}", | |
| metadata=result | |
| ) | |
| else: | |
| return ToolResult( | |
| status=ToolStatus.ERROR, | |
| error=result.get("error", "Unknown error"), | |
| metadata=result | |
| ) | |
| async def get_screenshot(self) -> ToolResult: | |
| """Get current desktop screenshot.""" | |
| screenshot = await self.connector.get_screenshot() | |
| if screenshot: | |
| return ToolResult( | |
| status=ToolStatus.SUCCESS, | |
| content=screenshot, | |
| metadata={"type": "screenshot", "size": len(screenshot)} | |
| ) | |
| else: | |
| return ToolResult( | |
| status=ToolStatus.ERROR, | |
| error="Failed to capture screenshot" | |
| ) | |
    async def _record_intermediate_step(
        self,
        step_number: int,
        planned_action: Dict[str, Any],
        execution_result: Dict[str, Any],
        screenshot: bytes,
        task_description: str,
    ):
        """
        Record an intermediate step of GUI agent execution.

        This method records each planning-action cycle to the recording system,
        providing detailed traces of GUI agent's decision-making process.

        NOTE(review): nothing in this file calls this method -- confirm the
        planning loop (or an external caller) is expected to invoke it.

        Args:
            step_number: Step number in the execution sequence
            planned_action: Action planned by LLM
            execution_result: Result of executing the action
            screenshot: Screenshot before executing the action
            task_description: Overall task description
        """
        # Try to get recording_manager dynamically if not set at initialization
        recording_manager = self.recording_manager
        if not recording_manager and hasattr(self, '_runtime_info') and self._runtime_info:
            # Try to get from grounding_client
            grounding_client = self._runtime_info.grounding_client
            if grounding_client and hasattr(grounding_client, 'recording_manager'):
                recording_manager = grounding_client.recording_manager
                logger.debug(f"Step {step_number}: Dynamically retrieved recording_manager from grounding_client")
        if not recording_manager:
            logger.debug(f"Step {step_number}: No recording_manager available, skipping intermediate step recording")
            return
        # Check if recording is active
        try:
            from openspace.recording.manager import RecordingManager
            if not RecordingManager.is_recording():
                logger.debug(f"Step {step_number}: RecordingManager not started")
                return
        except Exception as e:
            logger.debug(f"Step {step_number}: Failed to check recording status: {e}")
            return
        # Check if recorder is initialized
        # NOTE(review): reaches into the private _recorder attribute; a public
        # accessor on RecordingManager would be preferable if one exists.
        if not hasattr(recording_manager, '_recorder') or not recording_manager._recorder:
            logger.warning(f"Step {step_number}: recording_manager._recorder not initialized")
            return
        # Build command string for display
        action_type = planned_action.get("action_type", "unknown")
        command = self._format_action_command(planned_action)
        # Build result summary
        status = execution_result.get("status", "unknown")
        is_success = status in ("success", "done", "observation")
        # Build result content
        if status == "done":
            result_content = f"Task completed at step {step_number}"
        elif status == "failed":
            result_content = execution_result.get("message", "Task failed")
        elif status == "observation":
            result_content = execution_result.get("message", "Screenshot observation")
        else:
            result_content = execution_result.get("result", execution_result.get("message", str(execution_result)))
        # Build parameters for recording
        parameters = {
            "task_description": task_description,
            "step_number": step_number,
            "action_type": action_type,
            "planned_action": planned_action,
        }
        # Record to trajectory recorder (handles screenshot saving)
        try:
            await recording_manager._recorder.record_step(
                backend="gui",
                tool="gui_agent_step",
                command=command,
                result={
                    "status": "success" if is_success else "error",
                    # Output is truncated to keep trajectory entries compact.
                    "output": str(result_content)[:200],
                },
                parameters=parameters,
                screenshot=screenshot,
                extra={
                    "gui_step_number": step_number,
                    "reasoning": planned_action.get("reasoning", ""),
                }
            )
            logger.info(f"✓ Recorded GUI intermediate step {step_number}: {command}")
        except Exception as e:
            # Recording failures must never break task execution itself.
            logger.error(f"✗ Failed to record intermediate step {step_number}: {e}", exc_info=True)
| def _format_action_command(self, planned_action: Dict[str, Any]) -> str: | |
| """ | |
| Format planned action into a human-readable command string. | |
| Args: | |
| planned_action: Action dictionary from LLM planning | |
| Returns: | |
| Formatted command string | |
| """ | |
| action_type = planned_action.get("action_type", "unknown") | |
| # Handle special action types | |
| if action_type == "DONE": | |
| return "DONE (task completed)" | |
| elif action_type == "FAIL": | |
| reason = planned_action.get("reason", "unknown") | |
| return f"FAIL ({reason})" | |
| elif action_type == "WAIT": | |
| return "WAIT (screenshot observation)" | |
| # Handle PyAutoGUI commands | |
| elif action_type == "PYAUTOGUI_COMMAND": | |
| command = planned_action.get("command", "") | |
| # Truncate long commands | |
| if len(command) > 100: | |
| return command[:100] + "..." | |
| return command | |
| # Handle standard action space commands | |
| else: | |
| parameters = planned_action.get("parameters", {}) | |
| if parameters: | |
| # Format first 2 parameters | |
| param_items = list(parameters.items())[:2] | |
| param_str = ", ".join([f"{k}={v}" for k, v in param_items]) | |
| return f"{action_type}({param_str})" | |
| else: | |
| return action_type |