vnc-browser-agent / vortex_api.py
megharudushi's picture
Upload vortex_api.py with huggingface_hub
f31bb42 verified
#!/usr/bin/env python3
"""
VORTEX@SANDBOX Agent Control API
Ultra-fast REST + WebSocket interface for AI agents
"""
import asyncio
import base64
import os
import subprocess
import json
from typing import Optional
from io import BytesIO
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
# Optional imports with fallbacks.
# Each dependency is bound to None when it cannot be loaded so the endpoints
# can test the name and fall back to the xdotool/scrot command-line tools.
# `except Exception` (rather than a bare `except:`) keeps KeyboardInterrupt
# and SystemExit propagating; it is deliberately broader than ImportError
# because a GUI library may fail at import time for other reasons
# (presumably e.g. when no X display is reachable -- TODO confirm).
try:
    import pyautogui

    pyautogui.FAILSAFE = False
    pyautogui.PAUSE = 0.01  # Minimal delay for speed
except Exception:
    pyautogui = None

try:
    import mss
except Exception:
    mss = None

try:
    from PIL import Image
except Exception:
    Image = None
# The ASGI application object; every route below is registered on it.
app = FastAPI(
    title="vortex@sandbox API",
    description="Agent control interface for containerized Linux desktop",
    version="1.0.0"
)
# CORS is left fully open: any origin, method and header is accepted.
# NOTE(review): combined with /exec this presumably relies on the container
# being reachable only by trusted clients -- confirm before exposing it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============================================
# Request Models
# ============================================
class MouseMove(BaseModel):
    """Request body for POST /mouse/move: the pointer's target position."""

    # Target coordinates, passed unchanged to pyautogui.moveTo / xdotool mousemove.
    x: int
    y: int
class MouseClick(BaseModel):
    """Request body for POST /mouse/click: where, which button, how many times."""

    # Position the pointer is moved to before clicking.
    x: int
    y: int
    # Button name; the xdotool fallback knows "left", "middle" and "right"
    # and treats anything else as "left".
    button: str = "left"
    # Number of consecutive clicks to issue.
    clicks: int = 1
class KeyPress(BaseModel):
    """Request body for POST /key: one key plus optional modifier keys."""

    # Name of the key to press.
    key: str
    # Modifier key names pressed together with `key`; empty means a plain press.
    modifiers: list[str] = []
class TypeText(BaseModel):
    """Request body for POST /type: text to type and the per-key delay."""

    # The characters to type.
    text: str
    # Delay between keystrokes in seconds (converted to milliseconds for xdotool).
    interval: float = 0.01
class Screenshot(BaseModel):
    """Request body for POST /screenshot: optional crop region and image format."""

    # Capture rectangle as [x, y, w, h]; None captures the whole monitor.
    region: Optional[list[int]] = None  # [x, y, w, h]
    # Image format name; upper-cased and handed to Pillow when encoding.
    format: str = "png"
class RunCommand(BaseModel):
    """Request body for POST /exec: a shell command line and its time limit."""

    # Command line, executed through the shell by the /exec endpoint.
    command: str
    # Seconds to wait before the command is reported as timed out.
    timeout: int = 30
class Navigate(BaseModel):
    """Request body for POST /navigate: the address to open in the browser."""

    # Typed verbatim into the browser's address bar.
    url: str
# ============================================
# Core Endpoints
# ============================================
@app.get("/")
async def root():
    """Describe the service and index the routes this module registers.

    Returns:
        A dict with the service name, version, a fixed "running" status and
        an "endpoints" mapping of category name to a comma-separated list of
        route paths.
    """
    return {
        "name": "vortex@sandbox",
        "version": "1.0.0",
        "status": "running",
        # Keep this index in sync with the route decorators in this file;
        # it previously omitted /mouse/scroll, /hotkey, /screenshot.png and /ws.
        "endpoints": {
            "mouse": "/mouse/move, /mouse/click, /mouse/scroll",
            "keyboard": "/key, /type, /hotkey",
            "screen": "/screenshot, /screenshot.png",
            "system": "/exec, /health",
            "browser": "/navigate, /cdp",
            "realtime": "/ws",
        },
    }
@app.get("/health")
async def health():
    """Report liveness together with the X display this process targets.

    The display is read from the DISPLAY environment variable and reported
    as ":99" when that variable is unset.
    """
    display = os.environ.get("DISPLAY", ":99")
    return {"status": "healthy", "display": display}
