| | |
| | """ |
| | VORTEX@SANDBOX Agent Control API |
| | Ultra-fast REST + WebSocket interface for AI agents |
| | """ |
| |
|
| | import asyncio |
| | import base64 |
| | import os |
| | import subprocess |
| | import json |
| | from typing import Optional |
| | from io import BytesIO |
| |
|
| | from fastapi import FastAPI, WebSocket, HTTPException |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pydantic import BaseModel |
| | import uvicorn |
| |
|
| | |
# Optional GUI / screen-capture backends. Each import is best-effort: when a
# backend cannot be loaded its name is bound to None, and the endpoints that
# test these names fall back to command-line tools (xdotool / scrot).
try:
    import pyautogui
    pyautogui.FAILSAFE = False
    pyautogui.PAUSE = 0.01
except (Exception, SystemExit):
    # Deliberately broader than ImportError: the import can also fail for
    # environment reasons (e.g. no usable display).
    # NOTE(review): SystemExit is kept because some dependency chains are
    # believed to call sys.exit() at import time when GUI prerequisites are
    # missing - confirm. KeyboardInterrupt is no longer swallowed.
    pyautogui = None

try:
    import mss
except Exception:
    mss = None

try:
    from PIL import Image
except Exception:
    Image = None
| |
|
# Application object; the keyword arguments are the API metadata handed to
# FastAPI.
app = FastAPI(
    version="1.0.0",
    title="vortex@sandbox API",
    description="Agent control interface for containerized Linux desktop",
)

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
| |
|
| | |
| | |
| | |
| |
|
class MouseMove(BaseModel):
    """Request body for ``POST /mouse/move``: the pointer's target position."""

    # Horizontal target coordinate.
    x: int
    # Vertical target coordinate.
    y: int
| |
|
class MouseClick(BaseModel):
    """Request body for ``POST /mouse/click``."""

    # Position at which to click.
    x: int
    y: int
    # Button name; the xdotool fallback knows "left", "middle" and "right".
    button: str = "left"
    # Number of clicks to perform.
    clicks: int = 1
| |
|
class KeyPress(BaseModel):
    """Request body for ``POST /key``: one key plus optional modifiers."""

    # Name of the key to press.
    key: str
    # Modifier key names sent together with ``key``; empty for a plain press.
    modifiers: list[str] = []
| |
|
class TypeText(BaseModel):
    """Request body for ``POST /type``."""

    # Text to type.
    text: str
    # Delay between keystrokes, in seconds (the xdotool fallback multiplies
    # it by 1000 for its --delay option).
    interval: float = 0.01
| |
|
class Screenshot(BaseModel):
    """Request body for ``POST /screenshot``."""

    # Optional capture area, read as [left, top, width, height]; None means
    # the default monitor.
    region: Optional[list[int]] = None
    # Image format name; it is upper-cased and handed to the image encoder.
    format: str = "png"
| |
|
class RunCommand(BaseModel):
    """Request body for ``POST /exec``."""

    # Command line to execute.
    command: str
    # Value passed as the ``timeout`` argument of ``subprocess.run``.
    timeout: int = 30
| |
|
class Navigate(BaseModel):
    """Request body for ``POST /navigate``."""

    # Address typed into the browser's location bar.
    url: str
| |
|
| | |
| | |
| | |
| |
|
@app.get("/")
async def root():
    """Return the service name, version, status and an endpoint summary."""
    endpoint_groups = dict(
        mouse="/mouse/move, /mouse/click",
        keyboard="/key, /type",
        screen="/screenshot",
        system="/exec, /health",
        browser="/navigate, /cdp",
    )
    return dict(
        name="vortex@sandbox",
        version="1.0.0",
        status="running",
        endpoints=endpoint_groups,
    )
| |
|
@app.get("/health")
async def health():
    """Report a healthy status plus the DISPLAY value (":99" when unset)."""
    display = os.environ.get("DISPLAY", ":99")
    return {"status": "healthy", "display": display}
