|
|
import os |
|
|
import time |
|
|
import base64 |
|
|
import subprocess |
|
|
import json |
|
|
import requests |
|
|
from typing import Optional, Dict, Any, List |
|
|
from pathlib import Path |
|
|
from PIL import Image |
|
|
import io |
|
|
from loguru import logger |
|
|
from .x11_computer import X11Computer |
|
|
|
|
|
|
|
|
# Mirror agent logs to a file sink in addition to loguru's default handler.
# The rotation/retention values are passed straight through to loguru.
logger.add(
    "/app/logs/agent.log",
    rotation="100 MB",
    retention="7 days",
)
|
|
|
|
|
class GeminiClient:
    """Client for interacting with Gemini API.

    Sends a task description (and optionally a PNG screenshot) to the
    ``generateContent`` endpoint and turns the reply into a list of action
    dictionaries. Failures are reported in-band as a single
    ``{"action": "fail", ...}`` entry rather than by raising.
    """

    # (connect, read) timeout in seconds. Without a timeout ``requests`` waits
    # forever on a stalled connection, which would hang the whole agent.
    REQUEST_TIMEOUT = (10, 120)

    # Instruction block sent ahead of every task.
    _SYSTEM_PROMPT = """
        You are a Computer-Using Agent capable of controlling a Linux desktop.
        You will receive a task description and a screenshot of the current screen.

        Your goal is to generate a list of actions to accomplish the task.

        Supported actions:
        - {"action": "mousemove", "x": int, "y": int} -> Moves mouse to coordinates (click_at/hover_at)
        - {"action": "click", "button": int} -> Clicks mouse button (1=left)
        - {"action": "type", "text": str} -> Types text
        - {"action": "key", "key": str} -> Presses key combination (e.g., "Return", "ctrl+c")
        - {"action": "launch", "app": str} -> Launches application
        - {"action": "wait", "seconds": float} -> Waits
        - {"action": "done", "message": str} -> Task completed
        - {"action": "fail", "message": str} -> Task failed

        Return ONLY a JSON array of actions.
        """

    def __init__(self, api_key: str):
        """Store the API key and build the request URL that embeds it."""
        self.api_key = api_key
        self.url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={self.api_key}"

    def _redact(self, text: str) -> str:
        """Return *text* with every occurrence of the API key masked.

        The key is part of ``self.url``, and transport errors typically quote
        the URL, so anything logged or returned to callers is scrubbed first.
        """
        # Guard the empty key: str.replace("", ...) would insert the mask
        # between every character.
        if not self.api_key:
            return text
        return text.replace(self.api_key, "***")

    @staticmethod
    def _parse_actions(result: Any) -> List[Dict[str, Any]]:
        """Extract the action list from a decoded ``generateContent`` reply.

        Args:
            result: The JSON-decoded response body.

        Returns:
            A list of action dictionaries. A single JSON object is accepted
            and wrapped into a one-element list.

        Raises:
            ValueError: If the expected fields are missing, the text is not
                valid JSON (``json.JSONDecodeError`` is a ``ValueError``), or
                the decoded value is not a list of objects.
        """
        try:
            text = result['candidates'][0]['content']['parts'][0]['text']
        except (KeyError, IndexError, TypeError) as e:
            raise ValueError(f"unexpected response structure: {e!r}") from e
        if not isinstance(text, str):
            raise ValueError("response text is not a string")

        # The model sometimes wraps its JSON in a Markdown code fence.
        text = text.replace("```json", "").replace("```", "").strip()
        actions = json.loads(text)

        if isinstance(actions, dict):
            actions = [actions]
        if not isinstance(actions, list) or not all(isinstance(a, dict) for a in actions):
            raise ValueError("expected a JSON array of action objects")
        return actions

    def generate_actions(self, task: str, screenshot_base64: Optional[str] = None) -> List[Dict[str, Any]]:
        """Generate actions based on task and screenshot.

        Args:
            task: Natural-language description of what should be done.
            screenshot_base64: Base64-encoded PNG of the current screen, or
                ``None`` to send the task without an image.

        Returns:
            The list of action dictionaries produced by the model, or a
            single ``fail`` action describing a transport, API, or parsing
            error.
        """
        parts = [{"text": self._SYSTEM_PROMPT}, {"text": f"Task: {task}"}]
        if screenshot_base64:
            parts.append({
                "inline_data": {
                    "mime_type": "image/png",
                    "data": screenshot_base64,
                }
            })

        data = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,
                "responseMimeType": "application/json",
            },
        }

        # Broad catch is deliberate: this method reports every failure as a
        # "fail" action instead of raising.
        try:
            response = requests.post(
                self.url,
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=self.REQUEST_TIMEOUT,
            )
        except Exception as e:
            detail = self._redact(str(e))
            logger.error(f"Request failed: {detail}")
            return [{"action": "fail", "message": f"Connection failed: {detail}"}]

        if response.status_code != 200:
            logger.error(f"Gemini API error: {self._redact(response.text)}")
            return [{"action": "fail", "message": f"API Error: {response.status_code}"}]

        try:
            return self._parse_actions(response.json())
        except ValueError as e:
            logger.error(f"Failed to parse Gemini response: {e}")
            return [{"action": "fail", "message": "Failed to parse AI response"}]