# ============================================
# Mouse Control
# ============================================
@app.post("/mouse/move")
async def mouse_move(req: MouseMove):
    """Move the pointer to (req.x, req.y).

    Uses pyautogui when it was importable, otherwise shells out to
    `xdotool mousemove`. Returns the requested coordinates under "moved".
    """
    if not pyautogui:
        # pyautogui unavailable: drive the pointer through xdotool instead.
        subprocess.run(["xdotool", "mousemove", str(req.x), str(req.y)])
    else:
        # duration=0 asks for an immediate jump rather than an animated move.
        pyautogui.moveTo(req.x, req.y, duration=0)
    return {"moved": [req.x, req.y]}
@app.post("/mouse/click")
async def mouse_click(req: MouseClick):
    """Click at (req.x, req.y) with the requested button, req.clicks times.

    Uses pyautogui when it was importable; otherwise moves the pointer with
    xdotool and issues one `xdotool click` per requested click. Returns the
    coordinates under "clicked" and echoes the button name.
    """
    reply = {"clicked": [req.x, req.y], "button": req.button}
    if pyautogui:
        pyautogui.click(req.x, req.y, clicks=req.clicks, button=req.button)
        return reply

    # xdotool addresses buttons by number; unknown names fall back to left (1).
    xdotool_buttons = {"left": "1", "middle": "2", "right": "3"}
    button_id = xdotool_buttons.get(req.button, "1")
    subprocess.run(["xdotool", "mousemove", str(req.x), str(req.y)])
    click_argv = ["xdotool", "click", button_id]
    for _ in range(req.clicks):
        subprocess.run(click_argv)
    return reply
@app.post("/mouse/scroll")
async def mouse_scroll(direction: str = "down", amount: int = 3):
    """Scroll the wheel `amount` steps; any direction other than "down" scrolls up.

    Returns the direction and amount exactly as they were received.
    """
    scrolling_down = direction == "down"
    if pyautogui:
        # pyautogui takes a signed step count; negative is used for "down".
        steps = -amount if scrolling_down else amount
        pyautogui.scroll(steps)
    else:
        # The xdotool fallback clicks button 5 for "down" and button 4
        # otherwise, once per step.
        wheel_argv = ["xdotool", "click", "5" if scrolling_down else "4"]
        for _ in range(amount):
            subprocess.run(wheel_argv)
    return {"scrolled": direction, "amount": amount}
# ============================================
# Keyboard Control
# ============================================
@app.post("/key")
async def key_press(req: KeyPress):
    """Press req.key, together with req.modifiers when any are given.

    Returns the key under "pressed" and echoes the modifier list.
    """
    if not pyautogui:
        # xdotool spells a chord as "mod+mod+key"; joining a single-element
        # list yields just the key, so no special case is needed.
        chord = "+".join([*req.modifiers, req.key])
        subprocess.run(["xdotool", "key", chord])
    elif req.modifiers:
        pyautogui.hotkey(*req.modifiers, req.key)
    else:
        pyautogui.press(req.key)
    return {"pressed": req.key, "modifiers": req.modifiers}
@app.post("/type")
async def type_text(req: TypeText):
    """Type req.text with req.interval seconds between keystrokes.

    Uses pyautogui when it was importable, otherwise `xdotool type` with the
    interval converted to whole milliseconds. Returns the text under "typed".
    """
    if pyautogui:
        pyautogui.write(req.text, interval=req.interval)
    else:
        delay_ms = str(int(req.interval * 1000))
        # "--" ends option parsing so that text beginning with "-" (for
        # example "--help" or "-1") is typed instead of being read as an
        # xdotool option.
        subprocess.run(["xdotool", "type", "--delay", delay_ms, "--", req.text])
    return {"typed": req.text}
@app.post("/hotkey")
async def hotkey(keys: list[str]):
    """Press all of `keys` together as one chord and echo them back.

    Uses pyautogui when it was importable, otherwise `xdotool key` with the
    key names joined by "+".
    """
    if not pyautogui:
        chord = "+".join(keys)
        subprocess.run(["xdotool", "key", chord])
    else:
        pyautogui.hotkey(*keys)
    return {"hotkey": keys}
