computer-using-agent / computer_agent.py
OsamaBinLikhon's picture
Enhancement: Add VNC desktop environment integration
13bcdd9 verified
#!/usr/bin/env python3
"""
Enhanced Computer-Using Agent with VNC Integration
Combines browser automation with full desktop environment access
"""
import asyncio
import base64
import io
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2
import gradio as gr
import numpy as np
import requests
from huggingface_hub import hf_hub_download, login
from PIL import Image
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
# Setup logging
# Configures the root logger at INFO as soon as the module is imported.
# Because this installs a root handler, any later logging.basicConfig(...)
# call that does not pass force=True has no effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the agent class and the action helpers below.
logger = logging.getLogger(__name__)
@dataclass
class AgentState:
    """Mutable runtime state for the computer agent.

    Attributes:
        browser: Playwright browser handle, or ``None`` until one is launched.
        context: Browser context created from ``browser``, or ``None``.
        page: Page opened in ``context``, or ``None``.
        is_running: Set to ``True`` once a browser session has been started.
        screenshot_count: Number of screenshots taken so far.
        action_history: Human-readable log of performed actions, oldest first.
        vnc_port: TCP port on which the VNC server is expected.
    """

    # The Playwright types are quoted so they stay unevaluated forward
    # references; the dataclass itself never needs them resolved.
    browser: Optional["Browser"] = None
    context: Optional["BrowserContext"] = None
    page: Optional["Page"] = None
    is_running: bool = False
    screenshot_count: int = 0
    # default_factory gives every instance its own list without lying about
    # the field's type (the old default was ``None`` on a ``List[str]``).
    action_history: List[str] = field(default_factory=list)
    vnc_port: int = 5901

    def __post_init__(self):
        # Backward compatibility: callers that still pass
        # ``action_history=None`` explicitly get an empty list, as before.
        if self.action_history is None:
            self.action_history = []
