burtenshaw's picture
burtenshaw HF Staff
Upload folder using huggingface_hub
42cc6d2 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Emulator Pool Manager for parallel training.
This module provides a pool of pre-warmed Android emulators for
high-throughput parallel training on multi-core systems.
"""
import logging
import queue
import threading
import time
from typing import Dict, List, Optional
from .android_environment import AndroidEnvironment
# Module-level logger used by EmulatorPool and EmulatorPoolManager below.
logger = logging.getLogger(__name__)
class EmulatorPool:
    """
    Pool of pre-warmed Android emulators for parallel training.

    The pool:
    1. Boots N emulators at startup (amortizes 30-60s boot time)
    2. Keeps emulators running across episodes
    3. Resets app state (not full emulator) between episodes
    4. Provides instant environment access via get/put

    Emulators are booted one after another inside ``__init__``, so the
    construction cost grows linearly with ``pool_size``. Optimized for systems
    with 100+ CPU cores and high memory capacity.

    Example:
        >>> # Boot 64 emulators once at startup (one-time, sequential cost)
        >>> pool = EmulatorPool(
        ...     pool_size=64,
        ...     task_path="/workspace/tasks/my_task.textproto",
        ...     avd_name="default_pixel_6"
        ... )
        >>>
        >>> # Training loop - instant access!
        >>> for episode in range(10000):
        ...     env = pool.get()  # <1ms
        ...     # ... run episode ...
        ...     pool.put(env)  # Returns env to pool (resets app state)
        >>>
        >>> pool.close()
    """

    def __init__(
        self,
        pool_size: int,
        task_path: str,
        avd_name: str,
        adb_path: str = "~/Android/Sdk/platform-tools/adb",
        emulator_path: str = "~/Android/Sdk/emulator/emulator",
        android_avd_home: str = "~/.android/avd",
        android_sdk_root: str = "~/Android/Sdk",
        run_headless: bool = True,
        image_format: str = "JPEG",
        image_quality: int = 85,
        use_shared_memory: bool = False,
    ):
        """Initialize the emulator pool and boot every emulator in it.

        Args:
            pool_size: Number of emulators to pre-warm (must be >= 1).
            task_path: Path to task textproto.
            avd_name: Name of Android Virtual Device.
            adb_path: Path to ADB executable.
            emulator_path: Path to emulator executable.
            android_avd_home: AVD home directory.
            android_sdk_root: SDK root directory.
            run_headless: Run emulators headless.
            image_format: Image encoding format.
            image_quality: JPEG quality (1-100).
            use_shared_memory: Use shared memory optimization.

        Raises:
            ValueError: If ``pool_size`` is less than 1.
            Exception: Anything raised while constructing or initially
                resetting an ``AndroidEnvironment``. Emulators that had
                already been booted are closed before the error propagates.
        """
        if pool_size < 1:
            # 0 previously crashed with ZeroDivisionError in the boot stats and
            # a negative size produced an unbounded queue with no emulators.
            raise ValueError(f"pool_size must be at least 1, got {pool_size}")

        self.pool_size = pool_size
        self.task_path = task_path
        self.avd_name = avd_name
        self.adb_path = adb_path
        self.emulator_path = emulator_path
        self.android_avd_home = android_avd_home
        self.android_sdk_root = android_sdk_root
        self.run_headless = run_headless
        self.image_format = image_format
        self.image_quality = image_quality
        self.use_shared_memory = use_shared_memory

        # Thread-safe queue for available emulators
        self._available: queue.Queue = queue.Queue(maxsize=pool_size)
        # Every emulator booted by this pool, whether idle or checked out.
        self._all_emulators: List[AndroidEnvironment] = []
        # Serializes the closed-flag transition so close() runs only once.
        self._lock = threading.Lock()
        # Assigned last: __del__ treats its absence as "never fully set up".
        self._closed = False

        # Boot all emulators
        logger.info(f"Booting {pool_size} emulators... (this will take ~{pool_size} minutes)")
        try:
            self._boot_pool()
        except BaseException:
            # Also covers KeyboardInterrupt during the long boot: shut down the
            # emulators that did start instead of leaking their processes.
            self.close()
            raise
        logger.info(f"Emulator pool ready with {pool_size} instances!")

    def _boot_pool(self):
        """Boot ``pool_size`` emulators sequentially and enqueue each one.

        Each environment is registered in ``_all_emulators`` right after it is
        constructed and *before* its initial ``reset()``, so ``close()`` can
        still shut it down if that reset raises.
        """
        start_time = time.monotonic()
        for i in range(self.pool_size):
            logger.info(f"Booting emulator {i+1}/{self.pool_size}...")
            # Create unique shared memory name if using shared memory
            shm_name = f"android_pool_{i}" if self.use_shared_memory else None
            env = AndroidEnvironment(
                task_path=self.task_path,
                avd_name=self.avd_name,
                adb_path=self.adb_path,
                emulator_path=self.emulator_path,
                android_avd_home=self.android_avd_home,
                android_sdk_root=self.android_sdk_root,
                run_headless=self.run_headless,
                image_format=self.image_format,
                image_quality=self.image_quality,
                use_shared_memory=self.use_shared_memory,
                shared_memory_name=shm_name,
            )
            self._all_emulators.append(env)
            # Reset to ensure ready state
            env.reset()
            self._available.put(env)
        elapsed = time.monotonic() - start_time
        logger.info(f"Pool boot complete in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
        logger.info(f"Average boot time per emulator: {elapsed/self.pool_size:.1f} seconds")

    def get(self, timeout: Optional[float] = None) -> AndroidEnvironment:
        """Get an emulator from the pool.

        Args:
            timeout: Max time to wait for available emulator (seconds).
                None = wait forever. Note that a call blocked with
                ``timeout=None`` is not woken up by ``close()``.

        Returns:
            AndroidEnvironment ready for use.

        Raises:
            queue.Empty: If timeout expires and no emulator available.
            RuntimeError: If pool is closed.
        """
        if self._closed:
            raise RuntimeError("Emulator pool is closed")
        try:
            env = self._available.get(timeout=timeout)
        except queue.Empty:
            # Replace the message-less Empty with a descriptive one.
            raise queue.Empty(
                f"No emulator available after {timeout}s. "
                f"Pool size={self.pool_size}, all in use."
            ) from None
        logger.debug("Dispatched emulator from pool (%d remaining)", self._available.qsize())
        return env

    def put(self, env: AndroidEnvironment, reset: bool = True):
        """Return an emulator to the pool.

        Args:
            env: Environment to return. Must have been obtained from this pool.
            reset: Whether to reset the environment before returning to pool.
                Set to False if you've already reset it.

        Raises:
            ValueError: If ``env`` was not booted by this pool, or if it is
                returned more times than it was taken (the bounded queue would
                otherwise block forever).
        """
        if self._closed:
            logger.warning("Attempted to return emulator to closed pool")
            return
        if not any(env is owned for owned in self._all_emulators):
            raise ValueError("Environment does not belong to this emulator pool")
        if reset:
            # Fast reset: just reset app state, not full emulator
            # This takes ~1s vs 30-60s for full emulator boot
            try:
                env.reset()
            except Exception as e:
                logger.exception(f"Error resetting emulator: {e}")
                # Still return to pool, it might recover
        try:
            self._available.put_nowait(env)
        except queue.Full:
            raise ValueError(
                "Emulator returned to pool more times than it was taken"
            ) from None
        logger.debug("Returned emulator to pool (%d available)", self._available.qsize())

    def get_stats(self) -> Dict[str, int]:
        """Get pool statistics.

        The counts are a snapshot and may be stale under concurrent use.

        Returns:
            Dict with pool_size, available, in_use counts.
        """
        available = self._available.qsize()
        return {
            "pool_size": self.pool_size,
            "available": available,
            "in_use": self.pool_size - available,
        }

    def close(self):
        """Close all emulators in the pool. Safe to call more than once."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
        logger.info("Closing emulator pool...")
        # Close all emulators, including ones currently checked out.
        for env in self._all_emulators:
            try:
                env.close()
            except Exception as e:
                logger.error(f"Error closing emulator: {e}")
        logger.info("Emulator pool closed")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()

    def __del__(self):
        """Cleanup on deletion."""
        # __init__ may have raised before _closed (and the lock) were set.
        if not getattr(self, "_closed", True):
            self.close()
