Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import re | |
| import json | |
| import numpy as np | |
| import torch | |
| import spaces | |
| import gradio as gr | |
| from PIL import Image, ImageDraw, ImageFont | |
| from typing import Tuple, Optional, List, Dict, Any | |
| # Transformers & Qwen Utils | |
| from transformers import ( | |
| Qwen2_5_VLForConditionalGeneration, | |
| AutoProcessor, | |
| ) | |
| from qwen_vl_utils import process_vision_info | |
| # ----------------------------------------------------------------------------- | |
| # 1. PROMPTS (from prompt.py) | |
| # ----------------------------------------------------------------------------- | |
# Stub signatures and docstrings for the GUI actions the model may call.
# Nothing in this literal is executed here: it is interpolated verbatim into
# OS_SYSTEM_PROMPT below, so every character is part of the prompt.
# NOTE(review): whitespace inside the literal is reproduced as found; the
# stubs carry no indentation here -- confirm against the upstream prompt file.
# NOTE(review): some stub docstrings disagree with their signatures:
#   - move_mouse lists a `self` parameter although it is shown as a function;
#   - drag documents x1/y1/x2/y2 but takes from_coord/to_coord;
#   - scroll documents x/y arguments that its signature does not accept.
# They are left untouched because editing the prompt changes model behaviour.
OS_ACTIONS = """
def final_answer(answer: any) -> any:
\"\"\"
Provides a final answer to the given problem.
Args:
answer: The final answer to the problem
\"\"\"
def move_mouse(self, x: float, y: float) -> str:
\"\"\"
Moves the mouse cursor to the specified coordinates
Args:
x: The x coordinate (horizontal position)
y: The y coordinate (vertical position)
\"\"\"
def click(x: Optional[float] = None, y: Optional[float] = None) -> str:
\"\"\"
Performs a left-click at the specified normalized coordinates
Args:
x: The x coordinate (horizontal position)
y: The y coordinate (vertical position)
\"\"\"
def double_click(x: Optional[float] = None, y: Optional[float] = None) -> str:
\"\"\"
Performs a double-click at the specified normalized coordinates
Args:
x: The x coordinate (horizontal position)
y: The y coordinate (vertical position)
\"\"\"
def type(text: str) -> str:
\"\"\"
Types the specified text at the current cursor position.
Args:
text: The text to type
\"\"\"
def press(keys: str | list[str]) -> str:
\"\"\"
Presses a keyboard key
Args:
keys: The key or list of keys to press (e.g. "enter", "space", "backspace", "ctrl", etc.).
\"\"\"
def navigate_back() -> str:
\"\"\"
Goes back to the previous page in the browser. If using this tool doesn't work, just click the button directly.
\"\"\"
def drag(from_coord: list[float], to_coord: list[float]) -> str:
\"\"\"
Clicks [x1, y1], drags mouse to [x2, y2], then release click.
Args:
x1: origin x coordinate
y1: origin y coordinate
x2: end x coordinate
y2: end y coordinate
\"\"\"
def scroll(direction: Literal["up", "down"] = "down", amount: int = 1) -> str:
\"\"\"
Moves the mouse to selected coordinates, then uses the scroll button: this could scroll the page or zoom, depending on the app. DO NOT use scroll to move through linux desktop menus.
Args:
x: The x coordinate (horizontal position) of the element to scroll/zoom, defaults to None to not focus on specific coordinates
y: The y coordinate (vertical position) of the element to scroll/zoom, defaults to None to not focus on specific coordinates
direction: The direction to scroll ("up" or "down"), defaults to "down". For zoom, "up" zooms in, "down" zooms out.
amount: The amount to scroll. A good amount is 1 or 2.
\"\"\"
def wait(seconds: float) -> str:
\"\"\"
Waits for the specified number of seconds. Very useful in case the prior order is still executing (for example starting very heavy applications like browsers or office apps)
Args:
seconds: Number of seconds to wait, generally 2 is enough.
\"\"\"
"""

