# open-computer-use-agent / computer_tool.py
# likhonsheikh: Upload computer_tool.py with huggingface_hub
# 8cfe18c (verified)
"""
Open-source Computer Use Tool - xdotool based implementation
"""
import asyncio
import base64
import os
import shlex
import shutil
import tempfile
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional, Literal
class Action(str, Enum):
    """Action names for the computer-use tool, as plain strings.

    Members are also ``str`` instances, so each one compares equal to its
    text value (``Action.KEY == "key"``).
    """

    # Screen capture
    SCREENSHOT = "screenshot"

    # Keyboard input
    KEY = "key"
    TYPE = "type"

    # Pointer input
    MOUSE_MOVE = "mouse_move"
    LEFT_CLICK = "left_click"
    RIGHT_CLICK = "right_click"
    DOUBLE_CLICK = "double_click"
    SCROLL = "scroll"

    # Pause
    WAIT = "wait"
@dataclass
class ToolResult:
    """Outcome of one ``ComputerTool`` action.

    Every field defaults to ``None``; the methods of ``ComputerTool`` fill in
    exactly one of them per result.
    """

    # Short description of the action that was carried out.
    output: Optional[str] = None
    # Explanation of why the action could not be carried out.
    error: Optional[str] = None
    # Contents of the captured screenshot file, standard-base64 encoded.
    base64_image: Optional[str] = None


