# File size: 6,415 Bytes
# bcb86b5
import io
import os
import subprocess
import tempfile
import time
from typing import Literal, Tuple, List

from PIL import Image
from loguru import logger

from .computer import Computer, EnvState
class X11Computer(Computer):
    """X11 desktop implementation of the Computer interface.

    Input actions are performed by shelling out to ``xdotool`` and screenshots
    are taken with ``scrot``; both run with ``DISPLAY`` set to the display
    given at construction time. Every public action returns a fresh
    ``EnvState`` captured after the action.
    """

    # Pointer buttons used for wheel events. 4/5 are the vertical wheel;
    # 6/7 are the conventional X11 horizontal-wheel buttons (application
    # support for horizontal scrolling varies).
    _SCROLL_BUTTONS = {"up": "4", "down": "5", "left": "6", "right": "7"}

    # Lower-cased friendly key names -> names xdotool accepts. Keys that are
    # not listed here are passed through untouched (case preserved), so
    # plain keysyms such as "c", "F5" or "Return" keep working.
    _KEY_ALIASES = {
        "control": "ctrl",
        "ctrl": "ctrl",
        "alt": "alt",
        "shift": "shift",
        "super": "super",
        "cmd": "super",
        "command": "super",
        "win": "super",
        "windows": "super",
        "enter": "Return",
        "return": "Return",
        "esc": "Escape",
        "escape": "Escape",
        "tab": "Tab",
        "backspace": "BackSpace",
        "delete": "Delete",
        "del": "Delete",
        "insert": "Insert",
        "home": "Home",
        "end": "End",
        "pageup": "Prior",
        "pagedown": "Next",
        "space": "space",
        " ": "space",
        "up": "Up",
        "down": "Down",
        "left": "Left",
        "right": "Right",
        "arrowup": "Up",
        "arrowdown": "Down",
        "arrowleft": "Left",
        "arrowright": "Right",
    }

    def __init__(self, display: str = ":1"):
        """Bind to X display ``display`` and cache its geometry."""
        self.display = display
        self._screen_size = self._get_screen_size()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _env(self) -> dict:
        """Return the current environment with DISPLAY pointing at our display."""
        return {**os.environ, "DISPLAY": self.display}

    def _run_cmd(self, cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
        """Run a command with the correct DISPLAY environment variable.

        Output is captured as text. With ``check=True`` (the default) a
        non-zero exit status raises ``subprocess.CalledProcessError``.
        """
        return subprocess.run(
            cmd, env=self._env(), check=check, capture_output=True, text=True
        )

    def _get_screen_size(self) -> Tuple[int, int]:
        """Query the display geometry, falling back to 1920x1080 on failure."""
        try:
            # xdotool getdisplaygeometry returns "width height"
            res = self._run_cmd(["xdotool", "getdisplaygeometry"])
            w, h = map(int, res.stdout.strip().split())
            return w, h
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            # OSError: xdotool missing; SubprocessError: non-zero exit;
            # ValueError: output was not two integers.
            logger.error(f"Failed to get screen size: {e}")
            return 1920, 1080

    def _click(self, x: int, y: int) -> None:
        """Move the pointer to (x, y), left-click, and wait briefly.

        Unlike ``click_at`` this does not capture a screenshot, so callers
        that perform further steps avoid a wasted capture.
        """
        self._run_cmd(["xdotool", "mousemove", str(x), str(y), "click", "1"])
        time.sleep(0.5)

    def _scroll(self, direction: str, clicks: int = 1) -> None:
        """Send ``clicks`` wheel events in ``direction`` at the pointer position."""
        button = self._SCROLL_BUTTONS.get(direction)
        if button is None:
            logger.warning(f"Unsupported scroll direction: {direction!r}")
            return
        for _ in range(clicks):
            self._run_cmd(["xdotool", "click", button])
            time.sleep(0.1)

    def _launch_firefox(self, *args: str) -> None:
        """Start Firefox (detached, output discarded) and wait for it to launch."""
        subprocess.Popen(
            ["firefox", *args],
            env=self._env(),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        time.sleep(3)  # Wait for launch

    @classmethod
    def _to_xdotool_key(cls, key: str) -> str:
        """Translate a friendly key name to what xdotool expects."""
        return cls._KEY_ALIASES.get(key.lower(), key)

    # ------------------------------------------------------------------
    # Computer interface
    # ------------------------------------------------------------------

    def screen_size(self) -> Tuple[int, int]:
        """Return the (width, height) cached at construction time."""
        return self._screen_size

    def current_state(self) -> EnvState:
        """Capture screenshot and active window title.

        The window title is reported in the ``url`` field ("Desktop" when
        xdotool reports no active window). On any failure the error is
        logged and an empty state with ``url="Error"`` is returned.
        """
        try:
            # Use a private temporary directory rather than a fixed /tmp
            # path so concurrent instances cannot clobber each other's
            # screenshot and the path is not predictable.
            with tempfile.TemporaryDirectory(prefix="x11computer_") as tmp_dir:
                screenshot_path = os.path.join(tmp_dir, "screenshot_state.png")
                self._run_cmd(["scrot", "-o", screenshot_path])
                with open(screenshot_path, "rb") as f:
                    screenshot_bytes = f.read()
            # Get active window title as "url"
            try:
                res = self._run_cmd(["xdotool", "getactivewindow", "getwindowname"])
                window_title = res.stdout.strip()
            except subprocess.CalledProcessError:
                window_title = "Desktop"
            return EnvState(screenshot=screenshot_bytes, url=window_title)
        except Exception as e:
            logger.error(f"Failed to capture state: {e}")
            # Return empty state on failure
            return EnvState(screenshot=b"", url="Error")

    def open_web_browser(self) -> EnvState:
        """Launch Firefox"""
        self._launch_firefox()
        return self.current_state()

    def click_at(self, x: int, y: int) -> EnvState:
        """Left-click at (x, y) and return the resulting state."""
        self._click(x, y)
        return self.current_state()

    def hover_at(self, x: int, y: int) -> EnvState:
        """Move the pointer to (x, y) without clicking."""
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        time.sleep(0.5)
        return self.current_state()

    def type_text_at(
        self,
        x: int,
        y: int,
        text: str,
        press_enter: bool,
        clear_before_typing: bool,
    ) -> EnvState:
        """Click at (x, y) to focus, optionally clear the field, then type ``text``.

        ``clear_before_typing`` sends Ctrl+A followed by Delete;
        ``press_enter`` sends Return after the text.
        """
        # Move to location and click to focus (no intermediate screenshot)
        self._click(x, y)
        if clear_before_typing:
            # Ctrl+A, Delete
            self._run_cmd(["xdotool", "key", "ctrl+a", "Delete"])
            time.sleep(0.2)
        # "--" stops option parsing so text starting with "-" is typed literally
        self._run_cmd(["xdotool", "type", "--", text])
        if press_enter:
            self._run_cmd(["xdotool", "key", "Return"])
        time.sleep(0.5)
        return self.current_state()

    def scroll_document(
        self, direction: Literal["up", "down", "left", "right"]
    ) -> EnvState:
        """Scroll one wheel step in ``direction`` at the current pointer position."""
        self._scroll(direction)
        return self.current_state()

    def scroll_at(
        self,
        x: int,
        y: int,
        direction: Literal["up", "down", "left", "right"],
        magnitude: int,
    ) -> EnvState:
        """Move to (x, y) and scroll in ``direction`` by roughly ``magnitude``."""
        # Move mouse first
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        # Approximate magnitude to clicks (e.g., 1 click ~ 100px); always
        # scroll at least once.
        clicks = max(1, magnitude // 100)
        self._scroll(direction, clicks)
        return self.current_state()

    def wait_5_seconds(self) -> EnvState:
        """Sleep for five seconds, then return the current state."""
        time.sleep(5)
        return self.current_state()

    def go_back(self) -> EnvState:
        """Send Alt+Left to the focused window."""
        # Alt+Left is standard back shortcut
        self._run_cmd(["xdotool", "key", "alt+Left"])
        return self.current_state()

    def go_forward(self) -> EnvState:
        """Send Alt+Right to the focused window."""
        # Alt+Right is standard forward shortcut
        self._run_cmd(["xdotool", "key", "alt+Right"])
        return self.current_state()

    def search(self) -> EnvState:
        """Launch Firefox and send Ctrl+L to focus the address bar."""
        self._launch_firefox()
        time.sleep(1)
        self._run_cmd(["xdotool", "key", "ctrl+l"])
        return self.current_state()

    def navigate(self, url: str) -> EnvState:
        """Launch Firefox with ``url`` as its argument."""
        self._launch_firefox(url)
        return self.current_state()

    def key_combination(self, keys: List[str]) -> EnvState:
        """Press ``keys`` together as a single chord.

        Friendly names (e.g. "control", "Enter", "Escape") are translated
        through ``_KEY_ALIASES``; anything else is passed to xdotool as-is.
        An empty ``keys`` list sends nothing.
        """
        # Convert list ["control", "c"] to "ctrl+c"
        chord = "+".join(self._to_xdotool_key(k) for k in keys if k)
        if chord:
            self._run_cmd(["xdotool", "key", chord])
        else:
            logger.warning("key_combination called with no keys; nothing sent")
        return self.current_state()

    def drag_and_drop(
        self, x: int, y: int, destination_x: int, destination_y: int
    ) -> EnvState:
        """Drag with the left button from (x, y) to the destination point."""
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        self._run_cmd(["xdotool", "mousedown", "1"])
        try:
            time.sleep(0.2)
            self._run_cmd(
                ["xdotool", "mousemove", str(destination_x), str(destination_y)]
            )
            time.sleep(0.2)
        finally:
            # Always release the button, otherwise a failed move would
            # leave the pointer stuck in a pressed state.
            self._run_cmd(["xdotool", "mouseup", "1"])
        return self.current_state()
|