Spaces:
Runtime error
Runtime error
File size: 10,663 Bytes
42cc6d2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Emulator Pool Manager for parallel training.
This module provides a pool of pre-warmed Android emulators for
high-throughput parallel training on multi-core systems.
"""
import logging
import queue
import threading
import time
from typing import Dict, List, Optional
from .android_environment import AndroidEnvironment
# Module-level logger, named after this module via logging.getLogger(__name__);
# every log call in this file goes through it.
logger = logging.getLogger(__name__)
class EmulatorPool:
"""
Pool of pre-warmed Android emulators for parallel training.
The pool:
1. Boots N emulators at startup (amortizes 30-60s boot time)
2. Keeps emulators running across episodes
3. Resets app state (not full emulator) between episodes
4. Provides instant environment access via get/put
Optimized for systems with 100+ CPU cores and high memory capacity.
Example:
>>> # Boot 64 emulators once at startup (10 min one-time cost)
>>> pool = EmulatorPool(
... pool_size=64,
... task_path="/workspace/tasks/my_task.textproto",
... avd_name="default_pixel_6"
... )
>>>
>>> # Training loop - instant access!
>>> for episode in range(10000):
... env = pool.get() # <1ms
... # ... run episode ...
... pool.put(env) # Returns env to pool (resets app state)
>>>
>>> pool.close()
"""
def __init__(
self,
pool_size: int,
task_path: str,
avd_name: str,
adb_path: str = "~/Android/Sdk/platform-tools/adb",
emulator_path: str = "~/Android/Sdk/emulator/emulator",
android_avd_home: str = "~/.android/avd",
android_sdk_root: str = "~/Android/Sdk",
run_headless: bool = True,
image_format: str = "JPEG",
image_quality: int = 85,
use_shared_memory: bool = False,
):
"""Initialize emulator pool.
Args:
pool_size: Number of emulators to pre-warm.
task_path: Path to task textproto.
avd_name: Name of Android Virtual Device.
adb_path: Path to ADB executable.
emulator_path: Path to emulator executable.
android_avd_home: AVD home directory.
android_sdk_root: SDK root directory.
run_headless: Run emulators headless.
image_format: Image encoding format.
image_quality: JPEG quality (1-100).
use_shared_memory: Use shared memory optimization.
"""
self.pool_size = pool_size
self.task_path = task_path
self.avd_name = avd_name
self.adb_path = adb_path
self.emulator_path = emulator_path
self.android_avd_home = android_avd_home
self.android_sdk_root = android_sdk_root
self.run_headless = run_headless
self.image_format = image_format
self.image_quality = image_quality
self.use_shared_memory = use_shared_memory
# Thread-safe queue for available emulators
self._available: queue.Queue = queue.Queue(maxsize=pool_size)
self._all_emulators: List[AndroidEnvironment] = []
self._lock = threading.Lock()
self._closed = False
# Boot all emulators
logger.info(f"Booting {pool_size} emulators... (this will take ~{pool_size} minutes)")
self._boot_pool()
logger.info(f"Emulator pool ready with {pool_size} instances!")
def _boot_pool(self):
"""Boot all emulators in the pool."""
start_time = time.time()
for i in range(self.pool_size):
logger.info(f"Booting emulator {i+1}/{self.pool_size}...")
# Create unique shared memory name if using shared memory
shm_name = f"android_pool_{i}" if self.use_shared_memory else None
env = AndroidEnvironment(
task_path=self.task_path,
avd_name=self.avd_name,
adb_path=self.adb_path,
emulator_path=self.emulator_path,
android_avd_home=self.android_avd_home,
android_sdk_root=self.android_sdk_root,
run_headless=self.run_headless,
image_format=self.image_format,
image_quality=self.image_quality,
use_shared_memory=self.use_shared_memory,
shared_memory_name=shm_name,
)
# Reset to ensure ready state
env.reset()
self._all_emulators.append(env)
self._available.put(env)
elapsed = time.time() - start_time
logger.info(f"Pool boot complete in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
logger.info(f"Average boot time per emulator: {elapsed/self.pool_size:.1f} seconds")
def get(self, timeout: Optional[float] = None) -> AndroidEnvironment:
"""Get an emulator from the pool.
Args:
timeout: Max time to wait for available emulator (seconds).
None = wait forever.
Returns:
AndroidEnvironment ready for use.
Raises:
queue.Empty: If timeout expires and no emulator available.
RuntimeError: If pool is closed.
"""
if self._closed:
raise RuntimeError("Emulator pool is closed")
try:
env = self._available.get(timeout=timeout)
logger.debug(f"Dispatched emulator from pool ({self._available.qsize()} remaining)")
return env
except queue.Empty:
raise queue.Empty(
f"No emulator available after {timeout}s. "
f"Pool size={self.pool_size}, all in use."
)
def put(self, env: AndroidEnvironment, reset: bool = True):
"""Return an emulator to the pool.
Args:
env: Environment to return.
reset: Whether to reset the environment before returning to pool.
Set to False if you've already reset it.
"""
if self._closed:
logger.warning("Attempted to return emulator to closed pool")
return
if reset:
# Fast reset: just reset app state, not full emulator
# This takes ~1s vs 30-60s for full emulator boot
try:
env.reset()
except Exception as e:
logger.error(f"Error resetting emulator: {e}")
# Still return to pool, it might recover
self._available.put(env)
logger.debug(f"Returned emulator to pool ({self._available.qsize()} available)")
def get_stats(self) -> Dict[str, int]:
"""Get pool statistics.
Returns:
Dict with pool_size, available, in_use counts.
"""
available = self._available.qsize()
return {
"pool_size": self.pool_size,
"available": available,
"in_use": self.pool_size - available,
}
def close(self):
"""Close all emulators in the pool."""
if self._closed:
return
logger.info("Closing emulator pool...")
self._closed = True
# Close all emulators
for env in self._all_emulators:
try:
env.close()
except Exception as e:
logger.error(f"Error closing emulator: {e}")
logger.info("Emulator pool closed")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()
def __del__(self):
"""Cleanup on deletion."""
self.close()
class EmulatorPoolManager:
"""
Manager for multiple emulator pools (for multi-task training).
Allows running multiple tasks simultaneously with separate pools.
Example:
>>> manager = EmulatorPoolManager()
>>> manager.create_pool("task1", pool_size=32, task_path="/tasks/task1.textproto", ...)
>>> manager.create_pool("task2", pool_size=32, task_path="/tasks/task2.textproto", ...)
>>>
>>> # Get emulator for specific task
>>> env = manager.get("task1")
>>> # ... use env ...
>>> manager.put("task1", env)
"""
def __init__(self):
"""Initialize the pool manager."""
self._pools: Dict[str, EmulatorPool] = {}
self._lock = threading.Lock()
def create_pool(self, name: str, **pool_kwargs) -> EmulatorPool:
"""Create a new emulator pool.
Args:
name: Unique name for this pool.
**pool_kwargs: Arguments passed to EmulatorPool constructor.
Returns:
Created EmulatorPool.
"""
with self._lock:
if name in self._pools:
raise ValueError(f"Pool '{name}' already exists")
pool = EmulatorPool(**pool_kwargs)
self._pools[name] = pool
logger.info(f"Created pool '{name}' with {pool.pool_size} emulators")
return pool
def get(self, pool_name: str, timeout: Optional[float] = None) -> AndroidEnvironment:
"""Get emulator from named pool."""
pool = self._pools.get(pool_name)
if not pool:
raise ValueError(f"Pool '{pool_name}' not found")
return pool.get(timeout=timeout)
def put(self, pool_name: str, env: AndroidEnvironment, reset: bool = True):
"""Return emulator to named pool."""
pool = self._pools.get(pool_name)
if not pool:
raise ValueError(f"Pool '{pool_name}' not found")
pool.put(env, reset=reset)
def get_stats(self, pool_name: Optional[str] = None) -> Dict:
"""Get statistics for one or all pools."""
if pool_name:
pool = self._pools.get(pool_name)
if not pool:
raise ValueError(f"Pool '{pool_name}' not found")
return {pool_name: pool.get_stats()}
else:
return {name: pool.get_stats() for name, pool in self._pools.items()}
def close(self, pool_name: Optional[str] = None):
"""Close one or all pools."""
if pool_name:
pool = self._pools.pop(pool_name, None)
if pool:
pool.close()
else:
for pool in self._pools.values():
pool.close()
self._pools.clear()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
|