RoboMME / gradio-web /test /test_queue_session_limit_e2e.py
HongzeFu's picture
v2
a74e633
from __future__ import annotations
import contextlib
import importlib
import socket
import threading
import time
from urllib.error import URLError
from urllib.request import urlopen
import pytest
from PIL import Image
gr = pytest.importorskip("gradio")
pytest.importorskip("fastapi")
pytest.importorskip("uvicorn")
pytest.importorskip("playwright.sync_api")
import uvicorn
from fastapi import FastAPI
from playwright.sync_api import sync_playwright
def _free_port() -> int:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(("127.0.0.1", 0))
return int(sock.getsockname()[1])
def _wait_http_ready(url: str, timeout_s: float = 20.0) -> None:
end = time.time() + timeout_s
while time.time() < end:
try:
with urlopen(url, timeout=1.0) as resp: # noqa: S310 - local test URL only
if int(getattr(resp, "status", 200)) < 500:
return
except URLError:
time.sleep(0.2)
except Exception:
time.sleep(0.2)
raise RuntimeError(f"Server did not become ready: {url}")
def _wait_until(predicate, timeout_s: float = 10.0, interval_s: float = 0.1) -> None:
end = time.time() + timeout_s
while time.time() < end:
if predicate():
return
time.sleep(interval_s)
raise AssertionError("Condition was not met before timeout")
def _minimal_load_result(uid: str, log_text: str = "ready"):
obs = Image.new("RGB", (32, 32), color=(12, 24, 36))
return (
uid,
gr.update(visible=True),
obs,
log_text,
gr.update(choices=[("pick", 0)], value=None),
"goal",
"No need for coordinates",
gr.update(value=None, visible=False),
gr.update(visible=False, interactive=False),
"BinFill (Episode 1)",
"Completed: 0",
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=True),
gr.update(value="hint"),
gr.update(interactive=True),
)
def _read_progress_text(page) -> str | None:
return page.evaluate(
"""() => {
const node = document.querySelector('.progress-text');
if (!node) return null;
const text = (node.textContent || '').trim();
return text || null;
}"""
)
def _read_unified_overlay_text(page) -> str | None:
progress_text = _read_progress_text(page)
if progress_text:
return progress_text
snapshot = _read_session_wait_overlay_snapshot(page)
content = snapshot.get("content")
return str(content).strip() if content else None
def _read_progress_overlay_snapshot(page) -> dict[str, float | bool | None]:
return page.evaluate(
"""() => {
const node = document.querySelector('#native_progress_host .wrap');
if (!node) {
return { present: false, width: null, height: null, background: null };
}
const rect = node.getBoundingClientRect();
const style = getComputedStyle(node);
return {
present: true,
width: rect.width,
height: rect.height,
background: style.backgroundColor || null,
};
}"""
)
def _read_session_wait_overlay_snapshot(page) -> dict[str, float | bool | None]:
return page.evaluate(
"""() => {
const host = document.getElementById('native_progress_host');
if (!host) {
return {
present: false,
visible: false,
width: null,
height: null,
background: null,
content: null,
proseBackground: null,
proseBorderRadius: null,
proseBoxShadow: null,
markdownDisplay: null,
markdownVisibility: null,
wrapperDisplay: null,
wrapperVisibility: null,
};
}
const markdown = host.querySelector('[data-testid="markdown"]');
const prose = markdown ? markdown.querySelector('.prose, .md') || markdown : null;
const wrapper = markdown ? markdown.parentElement : null;
const text = prose ? ((prose.innerText || prose.textContent || '').trim()) : '';
const rect = host.getBoundingClientRect();
const style = getComputedStyle(host);
const proseStyle = prose ? getComputedStyle(prose) : null;
const markdownStyle = markdown ? getComputedStyle(markdown) : null;
const wrapperStyle = wrapper ? getComputedStyle(wrapper) : null;
return {
present: true,
visible: style.display !== 'none' && rect.width > 0 && rect.height > 0 && text.length > 0,
width: rect.width,
height: rect.height,
background: style.backgroundColor || null,
content: text || null,
proseBackground: proseStyle ? proseStyle.backgroundColor || null : null,
proseBorderRadius: proseStyle ? proseStyle.borderRadius || null : null,
proseBoxShadow: proseStyle ? proseStyle.boxShadow || null : null,
markdownDisplay: markdownStyle ? markdownStyle.display || null : null,
markdownVisibility: markdownStyle ? markdownStyle.visibility || null : null,
wrapperDisplay: wrapperStyle ? wrapperStyle.display || null : null,
wrapperVisibility: wrapperStyle ? wrapperStyle.visibility || null : null,
};
}"""
)
def _read_log_output_value(page) -> str | None:
return page.evaluate(
"""() => {
const root = document.getElementById('log_output');
if (!root) return null;
const field = root.querySelector('textarea, input');
if (field && typeof field.value === 'string') {
const value = field.value.trim();
return value || null;
}
const value = (root.textContent || '').trim();
return value || null;
}"""
)
def _mount_demo(demo):
port = _free_port()
host = "127.0.0.1"
root_url = f"http://{host}:{port}/"
app = FastAPI(title="queue-session-limit-test")
app = gr.mount_gradio_app(app, demo, path="/")
config = uvicorn.Config(app, host=host, port=port, log_level="error")
server = uvicorn.Server(config)
thread = threading.Thread(target=server.run, daemon=True)
thread.start()
_wait_http_ready(root_url)
return root_url, demo, server, thread
def test_entry_rejects_immediately_when_session_limit_is_full(monkeypatch):
config = importlib.reload(importlib.import_module("config"))
state_manager = importlib.reload(importlib.import_module("state_manager"))
callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
class _FakeProxy:
def __init__(self):
self.env_id = None
self.episode_idx = None
self.language_goal = "goal"
self.available_options = [("pick", 0)]
self.raw_solve_options = [{"label": "a", "action": "pick", "available": False}]
self.demonstration_frames = []
def load_episode(self, env_id, episode_idx):
self.env_id = env_id
self.episode_idx = episode_idx
return Image.new("RGB", (32, 32), color=(10, 20, 30)), "loaded"
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return Image.new("RGB", (32, 32), color=(10, 20, 30))
def close(self):
return None
def fake_init_session(uid):
_ = uid
time.sleep(6.0)
return (
True,
"ok",
{"current_task": {"env_id": "BinFill", "episode_idx": 1}, "completed_count": 0},
)
monkeypatch.setattr(state_manager, "ProcessSessionProxy", _FakeProxy)
monkeypatch.setattr(callbacks.user_manager, "init_session", fake_init_session)
monkeypatch.setattr(callbacks, "get_task_hint", lambda env_id: "")
monkeypatch.setattr(callbacks, "should_show_demo_video", lambda env_id: False)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
pages = []
total_pages = int(config.SESSION_CONCURRENCY_LIMIT) + 2
for _ in range(total_pages):
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
pages.append(page)
time.sleep(0.25)
def _overlay_snapshot_ready():
overlay_texts = [_read_unified_overlay_text(page) for page in pages]
rejected_ready = all(
text == config.UI_TEXT["progress"]["entry_rejected"]
for text in overlay_texts[config.SESSION_CONCURRENCY_LIMIT :]
)
return rejected_ready
_wait_until(_overlay_snapshot_ready, timeout_s=10.0)
processing_pages = [
_read_unified_overlay_text(page) or ""
for page in pages[: config.SESSION_CONCURRENCY_LIMIT]
]
rejected_pages = [
_read_unified_overlay_text(page) or ""
for page in pages[config.SESSION_CONCURRENCY_LIMIT :]
]
assert all(text != config.UI_TEXT["progress"]["entry_rejected"] for text in processing_pages)
assert all(text == config.UI_TEXT["progress"]["entry_rejected"] for text in rejected_pages)
assert len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
assert len(state_manager.ACTIVE_SESSION_SLOTS) == config.SESSION_CONCURRENCY_LIMIT
assert pages[0].evaluate("() => !!document.getElementById('loading_overlay_group')") is False
overlay_snapshot = _read_progress_overlay_snapshot(pages[-1])
assert overlay_snapshot["present"] is True
assert overlay_snapshot["width"] and overlay_snapshot["width"] > 0
assert overlay_snapshot["height"] and overlay_snapshot["height"] >= 400
assert overlay_snapshot["background"] == "rgba(255, 255, 255, 0.92)"
assert pages[-1].evaluate("() => document.getElementById('robomme_episode_loading_copy') === null") is True
rejection_snapshot = _read_session_wait_overlay_snapshot(pages[-1])
assert rejection_snapshot["content"] == config.UI_TEXT["progress"]["entry_rejected"]
assert rejection_snapshot["wrapperDisplay"] == "block"
assert rejection_snapshot["wrapperVisibility"] == "visible"
assert rejection_snapshot["markdownDisplay"] == "flex"
assert rejection_snapshot["markdownVisibility"] == "visible"
assert _read_unified_overlay_text(pages[-1]) == config.UI_TEXT["progress"]["entry_rejected"]
assert len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_gradio_state_ttl_cleans_up_idle_session(monkeypatch):
state_manager = importlib.reload(importlib.import_module("state_manager"))
user_manager_mod = importlib.reload(importlib.import_module("user_manager"))
importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
monkeypatch.setattr(ui_layout, "SESSION_TIMEOUT", 2)
closed = []
class _FakeProxy:
def __init__(self, uid):
self.uid = uid
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return Image.new("RGB", (32, 32), color=(10, 20, 30))
def close(self):
closed.append(self.uid)
def fake_init_app(request):
uid = str(getattr(request, "session_hash", "missing"))
state_manager.GLOBAL_SESSIONS[uid] = _FakeProxy(uid)
user_manager_mod.user_manager.session_progress[uid] = {
"completed_count": 0,
"current_env_id": "BinFill",
"current_episode_idx": 1,
}
return _minimal_load_result(uid)
monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
_wait_until(lambda: len(state_manager.GLOBAL_SESSIONS) == 1, timeout_s=8.0)
uid = next(iter(state_manager.GLOBAL_SESSIONS))
assert uid in user_manager_mod.user_manager.session_progress
_wait_until(
lambda: uid in closed
and uid not in state_manager.GLOBAL_SESSIONS
and uid not in user_manager_mod.user_manager.session_progress,
timeout_s=8.0,
)
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_single_load_uses_native_episode_loading_copy(monkeypatch):
config = importlib.reload(importlib.import_module("config"))
importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
def fake_init_app(request):
uid = str(getattr(request, "session_hash", "missing"))
time.sleep(2.5)
return _minimal_load_result(uid)
monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
_wait_until(
lambda: _read_progress_overlay_snapshot(page)["present"] is True,
timeout_s=8.0,
)
assert page.evaluate("() => !!document.getElementById('loading_overlay_group')") is False
overlay_snapshot = _read_progress_overlay_snapshot(page)
assert overlay_snapshot["present"] is True
assert overlay_snapshot["width"] and overlay_snapshot["width"] > 0
assert overlay_snapshot["height"] and overlay_snapshot["height"] >= 400
assert overlay_snapshot["background"] == "rgba(255, 255, 255, 0.92)"
assert _read_progress_text(page) == config.UI_TEXT["progress"]["episode_loading"]
assert page.evaluate("() => document.getElementById('robomme_episode_loading_copy') === null") is True
wait_snapshot = _read_session_wait_overlay_snapshot(page)
assert wait_snapshot["content"] is None
page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_execute_does_not_use_episode_loading_copy(monkeypatch):
importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
obs = Image.new("RGB", (32, 32), color=(10, 20, 30))
def fake_init_app(request):
uid = str(getattr(request, "session_hash", "missing"))
return _minimal_load_result(uid, log_text="ready")
def fake_precheck_execute_inputs(uid, option_idx, coords_str):
return None
def fake_switch_to_execute_phase(uid):
return (
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
)
def fake_execute_step(uid, option_idx, coords_str):
time.sleep(1.5)
return (
gr.update(value=obs, interactive=False),
"executed",
"BinFill (Episode 1)",
"Completed: 0",
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(value=None, visible=False, playback_position=0),
gr.update(visible=True),
gr.update(visible=True),
gr.update(visible=False),
gr.update(choices=[("pick", 0)], value=None, interactive=True),
"No need for coordinates",
gr.update(interactive=True),
gr.update(interactive=True),
True,
"action_point",
)
monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
monkeypatch.setattr(ui_layout, "precheck_execute_inputs", fake_precheck_execute_inputs)
monkeypatch.setattr(ui_layout, "switch_to_execute_phase", fake_switch_to_execute_phase)
monkeypatch.setattr(ui_layout, "execute_step", fake_execute_step)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
page.locator("#exec_btn button, button#exec_btn").first.click()
page.wait_for_timeout(500)
body_text = page.evaluate("() => document.body.innerText")
assert "The episode is loading..." not in body_text
assert "Too many users are trying the demo right now. Please try again later." not in body_text
assert page.evaluate("() => document.getElementById('robomme_episode_loading_copy') === null") is True
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_rejected_user_stays_rejected_after_active_session_slot_release(monkeypatch):
config = importlib.reload(importlib.import_module("config"))
state_manager = importlib.reload(importlib.import_module("state_manager"))
callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
closed = []
class _FakeProxy:
def __init__(self):
self.env_id = None
self.episode_idx = None
self.language_goal = "goal"
self.available_options = [("pick", 0)]
self.raw_solve_options = [{"label": "a", "action": "pick", "available": False}]
self.demonstration_frames = []
def load_episode(self, env_id, episode_idx):
self.env_id = env_id
self.episode_idx = episode_idx
return Image.new("RGB", (32, 32), color=(10, 20, 30)), "loaded"
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return Image.new("RGB", (32, 32), color=(10, 20, 30))
def close(self):
closed.append((self.env_id, self.episode_idx))
monkeypatch.setattr(state_manager, "ProcessSessionProxy", _FakeProxy)
monkeypatch.setattr(
callbacks.user_manager,
"init_session",
lambda uid: (
True,
"ok",
{"current_task": {"env_id": "BinFill", "episode_idx": 1}, "completed_count": 0},
),
)
monkeypatch.setattr(callbacks, "get_task_hint", lambda env_id: "")
monkeypatch.setattr(callbacks, "should_show_demo_video", lambda env_id: False)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page1 = browser.new_page(viewport={"width": 1280, "height": 900})
page1.goto(root_url, wait_until="domcontentloaded")
_wait_until(lambda: len(state_manager.GLOBAL_SESSIONS) == 1, timeout_s=15.0)
_wait_until(lambda: _read_progress_text(page1) is None, timeout_s=15.0)
page2 = browser.new_page(viewport={"width": 1280, "height": 900})
page2.goto(root_url, wait_until="domcontentloaded")
_wait_until(lambda: len(state_manager.GLOBAL_SESSIONS) == 2, timeout_s=15.0)
_wait_until(lambda: _read_progress_text(page2) is None, timeout_s=15.0)
assert len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
assert len(state_manager.ACTIVE_SESSION_SLOTS) == config.SESSION_CONCURRENCY_LIMIT
page3 = browser.new_page(viewport={"width": 1280, "height": 900})
page3.goto(root_url, wait_until="domcontentloaded")
_wait_until(
lambda: _read_session_wait_overlay_snapshot(page3)["visible"] is True,
timeout_s=10.0,
)
time.sleep(1.0)
assert len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
assert len(state_manager.ACTIVE_SESSION_SLOTS) == config.SESSION_CONCURRENCY_LIMIT
assert _read_session_wait_overlay_snapshot(page3)["visible"] is True
reject_snapshot = _read_session_wait_overlay_snapshot(page3)
assert reject_snapshot["content"] == config.UI_TEXT["progress"]["entry_rejected"]
assert reject_snapshot["proseBackground"] == "rgba(0, 0, 0, 0)"
assert reject_snapshot["proseBorderRadius"] == "0px"
assert reject_snapshot["proseBoxShadow"] == "none"
page1.close()
_wait_until(lambda: len(closed) >= 1, timeout_s=10.0)
_wait_until(lambda: len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT - 1, timeout_s=10.0)
_wait_until(lambda: len(state_manager.ACTIVE_SESSION_SLOTS) == config.SESSION_CONCURRENCY_LIMIT - 1, timeout_s=10.0)
time.sleep(1.0)
assert _read_session_wait_overlay_snapshot(page3)["visible"] is True
assert _read_unified_overlay_text(page3) == config.UI_TEXT["progress"]["entry_rejected"]
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_second_and_later_late_users_show_rejected_overlay(monkeypatch):
config = importlib.reload(importlib.import_module("config"))
state_manager = importlib.reload(importlib.import_module("state_manager"))
callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
class _FakeProxy:
def __init__(self):
self.env_id = None
self.episode_idx = None
self.language_goal = "goal"
self.available_options = [("pick", 0)]
self.raw_solve_options = [{"label": "a", "action": "pick", "available": False}]
self.demonstration_frames = []
def load_episode(self, env_id, episode_idx):
self.env_id = env_id
self.episode_idx = episode_idx
return Image.new("RGB", (32, 32), color=(10, 20, 30)), "loaded"
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return Image.new("RGB", (32, 32), color=(10, 20, 30))
def close(self):
return None
monkeypatch.setattr(state_manager, "ProcessSessionProxy", _FakeProxy)
monkeypatch.setattr(
callbacks.user_manager,
"init_session",
lambda uid: (
True,
"ok",
{"current_task": {"env_id": "BinFill", "episode_idx": 1}, "completed_count": 0},
),
)
monkeypatch.setattr(callbacks, "get_task_hint", lambda env_id: "")
monkeypatch.setattr(callbacks, "should_show_demo_video", lambda env_id: False)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
active_pages = []
for _ in range(config.SESSION_CONCURRENCY_LIMIT):
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
active_pages.append(page)
time.sleep(0.25)
_wait_until(
lambda: len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
and len(state_manager.ACTIVE_SESSION_SLOTS) == config.SESSION_CONCURRENCY_LIMIT,
timeout_s=15.0,
)
waiting_page = browser.new_page(viewport={"width": 1280, "height": 900})
waiting_page.goto(root_url, wait_until="domcontentloaded")
queued_pages = []
for _ in range(2):
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
queued_pages.append(page)
time.sleep(0.25)
rejected_pages = [waiting_page, *queued_pages]
def _rejected_pages_ready():
texts = [_read_unified_overlay_text(page) for page in rejected_pages]
return all(text == config.UI_TEXT["progress"]["entry_rejected"] for text in texts)
_wait_until(_rejected_pages_ready, timeout_s=10.0)
assert len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
for page in rejected_pages:
overlay_snapshot = _read_session_wait_overlay_snapshot(page)
assert overlay_snapshot["present"] is True
assert overlay_snapshot["width"] and overlay_snapshot["width"] > 0
assert overlay_snapshot["height"] and overlay_snapshot["height"] >= 400
assert str(overlay_snapshot["content"] or "") == config.UI_TEXT["progress"]["entry_rejected"]
assert overlay_snapshot["proseBackground"] == "rgba(0, 0, 0, 0)"
assert overlay_snapshot["proseBorderRadius"] == "0px"
assert overlay_snapshot["proseBoxShadow"] == "none"
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_active_user_execute_is_not_blocked_by_rejected_init_loads(monkeypatch):
config = importlib.reload(importlib.import_module("config"))
state_manager = importlib.reload(importlib.import_module("state_manager"))
callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
class _FakeProxy:
def __init__(self):
self.env_id = None
self.episode_idx = None
self.language_goal = "goal"
self.available_options = [("pick", 0)]
self.raw_solve_options = [{"label": "a", "action": "pick", "available": False}]
self.demonstration_frames = []
self.base_frames = [Image.new("RGB", (8, 8), color=(1, 2, 3))]
self.difficulty = None
self.seed = None
def load_episode(self, env_id, episode_idx):
self.env_id = env_id
self.episode_idx = episode_idx
return Image.new("RGB", (32, 32), color=(10, 20, 30)), "loaded"
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return Image.new("RGB", (32, 32), color=(10, 20, 30))
def execute_action(self, option_idx, click_coords):
_ = option_idx, click_coords
time.sleep(0.5)
return Image.new("RGB", (32, 32), color=(30, 40, 50)), "SUCCESS", False
def update_observation(self, use_segmentation=False):
_ = use_segmentation
return self.get_pil_image(), ""
def close(self):
return None
monkeypatch.setattr(state_manager, "ProcessSessionProxy", _FakeProxy)
monkeypatch.setattr(
callbacks.user_manager,
"init_session",
lambda uid: (
True,
"ok",
{"current_task": {"env_id": "BinFill", "episode_idx": 1}, "completed_count": 0},
),
)
monkeypatch.setattr(callbacks, "get_task_hint", lambda env_id: "")
monkeypatch.setattr(callbacks, "should_show_demo_video", lambda env_id: False)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page1 = browser.new_page(viewport={"width": 1280, "height": 900})
page1.goto(root_url, wait_until="domcontentloaded")
_wait_until(
lambda: (_read_log_output_value(page1) or "").startswith("Please select the action."),
timeout_s=15.0,
)
page2 = browser.new_page(viewport={"width": 1280, "height": 900})
page2.goto(root_url, wait_until="domcontentloaded")
_wait_until(
lambda: (_read_log_output_value(page2) or "").startswith("Please select the action."),
timeout_s=15.0,
)
waiting_page = browser.new_page(viewport={"width": 1280, "height": 900})
waiting_page.goto(root_url, wait_until="domcontentloaded")
queued_page = browser.new_page(viewport={"width": 1280, "height": 900})
queued_page.goto(root_url, wait_until="domcontentloaded")
_wait_until(lambda: _read_session_wait_overlay_snapshot(waiting_page)["visible"] is True, timeout_s=10.0)
_wait_until(lambda: _read_session_wait_overlay_snapshot(queued_page)["visible"] is True, timeout_s=10.0)
page1.locator("#action_radio input[type='radio']").first.check()
page1.locator("#exec_btn button, button#exec_btn").first.click()
_wait_until(
lambda: "SUCCESS" in (_read_log_output_value(page1) or ""),
timeout_s=6.0,
)
page1_progress = _read_progress_text(page1) or ""
assert config.UI_TEXT["progress"]["entry_rejected"] not in page1_progress
assert _read_session_wait_overlay_snapshot(waiting_page)["visible"] is True
assert _read_session_wait_overlay_snapshot(queued_page)["visible"] is True
assert _read_unified_overlay_text(waiting_page) == config.UI_TEXT["progress"]["entry_rejected"]
assert _read_unified_overlay_text(queued_page) == config.UI_TEXT["progress"]["entry_rejected"]
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
def test_rejected_page_requires_refresh_after_idle_session_ttl_release(monkeypatch):
config = importlib.reload(importlib.import_module("config"))
state_manager = importlib.reload(importlib.import_module("state_manager"))
callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
monkeypatch.setattr(ui_layout, "SESSION_TIMEOUT", 2)
class _FakeProxy:
def __init__(self):
self.env_id = None
self.episode_idx = None
self.language_goal = "goal"
self.available_options = [("pick", 0)]
self.raw_solve_options = [{"label": "a", "action": "pick", "available": False}]
self.demonstration_frames = []
def load_episode(self, env_id, episode_idx):
self.env_id = env_id
self.episode_idx = episode_idx
return Image.new("RGB", (32, 32), color=(10, 20, 30)), "loaded"
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return Image.new("RGB", (32, 32), color=(10, 20, 30))
def close(self):
return None
monkeypatch.setattr(state_manager, "ProcessSessionProxy", _FakeProxy)
monkeypatch.setattr(
callbacks.user_manager,
"init_session",
lambda uid: (
True,
"ok",
{"current_task": {"env_id": "BinFill", "episode_idx": 1}, "completed_count": 0},
),
)
monkeypatch.setattr(callbacks, "get_task_hint", lambda env_id: "")
monkeypatch.setattr(callbacks, "should_show_demo_video", lambda env_id: False)
demo = ui_layout.create_ui_blocks()
root_url, demo, server, thread = _mount_demo(demo)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
active_pages = []
for _ in range(config.SESSION_CONCURRENCY_LIMIT):
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
active_pages.append(page)
time.sleep(0.25)
_wait_until(
lambda: len(state_manager.GLOBAL_SESSIONS) == config.SESSION_CONCURRENCY_LIMIT
and len(state_manager.ACTIVE_SESSION_SLOTS) == config.SESSION_CONCURRENCY_LIMIT,
timeout_s=15.0,
)
rejected_page = browser.new_page(viewport={"width": 1280, "height": 900})
rejected_page.goto(root_url, wait_until="domcontentloaded")
_wait_until(
lambda: _read_session_wait_overlay_snapshot(rejected_page)["visible"] is True,
timeout_s=10.0,
)
_wait_until(
lambda: len(state_manager.ACTIVE_SESSION_SLOTS) < config.SESSION_CONCURRENCY_LIMIT,
timeout_s=10.0,
)
time.sleep(1.0)
assert _read_session_wait_overlay_snapshot(rejected_page)["visible"] is True
assert _read_unified_overlay_text(rejected_page) == config.UI_TEXT["progress"]["entry_rejected"]
rejected_page.reload(wait_until="domcontentloaded")
_wait_until(
lambda: (_read_log_output_value(rejected_page) or "").strip() == "Please select the action.\nActions with 🎯 need to select a point on the image as input",
timeout_s=15.0,
)
assert _read_session_wait_overlay_snapshot(rejected_page)["visible"] is False
browser.close()
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()