|
|
|
|
|
|
|
|
class ComputerUsingAgent:
    """
    Computer-Using Agent that can interact with desktop environment
    using the standard Computer interface.

    A task is executed by capturing the current screen, asking the LLM
    client for a list of actions, and carrying those actions out one by one
    through ``X11Computer`` and ``xdotool``.
    """

    def __init__(self) -> None:
        """Bind to the X display and create the LLM client.

        Reads ``DISPLAY`` (default ``":1"``) and ``GEMINI_API_KEY`` from the
        environment.
        """
        self.display = os.getenv("DISPLAY", ":1")
        self.computer = X11Computer(self.display)
        self.current_task = None
        self.task_status = "idle"
        # Set by stop(); polled between actions in execute_task().
        self._stop_requested = False

        # Credentials come only from the environment; a fallback key must
        # never be embedded in source.
        api_key = os.getenv("GEMINI_API_KEY", "")
        if not api_key:
            logger.warning("GEMINI_API_KEY is not set; Gemini requests are expected to fail")
        self.llm = GeminiClient(api_key)

        logger.info("Computer-Using Agent initialized with X11Computer")

    @staticmethod
    def _normalize_actions(actions: Any) -> List[Any]:
        """Coerce the LLM client's return value into a list of actions.

        A single action object is wrapped into a list; anything that is
        neither a list nor a dict becomes a one-element ``fail`` plan.
        """
        if isinstance(actions, dict):
            return [actions]
        if isinstance(actions, list):
            return actions
        return [{"action": "fail", "message": "AI response was not a list of actions"}]

    def _perform_action(self, act_type: Any, action: Dict[str, Any]) -> bool:
        """Carry out one non-terminal action.

        Args:
            act_type: Value of the action's ``"action"`` field.
            action: The full action dictionary.

        Returns:
            ``True`` if the action type is supported and was carried out,
            ``False`` if the type is not recognised (nothing is done).

        Raises:
            KeyError: If a required field is missing from *action*.
            subprocess.CalledProcessError: If ``xdotool`` exits non-zero.
            Exception: Whatever the underlying computer interface raises.
        """
        env = {**os.environ, "DISPLAY": self.display}

        if act_type == "mousemove":
            self.computer.hover_at(action["x"], action["y"])
        elif act_type == "click":
            # check=True so a failing xdotool is reported as a failed step
            # instead of being silently recorded as executed.
            subprocess.run(
                ["xdotool", "click", str(action.get("button", 1))],
                env=env,
                check=True,
            )
        elif act_type == "type":
            # "--" stops option parsing so text starting with "-" is typed.
            subprocess.run(
                ["xdotool", "type", "--", action["text"]],
                env=env,
                check=True,
            )
        elif act_type == "key":
            self.computer.key_combination([action["key"]])
        elif act_type == "launch":
            if action["app"] == "firefox":
                self.computer.open_web_browser()
            else:
                subprocess.Popen(
                    [action["app"]],
                    env=env,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                # Give the process a moment to start before the next action.
                time.sleep(2)
        elif act_type == "wait":
            seconds = action.get("seconds")
            if seconds is None:
                self.computer.wait_5_seconds()
            else:
                # Honour the requested duration; time.sleep rejects negatives.
                time.sleep(max(0.0, float(seconds)))
        else:
            return False
        return True

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task using Gemini for reasoning and Computer interface for action.

        Args:
            task_description: Natural-language description of the task.

        Returns:
            A dict with keys ``success``, ``message``, ``steps_executed``,
            ``screenshot`` (base64 string or ``None``) and ``task``.
            ``success`` is ``False`` if any step failed or was skipped, if
            the plan ended in ``fail``, or if the task was stopped.
        """
        self.current_task = task_description
        self.task_status = "running"
        self._stop_requested = False
        logger.info(f"Executing task: {task_description}")

        steps_executed = []
        final_message = ""
        success = False
        failed_steps = 0
        stopped = False

        try:
            state = self.computer.current_state()
            screenshot_b64 = base64.b64encode(state.screenshot).decode() if state.screenshot else None

            actions = self._normalize_actions(
                self.llm.generate_actions(task_description, screenshot_b64)
            )

            for action in actions:
                if self._stop_requested:
                    stopped = True
                    final_message = "Task stopped"
                    break

                if not isinstance(action, dict):
                    logger.error(f"Malformed action: {action!r}")
                    steps_executed.append(f"Skipped: malformed action {action!r}")
                    failed_steps += 1
                    continue

                act_type = action.get("action")

                if act_type == "done":
                    # The plan is produced before anything runs, so "done"
                    # only counts as success if every step actually worked.
                    success = failed_steps == 0
                    final_message = action.get("message", "Task completed")
                    if failed_steps:
                        final_message += f" ({failed_steps} step(s) failed)"
                    break

                if act_type == "fail":
                    success = False
                    final_message = action.get("message", "Task failed")
                    break

                try:
                    handled = self._perform_action(act_type, action)
                except Exception as e:
                    # Best effort: record the failure and keep going.
                    logger.error(f"Action execution failed: {e}")
                    steps_executed.append(f"Failed: {act_type} - {e}")
                    failed_steps += 1
                else:
                    if handled:
                        steps_executed.append(f"Executed: {act_type} {action}")
                    else:
                        logger.warning(f"Unsupported action: {act_type!r}")
                        steps_executed.append(f"Skipped: unsupported action {act_type!r}")
                        failed_steps += 1

            if not final_message:
                if failed_steps:
                    final_message = f"Actions executed with {failed_steps} failed step(s)."
                    success = False
                else:
                    final_message = "Actions executed."
                    success = True

            final_state = self.computer.current_state()
            final_screenshot = (
                base64.b64encode(final_state.screenshot).decode()
                if final_state.screenshot
                else None
            )

            if stopped:
                self.task_status = "stopped"
            else:
                self.task_status = "completed" if success else "failed"

            return {
                "success": success,
                "message": final_message,
                "steps_executed": steps_executed,
                "screenshot": final_screenshot,
                "task": task_description,
            }

        except Exception as e:
            logger.error(f"Task execution error: {e}")
            self.task_status = "error"
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "steps_executed": steps_executed,
                "screenshot": None,
                "task": task_description,
            }
        finally:
            self.current_task = None

    def stop(self) -> None:
        """Stop current task.

        Marks the agent as stopped and asks a running ``execute_task`` to
        abandon its remaining actions. The request is checked between
        actions, so an action already in progress runs to completion.
        """
        logger.info("Stopping current task")
        self._stop_requested = True
        self.task_status = "stopped"
        self.current_task = None

    def get_status(self) -> Dict[str, Any]:
        """Get current agent status.

        Returns:
            A dict with the keys ``status``, ``current_task`` and ``display``.
        """
        return {
            "status": self.task_status,
            "current_task": self.current_task,
            "display": self.display,
        }
|
|
|
|
|
|