RoboMME / gradio-web /test /test_ui_phase_machine_runtime_e2e.py
HongzeFu's picture
unified system log
41a86ed
from __future__ import annotations
import contextlib
import importlib
import socket
import threading
import time
from urllib.error import URLError
from urllib.request import urlopen
import numpy as np
import pytest
from PIL import Image
gr = pytest.importorskip("gradio")
pytest.importorskip("fastapi")
pytest.importorskip("uvicorn")
pytest.importorskip("playwright.sync_api")
import uvicorn
from fastapi import FastAPI
from playwright.sync_api import sync_playwright
def _free_port() -> int:
    """Return a TCP port number on 127.0.0.1 that the OS reports as free.

    A throwaway socket is bound to port 0 so the OS picks the port; the
    socket is closed again before the number is handed back.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with contextlib.closing(probe):
        probe.bind(("127.0.0.1", 0))
        _host, port = probe.getsockname()
        return int(port)
def _wait_http_ready(url: str, timeout_s: float = 20.0) -> None:
    """Poll *url* until the server answers with a non-5xx HTTP status.

    Args:
        url: Address to probe with a GET request.
        timeout_s: Maximum number of seconds to keep polling.

    Raises:
        RuntimeError: If no non-5xx answer arrived within *timeout_s*.
    """
    # Imported locally so this block does not depend on the file-level imports.
    from urllib.error import HTTPError

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urlopen(url, timeout=1.0) as resp:  # noqa: S310 - local test URL only
                status = int(getattr(resp, "status", 200))
        except HTTPError as err:
            # urlopen raises for 4xx/5xx responses; a 4xx still proves that
            # the server is up, so feed its code into the same status check.
            status = int(err.code)
        except Exception:
            # Best-effort probe: connection refused, resets, timeouts, ...
            status = None
        if status is not None and status < 500:
            return
        time.sleep(0.2)
    raise RuntimeError(f"Server did not become ready: {url}")
def _resolve_button_snapshot(page, elem_id: str) -> dict[str, str | bool | None]:
return page.evaluate(
"""(elemId) => {
const button = document.querySelector(`#${elemId} button`) || document.querySelector(`button#${elemId}`);
if (!button) {
return {
found: false,
disabled: null,
backgroundColor: null,
borderColor: null,
color: null,
};
}
const style = getComputedStyle(button);
return {
found: true,
disabled: button.disabled,
backgroundColor: style.backgroundColor,
borderColor: style.borderColor,
color: style.color,
};
}""",
elem_id,
)
def _read_header_task_value(page) -> str | None:
return page.evaluate(
"""() => {
const root = document.getElementById('header_task');
if (!root) return null;
const input = root.querySelector('input');
if (input && typeof input.value === 'string') {
const value = input.value.trim();
return value || null;
}
const selected = root.querySelector('.single-select');
if (!selected) return null;
const text = (selected.textContent || '').trim();
return text || null;
}"""
)
def _read_header_goal_value(page) -> str | None:
return page.evaluate(
"""() => {
const root = document.getElementById('header_goal');
if (!root) return null;
const field = root.querySelector('textarea, input');
if (!field) return null;
const value = typeof field.value === 'string' ? field.value.trim() : '';
return value || null;
}"""
)
def _read_coords_box_value(page) -> str | None:
return page.evaluate(
"""() => {
const root = document.getElementById('coords_box');
if (!root) return null;
const field = root.querySelector('textarea, input');
if (!field) return null;
const value = typeof field.value === 'string' ? field.value.trim() : '';
return value || null;
}"""
)
def _read_log_output_value(page) -> str | None:
return page.evaluate(
"""() => {
const root = document.getElementById('log_output');
if (!root) return null;
const field = root.querySelector('textarea, input');
if (field && typeof field.value === 'string') {
const value = field.value.trim();
return value || null;
}
const value = (root.textContent || '').trim();
return value || null;
}"""
)
def _read_progress_markdown_snapshot(page) -> dict[str, bool | str | None]:
return page.evaluate(
"""() => {
const host = document.getElementById('native_progress_host');
const pending = host?.querySelector('.pending');
const markdown = host?.querySelector('[data-testid="markdown"]');
const prose = markdown ? markdown.querySelector('.prose, .md') || markdown : null;
if (!host) {
return {
pendingPresent: false,
pendingVisible: false,
markdownVisible: false,
text: null,
};
}
const pendingStyle = pending ? getComputedStyle(pending) : null;
const markdownStyle = markdown ? getComputedStyle(markdown) : null;
const text = prose ? ((prose.textContent || '').trim()) : '';
return {
pendingPresent: !!pending,
pendingVisible: !!pendingStyle && pendingStyle.display !== 'none' && pendingStyle.visibility !== 'hidden',
markdownVisible:
!!markdownStyle &&
markdownStyle.display !== 'none' &&
markdownStyle.visibility !== 'hidden',
text: text || null,
};
}"""
)
def _read_progress_text_snapshot(page) -> dict[str, float | bool | str | None]:
return page.evaluate(
"""() => {
const node = document.querySelector('.progress-text');
if (!node) {
return {
present: false,
visible: false,
text: null,
x: null,
y: null,
width: null,
height: null,
};
}
const style = getComputedStyle(node);
const rect = node.getBoundingClientRect();
return {
present: true,
visible:
style.display !== 'none' &&
style.visibility !== 'hidden' &&
Number.parseFloat(style.opacity || '1') > 0 &&
rect.width > 0 &&
rect.height > 0,
text: (node.textContent || '').trim() || null,
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height,
};
}"""
)
def _read_elem_classes(page, elem_id: str) -> list[str] | None:
return page.evaluate(
"""(elemId) => {
const root = document.getElementById(elemId);
return root ? Array.from(root.classList) : null;
}""",
elem_id,
)
def _read_media_card_wait_snapshot(page) -> dict[str, str | float | None]:
return page.evaluate(
"""() => {
const card = document.getElementById('media_card');
if (!card) {
return {
opacity: null,
borderColor: null,
boxShadow: null,
animationName: null,
};
}
const style = getComputedStyle(card, '::after');
return {
opacity: Number.parseFloat(style.opacity || '0'),
borderColor: style.borderColor || null,
boxShadow: style.boxShadow || null,
animationName: style.animationName || null,
};
}"""
)
def _read_live_obs_transform_snapshot(page) -> dict[str, str | None]:
return page.evaluate(
"""() => {
const img = document.querySelector('#live_obs img');
const frame = document.querySelector('#live_obs .image-frame');
return {
imgTransform: img ? getComputedStyle(img).transform : null,
frameTransform: frame ? getComputedStyle(frame).transform : null,
};
}"""
)
def _read_phase_visibility(page) -> dict[str, bool | str | None]:
return page.evaluate(
"""() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
const videoEl = document.querySelector('#demo_video video');
const executeVideoEl = document.querySelector('#execute_video video');
return {
videoPhase: visible('video_phase_group'),
video: visible('demo_video'),
executionVideoPhase: visible('execution_video_group'),
executionVideo: visible('execute_video'),
watchButton: visible('watch_demo_video_btn'),
actionPhase: visible('action_phase_group'),
action: visible('live_obs'),
controlPhase: visible('control_panel_group'),
control: visible('action_radio'),
currentSrc: videoEl ? videoEl.currentSrc : null,
executeCurrentSrc: executeVideoEl ? executeVideoEl.currentSrc : null,
};
}"""
)
def _read_demo_video_controls(page, elem_id: str = "demo_video", button_elem_id: str | None = "watch_demo_video_btn") -> dict[str, bool | None]:
return page.evaluate(
"""({ elemId, buttonElemId }) => {
const visible = (id) => {
if (!id) return false;
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
const videoEl = document.querySelector(`#${elemId} video`);
const button = buttonElemId
? (document.querySelector(`#${buttonElemId} button`) || document.querySelector(`button#${buttonElemId}`))
: null;
return {
videoVisible: visible(elemId),
buttonVisible: visible(buttonElemId),
buttonDisabled: button ? button.disabled : null,
autoplay: videoEl ? videoEl.autoplay : null,
paused: videoEl ? videoEl.paused : null,
};
}""",
{"elemId": elem_id, "buttonElemId": button_elem_id},
)
def _click_demo_video_button(page) -> None:
    """Click the first match for the "watch demo video" button selector."""
    selector = "#watch_demo_video_btn button, button#watch_demo_video_btn"
    matches = page.locator(selector)
    matches.first.click()
def _dispatch_video_event(page, event_name: str, elem_id: str = "demo_video") -> bool:
    """Dispatch a bubbling, composed DOM event on a video component.

    The event is fired on both the inner ``<video>`` element and the element
    with id *elem_id*, whichever of them exist.

    Returns:
        False when neither target exists, True otherwise.
    """
    script = """({ eventName, elemId }) => {
const targets = [
document.querySelector(`#${elemId} video`),
document.getElementById(elemId),
].filter(Boolean);
if (!targets.length) return false;
for (const target of targets) {
target.dispatchEvent(new Event(eventName, { bubbles: true, composed: true }));
}
return true;
}"""
    js_args = {"eventName": event_name, "elemId": elem_id}
    return page.evaluate(script, js_args)
def _read_live_obs_geometry(page) -> dict[str, dict[str, float] | None]:
return page.evaluate(
"""() => {
const root = document.getElementById('live_obs');
const container = root?.querySelector('.image-container');
const uploadContainer = root?.querySelector('.upload-container');
const frame = root?.querySelector('.image-frame');
const img = root?.querySelector('img');
const measure = (node) => {
if (!node) return null;
const rect = node.getBoundingClientRect();
return { width: rect.width, height: rect.height };
};
return {
root: measure(root),
container: measure(container),
uploadContainer: measure(uploadContainer),
frame: measure(frame),
img: measure(img),
};
}"""
)
def _read_font_probe_snapshot(page) -> dict[str, str | None]:
return page.evaluate(
"""() => {
const heading = document.querySelector('#header_title h2');
const field = document.querySelector('#font_probe textarea, #font_probe input');
const prose = document.querySelector('#body_probe p');
const readSize = (node) => (node ? getComputedStyle(node).fontSize : null);
return {
header: readSize(heading),
field: readSize(field),
body: readSize(prose),
};
}"""
)
def _read_theme_snapshot(page) -> dict[str, str | bool | None]:
return page.evaluate(
"""() => {
const html = document.documentElement;
const body = document.body;
const overlay = document.getElementById('loading_overlay_group');
const readStore = (store, key) => {
try {
return store.getItem(key);
} catch (error) {
return null;
}
};
return {
htmlHasDark: html ? html.classList.contains('dark') : null,
bodyHasDark: body ? body.classList.contains('dark') : null,
htmlTheme: html ? html.dataset.theme || null : null,
bodyTheme: body ? body.dataset.theme || null : null,
htmlInlineColorScheme: html ? html.style.colorScheme || null : null,
bodyInlineColorScheme: body ? body.style.colorScheme || null : null,
htmlColorScheme: html ? getComputedStyle(html).colorScheme : null,
bodyColorScheme: body ? getComputedStyle(body).colorScheme : null,
overlayBackground: overlay ? getComputedStyle(overlay).backgroundColor : null,
storedTheme: readStore(window.localStorage, 'theme'),
storedGradioTheme: readStore(window.localStorage, 'gradio-theme'),
};
}"""
)
@pytest.fixture
def font_size_probe_ui_url(monkeypatch):
    """Launch a minimal Gradio app styled with ``ui_layout.CSS`` and yield its URL.

    ``config.UI_GLOBAL_FONT_SIZE`` is patched to ``"32px"`` before
    ``ui_layout`` is reloaded. The app contains a header markdown, a textbox
    and a body markdown whose element ids the font-size test queries.
    The app is closed on teardown, including when the readiness wait fails.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    monkeypatch.setattr(config_module, "UI_GLOBAL_FONT_SIZE", "32px")
    # Reload after patching so ui_layout is re-imported against the patched config.
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    # Reuse the module's shared port probe instead of duplicating it inline.
    port = _free_port()

    with gr.Blocks(title="Native font size probe test") as demo:
        gr.Markdown("## RoboMME Human Evaluation", elem_id="header_title")
        gr.Textbox(value="font probe", label="Probe", elem_id="font_probe")
        gr.Markdown("Probe body text", elem_id="body_probe")

    _app, root_url, _share_url = demo.launch(
        server_name="127.0.0.1",
        server_port=port,
        prevent_thread_lock=True,
        quiet=True,
        show_error=True,
        ssr_mode=False,
        css=ui_layout.CSS,
    )
    try:
        # Inside the try so a server that never becomes ready is still closed.
        _wait_http_ready(root_url)
        yield root_url
    finally:
        demo.close()
@pytest.fixture
def phase_machine_ui_url():
    """Serve a stand-in phase-machine Gradio UI and yield ``(root_url, state)``.

    The app is mounted on a FastAPI instance and served by uvicorn in a daemon
    thread on a free loopback port. ``state`` is a dict counting how often the
    execute precheck ran (``precheck_calls``) and how often the demo-video
    button handler ran (``play_clicks``). On teardown the uvicorn server is
    asked to exit, the thread is joined and the demo is closed; this also
    happens when the server never becomes ready.
    """
    state = {"precheck_calls": 0, "play_clicks": 0}
    demo_video_url = "https://interactive-examples.mdn.mozilla.net/media/cc0-videos/flower.mp4"
    execution_video_path = gr.get_video("world.mp4")
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    with gr.Blocks(title="Native phase machine test") as demo:
        gr.HTML(f"<style>{ui_layout.CSS}</style>")
        phase_state = gr.State("init")
        post_execute_exec_state = gr.State(True)

        with gr.Column(visible=True, elem_id="login_group") as login_group:
            login_btn = gr.Button("Login", elem_id="login_btn")

        with gr.Column(visible=False, elem_id="main_interface") as main_interface:
            with gr.Column(visible=False, elem_id="video_phase_group") as video_phase_group:
                video_display = gr.Video(value=None, elem_id="demo_video", autoplay=False)
                watch_demo_video_btn = gr.Button(
                    "Watch Video Input🎬",
                    elem_id="watch_demo_video_btn",
                    interactive=False,
                    visible=False,
                )
            with gr.Column(visible=False, elem_id="execution_video_group") as execution_video_group:
                execute_video_display = gr.Video(value=None, elem_id="execute_video", autoplay=True)
            with gr.Column(visible=False, elem_id="action_phase_group") as action_phase_group:
                img_display = gr.Image(value=np.zeros((24, 24, 3), dtype=np.uint8), elem_id="live_obs")
            with gr.Column(visible=False, elem_id="control_panel_group") as control_panel_group:
                options_radio = gr.Radio(choices=[("pick", 0)], value=0, elem_id="action_radio")
                coords_box = gr.Textbox(value="please click the point selection image", elem_id="coords_box")
            with gr.Column(visible=False, elem_id="action_buttons_row") as action_buttons_row:
                exec_btn = gr.Button("EXECUTE", elem_id="exec_btn")
                reference_action_btn = gr.Button(
                    "Ground Truth Action",
                    elem_id="reference_action_btn",
                    interactive=False,
                )
                next_task_btn = gr.Button("Next Task", elem_id="next_task_btn")
            task_hint_display = gr.Textbox(value="hint", interactive=True, elem_id="task_hint_display")
            log_output = gr.Markdown("", elem_id="log_output")
            simulate_stop_btn = gr.Button("Simulate Stop", elem_id="simulate_stop_btn")

        # Page-load hook that only runs the JS binding exported by ui_layout.
        demo.load(
            fn=None,
            js=ui_layout.DEMO_VIDEO_PLAY_BINDING_JS,
            queue=False,
        )

        def login_fn():
            # One entry per component in the login_btn.click outputs list.
            return (
                gr.update(visible=False),  # login_group
                gr.update(visible=True),  # main_interface
                gr.update(visible=True),  # video_phase_group
                gr.update(value=demo_video_url, visible=True),  # video_display
                gr.update(visible=True, interactive=True),  # watch_demo_video_btn
                gr.update(visible=False),  # action_phase_group
                gr.update(visible=False),  # control_panel_group
                gr.update(visible=False),  # action_buttons_row
                gr.update(interactive=False),  # reference_action_btn
                gr.update(value="please click the point selection image"),  # coords_box
                gr.update(visible=False),  # execution_video_group
                "demo_video",  # phase_state
            )

        def on_play_demo_fn():
            state["play_clicks"] += 1
            return gr.update(visible=True, interactive=False)

        def on_simulate_stop_fn():
            return "stopped"

        def on_video_end_fn():
            # Shared by the demo video's end and stop events.
            return (
                gr.update(visible=False),  # video_phase_group
                gr.update(visible=True),  # action_phase_group
                gr.update(visible=True),  # control_panel_group
                gr.update(visible=True),  # action_buttons_row
                gr.update(interactive=True),  # reference_action_btn
                gr.update(visible=False, interactive=False),  # watch_demo_video_btn
                "action_point",  # phase_state
            )

        def on_execute_video_end_fn(exec_enabled):
            # Shared by the execution video's end and stop events.
            return (
                gr.update(visible=False),  # execution_video_group
                gr.update(visible=True),  # action_phase_group
                gr.update(visible=True),  # control_panel_group
                gr.update(interactive=True),  # options_radio
                gr.update(interactive=bool(exec_enabled)),  # exec_btn
                gr.update(interactive=True),  # next_task_btn
                gr.update(interactive=False),  # img_display
                gr.update(interactive=True),  # reference_action_btn
                gr.update(interactive=True),  # task_hint_display
                "action_point",  # phase_state
            )

        def precheck_fn(_option_idx, _coords):
            # Only the very first call fails, so tests can observe both the
            # rejected and the accepted execute path.
            state["precheck_calls"] += 1
            if state["precheck_calls"] == 1:
                raise gr.Error("please click the point selection image before execute!")

        def to_execute_fn():
            # Disables every control in the second step's outputs list.
            return (
                gr.update(interactive=False),  # options_radio
                gr.update(interactive=False),  # exec_btn
                gr.update(interactive=False),  # next_task_btn
                gr.update(interactive=False),  # img_display
                gr.update(interactive=False),  # reference_action_btn
                gr.update(interactive=False),  # task_hint_display
            )

        def execute_fn():
            # Short pause before the execution-phase updates are returned.
            time.sleep(0.8)
            return (
                "executed",  # log_output
                gr.update(visible=False),  # action_phase_group
                gr.update(interactive=False),  # exec_btn
                gr.update(interactive=False),  # next_task_btn
                gr.update(value=execution_video_path, visible=True, playback_position=0),  # execute_video_display
                gr.update(visible=True),  # execution_video_group
                gr.update(interactive=False),  # options_radio
                "No need for coordinates",  # coords_box
                gr.update(interactive=False),  # reference_action_btn
                gr.update(interactive=False),  # task_hint_display
                True,  # post_execute_exec_state
                "execution_video",  # phase_state
            )

        login_btn.click(
            fn=login_fn,
            outputs=[
                login_group,
                main_interface,
                video_phase_group,
                video_display,
                watch_demo_video_btn,
                action_phase_group,
                control_panel_group,
                action_buttons_row,
                reference_action_btn,
                coords_box,
                execution_video_group,
                phase_state,
            ],
            queue=False,
        )
        watch_demo_video_btn.click(
            fn=on_play_demo_fn,
            outputs=[watch_demo_video_btn],
            queue=False,
        )
        video_display.end(
            fn=on_video_end_fn,
            outputs=[
                video_phase_group,
                action_phase_group,
                control_panel_group,
                action_buttons_row,
                reference_action_btn,
                watch_demo_video_btn,
                phase_state,
            ],
            queue=False,
        )
        video_display.stop(
            fn=on_video_end_fn,
            outputs=[
                video_phase_group,
                action_phase_group,
                control_panel_group,
                action_buttons_row,
                reference_action_btn,
                watch_demo_video_btn,
                phase_state,
            ],
            queue=False,
        )
        execute_video_display.end(
            fn=on_execute_video_end_fn,
            inputs=[post_execute_exec_state],
            outputs=[
                execution_video_group,
                action_phase_group,
                control_panel_group,
                options_radio,
                exec_btn,
                next_task_btn,
                img_display,
                reference_action_btn,
                task_hint_display,
                phase_state,
            ],
            queue=False,
        )
        execute_video_display.stop(
            fn=on_execute_video_end_fn,
            inputs=[post_execute_exec_state],
            outputs=[
                execution_video_group,
                action_phase_group,
                control_panel_group,
                options_radio,
                exec_btn,
                next_task_btn,
                img_display,
                reference_action_btn,
                task_hint_display,
                phase_state,
            ],
            queue=False,
        )
        # The JS part switches the DOM to the action phase directly in the
        # browser; the Python part only writes "stopped" to the log.
        simulate_stop_btn.click(
            fn=on_simulate_stop_fn,
            outputs=[log_output],
            js="""() => {
const show = (id, visible) => {
const el = document.getElementById(id);
if (!el) return;
el.style.display = visible ? '' : 'none';
};
show('video_phase_group', false);
show('demo_video', false);
show('execution_video_group', false);
show('execute_video', false);
show('action_phase_group', true);
show('live_obs', true);
show('control_panel_group', true);
show('action_radio', true);
show('action_buttons_row', true);
show('watch_demo_video_btn', false);
const refBtn =
document.querySelector('#reference_action_btn button') ||
document.querySelector('button#reference_action_btn');
if (refBtn) {
refBtn.disabled = false;
}
return [];
}""",
            queue=False,
        )
        # Three-step chain: precheck -> disable controls -> execute.
        exec_btn.click(
            fn=precheck_fn,
            inputs=[options_radio, coords_box],
            outputs=[],
            queue=False,
        ).then(
            fn=to_execute_fn,
            outputs=[
                options_radio,
                exec_btn,
                next_task_btn,
                img_display,
                reference_action_btn,
                task_hint_display,
            ],
            queue=False,
        ).then(
            fn=execute_fn,
            outputs=[
                log_output,
                action_phase_group,
                exec_btn,
                next_task_btn,
                execute_video_display,
                execution_video_group,
                options_radio,
                coords_box,
                reference_action_btn,
                task_hint_display,
                post_execute_exec_state,
                phase_state,
            ],
            queue=False,
        )

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="native-phase-machine-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        # Inside the try so the server thread is shut down even when the
        # readiness wait raises.
        _wait_http_ready(root_url)
        yield root_url, state
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_global_font_size_applies_except_header_title(font_size_probe_ui_url):
    """Check that the probe field and body text render at 32px while the header h2 stays at 26px."""
    root_url = font_size_probe_ui_url
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(root_url, wait_until="domcontentloaded")
        # Wait until all three probed nodes exist in the DOM.
        page.wait_for_selector("#header_title h2", timeout=10000)
        page.wait_for_selector("#font_probe textarea, #font_probe input", timeout=10000)
        page.wait_for_selector("#body_probe p", timeout=10000)
        # Then wait until the computed sizes have reached the expected values,
        # so the snapshot below is not taken mid-styling.
        page.wait_for_function(
            """() => {
const heading = document.querySelector('#header_title h2');
const field = document.querySelector('#font_probe textarea, #font_probe input');
const prose = document.querySelector('#body_probe p');
if (!heading || !field || !prose) return false;
return (
getComputedStyle(heading).fontSize === '26px' &&
getComputedStyle(field).fontSize === '32px' &&
getComputedStyle(prose).fontSize === '32px'
);
}""",
            timeout=10000,
        )
        snapshot = _read_font_probe_snapshot(page)
        assert snapshot["header"] == "26px"
        assert snapshot["field"] == "32px"
        assert snapshot["body"] == "32px"
        assert snapshot["header"] != snapshot["field"]
        browser.close()
def test_create_ui_blocks_stays_light_under_dark_system_preference(monkeypatch):
    """Check that the real UI stays in light theme for a dark-preferring browser.

    The browser context requests ``color_scheme="dark"`` and pre-seeds both
    theme localStorage keys with ``'dark'``. The page must still end up without
    the ``dark`` class, with ``data-theme`` and inline ``color-scheme`` set to
    ``light`` and both storage keys rewritten to ``'light'`` - on first load
    and again after a reload.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(_request):
        # Canned replacement for ui_layout.init_app so no real backend is needed.
        return (
            "uid-1",
            gr.update(visible=True),
            fake_obs_img,
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PickXtimes (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(visible=False),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    demo = ui_layout.create_ui_blocks()
    port = _free_port()
    _app, root_url, _share_url = demo.launch(
        server_name="127.0.0.1",
        server_port=port,
        prevent_thread_lock=True,
        quiet=True,
        show_error=True,
        ssr_mode=False,
        theme=ui_layout.APP_THEME,
        css=ui_layout.CSS,
        head=ui_layout.THEME_LOCK_HEAD,
    )
    try:
        # Inside the try so the launched server is closed even when the
        # readiness wait raises.
        _wait_http_ready(root_url)
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                viewport={"width": 1280, "height": 900},
                color_scheme="dark",
            )
            # Runs before any page script: seed a stored dark preference.
            context.add_init_script(
                """
window.localStorage.setItem('theme', 'dark');
window.localStorage.setItem('gradio-theme', 'dark');
"""
            )
            page = context.new_page()
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_function(
                """() => {
const html = document.documentElement;
const body = document.body;
if (!html || !body) return false;
return (
typeof window.__robommeForceLightTheme === 'function' &&
!html.classList.contains('dark') &&
!body.classList.contains('dark') &&
html.dataset.theme === 'light' &&
body.dataset.theme === 'light' &&
html.style.colorScheme === 'light' &&
body.style.colorScheme === 'light'
);
}""",
                timeout=15000,
            )
            snapshot = _read_theme_snapshot(page)
            assert snapshot["htmlHasDark"] is False
            assert snapshot["bodyHasDark"] is False
            assert snapshot["htmlTheme"] == "light"
            assert snapshot["bodyTheme"] == "light"
            assert snapshot["htmlInlineColorScheme"] == "light"
            assert snapshot["bodyInlineColorScheme"] == "light"
            assert snapshot["storedTheme"] == "light"
            assert snapshot["storedGradioTheme"] == "light"

            page.reload(wait_until="domcontentloaded")
            # Also wait for the inline color-scheme values here: they are
            # asserted right below, exactly as after the first load.
            page.wait_for_function(
                """() => {
const html = document.documentElement;
const body = document.body;
return (
!!html &&
!!body &&
typeof window.__robommeForceLightTheme === 'function' &&
!html.classList.contains('dark') &&
!body.classList.contains('dark') &&
html.dataset.theme === 'light' &&
body.dataset.theme === 'light' &&
html.style.colorScheme === 'light' &&
body.style.colorScheme === 'light'
);
}""",
                timeout=15000,
            )
            reloaded_snapshot = _read_theme_snapshot(page)
            assert reloaded_snapshot["htmlHasDark"] is False
            assert reloaded_snapshot["bodyHasDark"] is False
            assert reloaded_snapshot["htmlInlineColorScheme"] == "light"
            assert reloaded_snapshot["bodyInlineColorScheme"] == "light"
            assert reloaded_snapshot["storedTheme"] == "light"
            assert reloaded_snapshot["storedGradioTheme"] == "light"
            context.close()
            browser.close()
    finally:
        demo.close()
def test_phase_machine_runtime_flow_and_execute_precheck(phase_machine_ui_url):
    """Drive the full phase flow in a browser and check each transition.

    Steps: login -> demo video shown with an enabled watch button -> click
    watch -> synthetic "ended" event -> action phase -> first EXECUTE click
    (the fixture's precheck rejects it) -> second EXECUTE click -> controls
    disabled and execution video playing -> synthetic "ended" on the execution
    video -> controls enabled again.
    """
    root_url, state = phase_machine_ui_url
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(root_url, wait_until="domcontentloaded")
        # Fixed settle delay before interacting with the page.
        page.wait_for_timeout(2500)
        page.wait_for_selector("#login_btn", timeout=20000)
        page.click("#login_btn")
        page.wait_for_function(
            """() => {
const el = document.getElementById('demo_video');
return !!el && getComputedStyle(el).display !== 'none';
}"""
        )
        # After login only the video phase (video + watch button) is visible.
        phase_after_login = page.evaluate(
            """() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
return {
video: visible('demo_video'),
watchButton: visible('watch_demo_video_btn'),
action: visible('live_obs'),
control: visible('action_radio'),
};
}"""
        )
        assert phase_after_login == {
            "video": True,
            "watchButton": True,
            "action": False,
            "control": False,
        }
        page.wait_for_selector("#demo_video video", timeout=5000)
        # Wait for a loaded, paused video and an enabled watch button.
        page.wait_for_function(
            """() => {
const videoEl = document.querySelector('#demo_video video');
const button =
document.querySelector('#watch_demo_video_btn button') ||
document.querySelector('button#watch_demo_video_btn');
return !!videoEl && !!videoEl.currentSrc && !!button && button.disabled === false && videoEl.paused === true;
}""",
            timeout=10000,
        )
        controls_after_login = _read_demo_video_controls(page)
        assert controls_after_login["videoVisible"] is True
        assert controls_after_login["buttonVisible"] is True
        assert controls_after_login["buttonDisabled"] is False
        assert controls_after_login["autoplay"] is False
        assert controls_after_login["paused"] is True
        _click_demo_video_button(page)
        # The click must disable the button and start playback.
        page.wait_for_function(
            """() => {
const videoEl = document.querySelector('#demo_video video');
const button =
document.querySelector('#watch_demo_video_btn button') ||
document.querySelector('button#watch_demo_video_btn');
if (!videoEl || !button) return false;
return button.disabled === true && (videoEl.paused === false || videoEl.currentTime > 0);
}""",
            timeout=10000,
        )
        controls_after_click = _read_demo_video_controls(page)
        assert controls_after_click["buttonDisabled"] is True
        assert controls_after_click["paused"] is False
        # Fire a synthetic "ended" event instead of waiting for real playback to finish.
        did_dispatch_end = _dispatch_video_event(page, "ended")
        assert did_dispatch_end
        page.wait_for_function(
            """() => {
const action = document.getElementById('live_obs');
const control = document.getElementById('action_radio');
const watchButton = document.getElementById('watch_demo_video_btn');
if (!action || !control || !watchButton) return false;
return (
getComputedStyle(action).display !== 'none' &&
getComputedStyle(control).display !== 'none' &&
getComputedStyle(watchButton).display === 'none'
);
}"""
        )
        # First EXECUTE click: the fixture's precheck raises on its first call.
        did_click_exec = page.evaluate(
            """() => {
const btn = document.getElementById('exec_btn');
if (!btn) return false;
btn.click();
return true;
}"""
        )
        assert did_click_exec
        page.wait_for_timeout(300)
        # The action view must still be shown after the rejected attempt.
        phase_after_failed_precheck = page.evaluate(
            """() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
return getComputedStyle(el).display !== 'none';
};
return {
action: visible('live_obs'),
};
}"""
        )
        assert phase_after_failed_precheck == {"action": True}
        # Second EXECUTE click: the precheck passes and the chain continues.
        did_click_exec = page.evaluate(
            """() => {
const btn = document.getElementById('exec_btn');
if (!btn) return false;
btn.click();
return true;
}"""
        )
        assert did_click_exec
        page.wait_for_function(
            """() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const execBtn = resolveButton('exec_btn');