# System prompt used as the text of the "system" message built by
# get_navigation_prompt. It asks the model to reason inside <think></think>
# and to emit its action inside <code></code>; parse_actions_from_response
# looks for that <code> tag pair in the reply.
# The f-string embeds OS_ACTIONS once, at import time.
OS_SYSTEM_PROMPT = f"""You are a helpful GUI agent. You’ll be given a task and a screenshot of the screen. Complete the task using Python function calls.
For each step:
• First, <think></think> to express the thought process guiding your next action and the reasoning behind it.
• Then, use <code></code> to perform the action. it will be executed in a stateful environment.
The following functions are exposed to the Python interpreter:
<code>
{OS_ACTIONS}
</code>
The state persists between code executions: so if in one step you've created variables or imported modules, these will all persist.
"""
| # ----------------------------------------------------------------------------- | |
| # 2. MODEL WRAPPER (Modified for Fara/QwenVL) | |
| # ----------------------------------------------------------------------------- | |
class TransformersModel:
    """Pair a Qwen2.5-VL style checkpoint with its processor for chat generation.

    Attributes:
        model_id: Hub identifier the processor and weights were loaded from.
        processor: ``AutoProcessor`` used for chat templating, input
            preparation and decoding.
        model: The loaded ``Qwen2_5_VLForConditionalGeneration`` instance.
    """

    def __init__(self, model_id: str, to_device: str = "cuda"):
        """Load the processor and the model weights.

        Args:
            model_id: Hub identifier (or local path) of the checkpoint.
            to_device: Target device. ``"cuda"`` loads with
                ``device_map="auto"``; any other value loads without a device
                map and then moves the model to that device.

        Raises:
            Exception: Whatever ``from_pretrained`` raises is logged and
                re-raised unchanged.
        """
        print(f"Loading model: {model_id}...")
        self.model_id = model_id

        # Load Processor
        # NOTE: trust_remote_code=True executes code shipped with the
        # checkpoint; only use it with repositories you trust.
        try:
            self.processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
        except Exception as e:
            print(f"Error loading processor: {e}")
            raise  # bare raise keeps the original traceback intact

        # Load Model
        try:
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_id,
                trust_remote_code=True,
                torch_dtype=torch.bfloat16,
                device_map="auto" if to_device == "cuda" else None,
            )
            if to_device != "cuda":
                # Previously only "cpu" was honoured; other explicit targets
                # (e.g. "cuda:1", "mps") were silently left on the default
                # device.
                self.model.to(to_device)
            print("Model loaded successfully.")
        except Exception as e:
            print(f"Error loading Fara/Qwen model: {e}. Ensure you have access/internet.")
            raise

    def generate(self, messages: list[dict], **kwargs):
        """Run one chat completion and return the decoded reply text.

        Args:
            messages: Chat messages in the structure accepted by the
                processor's chat template and by ``process_vision_info``.
            **kwargs: Forwarded unchanged to ``self.model.generate`` (for
                example ``max_new_tokens``).

        Returns:
            The decoded text of the first (and only) sequence in the batch,
            with the prompt tokens removed.
        """
        # 1. Prepare text prompt using chat template
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

        # 2. Process images/videos
        image_inputs, video_inputs = process_vision_info(messages)

        # 3. Create model inputs
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(self.model.device)

        # 4. Generate
        generated_ids = self.model.generate(**inputs, **kwargs)

        # 5. Decode (trimming input tokens): each output row starts with its
        # prompt, so drop as many leading ids as the prompt contained.
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = self.processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )[0]
        return output_text
| # ----------------------------------------------------------------------------- | |
| # 3. HELPER FUNCTIONS | |
| # ----------------------------------------------------------------------------- | |
def array_to_image(image_array: np.ndarray) -> Image.Image:
    """Turn a pixel array into a PIL image.

    Args:
        image_array: Pixel data; it is cast to ``uint8`` before conversion.

    Returns:
        The PIL image built from the cast array.

    Raises:
        ValueError: If ``image_array`` is ``None``.
    """
    if image_array is not None:
        pixels = np.uint8(image_array)
        return Image.fromarray(pixels)
    raise ValueError("No image provided. Please upload an image before submitting.")
