"""Sustained 1-hour concurrency load test (PROJECT.md §22.1).
Runs ``concurrency`` simultaneous async WebSocket sessions in a loop
for ``duration_minutes``. Each cycle reports cumulative episodes
completed and the *load generator's* RSS (proxy — not the server's
RSS, which would need Docker stats / HF metrics).
Pass criteria per §22.2:
* ≥1000 episodes complete in 1 hour
* No errors after the first warmup minute
* Memory growth <2× over the hour
* All episode IDs are unique
Configure target via ``SHUTDOWN_GYM_URL`` env var; defaults to the
deployed HF Space.
Launch in background::
nohup python -m evaluation.concurrent_load_test \\
> /tmp/shutdown_gym_logs/sustained_$(date +%Y%m%d_%H%M).log 2>&1 &
"""
import asyncio
import os
import time
from typing import Any, Dict
import psutil
from shutdown_gym import ShutdownGymClient
from shutdown_gym.models import ShutdownAction
DEFAULT_SPACE_URL = "https://arun-sanjay-redbutton.hf.space"
async def session(seed: int, env_url: str) -> Dict[str, Any]:
    """Drive a short Tier-2 episode (capped at 30 steps) and return its summary.

    Each step issues the same benign ``list_files`` probe; the loop stops
    early as soon as the environment reports the episode is done.
    """
    async with ShutdownGymClient(base_url=env_url) as env:
        result = await env.reset(seed=seed, tier=2)
        step_count = 0
        for _ in range(30):
            if result.done:
                break
            result = await env.step(
                ShutdownAction(
                    tool_name="list_files",
                    arguments={"directory": "/sandbox/"},
                )
            )
            step_count += 1
        return {
            "seed": seed,
            "episode_id": result.observation.metadata.get("episode_id"),
            "steps": step_count,
            "done": result.done,
        }
async def sustained_test(
    env_url: str,
    duration_minutes: int = 60,
    concurrency: int = 16,
) -> int:
    """Run batches of ``concurrency`` concurrent sessions until the deadline.

    Returns 0 on PASS (all §22.2 criteria met), 1 on FAIL.

    Fix: §22.2 requires "no errors after the first warmup minute", but
    previously errors raised during the warmup minute were counted toward
    failure. Warmup errors are now tracked separately — reported in the
    progress lines but excluded from the pass/fail decision.
    """
    warmup_seconds = 60.0
    deadline = time.monotonic() + duration_minutes * 60
    seed_counter = 0
    episodes_completed = 0
    error_count = 0  # errors after the warmup minute — these fail the run
    warmup_error_count = 0  # errors during the first minute — reported only
    seen_episode_ids: set = set()
    started_at = time.monotonic()
    # NB: this is the load *generator's* RSS — a proxy; the server's RSS
    # would need Docker stats / HF metrics.
    initial_rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
    final_rss_mb = initial_rss_mb
    print(
        f"[sustained] env_url={env_url} concurrency={concurrency} "
        f"duration_minutes={duration_minutes} initial_rss={initial_rss_mb:.0f} MB"
    )
    while time.monotonic() < deadline:
        # A batch that *starts* inside the warmup minute is attributed to
        # warmup even if gather() returns after it — conservative per §22.2.
        batch_in_warmup = (time.monotonic() - started_at) < warmup_seconds
        tasks = [
            session(seed_counter + i, env_url) for i in range(concurrency)
        ]
        seed_counter += concurrency
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for r in results:
            if isinstance(r, Exception):
                if batch_in_warmup:
                    warmup_error_count += 1
                else:
                    error_count += 1
            else:
                episodes_completed += 1
                eid = r.get("episode_id")
                if eid:
                    seen_episode_ids.add(eid)
        final_rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
        elapsed = time.monotonic() - started_at
        print(
            f"[{elapsed:.0f}s] completed={episodes_completed} "
            f"errors={error_count} warmup_errors={warmup_error_count} "
            f"unique_eids={len(seen_episode_ids)} "
            f"rss={final_rss_mb:.0f} MB",
            flush=True,
        )
    elapsed = time.monotonic() - started_at
    print(
        f"DONE: {episodes_completed} episodes, "
        f"{error_count} errors ({warmup_error_count} during warmup), "
        f"{len(seen_episode_ids)} unique episode_ids "
        f"in {elapsed:.0f}s "
        f"(initial_rss={initial_rss_mb:.0f} MB, final_rss={final_rss_mb:.0f} MB)"
    )
    # §22.2 pass criteria.
    failures = []
    if episodes_completed < 1000:
        failures.append(
            f"completed={episodes_completed} < 1000"
        )
    if error_count > 0:
        failures.append(f"post-warmup error_count={error_count} > 0")
    # NB: load-generator's RSS is a proxy; the server's RSS would
    # need Docker stats / HF metrics. We still check growth ratio.
    if initial_rss_mb > 0 and final_rss_mb / initial_rss_mb >= 2.0:
        failures.append(
            f"rss growth {initial_rss_mb:.0f}→{final_rss_mb:.0f} MB ≥ 2x"
        )
    if seen_episode_ids and len(seen_episode_ids) != episodes_completed:
        failures.append(
            f"unique_eids={len(seen_episode_ids)} != "
            f"completed={episodes_completed}"
        )
    elif not seen_episode_ids:
        failures.append(
            "no episode_ids surfaced via metadata; "
            "uniqueness check is a no-op"
        )
    if failures:
        print(f"RESULT: FAIL — {'; '.join(failures)}", flush=True)
        return 1
    print("RESULT: PASS", flush=True)
    return 0
if __name__ == "__main__":
    import sys

    # Target and duration are env-configurable; defaults match the module
    # docstring (deployed Space, 60 minutes).
    target_url = os.environ.get("SHUTDOWN_GYM_URL", DEFAULT_SPACE_URL)
    minutes = int(os.environ.get("SUSTAINED_DURATION_MINUTES", "60"))
    exit_code = asyncio.run(sustained_test(target_url, duration_minutes=minutes))
    sys.exit(exit_code)