| |
|
| | |
| | |
| | |
| |
|
@app.post("/mouse/move")
async def mouse_move(req: MouseMove):
    """Move the pointer to (req.x, req.y).

    Uses pyautogui when it was imported successfully, otherwise runs
    ``xdotool mousemove``.

    Returns:
        ``{"moved": [x, y]}`` on success.

    Raises:
        HTTPException: 500 when the backend raises or xdotool exits
            non-zero, so a move that did not happen is not reported as done.
    """
    try:
        if pyautogui:
            pyautogui.moveTo(req.x, req.y, duration=0)
        else:
            # "--" ends option parsing so a negative coordinate is not read
            # as a command-line flag; check=True surfaces xdotool failures.
            subprocess.run(
                ["xdotool", "mousemove", "--", str(req.x), str(req.y)],
                check=True,
            )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"moved": [req.x, req.y]}
| |
|
@app.post("/mouse/click")
async def mouse_click(req: MouseClick):
    """Click ``req.clicks`` times at (req.x, req.y) with ``req.button``.

    Uses pyautogui when available. Otherwise moves the pointer with xdotool
    and issues one ``xdotool click`` per requested click; in that fallback an
    unrecognised button name is treated as button 1.

    Returns:
        ``{"clicked": [x, y], "button": <button>}`` on success.

    Raises:
        HTTPException: 500 when the backend raises or xdotool exits
            non-zero, so a failed click is not reported as done.
    """
    try:
        if pyautogui:
            pyautogui.click(req.x, req.y, clicks=req.clicks, button=req.button)
        else:
            btn = {"left": "1", "middle": "2", "right": "3"}.get(req.button, "1")
            # "--" ends option parsing so a negative coordinate is not read
            # as a command-line flag.
            subprocess.run(
                ["xdotool", "mousemove", "--", str(req.x), str(req.y)],
                check=True,
            )
            for _ in range(req.clicks):
                subprocess.run(["xdotool", "click", btn], check=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"clicked": [req.x, req.y], "button": req.button}
| |
|
@app.post("/mouse/scroll")
async def mouse_scroll(direction: str = "down", amount: int = 3):
    """Scroll the wheel ``amount`` steps; any direction other than "down" scrolls up."""
    scrolling_down = direction == "down"
    if not pyautogui:
        # xdotool fallback: one click of button 5 (down) or 4 (up) per step.
        wheel_button = "5" if scrolling_down else "4"
        for _ in range(amount):
            subprocess.run(["xdotool", "click", wheel_button])
    else:
        # pyautogui receives a negated amount for "down".
        signed_amount = -amount if scrolling_down else amount
        pyautogui.scroll(signed_amount)
    return {"scrolled": direction, "amount": amount}
| |
|
| | |
| | |
| | |
| |
|
@app.post("/key")
async def key_press(req: KeyPress):
    """Press ``req.key``, combined with ``req.modifiers`` when any are given."""
    if not pyautogui:
        # xdotool takes the whole combination as one "+"-joined argument;
        # with no modifiers the join yields just the key itself.
        combo = "+".join([*req.modifiers, req.key])
        subprocess.run(["xdotool", "key", combo])
    elif req.modifiers:
        pyautogui.hotkey(*req.modifiers, req.key)
    else:
        pyautogui.press(req.key)
    return {"pressed": req.key, "modifiers": req.modifiers}
| |
|
@app.post("/type")
async def type_text(req: TypeText):
    """Type ``req.text`` with ``req.interval`` seconds between keystrokes.

    Uses pyautogui when available, otherwise ``xdotool type`` with the
    interval converted to whole milliseconds for ``--delay``.

    Returns:
        ``{"typed": <text>}``.
    """
    if pyautogui:
        pyautogui.write(req.text, interval=req.interval)
    else:
        delay_ms = str(int(req.interval * 1000))
        # "--" ends option parsing; without it, text that begins with "-"
        # would be interpreted as an xdotool option instead of being typed.
        subprocess.run(["xdotool", "type", "--delay", delay_ms, "--", req.text])
    return {"typed": req.text}
| |
|
@app.post("/hotkey")
async def hotkey(keys: list[str]):
    """Send the given keys together as a single key combination."""
    if not pyautogui:
        # xdotool expects the combination as one "+"-joined argument.
        combo = "+".join(keys)
        subprocess.run(["xdotool", "key", combo])
    else:
        pyautogui.hotkey(*keys)
    return {"hotkey": keys}