def get_navigation_prompt(task, image):
    """Build the chat messages for a single navigation request.

    Args:
        task: The user's instruction text.
        image: The screenshot to attach to the user message.

    Returns:
        A two-element list: a system message carrying OS_SYSTEM_PROMPT, then a
        user message holding the image followed by the instruction text.
    """
    system_message = {
        "role": "system",
        "content": [{"type": "text", "text": OS_SYSTEM_PROMPT}],
    }
    instruction_text = f"Instruction: {task}\n\nPrevious actions:\nNone"
    user_message = {
        "role": "user",
        "content": [
            {"type": "image", "image": image},
            {"type": "text", "text": instruction_text},
        ],
    }
    return [system_message, user_message]
# Matches the body of every <code>...</code> section in a model reply.
_CODE_BLOCK_RE = re.compile(r"<code>\s*(.*?)\s*</code>", re.DOTALL)

# Matches a line that *starts with a call* to one of the actions listed in
# OS_ACTIONS. Requiring the opening parenthesis keeps prose such as
# "click on the button" or "type the query" from being mistaken for code.
_ACTION_CALL_RE = re.compile(
    r"(?:final_answer|move_mouse|double_click|click|type|press|"
    r"navigate_back|drag|scroll|wait)\s*\("
)


def parse_actions_from_response(response: str) -> list[str]:
    """Extract action code snippets from a model response.

    The contents of every ``<code>...</code>`` section are returned when at
    least one is present. Otherwise the response is scanned line by line and
    each line beginning with a call to a known action is returned, which
    covers replies where the model forgot the tags.

    Args:
        response: Raw text produced by the model.

    Returns:
        The code snippets in order of appearance; empty when nothing that
        looks like an action call is found.
    """
    code_blocks = _CODE_BLOCK_RE.findall(response)
    if code_blocks:
        return code_blocks

    # Fallback: untagged output, keep only lines that start with an action call.
    stripped_lines = (line.strip() for line in response.split("\n"))
    return [line for line in stripped_lines if _ACTION_CALL_RE.match(line)]
# A non-negative decimal literal: "12", "0.5", "3." or ".5". Unlike the looser
# "[0-9.]+" it never matches ".", "1.2.3" or similar text float() rejects.
_NUMBER = r"(\d+(?:\.\d*)?|\.\d+)"
_POINT = rf"\[\s*{_NUMBER}\s*,\s*{_NUMBER}\s*\]"

# One compiled pattern per action that carries screen coordinates. The leading
# \b stops "click(" from also matching inside "double_click(" ("_" is a word
# character, so there is no boundary between "_" and "c").
_COORDINATE_PATTERNS = {
    "click": re.compile(
        rf"\bclick\(\s*(?:x\s*=\s*)?{_NUMBER}\s*(?:,\s*(?:y\s*=\s*)?{_NUMBER}\s*)?\)"
    ),
    "double_click": re.compile(
        rf"\bdouble_click\(\s*(?:x\s*=\s*)?{_NUMBER}\s*(?:,\s*(?:y\s*=\s*)?{_NUMBER}\s*)?\)"
    ),
    "move_mouse": re.compile(
        rf"\bmove_mouse\(\s*(?:self\s*,\s*)?(?:x\s*=\s*)?{_NUMBER}\s*,\s*(?:y\s*=\s*)?{_NUMBER}\s*\)"
    ),
    "drag": re.compile(
        rf"\bdrag\(\s*(?:from_coord\s*=\s*)?{_POINT}\s*,\s*(?:to_coord\s*=\s*)?{_POINT}\s*\)"
    ),
}


