| """ |
| app.py — Enhanced Open Computer Agent v2.0 |
| ========================================== |
| Powered by smolagents + E2B + Playwright + Multi-Model Router + Memory + SoM + Voice |
| """ |
|
|
| import os |
| import json |
| import time |
| import uuid |
| import shutil |
| import base64 |
| from io import BytesIO |
| from threading import Timer |
| from typing import Any, Dict, List, Optional, Generator |
| from datetime import datetime |
|
|
| import gradio as gr |
| from dotenv import load_dotenv |
| from e2b_desktop import Sandbox |
| from gradio_modal import Modal |
| from huggingface_hub import login, upload_folder |
| from PIL import Image |
| from smolagents import CodeAgent |
| from smolagents.gradio_ui import GradioUI, stream_to_gradio |
|
|
| |
| from core_agent import ( |
| AgentConfig, |
| IntelligenceRouter, |
| HierarchicalPlanner, |
| VerifierAgent, |
| AgentMemory, |
| SoMPreprocessor, |
| SessionRecorder, |
| HITLCheckpoint, |
| CostTracker, |
| ModelCall, |
| Subtask, |
| ) |
| from mcp_tools import ( |
| BrowserMCP, |
| CodeExecutionMCP, |
| FileSystemMCP, |
| HFHubMCP, |
| make_browser_tools, |
| make_code_tools, |
| make_fs_tools, |
| make_hf_tools, |
| ) |
| from voice_interface import VoiceInterface |
| from eval_harness import EvaluationHarness, DEFAULT_BENCHMARKS |
|
|
# Load environment variables from a .env file before any os.getenv() call below.
load_dotenv(override=True)


# --- Sandbox configuration and in-process bookkeeping -----------------------
E2B_API_KEY = os.getenv("E2B_API_KEY")
# Live sandboxes, keyed by session UUID.
SANDBOXES: Dict[str, Sandbox] = {}
# Per-session timestamps ("created_at" / "last_accessed"), written by
# get_or_create_sandbox and read by cleanup_sandboxes / update_html.
SANDBOX_METADATA: Dict[str, Dict[str, float]] = {}
# Passed to Sandbox(timeout=...) and compared against time.time() deltas.
SANDBOX_TIMEOUT = 600
# Sandbox resolution; also used to size the iframe in custom_css.
WIDTH = 1024
HEIGHT = 768
# Root directory for per-run data directories; created at import time.
TMP_DIR = "./tmp/"
os.makedirs(TMP_DIR, exist_ok=True)


# Log in to the Hugging Face Hub only when a token is configured.
hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY")
if hf_token:
    login(token=hf_token)