| |
|
| | |
| | |
| | |
| |
|
@app.post("/screenshot")
async def screenshot(req: Screenshot = Screenshot()):
    """Capture the screen and return it base64-encoded.

    When both mss and Pillow are available the capture honours ``req.region``
    ([left, top, width, height]) and ``req.format``. Otherwise ``scrot`` is
    used, which always captures the full screen as PNG.

    Returns:
        ``{"success": True, "format": ..., "data": <base64>}``; the
        mss/Pillow path also includes ``"size": [width, height]``.

    Raises:
        HTTPException: 400 when ``region`` has fewer than four values;
            500 when capturing or encoding fails.
    """
    # Validate before the try block: the blanket handler below would
    # otherwise convert this client error into a 500.
    if req.region and len(req.region) < 4:
        raise HTTPException(
            status_code=400,
            detail="region must be [left, top, width, height]",
        )

    try:
        if mss and Image:
            with mss.mss() as sct:
                if req.region:
                    left, top, width, height = req.region[:4]
                    monitor = {"left": left, "top": top,
                               "width": width, "height": height}
                else:
                    monitor = sct.monitors[1]
                img = sct.grab(monitor)

            pil_img = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
            # Pillow registers the format as "JPEG"; accept the common "jpg"
            # spelling instead of failing on an unknown format name.
            pil_format = req.format.upper()
            if pil_format == "JPG":
                pil_format = "JPEG"
            buffer = BytesIO()
            pil_img.save(buffer, format=pil_format)
            data = base64.b64encode(buffer.getvalue()).decode()
            return {"success": True, "format": req.format, "data": data,
                    "size": [img.width, img.height]}

        # Fallback: check=True so a failed scrot run raises instead of
        # letting a stale /tmp/screen.png from an earlier call be returned.
        subprocess.run(["scrot", "-o", "/tmp/screen.png"],
                       capture_output=True, check=True)
        with open("/tmp/screen.png", "rb") as f:
            data = base64.b64encode(f.read()).decode()
        return {"success": True, "format": "png", "data": data}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
