import json import os import shutil import time import uuid import unicodedata from io import BytesIO from threading import Timer from typing import Any, Dict, List, Optional from datetime import datetime import gradio as gr import torch import spaces from dotenv import load_dotenv from e2b_desktop import Sandbox from gradio_modal import Modal from huggingface_hub import login, upload_folder from PIL import Image, ImageDraw # Smolagents imports from smolagents import CodeAgent, tool, AgentImage from smolagents.memory import ActionStep, TaskStep from smolagents.models import ChatMessage, Model, MessageRole from smolagents.gradio_ui import GradioUI, stream_to_gradio from smolagents.monitoring import LogLevel # Transformers for Fara Model from transformers import ( Qwen2_5_VLForConditionalGeneration, AutoProcessor, ) from qwen_vl_utils import process_vision_info load_dotenv(override=True) # ----------------------------------------------------------------------------- # CONFIGURATION & CONSTANTS # ----------------------------------------------------------------------------- E2B_API_KEY = os.getenv("E2B_API") HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY") if HF_TOKEN: login(token=HF_TOKEN) SANDBOXES = {} SANDBOX_METADATA = {} SANDBOX_TIMEOUT = 600 WIDTH = 1024 HEIGHT = 768 TMP_DIR = "./tmp/" if not os.path.exists(TMP_DIR): os.makedirs(TMP_DIR) # ----------------------------------------------------------------------------- # MODEL INITIALIZATION (Fara-7B / Qwen2.5-VL) # ----------------------------------------------------------------------------- print("Loading Fara Model... This may take a moment.") DEVICE = "cuda" if torch.cuda.is_available() else "cpu" # Using the Microsoft Fara model as requested MODEL_ID_F = "microsoft/Fara-7B" # Global model variables model_f = None processor_f = None try: processor_f = AutoProcessor.from_pretrained(MODEL_ID_F, trust_remote_code=True) model_f = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_ID_F, trust_remote_code=True, torch_dtype=torch.bfloat16 if DEVICE == "cuda" else torch.float32, device_map="auto", ) print(f"Fara Model loaded successfully on {DEVICE}") except Exception as e: print(f"Error loading Fara Model: {e}") print("Falling back to Qwen/Qwen2.5-VL-7B-Instruct for demonstration if Fara is unavailable...") try: MODEL_ID_F = "Qwen/Qwen2.5-VL-7B-Instruct" processor_f = AutoProcessor.from_pretrained(MODEL_ID_F, trust_remote_code=True) model_f = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_ID_F, trust_remote_code=True, torch_dtype=torch.bfloat16 if DEVICE == "cuda" else torch.float32, device_map="auto", ) print(f"Fallback Model ({MODEL_ID_F}) loaded successfully.") except Exception as inner_e: print(f"Critical error loading model: {inner_e}") # ----------------------------------------------------------------------------- # GPU ISOLATED INFERENCE FUNCTION # ----------------------------------------------------------------------------- @spaces.GPU(duration=120) def run_model_inference(formatted_messages, max_tokens=1024, stop_sequences=None): """ This function runs on the GPU worker. It receives simple python objects (lists/dicts), not the complex Agent object. """ global model_f, processor_f if model_f is None: raise ValueError("Model is not loaded.") # Process Inputs (Tokenization happens here to ensure tensors are on correct device) text = processor_f.apply_chat_template( formatted_messages, tokenize=False, add_generation_prompt=True ) image_inputs, video_inputs = process_vision_info(formatted_messages) inputs = processor_f( text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ) # Move inputs to the model's device (GPU) inputs = inputs.to(model_f.device) # Generate with torch.no_grad(): generated_ids = model_f.generate( **inputs, max_new_tokens=max_tokens, stop_strings=stop_sequences, tokenizer=processor_f.tokenizer, ) # Decode generated_ids_trimmed = [ out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids) ] output_text = processor_f.batch_decode( generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False )[0] return output_text class FaraLocalModel(Model): """ Wrapper for the local Fara (Qwen2.5-VL) model to work with SmolAgents. """ def __init__(self, **kwargs): super().__init__(**kwargs) def __call__( self, messages: List[Dict[str, Any]], stop_sequences: Optional[List[str]] = None, **kwargs, ) -> ChatMessage: formatted_messages = [] # Convert SmolAgents messages to Qwen/Transformers format # We perform this conversion here (CPU side) to create simple dicts/lists for msg in messages: role = msg["role"] content = msg["content"] new_content = [] if isinstance(content, str): new_content.append({"type": "text", "text": content}) elif isinstance(content, list): for item in content: if isinstance(item, str): new_content.append({"type": "text", "text": item}) elif isinstance(item, dict): if "type" in item: if item["type"] == "image": # Handle path or url - extract value to ensure serializability val = item.get("image") or item.get("url") or item.get("path") new_content.append({"type": "image", "image": val}) else: new_content.append(item) formatted_messages.append({"role": role, "content": new_content}) # Call the decorated global function # This crosses the boundary to the GPU worker safely because # formatted_messages contains only standard Python types (str, list, dict, PIL.Image) output_text = run_model_inference( formatted_messages=formatted_messages, max_tokens=kwargs.get("max_tokens", 1024), stop_sequences=stop_sequences ) return ChatMessage( role=MessageRole.ASSISTANT, content=output_text, ) # ----------------------------------------------------------------------------- # E2B AGENT & TOOLS # ----------------------------------------------------------------------------- E2B_SYSTEM_PROMPT_TEMPLATE = """You are a desktop automation assistant that can control a remote desktop environment. The current date is <>. You will be given a task to solve in several steps. At each step you will perform an action. After each action, you'll receive an updated screenshot. Then you will proceed as follows, with these sections: don't skip any! Short term goal: ... What I see: ... Reflection: ... Action: ```python click(254, 308) ``` Always format your action ('Action:' part) as Python code blocks as shown above. On top of performing computations in the Python code snippets that you create, you only have access to these tools to interact with the desktop, no additional ones: {%- for tool in tools.values() %} - {{ tool.name }}: {{ tool.description }} Takes inputs: {{tool.inputs}} Returns an output of type: {{tool.output_type}} {%- endfor %} Look at elements on the screen to determine what to click or interact with. The desktop has a resolution of <>x<> pixels, take it into account to decide clicking coordinates. NEVER USE HYPOTHETIC OR ASSUMED COORDINATES, USE TRUE COORDINATES that you can see from the screenshot. Use precise coordinates based on the current screenshot for mouse movements and clicks. Whenever you click, MAKE SURE to click in the middle of the button, text, link or any other clickable element. Not under, not on the side. IN THE MIDDLE, else you risk to miss it. In menus it is always better to click in the middle of the text rather than in the tiny icon. Calculate extremelly well the coordinates. A mistake here can make the full task fail. Sometimes you may have missed a click, so never assume that you're on the right page, always make sure that your previous action worked. In the screenshot you will see a green crosshair displayed over the position of your last click: this way can inspect if the mouse pointer is off of the targeted element, pay special attention to it. Always analyze the latest screenshot carefully before performing actions. You can wait for appropriate loading times using the wait() tool. But don't wait forever, sometimes you've just misclicked and the process didn't launch. Execute one action at a time: don't try to pack a click and typing in one action. On each step, look at the last screenshot and action to validate if previous steps worked and decide the next action. If you repeated an action already without effect, it means that this action is useless: don't repeat it and try something else. Use click to move through menus on the desktop and scroll for web and specific applications. Always analyze the latest screenshot carefully before performing actions. Desktop menus usually expand with more options, the tiny triangle next to some text in a menu means that menu expands. For example in Office in the Applications menu expands showing presentation or writing applications. NEVER CLICK THE WEB BROWSER ICON TO OPEN THE WEB BROWSER: use open_url directly. In browser, ignore any sign-in popups while they don't interfere with the elements you want to interact with. """.replace("<>", datetime.now().strftime("%A, %d-%B-%Y")) def draw_marker_on_image(image_copy, click_coordinates): x, y = click_coordinates draw = ImageDraw.Draw(image_copy) cross_size, linewidth = 10, 3 # Draw cross draw.line((x - cross_size, y, x + cross_size, y), fill="green", width=linewidth) draw.line((x, y - cross_size, x, y + cross_size), fill="green", width=linewidth) # Add a circle around it for better visibility draw.ellipse( ( x - cross_size * 2, y - cross_size * 2, x + cross_size * 2, y + cross_size * 2, ), outline="green", width=linewidth, ) return image_copy def get_agent_summary_erase_images(agent): for memory_step in agent.memory.steps: if hasattr(memory_step, "observations_images"): memory_step.observations_images = None if hasattr(memory_step, "task_images"): memory_step.task_images = None return agent.write_memory_to_messages() class E2BVisionAgent(CodeAgent): """Agent for e2b desktop automation with Vision capabilities""" def __init__( self, model: Model, data_dir: str, desktop: Sandbox, tools: List[tool] = None, max_steps: int = 200, verbosity_level: LogLevel = 2, planning_interval: int = None, use_v1_prompt: bool = False, **kwargs, ): self.desktop = desktop self.data_dir = data_dir self.planning_interval = planning_interval # Initialize Desktop self.width, self.height = self.desktop.get_screen_size() print(f"Screen size: {self.width}x{self.height}") # Set up temp directory os.makedirs(self.data_dir, exist_ok=True) print(f"Screenshots and steps will be saved to: {self.data_dir}") self.use_v1_prompt = use_v1_prompt # Initialize base agent super().__init__( tools=tools or [], model=model, max_steps=max_steps, verbosity_level=verbosity_level, planning_interval=self.planning_interval, **kwargs, ) self.prompt_templates["system_prompt"] = E2B_SYSTEM_PROMPT_TEMPLATE.replace( "<>", str(self.width) ).replace("<>", str(self.height)) # Add screen info to state self.state["screen_width"] = self.width self.state["screen_height"] = self.height # Add default tools self.logger.log("Setting up agent tools...") self._setup_desktop_tools() self.step_callbacks.append(self.take_screenshot_callback) def _setup_desktop_tools(self): """Register all desktop tools""" @tool def click(x: int, y: int) -> str: """ Performs a left-click at the specified coordinates Args: x: The x coordinate (horizontal position) y: The y coordinate (vertical position) """ self.desktop.move_mouse(x, y) self.desktop.left_click() self.click_coordinates = [x, y] self.logger.log(f"Clicked at coordinates ({x}, {y})") return f"Clicked at coordinates ({x}, {y})" @tool def right_click(x: int, y: int) -> str: """ Performs a right-click at the specified coordinates Args: x: The x coordinate (horizontal position) y: The y coordinate (vertical position) """ self.desktop.move_mouse(x, y) self.desktop.right_click() self.click_coordinates = [x, y] self.logger.log(f"Right-clicked at coordinates ({x}, {y})") return f"Right-clicked at coordinates ({x}, {y})" @tool def double_click(x: int, y: int) -> str: """ Performs a double-click at the specified coordinates Args: x: The x coordinate (horizontal position) y: The y coordinate (vertical position) """ self.desktop.move_mouse(x, y) self.desktop.double_click() self.click_coordinates = [x, y] self.logger.log(f"Double-clicked at coordinates ({x}, {y})") return f"Double-clicked at coordinates ({x}, {y})" @tool def move_mouse(x: int, y: int) -> str: """ Moves the mouse cursor to the specified coordinates Args: x: The x coordinate (horizontal position) y: The y coordinate (vertical position) """ self.desktop.move_mouse(x, y) self.logger.log(f"Moved mouse to coordinates ({x}, {y})") return f"Moved mouse to coordinates ({x}, {y})" def normalize_text(text): return "".join( c for c in unicodedata.normalize("NFD", text) if not unicodedata.combining(c) ) @tool def type_text(text: str) -> str: """ Types the specified text at the current cursor position. Args: text: The text to type """ clean_text = normalize_text(text) self.desktop.write(clean_text, delay_in_ms=75) self.logger.log(f"Typed text: '{clean_text}'") return f"Typed text: '{clean_text}'" @tool def press_key(key: str) -> str: """ Presses a keyboard key Args: key: The key to press (e.g. "enter", "space", "backspace", etc.). """ self.desktop.press(key) self.logger.log(f"Pressed key: {key}") return f"Pressed key: {key}" @tool def go_back() -> str: """ Goes back to the previous page in the browser. """ self.desktop.press(["alt", "left"]) self.logger.log("Went back one page") return "Went back one page" @tool def drag_and_drop(x1: int, y1: int, x2: int, y2: int) -> str: """ Clicks [x1, y1], drags mouse to [x2, y2], then release click. Args: x1: The x coordinate of the start position. y1: The y coordinate of the start position. x2: The x coordinate of the end position. y2: The y coordinate of the end position. """ self.desktop.drag([x1, y1], [x2, y2]) message = f"Dragged and dropped from [{x1}, {y1}] to [{x2}, {y2}]" self.logger.log(message) return message @tool def scroll(x: int, y: int, direction: str = "down", amount: int = 2) -> str: """ Moves the mouse to selected coordinates, then uses the scroll button. Args: x: The x coordinate y: The y coordinate direction: "up" or "down" amount: The amount to scroll. """ self.desktop.move_mouse(x, y) self.desktop.scroll(direction=direction, amount=amount) message = f"Scrolled {direction} by {amount}" self.logger.log(message) return message @tool def wait(seconds: float) -> str: """ Waits for the specified number of seconds. Args: seconds: The duration to wait in seconds. """ time.sleep(seconds) self.logger.log(f"Waited for {seconds} seconds") return f"Waited for {seconds} seconds" @tool def open_url(url: str) -> str: """ Directly opens a browser with the specified url. Args: url: The website URL to open. """ if not url.startswith(("http://", "https://")): url = "https://" + url self.desktop.open(url) time.sleep(2) self.logger.log(f"Opening URL: {url}") return f"Opened URL: {url}" @tool def find_on_page_ctrl_f(search_string: str) -> str: """ Scroll the browser viewport to the first occurrence of the search string (Ctrl+F). Args: search_string: The text to search for on the page. """ self.desktop.press(["ctrl", "f"]) time.sleep(0.3) clean_text = normalize_text(search_string) self.desktop.write(clean_text, delay_in_ms=75) time.sleep(0.3) self.desktop.press("enter") time.sleep(0.3) self.desktop.press("esc") output_message = f"Scrolled to the first occurrence of '{clean_text}'" self.logger.log(output_message) return output_message # Register the tools self.tools["click"] = click self.tools["right_click"] = right_click self.tools["double_click"] = double_click self.tools["move_mouse"] = move_mouse self.tools["type_text"] = type_text self.tools["press_key"] = press_key self.tools["scroll"] = scroll self.tools["wait"] = wait self.tools["open_url"] = open_url self.tools["go_back"] = go_back self.tools["drag_and_drop"] = drag_and_drop self.tools["find_on_page_ctrl_f"] = find_on_page_ctrl_f def take_screenshot_callback(self, memory_step: ActionStep, agent=None) -> None: """Callback that takes a screenshot + memory snapshot after a step completes""" self.logger.log("Analyzing screen content...") current_step = memory_step.step_number time.sleep(2.5) # Let things happen on the desktop screenshot_bytes = self.desktop.screenshot(format="bytes") image = Image.open(BytesIO(screenshot_bytes)) # Create a filename with step number screenshot_path = os.path.join(self.data_dir, f"step_{current_step:03d}.png") image.save(screenshot_path) image_copy = image.copy() if getattr(self, "click_coordinates", None): image_copy = draw_marker_on_image(image_copy, self.click_coordinates) self.last_marked_screenshot = AgentImage(screenshot_path) print(f"Saved screenshot for step {current_step} to {screenshot_path}") # Optimization: remove previous raw images from memory to save context/speed for previous_memory_step in agent.memory.steps: if ( isinstance(previous_memory_step, ActionStep) and previous_memory_step.step_number <= current_step - 1 ): previous_memory_step.observations_images = None elif isinstance(previous_memory_step, TaskStep): previous_memory_step.task_images = None # Add the marker-edited image to the current memory step memory_step.observations_images = [image_copy] self.click_coordinates = None # Reset click marker # ----------------------------------------------------------------------------- # SANDBOX MANAGEMENT & HELPERS # ----------------------------------------------------------------------------- def upload_to_hf_and_remove(folder_path): repo_id = "smolagents/computer-agent-logs" try: folder_name = os.path.basename(os.path.normpath(folder_path)) print(f"Uploading {folder_path} to {repo_id}/{folder_name}...") url = upload_folder( folder_path=folder_path, repo_id=repo_id, repo_type="dataset", path_in_repo=folder_name, ignore_patterns=[".git/*", ".gitignore"], ) print(f"Upload complete. Removing local folder {folder_path}...") shutil.rmtree(folder_path) return url except Exception as e: print(f"Error during upload or cleanup: {str(e)}") # Don't raise, just log, to keep app running return None def cleanup_sandboxes(): current_time = time.time() sandboxes_to_remove = [] for session_id, metadata in SANDBOX_METADATA.items(): if current_time - metadata["last_accessed"] > SANDBOX_TIMEOUT: sandboxes_to_remove.append(session_id) for session_id in sandboxes_to_remove: if session_id in SANDBOXES: try: data_dir = os.path.join(TMP_DIR, session_id) if os.path.exists(data_dir): shutil.rmtree(data_dir) # Just local cleanup for this demo SANDBOXES[session_id].kill() del SANDBOXES[session_id] del SANDBOX_METADATA[session_id] print(f"Cleaned up sandbox for session {session_id}") except Exception as e: print(f"Error cleaning up sandbox {session_id}: {str(e)}") def get_or_create_sandbox(session_uuid): current_time = time.time() if ( session_uuid in SANDBOXES and session_uuid in SANDBOX_METADATA and current_time - SANDBOX_METADATA[session_uuid]["created_at"] < SANDBOX_TIMEOUT ): print(f"Reusing Sandbox for {session_uuid}") SANDBOX_METADATA[session_uuid]["last_accessed"] = current_time return SANDBOXES[session_uuid] else: print("No sandbox found, creating a new one") if session_uuid in SANDBOXES: try: SANDBOXES[session_uuid].kill() except Exception: pass print(f"Creating new sandbox for session {session_uuid}") desktop = Sandbox( api_key=E2B_API_KEY, resolution=(WIDTH, HEIGHT), dpi=96, timeout=SANDBOX_TIMEOUT, template="k0wmnzir0zuzye6dndlw", ) desktop.stream.start(require_auth=True) setup_cmd = """sudo mkdir -p /usr/lib/firefox-esr/distribution && echo '{"policies":{"OverrideFirstRunPage":"","OverridePostUpdatePage":"","DisableProfileImport":true,"DontCheckDefaultBrowser":true}}' | sudo tee /usr/lib/firefox-esr/distribution/policies.json > /dev/null""" desktop.commands.run(setup_cmd) SANDBOXES[session_uuid] = desktop SANDBOX_METADATA[session_uuid] = { "created_at": current_time, "last_accessed": current_time, } return desktop def save_final_status(folder, status: str, summary, error_message=None) -> None: try: with open(os.path.join(folder, "metadata.json"), "w") as output_file: output_file.write( json.dumps( {"status": status, "summary": summary, "error_message": error_message}, default=str ) ) except Exception as e: print(f"Failed to save metadata: {e}") def create_agent(data_dir, desktop): # Initialize the wrapper that calls the global GPU function model = FaraLocalModel() return E2BVisionAgent( model=model, data_dir=data_dir, desktop=desktop, max_steps=200, verbosity_level=2, use_v1_prompt=True, ) def generate_interaction_id(session_uuid): return f"{session_uuid}_{int(time.time())}" # ----------------------------------------------------------------------------- # GRADIO UI & INTERACTION # ----------------------------------------------------------------------------- custom_css = """ .modal-container { margin: var(--size-16) auto!important; } .sandbox-container { position: relative; width: 910px; height: 800px; overflow: hidden; margin: auto; } .sandbox-frame { display: none; position: absolute; top: 0; left: 0; width: 910px; height: 800px; pointer-events:none; } .sandbox-iframe, .bsod-image { position: absolute; width: <>px; height: <>px; border: 4px solid #444444; transform-origin: 0 0; } .primary-color-label label span { font-weight: bold; color: var(--color-accent); } .status-bar { display: flex; flex-direction: row; align-items: center; z-index: 100; } .status-indicator { width: 15px; height: 15px; border-radius: 50%; } .status-text { font-size: 16px; font-weight: bold; padding-left: 8px; text-shadow: none; } .status-interactive { background-color: #2ecc71; animation: blink 2s infinite; } .status-view-only { background-color: #e74c3c; } .status-error { background-color: #e74c3c; animation: blink-error 1s infinite; } @keyframes blink-error { 0% { background-color: rgba(231, 76, 60, 1); } 50% { background-color: rgba(231, 76, 60, 0.4); } 100% { background-color: rgba(231, 76, 60, 1); } } @keyframes blink { 0% { background-color: rgba(46, 204, 113, 1); } 50% { background-color: rgba(46, 204, 113, 0.4); } 100% { background-color: rgba(46, 204, 113, 1); } } #chatbot { height:1000px!important; } #chatbot .role { max-width:95% } .logo-container { display: flex; flex-direction: column; align-items: flex-start; gap: 5px; } .logo-item { display: flex; align-items: center; padding: 0 30px; gap: 10px; text-decoration: none!important; color: #f59e0b; font-size:17px; } """.replace("<>", str(WIDTH + 15)).replace("<>", str(HEIGHT + 10)) sandbox_html_template = """