class ComputerTool:
    """Control an X display with ``xdotool`` and capture screenshots of it.

    Each command is executed through the shell with ``DISPLAY`` set to the
    configured display number.
    """

    # Lower-case friendly key names -> X keysym names passed to xdotool.
    # Keysym names are case-sensitive, and press_key lower-cases its input,
    # so every keysym that contains an upper-case letter must be listed here
    # (function keys are handled separately in _normalize_key_combo).
    _KEY_ALIASES = {
        "enter": "Return",
        "return": "Return",
        "tab": "Tab",
        "escape": "Escape",
        "esc": "Escape",
        "backspace": "BackSpace",
        "space": "space",
        "up": "Up",
        "down": "Down",
        "left": "Left",
        "right": "Right",
        "delete": "Delete",
        "del": "Delete",
        "insert": "Insert",
        "home": "Home",
        "end": "End",
        "pageup": "Page_Up",
        "page_up": "Page_Up",
        "pgup": "Page_Up",
        "pagedown": "Page_Down",
        "page_down": "Page_Down",
        "pgdn": "Page_Down",
    }

    def __init__(
        self,
        display_width: int = 1280,
        display_height: int = 800,
        display_num: int = 99,
    ):
        """Store the display settings and the timing defaults.

        Args:
            display_width: Width of the display; stored on the instance.
            display_height: Height of the display; stored on the instance.
            display_num: X display number used to build the ``DISPLAY=:N``
                prefix placed in front of every command.
        """
        self.display_width = display_width
        self.display_height = display_height
        self.display_num = display_num
        self._display_prefix = f"DISPLAY=:{self.display_num} "
        # Seconds to wait after an input action before returning.
        self._screenshot_delay = 0.5
        # Per-keystroke delay handed to ``xdotool type --delay``.
        self._typing_delay_ms = 12

    @staticmethod
    def _to_int(value: object, name: str) -> int:
        """Return *value* as an ``int`` or raise ``ValueError`` naming *name*.

        Numeric arguments are interpolated into a shell command line, so they
        must never reach it as arbitrary text.
        """
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            raise ValueError(f"{name} must be an integer, got {value!r}") from None

    @classmethod
    def _normalize_key_combo(cls, key: str) -> str:
        """Translate a ``+``-separated key combo into xdotool key names.

        Tokens are lower-cased and stripped, then mapped through
        ``_KEY_ALIASES``; ``f<digits>`` becomes ``F<digits>``. Any other
        token is passed through in lower case.
        """
        names = []
        for token in key.lower().split("+"):
            name = token.strip()
            if name in cls._KEY_ALIASES:
                name = cls._KEY_ALIASES[name]
            elif (
                len(name) > 1
                and name[0] == "f"
                and name[1:].isascii()
                and name[1:].isdigit()
            ):
                # Function-key keysyms are spelled with a capital F.
                name = name.upper()
            names.append(name)
        return "+".join(names)

    async def _run_shell(self, command: str) -> tuple[str, str]:
        """Run *command* through the shell and return ``(stdout, stderr)``.

        Output is decoded with replacement characters so that undecodable
        bytes cannot raise. The exit status is not reported.
        """
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return stdout.decode(errors="replace"), stderr.decode(errors="replace")

    async def screenshot(self) -> ToolResult:
        """Capture the display and return it base64-encoded.

        Uses ``scrot`` when it is on ``PATH``, otherwise ``import -window
        root``. The image is written into a private temporary directory that
        is removed before returning, whether or not the capture succeeded.

        Returns:
            A result with ``base64_image`` set, or with ``error`` set when no
            non-empty image file was produced.
        """
        with tempfile.TemporaryDirectory(prefix="screenshot_") as tmp_dir:
            screenshot_path = Path(tmp_dir) / "screenshot.png"
            target = shlex.quote(str(screenshot_path))
            if shutil.which("scrot"):
                cmd = f"{self._display_prefix}scrot -o {target}"
            else:
                cmd = f"{self._display_prefix}import -window root {target}"
            _, stderr = await self._run_shell(cmd)

            if not screenshot_path.is_file() or screenshot_path.stat().st_size == 0:
                detail = stderr.strip()
                message = f"Screenshot failed: {detail}" if detail else "Screenshot failed"
                return ToolResult(error=message)

            base64_image = base64.standard_b64encode(screenshot_path.read_bytes()).decode()
        return ToolResult(base64_image=base64_image)

    async def click(self, x: int, y: int, button: str = "left", clicks: int = 1) -> ToolResult:
        """Move the pointer to ``(x, y)`` and click there.

        Args:
            x: Horizontal pointer coordinate.
            y: Vertical pointer coordinate.
            button: ``"left"``, ``"middle"`` or ``"right"``; any other value
                falls back to the left button.
            clicks: Number of clicks, passed to ``xdotool click --repeat``.

        Returns:
            A result with ``output`` set, or with ``error`` set when a numeric
            argument cannot be converted to ``int``.
        """
        try:
            x = self._to_int(x, "x")
            y = self._to_int(y, "y")
            clicks = self._to_int(clicks, "clicks")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        button_map = {"left": 1, "middle": 2, "right": 3}
        btn = button_map.get(button, 1)
        await self._run_shell(f"{self._display_prefix}xdotool mousemove --sync {x} {y}")
        await self._run_shell(f"{self._display_prefix}xdotool click --repeat {clicks} --delay 100 {btn}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Clicked {button} at ({x}, {y})")

    async def type_text(self, text: str) -> ToolResult:
        """Type *text* with ``xdotool type``.

        Returns:
            A result whose ``output`` echoes the text, cut to 50 characters
            and followed by ``...`` only when it was actually cut.
        """
        cmd = (
            f"{self._display_prefix}xdotool type "
            f"--delay {int(self._typing_delay_ms)} -- {shlex.quote(text)}"
        )
        await self._run_shell(cmd)
        await asyncio.sleep(self._screenshot_delay)
        preview = text if len(text) <= 50 else f"{text[:50]}..."
        return ToolResult(output=f"Typed: {preview}")

    async def press_key(self, key: str) -> ToolResult:
        """Press a key or a ``+``-separated key combination.

        Key names are matched case-insensitively; see
        ``_normalize_key_combo`` for the translation rules.

        Returns:
            A result with ``output`` set, or with ``error`` set when *key* is
            empty or blank.
        """
        if not key or not key.strip():
            return ToolResult(error="No key specified")

        key_combo = self._normalize_key_combo(key)
        await self._run_shell(f"{self._display_prefix}xdotool key -- {shlex.quote(key_combo)}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Pressed: {key}")

    async def scroll(self, direction: str = "down", amount: int = 3) -> ToolResult:
        """Scroll by sending repeated clicks of a scroll button.

        Args:
            direction: ``"up"``, ``"down"``, ``"left"`` or ``"right"``; any
                other value falls back to scrolling down.
            amount: Number of clicks, passed to ``xdotool click --repeat``.

        Returns:
            A result with ``output`` set, or with ``error`` set when *amount*
            cannot be converted to ``int``.
        """
        try:
            amount = self._to_int(amount, "amount")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        button_map = {"up": 4, "down": 5, "left": 6, "right": 7}
        button = button_map.get(direction, 5)
        await self._run_shell(f"{self._display_prefix}xdotool click --repeat {amount} --delay 50 {button}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Scrolled {direction}")

    async def move_mouse(self, x: int, y: int) -> ToolResult:
        """Move the pointer to ``(x, y)`` without clicking.

        Returns:
            A result with ``output`` set, or with ``error`` set when a
            coordinate cannot be converted to ``int``.
        """
        try:
            x = self._to_int(x, "x")
            y = self._to_int(y, "y")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        await self._run_shell(f"{self._display_prefix}xdotool mousemove --sync {x} {y}")
        return ToolResult(output=f"Moved to ({x}, {y})")