class ComputerUsingAgent:
    """Enhanced Computer-Using Agent with VNC Integration.

    Holds one Playwright Chromium session (browser -> context -> page) in
    ``self.state`` and offers coroutines to start it, navigate, and capture
    screenshots.  It can also report whether something is listening on the
    configured VNC port.

    Playwright's async objects belong to the event loop they were created
    on, so all coroutines of one instance must be awaited on the same,
    still-running loop.
    """

    def __init__(self):
        self.state = AgentState()
        # Handle returned by ``async_playwright().start()``; kept so that
        # ``close()`` can stop the driver process again.
        self._playwright = None
        self.setup_logging()
        self.vnc_url = f"http://localhost:{self.state.vnc_port}/vnc.html"

    def setup_logging(self):
        """Route root logging to ``agent.log`` and the console.

        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, which is the case here because the module configures
        logging at import time.  ``force=True`` replaces those handlers so
        the file handler and the format below actually take effect.  If the
        log file cannot be opened, logging falls back to console only
        instead of failing agent construction.
        """
        handlers = [logging.StreamHandler()]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler('agent.log'))
        except OSError as e:
            file_error = e
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=handlers,
            force=True,
        )
        if file_error is not None:
            logging.getLogger(__name__).warning(
                "Could not open agent.log, logging to console only: %s", file_error
            )

    async def close(self) -> None:
        """Tear down the browser session and the Playwright driver.

        Best effort: failures are logged, never raised, and the state is
        reset to "not initialized" in every case.  Safe to call when nothing
        is running.
        """
        browser = self.state.browser
        playwright = getattr(self, "_playwright", None)
        self.state.browser = None
        self.state.context = None
        self.state.page = None
        self.state.is_running = False
        self._playwright = None
        # The timeouts guard against handles that belong to a dead event
        # loop and would otherwise never complete.
        if browser is not None:
            try:
                await asyncio.wait_for(browser.close(), timeout=10)
            except Exception as e:
                logger.warning(f"Failed to close browser cleanly: {str(e)}")
        if playwright is not None:
            try:
                await asyncio.wait_for(playwright.stop(), timeout=10)
            except Exception as e:
                logger.warning(f"Failed to stop Playwright cleanly: {str(e)}")

    async def initialize_browser(self, headless: bool = True, viewport_width: int = 1280, viewport_height: int = 720):
        """Launch Chromium and open a blank page.

        Any previously running session is closed first so repeated calls do
        not leak browser processes.

        Args:
            headless: Run Chromium without a visible window.
            viewport_width: Viewport width in pixels.
            viewport_height: Viewport height in pixels.

        Returns:
            ``True`` on success, ``False`` if any step failed (the error is
            logged and partially created resources are released).
        """
        try:
            logger.info("Initializing browser...")
            await self.close()
            self._playwright = await async_playwright().start()
            # NOTE(review): "--disable-web-security" and "--no-sandbox"
            # weaken browser isolation; confirm they are really needed
            # before pointing this agent at untrusted sites.
            self.state.browser = await self._playwright.chromium.launch(
                headless=headless,
                args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-web-security",
                    "--disable-features=VizDisplayCompositor",
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars",
                    "--disable-background-timer-throttling",
                    "--disable-popup-blocking",
                    "--disable-backgrounding-occluded-windows",
                    "--disable-renderer-backgrounding",
                    "--disable-window-activation",
                    "--disable-focus-on-load",
                    "--no-first-run",
                    "--no-default-browser-check",
                    "--window-position=0,0",
                ]
            )
            # Fresh (non-persistent) context with a fixed viewport and a
            # desktop Chrome user agent.
            self.state.context = await self.state.browser.new_context(
                viewport={'width': viewport_width, 'height': viewport_height},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            )
            self.state.page = await self.state.context.new_page()
            self.state.is_running = True
            logger.info("Browser initialized successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize browser: {str(e)}")
            # Do not leave a half-built session (or an orphaned driver) behind.
            await self.close()
            return False

    async def navigate_to_url(self, url: str) -> Dict[str, Any]:
        """Navigate the current page to ``url``.

        Surrounding whitespace is stripped and ``https://`` is prepended
        when no http(s) scheme is present (checked case-insensitively).

        Returns:
            A dict with ``success`` and ``message``; on success it also has
            ``title`` and ``current_url``.  Never raises.
        """
        if not self.state.page:
            return {"success": False, "message": "Browser not initialized"}
        url = (url or "").strip()
        if not url:
            return {"success": False, "message": "URL is required"}
        try:
            if not url.lower().startswith(('http://', 'https://')):
                url = 'https://' + url
            await self.state.page.goto(url, wait_until='networkidle', timeout=30000)
            # Short extra pause after the network goes idle.
            await self.state.page.wait_for_timeout(2000)
            title = await self.state.page.title()
            current_url = self.state.page.url
            self.state.action_history.append(f"Navigated to: {url}")
            return {
                "success": True,
                "message": f"Successfully navigated to {url}",
                "title": title,
                "current_url": current_url
            }
        except Exception as e:
            logger.error(f"Failed to navigate to {url}: {str(e)}")
            return {"success": False, "message": f"Failed to navigate: {str(e)}"}

    async def take_screenshot(self) -> str:
        """Capture the current page as PNG.

        Returns:
            The PNG bytes base64-encoded as an ASCII string, or ``""`` when
            no page is open or the capture failed (the error is logged).
        """
        if not self.state.page:
            return ""
        try:
            screenshot_bytes = await self.state.page.screenshot(type='png')
            base64_image = base64.b64encode(screenshot_bytes).decode('utf-8')
            self.state.screenshot_count += 1
            self.state.action_history.append(f"Screenshot taken (Total: {self.state.screenshot_count})")
            return base64_image
        except Exception as e:
            logger.error(f"Failed to take screenshot: {str(e)}")
            return ""

    def _probe_vnc(self, timeout: float = 1.0) -> Dict[str, Any]:
        """Synchronously check whether a TCP listener accepts on the VNC port.

        Args:
            timeout: Seconds to wait for the connection attempt.

        Returns:
            A dict that always contains ``vnc_running``, ``vnc_port``,
            ``vnc_url`` and ``status``; ``error`` is added when the probe
            itself failed.  Never raises.
        """
        import socket
        try:
            # ``with`` closes the socket on every path; the timeout keeps a
            # filtered port from blocking the caller indefinitely.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                vnc_running = sock.connect_ex(('localhost', self.state.vnc_port)) == 0
        except Exception as e:
            # Status reporting must not raise (e.g. bad port value, name
            # resolution failure); report "not running" with the reason.
            return {
                "vnc_running": False,
                "vnc_port": self.state.vnc_port,
                "vnc_url": self.vnc_url,
                "status": "VNC Server Not Available",
                "error": str(e)
            }
        return {
            "vnc_running": vnc_running,
            "vnc_port": self.state.vnc_port,
            "vnc_url": self.vnc_url,
            "status": "VNC Server Active" if vnc_running else "VNC Server Not Available"
        }

    async def get_vnc_status(self) -> Dict[str, Any]:
        """Get VNC connection status without blocking the event loop.

        Returns:
            The dict produced by ``_probe_vnc``.
        """
        loop = asyncio.get_running_loop()
        # The probe uses a blocking socket, so run it in the default executor.
        return await loop.run_in_executor(None, self._probe_vnc)

    def get_status(self) -> Dict[str, Any]:
        """Get current agent status including VNC info.

        Calls the synchronous VNC probe directly, so this method works both
        from plain threads and from code running inside an event loop.

        Returns:
            A JSON-serialisable dict; ``action_history`` holds at most the
            ten most recent entries.
        """
        vnc_status = self._probe_vnc()
        return {
            "is_running": self.state.is_running,
            "browser_initialized": self.state.browser is not None,
            "page_loaded": self.state.page is not None,
            "screenshot_count": self.state.screenshot_count,
            "action_history": self.state.action_history[-10:],
            "current_url": self.state.page.url if self.state.page else "None",
            "vnc_info": vnc_status
        }
