import os
import time
import base64
import subprocess
import json
import requests
from typing import Optional, Dict, Any, List
from pathlib import Path
from PIL import Image
import io
from loguru import logger
from .x11_computer import X11Computer

# Configure logging: rotate so long-running sessions don't fill the disk.
logger.add("/app/logs/agent.log", rotation="100 MB", retention="7 days")


class GeminiClient:
    """Thin client for the Gemini ``generateContent`` REST endpoint.

    Sends a task description (plus an optional screenshot) and expects the
    model to answer with a JSON array of desktop-automation action dicts.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={self.api_key}"

    def generate_actions(self, task: str, screenshot_base64: Optional[str] = None) -> List[Dict[str, Any]]:
        """Ask Gemini for a list of actions to accomplish *task*.

        Args:
            task: Natural-language description of the desktop task.
            screenshot_base64: Optional base64-encoded PNG of the current screen.

        Returns:
            A JSON-decoded list of action dicts. Never raises: any API,
            network, or parse failure is mapped to a single
            ``{"action": "fail", ...}`` entry so the caller's loop stays simple.
        """
        system_prompt = """
You are a Computer-Using Agent capable of controlling a Linux desktop.
You will receive a task description and a screenshot of the current screen.
Your goal is to generate a list of actions to accomplish the task.

Supported actions:
- {"action": "mousemove", "x": int, "y": int} -> Moves mouse to coordinates (click_at/hover_at)
- {"action": "click", "button": int} -> Clicks mouse button (1=left)
- {"action": "type", "text": str} -> Types text
- {"action": "key", "key": str} -> Presses key combination (e.g., "Return", "ctrl+c")
- {"action": "launch", "app": str} -> Launches application
- {"action": "wait", "seconds": float} -> Waits
- {"action": "done", "message": str} -> Task completed
- {"action": "fail", "message": str} -> Task failed