Fara CUA - Powered by smolagents

{status_text}
""".replace("<>", str(WIDTH + 15)).replace("<>", str(HEIGHT + 10)) custom_js = """function() { document.body.classList.add('dark'); // Function to check if sandbox is timing out const checkSandboxTimeout = function() { const timeElement = document.getElementById('sandbox-creation-time'); if (timeElement) { const creationTime = parseFloat(timeElement.getAttribute('data-time')); const timeoutValue = parseFloat(timeElement.getAttribute('data-timeout')); const currentTime = Math.floor(Date.now() / 1000); if (currentTime - creationTime >= timeoutValue) { showBSOD('Error'); return; } } setTimeout(checkSandboxTimeout, 5000); }; const showBSOD = function(statusText = 'Error') { const iframe = document.getElementById('sandbox-iframe'); const bsod = document.getElementById('bsod-image'); if (iframe && bsod) { iframe.style.display = 'none'; bsod.style.display = 'block'; document.querySelector('.status-indicator').className = 'status-indicator status-error'; document.querySelector('.status-text').innerText = statusText; } }; const resetBSOD = function() { const iframe = document.getElementById('sandbox-iframe'); const bsod = document.getElementById('bsod-image'); if (iframe && bsod && bsod.style.display === 'block') { iframe.style.display = 'block'; bsod.style.display = 'none'; } }; document.addEventListener('click', function(e) { if (e.target.tagName === 'BUTTON' && e.target.innerText === "Let's go!") { resetBSOD(); } }); checkSandboxTimeout(); const params = new URLSearchParams(window.location.search); if (!params.has('__theme')) { params.set('__theme', 'dark'); window.location.search = params.toString(); } }""" def update_html(interactive_mode: bool, session_uuid): desktop = get_or_create_sandbox(session_uuid) auth_key = desktop.stream.get_auth_key() base_url = desktop.stream.get_url(auth_key=auth_key) stream_url = base_url if interactive_mode else f"{base_url}&view_only=true" status_class = "status-interactive" if interactive_mode else "status-view-only" status_text = "Interactive" if interactive_mode else "Agent running..." creation_time = ( SANDBOX_METADATA[session_uuid]["created_at"] if session_uuid in SANDBOX_METADATA else time.time() ) sandbox_html_content = sandbox_html_template.format( stream_url=stream_url, status_class=status_class, status_text=status_text, ) sandbox_html_content += f'' return sandbox_html_content def initialize_session(interactive_mode, browser_uuid): if not browser_uuid: new_uuid = str(uuid.uuid4()) return update_html(interactive_mode, new_uuid), new_uuid else: return update_html(interactive_mode, browser_uuid), browser_uuid class EnrichedGradioUI(GradioUI): def interact_with_agent( self, task_input, stored_messages, session_state, session_uuid, consent_storage, request: gr.Request, ): interaction_id = generate_interaction_id(session_uuid) desktop = get_or_create_sandbox(session_uuid) data_dir = os.path.join(TMP_DIR, interaction_id) if not os.path.exists(data_dir): os.makedirs(data_dir) # Create fresh agent. # Note: We do NOT store the full agent in session_state passed between Gradio events # if possible, or if we do, we ensure this function isn't wrapped in @spaces.GPU agent = create_agent(data_dir=data_dir, desktop=desktop) session_state["agent"] = agent # Storing in state is fine if this function runs on CPU try: stored_messages.append(gr.ChatMessage(role="user", content=task_input)) yield stored_messages screenshot_bytes = agent.desktop.screenshot(format="bytes") initial_screenshot = Image.open(BytesIO(screenshot_bytes)) for msg in stream_to_gradio( agent, task=task_input, task_images=[initial_screenshot], reset_agent_memory=False, ): if ( hasattr(agent, "last_marked_screenshot") and msg.content == "-----" ): stored_messages.append( gr.ChatMessage( role="assistant", content={ "path": agent.last_marked_screenshot.to_string(), "mime_type": "image/png", }, ) ) stored_messages.append(msg) yield stored_messages if consent_storage: summary = get_agent_summary_erase_images(agent) save_final_status(data_dir, "completed", summary=summary) yield stored_messages except Exception as e: error_message = f"Error in interaction: {str(e)}" print(error_message) stored_messages.append( gr.ChatMessage(role="assistant", content="Run failed:\n" + error_message) ) if consent_storage: save_final_status(data_dir, "failed", summary=None, error_message=error_message) yield stored_messages # ----------------------------------------------------------------------------- # MAIN APP CONSTRUCTION # ----------------------------------------------------------------------------- theme = gr.themes.Default( font=["Oxanium", "sans-serif"], primary_hue="amber", secondary_hue="blue" ) with gr.Blocks(theme=theme, css=custom_css, js=custom_js) as demo: session_uuid_state = gr.State(None) with gr.Row(): sandbox_html = gr.HTML( value=sandbox_html_template.format( stream_url="", status_class="status-interactive", status_text="Interactive", ), label="Output", ) with gr.Sidebar(position="left"): with Modal(visible=True) as modal: gr.Markdown("""### Welcome to Fara CUA Demo 🖥️ This agent uses **microsoft/Fara-7B** (running locally via ZeroGPU) and **smolagents** to control a remote computer. 👉 Type a task, click 'Let's go!', and watch the agent work. """) task_input = gr.Textbox( value="Find me pictures of cute puppies", label="Enter your task below:", elem_classes="primary-color-label", ) run_btn = gr.Button("Let's go!", variant="primary") # Simple controls stop_btn = gr.Button("Stop the agent!", variant="secondary") consent_storage = gr.Checkbox(label="Store logs locally?", value=True) gr.Examples( examples=[ "Use Google Maps to find the Hugging Face HQ in Paris", "Go to Wikipedia and find what happened on April 4th", "Find out the travel time by train from Bern to Basel on Google Maps", ], inputs=task_input, ) session_state = gr.State({}) stored_messages = gr.State([]) chatbot_display = gr.Chatbot( elem_id="chatbot", label="Agent's execution logs", type="messages", avatar_images=(None, "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"), resizable=True, ) # Dummy agent init for UI wrapper (actual agent created in interaction loop) # We pass a dummy CodeAgent just to initialize the UI class agent_ui = EnrichedGradioUI(CodeAgent(tools=[], model=Model(), name="init")) is_interactive = gr.Checkbox(value=True, visible=False) def clear_and_set_view_only(task_input, session_uuid): return update_html(False, session_uuid) def set_interactive(session_uuid): return update_html(True, session_uuid) def interrupt_agent(session_state): if "agent" in session_state and hasattr(session_state["agent"], "interrupt_switch") and not session_state["agent"].interrupt_switch: session_state["agent"].interrupt() return "Stopped" return "Stop" # Event Wiring run_event = ( run_btn.click( fn=clear_and_set_view_only, inputs=[task_input, session_uuid_state], outputs=[sandbox_html], ) .then( agent_ui.interact_with_agent, inputs=[ task_input, stored_messages, session_state, session_uuid_state, consent_storage, ], outputs=[chatbot_display], ) .then(fn=set_interactive, inputs=[session_uuid_state], outputs=[sandbox_html]) ) stop_btn.click(fn=interrupt_agent, inputs=[session_state], outputs=[]) # Initialization on load demo.load( fn=lambda: True, outputs=[is_interactive], ).then( fn=initialize_session, js="() => localStorage.getItem('gradio-session-uuid') || (() => { const id = self.crypto.randomUUID(); localStorage.setItem('gradio-session-uuid', id); return id })()", inputs=[is_interactive], outputs=[sandbox_html, session_uuid_state], ) if __name__ == "__main__": Timer(60, cleanup_sandboxes).start() demo.launch()