# Global agent instance
# Created at import time and shared by process_action and the Gradio
# callbacks. Construction only builds an empty AgentState, configures
# logging and derives the VNC URL; no browser is launched here.
agent = ComputerUsingAgent()
# One long-lived event loop, running in a daemon thread, executes every agent
# coroutine.  Playwright's async objects are tied to the loop that created
# them, so running each action under its own ``asyncio.run`` (which closes
# its loop on return) would leave the browser handle unusable for the next
# action.
_background_loop = None
_background_loop_lock = threading.Lock()


def _get_background_loop():
    """Return the shared event loop, starting its daemon thread on first use."""
    global _background_loop
    with _background_loop_lock:
        if _background_loop is None or _background_loop.is_closed():
            loop = asyncio.new_event_loop()
            threading.Thread(
                target=loop.run_forever,
                name="agent-event-loop",
                daemon=True,
            ).start()
            _background_loop = loop
        return _background_loop


def _run_async(coro):
    """Run *coro* on the shared loop, block until done, and return its result."""
    return asyncio.run_coroutine_threadsafe(coro, _get_background_loop()).result()


def process_action(action_type: str, **kwargs):
    """Dispatch a named action to the global agent.

    Args:
        action_type: One of ``"initialize"``, ``"navigate"``,
            ``"screenshot"``, ``"status"`` or ``"vnc_status"``.
        **kwargs: Action arguments: ``headless`` (bool, default ``True``)
            for ``"initialize"``; ``url`` (str) for ``"navigate"``.

    Returns:
        For ``"screenshot"`` always a 2-tuple ``(message, base64_png)``
        where the second item is ``None`` on failure, so callers bound to
        two outputs receive two values on every path.  For every other
        action a single message string (JSON text for the status actions).
        Exceptions are logged and reported as ``"Error: ..."``.
    """
    try:
        if action_type == "initialize":
            headless = kwargs.get("headless", True)
            result = _run_async(agent.initialize_browser(headless=headless))
            return "Browser initialized successfully" if result else "Failed to initialize browser"
        if action_type == "navigate":
            url = kwargs.get("url", "")
            if not url:
                return "URL is required"
            result = _run_async(agent.navigate_to_url(url))
            return result["message"]
        if action_type == "screenshot":
            image_base64 = _run_async(agent.take_screenshot())
            if image_base64:
                return "Screenshot taken successfully", image_base64
            return "Failed to take screenshot", None
        if action_type == "status":
            status = agent.get_status()
            return json.dumps(status, indent=2)
        if action_type == "vnc_status":
            vnc_status = _run_async(agent.get_vnc_status())
            return json.dumps(vnc_status, indent=2)
        return f"Unknown action: {action_type}"
    except Exception as e:
        logger.error(f"Error processing action {action_type}: {str(e)}")
        message = f"Error: {str(e)}"
        # Keep the screenshot return shape stable on the error path too.
        if action_type == "screenshot":
            return message, None
        return message
