import os
import re
import json
import numpy as np
import torch
import spaces
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
from typing import Tuple, Optional, List, Dict, Any

# Transformers & Qwen Utils
from transformers import (
    Qwen2_5_VLForConditionalGeneration,
    AutoProcessor,
)
from qwen_vl_utils import process_vision_info

# -----------------------------------------------------------------------------
# 1. PROMPTS (from prompt.py)
# -----------------------------------------------------------------------------

# Signatures and docstrings of the actions the agent may call. This text is
# never executed here; it is only interpolated into OS_SYSTEM_PROMPT so the
# model knows which functions exist. The wording is kept as-is (including the
# docstrings that mention arguments absent from the signature) because the
# prompt text is part of the model's input.
OS_ACTIONS = """
def final_answer(answer: any) -> any:
    \"\"\"
    Provides a final answer to the given problem.
    Args:
        answer: The final answer to the problem
    \"\"\"

def move_mouse(self, x: float, y: float) -> str:
    \"\"\"
    Moves the mouse cursor to the specified coordinates
    Args:
        x: The x coordinate (horizontal position)
        y: The y coordinate (vertical position)
    \"\"\"

def click(x: Optional[float] = None, y: Optional[float] = None) -> str:
    \"\"\"
    Performs a left-click at the specified normalized coordinates
    Args:
        x: The x coordinate (horizontal position)
        y: The y coordinate (vertical position)
    \"\"\"

def double_click(x: Optional[float] = None, y: Optional[float] = None) -> str:
    \"\"\"
    Performs a double-click at the specified normalized coordinates
    Args:
        x: The x coordinate (horizontal position)
        y: The y coordinate (vertical position)
    \"\"\"

def type(text: str) -> str:
    \"\"\"
    Types the specified text at the current cursor position.
    Args:
        text: The text to type
    \"\"\"

def press(keys: str | list[str]) -> str:
    \"\"\"
    Presses a keyboard key
    Args:
        keys: The key or list of keys to press (e.g. "enter", "space", "backspace", "ctrl", etc.).
    \"\"\"

def navigate_back() -> str:
    \"\"\"
    Goes back to the previous page in the browser. If using this tool doesn't work, just click the button directly.
    \"\"\"

def drag(from_coord: list[float], to_coord: list[float]) -> str:
    \"\"\"
    Clicks [x1, y1], drags mouse to [x2, y2], then release click.
    Args:
        x1: origin x coordinate
        y1: origin y coordinate
        x2: end x coordinate
        y2: end y coordinate
    \"\"\"

def scroll(direction: Literal["up", "down"] = "down", amount: int = 1) -> str:
    \"\"\"
    Moves the mouse to selected coordinates, then uses the scroll button: this could scroll the page or zoom, depending on the app. DO NOT use scroll to move through linux desktop menus.
    Args:
        x: The x coordinate (horizontal position) of the element to scroll/zoom, defaults to None to not focus on specific coordinates
        y: The y coordinate (vertical position) of the element to scroll/zoom, defaults to None to not focus on specific coordinates
        direction: The direction to scroll ("up" or "down"), defaults to "down". For zoom, "up" zooms in, "down" zooms out.
        amount: The amount to scroll. A good amount is 1 or 2.
    \"\"\"

def wait(seconds: float) -> str:
    \"\"\"
    Waits for the specified number of seconds. Very useful in case the prior order is still executing (for example starting very heavy applications like browsers or office apps)
    Args:
        seconds: Number of seconds to wait, generally 2 is enough.
    \"\"\"
"""

# System prompt sent with every request; OS_ACTIONS is interpolated verbatim.
# NOTE(review): the two bullet points had lost their tag placeholders ("First,
# to express..." / "Then, use to perform..."). They are restored here as
# <think></think> and <code></code>; confirm the tag names against the prompt
# the model was trained with.
OS_SYSTEM_PROMPT = f"""You are a helpful GUI agent. You’ll be given a task and a screenshot of the screen. Complete the task using Python function calls.

For each step:
    • First, <think></think> to express the thought process guiding your next action and the reasoning behind it.
    • Then, use <code></code> to perform the action. it will be executed in a stateful environment.

The following functions are exposed to the Python interpreter:
{OS_ACTIONS}

The state persists between code executions: so if in one step you've created variables or imported modules, these will all persist.
"""
# -----------------------------------------------------------------------------
# 2. MODEL WRAPPER (Modified for Fara/QwenVL)
# -----------------------------------------------------------------------------

