|
|
import io
import os
import subprocess
import tempfile
import time
from typing import List, Literal, Tuple

from loguru import logger
from PIL import Image

from .computer import Computer, EnvState
|
|
|
|
|
class X11Computer(Computer):
    """X11 desktop implementation of the Computer interface.

    Every action is carried out by shelling out to command-line tools
    (``xdotool`` for input and window queries, ``scrot`` for screenshots,
    ``firefox`` as the browser) with ``DISPLAY`` set to the configured X
    display. Each public action returns the state captured after it ran.
    """

    # Pointer-button numbers used for scroll-wheel events. 4/5 are the
    # vertical wheel; 6/7 are the conventional X11 horizontal-wheel buttons.
    _SCROLL_BUTTONS = {"up": "4", "down": "5", "left": "6", "right": "7"}

    def __init__(self, display: str = ":1"):
        """Bind to an X display and cache its geometry.

        Args:
            display: Value exported as ``DISPLAY`` for every spawned command.
        """
        self.display = display
        # Queried once; screen_size() returns this cached value afterwards.
        self._screen_size = self._get_screen_size()

    def _env(self) -> dict:
        """Return a copy of the process environment with DISPLAY overridden."""
        return {**os.environ, "DISPLAY": self.display}

    def _run_cmd(
        self, cmd: List[str], check: bool = True
    ) -> subprocess.CompletedProcess:
        """Run a command with the correct DISPLAY environment variable.

        Args:
            cmd: Argument vector to execute (no shell is involved).
            check: When true, a non-zero exit status raises
                ``subprocess.CalledProcessError``.

        Returns:
            The completed process, with stdout/stderr captured as text.
        """
        return subprocess.run(
            cmd, env=self._env(), check=check, capture_output=True, text=True
        )

    def _launch_firefox(self, *args: str) -> None:
        """Start Firefox detached from our stdio; does not wait for it."""
        subprocess.Popen(
            ["firefox", *args],
            env=self._env(),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _get_screen_size(self) -> Tuple[int, int]:
        """Ask xdotool for the display geometry.

        Returns:
            ``(width, height)`` in pixels, or ``(1920, 1080)`` if the query
            fails for any reason (the failure is logged, not raised).
        """
        try:
            res = self._run_cmd(["xdotool", "getdisplaygeometry"])
            w, h = map(int, res.stdout.strip().split())
            return w, h
        except Exception as e:
            logger.error(f"Failed to get screen size: {e}")
            return 1920, 1080

    def screen_size(self) -> Tuple[int, int]:
        """Return the ``(width, height)`` cached at construction time."""
        return self._screen_size

    def current_state(self) -> EnvState:
        """Capture screenshot and active window title.

        Returns:
            An ``EnvState`` whose ``screenshot`` holds the PNG bytes written
            by ``scrot`` and whose ``url`` holds the active window's title
            (``"Desktop"`` when xdotool exits with an error). If capturing
            fails, the error is logged and an ``EnvState`` with an empty
            screenshot and ``url="Error"`` is returned instead of raising.
        """
        try:
            # A private temporary directory per capture avoids collisions
            # between instances sharing one fixed, predictable /tmp path and
            # removes the file once its bytes have been read.
            with tempfile.TemporaryDirectory() as tmp_dir:
                screenshot_path = os.path.join(tmp_dir, "screenshot_state.png")
                self._run_cmd(["scrot", "-o", screenshot_path])
                with open(screenshot_path, "rb") as f:
                    screenshot_bytes = f.read()

            try:
                res = self._run_cmd(
                    ["xdotool", "getactivewindow", "getwindowname"]
                )
                window_title = res.stdout.strip()
            except subprocess.CalledProcessError:
                # xdotool exited non-zero; fall back to a generic title.
                window_title = "Desktop"

            return EnvState(screenshot=screenshot_bytes, url=window_title)
        except Exception as e:
            logger.error(f"Failed to capture state: {e}")
            return EnvState(screenshot=b"", url="Error")

    def open_web_browser(self) -> EnvState:
        """Launch Firefox, wait 3 seconds, and return the resulting state."""
        self._launch_firefox()
        time.sleep(3)
        return self.current_state()

    def click_at(self, x: int, y: int) -> EnvState:
        """Move the pointer to ``(x, y)`` and click button 1 there."""
        self._run_cmd(["xdotool", "mousemove", str(x), str(y), "click", "1"])
        time.sleep(0.5)
        return self.current_state()

    def hover_at(self, x: int, y: int) -> EnvState:
        """Move the pointer to ``(x, y)`` without clicking."""
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        time.sleep(0.5)
        return self.current_state()

    def type_text_at(
        self,
        x: int,
        y: int,
        text: str,
        press_enter: bool,
        clear_before_typing: bool,
    ) -> EnvState:
        """Click at ``(x, y)`` and type ``text`` there.

        Args:
            x: Pointer x coordinate to click before typing.
            y: Pointer y coordinate to click before typing.
            text: Text passed to ``xdotool type``.
            press_enter: Send ``Return`` after the text.
            clear_before_typing: Send ``ctrl+a`` then ``Delete`` before
                typing.

        Returns:
            The state captured after typing.
        """
        self.click_at(x, y)

        if clear_before_typing:
            self._run_cmd(["xdotool", "key", "ctrl+a", "Delete"])
            time.sleep(0.2)

        # "--" ends option parsing so text starting with "-" is typed as-is.
        self._run_cmd(["xdotool", "type", "--", text])

        if press_enter:
            self._run_cmd(["xdotool", "key", "Return"])

        time.sleep(0.5)
        return self.current_state()

    def scroll_document(
        self, direction: Literal["up", "down", "left", "right"]
    ) -> EnvState:
        """Send one scroll-wheel click in ``direction`` at the pointer.

        All four directions are handled; an unrecognised value sends nothing.
        """
        button = self._SCROLL_BUTTONS.get(direction)
        if button is not None:
            self._run_cmd(["xdotool", "click", button])
        return self.current_state()

    def scroll_at(
        self,
        x: int,
        y: int,
        direction: Literal["up", "down", "left", "right"],
        magnitude: int,
    ) -> EnvState:
        """Move the pointer to ``(x, y)`` and scroll there.

        Args:
            x: Pointer x coordinate to scroll at.
            y: Pointer y coordinate to scroll at.
            direction: Scroll direction; an unrecognised value scrolls down.
            magnitude: Scroll amount; one wheel click is sent per 100 units,
                with a minimum of one click.

        Returns:
            The state captured after scrolling.
        """
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])

        clicks = max(1, magnitude // 100)
        button = self._SCROLL_BUTTONS.get(direction, "5")
        for _ in range(clicks):
            self._run_cmd(["xdotool", "click", button])
            time.sleep(0.1)

        return self.current_state()

    def wait_5_seconds(self) -> EnvState:
        """Sleep for five seconds, then return the current state."""
        time.sleep(5)
        return self.current_state()

    def go_back(self) -> EnvState:
        """Send ``alt+Left`` to the focused window."""
        self._run_cmd(["xdotool", "key", "alt+Left"])
        return self.current_state()

    def go_forward(self) -> EnvState:
        """Send ``alt+Right`` to the focused window."""
        self._run_cmd(["xdotool", "key", "alt+Right"])
        return self.current_state()

    def search(self) -> EnvState:
        """Launch the browser, then send ``ctrl+l`` to the focused window."""
        self.open_web_browser()
        time.sleep(1)
        self._run_cmd(["xdotool", "key", "ctrl+l"])
        return self.current_state()

    def navigate(self, url: str) -> EnvState:
        """Launch Firefox with ``url`` as its argument and wait 3 seconds."""
        self._launch_firefox(url)
        time.sleep(3)
        return self.current_state()

    def key_combination(self, keys: List[str]) -> EnvState:
        """Press ``keys`` together as a single xdotool chord.

        The names are joined with ``+`` and passed to ``xdotool key``
        unchanged.
        NOTE(review): presumably callers already supply xdotool key names
        (e.g. ``ctrl``, ``Return``) -- confirm against the caller.
        """
        key_str = "+".join(keys)
        self._run_cmd(["xdotool", "key", key_str])
        return self.current_state()

    def drag_and_drop(
        self, x: int, y: int, destination_x: int, destination_y: int
    ) -> EnvState:
        """Drag with button 1 from ``(x, y)`` to the destination point.

        Returns:
            The state captured after the button is released.
        """
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        self._run_cmd(["xdotool", "mousedown", "1"])
        try:
            time.sleep(0.2)
            self._run_cmd(
                ["xdotool", "mousemove", str(destination_x), str(destination_y)]
            )
            time.sleep(0.2)
        finally:
            # Always release the button, even if the move failed, so the
            # session is not left with button 1 held down.
            self._run_cmd(["xdotool", "mouseup", "1"])
        return self.current_state()
|
|
|