Spaces:
Running
Running
| """ | |
| app.py — Enhanced Open Computer Agent v2.0 | |
| ========================================== | |
| Powered by smolagents + E2B + Playwright + Multi-Model Router + Memory + SoM + Voice | |
| """ | |
| import os | |
| import json | |
| import time | |
| import uuid | |
| import shutil | |
| import base64 | |
| from io import BytesIO | |
| from threading import Timer | |
| from typing import Any, Dict, List, Optional, Generator | |
| from datetime import datetime | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from e2b_desktop import Sandbox | |
| from huggingface_hub import login, upload_folder | |
| from PIL import Image | |
| from smolagents import CodeAgent | |
| from smolagents.gradio_ui import GradioUI, stream_to_gradio | |
| try: | |
| from gradio_modal import Modal | |
| HAS_GRADIO_MODAL = True | |
| except ImportError: | |
| HAS_GRADIO_MODAL = False | |
| Modal = None | |
| # Our enhanced modules | |
| from core_agent import ( | |
| AgentConfig, | |
| IntelligenceRouter, | |
| HierarchicalPlanner, | |
| VerifierAgent, | |
| AgentMemory, | |
| SoMPreprocessor, | |
| SessionRecorder, | |
| HITLCheckpoint, | |
| CostTracker, | |
| ModelCall, | |
| Subtask, | |
| ) | |
| from mcp_tools import ( | |
| BrowserMCP, | |
| CodeExecutionMCP, | |
| FileSystemMCP, | |
| HFHubMCP, | |
| make_browser_tools, | |
| make_code_tools, | |
| make_fs_tools, | |
| make_hf_tools, | |
| ) | |
| from voice_interface import VoiceInterface | |
| from eval_harness import EvaluationHarness, DEFAULT_BENCHMARKS | |
| load_dotenv(override=True) | |
| # ============================================================================= | |
| # Config & Globals | |
| # ============================================================================= | |
# --- Credentials -------------------------------------------------------------
# Read once at import time; load_dotenv() has already populated the environment.
E2B_API_KEY = os.environ.get("E2B_API_KEY")

# --- Sandbox registry ---------------------------------------------------------
# Both registries are keyed by session UUID. The metadata dict stores the
# "created_at" / "last_accessed" epoch timestamps used for expiry decisions.
SANDBOXES: Dict[str, Sandbox] = {}
SANDBOX_METADATA: Dict[str, Dict[str, float]] = {}
SANDBOX_TIMEOUT = 600  # seconds

# --- Virtual desktop resolution (pixels) -------------------------------------
WIDTH, HEIGHT = 1024, 768

# --- Scratch directory for per-interaction artefacts -------------------------
TMP_DIR = "./tmp/"
os.makedirs(TMP_DIR, exist_ok=True)

# --- Hugging Face authentication ---------------------------------------------
# Either variable name is accepted; HF_TOKEN wins when both are set.
hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_API_KEY")
if hf_token:
    login(token=hf_token)

# --- Per-session state ---------------------------------------------------------
# Enhanced components are built lazily, one bundle per session UUID.
SESSION_COMPONENTS: Dict[str, Dict[str, Any]] = {}
# session_uuid -> running agent, so the Stop button can interrupt it.
ACTIVE_AGENTS: Dict[str, Any] = {}

# --- Configuration warnings surfaced in the UI --------------------------------
# Each entry pairs "is this setting missing?" with the banner text to show.
STARTUP_WARNINGS: List[str] = [
    warning
    for is_missing, warning in (
        (not E2B_API_KEY, "β οΈ E2B_API_KEY not set. Desktop automation will be unavailable."),
        (not hf_token, "β οΈ HF_TOKEN not set. Model inference and Hub tools may be limited."),
    )
    if is_missing
]
| # ============================================================================= | |
| # CSS & HTML Templates | |
| # ============================================================================= | |
# Stylesheet for the whole app. The <<WIDTH>> / <<HEIGHT>> placeholders are
# substituted below with the desktop resolution plus a small margin for the
# iframe border (plain str.replace is used because CSS braces would clash with
# str.format).
custom_css = """
.modal-container { margin: var(--size-16) auto !important; }
.sandbox-container { position: relative; width: 910px; overflow: hidden; margin: auto; height: 800px; }
.sandbox-frame { display: none; position: absolute; top: 0; left: 0; width: 910px; height: 800px; pointer-events: none; }
.sandbox-iframe, .bsod-image { position: absolute; width: <<WIDTH>>px; height: <<HEIGHT>>px; border: 4px solid #444444; transform-origin: 0 0; }
.primary-color-label label span { font-weight: bold; color: var(--color-accent); }
.status-bar { display: flex; flex-direction: row; align-items: center; z-index: 100; }
.status-indicator { width: 15px; height: 15px; border-radius: 50%; }
.status-text { font-size: 16px; font-weight: bold; padding-left: 8px; text-shadow: none; }
.status-interactive { background-color: #2ecc71; animation: blink 2s infinite; }
.status-view-only { background-color: #e74c3c; }
.status-error { background-color: #e74c3c; animation: blink-error 1s infinite; }
@keyframes blink-error { 0% { background-color: rgba(231, 76, 60, 1); } 50% { background-color: rgba(231, 76, 60, 0.4); } 100% { background-color: rgba(231, 76, 60, 1); } }
@keyframes blink { 0% { background-color: rgba(46, 204, 113, 1); } 50% { background-color: rgba(46, 204, 113, 0.4); } 100% { background-color: rgba(46, 204, 113, 1); } }
#chatbot { height: 1000px !important; }
#chatbot .role { max-width: 95%; }
#chatbot .bubble-wrap { overflow-y: visible; }
.logo-container { display: flex; flex-direction: column; align-items: flex-start; width: 100%; box-sizing: border-box; gap: 5px; }
.logo-item { display: flex; align-items: center; padding: 0 30px; gap: 10px; text-decoration: none !important; color: #f59e0b; font-size: 17px; }
.logo-item:hover { color: #935f06 !important; }
.thought-stream { font-family: monospace; font-size: 13px; background: #1a1a2e; color: #a0c4ff; padding: 10px; border-radius: 8px; max-height: 300px; overflow-y: auto; white-space: pre-wrap; }
.plan-checklist { background: #16213e; padding: 10px; border-radius: 8px; }
.plan-checklist li { list-style: none; margin: 4px 0; }
.plan-checklist li.done::before { content: "β "; }
.plan-checklist li.pending::before { content: "β¬ "; }
.plan-checklist li.running::before { content: "π "; }
.plan-checklist li.failed::before { content: "β "; }
.cost-badge { font-family: monospace; background: #0f3460; color: #e94560; padding: 4px 8px; border-radius: 4px; font-size: 12px; }
""".replace("<<WIDTH>>", str(WIDTH + 15)).replace("<<HEIGHT>>", str(HEIGHT + 10))

