Spaces:
Runtime error
Runtime error
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Emulator Pool Manager for parallel training. | |
| This module provides a pool of pre-warmed Android emulators for | |
| high-throughput parallel training on multi-core systems. | |
| """ | |
| import logging | |
| import queue | |
| import threading | |
| import time | |
| from typing import Dict, List, Optional | |
| from .android_environment import AndroidEnvironment | |
| logger = logging.getLogger(__name__) | |
class EmulatorPool:
    """Pool of pre-warmed Android emulators for parallel training.

    The pool:
      1. Boots N emulators at startup (amortizes 30-60s boot time)
      2. Keeps emulators running across episodes
      3. Resets app state (not full emulator) between episodes
      4. Provides instant environment access via get/put

    Optimized for systems with 100+ CPU cores and high memory capacity.

    Example:
        >>> # Boot 64 emulators once at startup (10 min one-time cost)
        >>> pool = EmulatorPool(
        ...     pool_size=64,
        ...     task_path="/workspace/tasks/my_task.textproto",
        ...     avd_name="default_pixel_6"
        ... )
        >>>
        >>> # Training loop - instant access!
        >>> for episode in range(10000):
        ...     env = pool.get()  # <1ms
        ...     # ... run episode ...
        ...     pool.put(env)  # Returns env to pool (resets app state)
        >>>
        >>> pool.close()
    """

    def __init__(
        self,
        pool_size: int,
        task_path: str,
        avd_name: str,
        adb_path: str = "~/Android/Sdk/platform-tools/adb",
        emulator_path: str = "~/Android/Sdk/emulator/emulator",
        android_avd_home: str = "~/.android/avd",
        android_sdk_root: str = "~/Android/Sdk",
        run_headless: bool = True,
        image_format: str = "JPEG",
        image_quality: int = 85,
        use_shared_memory: bool = False,
    ):
        """Initialize the pool and boot every emulator (blocking).

        Args:
            pool_size: Number of emulators to pre-warm. Must be >= 1.
            task_path: Path to task textproto.
            avd_name: Name of Android Virtual Device.
            adb_path: Path to ADB executable.
            emulator_path: Path to emulator executable.
            android_avd_home: AVD home directory.
            android_sdk_root: SDK root directory.
            run_headless: Run emulators headless.
            image_format: Image encoding format.
            image_quality: JPEG quality (1-100).
            use_shared_memory: Use shared memory optimization.

        Raises:
            ValueError: If ``pool_size`` is smaller than 1.
            Exception: Whatever ``AndroidEnvironment`` raises while booting
                or resetting; emulators created so far are closed before
                the exception propagates.
        """
        # Lifecycle state comes first so close()/__del__ are safe no matter
        # where construction fails. The pool counts as closed until booting
        # actually starts, because there is nothing to clean up before that.
        self._lock = threading.Lock()
        self._all_emulators: List[AndroidEnvironment] = []
        self._closed = True

        # A non-positive size would make the queue unbounded and crash the
        # boot statistics with a division by zero; reject it up front.
        if pool_size < 1:
            raise ValueError(f"pool_size must be >= 1, got {pool_size}")

        self.pool_size = pool_size
        self.task_path = task_path
        self.avd_name = avd_name
        self.adb_path = adb_path
        self.emulator_path = emulator_path
        self.android_avd_home = android_avd_home
        self.android_sdk_root = android_sdk_root
        self.run_headless = run_headless
        self.image_format = image_format
        self.image_quality = image_quality
        self.use_shared_memory = use_shared_memory

        # Thread-safe queue for available emulators
        self._available: queue.Queue = queue.Queue(maxsize=pool_size)
        self._closed = False

        # Boot all emulators
        logger.info(f"Booting {pool_size} emulators... (this will take ~{pool_size} minutes)")
        try:
            self._boot_pool()
        except BaseException:
            # Do not leave already-booted emulators running when a later
            # one fails (or the user interrupts the long boot phase).
            self.close()
            raise
        logger.info(f"Emulator pool ready with {pool_size} instances!")

    def _boot_pool(self):
        """Boot all emulators in the pool, one after another."""
        start_time = time.time()
        for i in range(self.pool_size):
            logger.info(f"Booting emulator {i+1}/{self.pool_size}...")
            # Create unique shared memory name if using shared memory
            shm_name = f"android_pool_{i}" if self.use_shared_memory else None
            env = AndroidEnvironment(
                task_path=self.task_path,
                avd_name=self.avd_name,
                adb_path=self.adb_path,
                emulator_path=self.emulator_path,
                android_avd_home=self.android_avd_home,
                android_sdk_root=self.android_sdk_root,
                run_headless=self.run_headless,
                image_format=self.image_format,
                image_quality=self.image_quality,
                use_shared_memory=self.use_shared_memory,
                shared_memory_name=shm_name,
            )
            # Track the emulator before resetting it so close() can still
            # reach it if the reset below raises.
            self._all_emulators.append(env)
            # Reset to ensure ready state
            env.reset()
            self._available.put(env)
        elapsed = time.time() - start_time
        logger.info(f"Pool boot complete in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
        logger.info(f"Average boot time per emulator: {elapsed/self.pool_size:.1f} seconds")

    def get(self, timeout: Optional[float] = None) -> AndroidEnvironment:
        """Get an emulator from the pool.

        Args:
            timeout: Max time to wait for available emulator (seconds).
                None = wait forever.

        Returns:
            AndroidEnvironment ready for use.

        Raises:
            queue.Empty: If timeout expires and no emulator available.
            RuntimeError: If pool is closed.
        """
        if self._closed:
            raise RuntimeError("Emulator pool is closed")
        try:
            env = self._available.get(timeout=timeout)
        except queue.Empty:
            # Re-raise with a descriptive message; the bare original
            # exception carries no extra information worth chaining.
            raise queue.Empty(
                f"No emulator available after {timeout}s. "
                f"Pool size={self.pool_size}, all in use."
            ) from None
        logger.debug(f"Dispatched emulator from pool ({self._available.qsize()} remaining)")
        return env

    def put(self, env: AndroidEnvironment, reset: bool = True):
        """Return an emulator to the pool.

        Returning to a closed pool is ignored with a warning. Returning more
        environments than the pool holds (for example the same one twice) is
        rejected with an error log instead of blocking the caller forever.

        Args:
            env: Environment to return.
            reset: Whether to reset the environment before returning to pool.
                Set to False if you've already reset it.
        """
        if self._closed:
            logger.warning("Attempted to return emulator to closed pool")
            return
        if reset:
            # Fast reset: just reset app state, not full emulator
            # This takes ~1s vs 30-60s for full emulator boot
            try:
                env.reset()
            except Exception as e:
                logger.error(f"Error resetting emulator: {e}")
                # Still return to pool, it might recover
        try:
            # The queue is bounded at pool_size, so it can only be full when
            # an environment is returned that was never checked out.
            self._available.put_nowait(env)
        except queue.Full:
            logger.error(
                "Pool already holds all of its emulators; ignoring return "
                "(environment returned twice or not owned by this pool?)"
            )
            return
        logger.debug(f"Returned emulator to pool ({self._available.qsize()} available)")

    def get_stats(self) -> Dict[str, int]:
        """Get pool statistics.

        Returns:
            Dict with pool_size, available, in_use counts. The counts are a
            snapshot based on ``Queue.qsize()`` and may be stale as soon as
            they are returned when other threads use the pool.
        """
        available = self._available.qsize()
        return {
            "pool_size": self.pool_size,
            "available": available,
            "in_use": self.pool_size - available,
        }

    def close(self):
        """Close all emulators in the pool. Safe to call more than once."""
        # Check-and-set under the lock so two concurrent callers cannot both
        # proceed to close the emulators.
        with self._lock:
            if self._closed:
                return
            self._closed = True
        logger.info("Closing emulator pool...")
        # Close all emulators; one failing close must not prevent the rest.
        for env in self._all_emulators:
            try:
                env.close()
            except Exception as e:
                logger.error(f"Error closing emulator: {e}")
        logger.info("Emulator pool closed")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()

    def __del__(self):
        """Cleanup on deletion."""
        # The finalizer also runs for objects whose __init__ never ran or
        # failed early; treat missing state as "nothing to close".
        if getattr(self, "_closed", True):
            return
        self.close()
class EmulatorPoolManager:
    """Manager for multiple emulator pools (for multi-task training).

    Allows running multiple tasks simultaneously with separate pools.

    Example:
        >>> manager = EmulatorPoolManager()
        >>> manager.create_pool("task1", pool_size=32, task_path="/tasks/task1.textproto", ...)
        >>> manager.create_pool("task2", pool_size=32, task_path="/tasks/task2.textproto", ...)
        >>>
        >>> # Get emulator for specific task
        >>> env = manager.get("task1")
        >>> # ... use env ...
        >>> manager.put("task1", env)
    """

    def __init__(self):
        """Initialize the pool manager with an empty registry."""
        self._pools: Dict[str, EmulatorPool] = {}
        self._lock = threading.Lock()

    def _require_pool(self, pool_name: str) -> EmulatorPool:
        """Return the pool registered under ``pool_name`` or raise ValueError."""
        pool = self._pools.get(pool_name)
        if pool is None:
            raise ValueError(f"Pool '{pool_name}' not found")
        return pool

    def create_pool(self, name: str, **pool_kwargs) -> EmulatorPool:
        """Create a new emulator pool.

        The manager lock is held for the whole (blocking) pool boot, so
        concurrent ``create_pool`` calls are serialized.

        Args:
            name: Unique name for this pool.
            **pool_kwargs: Arguments passed to EmulatorPool constructor.

        Returns:
            Created EmulatorPool.

        Raises:
            ValueError: If a pool with this name already exists.
        """
        with self._lock:
            if name in self._pools:
                raise ValueError(f"Pool '{name}' already exists")
            pool = EmulatorPool(**pool_kwargs)
            self._pools[name] = pool
            logger.info(f"Created pool '{name}' with {pool.pool_size} emulators")
            return pool

    def get(self, pool_name: str, timeout: Optional[float] = None) -> AndroidEnvironment:
        """Get emulator from named pool.

        Raises:
            ValueError: If no pool is registered under ``pool_name``.
        """
        return self._require_pool(pool_name).get(timeout=timeout)

    def put(self, pool_name: str, env: AndroidEnvironment, reset: bool = True):
        """Return emulator to named pool.

        Raises:
            ValueError: If no pool is registered under ``pool_name``.
        """
        self._require_pool(pool_name).put(env, reset=reset)

    def get_stats(self, pool_name: Optional[str] = None) -> Dict:
        """Get statistics for one or all pools.

        Args:
            pool_name: Pool to report on; ``None`` reports on every pool.

        Returns:
            Mapping of pool name to that pool's ``get_stats()`` result.

        Raises:
            ValueError: If ``pool_name`` is given but not registered.
        """
        # Compare against None explicitly: an empty-string name is a valid
        # (if unusual) key and must not be mistaken for "all pools".
        if pool_name is not None:
            return {pool_name: self._require_pool(pool_name).get_stats()}
        # Iterate over a snapshot so a pool registered meanwhile cannot
        # invalidate the iteration.
        return {name: pool.get_stats() for name, pool in list(self._pools.items())}

    def close(self, pool_name: Optional[str] = None):
        """Close one or all pools.

        Args:
            pool_name: Pool to close and unregister; ``None`` closes every
                pool. An unknown name is silently ignored.
        """
        # Explicit None check: close("") must never fall through to the
        # "close everything" branch.
        if pool_name is not None:
            pool = self._pools.pop(pool_name, None)
            if pool is not None:
                pool.close()
            return
        # Empty the registry first so it ends up cleared even if closing
        # one of the pools raises.
        pools = list(self._pools.values())
        self._pools.clear()
        for pool in pools:
            pool.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close every pool."""
        self.close()