const nextBtn = resolveButton('next_task_btn');
return !!execBtn && !!nextBtn && execBtn.disabled === true && nextBtn.disabled === true;
}"""
        )
        interactive_snapshot = page.evaluate(
            """() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const execBtn = resolveButton('exec_btn');
const nextBtn = resolveButton('next_task_btn');
return {
execDisabled: execBtn ? execBtn.disabled : null,
nextDisabled: nextBtn ? nextBtn.disabled : null,
};
}"""
        )
        assert interactive_snapshot["execDisabled"] is True
        assert interactive_snapshot["nextDisabled"] is True
        # Wait for the autoplaying execution video to start.
        page.wait_for_function(
            """() => {
const videoEl = document.querySelector('#execute_video video');
return !!videoEl && videoEl.autoplay === true && (videoEl.paused === false || videoEl.currentTime > 0);
}""",
            timeout=6000,
        )
        execute_video_controls = _read_demo_video_controls(page, elem_id="execute_video", button_elem_id=None)
        assert execute_video_controls["autoplay"] is True
        assert execute_video_controls["paused"] is False
        execute_phase_snapshot = _read_phase_visibility(page)
        assert execute_phase_snapshot["actionPhase"] is False
        assert execute_phase_snapshot["controlPhase"] is True
        # While the execution video plays, the panel controls must be disabled.
        panel_snapshot = page.evaluate(
            """() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const radio = document.querySelector('#action_radio input[type="radio"]');