class TransformersModel:
    """Load a Qwen2.5-VL compatible checkpoint plus its processor and run chat generation."""

    def __init__(self, model_id: str, to_device: str = "cuda"):
        """Load the processor and the model for ``model_id``.

        Args:
            model_id: Hub identifier (or local path) passed to ``from_pretrained``.
            to_device: ``"cuda"`` loads with ``device_map="auto"``; ``"cpu"``
                loads without a device map and moves the model to the CPU.

        Raises:
            Exception: Whatever ``from_pretrained`` raises is logged and
                re-raised unchanged so the caller can fall back.
        """
        print(f"Loading model: {model_id}...")
        self.model_id = model_id

        # Load Processor
        try:
            self.processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
        except Exception as e:
            print(f"Error loading processor: {e}")
            raise  # bare raise keeps the original traceback

        # Load Model
        try:
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_id,
                trust_remote_code=True,
                torch_dtype=torch.bfloat16,
                device_map="auto" if to_device == "cuda" else None,
            )
            if to_device == "cpu":
                self.model.to("cpu")
            print("Model loaded successfully.")
        except Exception as e:
            print(f"Error loading Fara/Qwen model: {e}. Ensure you have access/internet.")
            raise

    def generate(self, messages: list[dict], **kwargs):
        """Run one chat completion and return the decoded text of the new tokens.

        Args:
            messages: Chat messages in the format accepted by the processor's
                chat template and by ``process_vision_info``.
            **kwargs: Forwarded unchanged to ``self.model.generate``.

        Returns:
            The decoded continuation for the single prompt (prompt tokens removed).
        """
        # 1. Prepare text prompt using chat template
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

        # 2. Process images/videos
        image_inputs, video_inputs = process_vision_info(messages)

        # 3. Create model inputs
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(self.model.device)

        # 4. Generate
        generated_ids = self.model.generate(**inputs, **kwargs)

        # 5. Decode (trimming input tokens): generate() returns prompt + new
        # tokens, so drop the first len(prompt) ids of every sequence.
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]

        output_text = self.processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )[0]

        return output_text


# -----------------------------------------------------------------------------
# 3. HELPER FUNCTIONS
# -----------------------------------------------------------------------------

# Code emitted by the model between <code> ... </code> tags.
_CODE_BLOCK_RE = re.compile(r"<code>\s*(.*?)\s*</code>", re.DOTALL)

# A line that starts with a call to one of the known action functions. The
# "(" requirement keeps prose such as "typical ..." from matching "type".
_RAW_CALL_RE = re.compile(
    r"^(?:double_click|move_mouse|click|type|press|drag|scroll|wait)\s*\("
)

# One non-negative decimal number; rejects strings like "1.2.3" or "." that
# float() cannot parse.
_NUM = r"(\d+(?:\.\d*)?|\.\d+)"

# The function name must not be the tail of a longer identifier, otherwise
# "click(" would also match inside "double_click(".
_NOT_IDENT = r"(?<![A-Za-z0-9_])"

# Insertion order defines the order of the extracted markers.
_COORD_PATTERNS = {
    "click": re.compile(
        rf"{_NOT_IDENT}click\(\s*(?:x\s*=\s*)?{_NUM}(?:\s*,\s*(?:y\s*=\s*)?{_NUM})?\s*\)"
    ),
    "double_click": re.compile(
        rf"{_NOT_IDENT}double_click\(\s*(?:x\s*=\s*)?{_NUM}(?:\s*,\s*(?:y\s*=\s*)?{_NUM})?\s*\)"
    ),
    "move_mouse": re.compile(
        rf"{_NOT_IDENT}move_mouse\(\s*(?:self\s*,\s*)?(?:x\s*=\s*)?{_NUM}\s*,\s*(?:y\s*=\s*)?{_NUM}\s*\)"
    ),
    "drag": re.compile(
        rf"{_NOT_IDENT}drag\(\s*(?:from_coord\s*=\s*)?\[\s*{_NUM}\s*,\s*{_NUM}\s*\]"
        rf"\s*,\s*(?:to_coord\s*=\s*)?\[\s*{_NUM}\s*,\s*{_NUM}\s*\]\s*\)"
    ),
}