# Components built by build_session_components, keyed by session UUID.
SESSION_COMPONENTS: Dict[str, Dict[str, Any]] = {}
|
|
| |
| |
| |
|
|
# Stylesheet passed to gr.Blocks(css=...). The <<WIDTH>> / <<HEIGHT>>
# placeholders are substituted below with the sandbox resolution plus a small
# margin (WIDTH + 15, HEIGHT + 10).
# NOTE(review): the `content:` glyphs in the .plan-checklist rules look
# mis-encoded, and the `.done` rule's string is split across two lines —
# confirm the intended characters before changing them.
custom_css = """
.modal-container { margin: var(--size-16) auto !important; }
.sandbox-container { position: relative; width: 910px; overflow: hidden; margin: auto; height: 800px; }
.sandbox-frame { display: none; position: absolute; top: 0; left: 0; width: 910px; height: 800px; pointer-events: none; }
.sandbox-iframe, .bsod-image { position: absolute; width: <<WIDTH>>px; height: <<HEIGHT>>px; border: 4px solid #444444; transform-origin: 0 0; }
.primary-color-label label span { font-weight: bold; color: var(--color-accent); }
.status-bar { display: flex; flex-direction: row; align-items: center; z-index: 100; }
.status-indicator { width: 15px; height: 15px; border-radius: 50%; }
.status-text { font-size: 16px; font-weight: bold; padding-left: 8px; text-shadow: none; }
.status-interactive { background-color: #2ecc71; animation: blink 2s infinite; }
.status-view-only { background-color: #e74c3c; }
.status-error { background-color: #e74c3c; animation: blink-error 1s infinite; }
@keyframes blink-error { 0% { background-color: rgba(231, 76, 60, 1); } 50% { background-color: rgba(231, 76, 60, 0.4); } 100% { background-color: rgba(231, 76, 60, 1); } }
@keyframes blink { 0% { background-color: rgba(46, 204, 113, 1); } 50% { background-color: rgba(46, 204, 113, 0.4); } 100% { background-color: rgba(46, 204, 113, 1); } }
#chatbot { height: 1000px !important; }
#chatbot .role { max-width: 95%; }
#chatbot .bubble-wrap { overflow-y: visible; }
.logo-container { display: flex; flex-direction: column; align-items: flex-start; width: 100%; box-sizing: border-box; gap: 5px; }
.logo-item { display: flex; align-items: center; padding: 0 30px; gap: 10px; text-decoration: none !important; color: #f59e0b; font-size: 17px; }
.logo-item:hover { color: #935f06 !important; }
.thought-stream { font-family: monospace; font-size: 13px; background: #1a1a2e; color: #a0c4ff; padding: 10px; border-radius: 8px; max-height: 300px; overflow-y: auto; white-space: pre-wrap; }
.plan-checklist { background: #16213e; padding: 10px; border-radius: 8px; }
.plan-checklist li { list-style: none; margin: 4px 0; }
.plan-checklist li.done::before { content: "β
"; }
.plan-checklist li.pending::before { content: "β¬ "; }
.plan-checklist li.running::before { content: "π "; }
.plan-checklist li.failed::before { content: "β "; }
.cost-badge { font-family: monospace; background: #0f3460; color: #e94560; padding: 4px 8px; border-radius: 4px; font-size: 12px; }
""".replace("<<WIDTH>>", str(WIDTH + 15)).replace("<<HEIGHT>>", str(HEIGHT + 10))
|
|
# Static footer shown at the bottom of the sidebar: a heading plus links to
# the open-source projects the app builds on. The heading is an <h3>, so it
# must be closed with </h3> (it was previously closed with </h2>).
footer_html = """
<h3 style="text-align: center; margin-top:50px;"><i>Powered by open source:</i></h3>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<div class="logo-container">
<a class="logo-item" href="https://github.com/huggingface/smolagents"><i class="fa fa-github"></i>smolagents</a>
<a class="logo-item" href="https://huggingface.co/Qwen/Qwen2.5-VL-72B-Instruct"><i class="fa fa-github"></i>Qwen2.5-VL</a>
<a class="logo-item" href="https://github.com/e2b-dev/desktop"><i class="fa fa-github"></i>E2B Desktop</a>
<a class="logo-item" href="https://playwright.dev"><i class="fa fa-github"></i>Playwright</a>
</div>
"""
|
|
# HTML shell for the sandbox viewer. It is filled with str.format(), which
# supplies three placeholders: {status_class} (CSS class of the status dot),
# {status_text} (label next to it) and {stream_url} (iframe source).
# The "bsod-image" <img> starts hidden; custom_js swaps it with the iframe.
sandbox_html_template = """
<style>@import url('https://fonts.googleapis.com/css2?family=Oxanium:wght@200..800&display=swap');</style>
<h1 style="color:var(--color-accent);margin:0;">Open Computer Agent v2.0 β <i>Enhanced</i></h1>
<div class="sandbox-container" style="margin:0;">
<div class="status-bar">
<div class="status-indicator {status_class}"></div>
<div class="status-text">{status_text}</div>
</div>
<iframe id="sandbox-iframe" src="{stream_url}" class="sandbox-iframe" style="display:block;" allowfullscreen></iframe>
<img src="https://huggingface.co/datasets/mfarre/servedfiles/resolve/main/blue_screen_of_death.gif" class="bsod-image" style="display:none;"/>
<img src="https://huggingface.co/datasets/m-ric/images/resolve/main/HUD_thom.png" class="sandbox-frame" />
</div>
"""
|
|
# JavaScript passed to gr.Blocks(js=...). What the function does:
#   * adds the "dark" class to <body> and forces the "__theme=dark" query
#     parameter (assigning window.location.search reloads the page once);
#   * every 5 s reads the hidden #sandbox-creation-time element (emitted by
#     update_html) and, once data-timeout seconds have elapsed since
#     data-time, hides the iframe and shows the "bsod-image" picture —
#     polling stops at that point because the function returns before
#     re-arming setTimeout;
#   * on a click on any <button> whose text contains "Let's go" or "Run",
#     switches back from the picture to the iframe.
custom_js = """function() {
document.body.classList.add('dark');
const checkSandboxTimeout = function() {
const timeElement = document.getElementById('sandbox-creation-time');
if (timeElement) {
const creationTime = parseFloat(timeElement.getAttribute('data-time'));
const timeoutValue = parseFloat(timeElement.getAttribute('data-timeout'));
const currentTime = Math.floor(Date.now() / 1000);
const elapsedTime = currentTime - creationTime;
if (elapsedTime >= timeoutValue) { showBSOD('Error'); return; }
}
setTimeout(checkSandboxTimeout, 5000);
};
const showBSOD = function(statusText = 'Error') {
const iframe = document.getElementById('sandbox-iframe');
const bsod = document.querySelector('.bsod-image');
if (iframe && bsod) { iframe.style.display = 'none'; bsod.style.display = 'block'; }
};
const resetBSOD = function() {
const iframe = document.getElementById('sandbox-iframe');
const bsod = document.querySelector('.bsod-image');
if (iframe && bsod && bsod.style.display === 'block') {
iframe.style.display = 'block'; bsod.style.display = 'none'; return true;
}
return false;
};
checkSandboxTimeout();
document.addEventListener('click', function(e) {
if (e.target.tagName === 'BUTTON') {
if (e.target.innerText.includes("Let's go") || e.target.innerText.includes("Run")) { resetBSOD(); }
}
});
const params = new URLSearchParams(window.location.search);
if (!params.has('__theme')) { params.set('__theme', 'dark'); window.location.search = params.toString(); }
}"""
|
|
|
|
| |
| |
| |
|
|
def upload_to_hf_and_remove(folder_path: str) -> str:
    """Upload a local folder to the logs dataset, then delete it locally.

    The folder is stored in the ``smolagents/computer-agent-logs`` dataset
    under its own base name. The local copy is removed only after the upload
    call has returned.

    Args:
        folder_path: Directory to upload and delete.

    Returns:
        Whatever ``upload_folder`` returns for the upload.

    Raises:
        Exception: Any error from the upload or the removal is printed and
            re-raised unchanged.
    """
    repo_id = "smolagents/computer-agent-logs"
    try:
        # normpath strips a trailing slash so basename is never empty.
        remote_name = os.path.basename(os.path.normpath(folder_path))
        uploaded = upload_folder(
            folder_path=folder_path,
            repo_id=repo_id,
            repo_type="dataset",
            path_in_repo=remote_name,
            ignore_patterns=[".git/*", ".gitignore"],
        )
        shutil.rmtree(folder_path)
    except Exception as e:
        print(f"Upload error: {e}")
        raise
    return uploaded
