Spaces:
Runtime error
Runtime error
| import asyncio | |
| import base64 | |
| import logging | |
| import os | |
| import time | |
| from typing import Dict, Literal, Optional | |
| import aiohttp | |
| from pydantic import Field | |
| from app.daytona.tool_base import Sandbox, SandboxToolsBase | |
| from app.tool.base import ToolResult | |
# Key names offered to the model for keyboard actions. The list is used as the
# JSON-schema ``enum`` of both the "key" and "keys" tool parameters, so its
# contents and order are part of the published tool schema.
KEYBOARD_KEYS = [
    # Letters and digits, one entry per character.
    *"abcdefghijklmnopqrstuvwxyz",
    *"0123456789",
    # Editing / whitespace keys.
    "enter",
    "esc",
    "backspace",
    "tab",
    "space",
    "delete",
    # Modifiers.
    "ctrl",
    "alt",
    "shift",
    "win",
    # Arrow keys.
    "up",
    "down",
    "left",
    "right",
    # Function keys f1 .. f12.
    *(f"f{number}" for number in range(1, 13)),
    # Predefined key combinations, written as "+"-joined key names.
    "ctrl+c",
    "ctrl+v",
    "ctrl+x",
    "ctrl+z",
    "ctrl+a",
    "ctrl+s",
    "alt+tab",
    "alt+f4",
    "ctrl+alt+delete",
]

