likhonsheikh committed on
Commit
8cfe18c
·
verified ·
1 Parent(s): b55e0e7

Upload computer_tool.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. computer_tool.py +119 -0
computer_tool.py ADDED
@@ -0,0 +1,119 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Open-source Computer Use Tool - xdotool based implementation
3
+ """
4
+
5
+ import asyncio
6
+ import base64
7
+ import os
8
+ import shlex
9
+ import shutil
10
+ from dataclasses import dataclass
11
+ from enum import Enum
12
+ from typing import Optional, Literal
13
+ from pathlib import Path
14
+
15
+
16
class Action(str, Enum):
    """Names of the computer-use actions, as plain string values.

    Mixing in ``str`` makes each member compare equal to its string value
    (``Action.KEY == "key"``), so members can be built from, and compared
    against, raw action names.
    """

    # Screen capture.
    SCREENSHOT = "screenshot"

    # Keyboard input: a key / key combination, or literal text.
    KEY = "key"
    TYPE = "type"

    # Pointer movement and clicks.
    MOUSE_MOVE = "mouse_move"
    LEFT_CLICK = "left_click"
    RIGHT_CLICK = "right_click"
    DOUBLE_CLICK = "double_click"

    # Scrolling and pausing.
    SCROLL = "scroll"
    WAIT = "wait"
26
+
27
+
28
@dataclass
class ToolResult:
    """Outcome of one tool call.

    All fields default to ``None``; the methods of ``ComputerTool`` fill in
    ``output`` after an input action, ``base64_image`` after a successful
    screenshot, and ``error`` when a call could not be carried out.
    """

    # Human-readable summary of the action that was performed.
    output: Optional[str] = None
    # Description of what went wrong; None when no failure was reported.
    error: Optional[str] = None
    # Base64-encoded bytes of the captured screenshot file.
    base64_image: Optional[str] = None


class ComputerTool:
    """Control an X11 display by shelling out to ``xdotool``.

    Screenshots are taken with ``scrot`` when it is on ``PATH`` and with
    ImageMagick's ``import`` otherwise. Every command is prefixed with
    ``DISPLAY=:<display_num>`` so it targets the configured X display.
    """

    # Lower-cased friendly key names -> names passed to ``xdotool key``.
    # X keysym names are case-sensitive, so the canonical spelling matters.
    _KEY_ALIASES = {
        "enter": "Return", "return": "Return", "tab": "Tab",
        "escape": "Escape", "esc": "Escape", "backspace": "BackSpace",
        "space": "space", "up": "Up", "down": "Down",
        "left": "Left", "right": "Right",
        "delete": "Delete", "del": "Delete", "insert": "Insert",
        "home": "Home", "end": "End",
        "pageup": "Page_Up", "page_up": "Page_Up", "pgup": "Page_Up",
        "pagedown": "Page_Down", "page_down": "Page_Down", "pgdn": "Page_Down",
        # Modifiers keep the lower-case spelling the tool has always emitted.
        "ctrl": "ctrl", "control": "ctrl", "alt": "alt",
        "shift": "shift", "super": "super", "meta": "meta",
    }

    def __init__(
        self,
        display_width: int = 1280,
        display_height: int = 800,
        display_num: int = 99,
    ):
        """Store the display configuration and timing defaults.

        Args:
            display_width: Width of the display in pixels. Stored only; no
                method in this class reads it.
            display_height: Height of the display in pixels. Stored only.
            display_num: X display number used to build the ``DISPLAY=:N``
                prefix for every command.
        """
        self.display_width = display_width
        self.display_height = display_height
        self.display_num = display_num
        self._display_prefix = f"DISPLAY=:{self.display_num} "
        # Seconds slept after each input action (presumably so the UI can
        # settle before a following screenshot).
        self._screenshot_delay = 0.5
        # Per-keystroke delay handed to ``xdotool type --delay``.
        self._typing_delay_ms = 12

    @staticmethod
    def _as_ints(*values) -> Optional[tuple]:
        """Return *values* converted to ints, or ``None`` if any is not numeric.

        Numeric arguments are interpolated into a shell command line, so they
        must be real integers and not arbitrary text.
        """
        try:
            return tuple(int(value) for value in values)
        except (TypeError, ValueError, OverflowError):
            return None

    @classmethod
    def _normalize_key(cls, token: str) -> str:
        """Translate one ``+``-separated key token into an xdotool key name."""
        name = token.strip()
        lowered = name.lower()
        alias = cls._KEY_ALIASES.get(lowered)
        if alias is not None:
            return alias
        # Function keys are spelled with a capital F ("F5", not "f5").
        if len(lowered) > 1 and lowered[0] == "f" and lowered[1:].isdigit():
            return f"F{lowered[1:]}"
        # Single characters keep the historical lower-casing, so "Ctrl+C"
        # still means ctrl+c rather than ctrl+shift+c.
        if len(name) <= 1:
            return lowered
        # Any other multi-character name is taken to be an explicit keysym
        # (e.g. "KP_Enter", "XF86AudioPlay") whose case must be preserved.
        return name

    async def _run_shell(self, command: str) -> tuple[str, str]:
        """Run *command* through the shell and return ``(stdout, stderr)``.

        The exit status is not inspected. Undecodable output bytes are
        replaced instead of raising ``UnicodeDecodeError``.
        """
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return stdout.decode(errors="replace"), stderr.decode(errors="replace")

    async def screenshot(self) -> ToolResult:
        """Capture the root window and return it base64-encoded.

        Returns:
            ``ToolResult(base64_image=...)`` on success, otherwise a result
            whose ``error`` starts with ``"Screenshot failed"`` (followed by
            the command's stderr when there is any).
        """
        # Function-scope import keeps this block self-contained.
        import tempfile

        # A private temporary directory gives each call its own file, cannot
        # contain a stale image from an earlier run, and is removed on exit
        # even if reading the file fails.
        with tempfile.TemporaryDirectory(prefix="screenshot_") as tmp_dir:
            screenshot_path = Path(tmp_dir) / "screenshot.png"
            target = shlex.quote(str(screenshot_path))

            if shutil.which("scrot"):
                cmd = f"{self._display_prefix}scrot -o {target}"
            else:
                cmd = f"{self._display_prefix}import -window root {target}"

            _, stderr = await self._run_shell(cmd)

            try:
                image_bytes = screenshot_path.read_bytes()
            except OSError:
                image_bytes = b""

        if not image_bytes:
            detail = stderr.strip()
            message = f"Screenshot failed: {detail}" if detail else "Screenshot failed"
            return ToolResult(error=message)

        return ToolResult(base64_image=base64.standard_b64encode(image_bytes).decode())

    async def click(self, x: int, y: int, button: str = "left", clicks: int = 1) -> ToolResult:
        """Move the pointer to ``(x, y)`` and click there.

        Args:
            x: Horizontal pointer coordinate.
            y: Vertical pointer coordinate.
            button: ``"left"``, ``"middle"`` or ``"right"``; any other value
                falls back to the left button.
            clicks: Number of clicks (2 for a double click).

        Returns:
            A result describing the click, or an ``error`` result when a
            numeric argument is not an integer.
        """
        numbers = self._as_ints(x, y, clicks)
        if numbers is None:
            return ToolResult(
                error=f"Invalid click arguments: x={x!r}, y={y!r}, clicks={clicks!r}"
            )
        x, y, clicks = numbers

        button_map = {"left": 1, "middle": 2, "right": 3}
        btn = button_map.get(button, 1)

        await self._run_shell(f"{self._display_prefix}xdotool mousemove --sync {x} {y}")
        await self._run_shell(f"{self._display_prefix}xdotool click --repeat {clicks} --delay 100 {btn}")

        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Clicked {button} at ({x}, {y})")

    async def type_text(self, text: str) -> ToolResult:
        """Type *text* with ``xdotool type``.

        The text is shell-quoted before being placed on the command line. The
        returned summary shows at most the first 50 characters and adds
        ``...`` only when the text was actually shortened.
        """
        cmd = f"{self._display_prefix}xdotool type --delay {self._typing_delay_ms} -- {shlex.quote(text)}"
        await self._run_shell(cmd)
        await asyncio.sleep(self._screenshot_delay)
        preview = text if len(text) <= 50 else f"{text[:50]}..."
        return ToolResult(output=f"Typed: {preview}")

    async def press_key(self, key: str) -> ToolResult:
        """Press a key or a ``+``-separated key combination.

        Friendly names (``enter``, ``esc``, ``pagedown``, ...) are matched
        case-insensitively and translated to xdotool key names; names that
        are not recognised keep their original case, because keysym names
        such as ``Page_Down`` are case-sensitive.
        """
        mapped = [self._normalize_key(token) for token in key.split("+")]
        key_combo = "+".join(mapped)

        await self._run_shell(f"{self._display_prefix}xdotool key -- {shlex.quote(key_combo)}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Pressed: {key}")

    async def scroll(self, direction: str = "down", amount: int = 3) -> ToolResult:
        """Scroll by sending *amount* clicks of a scroll-wheel button.

        Args:
            direction: ``"up"``, ``"down"``, ``"left"`` or ``"right"``; any
                other value falls back to ``"down"``.
            amount: Number of wheel clicks to send.

        Returns:
            A result describing the scroll, or an ``error`` result when
            *amount* is not an integer.
        """
        numbers = self._as_ints(amount)
        if numbers is None:
            return ToolResult(error=f"Invalid scroll amount: {amount!r}")
        (amount,) = numbers

        # Pointer buttons 4-7 are the ones sent for wheel up/down/left/right.
        button_map = {"up": 4, "down": 5, "left": 6, "right": 7}
        button = button_map.get(direction, 5)

        await self._run_shell(f"{self._display_prefix}xdotool click --repeat {amount} --delay 50 {button}")
        await asyncio.sleep(self._screenshot_delay)
        return ToolResult(output=f"Scrolled {direction}")

    async def move_mouse(self, x: int, y: int) -> ToolResult:
        """Move the pointer to ``(x, y)`` without clicking.

        Returns an ``error`` result when a coordinate is not an integer.
        """
        numbers = self._as_ints(x, y)
        if numbers is None:
            return ToolResult(error=f"Invalid coordinates: x={x!r}, y={y!r}")
        x, y = numbers

        await self._run_shell(f"{self._display_prefix}xdotool mousemove --sync {x} {y}")
        return ToolResult(output=f"Moved to ({x}, {y})")