def extract_coordinates_from_action(action_code: str) -> list[dict]:
    """Extract the screen coordinates referenced by action calls.

    Recognises ``click``, ``double_click``, ``move_mouse`` and ``drag`` calls
    written with positional or keyword arguments. Values are returned exactly
    as written (normalized or pixel); no scaling happens here.

    Args:
        action_code: Python-like source text containing action calls.

    Returns:
        A list of dicts with keys ``type``, ``x``, ``y`` and ``action``,
        grouped by action in the order click, double_click, move_mouse, drag.
        A drag contributes two consecutive entries typed ``drag_from`` and
        ``drag_to``. A click given a single number reuses it for ``y``.
    """
    localization_actions = []
    for action_type, pattern in _COORDINATE_PATTERNS.items():
        for match in pattern.finditer(action_code):
            if action_type == "drag":
                # Drag has from and to coordinates
                from_x, from_y, to_x, to_y = (float(v) for v in match.groups())
                localization_actions.append(
                    {"type": "drag_from", "x": from_x, "y": from_y, "action": action_type}
                )
                localization_actions.append(
                    {"type": "drag_to", "x": to_x, "y": to_y, "action": action_type}
                )
            else:
                # Single coordinate actions; y is optional for click and
                # double_click and falls back to the x value.
                x_text, y_text = match.groups()
                localization_actions.append(
                    {
                        "type": action_type,
                        "x": float(x_text),
                        "y": float(y_text or x_text),
                        "action": action_type,
                    }
                )
    return localization_actions
def _to_pixel(x: float, y: float, width: int, height: int) -> tuple[int, int]:
    """Convert one coordinate pair to integer pixel positions."""
    # A pair whose components are both <= 1.0 is treated as normalized and
    # scaled by the image size; anything else is taken as pixels already.
    if x <= 1.0 and y <= 1.0:
        return int(x * width), int(y * height)
    return int(x), int(y)


def create_localized_image(original_image: Image.Image, coordinates: list[dict]) -> Optional[Image.Image]:
    """Draw a marker and label for every coordinate on a copy of the image.

    Args:
        original_image: Screenshot to annotate; it is copied, not modified.
        coordinates: Dicts with at least ``type``, ``x`` and ``y`` keys, as
            produced by ``extract_coordinates_from_action``.

    Returns:
        The annotated copy, or ``None`` when ``coordinates`` is empty.
    """
    if not coordinates:
        return None

    img_copy = original_image.copy()
    draw = ImageDraw.Draw(img_copy)
    width, height = img_copy.size

    try:
        font = ImageFont.load_default()
    except Exception:
        # Best effort: with font=None Pillow picks its own default when
        # drawing text. Narrowed from a bare except so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        font = None

    colors = {
        'click': 'red', 'double_click': 'blue', 'move_mouse': 'green',
        'drag_from': 'orange', 'drag_to': 'purple'
    }
    radius = 8

    for i, coord in enumerate(coordinates):
        pixel_x, pixel_y = _to_pixel(coord['x'], coord['y'], width, height)
        color = colors.get(coord['type'], 'red')

        # Draw Circle
        draw.ellipse(
            [pixel_x - radius, pixel_y - radius, pixel_x + radius, pixel_y + radius],
            fill=color, outline='white', width=2,
        )

        # Draw Label
        label = f"{coord['type']}"
        draw.text((pixel_x + 10, pixel_y - 10), label, fill=color, font=font)

        # Draw Arrow for Drag: connect a drag_from marker to the drag_to entry
        # that immediately follows it in the list.
        is_drag_start = (
            coord['type'] == 'drag_from'
            and i + 1 < len(coordinates)
            and coordinates[i + 1]['type'] == 'drag_to'
        )
        if is_drag_start:
            next_coord = coordinates[i + 1]
            end_x, end_y = _to_pixel(next_coord['x'], next_coord['y'], width, height)
            draw.line([pixel_x, pixel_y, end_x, end_y], fill='orange', width=3)

    return img_copy
| # ----------------------------------------------------------------------------- | |
| # 4. INITIALIZATION | |
| # ----------------------------------------------------------------------------- | |
# Primary checkpoint, plus the base Qwen2.5-VL checkpoint used when the
# primary one cannot be loaded.
MODEL_ID = "microsoft/Fara-7B"
_FALLBACK_MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"

