File size: 10,664 Bytes
bcb86b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
import os
import time
import base64
import subprocess
import json
import requests
from typing import Optional, Dict, Any, List
from pathlib import Path
from PIL import Image
import io
from loguru import logger
from .x11_computer import X11Computer
# Configure logging: register a file sink for the shared loguru logger at
# /app/logs/agent.log, using the rotation ("100 MB") and retention ("7 days")
# limits passed below.
# NOTE(review): this runs at import time with a hard-coded path, so importing
# the module presumably requires /app/logs to be writable -- confirm for
# environments other than the container image.
logger.add("/app/logs/agent.log", rotation="100 MB", retention="7 days")
class GeminiClient:
    """Client for interacting with the Gemini ``generateContent`` API.

    The client never raises from :meth:`generate_actions`; every failure is
    reported as a single ``{"action": "fail", ...}`` entry so the caller can
    treat the result uniformly as a list of action dictionaries.
    """

    # Seconds to wait for the API. Without a timeout ``requests.post`` can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 60

    # Instructions sent ahead of every task. Plain (non-f) string: the braces
    # are literal JSON examples for the model.
    _SYSTEM_PROMPT = (
        "\n"
        "You are a Computer-Using Agent capable of controlling a Linux desktop.\n"
        "You will receive a task description and a screenshot of the current screen.\n"
        "Your goal is to generate a list of actions to accomplish the task.\n"
        "Supported actions:\n"
        '- {"action": "mousemove", "x": int, "y": int} -> Moves mouse to coordinates (click_at/hover_at)\n'
        '- {"action": "click", "button": int} -> Clicks mouse button (1=left)\n'
        '- {"action": "type", "text": str} -> Types text\n'
        '- {"action": "key", "key": str} -> Presses key combination (e.g., "Return", "ctrl+c")\n'
        '- {"action": "launch", "app": str} -> Launches application\n'
        '- {"action": "wait", "seconds": float} -> Waits\n'
        '- {"action": "done", "message": str} -> Task completed\n'
        '- {"action": "fail", "message": str} -> Task failed\n'
        "Return ONLY a JSON array of actions.\n"
    )

    def __init__(self, api_key: str):
        """Store the API key and build the request URL that carries it."""
        self.api_key = api_key
        self.url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={self.api_key}"

    def _redact(self, message: str) -> str:
        """Return *message* with every occurrence of the API key masked.

        The key is part of ``self.url``, and HTTP client errors commonly
        include the URL, so error text must be scrubbed before it is logged
        or handed back to the caller.
        """
        # Guard the empty key: "abc".replace("", "***") would mangle the text.
        if not self.api_key:
            return message
        return message.replace(self.api_key, "***")

    @staticmethod
    def _extract_actions(result: Any) -> List[Dict[str, Any]]:
        """Pull the action list out of a decoded ``generateContent`` response.

        Args:
            result: The JSON-decoded response body.

        Returns:
            A list of action dictionaries. A single JSON object is wrapped in
            a one-element list.

        Raises:
            ValueError: If the response does not have the expected structure,
                the text is not valid JSON (``json.JSONDecodeError`` is a
                ``ValueError``), or the JSON is not a list of objects.
        """
        try:
            text = result['candidates'][0]['content']['parts'][0]['text']
        except (KeyError, IndexError, TypeError) as e:
            # IndexError/TypeError cover empty "candidates"/"parts" lists and
            # null fields, e.g. when the response was blocked.
            raise ValueError(f"unexpected response structure: {e!r}") from e
        if not isinstance(text, str):
            raise ValueError("response text is not a string")

        # The model sometimes wraps the JSON in a Markdown code fence.
        text = text.replace("```json", "").replace("```", "").strip()
        actions = json.loads(text)

        if isinstance(actions, dict):
            actions = [actions]
        if not isinstance(actions, list) or not all(isinstance(a, dict) for a in actions):
            raise ValueError("expected a JSON array of action objects")
        return actions

    def generate_actions(self, task: str, screenshot_base64: Optional[str] = None) -> List[Dict[str, Any]]:
        """Ask Gemini for the actions needed to accomplish *task*.

        Args:
            task: Natural-language description of what should be done.
            screenshot_base64: Optional base64-encoded PNG of the current
                screen, sent to the model as inline image data.

        Returns:
            A list of action dictionaries. On any failure (transport error,
            non-200 status, unparsable reply) the list contains a single
            ``{"action": "fail", "message": ...}`` entry.
        """
        parts = [{"text": self._SYSTEM_PROMPT}, {"text": f"Task: {task}"}]
        if screenshot_base64:
            parts.append({
                "inline_data": {
                    "mime_type": "image/png",
                    "data": screenshot_base64,
                }
            })

        payload = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,
                "responseMimeType": "application/json",
            },
        }

        # Broad catch on purpose: this method is the boundary that converts
        # every failure into a "fail" action instead of raising.
        try:
            response = requests.post(
                self.url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                logger.error(f"Gemini API error: {response.text}")
                return [{"action": "fail", "message": f"API Error: {response.status_code}"}]

            try:
                return self._extract_actions(response.json())
            except ValueError as e:
                logger.error(f"Failed to parse Gemini response: {e}")
                return [{"action": "fail", "message": "Failed to parse AI response"}]
        except Exception as e:
            reason = self._redact(str(e))
            logger.error(f"Request failed: {reason}")
            return [{"action": "fail", "message": f"Connection failed: {reason}"}]
class ComputerUsingAgent:
    """
    Computer-Using Agent that can interact with desktop environment
    using the standard Computer interface.

    A task is executed in one shot: capture the screen, ask the LLM for a
    list of actions, run them in order, then capture the screen again.
    """

    # Upper bound for a model-requested "wait" so a bad plan cannot park the
    # agent indefinitely.
    MAX_WAIT_SECONDS = 60.0

    def __init__(self):
        """Bind to the X display and create the LLM client.

        The display comes from ``DISPLAY`` (default ``":1"``) and the API key
        from ``GEMINI_API_KEY``.
        """
        self.display = os.getenv("DISPLAY", ":1")
        self.computer = X11Computer(self.display)
        self.current_task = None
        self.task_status = "idle"

        # Initialize Gemini Client. Credentials must come from the
        # environment; a key must never be embedded in source as a fallback.
        api_key = os.getenv("GEMINI_API_KEY", "")
        if not api_key:
            logger.warning("GEMINI_API_KEY is not set; Gemini requests will fail")
        self.llm = GeminiClient(api_key)
        logger.info("Computer-Using Agent initialized with X11Computer")

    @staticmethod
    def _encode_screenshot(state: Any) -> Optional[str]:
        """Return ``state.screenshot`` base64-encoded, or None if it is empty."""
        return base64.b64encode(state.screenshot).decode() if state.screenshot else None

    def _xdotool(self, *args: str) -> None:
        """Run ``xdotool`` against the agent's display.

        Raises:
            subprocess.CalledProcessError: If xdotool exits non-zero, so the
                caller records the step as failed instead of executed.
        """
        subprocess.run(
            ["xdotool", *args],
            env={**os.environ, "DISPLAY": self.display},
            check=True,
        )

    def _perform_action(self, action: Dict[str, Any]) -> bool:
        """Carry out one non-terminal action.

        Args:
            action: Action dictionary as produced by the LLM.

        Returns:
            True if the action type was recognised and executed, False if
            the type is unknown (nothing was done).

        Raises:
            Exception: Whatever the underlying call raises, e.g. ``KeyError``
                for a missing field or ``CalledProcessError`` from xdotool.
        """
        act_type = action.get("action")

        if act_type == "mousemove":
            self.computer.hover_at(action["x"], action["y"])
        elif act_type == "click":
            # The Computer interface only offers click-at-coordinates, so a
            # click at the current pointer position goes through xdotool.
            self._xdotool("click", str(action.get("button", 1)))
        elif act_type == "type":
            # Typed into whatever currently has focus; "--" stops xdotool
            # from treating text that starts with "-" as an option.
            self._xdotool("type", "--", str(action["text"]))
        elif act_type == "key":
            self.computer.key_combination([action["key"]])
        elif act_type == "launch":
            if action["app"] == "firefox":
                self.computer.open_web_browser()
            else:
                # Fallback for other apps
                subprocess.Popen(
                    [action["app"]],
                    env={**os.environ, "DISPLAY": self.display},
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                # Give the application a moment to create its window.
                time.sleep(2)
        elif act_type == "wait":
            seconds = action.get("seconds")
            if seconds is None:
                self.computer.wait_5_seconds()
            else:
                # Honour the requested duration, clamped to a sane range.
                time.sleep(min(max(0.0, float(seconds)), self.MAX_WAIT_SECONDS))
        else:
            return False
        return True

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task using Gemini for reasoning and Computer interface for action.

        Args:
            task_description: Natural-language description of the task.

        Returns:
            A dictionary with keys ``success``, ``message``,
            ``steps_executed``, ``screenshot`` (base64 PNG or None) and
            ``task``. ``success`` is False when the plan ended with a "fail"
            action, when any step failed or was skipped, when the task was
            stopped, or when an unexpected error occurred.
        """
        self.current_task = task_description
        self.task_status = "running"
        logger.info(f"Executing task: {task_description}")

        steps_executed = []

        try:
            # 1. Capture initial state
            state = self.computer.current_state()
            screenshot_b64 = self._encode_screenshot(state)

            # 2. Get plan from Gemini
            actions = self.llm.generate_actions(task_description, screenshot_b64)
            if isinstance(actions, dict):
                actions = [actions]

            # 3. Execute actions
            plan_failed = False
            final_message = ""
            problems = 0  # steps that raised, were malformed, or were unknown

            for action in actions:
                # stop() flips the status; honour it between actions.
                if self.task_status == "stopped":
                    break

                if not isinstance(action, dict):
                    problems += 1
                    steps_executed.append(f"Failed: invalid action {action!r}")
                    continue

                act_type = action.get("action")

                if act_type == "done":
                    final_message = action.get("message", "Task completed")
                    break

                if act_type == "fail":
                    plan_failed = True
                    final_message = action.get("message", "Task failed")
                    break

                try:
                    handled = self._perform_action(action)
                except Exception as e:
                    # Best effort: record the failure and keep going with
                    # the remaining steps.
                    logger.error(f"Action execution failed: {e}")
                    steps_executed.append(f"Failed: {act_type} - {e}")
                    problems += 1
                    continue

                if handled:
                    steps_executed.append(f"Executed: {act_type} {action}")
                else:
                    logger.warning(f"Unknown action type: {act_type}")
                    steps_executed.append(f"Skipped: unknown action {act_type} {action}")
                    problems += 1

            stopped = self.task_status == "stopped"
            if stopped:
                success = False
                final_message = "Task stopped"
            elif plan_failed:
                success = False
                final_message = final_message or "Task failed"
            elif problems:
                success = False
                final_message = f"{problems} action(s) failed or were skipped"
            else:
                success = True
                final_message = final_message or "Actions executed."

            # Capture final state
            final_state = self.computer.current_state()
            final_screenshot = self._encode_screenshot(final_state)

            # Keep "stopped" visible to get_status() rather than overwriting it.
            if not stopped:
                self.task_status = "completed" if success else "failed"

            return {
                "success": success,
                "message": final_message,
                "steps_executed": steps_executed,
                "screenshot": final_screenshot,
                "task": task_description,
            }

        except Exception as e:
            logger.error(f"Task execution error: {e}")
            self.task_status = "error"
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "steps_executed": steps_executed,
                "screenshot": None,
                "task": task_description,
            }
        finally:
            self.current_task = None

    def stop(self):
        """Stop current task.

        Marks the agent as stopped; a running ``execute_task`` checks this
        between actions and ends early. An action already in progress is not
        interrupted.
        """
        logger.info("Stopping current task")
        self.task_status = "stopped"
        self.current_task = None

    def get_status(self) -> Dict[str, Any]:
        """Get current agent status: status string, current task and display."""
        return {
            "status": self.task_status,
            "current_task": self.current_task,
            "display": self.display,
        }
|