File size: 4,205 Bytes
8cfe18c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
Open-source Computer Use Tool - xdotool based implementation
"""

import asyncio
import base64
import os
import shlex
import shutil
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Literal
from pathlib import Path


class Action(str, Enum):
    """Names of the actions a computer-use request can ask for.

    The enum mixes in ``str``, so each member compares equal to its plain
    string value (e.g. ``Action.KEY == "key"``).
    """

    # Screen capture.
    SCREENSHOT = "screenshot"
    # Keyboard input: a key / key combination, or literal text.
    KEY = "key"
    TYPE = "type"
    # Pointer movement and button clicks.
    MOUSE_MOVE = "mouse_move"
    LEFT_CLICK = "left_click"
    RIGHT_CLICK = "right_click"
    DOUBLE_CLICK = "double_click"
    # Scrolling and pausing.
    SCROLL = "scroll"
    WAIT = "wait"


@dataclass
class ToolResult:
    """Outcome of a single tool action.

    All fields are optional and default to ``None``; a result carries
    whichever of them apply to the action that produced it.
    """

    # Human-readable description of what the action did.
    output: Optional[str] = None
    # Description of what went wrong, when the action failed.
    error: Optional[str] = None
    # Base64-encoded image data (text, not raw bytes).
    base64_image: Optional[str] = None

class ComputerTool:
    """Drive an X11 desktop with ``xdotool`` and capture screenshots.

    Every command is executed through the shell with a
    ``DISPLAY=:<display_num>`` prefix. Action methods return a
    ``ToolResult``; invalid arguments are reported through
    ``ToolResult.error`` instead of being passed on to the shell.
    """

    # Case-insensitive spellings mapped to the name handed to xdotool.
    # Key names are X keysyms, which are case-sensitive, so friendly
    # lower-case spellings have to be translated explicitly.
    _KEY_ALIASES = {
        "enter": "Return", "return": "Return", "tab": "Tab",
        "escape": "Escape", "esc": "Escape", "backspace": "BackSpace",
        "space": "space", "up": "Up", "down": "Down",
        "left": "Left", "right": "Right",
        "delete": "Delete", "del": "Delete", "insert": "Insert",
        "home": "Home", "end": "End",
        "pageup": "Page_Up", "page_up": "Page_Up", "pgup": "Page_Up",
        "pagedown": "Page_Down", "page_down": "Page_Down", "pgdn": "Page_Down",
        # Modifiers keep the lower-case form this tool has always sent.
        "ctrl": "ctrl", "control": "ctrl", "alt": "alt",
        "shift": "shift", "super": "super", "meta": "meta",
        # Function keys: "f5" -> "F5".
        **{f"f{n}": f"F{n}" for n in range(1, 25)},
    }

    def __init__(
        self,
        display_width: int = 1280,
        display_height: int = 800,
        display_num: int = 99
    ):
        """Store the display geometry and build the ``DISPLAY`` prefix.

        Args:
            display_width: Width of the target display in pixels.
            display_height: Height of the target display in pixels.
            display_num: X display number used for every command.
        """
        self.display_width = display_width
        self.display_height = display_height
        self.display_num = display_num
        self._display_prefix = f"DISPLAY=:{self.display_num} "
        # Seconds to wait after an input action before returning.
        self._screenshot_delay = 0.5
        # Per-keystroke delay passed to ``xdotool type --delay``.
        self._typing_delay_ms = 12

    @staticmethod
    def _non_negative_int(value, label: str) -> int:
        """Return *value* as a non-negative ``int`` or raise ``ValueError``.

        Coercing before interpolation guarantees that only digits reach
        the shell command line, and rejecting negatives keeps a leading
        ``-`` from being read by xdotool as an option.
        """
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError):
            raise ValueError(f"{label} must be an integer, got {value!r}") from None
        if number < 0:
            raise ValueError(f"{label} must be non-negative, got {number}")
        return number

    @classmethod
    def _translate_key(cls, name: str) -> str:
        """Translate one component of a key combination for xdotool.

        Known aliases are matched case-insensitively. Single characters
        are lower-cased (so ``ctrl+C`` still means ctrl+c). Any other
        name is passed through verbatim so that case-sensitive keysym
        names such as ``Page_Down`` survive.
        """
        alias = cls._KEY_ALIASES.get(name.lower())
        if alias is not None:
            return alias
        if len(name) == 1:
            return name.lower()
        return name

    async def _run_shell(self, command: str) -> tuple[str, str]:
        """Run *command* through the shell and return ``(stdout, stderr)``.

        The exit status is not inspected. Undecodable bytes in the output
        are replaced rather than raising ``UnicodeDecodeError``.
        """
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return stdout.decode(errors="replace"), stderr.decode(errors="replace")

    async def screenshot(self) -> ToolResult:
        """Capture the display and return it base64-encoded.

        Uses ``scrot`` when it is on ``PATH`` and falls back to
        ``import -window root`` (presumably ImageMagick's) otherwise.

        Returns:
            A ``ToolResult`` with ``base64_image`` set on success, or with
            ``error`` set when no image data was produced.
        """
        import tempfile

        # A private, per-call directory avoids collisions between
        # concurrent captures and never picks up a stale file left behind
        # by an earlier run; it is removed even if reading fails.
        with tempfile.TemporaryDirectory(prefix="screenshot_") as tmp_dir:
            screenshot_path = Path(tmp_dir) / "screenshot.png"
            target = shlex.quote(str(screenshot_path))

            if shutil.which("scrot"):
                cmd = f"{self._display_prefix}scrot -o {target}"
            else:
                cmd = f"{self._display_prefix}import -window root {target}"

            _, stderr = await self._run_shell(cmd)

            try:
                image_bytes = screenshot_path.read_bytes()
            except OSError:
                image_bytes = b""

        if not image_bytes:
            detail = stderr.strip()
            message = f"Screenshot failed: {detail}" if detail else "Screenshot failed"
            return ToolResult(error=message)

        return ToolResult(base64_image=base64.standard_b64encode(image_bytes).decode())

    async def click(self, x: int, y: int, button: str = "left", clicks: int = 1) -> ToolResult:
        """Move the pointer to ``(x, y)`` and click.

        Args:
            x: Horizontal pixel coordinate (non-negative).
            y: Vertical pixel coordinate (non-negative).
            button: ``"left"``, ``"middle"`` or ``"right"``; any other
                value falls back to the left button.
            clicks: Number of clicks to send (non-negative).

        Returns:
            A ``ToolResult`` describing the click, or carrying ``error``
            when a numeric argument is invalid.
        """
        try:
            x_pos = self._non_negative_int(x, "x")
            y_pos = self._non_negative_int(y, "y")
            repeat = self._non_negative_int(clicks, "clicks")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        button_map = {"left": 1, "middle": 2, "right": 3}
        btn = button_map.get(button, 1)

        await self._run_shell(f"{self._display_prefix}xdotool mousemove --sync {x_pos} {y_pos}")
        await self._run_shell(f"{self._display_prefix}xdotool click --repeat {repeat} --delay 100 {btn}")

        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Clicked {button} at ({x_pos}, {y_pos})")

    async def type_text(self, text: str) -> ToolResult:
        """Type *text* into the focused window.

        Returns:
            A ``ToolResult`` whose output previews the first 50 characters
            of *text*, followed by ``...`` only when it was truncated.
        """
        cmd = f"{self._display_prefix}xdotool type --delay {self._typing_delay_ms} -- {shlex.quote(text)}"
        await self._run_shell(cmd)
        await asyncio.sleep(self._screenshot_delay)
        preview = text if len(text) <= 50 else f"{text[:50]}..."
        return ToolResult(output=f"Typed: {preview}")

    async def press_key(self, key: str) -> ToolResult:
        """Press a key or a ``+``-separated key combination.

        Args:
            key: For example ``"enter"``, ``"ctrl+c"``, ``"F5"`` or
                ``"Page_Down"``. Common aliases are matched
                case-insensitively; other multi-character names are sent
                to xdotool exactly as given.

        Returns:
            A ``ToolResult`` describing the key press, or carrying
            ``error`` when the combination has an empty component.
        """
        parts = [part.strip() for part in key.split("+")]
        if not all(parts):
            return ToolResult(error=f"Invalid key combination: {key!r}")

        key_combo = "+".join(self._translate_key(part) for part in parts)

        await self._run_shell(f"{self._display_prefix}xdotool key -- {shlex.quote(key_combo)}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Pressed: {key}")

    async def scroll(self, direction: str = "down", amount: int = 3) -> ToolResult:
        """Scroll by sending repeated mouse-wheel button clicks.

        Args:
            direction: ``"up"``, ``"down"``, ``"left"`` or ``"right"``;
                any other value falls back to scrolling down.
            amount: Number of wheel clicks to send (non-negative).

        Returns:
            A ``ToolResult`` describing the scroll, or carrying ``error``
            when *amount* is invalid.
        """
        try:
            repeat = self._non_negative_int(amount, "amount")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        # Buttons 4-7 are the wheel directions this tool sends.
        button_map = {"up": 4, "down": 5, "left": 6, "right": 7}
        button = button_map.get(direction, 5)

        await self._run_shell(f"{self._display_prefix}xdotool click --repeat {repeat} --delay 50 {button}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Scrolled {direction}")

    async def move_mouse(self, x: int, y: int) -> ToolResult:
        """Move the pointer to ``(x, y)`` without clicking.

        Returns:
            A ``ToolResult`` describing the move, or carrying ``error``
            when a coordinate is not a non-negative integer.
        """
        try:
            x_pos = self._non_negative_int(x, "x")
            y_pos = self._non_negative_int(y, "y")
        except ValueError as exc:
            return ToolResult(error=str(exc))

        await self._run_shell(f"{self._display_prefix}xdotool mousemove --sync {x_pos} {y_pos}")
        return ToolResult(output=f"Moved to ({x_pos}, {y_pos})")