def array_to_image(image_array: np.ndarray) -> Image.Image:
    """Convert a numpy image array to a PIL image.

    Raises:
        ValueError: If ``image_array`` is None.
    """
    if image_array is None:
        raise ValueError("No image provided. Please upload an image before submitting.")
    return Image.fromarray(np.uint8(image_array))


def get_navigation_prompt(task, image):
    """Constructs the prompt messages for the model.

    Returns a two-message chat: the system prompt, then a user turn holding
    the screenshot and the instruction text.
    """
    return [
        {
            "role": "system",
            "content": [{"type": "text", "text": OS_SYSTEM_PROMPT}],
        },
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": f"Instruction: {task}\n\nPrevious actions:\nNone"},
            ],
        },
    ]


def parse_actions_from_response(response: str) -> list[str]:
    """Parse action code snippets from a model response.

    Args:
        response: Raw text generated by the model.

    Returns:
        The stripped contents of every non-empty ``<code>...</code>`` block.
        If there is none, the stripped lines that start with a call to a
        known action function. An empty list if nothing is found.
    """
    # Look for code blocks; ignore empty ones so the fallback can still run.
    blocks = [body for body in _CODE_BLOCK_RE.findall(response) if body]
    if blocks:
        return blocks

    # Fallback: the model forgot the tags, so pick up raw function-call lines.
    stripped_lines = (line.strip() for line in response.split("\n"))
    return [line for line in stripped_lines if _RAW_CALL_RE.match(line)]


def extract_coordinates_from_action(action_code: str) -> list[dict]:
    """Extract coordinates from action code for localization actions.

    Recognizes ``click``, ``double_click``, ``move_mouse`` and ``drag`` calls
    with numeric literal arguments (positional or keyword).

    Returns:
        A list of dicts with keys ``type``, ``x``, ``y`` and ``action``. A
        drag contributes two entries (``drag_from`` then ``drag_to``). Values
        are returned as written by the model: they may be normalized or
        pixel coordinates.
    """
    localization_actions = []

    for action_type, pattern in _COORD_PATTERNS.items():
        for match in pattern.finditer(action_code):
            if action_type == "drag":
                # Drag has from and to coordinates
                from_x, from_y, to_x, to_y = match.groups()
                localization_actions.append({
                    "type": "drag_from",
                    "x": float(from_x),
                    "y": float(from_y),
                    "action": action_type,
                })
                localization_actions.append({
                    "type": "drag_to",
                    "x": float(to_x),
                    "y": float(to_y),
                    "action": action_type,
                })
                continue

            # Single coordinate actions. When only one number is given
            # (e.g. click(0.5)) it is reused for y, as before.
            x_val = match.group(1)
            y_val = match.group(2) or x_val
            localization_actions.append({
                "type": action_type,
                "x": float(x_val),
                "y": float(y_val),
                "action": action_type,
            })

    return localization_actions


def _to_pixel(x: float, y: float, width: int, height: int) -> tuple:
    """Map a coordinate pair to pixels; pairs with both values <= 1.0 are treated as normalized."""
    if x <= 1.0 and y <= 1.0:
        return int(x * width), int(y * height)
    return int(x), int(y)


def create_localized_image(original_image: Image.Image, coordinates: list[dict]) -> Optional[Image.Image]:
    """Create an image with localization markers drawn on it.

    Args:
        original_image: Screenshot to annotate; it is copied, not modified.
        coordinates: Entries as produced by ``extract_coordinates_from_action``.

    Returns:
        An annotated copy of the image, or None when ``coordinates`` is empty.
    """
    if not coordinates:
        return None

    img_copy = original_image.copy()
    draw = ImageDraw.Draw(img_copy)
    width, height = img_copy.size

    try:
        font = ImageFont.load_default()
    except OSError:
        # No usable default font: draw.text falls back to its own default.
        font = None

    colors = {
        'click': 'red',
        'double_click': 'blue',
        'move_mouse': 'green',
        'drag_from': 'orange',
        'drag_to': 'purple'
    }

    radius = 8
    for i, coord in enumerate(coordinates):
        # Handle normalized vs pixel coordinates
        pixel_x, pixel_y = _to_pixel(coord['x'], coord['y'], width, height)
        color = colors.get(coord['type'], 'red')

        # Draw Circle
        draw.ellipse(
            [pixel_x - radius, pixel_y - radius, pixel_x + radius, pixel_y + radius],
            fill=color, outline='white', width=2,
        )

        # Draw Label
        label = f"{coord['type']}"
        text_pos = (pixel_x + 10, pixel_y - 10)
        if font:
            draw.text(text_pos, label, fill=color, font=font)
        else:
            draw.text(text_pos, label, fill=color)

        # Draw a connecting line for a drag: drag_from is always followed by
        # its drag_to in the list built by extract_coordinates_from_action.
        if (
            coord['type'] == 'drag_from'
            and i + 1 < len(coordinates)
            and coordinates[i + 1]['type'] == 'drag_to'
        ):
            next_coord = coordinates[i + 1]
            end_x, end_y = _to_pixel(next_coord['x'], next_coord['y'], width, height)
            draw.line([pixel_x, pixel_y, end_x, end_y], fill='orange', width=3)

    return img_copy