const refBtn = resolveButton('reference_action_btn');
const hint = document.querySelector('#task_hint_display textarea, #task_hint_display input');
return {
radioDisabled: radio ? radio.disabled : null,
refDisabled: refBtn ? refBtn.disabled : null,
hintDisabled: hint ? hint.disabled : null,
};
}"""
        )
        assert panel_snapshot["radioDisabled"] is True
        assert panel_snapshot["refDisabled"] is True
        assert panel_snapshot["hintDisabled"] is True
        # End the execution video synthetically and expect the action phase back.
        did_dispatch_end = _dispatch_video_event(page, "ended", elem_id="execute_video")
        assert did_dispatch_end
        page.wait_for_function(
            """() => {
const execBtn = document.querySelector('button#exec_btn') || document.querySelector('#exec_btn button');
const action = document.getElementById('live_obs');
if (!execBtn || !action) return false;
return execBtn.disabled === false && getComputedStyle(action).display !== 'none';
}""",
            timeout=6000,
        )
        final_interactive_snapshot = page.evaluate(
            """() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const execBtn = resolveButton('exec_btn');
const nextBtn = resolveButton('next_task_btn');
return {
execDisabled: execBtn ? execBtn.disabled : null,
nextDisabled: nextBtn ? nextBtn.disabled : null,
};
}"""
        )
        assert final_interactive_snapshot["execDisabled"] is False
        assert final_interactive_snapshot["nextDisabled"] is False
        browser.close()
    # Server-side counters: the precheck ran for both clicks, the play handler once.
    assert state["precheck_calls"] >= 2
    assert state["play_clicks"] == 1
def test_reference_action_button_is_green_only_when_interactive(phase_machine_ui_url):
    """Check the reference-action button's colors before and after it becomes enabled.

    While disabled its background must not be ``rgb(31, 139, 76)``; once the
    demo video has ended and the button is enabled, background and border must
    be that color and the text must be white.
    """
    root_url, _state = phase_machine_ui_url
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(root_url, wait_until="domcontentloaded")
        page.wait_for_timeout(2500)
        page.wait_for_selector("#login_btn", timeout=20000)
        page.click("#login_btn")
        # The button may not be in the DOM at this point, so the disabled-state
        # checks only run when it was found.
        disabled_snapshot = _resolve_button_snapshot(page, "reference_action_btn")
        if disabled_snapshot["found"]:
            assert disabled_snapshot["disabled"] is True
            assert disabled_snapshot["backgroundColor"] != "rgb(31, 139, 76)"
        page.wait_for_selector("#demo_video video", timeout=5000)
        _click_demo_video_button(page)
        page.wait_for_function(
            """() => {
const button =
document.querySelector('#watch_demo_video_btn button') ||
document.querySelector('button#watch_demo_video_btn');
return !!button && button.disabled === true;
}""",
            timeout=5000,
        )
        # A synthetic "ended" event moves the UI into the action phase.
        did_dispatch_end = _dispatch_video_event(page, "ended")
        assert did_dispatch_end
        page.wait_for_function(
            """() => {
const button = document.querySelector('#reference_action_btn button') || document.querySelector('button#reference_action_btn');
return !!button && button.disabled === false;
}""",
            timeout=6000,
        )
        enabled_snapshot = _resolve_button_snapshot(page, "reference_action_btn")
        assert enabled_snapshot["found"] is True
        assert enabled_snapshot["disabled"] is False
        assert enabled_snapshot["backgroundColor"] == "rgb(31, 139, 76)"
        assert enabled_snapshot["borderColor"] == "rgb(31, 139, 76)"
        assert enabled_snapshot["color"] == "rgb(255, 255, 255)"
        browser.close()
@pytest.mark.xfail(
    reason="Gradio 6.9.0 output video stop path is not reliably triggerable in headless Chromium; transition contract is covered by unit tests.",
    strict=False,
)
def test_demo_video_stop_event_transitions_and_hides_button(phase_machine_ui_url):
    """Check that stopping the demo video hides it and its button and shows the action view.

    The stop is simulated through the fixture's "Simulate Stop" button. The
    test is marked as a non-strict expected failure (see the xfail reason).
    """
    root_url, state = phase_machine_ui_url
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(root_url, wait_until="domcontentloaded")
        page.wait_for_timeout(2500)
        page.wait_for_selector("#login_btn", timeout=20000)
        page.click("#login_btn")
        page.wait_for_selector("#demo_video video", timeout=5000)
        _click_demo_video_button(page)
        # Wait until the watch button has been disabled by its click handler.
        page.wait_for_function(
            """() => {
const button =
document.querySelector('#watch_demo_video_btn button') ||
document.querySelector('button#watch_demo_video_btn');
return !!button && button.disabled === true;
}""",
            timeout=5000,
        )
        page.locator("#simulate_stop_btn button, button#simulate_stop_btn").first.click()
        # Expect the video and watch button hidden, the action view and radio shown.
        page.wait_for_function(
            """() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