|
|
|
|
def cleanup_sandboxes() -> None:
    """Kill and forget every sandbox that has been idle for too long.

    A session is expired when its ``last_accessed`` timestamp is older than
    ``SANDBOX_TIMEOUT`` seconds. For each expired session this:

    1. uploads ``TMP_DIR/<session id>`` if that directory exists (best
       effort: an upload failure is logged and does not block the rest);
    2. removes the session from ``SANDBOXES`` and ``SANDBOX_METADATA``;
    3. kills the sandbox, if one was registered.

    The entries are removed *before* the kill so that a failing upload or a
    failing ``kill()`` can no longer leave the session registered and retried
    on every later pass. Metadata without a matching sandbox is dropped too.
    """
    now = time.time()
    # Snapshot the expired ids first: the dicts are mutated in the loop below.
    expired = [
        sid
        for sid, meta in SANDBOX_METADATA.items()
        if now - meta["last_accessed"] > SANDBOX_TIMEOUT
    ]
    for sid in expired:
        data_dir = os.path.join(TMP_DIR, sid)
        if os.path.exists(data_dir):
            try:
                upload_to_hf_and_remove(data_dir)
            except Exception as e:
                print(f"Cleanup error for {sid}: {e}")

        sandbox = SANDBOXES.pop(sid, None)
        SANDBOX_METADATA.pop(sid, None)
        if sandbox is None:
            continue

        try:
            sandbox.kill()
            print(f"Cleaned up sandbox {sid}")
        except Exception as e:
            print(f"Cleanup error for {sid}: {e}")
|
|
|
|
def get_or_create_sandbox(session_uuid: str) -> Sandbox:
    """Return the session's sandbox, creating a fresh one when needed.

    An existing sandbox is reused while it is younger than
    ``SANDBOX_TIMEOUT`` seconds (measured from ``created_at``); reuse also
    refreshes ``last_accessed``. Otherwise any stale sandbox is killed and
    unregistered, and a new one is created, its stream started and the
    Firefox policy file written.

    Args:
        session_uuid: Key under which the sandbox is registered.

    Returns:
        The reused or newly created sandbox.

    Raises:
        Exception: Errors from creating or preparing the sandbox propagate.
            A sandbox that was created but could not be prepared is killed
            first, so it is not left running unregistered.
    """
    current_time = time.time()
    existing = SANDBOXES.get(session_uuid)
    metadata = SANDBOX_METADATA.get(session_uuid)
    if existing is not None and metadata is not None:
        if current_time - metadata["created_at"] < SANDBOX_TIMEOUT:
            metadata["last_accessed"] = current_time
            return existing

    # Expired or half-registered session: unregister it before replacing it,
    # so a failure below cannot leave a dead sandbox in the registry.
    stale = SANDBOXES.pop(session_uuid, None)
    SANDBOX_METADATA.pop(session_uuid, None)
    if stale is not None:
        try:
            stale.kill()
        except Exception as e:
            # Best effort: the old sandbox may already be gone.
            print(f"Failed to kill stale sandbox for {session_uuid}: {e}")

    desktop = Sandbox(
        api_key=E2B_API_KEY, resolution=(WIDTH, HEIGHT), dpi=96,
        timeout=SANDBOX_TIMEOUT, template="k0wmnzir0zuzye6dndlw",
    )
    try:
        desktop.stream.start(require_auth=True)
        # Write a Firefox policies.json that disables first-run/update pages,
        # profile import and the default-browser check.
        setup_cmd = """sudo mkdir -p /usr/lib/firefox-esr/distribution && echo '{"policies":{"OverrideFirstRunPage":"","OverridePostUpdatePage":"","DisableProfileImport":true,"DontCheckDefaultBrowser":true}}' | sudo tee /usr/lib/firefox-esr/distribution/policies.json > /dev/null"""
        desktop.commands.run(setup_cmd)
    except Exception:
        # Do not leak a running sandbox that never got registered.
        try:
            desktop.kill()
        except Exception as kill_error:
            print(f"Failed to kill unprepared sandbox for {session_uuid}: {kill_error}")
        raise

    SANDBOXES[session_uuid] = desktop
    SANDBOX_METADATA[session_uuid] = {"created_at": current_time, "last_accessed": current_time}
    return desktop
