Spaces:
Running
Running
File size: 25,568 Bytes
c745a99 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 | """Unit tests for the MiniStackPool and env factory (parallel-rollout support).
These are pure unit tests — no MiniStack, no Docker, no network.
Run:
python -m pytest tests/test_pool.py -v
"""
from __future__ import annotations
import threading
from unittest.mock import patch
import pytest
from server.app import MiniStackPool, make_env_factory
from server.aws_rl_env_environment import AwsRlEnvironment
# ---------------------------------------------------------------------------
# MiniStackPool
# ---------------------------------------------------------------------------
class TestMiniStackPoolBasics:
    """Bookkeeping checks for ``MiniStackPool``: construction, acquire, release."""

    def test_init_records_all_ports_as_free(self) -> None:
        """Every port handed to the constructor is counted as free."""
        three_ports = [4566, 4567, 4568]
        assert MiniStackPool(three_ports).free_count == 3

    def test_init_with_empty_iterable(self) -> None:
        """An empty port list produces a pool with a free count of zero."""
        assert MiniStackPool([]).free_count == 0

    def test_acquire_decrements_free_count(self) -> None:
        """Taking one port out of two leaves one free."""
        two_port_pool = MiniStackPool([4566, 4567])
        two_port_pool.acquire()
        assert two_port_pool.free_count == 1

    def test_acquire_returns_port_from_pool(self) -> None:
        """The acquired port is one of the ports the pool was built with."""
        two_port_pool = MiniStackPool([4566, 4567])
        handed_out = two_port_pool.acquire()
        assert handed_out in {4566, 4567}

    def test_release_increments_free_count(self) -> None:
        """Releasing the acquired port restores the original free count."""
        two_port_pool = MiniStackPool([4566, 4567])
        handed_out = two_port_pool.acquire()
        two_port_pool.release(handed_out)
        assert two_port_pool.free_count == 2
class TestMiniStackPoolExhaustion:
    """Behaviour of ``MiniStackPool.acquire`` when no port is available."""

    def test_acquire_beyond_capacity_raises(self) -> None:
        """A second acquire on a one-port pool raises ``RuntimeError``."""
        one_port_pool = MiniStackPool([4566])
        one_port_pool.acquire()
        with pytest.raises(RuntimeError, match="exhausted"):
            one_port_pool.acquire()

    def test_empty_pool_raises_on_acquire(self) -> None:
        """A pool built from no ports raises on the very first acquire."""
        nothing_to_give = MiniStackPool([])
        with pytest.raises(RuntimeError, match="exhausted"):
            nothing_to_give.acquire()

    def test_can_acquire_again_after_release(self) -> None:
        """Exhaustion is not permanent: releasing the port makes it acquirable."""
        one_port_pool = MiniStackPool([4566])
        one_port_pool.acquire()
        with pytest.raises(RuntimeError):
            one_port_pool.acquire()

        one_port_pool.release(4566)
        reacquired = one_port_pool.acquire()
        assert reacquired == 4566
class TestMiniStackPoolRecycling:
    """Ports returned to ``MiniStackPool`` can be handed out again."""

    def test_released_port_is_reused(self) -> None:
        """With a single port, acquire/release/acquire yields the same port."""
        one_port_pool = MiniStackPool([4566])
        before = one_port_pool.acquire()
        one_port_pool.release(before)
        after = one_port_pool.acquire()
        assert after == before

    def test_multiple_cycles_stay_bounded(self) -> None:
        """Open+close 100 sessions on a pool of 4 ports — must never exhaust."""
        four_port_pool = MiniStackPool(range(4566, 4570))
        cycles = 0
        while cycles < 100:
            four_port_pool.release(four_port_pool.acquire())
            cycles += 1
        assert four_port_pool.free_count == 4

    def test_full_drain_then_full_refill(self) -> None:
        """Acquiring all 8 ports empties the pool; releasing them refills it."""
        eight_port_pool = MiniStackPool(range(4566, 4574))
        held = [eight_port_pool.acquire() for _ in range(8)]
        assert eight_port_pool.free_count == 0

        for held_port in held:
            eight_port_pool.release(held_port)
        assert eight_port_pool.free_count == 8