return (
!visible('watch_demo_video_btn') &&
!visible('demo_video') &&
visible('live_obs') &&
visible('action_radio')
);
}""",
            timeout=5000,
        )
        browser.close()
    assert state["play_clicks"] == 1
def test_unified_loading_overlay_init_flow(monkeypatch):
    """The native progress text shows the canonical loading copy during init.

    ``init_app`` is replaced by a fake that sleeps 0.8s before returning, so the
    loading state stays observable. While it runs, the ``.progress-text`` node
    must be visible with the canonical copy, and none of the older loading
    markup/copy may be present. Once it finishes, the progress text must
    disappear, the main interface must be visible, and the header task must
    read ``PickXtimes``.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    monkeypatch.setattr(config_module, "UI_GLOBAL_FONT_SIZE", "32px")
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    canonical_copy = "The episode is loading..."
    legacy_copy = "Loading environment, please wait..."
    superseded_copy = "Logging in and setting up environment... Please wait."

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)
    calls = {"init": 0}

    def fake_init_app(_request=None):
        calls["init"] += 1
        # Keep the loading state on screen long enough for the browser to see it.
        time.sleep(0.8)
        return (
            "uid-init",
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img, interactive=False),  # img_display
            "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "goal",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "PickXtimes (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    _app, root_url, _share_url = demo.launch(
        server_name=host,
        server_port=port,
        prevent_thread_lock=True,
        quiet=True,
        show_error=True,
        ssr_mode=False,
        theme=ui_layout.APP_THEME,
        css=ui_layout.CSS,
        head=ui_layout.THEME_LOCK_HEAD,
    )

    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still closes the already-launched demo instead of leaking it.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")

            # Wait for a rendered, non-transparent progress node with the loading copy.
            page.wait_for_function(
                """() => {
                    const node = document.querySelector('.progress-text');
                    if (!node) return false;
                    const style = getComputedStyle(node);
                    const rect = node.getBoundingClientRect();
                    return (
                        style.display !== 'none' &&
                        style.visibility !== 'hidden' &&
                        Number.parseFloat(style.opacity || '1') > 0 &&
                        rect.width > 0 &&
                        rect.height > 0 &&
                        (node.textContent || '').includes('The episode is loading...')
                    );
                }""",
                timeout=5000,
            )

            progress_snapshot = _read_progress_text_snapshot(page)
            markdown_snapshot = _read_progress_markdown_snapshot(page)
            assert progress_snapshot["present"] is True
            assert progress_snapshot["visible"] is True
            assert progress_snapshot["text"] == canonical_copy
            assert progress_snapshot["x"] is not None and progress_snapshot["x"] < 500
            assert progress_snapshot["y"] is not None and progress_snapshot["y"] > 300
            assert markdown_snapshot["text"] is None
            assert markdown_snapshot["pendingVisible"] is False
            assert markdown_snapshot["markdownVisible"] is False

            # None of the older loading affordances may be present.
            assert page.locator("#robomme_episode_loading_copy").count() == 0
            assert superseded_copy not in str(progress_snapshot["text"] or "")
            assert legacy_copy not in page.content()
            assert page.locator("#loading_overlay_group").count() == 0

            # Init completes: progress text goes away and the main UI appears.
            page.wait_for_function(
                """() => !document.querySelector('.progress-text')""",
                timeout=15000,
            )
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_task');
                    if (!root) return false;
                    const input = root.querySelector('input');
                    if (input && typeof input.value === 'string' && input.value.trim() === 'PickXtimes') {
                        return true;
                    }
                    const selected = root.querySelector('.single-select');
                    return !!selected && (selected.textContent || '').trim() === 'PickXtimes';
                }""",
                timeout=15000,
            )
            assert _read_header_task_value(page) == "PickXtimes"

            browser.close()
    finally:
        demo.close()

    assert calls["init"] >= 1
def test_episode_loading_copy_after_change_episode(monkeypatch):
    """Clicking "next task" shows the canonical loading copy while it loads.

    ``init_app`` returns immediately, while ``load_next_task_wrapper`` is faked
    to sleep 0.8s so the loading state is observable. During the load the
    ``.progress-text`` node must be visible with the exact loading copy and the
    ``#robomme_episode_loading_copy`` element must not exist. The load is
    considered finished once the log output reads ``ready-2``. Each fake must
    have been called exactly once.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)
    calls = {"init": 0, "next": 0}

    def _load_result(uid: str, episode_idx: int, log_text: str):
        """Build the output tuple shared by the init and next-task fakes."""
        return (
            uid,
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img.copy(), interactive=False),  # img_display
            log_text,  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "goal",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            f"PickXtimes (Episode {episode_idx})",  # task_info_box
            f"Completed: {episode_idx - 1}",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    def fake_init_app(_request=None):
        calls["init"] += 1
        return _load_result("uid-next-episode", 1, "ready-1")

    def fake_load_next_task_wrapper(uid):
        calls["next"] += 1
        # Keep the loading state on screen long enough for the browser to see it.
        time.sleep(0.8)
        return _load_result(uid, 2, "ready-2")

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(ui_layout, "load_next_task_wrapper", fake_load_next_task_wrapper)
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="episode-loading-copy-change-episode-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still stops the server thread and closes the demo.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)

            # Initial episode is loaded: header shows the task name.
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_task');
                    if (!root) return false;
                    const input = root.querySelector('input');
                    if (input && typeof input.value === 'string' && input.value.trim() === 'PickXtimes') {
                        return true;
                    }
                    const selected = root.querySelector('.single-select');
                    return !!selected && (selected.textContent || '').trim() === 'PickXtimes';
                }""",
                timeout=5000,
            )
            # The progress host's markdown must be empty before the next load starts.
            page.wait_for_function(
                """() => {
                    const host = document.getElementById('native_progress_host');
                    const markdown = host?.querySelector('[data-testid="markdown"]');
                    const prose = markdown ? markdown.querySelector('.prose, .md') || markdown : null;
                    const text = prose ? ((prose.innerText || prose.textContent || '').trim()) : '';
                    return text === '';
                }""",
                timeout=5000,
            )

            page.locator("#next_task_btn button, button#next_task_btn").first.click()
            page.wait_for_function(
                """() => {
                    const node = document.querySelector('.progress-text');
                    if (!node) return false;
                    const style = getComputedStyle(node);
                    const rect = node.getBoundingClientRect();
                    return (
                        style.display !== 'none' &&
                        style.visibility !== 'hidden' &&
                        Number.parseFloat(style.opacity || '1') > 0 &&
                        rect.width > 0 &&
                        rect.height > 0 &&
                        (node.textContent || '').trim() === 'The episode is loading...'
                    );
                }""",
                timeout=5000,
            )

            progress_snapshot = _read_progress_text_snapshot(page)
            markdown_snapshot = _read_progress_markdown_snapshot(page)
            assert progress_snapshot["present"] is True
            assert progress_snapshot["visible"] is True
            assert progress_snapshot["text"] == "The episode is loading..."
            assert markdown_snapshot["text"] is None
            assert markdown_snapshot["pendingVisible"] is False
            assert markdown_snapshot["markdownVisible"] is False
            assert page.locator("#robomme_episode_loading_copy").count() == 0

            # Poll for completion. Pausing through Playwright (rather than
            # time.sleep) keeps its event dispatch running between reads, and a
            # monotonic clock keeps the deadline immune to wall-clock changes.
            deadline = time.monotonic() + 15.0
            while time.monotonic() < deadline:
                if _read_log_output_value(page) == "ready-2":
                    break
                page.wait_for_timeout(100)
            else:
                raise AssertionError("next episode load did not complete")

            assert page.locator("#robomme_episode_loading_copy").count() == 0
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()

    assert calls == {"init": 1, "next": 1}
def test_no_video_task_hides_manual_demo_button(monkeypatch):
    """A task without a demo video shows no video phase and no watch button.

    The faked ``init_app`` returns hidden video widgets and visible action and
    control groups. The page must reflect exactly that visibility.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(_request=None):
        return (
            "uid-no-video",
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img.copy(), interactive=False),  # img_display
            "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "goal",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "PickXtimes (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="native-no-video-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still stops the server thread and closes the demo.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)

            page.wait_for_function(
                """() => {
                    const visible = (id) => {
                        const el = document.getElementById(id);
                        if (!el) return false;
                        const st = getComputedStyle(el);
                        return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
                    };
                    return (
                        !visible('video_phase_group') &&
                        !visible('demo_video') &&
                        !visible('watch_demo_video_btn') &&
                        visible('action_phase_group') &&
                        visible('control_panel_group')
                    );
                }""",
                timeout=5000,
            )

            phase_snapshot = _read_phase_visibility(page)
            controls_snapshot = _read_demo_video_controls(page)
            assert phase_snapshot["videoPhase"] is False
            assert phase_snapshot["video"] is False
            assert phase_snapshot["watchButton"] is False
            assert phase_snapshot["actionPhase"] is True
            assert phase_snapshot["controlPhase"] is True
            assert controls_snapshot["buttonVisible"] is False

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_point_wait_state_pulses_live_obs_and_updates_system_log(monkeypatch):
    """Selecting a point-requiring action enters and then leaves the wait state.

    Phases checked in order:

    1. Initially ``live_obs`` lacks the point-wait class, the media card's
       ``::after`` ring is inactive, and the log shows the action prompt.
    2. After checking the first action radio, ``live_obs`` gains the wait
       class, the ring animation is active, and the coords box and log show the
       point-selection prompts. The image and frame keep their position and
       size (within 1px) and carry no CSS transform.
    3. After clicking the image near pixel (24, 8), the wait class is removed,
       the coords box holds the clicked coordinates, and the log shows the
       point-selected message.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 48, 3), dtype=np.uint8)
    fake_obs[:, :] = [15, 20, 25]
    fake_obs_img = Image.fromarray(fake_obs)

    class FakeSession:
        # Stand-in for the session object returned by ``callbacks.get_session``.
        raw_solve_options = [
            {"label": "a", "available": [object()]},
            {"label": "b", "available": False},
        ]

        def get_pil_image(self, use_segmented=False):
            _ = use_segmented
            return fake_obs_img.copy()

    def fake_init_app(_request=None):
        return (
            "uid-point-wait",
            gr.update(visible=True),  # main_interface
            gr.update(
                value=fake_obs_img.copy(),
                interactive=False,
                elem_classes=config_module.get_live_obs_elem_classes(),
            ),  # img_display
            config_module.UI_TEXT["log"]["action_selection_prompt"],  # log_output
            gr.update(choices=[("pick", 0), ("skip", 1)], value=None),  # options_radio
            "goal",  # goal_box
            gr.update(
                value=config_module.UI_TEXT["coords"]["not_needed"],
                visible=True,
                interactive=False,
            ),  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "PointEnv (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(callbacks, "get_session", lambda uid: FakeSession())
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="point-wait-state-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still stops the server thread and closes the demo.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.add_style_tag(content=ui_layout.CSS)
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_selector("#live_obs img", timeout=15000)

            # --- Phase 1: idle state before any action is selected. ---
            initial_classes = _read_elem_classes(page, "live_obs")
            assert initial_classes is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS not in initial_classes
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["action_selection_prompt"]

            initial_card_wait = _read_media_card_wait_snapshot(page)
            initial_transforms = _read_live_obs_transform_snapshot(page)
            initial_img_box = page.locator("#live_obs img").bounding_box()
            initial_frame_box = page.locator("#live_obs .image-frame").bounding_box()
            assert initial_card_wait["opacity"] == 0
            assert initial_card_wait["animationName"] == "none"
            assert initial_transforms["imgTransform"] == "none"
            assert initial_transforms["frameTransform"] == "none"
            assert initial_img_box is not None
            assert initial_frame_box is not None

            # --- Phase 2: selecting the first action enters the wait state. ---
            page.locator("#action_radio input[type='radio']").first.check(force=True)
            page.wait_for_function(
                """(state) => {
                    const liveObs = document.getElementById('live_obs');
                    const coordsRoot = document.getElementById('coords_box');
                    const coordsField = coordsRoot?.querySelector('textarea, input');
                    const logRoot = document.getElementById('log_output');
                    const logField = logRoot?.querySelector('textarea, input');
                    const mediaCard = document.getElementById('media_card');
                    const mediaAfter = mediaCard ? getComputedStyle(mediaCard, '::after') : null;
                    const coordsValue = coordsField ? coordsField.value.trim() : '';
                    const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
                    return (
                        !!liveObs &&
                        liveObs.classList.contains(state.waitClass) &&
                        !!mediaAfter &&
                        Number.parseFloat(mediaAfter.opacity || '0') > 0.5 &&
                        mediaAfter.animationName === state.cardAnimation &&
                        coordsValue === state.coordsPrompt &&
                        logValue === state.waitLog
                    );
                }""",
                arg={
                    "cardAnimation": "media-card-point-ring",
                    "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS,
                    "coordsPrompt": config_module.UI_TEXT["coords"]["select_point"],
                    "waitLog": config_module.UI_TEXT["log"]["point_selection_prompt"],
                },
                timeout=5000,
            )

            wait_classes = _read_elem_classes(page, "live_obs")
            assert wait_classes is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS in wait_classes
            assert _read_coords_box_value(page) == config_module.UI_TEXT["coords"]["select_point"]
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["point_selection_prompt"]

            wait_card = _read_media_card_wait_snapshot(page)
            wait_transforms = _read_live_obs_transform_snapshot(page)
            wait_img_box = page.locator("#live_obs img").bounding_box()
            wait_frame_box = page.locator("#live_obs .image-frame").bounding_box()
            assert wait_card["opacity"] is not None and wait_card["opacity"] > 0.5
            assert wait_card["animationName"] == "media-card-point-ring"
            assert wait_card["borderColor"] != "rgba(225, 29, 72, 0)"
            assert wait_transforms["imgTransform"] == "none"
            assert wait_transforms["frameTransform"] == "none"
            assert wait_img_box is not None
            assert wait_frame_box is not None

            # The wait-state styling must not move or resize the image or frame.
            assert wait_img_box["x"] == pytest.approx(initial_img_box["x"], abs=1.0)
            assert wait_img_box["y"] == pytest.approx(initial_img_box["y"], abs=1.0)
            assert wait_img_box["width"] == pytest.approx(initial_img_box["width"], abs=1.0)
            assert wait_img_box["height"] == pytest.approx(initial_img_box["height"], abs=1.0)
            assert wait_frame_box["x"] == pytest.approx(initial_frame_box["x"], abs=1.0)
            assert wait_frame_box["y"] == pytest.approx(initial_frame_box["y"], abs=1.0)
            assert wait_frame_box["width"] == pytest.approx(initial_frame_box["width"], abs=1.0)
            assert wait_frame_box["height"] == pytest.approx(initial_frame_box["height"], abs=1.0)

            # --- Phase 3: clicking the image picks a point and leaves the wait state. ---
            box = page.locator("#live_obs img").bounding_box()
            assert box is not None
            # Aim at the centre of source pixel (24, 8) in the 48x24 image.
            target_x = box["x"] + ((24.5) / 48.0) * box["width"]
            target_y = box["y"] + ((8.5) / 24.0) * box["height"]
            page.mouse.click(target_x, target_y)

            page.wait_for_function(
                """(state) => {
                    const liveObs = document.getElementById('live_obs');
                    const coordsRoot = document.getElementById('coords_box');
                    const coordsField = coordsRoot?.querySelector('textarea, input');
                    const logRoot = document.getElementById('log_output');
                    const logField = logRoot?.querySelector('textarea, input');
                    const coordsValue = coordsField ? coordsField.value.trim() : '';
                    const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
                    return (
                        !!liveObs &&
                        !liveObs.classList.contains(state.waitClass) &&
                        /^\\d+\\s*,\\s*\\d+$/.test(coordsValue) &&
                        logValue === `Select: ${state.label} | point <${coordsValue}>`
                    );
                }""",
                arg={
                    "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS,
                    "label": "A",
                },
                timeout=5000,
            )

            coords_value = _read_coords_box_value(page)
            assert coords_value is not None
            coord_x, coord_y = [int(part.strip()) for part in coords_value.split(",", 1)]
            assert abs(coord_x - 24) <= 1
            assert abs(coord_y - 8) <= 1

            final_classes = _read_elem_classes(page, "live_obs")
            assert final_classes is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS not in final_classes
            assert config_module.LIVE_OBS_BASE_CLASS in final_classes
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["point_selected_message"].format(
                label="A",
                x=coord_x,
                y=coord_y,
            )

            final_card_wait = _read_media_card_wait_snapshot(page)
            final_transforms = _read_live_obs_transform_snapshot(page)
            assert final_card_wait["opacity"] == 0
            assert final_card_wait["animationName"] == "none"
            assert final_transforms["imgTransform"] == "none"
            assert final_transforms["frameTransform"] == "none"

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_reference_action_single_click_applies_coords_without_wait_state(monkeypatch):
    """One click on the reference-action button applies option and coords.

    The fake session's reference action selects option index 0 with coords
    ``[5, 6]``. After a single click, the radio with value ``"0"`` must be
    checked, the coords box must read ``"5, 6"``, the log must show the
    reference-action message, and ``live_obs`` must not be in the point-wait
    state. Manually checking the second radio afterwards must enter the
    point-wait state and show the point-selection prompts.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 48, 3), dtype=np.uint8)
    fake_obs[:, :] = [15, 20, 25]
    fake_obs_img = Image.fromarray(fake_obs)

    class FakeSession:
        # Stand-in for the session object returned by ``callbacks.get_session``.
        env_id = "BinFill"
        raw_solve_options = [
            {"label": "a", "action": "pick the left cube", "available": [object()]},
            {"label": "b", "action": "pick the right cube", "available": [object()]},
        ]
        available_options = [
            ("a. pick the left cube", 0),
            ("b. pick the right cube", 1),
        ]

        def get_pil_image(self, use_segmented=False):
            _ = use_segmented
            return fake_obs_img.copy()

        def get_reference_action(self):
            return {
                "ok": True,
                "option_idx": 0,
                "option_label": "a",
                "option_action": "pick the left cube",
                "need_coords": True,
                "coords_xy": [5, 6],
                "message": "ok",
            }

    def fake_init_app(_request=None):
        return (
            "uid-reference-action",
            gr.update(visible=True),  # main_interface
            gr.update(
                value=fake_obs_img.copy(),
                interactive=False,
                elem_classes=config_module.get_live_obs_elem_classes(),
            ),  # img_display
            config_module.UI_TEXT["log"]["action_selection_prompt"],  # log_output
            gr.update(
                choices=[
                    ("a. pick the left cube", 0),
                    ("b. pick the right cube", 1),
                ],
                value=None,
            ),  # options_radio
            "goal",  # goal_box
            gr.update(
                value=config_module.UI_TEXT["coords"]["not_needed"],
                visible=True,
                interactive=False,
            ),  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "BinFill (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(callbacks, "get_session", lambda uid: FakeSession())
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="reference-action-single-click-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still stops the server thread and closes the demo.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_selector("#live_obs img", timeout=15000)
            page.wait_for_selector("#reference_action_btn button, button#reference_action_btn", timeout=15000)

            expected_reference_log = config_module.UI_TEXT["log"]["reference_action_message_with_coords"].format(
                option_label="A",
                coords_text="5, 6",
            )

            # --- Reference action: one click applies option, coords and log. ---
            page.locator("#reference_action_btn button, button#reference_action_btn").first.click()
            page.wait_for_function(
                """(state) => {
                    const coordsRoot = document.getElementById('coords_box');
                    const coordsField = coordsRoot?.querySelector('textarea, input');
                    const logRoot = document.getElementById('log_output');
                    const logField = logRoot?.querySelector('textarea, input');
                    const liveObs = document.getElementById('live_obs');
                    const checked = document.querySelector('#action_radio input[type="radio"]:checked');
                    const coordsValue = coordsField ? coordsField.value.trim() : '';
                    const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
                    return (
                        !!checked &&
                        checked.value === state.checkedValue &&
                        coordsValue === state.coordsValue &&
                        logValue === state.logValue &&
                        !!liveObs &&
                        !liveObs.classList.contains(state.waitClass)
                    );
                }""",
                arg={
                    "checkedValue": "0",
                    "coordsValue": "5, 6",
                    "logValue": expected_reference_log,
                    "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS,
                },
                timeout=5000,
            )

            classes_after_reference = _read_elem_classes(page, "live_obs")
            assert classes_after_reference is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS not in classes_after_reference
            assert _read_coords_box_value(page) == "5, 6"
            assert _read_log_output_value(page) == expected_reference_log

            # --- Manual change: picking another option enters the wait state. ---
            page.locator("#action_radio input[type='radio']").nth(1).check(force=True)
            page.wait_for_function(
                """(state) => {
                    const coordsRoot = document.getElementById('coords_box');
                    const coordsField = coordsRoot?.querySelector('textarea, input');
                    const logRoot = document.getElementById('log_output');
                    const logField = logRoot?.querySelector('textarea, input');
                    const liveObs = document.getElementById('live_obs');
                    const checked = document.querySelector('#action_radio input[type="radio"]:checked');
                    const coordsValue = coordsField ? coordsField.value.trim() : '';
                    const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
                    return (
                        !!checked &&
                        checked.value === state.checkedValue &&
                        coordsValue === state.coordsValue &&
                        logValue === state.logValue &&
                        !!liveObs &&
                        liveObs.classList.contains(state.waitClass)
                    );
                }""",
                arg={
                    "checkedValue": "1",
                    "coordsValue": config_module.UI_TEXT["coords"]["select_point"],
                    "logValue": config_module.UI_TEXT["log"]["point_selection_prompt"],
                    "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS,
                },
                timeout=5000,
            )

            classes_after_manual_change = _read_elem_classes(page, "live_obs")
            assert classes_after_manual_change is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS in classes_after_manual_change
            assert _read_coords_box_value(page) == config_module.UI_TEXT["coords"]["select_point"]
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["point_selection_prompt"]

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_live_obs_client_resize_fills_width_and_keeps_click_mapping(monkeypatch):
    """The live observation image fills its container and clicks still map.

    With a 48x24 source image, the rendered image must be wider than 200px,
    match its container, upload container and frame widths (within 2px), and
    keep a 2:1 aspect ratio. After shrinking the viewport to 1024px wide, the
    image must shrink by more than 20px while still matching the container. A
    click aimed at source pixel (36, 12) must then be reported in the coords
    box within 1px on each axis.
    """
    callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 48, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    class FakeSession:
        # Stand-in for the session object returned by ``callbacks.get_session``.
        raw_solve_options = [{"available": True}]

        def get_pil_image(self, use_segmented=False):
            _ = use_segmented
            return fake_obs_img.copy()

    def fake_init_app(_request=None):
        return (
            "uid-live-obs-resize",
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img.copy(), interactive=False),  # img_display
            "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=0),  # options_radio
            "goal",  # goal_box
            gr.update(
                value="please click the point selection image",
                visible=True,
                interactive=False,
            ),  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "ResizeEnv (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(callbacks, "get_session", lambda uid: FakeSession())
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="live-obs-client-resize-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still stops the server thread and closes the demo.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_selector("#live_obs img", timeout=15000)
            page.wait_for_selector("#coords_box textarea, #coords_box input", timeout=15000)

            # --- Initial layout: image stretched to the container width. ---
            page.wait_for_function(
                """() => {
                    const container = document.querySelector('#live_obs .image-container');
                    const img = document.querySelector('#live_obs img');
                    if (!container || !img) return false;
                    const containerRect = container.getBoundingClientRect();
                    const imgRect = img.getBoundingClientRect();
                    return imgRect.width > 200 && Math.abs(containerRect.width - imgRect.width) <= 2;
                }""",
                timeout=10000,
            )

            initial_geometry = _read_live_obs_geometry(page)
            assert initial_geometry["container"] is not None
            assert initial_geometry["img"] is not None
            assert initial_geometry["uploadContainer"] is not None
            assert initial_geometry["frame"] is not None
            assert initial_geometry["img"]["width"] > 200
            assert abs(initial_geometry["container"]["width"] - initial_geometry["img"]["width"]) <= 2
            assert abs(initial_geometry["uploadContainer"]["width"] - initial_geometry["img"]["width"]) <= 2
            assert abs(initial_geometry["frame"]["width"] - initial_geometry["img"]["width"]) <= 2
            assert initial_geometry["img"]["width"] / initial_geometry["img"]["height"] == pytest.approx(2.0, rel=0.02)

            # --- Narrower viewport: image must follow the container down. ---
            page.set_viewport_size({"width": 1024, "height": 900})
            page.wait_for_function(
                """(prevWidth) => {
                    const container = document.querySelector('#live_obs .image-container');
                    const img = document.querySelector('#live_obs img');
                    if (!container || !img) return false;
                    const containerRect = container.getBoundingClientRect();
                    const imgRect = img.getBoundingClientRect();
                    return imgRect.width < prevWidth - 20 && Math.abs(containerRect.width - imgRect.width) <= 2;
                }""",
                arg=initial_geometry["img"]["width"],
                timeout=10000,
            )

            resized_geometry = _read_live_obs_geometry(page)
            assert resized_geometry["img"] is not None
            assert resized_geometry["container"] is not None
            assert resized_geometry["img"]["width"] < initial_geometry["img"]["width"] - 20
            assert abs(resized_geometry["container"]["width"] - resized_geometry["img"]["width"]) <= 2
            assert resized_geometry["img"]["width"] / resized_geometry["img"]["height"] == pytest.approx(2.0, rel=0.02)

            # --- Click mapping after the resize. ---
            box = page.locator("#live_obs img").bounding_box()
            assert box is not None
            # Aim at the centre of source pixel (36, 12) in the 48x24 image.
            target_x = box["x"] + ((36.5) / 48.0) * box["width"]
            target_y = box["y"] + ((12.5) / 24.0) * box["height"]
            page.mouse.click(target_x, target_y)

            page.wait_for_function(
                """() => {
                    const root = document.getElementById('coords_box');
                    const field = root?.querySelector('textarea, input');
                    return !!field && /^\\d+\\s*,\\s*\\d+$/.test(field.value.trim());
                }""",
                timeout=5000,
            )

            coords_value = _read_coords_box_value(page)
            assert coords_value is not None
            coord_x, coord_y = [int(part.strip()) for part in coords_value.split(",", 1)]
            assert abs(coord_x - 36) <= 1
            assert abs(coord_y - 12) <= 1

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_live_obs_client_resize_after_hidden_phase_becomes_visible(tmp_path):
    """The resize script also sizes an image that starts inside a hidden column.

    A minimal Blocks app places a solid-red 256x256 image in an initially
    hidden column and installs ``LIVE_OBS_CLIENT_RESIZE_JS`` on load. After the
    column is revealed, the image must be wider than 300px, match its
    container width (within 2px), and use ``object-fit: contain``. A
    screenshot of the image must be red at the centre and near both
    horizontal edges.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    full_red = np.zeros((256, 256, 3), dtype=np.uint8)
    full_red[:, :] = [255, 0, 0]

    with gr.Blocks() as demo:
        demo.css = ui_layout.CSS
        show_btn = gr.Button("Show", elem_id="show_btn")
        with gr.Column(visible=False, elem_id="action_phase_group") as action_phase_group:
            gr.Image(
                value=full_red,
                elem_id="live_obs",
                elem_classes=["live-obs-resizable"],
                buttons=[],
                sources=[],
            )
        demo.load(
            fn=None,
            js=ui_layout.LIVE_OBS_CLIENT_RESIZE_JS,
            queue=False,
        )
        show_btn.click(
            fn=lambda: gr.update(visible=True),
            outputs=[action_phase_group],
            queue=False,
        )

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="live-obs-hidden-phase-resize-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    screenshot_path = tmp_path / "live_obs_hidden_phase.png"
    try:
        # The readiness wait lives inside the try so that a startup timeout
        # still stops the server thread and closes the demo.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1440, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")

            # The resize script sets this global flag once it is installed.
            page.wait_for_function(
                "() => !!window.__robommeLiveObsResizerInstalled",
                timeout=5000,
            )

            page.click("#show_btn")
            page.wait_for_selector("#live_obs img", timeout=10000)
            page.wait_for_function(
                """() => {
                    const container = document.querySelector('#live_obs .image-container');
                    const img = document.querySelector('#live_obs img');
                    if (!container || !img) return false;
                    const containerRect = container.getBoundingClientRect();
                    const imgRect = img.getBoundingClientRect();
                    return imgRect.width > 300 && Math.abs(containerRect.width - imgRect.width) <= 2;
                }""",
                timeout=10000,
            )

            geometry = _read_live_obs_geometry(page)
            assert geometry["container"] is not None
            assert geometry["img"] is not None
            assert geometry["img"]["width"] > 300
            assert abs(geometry["container"]["width"] - geometry["img"]["width"]) <= 2

            object_fit = page.evaluate(
                """() => getComputedStyle(document.querySelector('#live_obs img')).objectFit"""
            )
            assert object_fit == "contain"

            page.locator("#live_obs img").screenshot(path=str(screenshot_path))
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()

    # Open the screenshot in a context manager so the file handle is released;
    # ``convert`` returns an independent in-memory copy that outlives the file.
    with Image.open(screenshot_path) as raw_screenshot:
        screenshot = raw_screenshot.convert("RGB")

    width, height = screenshot.size
    # Sample the centre plus points ~10% and ~90% across the middle row.
    samples = [
        screenshot.getpixel((width // 2, height // 2)),
        screenshot.getpixel((max(1, width // 10), height // 2)),
        screenshot.getpixel((min(width - 2, (width * 9) // 10), height // 2)),
    ]
    for pixel in samples:
        assert pixel[0] > 200
        assert pixel[1] < 30
        assert pixel[2] < 30
def test_header_task_shows_env_after_init(monkeypatch):
    """Check that the header shows the task name and goal after initialization.

    ``ui_layout.init_app`` is replaced by a stub whose task info is
    ``"PickXtimes (Episode 1)"`` and whose goal is ``"goal"``. The page is opened
    with ``?user=user1`` and the test asserts that ``#header_task`` reads
    ``"PickXtimes"`` and that the header goal reads ``"Goal"``.

    Args:
        monkeypatch: pytest fixture used to replace ``ui_layout.init_app``.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(request=None):
        """Return a fixed init payload, one entry per UI output."""
        _ = request
        return (
            "uid-auto",
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img, interactive=False),  # img_display
            "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "goal",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "PickXtimes (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="header-task-url-auto-login-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        # Wait for readiness inside the try block so that a server that never
        # comes up is still asked to exit and the demo is still closed.
        _wait_http_ready(root_url)
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(f"{root_url}?user=user1", wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
const root = document.getElementById('header_task');
const input = root ? root.querySelector('input') : null;
return !!input && input.value.trim() === 'PickXtimes';
}""",
                timeout=5000,
            )
            assert _read_header_task_value(page) == "PickXtimes"
            assert _read_header_goal_value(page) == "Goal"
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_header_goal_capitalizes_displayed_value_after_init(monkeypatch):
    """Check that the header goal is displayed with a capitalized first letter.

    ``ui_layout.init_app`` is replaced by a stub whose goal is
    ``"place cube on target"``; the test asserts that ``#header_goal`` shows
    ``"Place cube on target"``.

    Args:
        monkeypatch: pytest fixture used to replace ``ui_layout.init_app``.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(request=None):
        """Return a fixed init payload, one entry per UI output."""
        _ = request
        return (
            "uid-auto",
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img, interactive=False),  # img_display
            "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "place cube on target",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            "PickXtimes (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="header-goal-capitalization-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        # Wait for readiness inside the try block so that a server that never
        # comes up is still asked to exit and the demo is still closed.
        _wait_http_ready(root_url)
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
const root = document.getElementById('header_goal');
const input = root ? root.querySelector('textarea, input') : null;
return !!input && input.value.trim() === 'Place cube on target';
}""",
                timeout=5000,
            )
            assert _read_header_goal_value(page) == "Place cube on target"
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
@pytest.mark.parametrize(
    "task_info_text,expected_header_value",
    [
        ("pickxtimes (Episode 1)", "PickXtimes"),
        ("EnvFromSessionOnly (Episode 1)", "EnvFromSessionOnly"),
    ],
)
def test_header_task_env_normalization_and_fallback(monkeypatch, task_info_text, expected_header_value):
    """Check the header task value derived from the task info text.

    The two parametrized cases cover a lower-case ``"pickxtimes"`` that must be
    shown as ``"PickXtimes"`` and an ``"EnvFromSessionOnly"`` name that must be
    shown unchanged.

    Args:
        monkeypatch: pytest fixture used to replace ``ui_layout.init_app``.
        task_info_text: value the stubbed ``init_app`` returns for the task info box.
        expected_header_value: text expected in the ``#header_task`` input.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(_request=None):
        """Return a fixed init payload carrying the parametrized task info."""
        return (
            "uid-auto",
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img, interactive=False),  # img_display
            "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "goal",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=None, visible=False),  # video_display
            gr.update(visible=False, interactive=False),  # watch_demo_video_btn
            task_info_text,  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=False),  # video_phase_group
            gr.update(visible=True),  # action_phase_group
            gr.update(visible=True),  # control_panel_group
            gr.update(value="hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="header-task-normalization-fallback-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        # Wait for readiness inside the try block so that a server that never
        # comes up is still asked to exit and the demo is still closed.
        _wait_http_ready(root_url)
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """(expectedValue) => {
const root = document.getElementById('header_task');
const input = root ? root.querySelector('input') : null;
return !!input && input.value.trim() === expectedValue;
}""",
                arg=expected_header_value,
                timeout=5000,
            )
            assert _read_header_task_value(page) == expected_header_value
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def test_header_task_switch_to_video_task_shows_demo_phase(monkeypatch):
    """Check that switching the header task to a video task enters the demo phase.

    ``init_app`` and ``switch_env_wrapper`` are stubbed so that ``"PickXtimes"``
    starts in the action phase and ``"VideoPlaceButton"`` responds with the demo
    video visible. The test selects ``"VideoPlaceButton"`` in the header
    dropdown, verifies the video phase is shown (and the action/control groups
    hidden), verifies the switch handler ran exactly once, then clicks the watch
    button, signals the end of the video and verifies the action phase returns.

    Args:
        monkeypatch: pytest fixture used to replace ``ui_layout`` attributes.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)
    demo_video_path = gr.get_video("world.mp4")
    switch_calls = []

    def _pick_task_response(uid, task_name, show_video):
        """Build the output tuple for *task_name*, in video or action phase."""
        return (
            uid,
            gr.update(visible=True),  # main_interface
            gr.update(value=fake_obs_img, interactive=False),  # img_display
            "demo prompt" if show_video else "ready",  # log_output
            gr.update(choices=[("pick", 0)], value=None),  # options_radio
            "video goal" if show_video else "goal",  # goal_box
            "No need for coordinates",  # coords_box
            gr.update(value=demo_video_path if show_video else None, visible=show_video),  # video_display
            gr.update(visible=show_video, interactive=show_video),  # watch_demo_video_btn
            f"{task_name} (Episode 1)",  # task_info_box
            "Completed: 0",  # progress_info_box
            gr.update(interactive=True),  # restart_episode_btn
            gr.update(interactive=True),  # next_task_btn
            gr.update(interactive=True),  # exec_btn
            gr.update(visible=show_video),  # video_phase_group
            gr.update(visible=not show_video),  # action_phase_group
            gr.update(visible=not show_video),  # control_panel_group
            gr.update(value="video hint" if show_video else "hint"),  # task_hint_display
            gr.update(interactive=True),  # reference_action_btn
        )

    def fake_init_app(request=None):
        """Start on the non-video ``PickXtimes`` task."""
        _ = request
        return _pick_task_response("uid-header-video", "PickXtimes", show_video=False)

    def fake_switch_env_wrapper(uid, selected_env):
        """Record the switch request and answer with the matching phase."""
        switch_calls.append((uid, selected_env))
        return _pick_task_response(
            uid,
            selected_env,
            show_video=selected_env == "VideoPlaceButton",
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(ui_layout, "switch_env_wrapper", fake_switch_env_wrapper)
    monkeypatch.setattr(ui_layout.user_manager, "env_choices", ["PickXtimes", "VideoPlaceButton"])
    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"
    app = FastAPI(title="header-task-switch-video-phase-test")
    app = gr.mount_gradio_app(app, demo, path="/")
    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        # Wait for readiness inside the try block so that a server that never
        # comes up is still asked to exit and the demo is still closed.
        _wait_http_ready(root_url)
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
const root = document.getElementById('header_task');
const input = root ? root.querySelector('input') : null;
return !!input && input.value.trim() === 'PickXtimes';
}""",
                timeout=5000,
            )
            # Initialization alone must not trigger the switch handler.
            assert switch_calls == []

            page.click("#header_task input")
            page.get_by_role("option", name="VideoPlaceButton").click()
            page.wait_for_function(
                """() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
const videoEl = document.querySelector('#demo_video video');
const button =
document.querySelector('#watch_demo_video_btn button') ||
document.querySelector('button#watch_demo_video_btn');
return (
visible('video_phase_group') &&
visible('demo_video') &&
visible('watch_demo_video_btn') &&
!visible('action_phase_group') &&
!visible('control_panel_group') &&
!!(videoEl && videoEl.currentSrc) &&
!!button &&
button.disabled === false &&
videoEl.paused === true &&
videoEl.autoplay === false
);
}""",
                timeout=10000,
            )
            phase_after_switch = _read_phase_visibility(page)
            assert phase_after_switch["videoPhase"] is True
            assert phase_after_switch["video"] is True
            assert phase_after_switch["watchButton"] is True
            assert phase_after_switch["actionPhase"] is False
            assert phase_after_switch["controlPhase"] is False
            assert phase_after_switch["currentSrc"]
            assert switch_calls == [("uid-header-video", "VideoPlaceButton")]
            # Give a delayed duplicate switch call time to show up, then re-check.
            page.wait_for_timeout(1500)
            assert switch_calls == [("uid-header-video", "VideoPlaceButton")]
            assert _read_header_task_value(page) == "VideoPlaceButton"

            _click_demo_video_button(page)
            page.wait_for_function(
                """() => {
const button =
document.querySelector('#watch_demo_video_btn button') ||
document.querySelector('button#watch_demo_video_btn');
return !!button && button.disabled === true;
}""",
                timeout=5000,
            )
            did_dispatch_end = _dispatch_video_event(page, "ended")
            assert did_dispatch_end
            page.wait_for_function(
                """() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
return (
!visible('video_phase_group') &&
!visible('demo_video') &&
!visible('watch_demo_video_btn') &&
visible('action_phase_group') &&
visible('control_panel_group') &&
visible('live_obs') &&
visible('action_radio')
);
}""",
                timeout=5000,
            )
            phase_after_end = _read_phase_visibility(page)
            assert phase_after_end["videoPhase"] is False
            assert phase_after_end["video"] is False
            assert phase_after_end["watchButton"] is False
            assert phase_after_end["actionPhase"] is True
            assert phase_after_end["action"] is True
            assert phase_after_end["controlPhase"] is True
            assert phase_after_end["control"] is True
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
def _run_local_execute_video_transition_test(
*,
status_text,
done,
expect_terminal_buttons_disabled,
expected_terminal_log=None,
):
# Shared browser scenario for the execute -> execution video -> end-of-video
# transition, run against a minimal Blocks app wired to gradio_callbacks.
#
# Keyword-only parameters:
#   status_text: status value returned by the fake session's execute_action.
#   done: done flag returned by the fake session's execute_action.
#   expect_terminal_buttons_disabled: when true, assert that the exec and
#     reference buttons are disabled after the video ends and that the log is
#     left unchanged by a later radio selection; otherwise assert both buttons
#     are enabled and the log shows the action-selection prompt.
#   expected_terminal_log: substring that must appear in the log in the
#     terminal case (must not be None there).
import gradio_callbacks as cb
import config as config_module
ui_layout = importlib.reload(importlib.import_module("ui_layout"))
demo_video_path = gr.get_video("world.mp4")
fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
# Stand-in session object; which of these attributes the callbacks actually
# read is not visible from this block.
class FakeSession:
def __init__(self):
self.env_id = "BinFill"
self.episode_idx = 1
self.language_goal = "place cube on target"
self.available_options = [("pick", 0), ("point", 1)]
self.raw_solve_options = [
{"label": "a", "action": "pick", "available": False},
{"label": "b", "action": "point", "available": [object()]},
]
self.demonstration_frames = []
self.last_execution_frames = []
self.base_frames = [fake_obs.copy()]
self.non_demonstration_task_length = None
self.difficulty = "easy"
self.seed = 123
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return fake_obs.copy()
def update_observation(self, use_segmentation=False):
_ = use_segmentation
return None
# Records three copies of the fake frame as the last execution and returns
# the parametrized status_text/done pair.
def execute_action(self, option_idx, click_coords):
_ = option_idx, click_coords
self.last_execution_frames = [fake_obs.copy() for _ in range(3)]
self.base_frames.extend(self.last_execution_frames)
return fake_obs.copy(), status_text, done
# Save the gradio_callbacks attributes that are replaced below; the outer
# finally at the end of this function restores them.
originals = {
"get_session": cb.get_session,
"increment_execute_count": cb.increment_execute_count,
"save_video": cb.save_video,
}
fake_session = FakeSession()
cb.get_session = lambda uid: fake_session
cb.increment_execute_count = lambda uid, env_id, ep_num: 1
cb.save_video = lambda frames, suffix="": demo_video_path
try:
# Minimal Blocks app exposing the element ids queried by the browser-side
# checks further down.
with gr.Blocks(title="Native phase machine local video test") as demo:
uid_state = gr.State(value="uid-local-video")
phase_state = gr.State(value="action_point")
post_execute_controls_state = gr.State(
value={
"exec_btn_interactive": True,
"reference_action_interactive": True,
}
)
post_execute_log_state = gr.State(
value={
"preserve_terminal_log": False,
"terminal_log_value": None,
"preserve_execute_video_log": False,
"execute_video_log_value": None,
}
)
suppress_state = gr.State(value=False)
with gr.Column(visible=True, elem_id="main_interface") as main_interface:
with gr.Column(visible=False, elem_id="video_phase_group") as video_phase_group:
video_display = gr.Video(value=None, elem_id="demo_video", autoplay=False)
watch_demo_video_btn = gr.Button(
"Watch Video Input🎬",
elem_id="watch_demo_video_btn",
interactive=False,
visible=False,
)
with gr.Column(visible=False, elem_id="execution_video_group") as execution_video_group:
execute_video_display = gr.Video(value=None, elem_id="execute_video", autoplay=True)
with gr.Column(visible=True, elem_id="action_phase_group") as action_phase_group:
img_display = gr.Image(value=fake_obs.copy(), elem_id="live_obs")
with gr.Column(visible=True, elem_id="control_panel_group") as control_panel_group:
options_radio = gr.Radio(choices=[("pick", 0), ("point", 1)], value=None, elem_id="action_radio")
coords_box = gr.Textbox(config_module.UI_TEXT["coords"]["not_needed"], elem_id="coords_box")
exec_btn = gr.Button("execute", interactive=True, elem_id="exec_btn")
reference_action_btn = gr.Button("reference", interactive=True, elem_id="reference_action_btn")
restart_episode_btn = gr.Button("restart", interactive=True, elem_id="restart_episode_btn")
next_task_btn = gr.Button("next", interactive=True, elem_id="next_task_btn")
task_hint_display = gr.Textbox("hint", interactive=True, elem_id="task_hint_display")
log_output = gr.Markdown("", elem_id="log_output")
task_info_box = gr.Textbox("")
progress_info_box = gr.Textbox("")
# Execute chain: precheck_execute_inputs -> switch_to_execute_phase -> execute_step.
exec_btn.click(
fn=cb.precheck_execute_inputs,
inputs=[uid_state, options_radio, coords_box],
outputs=[],
queue=False,
).then(
fn=cb.switch_to_execute_phase,
inputs=[uid_state],
outputs=[
options_radio,
exec_btn,
restart_episode_btn,
next_task_btn,
img_display,
reference_action_btn,
task_hint_display,
],
queue=False,
).then(
fn=cb.execute_step,
inputs=[uid_state, options_radio, coords_box],
outputs=[
img_display,
log_output,
task_info_box,
progress_info_box,
restart_episode_btn,
next_task_btn,
exec_btn,
execute_video_display,
action_phase_group,
control_panel_group,
execution_video_group,
options_radio,
coords_box,
reference_action_btn,
task_hint_display,
post_execute_controls_state,
post_execute_log_state,
phase_state,
],
queue=False,
)
options_radio.change(
fn=cb.on_option_select,
inputs=[uid_state, options_radio, coords_box, suppress_state, post_execute_log_state],
outputs=[coords_box, img_display, log_output, suppress_state, post_execute_log_state],
queue=False,
)
img_display.select(
fn=cb.on_map_click,
inputs=[uid_state, options_radio],
outputs=[img_display, coords_box, log_output],
queue=False,
)
# The end and stop events of the execution video share the same handler,
# inputs and outputs.
execute_video_display.end(
fn=cb.on_execute_video_end_transition,
inputs=[uid_state, post_execute_controls_state, post_execute_log_state],
outputs=[
execution_video_group,
action_phase_group,
control_panel_group,
options_radio,
exec_btn,
restart_episode_btn,
next_task_btn,
img_display,
log_output,
reference_action_btn,
task_hint_display,
post_execute_log_state,
phase_state,
],
queue=False,
)
execute_video_display.stop(
fn=cb.on_execute_video_end_transition,
inputs=[uid_state, post_execute_controls_state, post_execute_log_state],
outputs=[
execution_video_group,
action_phase_group,
control_panel_group,
options_radio,
exec_btn,
restart_episode_btn,
next_task_btn,
img_display,
log_output,
reference_action_btn,
task_hint_display,
post_execute_log_state,
phase_state,
],
queue=False,
)
# Serve the demo from a uvicorn server running in a daemon thread.
port = _free_port()
host = "127.0.0.1"
root_url = f"http://{host}:{port}/"
app = FastAPI(title="native-phase-machine-local-video-test")
app = gr.mount_gradio_app(app, demo, path="/")
config = uvicorn.Config(app, host=host, port=port, log_level="error")
server = uvicorn.Server(config)
thread = threading.Thread(target=server.run, daemon=True)
thread.start()
# NOTE(review): this readiness wait runs before the inner try, so if it raises
# the server/demo cleanup below is skipped (only the cb attributes are restored).
_wait_http_ready(root_url)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
page.wait_for_selector("#main_interface", state="visible", timeout=20000)
# Check the second radio input (index 1) and wait until the coords box and the
# log show the configured point-selection prompts.
page.locator("#action_radio input[type='radio']").nth(1).check(force=True)
page.wait_for_function(
"""(state) => {
const coordsRoot = document.getElementById('coords_box');
const coordsField = coordsRoot?.querySelector('textarea, input');
const logRoot = document.getElementById('log_output');
const logField = logRoot?.querySelector('textarea, input');
const coordsValue = coordsField ? coordsField.value.trim() : '';
const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
return coordsValue === state.coordsPrompt && logValue === state.waitLog;
}""",
arg={
"coordsPrompt": config_module.UI_TEXT["coords"]["select_point"],
"waitLog": config_module.UI_TEXT["log"]["point_selection_prompt"],
},
timeout=5000,
)
# Click the centre of the live observation image, then wait for an "x, y" pair in
# the coords box and the matching "Select: B | point <...>" log line.
box = page.locator("#live_obs img").bounding_box()
assert box is not None
page.mouse.click(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2)
page.wait_for_function(
"""(state) => {
const coordsRoot = document.getElementById('coords_box');
const coordsField = coordsRoot?.querySelector('textarea, input');
const logRoot = document.getElementById('log_output');
const logField = logRoot?.querySelector('textarea, input');
const coordsValue = coordsField ? coordsField.value.trim() : '';
const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
return /^\\d+\\s*,\\s*\\d+$/.test(coordsValue) && logValue === `Select: ${state.label} | point <${coordsValue}>`;
}""",
arg={"label": "B"},
timeout=5000,
)
selected_coords = _read_coords_box_value(page)
assert selected_coords is not None
# Execute, then wait until the execution video group is visible, the action phase
# is hidden, the control panel is still visible and the video is autoplaying.
page.locator("#exec_btn button, button#exec_btn").first.click()
page.wait_for_selector("#execute_video video", timeout=5000)
page.wait_for_function(
"""() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
const videoEl = document.querySelector('#execute_video video');
return (
visible('execution_video_group') &&
visible('execute_video') &&
!visible('action_phase_group') &&
visible('control_panel_group') &&
!!videoEl &&
videoEl.autoplay === true &&
(videoEl.paused === false || videoEl.currentTime > 0)
);
}""",
timeout=10000,
)
# For a non-terminal step the log must show the execute prompt formatted with
# label "B" and the selected coordinates.
if not done:
execution_log = _read_log_output_value(page)
assert execution_log == config_module.UI_TEXT["log"]["execute_action_prompt_with_coords"].format(
label="B",
coords_text=selected_coords,
)
assert execution_log != config_module.UI_TEXT["log"]["point_selection_prompt"]
controls_after_execute = _read_demo_video_controls(page, elem_id="execute_video", button_elem_id=None)
assert controls_after_execute["autoplay"] is True
assert controls_after_execute["paused"] is False
# While the execution video is showing, the first radio input, the reference,
# restart and next buttons and the hint field must all be disabled.
panel_snapshot = page.evaluate(
"""() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const radio = document.querySelector('#action_radio input[type="radio"]');
const refBtn = resolveButton('reference_action_btn');
const restartBtn = resolveButton('restart_episode_btn');
const nextBtn = resolveButton('next_task_btn');
const hint = document.querySelector('#task_hint_display textarea, #task_hint_display input');
return {
radioDisabled: radio ? radio.disabled : null,
refDisabled: refBtn ? refBtn.disabled : null,
restartDisabled: restartBtn ? restartBtn.disabled : null,
nextDisabled: nextBtn ? nextBtn.disabled : null,
hintDisabled: hint ? hint.disabled : null,
};
}"""
)
assert panel_snapshot == {
"radioDisabled": True,
"refDisabled": True,
"restartDisabled": True,
"nextDisabled": True,
"hintDisabled": True,
}
# Signal the end of playback through the _dispatch_video_event helper (defined
# elsewhere in this file) and wait for the action view to come back.
did_dispatch_end = _dispatch_video_event(page, "ended", elem_id="execute_video")
assert did_dispatch_end
page.wait_for_function(
"""() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
return (
visible('live_obs') &&
visible('action_radio') &&
!visible('execute_video') &&
visible('control_panel_group')
);
}""",
timeout=2000,
)
# Terminal case: exec/reference are disabled, the log contains
# expected_terminal_log, and re-checking a radio option must not change the log.
if expect_terminal_buttons_disabled:
button_snapshot = page.evaluate(
"""() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const execBtn = resolveButton('exec_btn');
const refBtn = resolveButton('reference_action_btn');
return {
execDisabled: execBtn ? execBtn.disabled : null,
refDisabled: refBtn ? refBtn.disabled : null,
};
}"""
)
assert button_snapshot == {
"execDisabled": True,
"refDisabled": True,
}
terminal_log_before = _read_log_output_value(page)
assert terminal_log_before is not None
assert expected_terminal_log is not None
assert expected_terminal_log in terminal_log_before
page.locator("#action_radio input[type='radio']").nth(1).check(force=True)
page.wait_for_timeout(300)
assert _read_log_output_value(page) == terminal_log_before
# Non-terminal case: exec/reference are enabled and the log shows the
# action-selection prompt.
else:
button_snapshot = page.evaluate(
"""() => {
const resolveButton = (id) => {
return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`);
};
const execBtn = resolveButton('exec_btn');
const refBtn = resolveButton('reference_action_btn');
return {
execDisabled: execBtn ? execBtn.disabled : null,
refDisabled: refBtn ? refBtn.disabled : null,
};
}"""
)
assert button_snapshot == {
"execDisabled": False,
"refDisabled": False,
}
assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["action_selection_prompt"]
browser.close()
# Always ask the uvicorn server to exit, wait for its thread and close the demo.
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()
# Restore the gradio_callbacks attributes replaced at the top of the function.
finally:
for name, value in originals.items():
setattr(cb, name, value)
def test_phase_machine_runtime_local_video_path_end_transition():
    """Run the local execute-video scenario for a non-terminal step (``done=False``)."""
    scenario = {
        "status_text": "Executing: pick",
        "done": False,
        "expect_terminal_buttons_disabled": False,
    }
    _run_local_execute_video_transition_test(**scenario)