# -----------------------------------------------------------------------------
# 4. INITIALIZATION
# -----------------------------------------------------------------------------

# Using Fara-7B (or fallback)
MODEL_ID = "microsoft/Fara-7B"
_FALLBACK_MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"
_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Initializing {MODEL_ID}...")

# Global model instance
# Note: We initialize this lazily or globally depending on environment.
# For Gradio Spaces, global init is standard.
try:
    model = TransformersModel(model_id=MODEL_ID, to_device=_DEVICE)
except Exception as e:
    # Report why the primary checkpoint failed before trying the fallback;
    # a failure of the fallback itself propagates and aborts startup.
    print(f"Failed to load Fara ({e}). Trying fallback Qwen...")
    model = TransformersModel(model_id=_FALLBACK_MODEL_ID, to_device=_DEVICE)


# -----------------------------------------------------------------------------
# 5. GRADIO APP
# -----------------------------------------------------------------------------

@spaces.GPU
def navigate(input_numpy_image: np.ndarray, task: str) -> Tuple[str, Optional[Image.Image]]:
    """Ask the model for the next GUI action and mark its target on the screenshot.

    Args:
        input_numpy_image: Screenshot from the Gradio image component, or None.
        task: Natural-language instruction for the agent.

    Returns:
        ``(response_text, image)``. ``image`` is the screenshot with markers
        when coordinates were found, the unmodified screenshot when none were
        found, and None when no screenshot was supplied.
    """
    if input_numpy_image is None:
        return "Please upload an image.", None

    input_pil_image = array_to_image(input_numpy_image)

    # Generate Prompt
    prompt_msgs = get_navigation_prompt(task, input_pil_image)

    # Generate Response
    print("Generating response...")
    response_str = model.generate(prompt_msgs, max_new_tokens=500)
    print(f"Model Response: {response_str}")

    # Parse
    actions = parse_actions_from_response(response_str)

    # Extract Coordinates
    all_coordinates = []
    for action_code in actions:
        all_coordinates.extend(extract_coordinates_from_action(action_code))

    # Visualize: only call the drawing helper when there is something to
    # draw, since it returns None for an empty coordinate list.
    localized_image = input_pil_image
    if all_coordinates:
        localized_image = create_localized_image(input_pil_image, all_coordinates)

    return response_str, localized_image


title = "Fara-7B GUI Operator 🤖"
description = """
### Fara GUI Agent Demo
Upload a screenshot and give an instruction. The model will analyze the UI and output the Python code to execute the action.
This demo visualizes where the model wants to click or drag.
"""

with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # NOTE(review): the original heading markup was lost (the f-string was
    # left spanning several lines, which does not parse); a centered <h1> is
    # used here - adjust if a different layout was intended.
    gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>")
    gr.Markdown(description)

    with gr.Row():
        input_image = gr.Image(label="Upload Screenshot", height=500, type="numpy")

    with gr.Row():
        with gr.Column(scale=1):
            task_input = gr.Textbox(
                label="Instruction",
                placeholder="e.g. Click on the Search button...",
                lines=2
            )
            submit_btn = gr.Button("Generate Action", variant="primary")

        with gr.Column(scale=1):
            output_code = gr.Textbox(label="Generated Python Code", lines=10)

    # Output image gets updated with markers
    submit_btn.click(
        fn=navigate,
        inputs=[input_image, task_input],
        outputs=[output_code, input_image]
    )

    # Optional: Examples
    # gr.Examples(...)

if __name__ == "__main__":
    demo.launch()