# Static footer with links to the open-source projects the app builds on.
# The heading is now closed with </h3>; it was previously closed with a
# mismatched </h2>, which is malformed HTML.
footer_html = """
<h3 style="text-align: center; margin-top:50px;"><i>Powered by open source:</i></h3>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<div class="logo-container">
<a class="logo-item" href="https://github.com/huggingface/smolagents"><i class="fa fa-github"></i>smolagents</a>
<a class="logo-item" href="https://huggingface.co/Qwen/Qwen2.5-VL-72B-Instruct"><i class="fa fa-github"></i>Qwen2.5-VL</a>
<a class="logo-item" href="https://github.com/e2b-dev/desktop"><i class="fa fa-github"></i>E2B Desktop</a>
<a class="logo-item" href="https://playwright.dev"><i class="fa fa-github"></i>Playwright</a>
</div>
"""

# Desktop viewer markup. Filled with str.format(); the fields are
# {status_class}, {status_text} and {stream_url}.
sandbox_html_template = """
<style>@import url('https://fonts.googleapis.com/css2?family=Oxanium:wght@200..800&display=swap');</style>
<h1 style="color:var(--color-accent);margin:0;">Open Computer Agent v2.0 β <i>Enhanced</i></h1>
<div class="sandbox-container" style="margin:0;">
<div class="status-bar">
<div class="status-indicator {status_class}"></div>
<div class="status-text">{status_text}</div>
</div>
<iframe id="sandbox-iframe" src="{stream_url}" class="sandbox-iframe" style="display:block;" allowfullscreen></iframe>
<img src="https://huggingface.co/datasets/mfarre/servedfiles/resolve/main/blue_screen_of_death.gif" class="bsod-image" style="display:none;"/>
<img src="https://huggingface.co/datasets/m-ric/images/resolve/main/HUD_thom.png" class="sandbox-frame" />
</div>
"""

# Client-side bootstrap script:
#   * forces the dark theme (adds the class and the __theme query parameter),
#   * polls the hidden #sandbox-creation-time element every 5 s and swaps the
#     iframe for the "blue screen" image once the sandbox timeout has elapsed,
#   * restores the iframe when a "Let's go" / "Run" button is clicked.
custom_js = """function() {
document.body.classList.add('dark');
const checkSandboxTimeout = function() {
const timeElement = document.getElementById('sandbox-creation-time');
if (timeElement) {
const creationTime = parseFloat(timeElement.getAttribute('data-time'));
const timeoutValue = parseFloat(timeElement.getAttribute('data-timeout'));
const currentTime = Math.floor(Date.now() / 1000);
const elapsedTime = currentTime - creationTime;
if (elapsedTime >= timeoutValue) { showBSOD('Error'); return; }
}
setTimeout(checkSandboxTimeout, 5000);
};
const showBSOD = function(statusText = 'Error') {
const iframe = document.getElementById('sandbox-iframe');
const bsod = document.querySelector('.bsod-image');
if (iframe && bsod) { iframe.style.display = 'none'; bsod.style.display = 'block'; }
};
const resetBSOD = function() {
const iframe = document.getElementById('sandbox-iframe');
const bsod = document.querySelector('.bsod-image');
if (iframe && bsod && bsod.style.display === 'block') {
iframe.style.display = 'block'; bsod.style.display = 'none'; return true;
}
return false;
};
checkSandboxTimeout();
document.addEventListener('click', function(e) {
if (e.target.tagName === 'BUTTON') {
if (e.target.innerText.includes("Let's go") || e.target.innerText.includes("Run")) { resetBSOD(); }
}
});
const params = new URLSearchParams(window.location.search);
if (!params.has('__theme')) { params.set('__theme', 'dark'); window.location.search = params.toString(); }
}"""
| # ============================================================================= | |
| # Sandbox Lifecycle | |
| # ============================================================================= | |
def upload_to_hf_and_remove(folder_path: str) -> str:
    """Push a local folder to the logs dataset and delete the local copy.

    The folder is uploaded under its own base name to the
    ``smolagents/computer-agent-logs`` dataset repository. The local folder
    is removed only after the upload call returns.

    Args:
        folder_path: Path of the directory to upload.

    Returns:
        Whatever ``upload_folder`` returned, or an empty string when any step
        failed (the error is printed, never raised).
    """
    try:
        remote_name = os.path.basename(os.path.normpath(folder_path))
        uploaded = upload_folder(
            folder_path=folder_path,
            repo_id="smolagents/computer-agent-logs",
            repo_type="dataset",
            path_in_repo=remote_name,
            ignore_patterns=[".git/*", ".gitignore"],
        )
        shutil.rmtree(folder_path)
    except Exception as exc:
        print(f"Upload error: {exc}")
        return ""
    return uploaded