# ============================================
# Screenshot
# ============================================
@app.post("/screenshot")
async def screenshot(req: Screenshot = Screenshot()):
    """Capture the screen (or a region of it) and return it base64-encoded.

    When both mss and Pillow are importable the capture is done in-process
    and honours req.region and req.format. Otherwise the `scrot` tool writes
    a full-screen PNG to /tmp/screen.png, and region/format are not applied.

    Returns:
        {"success": True, "format": ..., "data": <base64 str>} plus
        "size": [width, height] on the mss path.

    Raises:
        HTTPException 422: req.region is non-empty but has fewer than the
            four values [x, y, w, h].
        HTTPException 500: the capture or encoding failed.
    """
    # Validate outside the try block: the broad handler below would otherwise
    # turn this client error into a 500 (it used to surface as an IndexError).
    if req.region and len(req.region) < 4:
        raise HTTPException(status_code=422, detail="region must be [x, y, w, h]")

    # Pillow registers the JPEG encoder as "JPEG"; a plain upper-casing of
    # "jpg" produced the unknown format name "JPG".
    pil_format = req.format.upper()
    if pil_format == "JPG":
        pil_format = "JPEG"

    try:
        # Only grab with mss when Pillow can encode the result; otherwise the
        # frame would be captured and then thrown away.
        if mss and Image:
            with mss.mss() as sct:
                monitor = sct.monitors[1]
                if req.region:
                    left, top, width, height = req.region[:4]
                    monitor = {"left": left, "top": top,
                               "width": width, "height": height}
                img = sct.grab(monitor)
                # mss exposes BGRA bytes; the "BGRX" raw mode reads them into RGB.
                pil_img = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
                buffer = BytesIO()
                pil_img.save(buffer, format=pil_format)
                data = base64.b64encode(buffer.getvalue()).decode()
                return {"success": True, "format": req.format, "data": data,
                        "size": [img.width, img.height]}

        # Fallback to scrot
        shot = subprocess.run(["scrot", "-o", "/tmp/screen.png"], capture_output=True)
        if shot.returncode != 0:
            # Without this check a failed capture would silently return
            # whatever stale image a previous call left in /tmp/screen.png.
            reason = shot.stderr.decode(errors="replace").strip()
            raise RuntimeError(f"scrot failed with exit code {shot.returncode}: {reason}")
        with open("/tmp/screen.png", "rb") as f:
            data = base64.b64encode(f.read()).decode()
        return {"success": True, "format": "png", "data": data}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/screenshot.png")
async def screenshot_direct():
    """Direct PNG download.

    Captures the full screen with `scrot` into /tmp/screen.png and returns
    the file's bytes as an image/png response.

    Raises:
        HTTPException 500: scrot is missing, exits non-zero, or the file
            cannot be read.
    """
    try:
        # check=True turns a failed capture into an error; previously the
        # exit status was ignored and a stale /tmp/screen.png left by an
        # earlier call could be served as if it were fresh.
        subprocess.run(["scrot", "-o", "/tmp/screen.png"], capture_output=True, check=True)
        with open("/tmp/screen.png", "rb") as f:
            data = f.read()
        # Imported here because the module's top-level imports do not include it.
        from fastapi.responses import Response
        return Response(content=data, media_type="image/png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================
# System Commands
# ============================================
@app.post("/exec")
async def exec_command(req: RunCommand):
    """Run req.command through the shell and return its captured output.

    Returns:
        {"success": <exit code == 0>, "stdout": str, "stderr": str,
         "code": <exit code>}

    Raises:
        HTTPException 408: the command did not finish within req.timeout seconds.
        HTTPException 500: the command could not be started or another error
            occurred.
    """
    # NOTE(security): shell=True executes the caller-supplied string verbatim.
    # That appears to be the purpose of this endpoint (remote command
    # execution inside the sandbox), so it is kept -- but the API must only
    # be reachable by trusted clients.
    try:
        # subprocess.run blocks until the command exits. Running it in a
        # worker thread keeps the event loop free, so other requests (for
        # example /health) are still served while a long command runs.
        result = await asyncio.to_thread(
            subprocess.run,
            req.command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=req.timeout,
        )
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "code": result.returncode
        }
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=408, detail="Command timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================
# Browser Control (xdotool navigation + CDP endpoint info)
# ============================================
@app.post("/navigate")
async def navigate(req: Navigate):
    """Navigate browser via xdotool (focus + Ctrl+L + URL + Enter)

    Each step is a separate xdotool call with its own timeout, with a 0.1 s
    pause between consecutive steps. Any failure, including a step timing
    out, is reported as HTTP 500.
    """
    # (argv, timeout in seconds), in execution order.
    steps = [
        # Focus Chromium window
        (["xdotool", "search", "--name", "Chromium", "windowactivate"], 2),
        # Ctrl+L to focus address bar
        (["xdotool", "key", "ctrl+l"], 1),
        # Type URL
        (["xdotool", "type", "--delay", "10", req.url], 10),
        # Press Enter
        (["xdotool", "key", "Return"], 1),
    ]
    try:
        for index, (argv, limit) in enumerate(steps):
            if index:
                # Pause between steps only, not before the first or after the last.
                await asyncio.sleep(0.1)
            subprocess.run(argv, timeout=limit)
        return {"navigated": req.url}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/cdp")