class TestMiniStackPoolConcurrency:
    """Multi-threaded acquire/release checks for ``MiniStackPool``."""

    def test_concurrent_acquire_no_duplicate_ports(self) -> None:
        """100 threads compete for 50 ports. Winners must hold unique ports,
        losers must see RuntimeError — no double-assignment.
        """
        pool = MiniStackPool(range(10000, 10050))
        acquired: list[int] = []
        errors: list[Exception] = []
        # Guards the two result lists, which every worker appends to.
        lock = threading.Lock()

        def worker() -> None:
            try:
                port = pool.acquire()
            except RuntimeError as e:
                with lock:
                    errors.append(e)
            else:
                with lock:
                    acquired.append(port)

        threads = [threading.Thread(target=worker) for _ in range(100)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(acquired) == 50
        assert len(set(acquired)) == 50  # no duplicates
        assert len(errors) == 50
        assert pool.free_count == 0

    def test_concurrent_release_preserves_all_ports(self) -> None:
        """All 50 ports released concurrently end up back in the pool."""
        pool = MiniStackPool(range(10000, 10050))
        ports = [pool.acquire() for _ in range(50)]

        threads = [threading.Thread(target=pool.release, args=(p,)) for p in ports]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert pool.free_count == 50

    def test_acquire_release_cycle_under_contention(self) -> None:
        """10 threads acquire-release 50 times each against a pool of 3. No port is lost."""
        pool = MiniStackPool([4566, 4567, 4568])

        def churn() -> None:
            for _ in range(50):
                # Only acquire() is expected to raise here. release() is kept
                # outside the try so a RuntimeError coming from it is not
                # mistaken for ordinary contention and silently discarded.
                try:
                    p = pool.acquire()
                except RuntimeError:
                    continue  # contention — expected
                pool.release(p)

        threads = [threading.Thread(target=churn) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert pool.free_count == 3
# ---------------------------------------------------------------------------
# make_env_factory — single-mode vs multi-mode branch
# ---------------------------------------------------------------------------
class TestFactorySingleMode:
    """``make_env_factory`` with a pool size of one or zero: no pool object."""

    def test_pool_size_1_returns_no_pool(self) -> None:
        """pool_size=1 returns ``None`` for the pool and a callable factory."""
        maybe_pool, build_env = make_env_factory(pool_size=1, base_port=4566)
        assert maybe_pool is None
        assert callable(build_env)

    def test_pool_size_0_returns_no_pool(self) -> None:
        """pool_size=0 is handled like 1 — no pool is returned."""
        maybe_pool, _build_env = make_env_factory(pool_size=0, base_port=4566)
        assert maybe_pool is None

    def test_factory_returns_env_without_pool_release(self) -> None:
        """The single-mode factory builds an env with no release callback set."""
        _no_pool, build_env = make_env_factory(pool_size=1, base_port=4566)
        built = build_env()
        assert isinstance(built, AwsRlEnvironment)
        assert built._pool_release is None
class TestServerAppImportIsSafeForLegacyPoolSizes:
    """Regression guard: importing ``server.app`` must succeed for zero or
    negative ``AWS_RL_ENV_POOL_SIZE`` values, and ``POOL_SIZE`` must come out
    clamped to at least 1.

    Background from the original author: a value of 0 used to crash at module
    import because OpenEnv's create_app rejects ``max_concurrent_envs=0``.
    """

    def _import_server_app(self, pool_size_env: str) -> int:
        """Import ``server.app`` in a fresh interpreter and report ``POOL_SIZE``.

        The child process inherits the current environment with
        ``AWS_RL_ENV_POOL_SIZE`` set to *pool_size_env*. Fails the calling
        test if the child exits non-zero.
        """
        import os
        import subprocess
        import sys

        child_env = dict(os.environ)
        child_env["AWS_RL_ENV_POOL_SIZE"] = pool_size_env
        script = "import server.app as m;import sys;sys.stdout.write(str(m.POOL_SIZE))"

        proc = subprocess.run(
            [sys.executable, "-c", script],
            check=False,
            text=True,
            capture_output=True,
            env=child_env,
        )
        assert proc.returncode == 0, (
            f"server.app import crashed with POOL_SIZE={pool_size_env!r}: "
            f"stderr={proc.stderr}"
        )

        # Only the final stdout line is parsed, so anything printed earlier
        # during the import is ignored.
        final_line = proc.stdout.strip().splitlines()[-1]
        return int(final_line)

    def test_pool_size_zero_clamps_to_one(self) -> None:
        settled = self._import_server_app("0")
        assert settled == 1

    def test_pool_size_negative_clamps_to_one(self) -> None:
        settled = self._import_server_app("-5")
        assert settled == 1

    def test_pool_size_one_is_unchanged(self) -> None:
        settled = self._import_server_app("1")
        assert settled == 1

    def test_pool_size_eight_is_unchanged(self) -> None:
        settled = self._import_server_app("8")
        assert settled == 8
class TestFactoryMultiMode:
    """``make_env_factory`` with a pool size above one: envs draw ports from a pool."""

    @staticmethod
    def _port_of(env: AwsRlEnvironment) -> int:
        """Return the integer after the last ``:`` of the env's backend URL."""
        return int(env._backend._aws_infra_url.rsplit(":", 1)[-1])

    def test_pool_size_8_creates_pool_of_8(self) -> None:
        """pool_size=8 returns a pool whose free count starts at 8."""
        built_pool, _ = make_env_factory(pool_size=8, base_port=4566)
        assert built_pool is not None
        assert built_pool.free_count == 8

    def test_factory_acquires_port_from_pool(self) -> None:
        """Each factory call takes one port and installs a release callback."""
        built_pool, build_env = make_env_factory(pool_size=4, base_port=4566)
        assert built_pool is not None
        assert built_pool.free_count == 4

        created = build_env()
        assert built_pool.free_count == 3
        assert created._pool_release is not None

    def test_env_bound_to_port_in_configured_range(self) -> None:
        """With base_port=5000 and four slots, the env's port is in 5000..5003."""
        _, build_env = make_env_factory(pool_size=4, base_port=5000)
        bound_port = self._port_of(build_env())
        assert 5000 <= bound_port < 5004

    def test_multiple_factory_calls_drain_pool(self) -> None:
        """Three envs empty a pool of three; a fourth factory call raises."""
        built_pool, build_env = make_env_factory(pool_size=3, base_port=4566)
        assert built_pool is not None

        alive = [build_env() for _ in range(3)]
        assert built_pool.free_count == 0
        with pytest.raises(RuntimeError, match="exhausted"):
            build_env()

        # Keep envs referenced to avoid GC warning
        assert len(alive) == 3

    def test_envs_get_distinct_ports(self) -> None:
        """Four envs from a pool of four all have different backend URLs."""
        _, build_env = make_env_factory(pool_size=4, base_port=4566)
        seen_urls = set()
        for _ in range(4):
            seen_urls.add(build_env()._backend._aws_infra_url)
        assert len(seen_urls) == 4  # all distinct

    def test_custom_base_port_is_respected(self) -> None:
        """With base_port=9000 and three slots, the env's port is in 9000..9002."""
        _, build_env = make_env_factory(pool_size=3, base_port=9000)
        bound_port = self._port_of(build_env())
        assert 9000 <= bound_port < 9003
# ---------------------------------------------------------------------------
# AwsRlEnvironment.close() — pool interaction
# ---------------------------------------------------------------------------
class TestEnvCloseReleasesPort:
    """``AwsRlEnvironment.close()`` must hand its port back to the pool."""

    def test_close_returns_port_to_pool(self) -> None:
        """Closing the only outstanding env restores the full free count."""
        built_pool, build_env = make_env_factory(pool_size=4, base_port=4566)
        assert built_pool is not None
        pooled_env = build_env()
        assert built_pool.free_count == 3

        # Mock the MiniStack scrub so close() doesn't try to hit the network
        with patch.object(pooled_env._backend, "reset_environment"):
            pooled_env.close()

        assert built_pool.free_count == 4

    def test_close_clears_pool_release_to_prevent_double_release(self) -> None:
        """Closing the same env twice must release the port only once."""
        built_pool, build_env = make_env_factory(pool_size=4, base_port=4566)
        pooled_env = build_env()

        with patch.object(pooled_env._backend, "reset_environment"):
            for _ in range(2):  # second close must be a no-op
                pooled_env.close()

        assert built_pool.free_count == 4  # not 5

    def test_close_releases_port_even_if_scrub_fails(self) -> None:
        """If MiniStack is unreachable, close() still returns the port — leaking ports
        on network hiccups would drain the pool.
        """
        built_pool, build_env = make_env_factory(pool_size=4, base_port=4566)
        pooled_env = build_env()

        failing_scrub = patch.object(
            pooled_env._backend,
            "reset_environment",
            side_effect=ConnectionError("boom"),
        )
        with failing_scrub:
            pooled_env.close()

        assert built_pool.free_count == 4

    def test_close_on_non_pooled_env_is_noop(self) -> None:
        """An env built in single mode has no release callback before or after close()."""
        _, build_env = make_env_factory(pool_size=1, base_port=4566)
        lone_env = build_env()

        # Not from pool — no release callback to fire
        lone_env.close()

        assert lone_env._pool_release is None  # still None

    def test_close_invokes_backend_scrub(self) -> None:
        """close() calls the backend's ``reset_environment`` exactly once."""
        _, build_env = make_env_factory(pool_size=2, base_port=4566)
        pooled_env = build_env()

        with patch.object(pooled_env._backend, "reset_environment") as scrub:
            pooled_env.close()

        scrub.assert_called_once()
class TestFactoryConcurrencyIntegration:
    """Factory and pool exercised together from many threads."""

    def test_concurrent_factory_calls_get_distinct_ports(self) -> None:
        """The factory + pool combo must hand out unique ports under contention."""
        _, build_env = make_env_factory(pool_size=50, base_port=10000)
        created: list[AwsRlEnvironment] = []
        # Serialises appends to `created` from the worker threads.
        guard = threading.Lock()

        def build_one() -> None:
            fresh = build_env()
            with guard:
                created.append(fresh)

        workers = [threading.Thread(target=build_one) for _ in range(50)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

        distinct_ports = set()
        for env in created:
            url = env._backend._aws_infra_url
            distinct_ports.add(int(url.rsplit(":", 1)[-1]))
        assert len(distinct_ports) == 50

    def test_concurrent_close_returns_all_ports(self) -> None:
        """Closing 20 pooled envs from 20 threads returns every port to the pool."""
        built_pool, build_env = make_env_factory(pool_size=20, base_port=10000)
        assert built_pool is not None

        created = [build_env() for _ in range(20)]
        assert built_pool.free_count == 0

        # Replace the backend scrub with a do-nothing callable on each env
        # before close() runs.
        for env in created:
            env._backend.reset_environment = lambda: None  # type: ignore[assignment]

        closers = [threading.Thread(target=env.close) for env in created]
        for c in closers:
            c.start()
        for c in closers:
            c.join()

        assert built_pool.free_count == 20
# ---------------------------------------------------------------------------
# Web playground coexistence with the MiniStack pool
# ---------------------------------------------------------------------------
def _run_in_subprocess(
    env_overrides: dict[str, str],
    code: str,
    *,
    timeout_s: float | None = 120.0,
) -> tuple[int, str, str]:
    """Run `code` in a fresh subprocess with the given env overrides.

    Mirrors the pattern used by TestServerAppImportIsSafeForLegacyPoolSizes
    to avoid module-cache pollution across env-var changes.

    Args:
        env_overrides: Variables layered on top of the current process
            environment for the child interpreter.
        code: Python source passed to the child via ``-c``.
        timeout_s: Seconds to wait for the child before giving up. ``None``
            waits indefinitely. The finite default keeps a wedged child from
            hanging the whole test run.

    Returns:
        ``(returncode, stdout, stderr)`` of the child, with both streams
        decoded as text.

    Raises:
        subprocess.TimeoutExpired: If the child is still running after
            `timeout_s` seconds.
    """
    import os
    import subprocess
    import sys

    env = {**os.environ, **env_overrides}
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=False,  # callers inspect the return code themselves
        timeout=timeout_s,
    )
    return result.returncode, result.stdout, result.stderr
class TestWebRoutesMountUnconditionally:
    """The ``/web*`` routes must be registered on ``server.app.app`` whatever
    the pool size.

    Background from the original author: the web playground used to be gated
    on POOL_SIZE <= 1; it now mounts regardless, with a dedicated lazy
    MiniStack on AWS_RL_ENV_WEB_MINISTACK_PORT.
    """

    # Child script: collect the `path` of every registered route and print
    # which of the expected web paths are absent.
    _ROUTE_PROBE = ";".join(
        [
            "import server.app as m",
            "paths = {getattr(r, 'path', None) for r in m.app.routes}",
            "import sys",
            "missing = {'/web', '/web/reset', '/web/state', '/web/step', '/web/solution'} - paths",
            "sys.stdout.write('MISSING=' + repr(missing))",
        ]
    )

    def _assert_no_web_route_missing(self, pool_size: str) -> None:
        """Import ``server.app`` under *pool_size* and require an empty missing set."""
        overrides = {"AWS_RL_ENV_POOL_SIZE": pool_size}
        rc, out, err = _run_in_subprocess(overrides, self._ROUTE_PROBE)
        assert rc == 0, f"import failed: {err}"
        assert "MISSING=set()" in out, out

    def test_web_routes_present_when_pool_size_8(self) -> None:
        self._assert_no_web_route_missing("8")

    def test_web_routes_present_when_pool_size_1(self) -> None:
        self._assert_no_web_route_missing("1")
class TestWebMiniStackPortConflictDetection:
    """Importing ``server.app`` must fail when the configured web port falls
    inside the pool's port range, and succeed otherwise.

    Rationale from the original author: without the guard, a WebSocket
    session could acquire the same port the web _env writes to and corrupt
    state in both directions.
    """

    @staticmethod
    def _import_with(pool_size: str, web_port: str, **extra_env: str) -> tuple[int, str]:
        """Import ``server.app`` in a child process with base port 4566.

        Returns the child's ``(returncode, stderr)``.
        """
        overrides = {
            "AWS_RL_ENV_POOL_SIZE": pool_size,
            "AWS_RL_ENV_MINISTACK_BASE_PORT": "4566",
            "AWS_RL_ENV_WEB_MINISTACK_PORT": web_port,
        }
        overrides.update(extra_env)
        rc, _, err = _run_in_subprocess(overrides, "import server.app")
        return rc, err

    def test_collision_inside_pool_range_raises(self) -> None:
        """Web port 4570 lies inside [4566..4573], so the import must fail."""
        rc, err = self._import_with("8", "4570")
        assert rc != 0
        assert "collides with pool range" in err

    def test_web_port_just_below_pool_range_is_allowed(self) -> None:
        """Web port 4565 (the default) sits one below the pool's first port."""
        rc, err = self._import_with("8", "4565")
        assert rc == 0, err

    def test_web_port_just_above_pool_range_is_allowed(self) -> None:
        """Web port 4574 is one past 4573, the pool's last port."""
        rc, err = self._import_with("8", "4574")
        assert rc == 0, err

    def test_collision_check_skipped_when_pool_size_1(self) -> None:
        """POOL_SIZE=1 means no pool object exists, so the constant web port
        is allowed to coincide with BASE_PORT (it just means the web env
        shares the lone MiniStack). Backward-compat for legacy single-mode.
        """
        rc, err = self._import_with("1", "4566")
        assert rc == 0, err

    def test_collision_check_skipped_when_backend_aws(self) -> None:
        """BACKEND_TYPE=aws skips the pool entirely (all sessions share
        AwsStrategy), so a "collision" with the pool's range is hypothetical
        — the pool object is never constructed. Refusing to boot here would
        be a false positive.
        """
        # 4570 would collide if the backend were the simulator.
        rc, err = self._import_with("8", "4570", BACKEND_TYPE="aws")
        assert rc == 0, err
class TestWebEnvLazyConstruction:
    """Checks on when and how ``server.app`` builds its web environment."""

    @staticmethod
    def _final_stdout_line(env_overrides: dict[str, str], statements: list[str]) -> str:
        """Run the ``;``-joined *statements* in a child and return its last stdout line.

        Fails the calling test if the child exits non-zero.
        """
        rc, out, err = _run_in_subprocess(env_overrides, ";".join(statements))
        assert rc == 0, err
        return out.strip().splitlines()[-1]

    def test_web_env_is_none_immediately_after_import(self) -> None:
        """Lazy: the dedicated MiniStack should NOT spawn until a /web/*
        request arrives. This test checks that ``_web_env`` is still None
        right after the module is imported.
        """
        verdict = self._final_stdout_line(
            {"AWS_RL_ENV_POOL_SIZE": "8"},
            [
                "import server.app as m",
                "import sys",
                "sys.stdout.write('\\nRESULT=' + ('NONE' if m._web_env is None else 'NOT_NONE'))",
            ],
        )
        assert verdict == "RESULT=NONE"

    def test_get_web_env_legacy_uses_default_port_for_pool_size_1(self) -> None:
        """POOL_SIZE=1: web env shares the single MiniStack on :4566 — the
        original behavior, locked down so it doesn't drift.
        """
        verdict = self._final_stdout_line(
            {"AWS_RL_ENV_POOL_SIZE": "1"},
            [
                "import server.app as m",
                "env = m._get_web_env()",
                "import sys",
                "sys.stdout.write('\\nRESULT=' + env._backend._aws_infra_url)",
            ],
        )
        assert verdict == "RESULT=http://localhost:4566"

    def test_get_web_env_uses_aws_strategy_when_backend_aws(self) -> None:
        """BACKEND_TYPE=aws: the web env's backend must be an ``AwsStrategy``.

        Per the original author, this fixes a latent inconsistency where the
        web playground always used the simulator regardless of training
        backend.
        """
        verdict = self._final_stdout_line(
            {"AWS_RL_ENV_POOL_SIZE": "8", "BACKEND_TYPE": "aws"},
            [
                "import server.app as m",
                "from server.services.aws_strategy import AwsStrategy",
                "env = m._get_web_env()",
                "import sys",
                "sys.stdout.write('\\nRESULT=' + ('AWS' if isinstance(env._backend, AwsStrategy) else 'NOT_AWS'))",
            ],
        )
        assert verdict == "RESULT=AWS"
class TestSpawnWebMiniStackShortCircuit:
    """``_spawn_web_ministack`` must skip ``Popen`` when the port is already
    listening, and must raise when the port never comes up.

    Rationale from the original author: otherwise a server restart would race
    against the existing detached MiniStack and stall on the bind check.
    """

    def test_does_not_spawn_when_port_already_listening(self) -> None:
        """With a listener already on the port, ``Popen`` is never called."""
        import socket

        from server.app import _spawn_web_ministack

        # A listening socket on an OS-assigned port stands in for a MiniStack
        # that is already running.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(1)
            _, busy_port = listener.getsockname()

            with patch("server.app.subprocess.Popen") as fake_popen:
                _spawn_web_ministack(busy_port, timeout_s=0.5)

            fake_popen.assert_not_called()

    def test_raises_on_bind_timeout(self) -> None:
        """If the spawned MiniStack never binds, raise instead of hanging."""
        from server.app import _spawn_web_ministack

        # Popen is mocked, so nothing ever starts listening on port 1 and the
        # call is expected to give up after the 0.3 s timeout.
        with patch("server.app.subprocess.Popen"), pytest.raises(
            RuntimeError, match="failed to bind"
        ):
            _spawn_web_ministack(port=1, timeout_s=0.3)
class TestGetWebEnvAdversarial:
    """Stress-test _get_web_env against the failure modes a real deployment
    will eventually hit: concurrent first-request races, ministack-not-installed,
    and spawn timeouts.

    The _get_web_env tests patch at the module level inside an isolated
    subprocess so real ministacks are never spawned. The embedded scripts are
    passed to ``python -c`` and therefore start at column 0 inside the string
    literal, with normal block indentation below each compound statement.
    """

    def test_concurrent_first_requests_spawn_at_most_once(self) -> None:
        """N threads racing on the cold start must result in exactly one
        `_spawn_web_ministack` call. Otherwise a busy /web/* moment at boot
        would spawn N ministacks all fighting for the same port.
        """
        code = """
import sys, threading
from unittest.mock import patch
import server.app as m
with patch('server.app._spawn_web_ministack') as spawn:
    spawn.return_value = None
    def call():
        m._get_web_env()
    threads = [threading.Thread(target=call) for _ in range(20)]
    for t in threads: t.start()
    for t in threads: t.join()
    sys.stdout.write('\\nRESULT=' + str(spawn.call_count))
"""
        rc, out, err = _run_in_subprocess({"AWS_RL_ENV_POOL_SIZE": "8"}, code)
        assert rc == 0, err
        # Only the last stdout line is compared; earlier output is ignored.
        assert out.strip().splitlines()[-1] == "RESULT=1"

    def test_get_web_env_does_not_spawn_when_backend_aws(self) -> None:
        """BACKEND_TYPE=aws path takes the AwsStrategy branch and never
        subprocesses ministack — even with POOL_SIZE=8.
        """
        code = """
import sys
from unittest.mock import patch
import server.app as m
with patch('server.app.subprocess.Popen') as popen:
    m._get_web_env()
    sys.stdout.write('\\nRESULT=' + str(popen.call_count))
"""
        rc, out, err = _run_in_subprocess(
            {"AWS_RL_ENV_POOL_SIZE": "8", "BACKEND_TYPE": "aws"},
            code,
        )
        assert rc == 0, err
        assert out.strip().splitlines()[-1] == "RESULT=0"

    def test_get_web_env_does_not_spawn_when_pool_size_1(self) -> None:
        """Legacy POOL_SIZE=1 path shares the lone pool MiniStack on :4566
        and never spawns a separate web MiniStack.
        """
        code = """
import sys
from unittest.mock import patch
import server.app as m
with patch('server.app.subprocess.Popen') as popen:
    m._get_web_env()
    sys.stdout.write('\\nRESULT=' + str(popen.call_count))
"""
        rc, out, err = _run_in_subprocess({"AWS_RL_ENV_POOL_SIZE": "1"}, code)
        assert rc == 0, err
        assert out.strip().splitlines()[-1] == "RESULT=0"

    def test_get_web_env_retries_after_spawn_failure(self) -> None:
        """If the first spawn fails (e.g., ministack not installed yet, or
        the bind timed out), _web_env stays None so a later request can
        retry instead of permanently caching the failure.
        """
        code = """
import sys
from unittest.mock import patch
import server.app as m
with patch('server.app._spawn_web_ministack', side_effect=RuntimeError('boom')):
    failed = False
    try:
        m._get_web_env()
    except RuntimeError:
        failed = True
    assert failed, 'expected first call to raise'
    assert m._web_env is None, '_web_env must stay None after spawn failure'
sys.stdout.write('\\nRESULT=ok')
"""
        rc, out, err = _run_in_subprocess({"AWS_RL_ENV_POOL_SIZE": "8"}, code)
        assert rc == 0, err
        assert out.strip().splitlines()[-1] == "RESULT=ok"

    def test_pool_factory_capacity_independent_of_web_env(self) -> None:
        """The web _env is a module-level singleton, NOT produced by the
        WebSocket factory. So a pool of 8 still hands out 8 distinct ports;
        the web env doesn't steal a slot. Critical for the user's "8 WS +
        web UI" goal.

        Unlike the other tests in this class, this one runs in-process.
        """
        pool, factory = make_env_factory(pool_size=8, base_port=4566)
        assert pool is not None

        envs = [factory() for _ in range(8)]
        assert pool.free_count == 0

        # 9th must fail — same as before this change
        with pytest.raises(RuntimeError, match="exhausted"):
            factory()

        # Sanity: all 8 ports distinct, none equal to 4565 (web port)
        ports = {int(e._backend._aws_infra_url.rsplit(":", 1)[-1]) for e in envs}
        assert len(ports) == 8
        assert 4565 not in ports
|