@app.get("/screenshot.png")
async def screenshot_direct():
    """Capture the screen with scrot and return the raw PNG bytes.

    Returns:
        A response with media type ``image/png``.

    Raises:
        HTTPException: 500 when scrot fails or the file cannot be read.
    """
    try:
        # check=True so a failed capture raises instead of serving a stale
        # /tmp/screen.png left behind by an earlier call.
        subprocess.run(["scrot", "-o", "/tmp/screen.png"],
                       capture_output=True, check=True)
        with open("/tmp/screen.png", "rb") as f:
            data = f.read()
        from fastapi.responses import Response
        return Response(content=data, media_type="image/png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
| | |
| | |
| | |
| |
|
@app.post("/exec")
async def exec_command(req: RunCommand):
    """Run ``req.command`` through the shell and return its captured output.

    Returns a dict with ``success`` (exit code was 0), ``stdout``, ``stderr``
    and ``code``. Responds 408 when the command exceeds ``req.timeout`` and
    500 on any other failure.
    """
    # NOTE: the command string comes straight from the request and is run
    # with shell=True, so callers can execute arbitrary shell code.
    try:
        completed = subprocess.run(
            req.command,
            timeout=req.timeout,
            text=True,
            capture_output=True,
            shell=True,
        )
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=408, detail="Command timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    exit_code = completed.returncode
    return {
        "success": exit_code == 0,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
        "code": exit_code,
    }
| |
|
| | |
| | |
| | |
| |
|
@app.post("/navigate")
async def navigate(req: Navigate):
    """Navigate the browser via xdotool (focus + Ctrl+L + URL + Enter).

    The steps are best-effort: xdotool exit codes are not inspected, only
    exceptions (including per-step timeouts) are reported.

    Returns:
        ``{"navigated": <url>}``.

    Raises:
        HTTPException: 500 when an xdotool call raises or times out.
    """
    try:
        # Bring a window whose title matches "Chromium" to the foreground.
        subprocess.run(["xdotool", "search", "--name", "Chromium", "windowactivate"], timeout=2)
        await asyncio.sleep(0.1)

        # Send Ctrl+L (per the summary above, this targets the address bar).
        subprocess.run(["xdotool", "key", "ctrl+l"], timeout=1)
        await asyncio.sleep(0.1)

        # Type the URL. "--" ends option parsing so a URL that begins with
        # "-" is typed rather than parsed as an xdotool option.
        subprocess.run(["xdotool", "type", "--delay", "10", "--", req.url], timeout=10)
        await asyncio.sleep(0.1)

        # Confirm with Enter.
        subprocess.run(["xdotool", "key", "Return"], timeout=1)

        return {"navigated": req.url}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
@app.get("/cdp")
async def get_cdp_info():
    """Get Chrome DevTools Protocol endpoint info.

    Returns the JSON served at ``http://localhost:9222/json/version``, or
    ``{"error": "CDP not available", "port": 9222}`` when it cannot be
    fetched (including when httpx is not installed).
    """
    try:
        import httpx
        async with httpx.AsyncClient() as client:
            resp = await client.get("http://localhost:9222/json/version", timeout=5)
            return resp.json()
    except Exception:
        # Not a bare except: asyncio.CancelledError and KeyboardInterrupt
        # must propagate instead of being reported as "CDP not available".
        return {"error": "CDP not available", "port": 9222}
| |
|
| | |
| | |
| | |
| |
|
def _run_ws_command(cmd):
    """Execute one decoded WebSocket command and return its reply dict.

    Raises KeyError when a field required by the action is missing; errors
    from launching or running the external tools propagate to the caller.
    """
    if not isinstance(cmd, dict):
        return {"error": "command must be a JSON object"}

    action = cmd.get("action")
    if action == "move":
        # "--" keeps negative coordinates from being parsed as options.
        subprocess.run(["xdotool", "mousemove", "--", str(cmd["x"]), str(cmd["y"])])
        return {"moved": [cmd["x"], cmd["y"]]}
    if action == "click":
        subprocess.run(["xdotool", "mousemove", "--", str(cmd["x"]), str(cmd["y"])])
        subprocess.run(["xdotool", "click", "1"])
        return {"clicked": [cmd["x"], cmd["y"]]}
    if action == "type":
        # "--" keeps text that begins with "-" from being parsed as an option.
        subprocess.run(["xdotool", "type", "--", cmd["text"]])
        return {"typed": cmd["text"]}
    if action == "key":
        subprocess.run(["xdotool", "key", cmd["key"]])
        return {"pressed": cmd["key"]}
    if action == "screenshot":
        # check=True so a failed capture is reported instead of returning a
        # stale /tmp/ws_screen.png from an earlier command.
        subprocess.run(["scrot", "-o", "/tmp/ws_screen.png"], check=True)
        with open("/tmp/ws_screen.png", "rb") as f:
            return {"screenshot": base64.b64encode(f.read()).decode()}
    return {"error": "unknown action"}


@app.websocket("/ws")
async def websocket_control(ws: WebSocket):
    """WebSocket for streaming commands.

    Each text message is a JSON object with an ``action`` of ``move``,
    ``click``, ``type``, ``key`` or ``screenshot`` plus that action's fields.
    One JSON reply is sent per message. A malformed or failing command gets
    an ``{"error": ...}`` reply and the connection stays open.
    """
    from fastapi import WebSocketDisconnect

    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            try:
                result = _run_ws_command(json.loads(data))
            except (KeyError, TypeError, ValueError, OSError,
                    subprocess.SubprocessError) as exc:
                # Bad JSON, a missing/ill-typed field or a tool failure
                # affects only this command, not the whole session.
                result = {"error": f"{type(exc).__name__}: {exc}"}
            await ws.send_text(json.dumps(result))
    except WebSocketDisconnect:
        # The peer is already gone; closing again would raise.
        return
    except Exception:
        # Unexpected failure: close the socket, tolerating one that is
        # already closed.
        try:
            await ws.close()
        except RuntimeError:
            pass
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on 0.0.0.0:8080.
    uvicorn.run(
        app,
        port=8080,
        host="0.0.0.0",
        log_level="warning",
    )
| |
|