class EmulatorPoolManager:
    """
    Manager for multiple emulator pools (for multi-task training).

    Allows running multiple tasks simultaneously with separate pools.

    Example:
        >>> manager = EmulatorPoolManager()
        >>> manager.create_pool("task1", pool_size=32, task_path="/tasks/task1.textproto", ...)
        >>> manager.create_pool("task2", pool_size=32, task_path="/tasks/task2.textproto", ...)
        >>>
        >>> # Get emulator for specific task
        >>> env = manager.get("task1")
        >>> # ... use env ...
        >>> manager.put("task1", env)
    """

    def __init__(self):
        """Initialize the pool manager."""
        # Registered pools by name; always read/written under self._lock.
        self._pools: Dict[str, EmulatorPool] = {}
        # Names whose pools are still booting inside create_pool(). Reserving
        # them lets duplicates be rejected without holding the lock for the
        # whole (sequential, potentially very long) boot.
        self._pending = set()
        self._lock = threading.Lock()

    def _get_pool(self, pool_name: str) -> EmulatorPool:
        """Return the registered pool called ``pool_name``.

        Raises:
            ValueError: If no pool with that name is registered.
        """
        with self._lock:
            pool = self._pools.get(pool_name)
        if pool is None:
            raise ValueError(f"Pool '{pool_name}' not found")
        return pool

    def create_pool(self, name: str, **pool_kwargs) -> EmulatorPool:
        """Create a new emulator pool.

        The pool boots outside the manager lock, so other pools stay usable
        (and other names can be created) while it starts up.

        Args:
            name: Unique name for this pool.
            **pool_kwargs: Arguments passed to EmulatorPool constructor.

        Returns:
            Created EmulatorPool.

        Raises:
            ValueError: If a pool with this name exists or is being created.
        """
        with self._lock:
            if name in self._pools or name in self._pending:
                raise ValueError(f"Pool '{name}' already exists")
            self._pending.add(name)
        try:
            pool = EmulatorPool(**pool_kwargs)
        except BaseException:
            with self._lock:
                self._pending.discard(name)
            raise
        with self._lock:
            # Move from "reserved" to "registered" atomically.
            self._pending.discard(name)
            self._pools[name] = pool
        logger.info(f"Created pool '{name}' with {pool.pool_size} emulators")
        return pool

    def get(self, pool_name: str, timeout: Optional[float] = None) -> AndroidEnvironment:
        """Get emulator from named pool.

        Raises:
            ValueError: If the pool does not exist.
        """
        return self._get_pool(pool_name).get(timeout=timeout)

    def put(self, pool_name: str, env: AndroidEnvironment, reset: bool = True):
        """Return emulator to named pool.

        Raises:
            ValueError: If the pool does not exist.
        """
        self._get_pool(pool_name).put(env, reset=reset)

    def get_stats(self, pool_name: Optional[str] = None) -> Dict:
        """Get statistics for one pool, or for all pools when ``pool_name`` is None.

        Raises:
            ValueError: If a specific ``pool_name`` is given but does not exist.
        """
        # Compare against None: "" is a valid pool name, not "all pools".
        if pool_name is not None:
            return {pool_name: self._get_pool(pool_name).get_stats()}
        with self._lock:
            pools = list(self._pools.items())
        return {name: pool.get_stats() for name, pool in pools}

    def close(self, pool_name: Optional[str] = None):
        """Close one pool, or all registered pools when ``pool_name`` is None.

        Closing an unknown name is a no-op. Pools still being created when
        this runs are not affected.
        """
        # Unregister under the lock, then close outside it so slow emulator
        # shutdowns do not block other manager calls.
        with self._lock:
            if pool_name is None:
                to_close = list(self._pools.values())
                self._pools.clear()
            else:
                pool = self._pools.pop(pool_name, None)
                to_close = [] if pool is None else [pool]
        for pool in to_close:
            pool.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close every pool."""
        self.close()