def cleanup_sandboxes() -> None:
    """Kill and unregister every sandbox that has been idle too long.

    A session is considered idle when its ``last_accessed`` timestamp is more
    than ``SANDBOX_TIMEOUT`` seconds old. For each such session any local data
    directory under ``TMP_DIR`` is uploaded (and removed) before the sandbox
    is killed.

    Registry entries are removed *before* the kill is attempted, so a sandbox
    whose ``kill()`` fails, or metadata without a matching sandbox, no longer
    lingers in the registries and gets retried on every sweep. Errors are
    printed, never raised.
    """
    now = time.time()
    # Snapshot the ids first: the registries are mutated while we walk them.
    expired = [
        sid
        for sid, meta in list(SANDBOX_METADATA.items())
        if now - meta["last_accessed"] > SANDBOX_TIMEOUT
    ]
    for sid in expired:
        sandbox = SANDBOXES.pop(sid, None)
        SANDBOX_METADATA.pop(sid, None)
        if sandbox is None:
            # Orphaned metadata: nothing to kill, dropping the entry is enough.
            continue
        try:
            data_dir = os.path.join(TMP_DIR, sid)
            if os.path.exists(data_dir):
                upload_to_hf_and_remove(data_dir)
            sandbox.kill()
            print(f"Cleaned up sandbox {sid}")
        except Exception as e:
            print(f"Cleanup error for {sid}: {e}")
def get_or_create_sandbox(session_uuid: str) -> Optional[Sandbox]:
    """Return the session's desktop sandbox, creating a fresh one if needed.

    An existing sandbox is reused while it is younger than ``SANDBOX_TIMEOUT``
    seconds (measured from ``created_at``); reuse refreshes ``last_accessed``.
    Otherwise the old sandbox is killed (best effort), unregistered, and a
    new one is started with streaming enabled and the Firefox first-run
    policies installed.

    Args:
        session_uuid: Key identifying the browser session.

    Returns:
        The sandbox, or ``None`` when ``E2B_API_KEY`` is not configured.

    Raises:
        Exception: Whatever the sandbox SDK raises while creating or
            configuring the sandbox. A sandbox that fails during setup is
            killed before the error propagates, so it does not keep running
            unreferenced.
    """
    if not E2B_API_KEY:
        return None

    current_time = time.time()
    existing = SANDBOXES.get(session_uuid)
    meta = SANDBOX_METADATA.get(session_uuid)

    if existing is not None and meta is not None:
        if current_time - meta["created_at"] < SANDBOX_TIMEOUT:
            meta["last_accessed"] = current_time
            return existing

    if existing is not None:
        try:
            existing.kill()
        except Exception as e:
            # Best effort: the sandbox has most likely already timed out.
            print(f"Could not kill expired sandbox for {session_uuid}: {e}")
    # Forget the stale entries so a failed re-creation cannot leave a dead
    # sandbox registered for this session.
    SANDBOXES.pop(session_uuid, None)
    SANDBOX_METADATA.pop(session_uuid, None)

    desktop = Sandbox(
        api_key=E2B_API_KEY, resolution=(WIDTH, HEIGHT), dpi=96,
        timeout=SANDBOX_TIMEOUT, template="k0wmnzir0zuzye6dndlw",
    )
    try:
        desktop.stream.start(require_auth=True)
        # Suppress Firefox first-run / default-browser prompts inside the VM.
        setup_cmd = """sudo mkdir -p /usr/lib/firefox-esr/distribution && echo '{"policies":{"OverrideFirstRunPage":"","OverridePostUpdatePage":"","DisableProfileImport":true,"DontCheckDefaultBrowser":true}}' | sudo tee /usr/lib/firefox-esr/distribution/policies.json > /dev/null"""
        desktop.commands.run(setup_cmd)
    except Exception:
        # Do not leak a half-configured sandbox that nobody references.
        try:
            desktop.kill()
        except Exception as kill_error:
            print(f"Could not kill sandbox after failed setup: {kill_error}")
        raise

    SANDBOXES[session_uuid] = desktop
    SANDBOX_METADATA[session_uuid] = {"created_at": current_time, "last_accessed": current_time}
    return desktop
