Spaces:
Build error
Build error
| """ | |
| Gemini 3 Computer Use API Client | |
| A comprehensive Python client for interacting with the Gemini 3 Computer Use model API | |
| which excels at browser and mobile task automation. | |
| """ | |
| import asyncio | |
| import base64 | |
| import json | |
| import logging | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Union, Any | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import requests | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import pyautogui | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.service import Service as ChromeService | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import TimeoutException, NoSuchElementException | |
| from webdriver_manager.chrome import ChromeDriverManager | |
# Configure logging: timestamped records at INFO and above, plus the
# module-level logger that the rest of this file writes to.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@dataclass
class ComputerAction:
    """Represents a computer action to be executed.

    Declared as a dataclass so that it can be built with keyword arguments,
    e.g. ``ComputerAction(action_type="click", target="body")``; only
    ``action_type`` is required.
    """

    action_type: str  # 'click', 'type', 'scroll', 'drag', 'key_press', etc.
    target: Optional[str] = None  # Element selector, coordinates, or other identifier
    value: Optional[str] = None  # Value to input or additional parameters
    coordinates: Optional[tuple] = None  # (x, y) coordinates if applicable
    duration: Optional[float] = None  # Duration for complex actions
    wait_time: Optional[float] = 1.0  # Wait time after action
@dataclass
class BrowserSession:
    """Manages browser session state.

    Declared as a dataclass so that it can be built with keyword arguments,
    e.g. ``BrowserSession(driver=driver, current_url=url)``.
    """

    # Quoted so the annotation is only resolved by type checkers and the class
    # can be defined without evaluating the selenium name.
    driver: "webdriver.Chrome"  # WebDriver instance that owns the browser
    current_url: str  # URL recorded for the session
    screenshot_path: Optional[str] = None  # Path of a saved screenshot, if any
    last_action_time: float = 0.0  # time.time() of the most recent action
