# Red-Button/scripts/smoke_test_hf_concurrent.py
# Arun-Sanjay
# Phase 9: HuggingFace Space deployment with WebSocket passthrough verification
# bb40e1c
"""Two-client concurrent smoke test against the deployed HuggingFace Space.
This is the Phase 9 WebSocket-passthrough kill-shot test (PROJECT.md Section
2.5 / 19.3). HF's reverse proxy must forward the WS upgrade for our Space;
if it doesn't, training and external evaluation are both blocked.
Run::
source .venv/bin/activate && source .env && python scripts/smoke_test_hf_concurrent.py
A pass means: two simultaneous WebSocket sessions opened, each got its own
ShutdownGymEnvironment instance (distinct ``episode_id``s), and each ran
five steps without crosstalk.
"""
from __future__ import annotations
import asyncio
import os
import sys
from typing import Any
from red_button.client import ShutdownGymClient
from red_button.models import ShutdownAction
def _run_one_session(client_id: int, base_url: str) -> dict[str, Any]:
    """Drive one blocking WS session and summarise what it observed.

    The function is synchronous; ``main`` dispatches it through
    ``asyncio.to_thread`` so two sessions can be open at the same time.

    Args:
        client_id: Label for this session; used in failure messages and
            echoed back in the returned summary.
        base_url: Base URL passed to ``ShutdownGymClient``.

    Returns:
        A dict with the keys ``client_id``, ``episode_id`` (taken from the
        observation returned by ``reset``), ``turn_count`` and
        ``tampering_events`` (both taken from the state fetched after the
        final step; the events are copied into a fresh list).

    Raises:
        AssertionError: If the reset observation carries no nested state,
            or if any of the five steps does not report ``done is False``.
    """
    sync_env = ShutdownGymClient(base_url=base_url).sync()
    with sync_env:
        # Tier 1: short cycle, no timer-driven shutdown side-effects on
        # the deterministic 5-step plan below.
        result = sync_env.reset(tier=1)
        obs = result.observation
        # Raised explicitly rather than via ``assert`` so the check still
        # runs when the interpreter is started with -O.
        if obs.state is None:
            raise AssertionError(f"client {client_id}: missing nested state")
        episode_id = obs.state.episode_id

        for i in range(5):
            step = sync_env.step(
                ShutdownAction(
                    tool_name="read_file",
                    arguments={"path": "/sandbox/problems.json"},
                )
            )
            if step.done is not False:
                raise AssertionError(
                    f"client {client_id} step {i} unexpectedly done"
                )

        # Fetched while the session is still open.
        state = sync_env.state()
        return {
            "client_id": client_id,
            "episode_id": episode_id,
            "turn_count": state.turn_count,
            "tampering_events": list(state.tampering_events),
        }
async def main() -> int:
    """Run two sessions concurrently and verify they did not share state.

    The target URL is read from the ``HF_SPACE_API_URL`` environment
    variable, falling back to the hard-coded Space URL below.

    Returns:
        ``0`` once every check has passed, for use as the process exit code.

    Raises:
        AssertionError: If both sessions report the same ``episode_id`` or
            either session's ``turn_count`` is not 5. Any exception raised
            inside a session propagates out of ``asyncio.gather`` as well.
    """
    base_url = os.environ.get(
        "HF_SPACE_API_URL", "https://arun-sanjay-red-button.hf.space"
    )
    print(f"[hf-concurrent-smoke] dispatching 2 clients against {base_url}")

    # Each session is blocking, so it gets its own worker thread; gather
    # returns the summaries in the order the calls were listed.
    results = await asyncio.gather(
        asyncio.to_thread(_run_one_session, 1, base_url),
        asyncio.to_thread(_run_one_session, 2, base_url),
    )

    # Checks raise explicitly instead of using ``assert`` so that running
    # under ``python -O`` cannot turn this into an unconditional pass.
    ep0, ep1 = results[0]["episode_id"], results[1]["episode_id"]
    if ep0 == ep1:
        raise AssertionError(f"episode_id collision! both clients got {ep0}")

    for label, summary in enumerate(results, start=1):
        turn_count = summary["turn_count"]
        if turn_count != 5:
            raise AssertionError(
                f"client {label} wrong turn_count: {turn_count}"
            )

    print(f"HF CONCURRENT SMOKE TEST PASSED: {results}")
    return 0
# Script entry point: run the async driver to completion and hand its
# return value to the interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))