# Resolve the target device once so both load attempts agree on it.
_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Initializing {MODEL_ID}...")

# Global model instance, created at import time so it is ready before the
# first request is handled.
try:
    model = TransformersModel(model_id=MODEL_ID, to_device=_DEVICE)
except Exception as e:
    # Report why the primary load failed instead of discarding the exception.
    print(f"Failed to load Fara ({e}). Trying fallback Qwen...")
    model = TransformersModel(model_id=_FALLBACK_MODEL_ID, to_device=_DEVICE)
| # ----------------------------------------------------------------------------- | |
| # 5. GRADIO APP | |
| # ----------------------------------------------------------------------------- | |
@spaces.GPU
def _generate_response(messages: list) -> str:
    """Run model inference inside a ZeroGPU allocation."""
    return model.generate(messages, max_new_tokens=500)


def navigate(input_numpy_image: np.ndarray, task: str) -> Tuple[str, Optional[Image.Image]]:
    """Ask the model for the next GUI action and visualize where it points.

    Args:
        input_numpy_image: Uploaded screenshot as an array, or ``None`` when
            nothing was uploaded.
        task: Natural-language instruction for the agent.

    Returns:
        ``(text, image)``. ``text`` is the raw model response, or a short
        message when input is missing. ``image`` is the screenshot with
        markers drawn for every coordinate found in the response; the plain
        screenshot when none were found or the instruction is blank; ``None``
        when no screenshot was given.
    """
    if input_numpy_image is None:
        return "Please upload an image.", None
    input_pil_image = array_to_image(input_numpy_image)

    # Validate before touching the GPU so a blank instruction does not spend
    # an inference call.
    if not task or not task.strip():
        return "Please enter an instruction.", input_pil_image

    # Generate Prompt
    prompt_msgs = get_navigation_prompt(task, input_pil_image)

    # Generate Response (only this step needs the GPU)
    print("Generating response...")
    response_str = _generate_response(prompt_msgs)
    print(f"Model Response: {response_str}")

    # Parse
    actions = parse_actions_from_response(response_str)

    # Extract Coordinates
    all_coordinates = []
    for action_code in actions:
        all_coordinates.extend(extract_coordinates_from_action(action_code))

    # Visualize; fall back to the untouched screenshot when there is nothing
    # to draw.
    localized_image = input_pil_image
    if all_coordinates:
        localized_image = create_localized_image(input_pil_image, all_coordinates)
    return response_str, localized_image
title = "Fara-7B GUI Operator 🤖"
description = """
### Fara GUI Agent Demo
Upload a screenshot and give an instruction. The model will analyze the UI and output the Python code to execute the action.
This demo visualizes where the model wants to click or drag.
"""

# Page layout: screenshot input and annotated output side by side, then the
# instruction box with its button next to the model's text response.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>")
    gr.Markdown(description)

    with gr.Row():
        input_image = gr.Image(label="Upload Screenshot", height=500, type="numpy")
        # Separate, read-only component for the annotated result. Writing the
        # markers back into the input component meant a second submit sent an
        # already-annotated screenshot to the model.
        output_image = gr.Image(
            label="Predicted Action", height=500, type="pil", interactive=False
        )

    with gr.Row():
        with gr.Column(scale=1):
            task_input = gr.Textbox(
                label="Instruction",
                placeholder="e.g. Click on the Search button...",
                lines=2
            )
            submit_btn = gr.Button("Generate Action", variant="primary")
        with gr.Column(scale=1):
            output_code = gr.Textbox(label="Generated Python Code", lines=10)

    # navigate returns (response text, annotated image); route them to the
    # text box and the dedicated output image.
    submit_btn.click(
        fn=navigate,
        inputs=[input_image, task_input],
        outputs=[output_code, output_image]
    )

    # Optional: Examples
    # gr.Examples(...)

if __name__ == "__main__":
    demo.launch()