|
|
""" |
|
|
Open-source Computer Use Tool - xdotool based implementation |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import base64 |
|
|
import os |
|
|
import shlex |
|
|
import shutil |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
from typing import Optional, Literal |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
class Action(str, Enum):
    """String-valued enumeration of action names.

    Because the class mixes in ``str``, each member is itself a string and
    compares equal to its lowercase value (``Action.KEY == "key"``). A member
    can be looked up from that value with ``Action("key")``.
    """

    # Screen capture.
    SCREENSHOT = "screenshot"

    # Keyboard input.
    KEY = "key"
    TYPE = "type"

    # Pointer movement and mouse buttons.
    MOUSE_MOVE = "mouse_move"
    LEFT_CLICK = "left_click"
    RIGHT_CLICK = "right_click"
    DOUBLE_CLICK = "double_click"

    # Scrolling and waiting.
    SCROLL = "scroll"
    WAIT = "wait"
|
|
|
|
|
|
|
|
@dataclass
class ToolResult:
    """Outcome of one tool call.

    All three fields are optional and default to ``None``; a producer fills
    in only the ones that apply to the call it performed.
    """

    # Text describing what the call did.
    output: Optional[str] = None

    # Message describing why the call failed.
    error: Optional[str] = None

    # Image bytes encoded as base64 text.
    base64_image: Optional[str] = None
|
|
|
|
|
|
|
|
class ComputerTool:
    """Drive an X11 display through shell commands built around ``xdotool``.

    Every action method is a coroutine. It runs a shell command with
    ``DISPLAY`` set to the configured display number and describes the outcome
    in a ``ToolResult``. Screenshots are taken with ``scrot`` when it is on
    ``PATH`` and with ``import -window root`` otherwise.

    NOTE(review): the exit status of the spawned commands is not inspected, so
    an action whose command fails (for example because ``xdotool`` is not
    installed) is still reported as successful. Surfacing that would require
    changing what ``_run_shell`` returns; confirm with callers first.
    """

    # xdotool button numbers for the regular mouse buttons.
    _MOUSE_BUTTONS = {"left": 1, "middle": 2, "right": 3}

    # xdotool button numbers that correspond to scroll-wheel movement.
    _SCROLL_BUTTONS = {"up": 4, "down": 5, "left": 6, "right": 7}

    # Case-insensitive friendly names -> the spelling handed to xdotool.
    # Keysym names are case-sensitive, so the right-hand side must keep its
    # exact capitalisation.
    _KEY_ALIASES = {
        "enter": "Return", "return": "Return", "tab": "Tab",
        "escape": "Escape", "esc": "Escape", "backspace": "BackSpace",
        "space": "space", "up": "Up", "down": "Down",
        "left": "Left", "right": "Right",
        "delete": "Delete", "del": "Delete", "insert": "Insert",
        "home": "Home", "end": "End",
        "pageup": "Page_Up", "page_up": "Page_Up",
        "pagedown": "Page_Down", "page_down": "Page_Down",
        # Modifier aliases are kept lowercase, as the previous
        # implementation always sent them.
        "ctrl": "ctrl", "control": "ctrl", "alt": "alt",
        "shift": "shift", "super": "super", "meta": "meta",
    }

    def __init__(
        self,
        display_width: int = 1280,
        display_height: int = 800,
        display_num: int = 99,
    ):
        """Store the display geometry and precompute per-command settings.

        Args:
            display_width: Width of the display in pixels (stored only).
            display_height: Height of the display in pixels (stored only).
            display_num: X display number; commands run with
                ``DISPLAY=:<display_num>``.
        """
        self.display_width = display_width
        self.display_height = display_height
        self.display_num = display_num
        # Prepended verbatim to every shell command (note the trailing space).
        self._display_prefix = f"DISPLAY=:{self.display_num} "
        # Seconds to sleep after an input action before returning.
        self._screenshot_delay = 0.5
        # Per-keystroke delay, in milliseconds, passed to ``xdotool type``.
        self._typing_delay_ms = 12

    async def _run_shell(self, command: str) -> tuple[str, str]:
        """Run *command* through the shell and return ``(stdout, stderr)``.

        Output is decoded as UTF-8; undecodable bytes are replaced instead of
        raising, so unexpected tool output cannot abort an action.
        """
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return stdout.decode(errors="replace"), stderr.decode(errors="replace")

    @staticmethod
    def _to_int(value, label: str, minimum: int = 0) -> int:
        """Coerce *value* to an ``int`` that is at least *minimum*.

        The numbers handled here are interpolated into a shell command line,
        so anything that is not a plain integer is rejected rather than
        passed through.

        Raises:
            ValueError: If *value* is not integral or is below *minimum*.
        """
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError):
            raise ValueError(f"{label} must be an integer, got {value!r}") from None
        if number < minimum:
            raise ValueError(f"{label} must be >= {minimum}, got {number}")
        return number

    @classmethod
    def _xdotool_key_name(cls, token: str) -> str:
        """Translate one key of a combination into the name sent to xdotool."""
        lowered = token.lower()
        alias = cls._KEY_ALIASES.get(lowered)
        if alias is not None:
            return alias
        # Function keys are spelled with a capital F ("F5", not "f5").
        if len(lowered) > 1 and lowered[0] == "f" and lowered[1:].isdigit():
            return "F" + lowered[1:]
        # Single characters stay lowercased, as before, so "ctrl+C" is still
        # sent as "ctrl+c".
        if len(token) == 1:
            return lowered
        # Any other name is passed through with the caller's capitalisation,
        # because keysym names such as "KP_Enter" are case-sensitive.
        return token

    async def screenshot(self) -> "ToolResult":
        """Capture the whole display and return it base64-encoded.

        Returns:
            A ``ToolResult`` with ``base64_image`` set on success, or with
            ``error="Screenshot failed"`` when no image data was produced.
        """
        import tempfile  # local import: not among the module-level imports

        # A private, uniquely named directory per call keeps concurrent
        # screenshots from overwriting or deleting each other's file and
        # avoids a predictable file name in the shared temp directory.
        workdir = Path(tempfile.mkdtemp(prefix="screenshot_"))
        screenshot_path = workdir / "screenshot.png"
        try:
            target = shlex.quote(str(screenshot_path))
            if shutil.which("scrot"):
                cmd = f"{self._display_prefix}scrot -o {target}"
            else:
                cmd = f"{self._display_prefix}import -window root {target}"

            await self._run_shell(cmd)

            try:
                image_bytes = screenshot_path.read_bytes()
            except OSError:
                return ToolResult(error="Screenshot failed")
            if not image_bytes:
                return ToolResult(error="Screenshot failed")

            return ToolResult(
                base64_image=base64.standard_b64encode(image_bytes).decode()
            )
        finally:
            # Always remove the scratch directory, including on failure.
            shutil.rmtree(workdir, ignore_errors=True)

    async def click(
        self, x: int, y: int, button: str = "left", clicks: int = 1
    ) -> "ToolResult":
        """Move the pointer to ``(x, y)`` and click there.

        Args:
            x: Horizontal pointer coordinate; must be a non-negative integer.
            y: Vertical pointer coordinate; must be a non-negative integer.
            button: ``"left"``, ``"middle"`` or ``"right"``. Any other value
                falls back to the left button.
            clicks: Number of clicks to send; must be at least 1.

        Returns:
            A ``ToolResult`` with ``output`` describing the click, or with
            ``error`` set when an argument is not a valid integer.
        """
        try:
            px = self._to_int(x, "x")
            py = self._to_int(y, "y")
            repeat = self._to_int(clicks, "clicks", minimum=1)
        except ValueError as exc:
            return ToolResult(error=str(exc))

        btn = self._MOUSE_BUTTONS.get(button, 1)

        await self._run_shell(
            f"{self._display_prefix}xdotool mousemove --sync {px} {py}"
        )
        await self._run_shell(
            f"{self._display_prefix}xdotool click --repeat {repeat} --delay 100 {btn}"
        )

        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Clicked {button} at ({px}, {py})")

    async def type_text(self, text: str) -> "ToolResult":
        """Type *text* with ``xdotool type``.

        Returns:
            A ``ToolResult`` whose ``output`` echoes at most the first 50
            characters of *text*, followed by ``...`` only when the text was
            actually shortened.
        """
        cmd = (
            f"{self._display_prefix}xdotool type "
            f"--delay {self._typing_delay_ms} -- {shlex.quote(text)}"
        )
        await self._run_shell(cmd)
        await asyncio.sleep(self._screenshot_delay)

        preview = text[:50]
        if len(text) > len(preview):
            preview += "..."
        return ToolResult(output=f"Typed: {preview}")

    async def press_key(self, key: str) -> "ToolResult":
        """Press a key or a ``+``-separated key combination.

        Friendly names such as ``enter``, ``esc`` or ``pagedown`` are matched
        case-insensitively and translated for xdotool. Names that are not
        recognised keep the caller's capitalisation. A literal plus key can be
        written as ``+`` on its own or as the last key of a combination
        (``ctrl++``).

        Returns:
            A ``ToolResult`` with ``output`` naming the key as given, or with
            ``error`` set when the specification contains an empty key name.
        """
        spec = key.strip()
        # "+" doubles as the separator, so spell a literal plus by its keysym.
        if spec == "+":
            spec = "plus"
        elif spec.endswith("++"):
            spec = f"{spec[:-1]}plus"

        tokens = [part.strip() for part in spec.split("+")]
        if not all(tokens):
            return ToolResult(error=f"Invalid key: {key!r}")

        key_combo = "+".join(self._xdotool_key_name(token) for token in tokens)

        await self._run_shell(
            f"{self._display_prefix}xdotool key -- {shlex.quote(key_combo)}"
        )
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Pressed: {key}")

    async def scroll(self, direction: str = "down", amount: int = 3) -> "ToolResult":
        """Scroll by sending repeated scroll-wheel button clicks.

        Args:
            direction: ``"up"``, ``"down"``, ``"left"`` or ``"right"``. Any
                other value falls back to scrolling down.
            amount: Number of wheel clicks to send; must be at least 1.

        Returns:
            A ``ToolResult`` with ``output`` naming the direction, or with
            ``error`` set when *amount* is not a valid integer.
        """
        try:
            repeat = self._to_int(amount, "amount", minimum=1)
        except ValueError as exc:
            return ToolResult(error=str(exc))

        button = self._SCROLL_BUTTONS.get(direction, 5)

        await self._run_shell(
            f"{self._display_prefix}xdotool click --repeat {repeat} --delay 50 {button}"
        )
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Scrolled {direction}")

    async def move_mouse(self, x: int, y: int) -> "ToolResult":
        """Move the pointer to ``(x, y)`` without clicking.

        Returns:
            A ``ToolResult`` with ``output`` giving the target position, or
            with ``error`` set when a coordinate is not a non-negative integer.
        """
        try:
            px = self._to_int(x, "x")
            py = self._to_int(y, "y")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        await self._run_shell(
            f"{self._display_prefix}xdotool mousemove --sync {px} {py}"
        )
        return ToolResult(output=f"Moved to ({px}, {py})")
|
|
|