def update_html(interactive_mode: bool, session_uuid: str) -> str:
    """Build the HTML shown in the desktop pane for a session.

    Args:
        interactive_mode: ``True`` to embed the interactive stream, ``False``
            to embed the view-only stream while the agent is working.
        session_uuid: Session whose sandbox should be displayed.

    Returns:
        The rendered viewer markup followed by a hidden
        ``#sandbox-creation-time`` element carrying the creation time and the
        timeout, or a notice panel when no sandbox is available.
    """
    desktop = get_or_create_sandbox(session_uuid)
    if desktop is None:
        notice_parts = [
            '<div style="padding:20px; background:#1a1a2e; color:#e94560; border-radius:8px; text-align:center;">',
            '<h3>π E2B_API_KEY Required</h3>',
            '<p>Desktop automation is unavailable because <code>E2B_API_KEY</code> is not configured.</p>',
            '<p>Please add it in <b>Space Settings β Secrets</b> and restart the Space.</p>',
            '</div>',
        ]
        return "".join(notice_parts)

    stream_url = desktop.stream.get_url(auth_key=desktop.stream.get_auth_key())
    if interactive_mode:
        status_class, status_text = "status-interactive", "Interactive"
    else:
        stream_url = f"{stream_url}&view_only=true"
        status_class, status_text = "status-view-only", "Agent running..."

    created_at = SANDBOX_METADATA.get(session_uuid, {}).get("created_at", time.time())
    viewer = sandbox_html_template.format(
        stream_url=stream_url, status_class=status_class, status_text=status_text,
    )
    # Hidden marker read by the page script to detect an expired sandbox.
    marker = f'<div id="sandbox-creation-time" style="display:none;" data-time="{created_at}" data-timeout="{SANDBOX_TIMEOUT}"></div>'
    return viewer + marker
| # ============================================================================= | |
| # Enhanced Agent Factory | |
| # ============================================================================= | |
def build_session_components(session_uuid: str, data_dir: str) -> Dict[str, Any]:
    """Create the enhanced component bundle for one session and register it.

    Args:
        session_uuid: Session the bundle belongs to; also used as the key in
            ``SESSION_COMPONENTS`` and in the memory persistence path.
        data_dir: Directory handed to the session recorder and the file-system
            tool.

    Returns:
        The bundle, a dict keyed by component name. The ``browser_mcp``,
        ``code_mcp`` and ``hf_mcp`` entries are ``None`` when their
        constructor raised.
    """

    def _build_or_none(factory):
        # Optional tools: any construction failure degrades to None.
        try:
            return factory()
        except Exception:
            return None

    bundle: Dict[str, Any] = {}
    bundle["config"] = AgentConfig(hf_token=hf_token, cost_budget_usd=2.0)

    # Core intelligence: planner and verifier share the same router.
    shared_router = IntelligenceRouter(hf_token=hf_token)
    bundle["router"] = shared_router
    bundle["planner"] = HierarchicalPlanner(shared_router)
    bundle["verifier"] = VerifierAgent(shared_router)
    bundle["memory"] = AgentMemory(persist_dir=f"./memory_db/{session_uuid}")
    bundle["som"] = SoMPreprocessor(use_icon_detection=False)
    bundle["hitl"] = HITLCheckpoint(auto_approve=False)
    bundle["tracker"] = CostTracker()
    bundle["recorder"] = SessionRecorder(session_uuid, output_dir=data_dir)
    bundle["voice"] = VoiceInterface(hf_token=hf_token)

    # MCP tools
    bundle["browser_mcp"] = _build_or_none(lambda: BrowserMCP(headless=True))
    bundle["code_mcp"] = _build_or_none(lambda: CodeExecutionMCP(api_key=E2B_API_KEY))
    bundle["fs_mcp"] = FileSystemMCP(base_dir=data_dir)
    bundle["hf_mcp"] = _build_or_none(lambda: HFHubMCP(token=hf_token))

    SESSION_COMPONENTS[session_uuid] = bundle
    return bundle
| # ============================================================================= | |
| # Streaming Agent Runner with Plan + Thought Visibility | |
| # ============================================================================= | |
def _cost_badge(spent_usd: float = 0.0, budget_usd: float = 2.0) -> str:
    """Render the cost badge HTML shown next to the task controls."""
    return f'<span class="cost-badge">Cost: ${spent_usd:.4f} / ${budget_usd:.2f}</span>'