|
|
|
|
def update_html(interactive_mode: bool, session_uuid: str) -> str:
    """Render the sandbox viewer HTML for a session.

    Args:
        interactive_mode: When true the stream URL is used as-is and the
            status reads "Interactive"; otherwise ``&view_only=true`` is
            appended and the status reads "Agent running...".
        session_uuid: Session whose sandbox (created on demand) is shown.

    Returns:
        ``sandbox_html_template`` filled in, followed by a hidden
        ``#sandbox-creation-time`` element carrying the sandbox creation time
        and ``SANDBOX_TIMEOUT`` as data attributes.
    """
    sandbox = get_or_create_sandbox(session_uuid)
    auth_key = sandbox.stream.get_auth_key()
    base_url = sandbox.stream.get_url(auth_key=auth_key)

    if interactive_mode:
        stream_url = base_url
        status_class, status_text = "status-interactive", "Interactive"
    else:
        stream_url = f"{base_url}&view_only=true"
        status_class, status_text = "status-view-only", "Agent running..."

    session_meta = SANDBOX_METADATA.get(session_uuid, {})
    creation_time = session_meta.get("created_at", time.time())

    viewer = sandbox_html_template.format(
        stream_url=stream_url, status_class=status_class, status_text=status_text,
    )
    timeout_marker = f'<div id="sandbox-creation-time" style="display:none;" data-time="{creation_time}" data-timeout="{SANDBOX_TIMEOUT}"></div>'
    return viewer + timeout_marker
|
|
|
|
| |
| |
| |
|
|
def build_session_components(session_uuid: str, data_dir: str) -> Dict[str, Any]:
    """Create the per-session helper objects and register them.

    Args:
        session_uuid: Session key; also names the memory persistence folder
            (``./memory_db/<session_uuid>``).
        data_dir: Directory handed to the session recorder and the
            file-system MCP.

    Returns:
        A dict with the keys ``config``, ``router``, ``planner``,
        ``verifier``, ``memory``, ``som``, ``hitl``, ``tracker``,
        ``recorder``, ``voice``, ``browser_mcp``, ``code_mcp``, ``fs_mcp``
        and ``hf_mcp``. The browser, code-execution and HF Hub MCP entries
        are ``None`` when their constructor raises. The same dict is stored
        in ``SESSION_COMPONENTS[session_uuid]``, replacing any earlier one.
    """

    def _optional(factory):
        # Best-effort construction: any failure yields None instead of raising.
        try:
            return factory()
        except Exception:
            return None

    components: Dict[str, Any] = {}
    components["config"] = AgentConfig(hf_token=hf_token, cost_budget_usd=2.0)

    # The planner and the verifier share one router instance.
    shared_router = IntelligenceRouter(hf_token=hf_token)
    components["router"] = shared_router
    components["planner"] = HierarchicalPlanner(shared_router)
    components["verifier"] = VerifierAgent(shared_router)
    components["memory"] = AgentMemory(persist_dir=f"./memory_db/{session_uuid}")
    components["som"] = SoMPreprocessor(use_icon_detection=False)
    components["hitl"] = HITLCheckpoint(auto_approve=False)
    components["tracker"] = CostTracker()
    components["recorder"] = SessionRecorder(session_uuid, output_dir=data_dir)
    components["voice"] = VoiceInterface(hf_token=hf_token)

    # MCP back ends: only the file-system one is mandatory.
    components["browser_mcp"] = _optional(lambda: BrowserMCP(headless=True))
    components["code_mcp"] = _optional(lambda: CodeExecutionMCP(api_key=E2B_API_KEY))
    components["fs_mcp"] = FileSystemMCP(base_dir=data_dir)
    components["hf_mcp"] = _optional(lambda: HFHubMCP(token=hf_token))

    SESSION_COMPONENTS[session_uuid] = components
    return components
