# tiny-desktop/agent/cua_agent.py
# Likho User
# Initial commit of Tiny Desktop
# bcb86b5
import os
import time
import base64
import subprocess
import json
import requests
from typing import Optional, Dict, Any, List
from pathlib import Path
from PIL import Image
import io
from loguru import logger
from .x11_computer import X11Computer
# Configure logging: register a file sink at /app/logs/agent.log with
# rotation="100 MB" and retention="7 days" (loguru rotates the file at that
# size and prunes rotated files older than the retention period).
# NOTE(review): this runs at import time and assumes /app/logs is writable.
logger.add("/app/logs/agent.log", rotation="100 MB", retention="7 days")
class GeminiClient:
    """Client for interacting with the Gemini ``generateContent`` API.

    The client sends a task description (and optionally a PNG screenshot) and
    expects the model to answer with a JSON array of desktop actions.

    Attributes:
        api_key: The Gemini API key. It is sent in the ``x-goog-api-key``
            request header rather than in the URL so that it cannot leak
            through logged URLs or exception messages.
        url: The ``generateContent`` endpoint (without credentials).
    """

    _ENDPOINT = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        "gemini-2.0-flash:generateContent"
    )

    # (connect, read) timeouts in seconds; without a timeout a stalled
    # connection would block the agent forever.
    _REQUEST_TIMEOUT = (10, 60)

    # Instruction text sent ahead of every task.
    _SYSTEM_PROMPT = (
        "\n"
        "You are a Computer-Using Agent capable of controlling a Linux desktop.\n"
        "You will receive a task description and a screenshot of the current screen.\n"
        "Your goal is to generate a list of actions to accomplish the task.\n"
        "Supported actions:\n"
        '- {"action": "mousemove", "x": int, "y": int} -> Moves mouse to coordinates (click_at/hover_at)\n'
        '- {"action": "click", "button": int} -> Clicks mouse button (1=left)\n'
        '- {"action": "type", "text": str} -> Types text\n'
        '- {"action": "key", "key": str} -> Presses key combination (e.g., "Return", "ctrl+c")\n'
        '- {"action": "launch", "app": str} -> Launches application\n'
        '- {"action": "wait", "seconds": float} -> Waits\n'
        '- {"action": "done", "message": str} -> Task completed\n'
        '- {"action": "fail", "message": str} -> Task failed\n'
        "Return ONLY a JSON array of actions.\n"
    )

    def __init__(self, api_key: str):
        """Store the API key and the endpoint URL.

        Args:
            api_key: Gemini API key used to authenticate every request.
        """
        self.api_key = api_key
        self.url = self._ENDPOINT

    @staticmethod
    def _parse_actions(text: str) -> List[Dict[str, Any]]:
        """Turn the model's text reply into a list of action dictionaries.

        A Markdown code fence wrapping the whole reply is removed, but
        backticks *inside* the JSON (for example in text to be typed) are
        left untouched. A single JSON object is accepted and wrapped in a
        list.

        Args:
            text: Raw text returned by the model.

        Returns:
            The parsed list of action dictionaries.

        Raises:
            ValueError: If the text is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or is not an array of objects.
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned[3:]
            if cleaned.startswith("json"):
                cleaned = cleaned[4:]
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]

        parsed = json.loads(cleaned.strip())
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list) or not all(
            isinstance(item, dict) for item in parsed
        ):
            raise ValueError("expected a JSON array of action objects")
        return parsed

    def generate_actions(self, task: str, screenshot_base64: Optional[str] = None) -> List[Dict[str, Any]]:
        """Generate actions based on task and screenshot.

        Args:
            task: Natural-language description of what to accomplish.
            screenshot_base64: Optional base64-encoded PNG of the current
                screen; attached as inline image data when provided.

        Returns:
            A list of action dictionaries. This method does not raise on
            transport, HTTP or parsing problems; instead it returns a single
            ``{"action": "fail", "message": ...}`` entry describing the error.
        """
        parts = [{"text": self._SYSTEM_PROMPT}, {"text": f"Task: {task}"}]
        if screenshot_base64:
            parts.append({
                "inline_data": {
                    "mime_type": "image/png",
                    "data": screenshot_base64,
                }
            })

        payload = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,
                "responseMimeType": "application/json",
            },
        }
        headers = {
            "Content-Type": "application/json",
            "x-goog-api-key": self.api_key,
        }

        # Boundary catch-all: callers rely on this method never raising.
        try:
            response = requests.post(
                self.url,
                json=payload,
                headers=headers,
                timeout=self._REQUEST_TIMEOUT,
            )
        except Exception as e:
            logger.error(f"Request failed: {e}")
            return [{"action": "fail", "message": f"Connection failed: {str(e)}"}]

        if response.status_code != 200:
            logger.error(f"Gemini API error: {response.text}")
            return [{"action": "fail", "message": f"API Error: {response.status_code}"}]

        try:
            text = response.json()['candidates'][0]['content']['parts'][0]['text']
            return self._parse_actions(text)
        except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # Covers a missing/empty candidate list, an unexpected response
            # shape, and malformed JSON in either the envelope or the reply.
            logger.error(f"Failed to parse Gemini response: {e}")
            return [{"action": "fail", "message": "Failed to parse AI response"}]
class ComputerUsingAgent:
    """
    Computer-Using Agent that can interact with desktop environment
    using the standard Computer interface.

    A task is executed in a single pass: capture a screenshot, ask the LLM
    for a list of actions, run them in order, then capture a final
    screenshot.

    Attributes:
        display: X11 display name, from ``$DISPLAY`` (default ``":1"``).
        computer: ``X11Computer`` bound to ``display``.
        llm: ``GeminiClient`` used to plan actions.
        current_task: Description of the running task, or ``None``.
        task_status: One of ``"idle"``, ``"running"``, ``"completed"``,
            ``"failed"``, ``"error"`` or ``"stopped"``.
    """

    # Upper bound for a model-requested wait so one action cannot stall the agent.
    _MAX_WAIT_SECONDS = 60.0

    def __init__(self):
        """Bind to the X11 display and create the Gemini client.

        The API key is read from ``GEMINI_API_KEY``. No key is embedded in the
        source; when the variable is missing an error is logged and an empty
        key is passed to the client.
        """
        self.display = os.getenv("DISPLAY", ":1")
        self.computer = X11Computer(self.display)
        self.current_task = None
        self.task_status = "idle"
        self._stop_requested = False

        # Initialize Gemini Client
        api_key = os.getenv("GEMINI_API_KEY", "")
        if not api_key:
            logger.error("GEMINI_API_KEY is not set; Gemini requests will fail")
        self.llm = GeminiClient(api_key)
        logger.info("Computer-Using Agent initialized with X11Computer")

    def _capture_screenshot_b64(self) -> Optional[str]:
        """Return the current screen as base64 text, or ``None`` if unavailable."""
        state = self.computer.current_state()
        if not state.screenshot:
            return None
        return base64.b64encode(state.screenshot).decode()

    def _perform_action(self, action: Dict[str, Any]) -> None:
        """Execute one non-terminal action.

        Args:
            action: Action dictionary as described in the LLM prompt.

        Raises:
            ValueError: If the action type is unknown or ``seconds`` is not
                numeric.
            KeyError: If a required field of the action is missing.
            subprocess.CalledProcessError: If ``xdotool`` exits non-zero.
        """
        act_type = action.get("action")
        x11_env = {**os.environ, "DISPLAY": self.display}

        if act_type == "mousemove":
            self.computer.hover_at(action["x"], action["y"])
        elif act_type == "click":
            # The prompt's "click" carries no coordinates, so the click is
            # issued at the current pointer position via xdotool.
            subprocess.run(
                ["xdotool", "click", str(action.get("button", 1))],
                env=x11_env,
                check=True,
            )
        elif act_type == "type":
            # Type into whatever currently has focus; "--" stops xdotool from
            # treating text that starts with "-" as an option.
            subprocess.run(
                ["xdotool", "type", "--", action["text"]],
                env=x11_env,
                check=True,
            )
        elif act_type == "key":
            self.computer.key_combination([action["key"]])
        elif act_type == "launch":
            if action["app"] == "firefox":
                self.computer.open_web_browser()
            else:
                # Fallback for other apps
                subprocess.Popen(
                    [action["app"]],
                    env=x11_env,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            # Give the application a moment to start.
            time.sleep(2)
        elif act_type == "wait":
            seconds = action.get("seconds")
            if seconds is None:
                self.computer.wait_5_seconds()
            else:
                time.sleep(min(max(float(seconds), 0.0), self._MAX_WAIT_SECONDS))
        else:
            raise ValueError(f"Unsupported action: {act_type!r}")

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task using Gemini for reasoning and Computer interface for action.

        Args:
            task_description: Natural-language description of the task.

        Returns:
            A dictionary with keys ``success``, ``message``,
            ``steps_executed``, ``screenshot`` (base64 PNG or ``None``) and
            ``task``. Failures are reported through ``success``/``message``
            rather than raised.
        """
        self.current_task = task_description
        self.task_status = "running"
        self._stop_requested = False
        logger.info(f"Executing task: {task_description}")

        steps_executed = []
        final_message = ""
        success = False
        failures = 0

        try:
            # 1. Capture initial state
            screenshot_b64 = self._capture_screenshot_b64()

            # 2. Get plan from Gemini
            actions = self.llm.generate_actions(task_description, screenshot_b64)
            if isinstance(actions, dict):
                actions = [actions]
            elif not isinstance(actions, list):
                actions = [{"action": "fail", "message": "Failed to parse AI response"}]

            # 3. Execute actions
            for action in actions:
                if self._stop_requested:
                    break
                if not isinstance(action, dict):
                    failures += 1
                    steps_executed.append(f"Failed: {action!r} - not an action object")
                    continue

                act_type = action.get("action")
                if act_type == "done":
                    success = True
                    final_message = action.get("message", "Task completed")
                    break
                if act_type == "fail":
                    success = False
                    final_message = action.get("message", "Task failed")
                    break

                # A failing step is recorded and the remaining steps still run.
                try:
                    self._perform_action(action)
                    steps_executed.append(f"Executed: {act_type} {action}")
                except Exception as e:
                    failures += 1
                    logger.error(f"Action execution failed: {e}")
                    steps_executed.append(f"Failed: {act_type} - {e}")

            if not final_message:
                success = failures == 0
                final_message = (
                    "Actions executed."
                    if success
                    else f"Actions executed with {failures} failure(s)."
                )

            # Capture final state
            final_screenshot = self._capture_screenshot_b64()

            if self._stop_requested:
                # stop() was called while running; keep its status visible.
                success = False
                final_message = "Task stopped"
                self.task_status = "stopped"
            else:
                self.task_status = "completed" if success else "failed"

            return {
                "success": success,
                "message": final_message,
                "steps_executed": steps_executed,
                "screenshot": final_screenshot,
                "task": task_description
            }
        except Exception as e:
            logger.error(f"Task execution error: {e}")
            self.task_status = "error"
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "steps_executed": steps_executed,
                "screenshot": None,
                "task": task_description
            }
        finally:
            self.current_task = None

    def stop(self):
        """Stop current task.

        Sets a flag that ``execute_task`` checks before each action, so a
        running task ends after the action currently in progress.
        """
        logger.info("Stopping current task")
        self._stop_requested = True
        self.task_status = "stopped"
        self.current_task = None

    def get_status(self) -> Dict[str, Any]:
        """Get current agent status.

        Returns:
            A dictionary with ``status``, ``current_task`` and ``display``.
        """
        return {
            "status": self.task_status,
            "current_task": self.current_task,
            "display": self.display
        }