# Mouse buttons accepted by the "button" tool parameter (used as its enum).
MOUSE_BUTTONS = "left right middle".split()
| _COMPUTER_USE_DESCRIPTION = """\ | |
| A comprehensive computer automation tool that allows interaction with the desktop environment. | |
| * This tool provides commands for controlling mouse, keyboard, and taking screenshots | |
| * It maintains state including current mouse position | |
| * Use this when you need to automate desktop applications, fill forms, or perform GUI interactions | |
| Key capabilities include: | |
| * Mouse Control: Move, click, drag, scroll | |
| * Keyboard Input: Type text, press keys or key combinations | |
| * Screenshots: Capture and save screen images | |
| * Waiting: Pause execution for specified duration | |
| """ | |
class ComputerUseTool(SandboxToolsBase):
    """Computer automation tool for controlling the desktop environment.

    Every action except ``wait`` is forwarded as an HTTP request to an
    automation service reachable under ``api_base_url``. The tool keeps the
    last known mouse position (``mouse_x`` / ``mouse_y``) so that actions
    which omit coordinates operate at the current pointer location.
    """

    name: str = "computer_use"
    description: str = _COMPUTER_USE_DESCRIPTION
    # JSON-schema style description of the arguments accepted by ``execute``.
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "move_to",
                    "click",
                    "scroll",
                    "typing",
                    "press",
                    "wait",
                    "mouse_down",
                    "mouse_up",
                    "drag_to",
                    "hotkey",
                    "screenshot",
                ],
                "description": "The computer action to perform",
            },
            "x": {"type": "number", "description": "X coordinate for mouse actions"},
            "y": {"type": "number", "description": "Y coordinate for mouse actions"},
            "button": {
                "type": "string",
                "enum": MOUSE_BUTTONS,
                "description": "Mouse button for click/drag actions",
                "default": "left",
            },
            "num_clicks": {
                "type": "integer",
                "description": "Number of clicks",
                "enum": [1, 2, 3],
                "default": 1,
            },
            "amount": {
                "type": "integer",
                "description": "Scroll amount (positive for up, negative for down)",
                "minimum": -10,
                "maximum": 10,
            },
            "text": {"type": "string", "description": "Text to type"},
            "key": {
                "type": "string",
                "enum": KEYBOARD_KEYS,
                "description": "Key to press",
            },
            "keys": {
                "type": "string",
                "enum": KEYBOARD_KEYS,
                "description": "Key combination to press",
            },
            "duration": {
                "type": "number",
                "description": "Duration in seconds to wait",
                "default": 0.5,
            },
        },
        "required": ["action"],
        # Per-action list of additional arguments that must be supplied.
        "dependencies": {
            "move_to": ["x", "y"],
            "click": [],
            "scroll": ["amount"],
            "typing": ["text"],
            "press": ["key"],
            "wait": [],
            "mouse_down": [],
            "mouse_up": [],
            "drag_to": ["x", "y"],
            "hotkey": ["keys"],
            "screenshot": [],
        },
    }

    # HTTP session, created lazily by ``_get_session``.
    session: Optional[aiohttp.ClientSession] = Field(default=None, exclude=True)
    # Last pointer position confirmed by a successful mouse action.
    mouse_x: int = Field(default=0, exclude=True)
    mouse_y: int = Field(default=0, exclude=True)
    # Base URL of the automation service; set from the sandbox in ``__init__``.
    api_base_url: Optional[str] = Field(default=None, exclude=True)

    def __init__(self, sandbox: Optional[Sandbox] = None, **data):
        """Initialize the tool, optionally binding it to a sandbox.

        Args:
            sandbox: When given, it is stored on the instance and the API base
                URL is taken from the sandbox's preview link for port 8000.
            **data: Forwarded unchanged to the base-class initializer.
        """
        super().__init__(**data)
        if sandbox is not None:
            # Directly set the base class's private attribute.
            self._sandbox = sandbox
            self.api_base_url = sandbox.get_preview_link(8000).url
            logging.info(
                f"Initialized ComputerUseTool with API URL: {self.api_base_url}"
            )

    @classmethod
    def create_with_sandbox(cls, sandbox: Sandbox) -> "ComputerUseTool":
        """Factory method: build a tool bound to ``sandbox``.

        Declared as a classmethod so that it can be called on the class
        itself, e.g. ``ComputerUseTool.create_with_sandbox(sandbox)``.
        """
        # Initialization happens through the regular constructor.
        return cls(sandbox=sandbox)

    @staticmethod
    def _to_pixel(value) -> int:
        """Convert a numeric coordinate to the nearest integer pixel."""
        return int(round(float(value)))

    def _resolve_point(self, x: Optional[float], y: Optional[float]):
        """Return ``(x, y)`` as ints, falling back to the tracked position."""
        x_val = x if x is not None else self.mouse_x
        y_val = y if y is not None else self.mouse_y
        return self._to_pixel(x_val), self._to_pixel(y_val)

    @staticmethod
    def _failure(prefix: str, result: Dict) -> ToolResult:
        """Build an error ``ToolResult`` from a failed API response."""
        return ToolResult(error=f"{prefix}: {result.get('error', 'Unknown error')}")

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared aiohttp session, creating it if missing or closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def _api_request(
        self, method: str, endpoint: str, data: Optional[Dict] = None
    ) -> Dict:
        """Send a request to the automation service API.

        Args:
            method: ``"GET"`` (case-insensitive) issues a GET request; any
                other value issues a POST with ``data`` as the JSON body.
            endpoint: Path appended to ``<api_base_url>/api``.
            data: Optional JSON payload for POST requests.

        Returns:
            The decoded JSON response. On any failure the exception is logged
            and ``{"success": False, "error": <message>}`` is returned instead
            of raising.
        """
        if not self.api_base_url:
            # Without a base URL the request would target "None/api/...".
            message = "Automation API base URL is not configured"
            logging.error(f"API request failed: {message}")
            return {"success": False, "error": message}
        try:
            session = await self._get_session()
            url = f"{self.api_base_url.rstrip('/')}/api{endpoint}"
            logging.debug(f"API request: {method} {url} {data}")
            if method.upper() == "GET":
                async with session.get(url) as response:
                    result = await response.json()
            else:  # POST
                async with session.post(url, json=data) as response:
                    result = await response.json()
            logging.debug(f"API response: {result}")
            return result
        except Exception as e:
            logging.error(f"API request failed: {str(e)}")
            return {"success": False, "error": str(e)}

    async def execute(
        self,
        action: Literal[
            "move_to",
            "click",
            "scroll",
            "typing",
            "press",
            "wait",
            "mouse_down",
            "mouse_up",
            "drag_to",
            "hotkey",
            "screenshot",
        ],
        x: Optional[float] = None,
        y: Optional[float] = None,
        button: str = "left",
        num_clicks: int = 1,
        amount: Optional[int] = None,
        text: Optional[str] = None,
        key: Optional[str] = None,
        keys: Optional[str] = None,
        duration: float = 0.5,
        **kwargs,
    ) -> ToolResult:
        """Execute a specified computer automation action.

        Args:
            action: The action to perform.
            x: X coordinate for mouse actions. Required for ``move_to`` and
                ``drag_to``; other mouse actions fall back to the tracked
                position when it is omitted.
            y: Y coordinate for mouse actions (same rules as ``x``).
            button: Mouse button for click/press/release/drag actions.
            num_clicks: Number of clicks to perform.
            amount: Scroll amount (positive for up, negative for down),
                clamped to the range -10..10.
            text: Text to type.
            key: Key to press.
            keys: Key combination to press, key names joined with ``+``.
            duration: Seconds to wait, clamped to the range 0..10.
            **kwargs: Additional arguments (ignored).

        Returns:
            ToolResult with the action's output or error. Exceptions raised
            while handling the action are converted into an error result.
        """
        try:
            if action == "move_to":
                if x is None or y is None:
                    return ToolResult(error="x and y coordinates are required")
                x_int, y_int = self._to_pixel(x), self._to_pixel(y)
                result = await self._api_request(
                    "POST", "/automation/mouse/move", {"x": x_int, "y": y_int}
                )
                if not result.get("success", False):
                    return self._failure("Failed to move", result)
                self.mouse_x, self.mouse_y = x_int, y_int
                return ToolResult(output=f"Moved to ({x_int}, {y_int})")

            elif action == "click":
                x_int, y_int = self._resolve_point(x, y)
                num_clicks = int(num_clicks)
                result = await self._api_request(
                    "POST",
                    "/automation/mouse/click",
                    {
                        "x": x_int,
                        "y": y_int,
                        "clicks": num_clicks,
                        "button": button.lower(),
                    },
                )
                if not result.get("success", False):
                    return self._failure("Failed to click", result)
                self.mouse_x, self.mouse_y = x_int, y_int
                return ToolResult(
                    output=f"{num_clicks} {button} click(s) performed at ({x_int}, {y_int})"
                )

            elif action == "scroll":
                if amount is None:
                    return ToolResult(error="Scroll amount is required")
                # Clamp to the range advertised in the parameter schema.
                amount = max(-10, min(10, int(float(amount))))
                result = await self._api_request(
                    "POST",
                    "/automation/mouse/scroll",
                    {"clicks": amount, "x": self.mouse_x, "y": self.mouse_y},
                )
                if not result.get("success", False):
                    return self._failure("Failed to scroll", result)
                direction = "up" if amount > 0 else "down"
                steps = abs(amount)
                return ToolResult(
                    output=f"Scrolled {direction} {steps} step(s) at position ({self.mouse_x}, {self.mouse_y})"
                )

            elif action == "typing":
                if text is None:
                    return ToolResult(error="Text is required for typing")
                text = str(text)
                result = await self._api_request(
                    "POST",
                    "/automation/keyboard/write",
                    {"message": text, "interval": 0.01},
                )
                if not result.get("success", False):
                    return self._failure("Failed to type", result)
                return ToolResult(output=f"Typed: {text}")

            elif action == "press":
                if key is None:
                    return ToolResult(error="Key is required for press action")
                key = str(key).lower()
                result = await self._api_request(
                    "POST", "/automation/keyboard/press", {"keys": key, "presses": 1}
                )
                if not result.get("success", False):
                    return self._failure("Failed to press key", result)
                return ToolResult(output=f"Pressed key: {key}")

            elif action == "wait":
                # Purely local: no API call, just a bounded sleep.
                duration = float(duration)
                duration = max(0, min(10, duration))
                await asyncio.sleep(duration)
                return ToolResult(output=f"Waited {duration} seconds")

            elif action == "mouse_down":
                x_int, y_int = self._resolve_point(x, y)
                result = await self._api_request(
                    "POST",
                    "/automation/mouse/down",
                    {"x": x_int, "y": y_int, "button": button.lower()},
                )
                if not result.get("success", False):
                    return self._failure("Failed to press button", result)
                self.mouse_x, self.mouse_y = x_int, y_int
                return ToolResult(
                    output=f"{button} button pressed at ({x_int}, {y_int})"
                )

            elif action == "mouse_up":
                x_int, y_int = self._resolve_point(x, y)
                result = await self._api_request(
                    "POST",
                    "/automation/mouse/up",
                    {"x": x_int, "y": y_int, "button": button.lower()},
                )
                if not result.get("success", False):
                    return self._failure("Failed to release button", result)
                self.mouse_x, self.mouse_y = x_int, y_int
                return ToolResult(
                    output=f"{button} button released at ({x_int}, {y_int})"
                )

            elif action == "drag_to":
                if x is None or y is None:
                    return ToolResult(error="x and y coordinates are required")
                target_x, target_y = self._to_pixel(x), self._to_pixel(y)
                start_x, start_y = self.mouse_x, self.mouse_y
                result = await self._api_request(
                    "POST",
                    "/automation/mouse/drag",
                    {
                        "x": target_x,
                        "y": target_y,
                        "duration": 0.3,
                        # Honor the requested button (default "left") rather
                        # than always dragging with the left button.
                        "button": button.lower(),
                    },
                )
                if not result.get("success", False):
                    return self._failure("Failed to drag", result)
                self.mouse_x, self.mouse_y = target_x, target_y
                return ToolResult(
                    output=f"Dragged from ({start_x}, {start_y}) to ({target_x}, {target_y})"
                )

            elif action == "hotkey":
                if keys is None:
                    return ToolResult(error="Keys are required for hotkey action")
                keys = str(keys).lower().strip()
                # Trim each part so "ctrl + c" yields ["ctrl", "c"], and drop
                # empty parts produced by stray or doubled "+" characters.
                key_sequence = [
                    part.strip() for part in keys.split("+") if part.strip()
                ]
                if not key_sequence:
                    return ToolResult(error="Keys are required for hotkey action")
                result = await self._api_request(
                    "POST",
                    "/automation/keyboard/hotkey",
                    {"keys": key_sequence, "interval": 0.01},
                )
                if not result.get("success", False):
                    return self._failure("Failed to press keys", result)
                return ToolResult(output=f"Pressed key combination: {keys}")

            elif action == "screenshot":
                result = await self._api_request("POST", "/automation/screenshot")
                if "image" not in result:
                    return ToolResult(error="Failed to capture screenshot")
                base64_str = result["image"]
                # Decode first so a malformed payload leaves no files behind.
                img_data = base64.b64decode(base64_str)
                timestamp = time.strftime("%Y%m%d_%H%M%S")
                screenshots_dir = "screenshots"
                # exist_ok avoids the check-then-create race of a separate
                # os.path.exists() test.
                os.makedirs(screenshots_dir, exist_ok=True)
                timestamped_filename = os.path.join(
                    screenshots_dir, f"screenshot_{timestamp}.png"
                )
                latest_filename = "latest_screenshot.png"
                # Write the timestamped file, then a copy as the latest
                # screenshot in the current working directory.
                for target in (timestamped_filename, latest_filename):
                    with open(target, "wb") as f:
                        f.write(img_data)
                return ToolResult(
                    output=f"Screenshot saved as {timestamped_filename}",
                    base64_image=base64_str,
                )

            else:
                return ToolResult(error=f"Unknown action: {action}")
        except Exception as e:
            return ToolResult(error=f"Computer action failed: {str(e)}")

    async def cleanup(self):
        """Clean up resources: close the HTTP session if it is still open."""
        if self.session and not self.session.closed:
            await self.session.close()
            self.session = None

    def __del__(self):
        """Best-effort cleanup on destruction; never raises.

        Prefer awaiting ``cleanup()`` explicitly. This finalizer only covers
        instances that were dropped without it.
        """
        session = getattr(self, "session", None)
        if session is None or session.closed:
            return
        try:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                # A loop is already running in this thread: asyncio.run() and
                # run_until_complete() would both raise here, so schedule the
                # close on the running loop instead (best effort).
                loop.create_task(self.cleanup())
            else:
                asyncio.run(self.cleanup())
        except Exception:
            # A finalizer must not propagate errors (e.g. during interpreter
            # shutdown); record the failure for debugging only.
            logging.debug("ComputerUseTool cleanup in __del__ failed", exc_info=True)