def run_enhanced_agent(
    task_input: str,
    session_uuid: str,
    use_planner: bool = True,
    use_verifier: bool = True,
    use_som: bool = False,
    use_browser_mcp: bool = True,
    consent_storage: bool = True,
) -> Generator[Any, None, None]:
    """Plan, execute and verify a task, streaming progress to the UI.

    Every yield is a 3-tuple ``(chat_messages, plan_markdown, cost_html)``,
    matching the three output components the generator is wired to.

    Args:
        task_input: Natural-language task typed (or spoken) by the user.
        session_uuid: Session whose sandbox and components are used.
        use_planner: Break the task into subtasks before executing.
        use_verifier: Check each planned subtask after execution. Only takes
            effect when a plan exists.
        use_som: Annotate the initial screenshot with the Set-of-Marks
            preprocessor and show it in the chat.
        use_browser_mcp: Start the browser MCP and add its tools to the agent.
        consent_storage: Write ``metadata.json`` and upload the run folder.

    Yields:
        ``(messages, plan_md, cost_html)`` snapshots. Errors are reported as
        chat messages instead of being raised.
    """
    # Early guard for missing E2B key
    if not E2B_API_KEY:
        yield [
            {"role": "user", "content": task_input},
            {"role": "assistant", "content": (
                "π **Desktop automation unavailable**\n\n"
                "The agent needs an **E2B_API_KEY** to launch a sandboxed desktop.\n\n"
                "**How to fix:**\n"
                "1. Go to [e2b.dev](https://e2b.dev) and create a free API key\n"
                "2. Open this Space's **Settings β Secrets** tab\n"
                "3. Add a secret with name `E2B_API_KEY` and your key as the value\n"
                "4. Restart the Space (Factory Rebuild)\n\n"
                "Once configured, the agent can browse, click, type, and run code in a real desktop environment."
            )},
        ], "*No plan β E2B key missing*", _cost_badge()
        return

    try:
        interaction_id = f"{session_uuid}_{int(time.time())}"
        data_dir = os.path.join(TMP_DIR, interaction_id)
        os.makedirs(data_dir, exist_ok=True)

        desktop = get_or_create_sandbox(session_uuid)
        if desktop is None:
            yield [{"role": "assistant", "content": "π₯ Failed to initialize E2B sandbox. Please check E2B_API_KEY and try again."}], "*Sandbox failed*", _cost_badge()
            return

        comps = build_session_components(session_uuid, data_dir)
        tracker: CostTracker = comps["tracker"]
        planner: HierarchicalPlanner = comps["planner"]
        verifier: VerifierAgent = comps["verifier"]
        memory: AgentMemory = comps["memory"]
        hitl: HITLCheckpoint = comps["hitl"]
        router: IntelligenceRouter = comps["router"]
        som: SoMPreprocessor = comps["som"]
        browser_mcp: BrowserMCP = comps["browser_mcp"]  # may be None

        tracker.start_task(interaction_id)
        plan_md: str = "*Plan will appear here...*"
        cost_html: str = _cost_badge()
        messages: List[Any] = [{"role": "user", "content": task_input}]
        yield messages.copy(), plan_md, cost_html

        # Everything below may have started the browser MCP or registered the
        # agent, so the cleanup in ``finally`` must cover setup failures too.
        try:
            # ---- PLANNING PHASE ----
            plan = None
            if use_planner:
                messages.append({
                    "role": "assistant",
                    "content": f"π§  **Planning...** Breaking down: *{task_input}*",
                })
                yield messages.copy(), plan_md, cost_html

                # Retrieve similar past tasks
                similar = memory.retrieve_similar(task_input, n_results=2)
                context = ""
                if similar:
                    context = "Previous successful strategies:\n" + "\n".join(
                        f"- {s.get('strategy_summary', '')}" for s in similar
                    )
                plan = planner.plan(task_input, context=context)
                plan_md = "π **Plan**\n" + "".join(
                    f"- β¬ [{st.strategy}] {st.description}\n" for st in plan.subtasks
                )
                messages.append({"role": "assistant", "content": plan_md})
                yield messages.copy(), plan_md, cost_html

            # ---- EXECUTION PHASE ----
            # Bridge E2BVisionAgent with our IntelligenceRouter: the router
            # (already initialized) is passed as the agent's model.
            from e2bqwen import E2BVisionAgent

            agent = E2BVisionAgent(
                model=router,
                data_dir=data_dir,
                desktop=desktop,
                max_steps=100,
                verbosity_level=2,
                use_v1_prompt=True,
            )
            ACTIVE_AGENTS[session_uuid] = agent

            # Inject MCP browser tools if enabled
            if use_browser_mcp:
                try:
                    browser_mcp.start()
                    # Merge into agent.tools
                    for name, fn in make_browser_tools(browser_mcp).items():
                        agent.tools[name] = fn
                    messages.append({
                        "role": "assistant",
                        "content": "π **Playwright MCP connected.** Browser automation ready.",
                    })
                except Exception as e:
                    messages.append({
                        "role": "assistant",
                        "content": f"β οΈ Playwright MCP unavailable: {e}. Using vision-only fallback.",
                    })
                yield messages.copy(), plan_md, cost_html

            # Inject HF Hub tools (optional; the run continues without them)
            try:
                for name, fn in make_hf_tools(comps["hf_mcp"]).items():
                    agent.tools[name] = fn
            except Exception as e:
                print(f"HF Hub tools unavailable: {e}")

            # Take initial screenshot
            initial_screenshot = Image.open(BytesIO(desktop.screenshot(format="bytes")))

            # SoM preprocessing on initial screenshot (optional)
            if use_som:
                annotated, _registry = som.preprocess(initial_screenshot)
                annotated_path = os.path.join(data_dir, "som_initial.png")
                annotated.save(annotated_path)
                messages.append({
                    "role": "assistant",
                    "content": {"path": annotated_path, "mime_type": "image/png"},
                })
                yield messages.copy(), plan_md, cost_html

            # Execute task with streaming
            step_count = 0
            try:
                for msg in stream_to_gradio(
                    agent, task=task_input, task_images=[initial_screenshot], reset_agent_memory=False,
                ):
                    step_count += 1

                    # Thought streaming: inject router cost status
                    if step_count % 5 == 0:
                        cost_report = router.get_cost_report()
                        cost_text = f"π° Cost: ${cost_report['spent_usd']:.4f} / ${cost_report['budget_usd']:.2f} | Calls: {cost_report['calls']}"
                        messages.append({"role": "assistant", "content": cost_text})
                        # Interim snapshot so the failure path can still report cost.
                        tracker.tasks[interaction_id] = [ModelCall(model_id='sync', cost_usd=cost_report['spent_usd'])]
                        cost_html = _cost_badge(cost_report["spent_usd"], cost_report["budget_usd"])
                        yield messages.copy(), plan_md, cost_html

                    # Append screenshots
                    if hasattr(agent, "last_marked_screenshot") and getattr(msg, "content", None) == "-----":
                        try:
                            img = agent.last_marked_screenshot
                            img_path = getattr(img, "path", str(img))
                            messages.append({
                                "role": "assistant",
                                "content": {"path": img_path, "mime_type": "image/png"},
                            })
                        except Exception as e:
                            print(f"Could not attach screenshot: {e}")

                    # Convert smolagents message to dict if needed
                    if hasattr(msg, "role") and hasattr(msg, "content"):
                        messages.append({"role": msg.role, "content": msg.content})
                    else:
                        messages.append({"role": "assistant", "content": str(msg)})
                    yield messages.copy(), plan_md, cost_html

                    # HITL check every step
                    if hasattr(agent, "memory") and agent.memory.steps:
                        last_step = agent.memory.steps[-1]
                        if hasattr(last_step, "tool_calls") and last_step.tool_calls:
                            approved, reason = hitl.check_action(str(last_step.tool_calls[0]))
                            if not approved:
                                messages.append({
                                    "role": "assistant",
                                    "content": f"π **HITL Checkpoint:** {reason}\nPlease approve or modify the action.",
                                })
                                yield messages.copy(), plan_md, cost_html
                                # In a real implementation we'd pause here for user input.
                                # For now, auto-continue after logging.
                                time.sleep(0.5)

                # ---- VERIFICATION PHASE ----
                all_verified = True
                if use_verifier and plan:
                    messages.append({"role": "assistant", "content": "π **Verifying task completion...**"})
                    yield messages.copy(), plan_md, cost_html

                    final_screenshot = Image.open(BytesIO(desktop.screenshot(format="bytes")))
                    trace = [str(s) for s in agent.memory.steps[-20:]]
                    for st in plan.subtasks:
                        result = verifier.verify(st, trace, final_screenshot)
                        verified = bool(result.get("success"))
                        all_verified = all_verified and verified
                        status_icon = "β " if verified else "β"
                        messages.append({
                            "role": "assistant",
                            "content": f"{status_icon} **{st.description}** β {result.get('reason', '')}",
                        })
                        yield messages.copy(), plan_md, cost_html

                # Final summary. Only record the run as successful when the
                # verifier did not flag a failed subtask, so later retrievals
                # of "successful strategies" are not polluted by failures.
                final_output = agent.memory.steps[-1].observations if agent.memory.steps else "Task completed."
                memory.add_task(
                    task=task_input,
                    strategy_summary=f"Completed in {step_count} steps. Final: {str(final_output)[:200]}",
                    success=all_verified,
                    domain=plan.subtasks[0].strategy if plan and plan.subtasks else "general",
                )

                # Cost report: sync the router history into the tracker FIRST,
                # then read the report, so the summary reflects the actual
                # calls. The interim 'sync' snapshot is dropped beforehand.
                # NOTE(review): assumes log_call appends to
                # tracker.tasks[interaction_id] - confirm against CostTracker.
                tracker.tasks[interaction_id] = []
                for call in router.call_history:
                    tracker.log_call(interaction_id, call)
                report = tracker.get_task_report(interaction_id)

                cost_summary = (
                    f"π **Task Complete**\n"
                    f"- Steps: {step_count}\n"
                    f"- Cost: ${report['total_cost_usd']:.4f}\n"
                    f"- Tokens: {report['total_tokens']}\n"
                    f"- Avg latency: {report['avg_latency_ms']}ms"
                )
                messages.append({"role": "assistant", "content": cost_summary})
                cost_html = _cost_badge(report["total_cost_usd"])
                yield messages.copy(), plan_md, cost_html

                if consent_storage:
                    from e2bqwen import get_agent_summary_erase_images

                    summary = get_agent_summary_erase_images(agent)
                    with open(os.path.join(data_dir, "metadata.json"), "w") as f:
                        json.dump({"status": "completed", "summary": summary, "cost_report": report}, f, default=str)
                    upload_to_hf_and_remove(data_dir)
            except Exception as e:
                error_msg = f"Error: {str(e)}"
                messages.append({"role": "assistant", "content": f"π₯ **Run failed:**\n{error_msg}"})
                total_cost = sum(
                    c.cost_usd for calls in tracker.tasks.values() for c in calls
                )
                cost_html = _cost_badge(total_cost)
                yield messages.copy(), plan_md, cost_html
                if consent_storage:
                    with open(os.path.join(data_dir, "metadata.json"), "w") as f:
                        json.dump({"status": "failed", "error": error_msg}, f)
                    upload_to_hf_and_remove(data_dir)
        finally:
            # The run is over (finished, failed, or the generator was closed):
            # the agent must no longer be interruptible and the browser must
            # not stay open.
            ACTIVE_AGENTS.pop(session_uuid, None)
            try:
                if browser_mcp:
                    browser_mcp.close()
            except Exception as e:
                print(f"Browser MCP close failed: {e}")
    except Exception as outer_e:
        # Catch-all for setup errors so Gradio doesn't show generic "Error"
        yield [{"role": "assistant", "content": f"π₯ **Setup failed:** {outer_e}"}], "*Setup failed*", _cost_badge()