|
|
|
|
| |
| |
| |
|
|
def run_enhanced_agent(
    task_input: str,
    session_uuid: str,
    use_planner: bool = True,
    use_verifier: bool = True,
    use_som: bool = False,
    use_browser_mcp: bool = True,
    consent_storage: bool = True,
) -> Generator[List[gr.ChatMessage], None, None]:
    """Run one task in the session's sandbox and stream the chat log.

    The generator yields a copy of the whole message list after every update,
    so it can be wired directly to a ``gr.Chatbot`` output.

    Stages: optional planning (seeded with similar tasks from memory), agent
    construction, optional Playwright/HF tool registration, optional
    Set-of-Marks annotation of the first screenshot, the streamed agent run,
    optional per-subtask verification, memory update, cost summary and
    optional upload of the run directory.

    Args:
        task_input: Natural-language task.
        session_uuid: Session whose sandbox and components are used.
        use_planner: Build and display a plan before running.
        use_verifier: Verify each planned subtask afterwards (needs a plan).
        use_som: Annotate and show the initial screenshot.
        use_browser_mcp: Start the browser MCP and expose its tools.
        consent_storage: Write ``metadata.json`` and upload the run directory.

    Yields:
        The chat history so far.

    Raises:
        Exception: Errors raised before the agent loop starts (planning,
            agent construction, first screenshot) propagate to the caller.
            Errors inside the loop are reported as a chat message instead.
    """
    interaction_id = f"{session_uuid}_{int(time.time())}"
    data_dir = os.path.join(TMP_DIR, interaction_id)
    os.makedirs(data_dir, exist_ok=True)

    desktop = get_or_create_sandbox(session_uuid)
    comps = build_session_components(session_uuid, data_dir)
    tracker: CostTracker = comps["tracker"]
    planner: HierarchicalPlanner = comps["planner"]
    verifier: VerifierAgent = comps["verifier"]
    memory: AgentMemory = comps["memory"]
    hitl: HITLCheckpoint = comps["hitl"]
    router: IntelligenceRouter = comps["router"]
    som: SoMPreprocessor = comps["som"]
    # None when BrowserMCP could not be constructed for this session.
    browser_mcp: Optional[BrowserMCP] = comps["browser_mcp"]

    tracker.start_task(interaction_id)

    messages: List[gr.ChatMessage] = []
    messages.append(gr.ChatMessage(role="user", content=task_input))
    yield messages.copy()

    # --- Planning -----------------------------------------------------------
    plan = None
    if use_planner:
        messages.append(gr.ChatMessage(
            role="assistant",
            content=f"π§ **Planning...** Breaking down: *{task_input}*",
        ))
        yield messages.copy()

        # Seed the planner with summaries of similar past tasks, if any.
        similar = memory.retrieve_similar(task_input, n_results=2)
        context = ""
        if similar:
            context = "Previous successful strategies:\n" + "\n".join(
                f"- {s.get('strategy_summary', '')}" for s in similar
            )

        plan = planner.plan(task_input, context=context)
        plan_md = "π **Plan**\n"
        for st in plan.subtasks:
            plan_md += f"- β¬ [{st.strategy}] {st.description}\n"
        messages.append(gr.ChatMessage(role="assistant", content=plan_md))
        yield messages.copy()

    # --- Agent construction -------------------------------------------------
    from e2bqwen import E2BVisionAgent, QwenVLAPIModel

    vision_model = QwenVLAPIModel(model_id="Qwen/Qwen2.5-VL-72B-Instruct", hf_token=hf_token)

    agent = E2BVisionAgent(
        model=vision_model,
        data_dir=data_dir,
        desktop=desktop,
        max_steps=100,
        verbosity_level=2,
        use_v1_prompt=True,
    )

    # Everything from here on may have a started browser behind it, so the
    # outer try/finally guarantees it is closed even when setup fails.
    try:
        # --- Optional tools -------------------------------------------------
        if use_browser_mcp:
            try:
                if browser_mcp is None:
                    raise RuntimeError("BrowserMCP could not be initialised")
                browser_mcp.start()
                mcp_tools = make_browser_tools(browser_mcp)
                for name, fn in mcp_tools.items():
                    agent.tools[name] = fn
                messages.append(gr.ChatMessage(
                    role="assistant",
                    content="π **Playwright MCP connected.** Browser automation ready.",
                ))
                yield messages.copy()
            except Exception as e:
                messages.append(gr.ChatMessage(
                    role="assistant",
                    content=f"β οΈ Playwright MCP unavailable: {e}. Using vision-only fallback.",
                ))
                yield messages.copy()

        # HF Hub tools are optional: hf_mcp may be None, so failures are ignored.
        try:
            hf_tools = make_hf_tools(comps["hf_mcp"])
            for name, fn in hf_tools.items():
                agent.tools[name] = fn
        except Exception:
            pass

        # --- Initial observation --------------------------------------------
        screenshot_bytes = desktop.screenshot(format="bytes")
        initial_screenshot = Image.open(BytesIO(screenshot_bytes))

        if use_som:
            annotated, registry = som.preprocess(initial_screenshot)
            annotated_path = os.path.join(data_dir, "som_initial.png")
            annotated.save(annotated_path)
            messages.append(gr.ChatMessage(
                role="assistant",
                content={"path": annotated_path, "mime_type": "image/png"},
            ))
            yield messages.copy()

        # --- Agent run ------------------------------------------------------
        # step_count counts streamed messages, not agent steps.
        step_count = 0
        try:
            for msg in stream_to_gradio(
                agent, task=task_input, task_images=[initial_screenshot], reset_agent_memory=False,
            ):
                step_count += 1

                # Periodic cost line, every fifth streamed message.
                if step_count % 5 == 0:
                    cost_report = router.get_cost_report()
                    cost_text = f"π° Cost: ${cost_report['spent_usd']:.4f} / ${cost_report['budget_usd']:.2f} | Calls: {cost_report['calls']}"
                    messages.append(gr.ChatMessage(role="assistant", content=cost_text))
                    yield messages.copy()

                # "-----" is the step separator: show the marked screenshot before it.
                if hasattr(agent, "last_marked_screenshot") and msg.content == "-----":
                    messages.append(gr.ChatMessage(
                        role="assistant",
                        content={"path": agent.last_marked_screenshot.to_string(), "mime_type": "image/png"},
                    ))

                messages.append(msg)
                yield messages.copy()

                # NOTE(review): this check runs after the step was streamed and
                # only reports; it does not pause or undo the action.
                if hasattr(agent, "memory") and agent.memory.steps:
                    last_step = agent.memory.steps[-1]
                    if hasattr(last_step, "tool_calls") and last_step.tool_calls:
                        action_str = str(last_step.tool_calls[0])
                        approved, reason = hitl.check_action(action_str)
                        if not approved:
                            messages.append(gr.ChatMessage(
                                role="assistant",
                                content=f"π **HITL Checkpoint:** {reason}\nPlease approve or modify the action.",
                            ))
                            yield messages.copy()
                            time.sleep(0.5)

            # --- Verification -----------------------------------------------
            # Stays True when verification is skipped; any rejected subtask
            # marks the whole task as unsuccessful in memory.
            task_succeeded = True
            if use_verifier and plan:
                messages.append(gr.ChatMessage(role="assistant", content="π **Verifying task completion...**"))
                yield messages.copy()

                final_screenshot_bytes = desktop.screenshot(format="bytes")
                final_screenshot = Image.open(BytesIO(final_screenshot_bytes))
                trace = [str(s) for s in agent.memory.steps[-20:]]
                for st in plan.subtasks:
                    result = verifier.verify(st, trace, final_screenshot)
                    subtask_ok = bool(result.get("success"))
                    task_succeeded = task_succeeded and subtask_ok
                    status_icon = "✅" if subtask_ok else "❌"
                    messages.append(gr.ChatMessage(
                        role="assistant",
                        content=f"{status_icon} **{st.description}** β {result.get('reason', '')}",
                    ))
                    yield messages.copy()

            # --- Memory -----------------------------------------------------
            # Not every step type carries `observations`; fall back to None
            # instead of turning a finished run into a reported failure.
            if agent.memory.steps:
                final_output = getattr(agent.memory.steps[-1], "observations", None)
            else:
                final_output = "Task completed."
            memory.add_task(
                task=task_input,
                strategy_summary=f"Completed in {step_count} steps. Final: {str(final_output)[:200]}",
                success=task_succeeded,
                domain=plan.subtasks[0].strategy if plan and plan.subtasks else "general",
            )

            # --- Cost summary -----------------------------------------------
            report = tracker.get_task_report(interaction_id)
            cost_summary = (
                f"π **Task Complete**\n"
                f"- Steps: {step_count}\n"
                f"- Cost: ${report['total_cost_usd']:.4f}\n"
                f"- Tokens: {report['total_tokens']}\n"
                f"- Avg latency: {report['avg_latency_ms']}ms"
            )
            messages.append(gr.ChatMessage(role="assistant", content=cost_summary))
            yield messages.copy()

            if consent_storage:
                from e2bqwen import get_agent_summary_erase_images
                summary = get_agent_summary_erase_images(agent)
                with open(os.path.join(data_dir, "metadata.json"), "w") as f:
                    json.dump({"status": "completed", "summary": summary, "cost_report": report}, f, default=str)
                upload_to_hf_and_remove(data_dir)

        except Exception as e:
            error_msg = f"Error: {str(e)}"
            messages.append(gr.ChatMessage(role="assistant", content=f"π₯ **Run failed:**\n{error_msg}"))
            yield messages.copy()
            if consent_storage:
                # The failure has already been shown to the user; a second
                # error while storing the trace must not escape the handler.
                try:
                    with open(os.path.join(data_dir, "metadata.json"), "w") as f:
                        json.dump({"status": "failed", "error": error_msg}, f)
                    upload_to_hf_and_remove(data_dir)
                except Exception as storage_error:
                    print(f"Failed to store trace for {interaction_id}: {storage_error}")
    finally:
        try:
            if browser_mcp:
                browser_mcp.close()
        except Exception:
            # Best effort: the browser may never have been started.
            pass
