Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Enhanced Computer-Using Agent with VNC Integration | |
| Combines browser automation with full desktop environment access | |
| """ | |
| import asyncio | |
| import json | |
| import base64 | |
| import io | |
| import os | |
| import time | |
| import threading | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import logging | |
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| from playwright.async_api import async_playwright, Browser, BrowserContext, Page | |
| import requests | |
| from huggingface_hub import hf_hub_download, login | |
# Configure root logging at INFO level when the module is imported, and create
# the module-level logger used by the agent class and the action dispatcher.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AgentState:
    """Mutable state shared by the computer agent.

    The class must be a dataclass: ``__post_init__`` is only invoked by the
    dataclass-generated ``__init__``. Without the decorator the hook never
    runs and ``action_history`` stays ``None``.
    """

    # Playwright handles; ``None`` until a browser has been started. The
    # Playwright types are quoted so the annotations are never evaluated at
    # runtime (they only matter to type checkers).
    browser: Optional["Browser"] = None
    context: Optional["BrowserContext"] = None
    page: Optional["Page"] = None
    # True once a browser has been started successfully.
    is_running: bool = False
    # Number of screenshots taken so far.
    screenshot_count: int = 0
    # Human-readable log of performed actions; ``None`` is replaced by a fresh
    # list per instance in ``__post_init__``.
    action_history: Optional[List[str]] = None
    # TCP port probed for the VNC server.
    vnc_port: int = 5901

    def __post_init__(self):
        """Give each instance its own history list when none was supplied."""
        if self.action_history is None:
            self.action_history = []
class ComputerUsingAgent:
    """Browser-automation agent that also reports on a local VNC server.

    Holds one Playwright Chromium browser/context/page in ``self.state`` and
    offers coroutines to start it, navigate and capture screenshots, plus a
    TCP probe of the VNC port.

    The Playwright coroutines (``initialize_browser``, ``navigate_to_url``,
    ``take_screenshot``) must all be awaited on the same event loop: the
    browser objects stay bound to the loop that created them.
    """

    # Seconds to wait for the VNC port probe before reporting "not available".
    VNC_PROBE_TIMEOUT = 1.0
    # Seconds to wait for a browser / driver shutdown before giving up.
    SHUTDOWN_TIMEOUT = 10.0

    def __init__(self):
        self.state = AgentState()
        # AgentState declares ``action_history`` with a ``None`` default; make
        # sure it is a list before any method appends to or slices it.
        if self.state.action_history is None:
            self.state.action_history = []
        # Playwright driver handle, kept so it can be stopped again.
        self._playwright = None
        self.setup_logging()
        # NOTE(review): 5901 is conventionally a raw VNC (RFB) port; confirm
        # that the web viewer (vnc.html) is really served on this port.
        self.vnc_url = f"http://localhost:{self.state.vnc_port}/vnc.html"

    def setup_logging(self):
        """Configure root logging to write to ``agent.log`` and the console.

        NOTE(review): ``logging.basicConfig`` does nothing when the root
        logger already has handlers (unless ``force=True``). If logging was
        configured earlier in the process, this call has no effect and no
        ``agent.log`` is written. Behaviour is left unchanged on purpose.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('agent.log'),
                logging.StreamHandler(),
            ],
        )

    async def _shutdown_browser(self) -> None:
        """Best-effort release of the browser and Playwright driver.

        Resets the browser-related state first, so the agent reports
        "not initialized" even when closing fails or times out.
        """
        browser, driver = self.state.browser, self._playwright
        self.state.browser = None
        self.state.context = None
        self.state.page = None
        self.state.is_running = False
        self._playwright = None

        closers = []
        if browser is not None:
            closers.append(("browser", browser.close))
        if driver is not None:
            closers.append(("Playwright driver", driver.stop))
        for label, close in closers:
            try:
                # Bounded wait: a handle bound to a dead event loop must not
                # block a re-initialisation forever.
                await asyncio.wait_for(close(), timeout=self.SHUTDOWN_TIMEOUT)
            except Exception as e:
                logger.warning("Could not close %s cleanly: %s", label, e)

    async def initialize_browser(self, headless: bool = True, viewport_width: int = 1280, viewport_height: int = 720) -> bool:
        """Start Chromium and open one page.

        Any previously started browser is released first, and a partially
        started one is released again on failure, so no driver or browser
        process is orphaned.

        Args:
            headless: Launch Chromium without a visible window.
            viewport_width: Page viewport width in pixels.
            viewport_height: Page viewport height in pixels.

        Returns:
            True on success, False if any start-up step raised.
        """
        logger.info("Initializing browser...")
        await self._shutdown_browser()
        try:
            self._playwright = await async_playwright().start()
            self.state.browser = await self._playwright.chromium.launch(
                headless=headless,
                args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    # NOTE(review): turns off the same-origin policy for every
                    # visited page; confirm this is really required.
                    "--disable-web-security",
                    "--disable-features=VizDisplayCompositor",
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars",
                    "--disable-background-timer-throttling",
                    "--disable-popup-blocking",
                    "--disable-backgrounding-occluded-windows",
                    "--disable-renderer-backgrounding",
                    "--disable-window-activation",
                    "--disable-focus-on-load",
                    "--no-first-run",
                    "--no-default-browser-check",
                    "--window-position=0,0",
                ],
            )
            # Fresh (non-persistent) context with a fixed viewport and a
            # desktop Chrome user agent.
            self.state.context = await self.state.browser.new_context(
                viewport={'width': viewport_width, 'height': viewport_height},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            )
            self.state.page = await self.state.context.new_page()
        except Exception as e:
            logger.error("Failed to initialize browser: %s", e)
            await self._shutdown_browser()
            return False

        self.state.is_running = True
        logger.info("Browser initialized successfully")
        return True

    async def navigate_to_url(self, url: str) -> Dict[str, Any]:
        """Navigate the page to ``url`` and report the outcome.

        ``https://`` is prepended when the URL has no http(s) scheme (the
        check is case-insensitive).

        Returns:
            A dict with ``success`` and ``message``; on success also
            ``title`` and ``current_url``.
        """
        if not self.state.page:
            return {"success": False, "message": "Browser not initialized"}

        url = url.strip()
        if not url:
            return {"success": False, "message": "URL is required"}
        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url

        try:
            await self.state.page.goto(url, wait_until='networkidle', timeout=30000)
            # Short grace period for late rendering after the network is idle.
            await self.state.page.wait_for_timeout(2000)
            title = await self.state.page.title()
            current_url = self.state.page.url
        except Exception as e:
            logger.error("Failed to navigate to %s: %s", url, e)
            return {"success": False, "message": f"Failed to navigate: {str(e)}"}

        self.state.action_history.append(f"Navigated to: {url}")
        return {
            "success": True,
            "message": f"Successfully navigated to {url}",
            "title": title,
            "current_url": current_url,
        }

    async def take_screenshot(self) -> str:
        """Capture the page as PNG and return it base64-encoded.

        Returns:
            The base64 text, or an empty string when no page is open or the
            capture failed.
        """
        if not self.state.page:
            return ""
        try:
            screenshot_bytes = await self.state.page.screenshot(type='png')
        except Exception as e:
            logger.error("Failed to take screenshot: %s", e)
            return ""

        self.state.screenshot_count += 1
        self.state.action_history.append(f"Screenshot taken (Total: {self.state.screenshot_count})")
        return base64.b64encode(screenshot_bytes).decode('utf-8')

    def _probe_vnc(self) -> Dict[str, Any]:
        """Check synchronously whether something accepts TCP connections on the VNC port."""
        import socket

        port = self.state.vnc_port
        info: Dict[str, Any] = {
            "vnc_running": False,
            "vnc_port": port,
            "vnc_url": self.vnc_url,
            "status": "VNC Server Not Available",
        }
        try:
            # The context manager closes the socket even when the probe raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.VNC_PROBE_TIMEOUT)
                info["vnc_running"] = sock.connect_ex(('localhost', port)) == 0
        except Exception as e:
            # A status probe must never raise; report the problem instead.
            info["error"] = str(e)
            return info

        if info["vnc_running"]:
            info["status"] = "VNC Server Active"
        return info

    async def get_vnc_status(self) -> Dict[str, Any]:
        """Return the VNC probe result.

        Returns:
            A dict with ``vnc_running``, ``vnc_port``, ``vnc_url`` and
            ``status``, plus ``error`` when the probe itself failed.
        """
        return self._probe_vnc()

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the agent state, including the VNC probe result.

        Uses the synchronous probe directly, so this method can be called
        both from plain threads and from inside a running event loop.
        """
        history = self.state.action_history or []
        return {
            "is_running": self.state.is_running,
            "browser_initialized": self.state.browser is not None,
            "page_loaded": self.state.page is not None,
            "screenshot_count": self.state.screenshot_count,
            "action_history": history[-10:],  # only the ten latest entries
            "current_url": self.state.page.url if self.state.page else "None",
            "vnc_info": self._probe_vnc(),
        }
# Single module-level agent instance; process_action and the Gradio handlers
# operate on this object.
agent = ComputerUsingAgent()
# Playwright's async objects stay bound to the event loop that created them.
# asyncio.run() builds a new loop for every call and closes it afterwards, so
# a browser started by one action would be unusable by the next one. All agent
# coroutines therefore run on one long-lived loop in a daemon thread.
_loop_guard = threading.Lock()
_agent_loop: Optional[asyncio.AbstractEventLoop] = None


def _shared_loop() -> asyncio.AbstractEventLoop:
    """Return the long-lived event loop, starting its daemon thread on first use."""
    global _agent_loop
    with _loop_guard:
        if _agent_loop is None or _agent_loop.is_closed():
            loop = asyncio.new_event_loop()
            threading.Thread(
                target=loop.run_forever, name="agent-event-loop", daemon=True
            ).start()
            _agent_loop = loop
        return _agent_loop


def _run_async(coro):
    """Run ``coro`` on the shared loop and block until its result is available."""
    return asyncio.run_coroutine_threadsafe(coro, _shared_loop()).result()


def process_action(action_type: str, **kwargs):
    """Dispatch one UI action to the global agent and return a display value.

    Args:
        action_type: One of ``"initialize"``, ``"navigate"``,
            ``"screenshot"``, ``"status"`` or ``"vnc_status"``.
        **kwargs: ``headless`` (bool, default True) for ``"initialize"``;
            ``url`` (str) for ``"navigate"``.

    Returns:
        A message or JSON string. A successful ``"screenshot"`` instead
        returns a ``(message, base64_png)`` tuple. Failures are reported as
        a plain string; this function does not raise.
    """
    try:
        if action_type == "initialize":
            headless = kwargs.get("headless", True)
            started = _run_async(agent.initialize_browser(headless=headless))
            return "Browser initialized successfully" if started else "Failed to initialize browser"

        if action_type == "navigate":
            url = kwargs.get("url", "")
            if not url:
                return "URL is required"
            return _run_async(agent.navigate_to_url(url))["message"]

        if action_type == "screenshot":
            image_base64 = _run_async(agent.take_screenshot())
            if image_base64:
                return "Screenshot taken successfully", image_base64
            return "Failed to take screenshot"

        if action_type == "status":
            return json.dumps(agent.get_status(), indent=2)

        if action_type == "vnc_status":
            return json.dumps(_run_async(agent.get_vnc_status()), indent=2)

        return f"Unknown action: {action_type}"
    except Exception as e:
        # UI boundary: report the failure as text instead of crashing the handler.
        logger.error("Error processing action %s: %s", action_type, e)
        return f"Error: {str(e)}"
def gradio_interface():
    """Build the Gradio UI: browser automation, VNC desktop and system info tabs.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    vnc_port = agent.state.vnc_port

    def _on_screenshot():
        """Return ``(status message, image or None)`` for the two screenshot outputs.

        ``process_action("screenshot")`` yields a ``(message, base64)`` tuple
        on success and a plain string otherwise. Both shapes are normalised
        here, and the base64 payload is decoded into a PIL image because the
        image component cannot display raw base64 text.
        """
        result = process_action("screenshot")
        if not isinstance(result, tuple):
            return result, None
        message, payload = result
        try:
            image = Image.open(io.BytesIO(base64.b64decode(payload)))
            image.load()  # decode now, while the buffer is certainly alive
        except Exception as e:
            return f"Error: could not decode screenshot: {e}", None
        return message, image

    def _vnc_link_html() -> str:
        """Return HTML with a link that opens the VNC web viewer in a new tab."""
        url = agent.vnc_url
        return (
            '<div style="width: 100%; padding: 40px 20px; border: 2px solid #ccc; '
            'border-radius: 10px; background-color: #f9f9f9; text-align: center;">'
            f'<a href="{url}" target="_blank" rel="noopener noreferrer">'
            'Open the VNC viewer in a new tab</a>'
            '</div>'
        )

    with gr.Blocks(title="Enhanced Computer-Using Agent with VNC", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🖥️ Enhanced Computer-Using Agent with VNC")
        gr.Markdown("🤖 **AI-powered browser automation with full desktop environment access**")

        with gr.Tab("🌐 Browser Automation"):
            with gr.Row():
                initialize_btn = gr.Button("Initialize Browser", variant="primary")
                status_btn = gr.Button("Get Status", variant="secondary")
            status_display = gr.Textbox(label="Agent Status", lines=8)
            with gr.Row():
                url_input = gr.Textbox(label="URL", placeholder="https://example.com")
                navigate_btn = gr.Button("Navigate", variant="primary")
            navigation_status = gr.Textbox(label="Navigation Status")
            with gr.Row():
                screenshot_btn = gr.Button("Take Screenshot", variant="primary")
            screenshot_output = gr.Image(label="Current Screenshot")
            screenshot_status = gr.Textbox(label="Screenshot Status")

        with gr.Tab("🖥️ VNC Desktop"):
            with gr.Row():
                vnc_status_btn = gr.Button("Check VNC Status", variant="primary")
                open_vnc_btn = gr.Button("Open VNC Viewer", variant="secondary")
            vnc_status_display = gr.Textbox(label="VNC Status", lines=6)
            with gr.Row():
                # NOTE(review): the VNC password is shown to every visitor of
                # the UI; confirm that this is intended.
                gr.HTML(f"""
                <div style="text-align: center; padding: 20px; background-color: #f0f0f0; border-radius: 10px;">
                <h3>🔗 VNC Web Access</h3>
                <p>Click "Open VNC Viewer" above, then follow the link to open the VNC web viewer in a new tab</p>
                <p><strong>Port:</strong> {vnc_port} | <strong>Password:</strong> computer-agent</p>
                </div>
                """)
            # Placeholder; replaced by the viewer link when "Open VNC Viewer" is clicked.
            vnc_viewer = gr.HTML("""
            <div style="width: 100%; height: 600px; border: 2px solid #ccc; border-radius: 10px; background-color: #f9f9f9;">
            <div style="display: flex; align-items: center; justify-content: center; height: 100%; color: #666;">
            <div style="text-align: center;">
            <h4>🖥️ VNC Desktop Environment</h4>
            <p>Desktop environment will be accessible here once VNC server is running</p>
            <p><em>Use the "Open VNC Viewer" button to access full desktop</em></p>
            </div>
            </div>
            </div>
            """)

        with gr.Tab("📊 System Info"):
            with gr.Row():
                system_info_btn = gr.Button("Get System Info", variant="primary")
            system_info_display = gr.Textbox(label="System Information", lines=10)
            with gr.Row():
                gr.HTML("""
                <div style="background-color: #e8f5e8; padding: 20px; border-radius: 10px; margin-top: 20px;">
                <h4>🚀 Features Available</h4>
                <ul>
                <li>✅ Browser Automation with Playwright</li>
                <li>✅ Screenshot Capture</li>
                <li>✅ VNC Desktop Environment (XFCE4)</li>
                <li>✅ Web-based VNC Access</li>
                <li>✅ Real-time Status Monitoring</li>
                <li>✅ Action History Tracking</li>
                </ul>
                </div>
                """)

        # Event handlers
        initialize_btn.click(
            fn=lambda: process_action("initialize"),
            outputs=status_display,
        )
        status_btn.click(
            fn=lambda: process_action("status"),
            outputs=status_display,
        )
        navigate_btn.click(
            fn=lambda url: process_action("navigate", url=url),
            inputs=url_input,
            outputs=navigation_status,
        )
        screenshot_btn.click(
            fn=_on_screenshot,
            outputs=[screenshot_status, screenshot_output],
        )
        vnc_status_btn.click(
            fn=lambda: process_action("vnc_status"),
            outputs=vnc_status_display,
        )
        # A Python handler cannot run JavaScript in the visitor's browser, so
        # the button publishes a clickable link in the viewer panel instead.
        open_vnc_btn.click(
            fn=_vnc_link_html,
            outputs=vnc_viewer,
        )
        system_info_btn.click(
            fn=lambda: json.dumps({
                "platform": "Hugging Face Spaces",
                "docker": True,
                "vnc_enabled": True,
                "desktop_env": "XFCE4",
                "python_version": "3.10",
                "features": ["browser_automation", "vnc_desktop", "web_interface"],
            }, indent=2),
            outputs=system_info_display,
        )
    return interface
if __name__ == "__main__":
    # Script entry point: build the Gradio app, then serve it on every network
    # interface at port 7860 with debug output and in-browser error display.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
        "show_error": True,
    }
    interface = gradio_interface()
    interface.launch(**launch_options)