Return ONLY a JSON array of actions.
"""
        parts: List[Dict[str, Any]] = [{"text": system_prompt}, {"text": f"Task: {task}"}]
        if screenshot_base64:
            parts.append({
                "inline_data": {
                    "mime_type": "image/png",
                    "data": screenshot_base64
                }
            })
        data = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,
                "responseMimeType": "application/json"
            }
        }
        try:
            # timeout added: without one, a stalled connection hangs the agent forever.
            response = requests.post(
                self.url,
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            if response.status_code == 200:
                result = response.json()
                try:
                    text = result['candidates'][0]['content']['parts'][0]['text']
                    # Strip markdown code fences the model sometimes wraps JSON in.
                    text = text.replace("```json", "").replace("```", "").strip()
                    return json.loads(text)
                # IndexError added: an empty 'candidates' list raised uncaught before.
                except (KeyError, IndexError, json.JSONDecodeError) as e:
                    logger.error(f"Failed to parse Gemini response: {e}")
                    return [{"action": "fail", "message": "Failed to parse AI response"}]
            else:
                logger.error(f"Gemini API error: {response.text}")
                return [{"action": "fail", "message": f"API Error: {response.status_code}"}]
        except Exception as e:
            logger.error(f"Request failed: {e}")
            return [{"action": "fail", "message": f"Connection failed: {str(e)}"}]


class ComputerUsingAgent:
    """Computer-Using Agent that drives a desktop environment.

    Uses Gemini for planning and the standard Computer interface
    (``X11Computer``) for execution, with raw ``xdotool`` fallbacks for the
    few actions the interface does not expose.
    """

    def __init__(self):
        self.display = os.getenv("DISPLAY", ":1")
        self.computer = X11Computer(self.display)
        self.current_task: Optional[str] = None
        self.task_status = "idle"

        # SECURITY: the previous code shipped a hardcoded API key as the
        # fallback value. Never embed credentials in source; require the
        # environment variable and warn loudly when it is missing.
        api_key = os.getenv("GEMINI_API_KEY", "")
        if not api_key:
            logger.warning("GEMINI_API_KEY is not set; Gemini requests will fail")
        self.llm = GeminiClient(api_key)
        logger.info("Computer-Using Agent initialized with X11Computer")

    def _subprocess_env(self) -> Dict[str, str]:
        """Environment for spawned X11 tools, with DISPLAY pointing at our server."""
        return {**os.environ, "DISPLAY": self.display}

    def _apply_action(self, action: Dict[str, Any]) -> None:
        """Execute one non-terminal action dict.

        Prefers the Computer interface; falls back to raw ``xdotool`` for
        'click' and 'type', where the interface only offers coordinate-bound
        variants (``click_at(x, y)`` / ``type_text_at(x, y, ...)``).

        Raises:
            Exception: propagated from the underlying command so the caller
                can log the failed step and continue.
        """
        act_type = action.get("action")
        if act_type == "mousemove":
            self.computer.hover_at(action["x"], action["y"])
        elif act_type == "click":
            # The prompt's "click" means "click at the current pointer
            # position" (set by a preceding mousemove). The Computer interface
            # only has click_at(x, y), so shell out to xdotool directly.
            subprocess.run(
                ["xdotool", "click", str(action.get("button", 1))],
                env=self._subprocess_env(),
            )
        elif act_type == "type":
            # type_text_at would force a click-to-focus at fixed coordinates;
            # instead, type into whatever currently has keyboard focus.
            subprocess.run(
                ["xdotool", "type", "--", action["text"]],
                env=self._subprocess_env(),
            )
        elif act_type == "key":
            self.computer.key_combination([action["key"]])
        elif act_type == "launch":
            if action["app"] == "firefox":
                self.computer.open_web_browser()
            else:
                # Fallback for other apps: detach and give them time to start.
                subprocess.Popen(
                    [action["app"]],
                    env=self._subprocess_env(),
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                time.sleep(2)
        elif act_type == "wait":
            # Honor the requested duration; the old wait_5_seconds() call
            # ignored the "seconds" field promised by the prompt schema.
            time.sleep(float(action.get("seconds", 5)))

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task: plan with Gemini, then run the returned actions.

        Args:
            task_description: Natural-language task to perform.

        Returns:
            Result dict with keys ``success``, ``message``, ``steps_executed``,
            ``screenshot`` (base64 PNG of the final state, or None), ``task``.
        """
        self.current_task = task_description
        self.task_status = "running"
        logger.info(f"Executing task: {task_description}")

        steps_executed: List[str] = []
        final_message = ""
        success = False

        try:
            # 1. Capture initial state for the model to reason over.
            state = self.computer.current_state()
            screenshot_b64 = base64.b64encode(state.screenshot).decode() if state.screenshot else None

            # 2. Get the action plan from Gemini.
            actions = self.llm.generate_actions(task_description, screenshot_b64)

            # 3. Execute actions until done/fail or the list is exhausted.
            for action in actions:
                act_type = action.get("action")

                if act_type == "done":
                    success = True
                    final_message = action.get("message", "Task completed")
                    break
                if act_type == "fail":
                    success = False
                    final_message = action.get("message", "Task failed")
                    break

                try:
                    self._apply_action(action)
                    steps_executed.append(f"Executed: {act_type} {action}")
                except Exception as e:
                    # A single failed step is logged but does not abort the plan.
                    logger.error(f"Action execution failed: {e}")
                    steps_executed.append(f"Failed: {act_type} - {e}")

            # Plan ended without an explicit done/fail: treat as best-effort success.
            if not final_message:
                final_message = "Actions executed."
                success = True

            # Capture final state so the caller can inspect the outcome.
            final_state = self.computer.current_state()
            final_screenshot = base64.b64encode(final_state.screenshot).decode() if final_state.screenshot else None

            self.task_status = "completed" if success else "failed"
            return {
                "success": success,
                "message": final_message,
                "steps_executed": steps_executed,
                "screenshot": final_screenshot,
                "task": task_description,
            }
        except Exception as e:
            logger.error(f"Task execution error: {e}")
            self.task_status = "error"
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "steps_executed": steps_executed,
                "screenshot": None,
                "task": task_description,
            }
        finally:
            self.current_task = None

    def stop(self):
        """Stop the current task (status bookkeeping only; no process kill)."""
        logger.info("Stopping current task")
        self.task_status = "stopped"
        self.current_task = None

    def get_status(self) -> Dict[str, Any]:
        """Return the agent's current status, task, and display."""
        return {
            "status": self.task_status,
            "current_task": self.current_task,
            "display": self.display,
        }