|
|
|
|
| |
| |
| |
|
|
theme = gr.themes.Default(font=["Oxanium", "sans-serif"], primary_hue="amber", secondary_hue="blue")


# Gradio UI: sandbox viewer, sidebar with task input and options, and the
# agent log. Event wiring lives at the bottom of the block.
# NOTE(review): the source's indentation was lost; the container nesting
# below is reconstructed — confirm against the intended layout.
with gr.Blocks(theme=theme, css=custom_css, js=custom_js, title="Computer Agent v2.0") as demo:
    # Session id shared by all handlers; set once by initialize_session.
    session_uuid_state = gr.State(None)

    with gr.Row():
        sandbox_html = gr.HTML(
            value=sandbox_html_template.format(stream_url="", status_class="status-interactive", status_text="Interactive"),
            label="Desktop",
        )

        with gr.Sidebar(position="left"):
            with Modal(visible=True) as modal:
                gr.Markdown("""
### π₯οΈ Open Computer Agent v2.0
Welcome to the **enhanced** computer agent powered by:
- **Multi-Model Router** (auto-selects cheapest capable model)
- **Playwright MCP** (semantic browser control)
- **Hierarchical Planner** + **Verifier**
- **Set-of-Marks Vision** + **Long-Term Memory**
- **Voice I/O** + **Human-in-the-Loop**
- **Cost Dashboard** + **Session Recording**

π Type a task, hit **Run**, and watch the agent think, plan, and execute.
""")

            task_input = gr.Textbox(
                value="Find me pictures of cute puppies",
                label="Enter your task:",
                elem_classes="primary-color-label",
            )

            with gr.Row():
                run_btn = gr.Button("π Let's go!", variant="primary")
                voice_input = gr.Audio(sources=["microphone"], type="numpy", label="Or speak your task")

            gr.Examples(
                examples=[
                    "Use Google Maps to find the Hugging Face HQ in Paris",
                    "Go to Wikipedia and find what happened on April 4th",
                    "Find train travel time from Bern to Basel on Google Maps",
                    "Go to Hugging Face Spaces, find flux.1 schnell, generate an image of a GPU",
                    "Search HF Hub for top text-to-video models and list them",
                    "Open GitHub trending and find the top Python repo today",
                ],
                inputs=task_input,
                label="Example Tasks",
                examples_per_page=6,
            )

            with gr.Accordion("βοΈ Advanced Options", open=False):
                use_planner_cb = gr.Checkbox(label="Use Hierarchical Planner", value=True)
                use_verifier_cb = gr.Checkbox(label="Use Verifier", value=True)
                use_som_cb = gr.Checkbox(label="Use Set-of-Marks Vision", value=False)
                use_browser_cb = gr.Checkbox(label="Use Playwright Browser MCP", value=True)
                consent_storage_cb = gr.Checkbox(label="Store task & agent trace?", value=True)
                # NOTE(review): this checkbox is not passed to any handler in
                # this file, so it currently has no effect.
                auto_approve_cb = gr.Checkbox(label="Auto-approve all actions (disable HITL)", value=False)

            session_state = gr.State({})
            stored_messages = gr.State([])

            cost_display = gr.HTML(value='<span class="cost-badge">Cost: $0.0000 / $2.00</span>', label="Cost Tracker")

            gr.Markdown("""
- **Data**: Uncheck storage to opt-out. No personal data please.
- **Captcha**: VMs may get flagged. Interrupt and solve manually if needed.
- **HITL**: Sensitive actions pause for approval unless auto-approve is on.
- **Restart**: Refresh the page if the agent seems stuck.
""")

            footer = gr.HTML(value=footer_html)

    with gr.Row():
        with gr.Column(scale=1):
            plan_display = gr.Markdown(label="π Plan", value="*Plan will appear here...*")
        with gr.Column(scale=2):
            chatbot_display = gr.Chatbot(
                elem_id="chatbot",
                label="Agent's Execution Logs",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizable=True,
            )

            stop_btn = gr.Button("π Stop the agent!", variant="huggingface")

    # --- Handlers -----------------------------------------------------------

    def clear_and_set_view_only(task_input, session_uuid):
        """Switch the viewer to view-only while the agent runs."""
        return update_html(False, session_uuid)

    def set_interactive(session_uuid):
        """Switch the viewer back to interactive mode."""
        return update_html(True, session_uuid)

    def reactivate_stop():
        """Restore the stop button's default label and style."""
        return gr.Button("π Stop the agent!", variant="huggingface")

    def update_cost_display():
        """Render the summed router cost of all registered sessions."""
        total = 0.0
        for comps in SESSION_COMPONENTS.values():
            # Look the router up instead of building a throwaway
            # IntelligenceRouter as a .get() default for every session.
            router = comps.get("router")
            if router is not None:
                total += router.cost_so_far_usd
        return f'<span class="cost-badge">Cost: ${total:.4f} / $2.00</span>'

    def process_voice(audio_tuple, session_uuid):
        """Transcribe a recording into the task box; errors are returned as text."""
        if audio_tuple is None:
            return ""
        comps = SESSION_COMPONENTS.get(session_uuid)
        if not comps:
            # No run has happened yet for this session: build components now.
            data_dir = os.path.join(TMP_DIR, session_uuid)
            comps = build_session_components(session_uuid, data_dir)
        voice: VoiceInterface = comps["voice"]
        try:
            text = voice.process_gradio_audio(audio_tuple)
            return text
        except Exception as e:
            return f"[Voice error: {e}]"

    def interrupt_agent(session_state):
        """Ask a running agent stored in session_state to stop, if there is one."""
        agent = session_state.get("agent")
        if agent and hasattr(agent, "interrupt_switch") and not agent.interrupt_switch:
            agent.interrupt()
            return gr.Button("Stopping agent...", variant="secondary")
        return gr.Button("π Stop the agent!", variant="huggingface")

    def initialize_session(interactive, browser_uuid=None):
        """Create (or attach to) the session sandbox when the page loads.

        The session id is computed exactly once, so the sandbox that is
        created and the id stored in ``session_uuid_state`` always match.
        """
        # NOTE(review): the `.then` below wires a single input component while
        # its `js` returns the browser's stored UUID, so that UUID may arrive
        # in the first positional slot — confirm against Gradio's js/fn
        # contract. Accepting it here keeps the id stable either way.
        if isinstance(interactive, str):
            browser_uuid = browser_uuid or interactive
            interactive = True
        session_id = browser_uuid or str(uuid.uuid4())
        return update_html(bool(interactive), session_id), session_id

    # --- Event wiring -------------------------------------------------------

    voice_input.stop_recording(
        fn=process_voice,
        inputs=[voice_input, session_uuid_state],
        outputs=[task_input],
    )

    view_only_event = run_btn.click(
        fn=clear_and_set_view_only,
        inputs=[task_input, session_uuid_state],
        outputs=[sandbox_html],
    )
    # Kept as its own handle so the stop button can cancel exactly this step.
    agent_event = view_only_event.then(
        fn=run_enhanced_agent,
        inputs=[
            task_input,
            session_uuid_state,
            use_planner_cb,
            use_verifier_cb,
            use_som_cb,
            use_browser_cb,
            consent_storage_cb,
        ],
        outputs=[chatbot_display],
    )
    run_event = (
        agent_event
        .then(fn=set_interactive, inputs=[session_uuid_state], outputs=[sandbox_html])
        .then(fn=update_cost_display, outputs=[cost_display])
        .then(fn=reactivate_stop, outputs=[stop_btn])
    )

    # Nothing in this file stores the running agent in session_state, so
    # interrupt_agent alone cannot stop a run. `cancels` stops the streaming
    # generator itself; the follow-up steps restore the interactive viewer and
    # the button, because the cancelled chain may not reach its own
    # set_interactive / reactivate_stop steps.
    stop_btn.click(
        fn=interrupt_agent,
        inputs=[session_state],
        outputs=[stop_btn],
        cancels=[agent_event],
    ).then(
        fn=set_interactive, inputs=[session_uuid_state], outputs=[sandbox_html],
    ).then(
        fn=reactivate_stop, outputs=[stop_btn],
    )

    # Hidden flag used only to trigger and feed the page-load chain.
    is_interactive = gr.Checkbox(value=True, visible=False)
    demo.load(
        fn=lambda: True,
        outputs=[is_interactive],
    ).then(
        fn=initialize_session,
        js="() => localStorage.getItem('gradio-session-uuid') || (() => { const id = self.crypto.randomUUID(); localStorage.setItem('gradio-session-uuid', id); return id })()",
        inputs=[is_interactive],
        outputs=[sandbox_html, session_uuid_state],
    )
|
|
|
|
def _schedule_sandbox_cleanup(interval_seconds: float = 60.0) -> None:
    """Run ``cleanup_sandboxes`` every ``interval_seconds`` seconds.

    ``threading.Timer`` fires only once, so each tick re-arms a new timer.
    The timers are daemon threads so they never keep the process alive after
    the server has stopped.
    """

    def _tick() -> None:
        try:
            cleanup_sandboxes()
        finally:
            # Re-arm even if this pass raised, so cleanup keeps running.
            _schedule_sandbox_cleanup(interval_seconds)

    timer = Timer(interval_seconds, _tick)
    timer.daemon = True
    timer.start()


if __name__ == "__main__":
    _schedule_sandbox_cleanup(60)
    demo.launch(server_name="0.0.0.0", server_port=7860)
|
|