File size: 10,664 Bytes
bcb86b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import os
import time
import base64
import subprocess
import json
import requests
from typing import Optional, Dict, Any, List
from pathlib import Path
from PIL import Image
import io
from loguru import logger
from .x11_computer import X11Computer

# Configure logging: register a file sink at /app/logs/agent.log that is
# rotated once it reaches 100 MB, with rotated files retained for 7 days.
# NOTE(review): this runs as an import-time side effect with a hard-coded
# path; presumably /app/logs is writable in the deployment image — confirm.
logger.add("/app/logs/agent.log", rotation="100 MB", retention="7 days")

class GeminiClient:
    """Client for interacting with the Gemini ``generateContent`` REST API.

    The client sends a task description (and optionally a PNG screenshot) and
    expects the model to answer with a JSON array of action dictionaries.
    ``generate_actions`` never raises: every failure is converted into a
    single ``{"action": "fail", ...}`` entry so callers can treat the result
    uniformly.
    """

    def __init__(self, api_key: str, timeout: float = 60.0):
        """Store the credentials and build the endpoint URL.

        Args:
            api_key: Gemini API key; it is embedded in ``self.url`` as the
                ``key`` query parameter.
            timeout: Seconds to wait for the HTTP request before giving up.
        """
        self.api_key = api_key
        self.timeout = timeout
        self.url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={self.api_key}"

    def _redact(self, text: str) -> str:
        """Return *text* with the API key masked.

        The key is part of the request URL, and HTTP client errors commonly
        echo the URL, so error text is scrubbed before it is logged or
        returned to the caller.
        """
        if not self.api_key:
            return text
        return text.replace(self.api_key, "***")

    @staticmethod
    def _parse_actions(result: Any) -> List[Dict[str, Any]]:
        """Extract the action list from a decoded ``generateContent`` response.

        Args:
            result: The decoded JSON body of the API response.

        Returns:
            A list of action dictionaries. A single top-level JSON object is
            wrapped in a one-element list.

        Raises:
            ValueError: If the response lacks the expected
                ``candidates[0].content.parts[0].text`` path, the text is not
                valid JSON, or the JSON is not an array of objects.
        """
        try:
            text = result['candidates'][0]['content']['parts'][0]['text']
        except (KeyError, IndexError, TypeError) as e:
            # IndexError/TypeError cover an empty or null candidate list.
            raise ValueError(f"unexpected response structure: {e!r}") from e
        if not isinstance(text, str):
            raise ValueError("response text is not a string")

        # The model sometimes wraps its JSON in a Markdown code fence.
        text = text.replace("```json", "").replace("```", "").strip()
        actions = json.loads(text)  # json.JSONDecodeError is a ValueError

        if isinstance(actions, dict):
            actions = [actions]
        if not isinstance(actions, list) or not all(isinstance(a, dict) for a in actions):
            raise ValueError("expected a JSON array of action objects")
        return actions

    def generate_actions(self, task: str, screenshot_base64: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Generate actions based on task and screenshot.

        Args:
            task: Natural-language description of what should be done.
            screenshot_base64: Optional base64-encoded PNG of the current
                screen, attached to the request as inline image data.

        Returns:
            The list of action dictionaries produced by the model, or a
            one-element list holding a ``"fail"`` action when the request or
            the response parsing fails.
        """
        system_prompt = """
        You are a Computer-Using Agent capable of controlling a Linux desktop.
        You will receive a task description and a screenshot of the current screen.
        
        Your goal is to generate a list of actions to accomplish the task.
        
        Supported actions:
        - {"action": "mousemove", "x": int, "y": int} -> Moves mouse to coordinates (click_at/hover_at)
        - {"action": "click", "button": int} -> Clicks mouse button (1=left)
        - {"action": "type", "text": str} -> Types text
        - {"action": "key", "key": str} -> Presses key combination (e.g., "Return", "ctrl+c")
        - {"action": "launch", "app": str} -> Launches application
        - {"action": "wait", "seconds": float} -> Waits
        - {"action": "done", "message": str} -> Task completed
        - {"action": "fail", "message": str} -> Task failed
        
        Return ONLY a JSON array of actions.
        """

        parts = [{"text": system_prompt}, {"text": f"Task: {task}"}]

        if screenshot_base64:
            parts.append({
                "inline_data": {
                    "mime_type": "image/png",
                    "data": screenshot_base64
                }
            })

        data = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,
                "responseMimeType": "application/json"
            }
        }

        # Boundary handler: this method promises never to raise, so any
        # transport-level problem becomes a "fail" action.
        try:
            response = requests.post(
                self.url,
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,  # without this a stalled socket blocks forever
            )
        except Exception as e:
            reason = self._redact(str(e))
            logger.error(f"Request failed: {reason}")
            return [{"action": "fail", "message": f"Connection failed: {reason}"}]

        if response.status_code != 200:
            logger.error(f"Gemini API error: {response.text}")
            return [{"action": "fail", "message": f"API Error: {response.status_code}"}]

        try:
            return self._parse_actions(response.json())
        except ValueError as e:
            logger.error(f"Failed to parse Gemini response: {e}")
            return [{"action": "fail", "message": "Failed to parse AI response"}]


class ComputerUsingAgent:
    """
    Computer-Using Agent that can interact with desktop environment
    using the standard Computer interface.

    A task is executed in one shot: the current screen is captured, Gemini is
    asked for a list of actions, and the actions are carried out in order.
    ``stop()`` may be called while a task is running to abandon the
    remaining actions.
    """

    # Upper bound applied to model-supplied "wait" durations so a bad plan
    # cannot stall the agent indefinitely.
    _MAX_WAIT_SECONDS = 60.0

    def __init__(self):
        """Read ``DISPLAY`` / ``GEMINI_API_KEY`` and build the collaborators."""
        self.display = os.getenv("DISPLAY", ":1")
        self.computer = X11Computer(self.display)
        self.current_task = None
        self.task_status = "idle"
        # Set by stop(); checked between actions in execute_task().
        self._stop_requested = False

        # Initialize Gemini Client. The key must come from the environment;
        # credentials are never embedded in source.
        api_key = os.getenv("GEMINI_API_KEY", "")
        if not api_key:
            logger.warning("GEMINI_API_KEY is not set; Gemini requests will fail")
        self.llm = GeminiClient(api_key)

        logger.info("Computer-Using Agent initialized with X11Computer")

    @staticmethod
    def _encode_screenshot(screenshot: Optional[bytes]) -> Optional[str]:
        """Return *screenshot* as a base64 string, or ``None`` when it is empty."""
        if not screenshot:
            return None
        return base64.b64encode(screenshot).decode()

    def _x11_env(self) -> Dict[str, str]:
        """Return the process environment with ``DISPLAY`` set for this agent."""
        return {**os.environ, "DISPLAY": self.display}

    def _perform_action(self, action: Dict[str, Any]) -> bool:
        """Carry out one non-terminal action.

        Args:
            action: Action dictionary as described in the Gemini prompt.

        Returns:
            ``True`` if the action type was recognised and executed,
            ``False`` if the action type is not supported.

        Raises:
            KeyError: If a required field of the action is missing.
            subprocess.CalledProcessError: If ``xdotool`` exits non-zero.
            Exception: Anything raised by the underlying Computer interface.
        """
        act_type = action.get("action")

        if act_type == "mousemove":
            self.computer.hover_at(action["x"], action["y"])
        elif act_type == "click":
            # The prompt's "click" carries no coordinates (it follows a
            # "mousemove"), and this code only calls the interface's pointer
            # methods with x/y, so the click at the current pointer position
            # is issued with xdotool directly.
            subprocess.run(["xdotool", "click", str(action.get("button", 1))],
                           env=self._x11_env(), check=True)
        elif act_type == "type":
            # Typed into whatever currently has focus; no click-to-focus at
            # specific coordinates is wanted here, so xdotool is used directly.
            subprocess.run(["xdotool", "type", "--", action["text"]],
                           env=self._x11_env(), check=True)
        elif act_type == "key":
            # NOTE(review): the combination (e.g. "ctrl+c") is passed as a
            # single list element; confirm X11Computer accepts that form.
            self.computer.key_combination([action["key"]])
        elif act_type == "launch":
            if action["app"] == "firefox":
                self.computer.open_web_browser()
            else:
                # NOTE(review): the executable name comes from the model's
                # output and is run as-is (no shell); consider an allow-list.
                subprocess.Popen([action["app"]],
                                 env=self._x11_env(),
                                 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                time.sleep(2)  # give the application time to open a window
        elif act_type == "wait":
            seconds = action.get("seconds")
            if seconds is None:
                self.computer.wait_5_seconds()
            else:
                # Honour the requested duration, clamped to a sane range.
                time.sleep(min(max(0.0, float(seconds)), self._MAX_WAIT_SECONDS))
        else:
            return False
        return True

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task using Gemini for reasoning and Computer interface for action.

        Args:
            task_description: Natural-language description of the task.

        Returns:
            A dictionary with keys ``success``, ``message``,
            ``steps_executed`` (one human-readable entry per attempted
            action), ``screenshot`` (base64 PNG of the final screen, or
            ``None``) and ``task``. ``success`` is ``False`` when the plan
            ends with a "fail" action, when any action failed or was
            unsupported, when the task was stopped, or when an unexpected
            error occurred.
        """
        self.current_task = task_description
        self.task_status = "running"
        self._stop_requested = False
        logger.info(f"Executing task: {task_description}")

        steps_executed = []
        final_message = ""
        success = False
        failures = 0
        stopped = False

        try:
            # 1. Capture initial state
            state = self.computer.current_state()
            screenshot_b64 = self._encode_screenshot(state.screenshot)

            # 2. Get plan from Gemini
            actions = self.llm.generate_actions(task_description, screenshot_b64)
            if isinstance(actions, dict):
                # A single action object instead of an array.
                actions = [actions]

            # 3. Execute actions
            for action in actions:
                if self._stop_requested:
                    stopped = True
                    final_message = "Task stopped"
                    break

                if not isinstance(action, dict):
                    failures += 1
                    logger.error(f"Ignoring malformed action: {action!r}")
                    steps_executed.append(f"Failed: malformed action {action!r}")
                    continue

                act_type = action.get("action")

                if act_type == "done":
                    success = True
                    final_message = action.get("message", "Task completed")
                    break

                if act_type == "fail":
                    success = False
                    final_message = action.get("message", "Task failed")
                    break

                # Best effort: a failing step is recorded and the remaining
                # steps are still attempted.
                try:
                    handled = self._perform_action(action)
                except Exception as e:
                    failures += 1
                    logger.error(f"Action execution failed: {e}")
                    steps_executed.append(f"Failed: {act_type} - {e}")
                    continue

                if handled:
                    steps_executed.append(f"Executed: {act_type} {action}")
                else:
                    failures += 1
                    logger.warning(f"Unsupported action type: {act_type}")
                    steps_executed.append(f"Skipped: unsupported action {act_type}")

            if not final_message:
                # The plan ended without an explicit "done"/"fail" action.
                success = failures == 0
                if success:
                    final_message = "Actions executed."
                else:
                    final_message = f"Actions executed with {failures} failure(s)."

            # Capture final state
            final_state = self.computer.current_state()
            final_screenshot = self._encode_screenshot(final_state.screenshot)

            if stopped:
                self.task_status = "stopped"
            else:
                self.task_status = "completed" if success else "failed"

            return {
                "success": success,
                "message": final_message,
                "steps_executed": steps_executed,
                "screenshot": final_screenshot,
                "task": task_description
            }

        except Exception as e:
            logger.error(f"Task execution error: {e}")
            self.task_status = "error"
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "steps_executed": steps_executed,
                "screenshot": None,
                "task": task_description
            }
        finally:
            self.current_task = None

    def stop(self):
        """Stop current task.

        Marks the agent as stopped and asks a running ``execute_task`` to
        abandon its remaining actions; the action currently in progress is
        allowed to finish.
        """
        logger.info("Stopping current task")
        self._stop_requested = True
        self.task_status = "stopped"
        self.current_task = None

    def get_status(self) -> Dict[str, Any]:
        """Get current agent status.

        Returns:
            A dictionary with the agent's ``status``, its ``current_task``
            (``None`` when idle) and the X11 ``display`` it controls.
        """
        return {
            "status": self.task_status,
            "current_task": self.current_task,
            "display": self.display
        }