| # ============================================================================= | |
| # Gradio UI | |
| # ============================================================================= | |
theme = gr.themes.Default(font=["Oxanium", "sans-serif"], primary_hue="amber", secondary_hue="blue")

# Welcome copy shared by the modal and the accordion fallback.
_WELCOME_BODY = """Welcome to the **enhanced** computer agent powered by:
- **Multi-Model Router** (auto-selects cheapest capable model)
- **Playwright MCP** (semantic browser control)
- **Hierarchical Planner** + **Verifier**
- **Set-of-Marks Vision** + **Long-Term Memory**
- **Voice I/O** + **Human-in-the-Loop**
- **Cost Dashboard** + **Session Recording**
π Type a task, hit **Run**, and watch the agent think, plan, and execute.
"""

# NOTE(review): the original indentation was lost, so the component nesting
# below is reconstructed from the statement order - confirm the layout.
with gr.Blocks(title="Computer Agent v2.0") as demo:
    session_uuid_state = gr.State(None)

    # Startup configuration warnings
    if STARTUP_WARNINGS:
        gr.HTML(
            value='<div style="padding:12px; background:#2c1a1a; color:#ff6b6b; border-left:4px solid #ff6b6b; margin-bottom:12px;">'
            + "<br>".join(STARTUP_WARNINGS)
            + '</div>'
        )

    with gr.Row():
        # Main sandbox view
        sandbox_html = gr.HTML(
            value=sandbox_html_template.format(stream_url="", status_class="status-interactive", status_text="Interactive"),
            label="Desktop",
        )

        with gr.Sidebar(position="left"):
            if HAS_GRADIO_MODAL and Modal:
                with Modal(visible=True) as modal:
                    gr.Markdown("\n### π₯οΈ Open Computer Agent v2.0\n" + _WELCOME_BODY)
            else:
                with gr.Accordion("π₯οΈ Open Computer Agent v2.0 β Click to expand info", open=False):
                    gr.Markdown("\n" + _WELCOME_BODY)

            task_input = gr.Textbox(
                value="Find me pictures of cute puppies",
                label="Enter your task:",
                elem_classes="primary-color-label",
            )
            with gr.Row():
                run_btn = gr.Button("π Let's go!", variant="primary")
                voice_input = gr.Audio(sources=["microphone"], type="numpy", label="Or speak your task")

            gr.Examples(
                examples=[
                    "Use Google Maps to find the Hugging Face HQ in Paris",
                    "Go to Wikipedia and find what happened on April 4th",
                    "Find train travel time from Bern to Basel on Google Maps",
                    "Go to Hugging Face Spaces, find flux.1 schnell, generate an image of a GPU",
                    "Search HF Hub for top text-to-video models and list them",
                    "Open GitHub trending and find the top Python repo today",
                ],
                inputs=task_input,
                label="Example Tasks",
                examples_per_page=6,
            )

            with gr.Accordion("βοΈ Advanced Options", open=False):
                use_planner_cb = gr.Checkbox(label="Use Hierarchical Planner", value=True)
                use_verifier_cb = gr.Checkbox(label="Use Verifier", value=True)
                use_som_cb = gr.Checkbox(label="Use Set-of-Marks Vision", value=False)
                use_browser_cb = gr.Checkbox(label="Use Playwright Browser MCP", value=True)
                consent_storage_cb = gr.Checkbox(label="Store task & agent trace?", value=True)
                # NOTE(review): not wired to any handler yet.
                auto_approve_cb = gr.Checkbox(label="Auto-approve all actions (disable HITL)", value=False)

            session_state = gr.State({})
            stored_messages = gr.State([])

            # Cost display
            cost_display = gr.HTML(value='<span class="cost-badge">Cost: $0.0000 / $2.00</span>', label="Cost Tracker")

            gr.Markdown("""
- **Data**: Uncheck storage to opt-out. No personal data please.
- **Captcha**: VMs may get flagged. Interrupt and solve manually if needed.
- **HITL**: Sensitive actions pause for approval unless auto-approve is on.
- **Restart**: Refresh the page if the agent seems stuck.
""")
            footer = gr.HTML(value=footer_html)

    # Thought stream + logs
    with gr.Row():
        with gr.Column(scale=1):
            plan_display = gr.Markdown(label="π Plan", value="*Plan will appear here...*")
        with gr.Column(scale=2):
            chatbot_display = gr.Chatbot(
                elem_id="chatbot",
                label="Agent's Execution Logs",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizable=True,
            )

    stop_btn = gr.Button("π Stop the agent!", variant="huggingface")

    # Hidden flag carried through the load chain: sessions start interactive.
    is_interactive = gr.Checkbox(value=True, visible=False)

    # ---- Event Wiring ----
    def clear_and_set_view_only(task_input, session_uuid):
        """Switch the desktop pane to the view-only stream before a run."""
        return update_html(False, session_uuid)

    def set_interactive(session_uuid):
        """Give control of the desktop back to the user after a run."""
        return update_html(True, session_uuid)

    def reactivate_stop():
        """Restore the Stop button to its idle appearance."""
        return gr.Button("π Stop the agent!", variant="huggingface")

    def update_cost_display():
        """Render the cost badge with the spend summed over all sessions."""
        total = 0.0
        for comps in SESSION_COMPONENTS.values():
            router = comps.get("router")
            # Sessions without a router have spent nothing; building a
            # throwaway router just to read a zero would be wasted work.
            if router is not None:
                total += router.cost_so_far_usd
        return f'<span class="cost-badge">Cost: ${total:.4f} / $2.00</span>'

    def process_voice(audio_tuple, session_uuid):
        """Transcribe a microphone recording into the task textbox."""
        if audio_tuple is None:
            return ""
        comps = SESSION_COMPONENTS.get(session_uuid)
        if not comps:
            # Build minimal components; make sure their data directory exists,
            # as run_enhanced_agent does before building the bundle.
            data_dir = os.path.join(TMP_DIR, session_uuid)
            os.makedirs(data_dir, exist_ok=True)
            comps = build_session_components(session_uuid, data_dir)
        voice: VoiceInterface = comps["voice"]
        try:
            return voice.process_gradio_audio(audio_tuple)
        except Exception as e:
            return f"[Voice error: {e}]"

    def interrupt_agent(session_uuid):
        """Interrupt the agent that belongs to the calling session only."""
        agent = ACTIVE_AGENTS.get(session_uuid)
        can_interrupt = (
            agent is not None
            and hasattr(agent, "interrupt")
            and hasattr(agent, "interrupt_switch")
            and not agent.interrupt_switch
        )
        if can_interrupt:
            try:
                agent.interrupt()
                ACTIVE_AGENTS.pop(session_uuid, None)
                return gr.Button("Stopping agent...", variant="secondary")
            except Exception as e:
                print(f"Interrupt failed for {session_uuid}: {e}")
        return gr.Button("π Stop the agent!", variant="huggingface")

    def initialize_session(interactive, browser_uuid):
        """Create/attach the sandbox for this browser and remember its id.

        The id is resolved once so the sandbox and the stored session state
        always refer to the same UUID, even when the browser supplied none.
        """
        session_id = browser_uuid or str(uuid.uuid4())
        return update_html(interactive, session_id), session_id

    # Voice -> textbox
    voice_input.stop_recording(
        fn=process_voice,
        inputs=[voice_input, session_uuid_state],
        outputs=[task_input],
    )

    # Run button chain
    run_event = (
        run_btn.click(
            fn=clear_and_set_view_only,
            inputs=[task_input, session_uuid_state],
            outputs=[sandbox_html],
        )
        .then(
            fn=run_enhanced_agent,
            inputs=[
                task_input,
                session_uuid_state,
                use_planner_cb,
                use_verifier_cb,
                use_som_cb,
                use_browser_cb,
                consent_storage_cb,
            ],
            outputs=[chatbot_display, plan_display, cost_display],
        )
        .then(fn=set_interactive, inputs=[session_uuid_state], outputs=[sandbox_html])
        .then(fn=reactivate_stop, outputs=[stop_btn])
    )

    # The Stop button targets the caller's own session, not every agent.
    stop_btn.click(fn=interrupt_agent, inputs=[session_uuid_state], outputs=[stop_btn])

    # Init session
    demo.load(
        fn=lambda: True,
        outputs=[is_interactive],
    ).then(
        fn=initialize_session,
        js="() => localStorage.getItem('gradio-session-uuid') || (() => { const id = self.crypto.randomUUID(); localStorage.setItem('gradio-session-uuid', id); return id })()",
        inputs=[is_interactive],
        outputs=[sandbox_html, session_uuid_state],
    )
def _run_periodic_cleanup(interval_s: float = 60.0) -> None:
    """Run ``cleanup_sandboxes`` every ``interval_s`` seconds in the background.

    ``threading.Timer`` fires only once, so each tick re-arms a new timer.
    The timers are daemon threads: they must never keep the process alive
    after the web server has stopped.
    """

    def _tick() -> None:
        try:
            cleanup_sandboxes()
        except Exception as exc:
            # A failed sweep must not end the schedule.
            print(f"Periodic sandbox cleanup failed: {exc}")
        finally:
            _run_periodic_cleanup(interval_s)

    timer = Timer(interval_s, _tick)
    timer.daemon = True
    timer.start()


if __name__ == "__main__":
    # Sweep idle sandboxes once a minute for as long as the app is running.
    _run_periodic_cleanup(60)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        theme=theme,
        css=custom_css,
        js=custom_js,
    )