def test_phase_machine_runtime_local_video_path_end_transition_terminal_success():
    """Run the local execute-video scenario for a finished step reporting ``SUCCESS``."""
    scenario = {
        "status_text": "SUCCESS",
        "done": True,
        "expect_terminal_buttons_disabled": True,
        "expected_terminal_log": "episode success",
    }
    _run_local_execute_video_transition_test(**scenario)
def test_phase_machine_runtime_local_video_path_end_transition_terminal_failed():
    """Run the local execute-video scenario for a finished step reporting ``FAILED``."""
    scenario = {
        "status_text": "Executing: pick | FAILED",
        "done": True,
        "expect_terminal_buttons_disabled": True,
        "expected_terminal_log": "episode failed",
    }
    _run_local_execute_video_transition_test(**scenario)
def test_phase_machine_runtime_stopcube_remain_static_merges_short_tail(monkeypatch):
# Browser test for the StopCube options from vqa_options_override: option index 1
# is executed twice through the UI and the recorded hold timesteps must equal
# [100, 240].
import gradio_callbacks as cb
import config as config_module
import vqa_options_override as override
demo_video_path = gr.get_video("world.mp4")
fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
hold_calls = []
# Replacement for override.solve_hold_obj_absTimestep: records the requested
# absolute timestep and advances env.elapsed_steps to it.
def _hold_spy(env, planner, absTimestep):
_ = planner
hold_calls.append(int(absTimestep))
env.elapsed_steps = int(absTimestep)
return None
monkeypatch.setattr(override, "solve_hold_obj_absTimestep", _hold_spy)
# Fake unwrapped env; presumably steps_press/interval/button are the fields the
# StopCube options read — verify against vqa_options_override.
class FakeBase:
def __init__(self):
self.steps_press = 270
self.interval = 30
self.button = object()
class FakeEnv:
def __init__(self):
self.unwrapped = FakeBase()
self.elapsed_steps = 0
# Stand-in session whose options come from the real override.get_vqa_options.
class FakeSession:
def __init__(self):
self.env_id = "StopCube"
self.episode_idx = 1
self.language_goal = "stop the moving cube"
self.difficulty = "easy"
self.seed = 123
self.non_demonstration_task_length = None
self.demonstration_frames = []
self.last_execution_frames = []
self.base_frames = [fake_obs.copy()]
self.env = FakeEnv()
self.planner = object()
self.raw_solve_options = override.get_vqa_options(
self.env,
self.planner,
{"obj": None},
self.env_id,
)
self.available_options = [
(f"{opt['label']}. {opt['action']}", idx)
for idx, opt in enumerate(self.raw_solve_options)
]
def get_pil_image(self, use_segmented=False):
_ = use_segmented
return fake_obs.copy()
def update_observation(self, use_segmentation=False):
_ = use_segmentation
return None
# Rebuilds the options on every call, runs the chosen option's solve() and
# returns two frames filled with the most recent hold timestep.
def execute_action(self, option_idx, click_coords):
_ = click_coords
current_options = override.get_vqa_options(
self.env,
self.planner,
{"obj": None},
self.env_id,
)
current_options[option_idx]["solve"]()
frame_value = hold_calls[-1] if hold_calls else 0
frame = np.full((24, 24, 3), frame_value, dtype=np.uint8)
self.last_execution_frames = [frame.copy(), frame.copy()]
self.base_frames.extend(self.last_execution_frames)
return frame.copy(), f"Executing: {current_options[option_idx]['label']}", False
fake_session = FakeSession()
monkeypatch.setattr(cb, "get_session", lambda uid: fake_session)
monkeypatch.setattr(cb, "increment_execute_count", lambda uid, env_id, ep_num: 1)
monkeypatch.setattr(cb, "save_video", lambda frames, suffix="": demo_video_path)
monkeypatch.setattr(cb, "concatenate_frames_horizontally", lambda frames, env_id=None: list(frames))
# NOTE(review): cb.os.path is presumably the shared os.path module, so these two
# patches would affect every os.path.exists/getsize caller until monkeypatch
# undoes them — confirm this is intended.
monkeypatch.setattr(cb.os.path, "exists", lambda path: True)
monkeypatch.setattr(cb.os.path, "getsize", lambda path: 10)
# Minimal Blocks app exposing the element ids queried by the browser-side checks.
with gr.Blocks(title="Native StopCube merge test") as demo:
uid_state = gr.State(value="uid-stopcube-merge")
phase_state = gr.State(value="action_point")
post_execute_controls_state = gr.State(
value={
"exec_btn_interactive": True,
"reference_action_interactive": True,
}
)
post_execute_log_state = gr.State(
value={
"preserve_terminal_log": False,
"terminal_log_value": None,
"preserve_execute_video_log": False,
"execute_video_log_value": None,
}
)
suppress_state = gr.State(value=False)
with gr.Column(visible=True, elem_id="main_interface") as main_interface:
with gr.Column(visible=False, elem_id="video_phase_group") as video_phase_group:
video_display = gr.Video(value=None, elem_id="demo_video", autoplay=False)
watch_demo_video_btn = gr.Button(
"Watch Video Input🎬",
elem_id="watch_demo_video_btn",
interactive=False,
visible=False,
)
with gr.Column(visible=False, elem_id="execution_video_group") as execution_video_group:
execute_video_display = gr.Video(value=None, elem_id="execute_video", autoplay=True)
with gr.Column(visible=True, elem_id="action_phase_group") as action_phase_group:
img_display = gr.Image(value=fake_obs.copy(), elem_id="live_obs")
with gr.Column(visible=True, elem_id="control_panel_group") as control_panel_group:
options_radio = gr.Radio(
choices=fake_session.available_options,
value=None,
elem_id="action_radio",
)
coords_box = gr.Textbox(config_module.UI_TEXT["coords"]["not_needed"], elem_id="coords_box")
exec_btn = gr.Button("execute", interactive=True, elem_id="exec_btn")
reference_action_btn = gr.Button("reference", interactive=True, elem_id="reference_action_btn")
restart_episode_btn = gr.Button("restart", interactive=True, elem_id="restart_episode_btn")
next_task_btn = gr.Button("next", interactive=True, elem_id="next_task_btn")
task_hint_display = gr.Textbox("hint", interactive=True, elem_id="task_hint_display")
log_output = gr.Markdown("", elem_id="log_output")
task_info_box = gr.Textbox("")
progress_info_box = gr.Textbox("")
# Execute chain: precheck_execute_inputs -> switch_to_execute_phase -> execute_step.
exec_btn.click(
fn=cb.precheck_execute_inputs,
inputs=[uid_state, options_radio, coords_box],
outputs=[],
queue=False,
).then(
fn=cb.switch_to_execute_phase,
inputs=[uid_state],
outputs=[
options_radio,
exec_btn,
restart_episode_btn,
next_task_btn,
img_display,
reference_action_btn,
task_hint_display,
],
queue=False,
).then(
fn=cb.execute_step,
inputs=[uid_state, options_radio, coords_box],
outputs=[
img_display,
log_output,
task_info_box,
progress_info_box,
restart_episode_btn,
next_task_btn,
exec_btn,
execute_video_display,
action_phase_group,
control_panel_group,
execution_video_group,
options_radio,
coords_box,
reference_action_btn,
task_hint_display,
post_execute_controls_state,
post_execute_log_state,
phase_state,
],
queue=False,
)
options_radio.change(
fn=cb.on_option_select,
inputs=[uid_state, options_radio, coords_box, suppress_state, post_execute_log_state],
outputs=[coords_box, img_display, log_output, suppress_state, post_execute_log_state],
queue=False,
)
# The end and stop events of the execution video share the same handler,
# inputs and outputs.
execute_video_display.end(
fn=cb.on_execute_video_end_transition,
inputs=[uid_state, post_execute_controls_state, post_execute_log_state],
outputs=[
execution_video_group,
action_phase_group,
control_panel_group,
options_radio,
exec_btn,
restart_episode_btn,
next_task_btn,
img_display,
log_output,
reference_action_btn,
task_hint_display,
post_execute_log_state,
phase_state,
],
queue=False,
)
execute_video_display.stop(
fn=cb.on_execute_video_end_transition,
inputs=[uid_state, post_execute_controls_state, post_execute_log_state],
outputs=[
execution_video_group,
action_phase_group,
control_panel_group,
options_radio,
exec_btn,
restart_episode_btn,
next_task_btn,
img_display,
log_output,
reference_action_btn,
task_hint_display,
post_execute_log_state,
phase_state,
],
queue=False,
)
# Serve the demo from a uvicorn server running in a daemon thread.
port = _free_port()
host = "127.0.0.1"
root_url = f"http://{host}:{port}/"
app = FastAPI(title="native-stopcube-merge-test")
app = gr.mount_gradio_app(app, demo, path="/")
config = uvicorn.Config(app, host=host, port=port, log_level="error")
server = uvicorn.Server(config)
thread = threading.Thread(target=server.run, daemon=True)
thread.start()
# NOTE(review): this readiness wait runs before the try, so if it raises the
# server/demo cleanup in the finally below is skipped.
_wait_http_ready(root_url)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={"width": 1280, "height": 900})
page.goto(root_url, wait_until="domcontentloaded")
page.wait_for_selector("#main_interface", state="visible", timeout=20000)
# Two rounds: check the second radio input, execute, wait for the execution
# video, signal its end, then wait for the action view with exec re-enabled.
for _ in range(2):
page.locator("#action_radio input[type='radio']").nth(1).check(force=True)
page.locator("#exec_btn button, button#exec_btn").first.click()
page.wait_for_selector("#execute_video video", timeout=5000)
page.wait_for_function(
"""() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
const videoEl = document.querySelector('#execute_video video');
return (
visible('execution_video_group') &&
visible('execute_video') &&
!visible('action_phase_group') &&
!!videoEl &&
videoEl.autoplay === true
);
}""",
timeout=10000,
)
assert _dispatch_video_event(page, "ended", elem_id="execute_video")
page.wait_for_function(
"""() => {
const visible = (id) => {
const el = document.getElementById(id);
if (!el) return false;
const st = getComputedStyle(el);
return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
};
const execBtn = document.querySelector('#exec_btn button') || document.querySelector('button#exec_btn');
return (
visible('action_phase_group') &&
visible('control_panel_group') &&
!visible('execute_video') &&
!!execBtn &&
execBtn.disabled === false
);
}""",
timeout=5000,
)
# Expected hold timesteps for the two executions; presumably the second value
# reflects the short tail before steps_press being merged into one hold —
# verify against vqa_options_override.
assert hold_calls == [100, 240]
browser.close()
# Always ask the uvicorn server to exit, wait for its thread and close the demo.
finally:
server.should_exit = True
thread.join(timeout=10)
demo.close()