def gradio_interface():
    """Create enhanced Gradio interface with VNC integration.

    Builds a three-tab ``gr.Blocks`` app (browser automation, VNC desktop,
    system info) and wires every button to ``process_action`` or to a small
    local handler.

    Returns:
        The assembled ``gr.Blocks`` object, not yet launched.
    """
    import sys
    from html import escape

    def _capture_screenshot():
        """Take a screenshot and return ``(status_text, image_or_None)``.

        ``process_action("screenshot")`` may yield either a
        ``(message, base64_png)`` tuple or a bare message string; both are
        normalised to two values here because the click handler feeds two
        outputs.  The base64 payload is decoded to a PIL image, since the
        image component cannot display a raw base64 string.
        """
        result = process_action("screenshot")
        if isinstance(result, tuple):
            message, encoded = result
        else:
            message, encoded = result, None
        if not encoded:
            return message, None
        try:
            picture = Image.open(io.BytesIO(base64.b64decode(encoded)))
            picture.load()
        except Exception as e:
            logger.error(f"Failed to decode screenshot: {str(e)}")
            return f"Error: {str(e)}", None
        return message, picture

    def _vnc_viewer_html():
        """Return HTML that links to and embeds the VNC web viewer.

        A server-side callback cannot open a browser tab itself, so the
        viewer panel is replaced with a real link (opens in a new tab) and
        an inline iframe.
        """
        url = escape(agent.vnc_url, quote=True)
        return f"""
        <div style="width: 100%; border: 2px solid #ccc; border-radius: 10px; background-color: #f9f9f9; padding: 10px;">
            <p style="text-align: center;">
                <a href="{url}" target="_blank" rel="noopener noreferrer">Open VNC viewer in a new tab</a>
            </p>
            <iframe src="{url}" style="width: 100%; height: 600px; border: none;"></iframe>
        </div>
        """

    def _system_info():
        """Return static deployment facts plus the running Python version as JSON."""
        return json.dumps({
            "platform": "Hugging Face Spaces",
            "docker": True,
            "vnc_enabled": True,
            "desktop_env": "XFCE4",
            "python_version": f"{sys.version_info.major}.{sys.version_info.minor}",
            "features": ["browser_automation", "vnc_desktop", "web_interface"]
        }, indent=2)

    with gr.Blocks(title="Enhanced Computer-Using Agent with VNC", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🖥️ Enhanced Computer-Using Agent with VNC")
        gr.Markdown("🤖 **AI-powered browser automation with full desktop environment access**")

        with gr.Tab("🌐 Browser Automation"):
            with gr.Row():
                initialize_btn = gr.Button("Initialize Browser", variant="primary")
                status_btn = gr.Button("Get Status", variant="secondary")
            status_display = gr.Textbox(label="Agent Status", lines=8)
            with gr.Row():
                url_input = gr.Textbox(label="URL", placeholder="https://example.com")
                navigate_btn = gr.Button("Navigate", variant="primary")
            navigation_status = gr.Textbox(label="Navigation Status")
            with gr.Row():
                screenshot_btn = gr.Button("Take Screenshot", variant="primary")
            screenshot_output = gr.Image(label="Current Screenshot")
            screenshot_status = gr.Textbox(label="Screenshot Status")

        with gr.Tab("🖥️ VNC Desktop"):
            with gr.Row():
                vnc_status_btn = gr.Button("Check VNC Status", variant="primary")
                open_vnc_btn = gr.Button("Open VNC Viewer", variant="secondary")
            vnc_status_display = gr.Textbox(label="VNC Status", lines=6)
            with gr.Row():
                # NOTE(review): this prints the VNC password in a page that
                # is served on all interfaces; confirm that is intended or
                # move the credential out of the UI.
                gr.HTML(f"""
                <div style="text-align: center; padding: 20px; background-color: #f0f0f0; border-radius: 10px;">
                    <h3>🌐 VNC Web Access</h3>
                    <p>Click the button above to open the VNC web viewer in a new tab</p>
                    <p><strong>Port:</strong> {agent.state.vnc_port} | <strong>Password:</strong> computer-agent</p>
                </div>
                """)
            # Placeholder panel; replaced by _vnc_viewer_html() when
            # "Open VNC Viewer" is clicked.
            vnc_viewer = gr.HTML("""
            <div style="width: 100%; height: 600px; border: 2px solid #ccc; border-radius: 10px; background-color: #f9f9f9;">
                <div style="display: flex; align-items: center; justify-content: center; height: 100%; color: #666;">
                    <div style="text-align: center;">
                        <h4>🖥️ VNC Desktop Environment</h4>
                        <p>Desktop environment will be accessible here once VNC server is running</p>
                        <p><em>Use the "Open VNC Viewer" button to access full desktop</em></p>
                    </div>
                </div>
            </div>
            """)

        with gr.Tab("📊 System Info"):
            with gr.Row():
                system_info_btn = gr.Button("Get System Info", variant="primary")
            system_info_display = gr.Textbox(label="System Information", lines=10)
            with gr.Row():
                gr.HTML("""
                <div style="background-color: #e8f5e8; padding: 20px; border-radius: 10px; margin-top: 20px;">
                    <h4>🚀 Features Available</h4>
                    <ul>
                        <li>✅ Browser Automation with Playwright</li>
                        <li>✅ Screenshot Capture</li>
                        <li>✅ VNC Desktop Environment (XFCE4)</li>
                        <li>✅ Web-based VNC Access</li>
                        <li>✅ Real-time Status Monitoring</li>
                        <li>✅ Action History Tracking</li>
                    </ul>
                </div>
                """)

        # Event handlers (must be registered inside the Blocks context)
        initialize_btn.click(
            fn=lambda: process_action("initialize"),
            outputs=status_display
        )
        status_btn.click(
            fn=lambda: process_action("status"),
            outputs=status_display
        )
        navigate_btn.click(
            fn=lambda url: process_action("navigate", url=url),
            inputs=url_input,
            outputs=navigation_status
        )
        screenshot_btn.click(
            fn=_capture_screenshot,
            outputs=[screenshot_status, screenshot_output]
        )
        vnc_status_btn.click(
            fn=lambda: process_action("vnc_status"),
            outputs=vnc_status_display
        )
        open_vnc_btn.click(
            fn=_vnc_viewer_html,
            outputs=vnc_viewer
        )
        system_info_btn.click(
            fn=_system_info,
            outputs=system_info_display
        )
    return interface
if __name__ == "__main__":
    # Script entry point: build the Gradio UI and serve it on every network
    # interface at port 7860, without a public share link, with debug mode
    # and in-UI error display switched on.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
        "show_error": True,
    }
    interface = gradio_interface()
    interface.launch(**launch_options)