async def get_cdp_info():
    """Get Chrome DevTools Protocol endpoint info

    Fetches http://localhost:9222/json/version and returns the decoded JSON.
    If httpx is not installed, the request fails, or the reply is not JSON,
    returns {"error": "CDP not available", "port": 9222} instead of raising.
    """
    try:
        # Imported lazily so a missing httpx degrades to the error reply below.
        import httpx
        async with httpx.AsyncClient() as client:
            resp = await client.get("http://localhost:9222/json/version", timeout=5)
            return resp.json()
    except Exception:
        # Deliberately best-effort, but no longer a bare `except:` -- that also
        # trapped asyncio.CancelledError and KeyboardInterrupt, which must
        # propagate for request cancellation and shutdown to work.
        return {"error": "CDP not available", "port": 9222}
# ============================================
# WebSocket for Real-time Control
# ============================================
def _run_ws_action(cmd: dict) -> dict:
    """Execute one decoded WebSocket command and return its JSON-able reply."""
    action = cmd.get("action")
    if action == "move":
        subprocess.run(["xdotool", "mousemove", str(cmd["x"]), str(cmd["y"])])
        return {"moved": [cmd["x"], cmd["y"]]}
    if action == "click":
        subprocess.run(["xdotool", "mousemove", str(cmd["x"]), str(cmd["y"])])
        subprocess.run(["xdotool", "click", "1"])
        return {"clicked": [cmd["x"], cmd["y"]]}
    if action == "type":
        # "--" ends option parsing so text starting with "-" is typed rather
        # than being read as an xdotool option.
        subprocess.run(["xdotool", "type", "--", cmd["text"]])
        return {"typed": cmd["text"]}
    if action == "key":
        subprocess.run(["xdotool", "key", cmd["key"]])
        return {"pressed": cmd["key"]}
    if action == "screenshot":
        subprocess.run(["scrot", "-o", "/tmp/ws_screen.png"])
        with open("/tmp/ws_screen.png", "rb") as f:
            img_data = base64.b64encode(f.read()).decode()
        return {"screenshot": img_data}
    return {"error": "unknown action"}


@app.websocket("/ws")
async def websocket_control(ws: WebSocket):
    """WebSocket for streaming commands

    Each incoming text frame is a JSON object with an "action" of "move",
    "click", "type", "key" or "screenshot" plus that action's fields
    ("x"/"y", "text", "key"). Exactly one JSON reply is sent per frame.

    A frame that cannot be decoded or executed is answered with
    {"error": "<ExceptionType>: <message>"} and the connection stays open,
    matching the existing {"error": "unknown action"} reply. Previously any
    such frame closed the whole connection.
    """
    # fastapi is already a dependency of this module; imported here so the
    # top-level import line does not have to change.
    from fastapi import WebSocketDisconnect

    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            try:
                result = _run_ws_action(json.loads(data))
            except Exception as exc:
                # Per-message boundary: bad JSON, a missing field or a missing
                # tool is reported to the client instead of ending the session.
                result = {"error": f"{type(exc).__name__}: {exc}"}
            await ws.send_text(json.dumps(result))
    except WebSocketDisconnect:
        # The client already closed the socket; calling close() again would
        # only raise.
        return
    except Exception:
        try:
            await ws.close()
        except RuntimeError:
            # The connection was already closed underneath us.
            pass
# ============================================
# Run Server
# ============================================
if __name__ == "__main__":
    # Started directly as a script: serve on every interface, port 8080,
    # with uvicorn logging limited to warnings and above.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8080,
        log_level="warning",
    )