class Gemini3ComputerUseAPI:
    """
    Main client for Gemini 3 Computer Use model API

    The client posts screenshots and action descriptions to the HTTP API and
    mirrors browser actions in a local Selenium Chrome session. It can be used
    as a context manager; leaving the ``with`` block closes the browser
    session and the HTTP session.
    """

    # pyautogui key names for the symbolic names accepted by "key_press";
    # any other value is handed to pyautogui unchanged.
    _KEY_NAMES = {"Enter": "enter", "Escape": "esc", "Tab": "tab"}

    def __init__(self, api_key: str, base_url: str = "https://api.gemini3.com/v1",
                 timeout: float = 30.0):
        """
        Initialize the Gemini 3 Computer Use API client

        Creates the ``screenshots`` directory in the current working
        directory if it does not exist yet.

        Args:
            api_key: Your API key for Gemini 3
            base_url: Base URL for the API endpoint
            timeout: Value passed as ``timeout`` to every HTTP request so a
                stalled server cannot block the caller indefinitely
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
            'User-Agent': 'Gemini3-ComputerUse-Client/1.0'
        }
        # Browser automation state
        self.browser_session: Optional[BrowserSession] = None
        self.screenshot_dir = Path("screenshots")
        self.screenshot_dir.mkdir(exist_ok=True)
        # Mobile automation state
        self.mobile_session = None
        logger.info("Gemini 3 Computer Use API Client initialized")

    def _make_request(self, endpoint: str, method: str = "POST",
                      data: Optional[Dict] = None, files: Optional[Dict] = None) -> Dict:
        """
        Make authenticated request to the API

        Args:
            endpoint: Path appended to the base URL (leading '/' optional)
            method: HTTP method; GET sends ``data`` as query parameters,
                every other method sends it as a JSON body
            data: Payload for the request
            files: Files for a multipart POST; when given, ``data`` is sent
                as form fields instead of JSON

        Returns:
            The decoded JSON response body

        Raises:
            Exception: If the request fails, the server answers with an
                error status, or the body is not valid JSON. The original
                error is attached as ``__cause__``.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        verb = method.upper()
        try:
            if verb == "GET":
                response = self.session.get(
                    url, headers=self.headers, params=data, timeout=self.timeout
                )
            elif verb == "POST" and files:
                # Remove Content-Type header for file uploads so that requests
                # can generate the multipart boundary itself.
                headers = {k: v for k, v in self.headers.items() if k != 'Content-Type'}
                response = self.session.post(
                    url, headers=headers, data=data, files=files, timeout=self.timeout
                )
            elif verb == "POST":
                response = self.session.post(
                    url, headers=self.headers, json=data, timeout=self.timeout
                )
            else:
                response = self.session.request(
                    method, url, headers=self.headers, json=data, timeout=self.timeout
                )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            logger.error(f"API request failed: {e}")
            raise Exception(f"API request failed: {e}") from e

    @staticmethod
    def _encode_png_b64(image: np.ndarray) -> str:
        """Encode an image array as PNG and return it as base64 text."""
        ok, buffer = cv2.imencode('.png', image)
        if not ok:
            raise ValueError("Failed to encode screenshot as PNG")
        return base64.b64encode(buffer).decode()

    def _sync_current_url(self) -> None:
        """Copy the driver's current URL into the browser session record."""
        session = self.browser_session
        session.current_url = session.driver.current_url

    @staticmethod
    def _quit_driver_quietly(driver) -> None:
        """Quit a driver during error cleanup without masking the real error."""
        if driver is None:
            return
        try:
            driver.quit()
        except Exception as e:
            logger.warning(f"Failed to quit browser driver during cleanup: {e}")

    def analyze_screen(self, image_path: Optional[str] = None,
                       screenshot: Optional[np.ndarray] = None) -> Dict:
        """
        Analyze the current screen state using computer vision

        The image source is chosen in this order: ``screenshot``, then
        ``image_path``, then a fresh desktop screenshot taken with pyautogui.

        Args:
            image_path: Path to image file to analyze; the file bytes are
                sent unchanged and labelled as image/png
            screenshot: Numpy array of screenshot image; it is PNG-encoded
                with OpenCV before sending

        Returns:
            Dictionary containing analysis results
        """
        if screenshot is not None:
            image_b64 = self._encode_png_b64(screenshot)
        elif image_path:
            with open(image_path, 'rb') as f:
                image_b64 = base64.b64encode(f.read()).decode()
        else:
            # Take a screenshot of current screen; the pixels are converted
            # from RGB to BGR because that is the order OpenCV encodes from.
            desktop = np.array(pyautogui.screenshot())
            image_b64 = self._encode_png_b64(cv2.cvtColor(desktop, cv2.COLOR_RGB2BGR))
        data = {
            "model": "gemini-3-computer-use",
            "image": f"data:image/png;base64,{image_b64}",
            "task": "analyze_screen"
        }
        result = self._make_request("analyze", data=data)
        logger.info("Screen analysis completed")
        return result

    def execute_browser_action(self, action: ComputerAction) -> Dict:
        """
        Execute a browser action using Gemini 3 computer use capabilities

        A screenshot of the browser and the action description are posted to
        the ``execute`` endpoint, then the action is performed in the local
        browser and the session's URL and timestamp are refreshed.

        Args:
            action: ComputerAction object describing the action to execute

        Returns:
            Dictionary containing execution results

        Raises:
            Exception: If no browser session is active or the API call fails
        """
        if not self.browser_session:
            raise Exception("No active browser session. Start a browser session first.")
        driver = self.browser_session.driver
        # Earlier actions may have navigated; report the page actually shown.
        self._sync_current_url()
        # Take screenshot before action
        screenshot = driver.get_screenshot_as_png()
        data = {
            "model": "gemini-3-computer-use",
            "image": f"data:image/png;base64,{base64.b64encode(screenshot).decode()}",
            "task": "execute_action",
            "action": {
                "type": action.action_type,
                "target": action.target,
                "value": action.value,
                "coordinates": action.coordinates,
                "duration": action.duration
            },
            "context": {
                "current_url": self.browser_session.current_url,
                "browser_session_id": id(driver)
            }
        }
        result = self._make_request("execute", data=data)
        # Execute the action in the actual browser
        self._execute_action_locally(action)
        # Update browser session
        self.browser_session.last_action_time = time.time()
        self._sync_current_url()
        logger.info(f"Browser action '{action.action_type}' executed")
        return result

    def _execute_action_locally(self, action: ComputerAction) -> None:
        """
        Execute action in the local browser session

        Handles "click", "type", "scroll" and "key_press". Any other action
        type is logged and skipped. After the action the method sleeps for
        ``action.wait_time`` seconds when that value is truthy.

        Args:
            action: ComputerAction object describing the action to execute

        Raises:
            Exception: Whatever the underlying driver or pyautogui call
                raises; the error is logged and re-raised unchanged.
        """
        driver = self.browser_session.driver
        kind = action.action_type
        try:
            if kind == "click":
                if action.coordinates:
                    # Local import keeps this method self-contained.
                    from selenium.webdriver.common.actions.action_builder import ActionBuilder

                    # Move to an absolute viewport position. An offset-based
                    # move is relative to the previous pointer position, so
                    # consecutive coordinate clicks would drift.
                    builder = ActionBuilder(driver)
                    builder.pointer_action.move_to_location(
                        int(action.coordinates[0]), int(action.coordinates[1])
                    )
                    builder.pointer_action.click()
                    builder.perform()
                elif action.target:
                    element = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, action.target))
                    )
                    element.click()
            elif kind == "type":
                # A missing value is treated as empty text instead of failing.
                text = action.value or ""
                if action.target:
                    element = WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, action.target))
                    )
                    element.clear()
                    element.send_keys(text)
                else:
                    # NOTE(review): pyautogui sends OS-level input, so this
                    # presumably only reaches the browser while its window
                    # has focus - confirm behaviour for headless sessions.
                    pyautogui.write(text)
            elif kind == "scroll":
                if action.coordinates:
                    # Pass the numbers as script arguments rather than
                    # formatting them into the JavaScript source.
                    driver.execute_script(
                        "window.scrollTo(arguments[0], arguments[1]);",
                        action.coordinates[0], action.coordinates[1]
                    )
                else:
                    driver.execute_script("window.scrollBy(0, 500);")
            elif kind == "key_press":
                # NOTE(review): same OS-level input caveat as untargeted typing.
                pyautogui.press(self._KEY_NAMES.get(action.value, action.value))
            else:
                logger.warning(f"Unsupported action type '{kind}'; nothing executed locally")
            # Wait after action if specified
            if action.wait_time:
                time.sleep(action.wait_time)
        except Exception as e:
            logger.error(f"Failed to execute action locally: {e}")
            raise

    def start_browser_session(self, url: str = "https://www.google.com",
                              headless: bool = False) -> BrowserSession:
        """
        Start a new browser automation session

        A session that is still open is closed first so that its browser is
        not left running. If start-up fails after the driver was created, the
        driver is quit before the error is re-raised.

        Args:
            url: URL to navigate to
            headless: Whether to run browser in headless mode

        Returns:
            BrowserSession object
        """
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=1920,1080")
        # Replace, rather than leak, a session that is still open.
        self.close_browser_session()
        driver = None
        try:
            driver = webdriver.Chrome(
                service=ChromeService(ChromeDriverManager().install()),
                options=options
            )
            driver.get(url)
            driver.maximize_window()
            self.browser_session = BrowserSession(
                driver=driver,
                current_url=url
            )
            logger.info(f"Browser session started at {url}")
            return self.browser_session
        except Exception as e:
            logger.error(f"Failed to start browser session: {e}")
            self._quit_driver_quietly(driver)
            raise

    def close_browser_session(self) -> None:
        """Close the current browser session, if one is open."""
        if self.browser_session:
            try:
                self.browser_session.driver.quit()
            finally:
                # Drop the reference even when quit() fails so the client
                # never keeps pointing at a dead session.
                self.browser_session = None
            logger.info("Browser session closed")

    def execute_mobile_action(self, action: ComputerAction,
                              device_config: Optional[Dict] = None) -> Dict:
        """
        Execute a mobile action using Gemini 3 computer use capabilities

        Only the API call is made; nothing is executed on a local device.

        Args:
            action: ComputerAction object describing the action to execute
            device_config: Device configuration for mobile automation

        Returns:
            Dictionary containing execution results
        """
        # This would require Appium setup for actual mobile automation
        data = {
            "model": "gemini-3-computer-use",
            "task": "mobile_action",
            "action": {
                "type": action.action_type,
                "target": action.target,
                "value": action.value,
                "coordinates": action.coordinates,
                "duration": action.duration
            },
            "device_config": device_config or {}
        }
        result = self._make_request("mobile-execute", data=data)
        logger.info(f"Mobile action '{action.action_type}' executed")
        return result

    def automate_browser_task(self, task_description: str, max_steps: int = 10) -> Dict:
        """
        Automate a complex browser task using Gemini 3 computer use model

        The task and a screenshot are posted to the ``automate`` endpoint and
        the actions listed under ``actions`` in the response are executed one
        by one through ``execute_browser_action``.

        Args:
            task_description: Natural language description of the task
            max_steps: Maximum number of action steps; also enforced locally,
                so surplus actions in the response are not executed

        Returns:
            Dictionary containing task execution results

        Raises:
            Exception: If no browser session is active or an API call fails
        """
        if not self.browser_session:
            raise Exception("No active browser session. Start a browser session first.")
        self._sync_current_url()
        # Take initial screenshot
        screenshot = self.browser_session.driver.get_screenshot_as_png()
        data = {
            "model": "gemini-3-computer-use",
            "image": f"data:image/png;base64,{base64.b64encode(screenshot).decode()}",
            "task": "automate_browser_task",
            "task_description": task_description,
            "current_url": self.browser_session.current_url,
            "max_steps": max_steps
        }
        result = self._make_request("automate", data=data)
        # Execute the planned actions
        planned = result.get('actions') or []
        if len(planned) > max_steps:
            logger.warning(
                f"Plan has {len(planned)} actions; executing only the first {max_steps}"
            )
        for action_data in planned[:max_steps]:
            action = ComputerAction(
                action_type=action_data['type'],
                target=action_data.get('target'),
                value=action_data.get('value'),
                coordinates=action_data.get('coordinates'),
                duration=action_data.get('duration'),
                wait_time=action_data.get('wait_time', 1.0)
            )
            self.execute_browser_action(action)
        logger.info(f"Browser task '{task_description}' completed")
        return result

    def get_system_info(self) -> Dict:
        """Get system information about the computer use environment"""
        data = {
            "task": "get_system_info"
        }
        return self._make_request("system-info", data=data)

    def take_screenshot(self, filename: Optional[str] = None) -> str:
        """
        Take a screenshot and save it

        Captures the browser when a session is active, otherwise the desktop.

        Args:
            filename: File name inside the screenshot directory; defaults to
                ``screenshot_<YYYYmmdd_HHMMSS>.png``

        Returns:
            Path of the saved file as a string
        """
        if not self.browser_session:
            # Take desktop screenshot
            screenshot = pyautogui.screenshot()
        else:
            # Take browser screenshot
            screenshot = self.browser_session.driver.get_screenshot_as_png()
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"screenshot_{timestamp}.png"
        filepath = self.screenshot_dir / filename
        if isinstance(screenshot, bytes):
            # The browser capture is already PNG bytes.
            with open(filepath, 'wb') as f:
                f.write(screenshot)
        else:
            # The desktop capture is an image object that saves itself.
            screenshot.save(filepath)
        logger.info(f"Screenshot saved: {filepath}")
        return str(filepath)

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the browser and the HTTP session."""
        try:
            self.close_browser_session()
        finally:
            self.session.close()
# Example usage and test functions
if __name__ == "__main__":
    # Example usage
    api = Gemini3ComputerUseAPI(api_key="your-api-key-here")
    try:
        # The client itself is the context manager (the object returned by
        # start_browser_session is not); leaving the block closes the browser.
        with api:
            # Start browser session
            session = api.start_browser_session("https://example.com")
            # Analyze screen
            analysis = api.analyze_screen()
            print(f"Screen analysis: {analysis}")
            # Execute a simple action
            action = ComputerAction(
                action_type="click",
                target="body",
                wait_time=1.0
            )
            result = api.execute_browser_action(action)
            print(f"Action result: {result}")
            # Automate a complex task
            task_result = api.automate_browser_task(
                "Navigate to the search box and search for 'Python programming'",
                max_steps=5
            )
            print(f"Task result: {task_result}")
    except Exception as e:
        logger.error(f"Example failed: {e}")