| | from __future__ import annotations |
| |
|
| | import contextlib |
| | import importlib |
| | import socket |
| | import threading |
| | import time |
| | from urllib.error import URLError |
| | from urllib.request import urlopen |
| |
|
| | import numpy as np |
| | import pytest |
| | from PIL import Image |
| |
|
| |
|
| | gr = pytest.importorskip("gradio") |
| | pytest.importorskip("fastapi") |
| | pytest.importorskip("uvicorn") |
| | pytest.importorskip("playwright.sync_api") |
| |
|
| | import uvicorn |
| | from fastapi import FastAPI |
| | from playwright.sync_api import sync_playwright |
| |
|
| |
|
| | def _free_port() -> int: |
| | with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: |
| | sock.bind(("127.0.0.1", 0)) |
| | return int(sock.getsockname()[1]) |
| |
|
| |
|
| | def _wait_http_ready(url: str, timeout_s: float = 20.0) -> None: |
| | end = time.time() + timeout_s |
| | while time.time() < end: |
| | try: |
| | with urlopen(url, timeout=1.0) as resp: |
| | if int(getattr(resp, "status", 200)) < 500: |
| | return |
| | except URLError: |
| | time.sleep(0.2) |
| | except Exception: |
| | time.sleep(0.2) |
| | raise RuntimeError(f"Server did not become ready: {url}") |
| |
|
| |
|
| | def _resolve_button_snapshot(page, elem_id: str) -> dict[str, str | bool | None]: |
| | return page.evaluate( |
| | """(elemId) => { |
| | const button = document.querySelector(`#${elemId} button`) || document.querySelector(`button#${elemId}`); |
| | if (!button) { |
| | return { |
| | found: false, |
| | disabled: null, |
| | backgroundColor: null, |
| | borderColor: null, |
| | color: null, |
| | }; |
| | } |
| | const style = getComputedStyle(button); |
| | return { |
| | found: true, |
| | disabled: button.disabled, |
| | backgroundColor: style.backgroundColor, |
| | borderColor: style.borderColor, |
| | color: style.color, |
| | }; |
| | }""", |
| | elem_id, |
| | ) |
| |
|
| |
|
| | def _read_header_task_value(page) -> str | None: |
| | return page.evaluate( |
| | """() => { |
| | const root = document.getElementById('header_task'); |
| | if (!root) return null; |
| | const input = root.querySelector('input'); |
| | if (input && typeof input.value === 'string') { |
| | const value = input.value.trim(); |
| | return value || null; |
| | } |
| | const selected = root.querySelector('.single-select'); |
| | if (!selected) return null; |
| | const text = (selected.textContent || '').trim(); |
| | return text || null; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_header_goal_value(page) -> str | None: |
| | return page.evaluate( |
| | """() => { |
| | const root = document.getElementById('header_goal'); |
| | if (!root) return null; |
| | const field = root.querySelector('textarea, input'); |
| | if (!field) return null; |
| | const value = typeof field.value === 'string' ? field.value.trim() : ''; |
| | return value || null; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_coords_box_value(page) -> str | None: |
| | return page.evaluate( |
| | """() => { |
| | const root = document.getElementById('coords_box'); |
| | if (!root) return null; |
| | const field = root.querySelector('textarea, input'); |
| | if (!field) return null; |
| | const value = typeof field.value === 'string' ? field.value.trim() : ''; |
| | return value || null; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_log_output_value(page) -> str | None: |
| | return page.evaluate( |
| | """() => { |
| | const root = document.getElementById('log_output'); |
| | if (!root) return null; |
| | const field = root.querySelector('textarea, input'); |
| | if (field && typeof field.value === 'string') { |
| | const value = field.value.trim(); |
| | return value || null; |
| | } |
| | const value = (root.textContent || '').trim(); |
| | return value || null; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_progress_markdown_snapshot(page) -> dict[str, bool | str | None]: |
| | return page.evaluate( |
| | """() => { |
| | const host = document.getElementById('native_progress_host'); |
| | const pending = host?.querySelector('.pending'); |
| | const markdown = host?.querySelector('[data-testid="markdown"]'); |
| | const prose = markdown ? markdown.querySelector('.prose, .md') || markdown : null; |
| | if (!host) { |
| | return { |
| | pendingPresent: false, |
| | pendingVisible: false, |
| | markdownVisible: false, |
| | text: null, |
| | }; |
| | } |
| | const pendingStyle = pending ? getComputedStyle(pending) : null; |
| | const markdownStyle = markdown ? getComputedStyle(markdown) : null; |
| | const text = prose ? ((prose.textContent || '').trim()) : ''; |
| | return { |
| | pendingPresent: !!pending, |
| | pendingVisible: !!pendingStyle && pendingStyle.display !== 'none' && pendingStyle.visibility !== 'hidden', |
| | markdownVisible: |
| | !!markdownStyle && |
| | markdownStyle.display !== 'none' && |
| | markdownStyle.visibility !== 'hidden', |
| | text: text || null, |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_progress_text_snapshot(page) -> dict[str, float | bool | str | None]: |
| | return page.evaluate( |
| | """() => { |
| | const node = document.querySelector('.progress-text'); |
| | if (!node) { |
| | return { |
| | present: false, |
| | visible: false, |
| | text: null, |
| | x: null, |
| | y: null, |
| | width: null, |
| | height: null, |
| | }; |
| | } |
| | const style = getComputedStyle(node); |
| | const rect = node.getBoundingClientRect(); |
| | return { |
| | present: true, |
| | visible: |
| | style.display !== 'none' && |
| | style.visibility !== 'hidden' && |
| | Number.parseFloat(style.opacity || '1') > 0 && |
| | rect.width > 0 && |
| | rect.height > 0, |
| | text: (node.textContent || '').trim() || null, |
| | x: rect.x, |
| | y: rect.y, |
| | width: rect.width, |
| | height: rect.height, |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_elem_classes(page, elem_id: str) -> list[str] | None: |
| | return page.evaluate( |
| | """(elemId) => { |
| | const root = document.getElementById(elemId); |
| | return root ? Array.from(root.classList) : null; |
| | }""", |
| | elem_id, |
| | ) |
| |
|
| |
|
| | def _read_media_card_wait_snapshot(page) -> dict[str, str | float | None]: |
| | return page.evaluate( |
| | """() => { |
| | const card = document.getElementById('media_card'); |
| | if (!card) { |
| | return { |
| | opacity: null, |
| | borderColor: null, |
| | boxShadow: null, |
| | animationName: null, |
| | }; |
| | } |
| | const style = getComputedStyle(card, '::after'); |
| | return { |
| | opacity: Number.parseFloat(style.opacity || '0'), |
| | borderColor: style.borderColor || null, |
| | boxShadow: style.boxShadow || null, |
| | animationName: style.animationName || null, |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_live_obs_transform_snapshot(page) -> dict[str, str | None]: |
| | return page.evaluate( |
| | """() => { |
| | const img = document.querySelector('#live_obs img'); |
| | const frame = document.querySelector('#live_obs .image-frame'); |
| | return { |
| | imgTransform: img ? getComputedStyle(img).transform : null, |
| | frameTransform: frame ? getComputedStyle(frame).transform : null, |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_phase_visibility(page) -> dict[str, bool | str | None]: |
| | return page.evaluate( |
| | """() => { |
| | const visible = (id) => { |
| | const el = document.getElementById(id); |
| | if (!el) return false; |
| | const st = getComputedStyle(el); |
| | return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0; |
| | }; |
| | const videoEl = document.querySelector('#demo_video video'); |
| | const executeVideoEl = document.querySelector('#execute_video video'); |
| | return { |
| | videoPhase: visible('video_phase_group'), |
| | video: visible('demo_video'), |
| | executionVideoPhase: visible('execution_video_group'), |
| | executionVideo: visible('execute_video'), |
| | watchButton: visible('watch_demo_video_btn'), |
| | actionPhase: visible('action_phase_group'), |
| | action: visible('live_obs'), |
| | controlPhase: visible('control_panel_group'), |
| | control: visible('action_radio'), |
| | currentSrc: videoEl ? videoEl.currentSrc : null, |
| | executeCurrentSrc: executeVideoEl ? executeVideoEl.currentSrc : null, |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_demo_video_controls(page, elem_id: str = "demo_video", button_elem_id: str | None = "watch_demo_video_btn") -> dict[str, bool | None]: |
| | return page.evaluate( |
| | """({ elemId, buttonElemId }) => { |
| | const visible = (id) => { |
| | if (!id) return false; |
| | const el = document.getElementById(id); |
| | if (!el) return false; |
| | const st = getComputedStyle(el); |
| | return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0; |
| | }; |
| | const videoEl = document.querySelector(`#${elemId} video`); |
| | const button = buttonElemId |
| | ? (document.querySelector(`#${buttonElemId} button`) || document.querySelector(`button#${buttonElemId}`)) |
| | : null; |
| | return { |
| | videoVisible: visible(elemId), |
| | buttonVisible: visible(buttonElemId), |
| | buttonDisabled: button ? button.disabled : null, |
| | autoplay: videoEl ? videoEl.autoplay : null, |
| | paused: videoEl ? videoEl.paused : null, |
| | }; |
| | }""", |
| | {"elemId": elem_id, "buttonElemId": button_elem_id}, |
| | ) |
| |
|
| |
|
| | def _click_demo_video_button(page) -> None: |
| | page.locator("#watch_demo_video_btn button, button#watch_demo_video_btn").first.click() |
| |
|
| |
|
| | def _dispatch_video_event(page, event_name: str, elem_id: str = "demo_video") -> bool: |
| | return page.evaluate( |
| | """({ eventName, elemId }) => { |
| | const targets = [ |
| | document.querySelector(`#${elemId} video`), |
| | document.getElementById(elemId), |
| | ].filter(Boolean); |
| | if (!targets.length) return false; |
| | for (const target of targets) { |
| | target.dispatchEvent(new Event(eventName, { bubbles: true, composed: true })); |
| | } |
| | return true; |
| | }""", |
| | {"eventName": event_name, "elemId": elem_id}, |
| | ) |
| |
|
| |
|
| | def _read_live_obs_geometry(page) -> dict[str, dict[str, float] | None]: |
| | return page.evaluate( |
| | """() => { |
| | const root = document.getElementById('live_obs'); |
| | const container = root?.querySelector('.image-container'); |
| | const uploadContainer = root?.querySelector('.upload-container'); |
| | const frame = root?.querySelector('.image-frame'); |
| | const img = root?.querySelector('img'); |
| | const measure = (node) => { |
| | if (!node) return null; |
| | const rect = node.getBoundingClientRect(); |
| | return { width: rect.width, height: rect.height }; |
| | }; |
| | return { |
| | root: measure(root), |
| | container: measure(container), |
| | uploadContainer: measure(uploadContainer), |
| | frame: measure(frame), |
| | img: measure(img), |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_font_probe_snapshot(page) -> dict[str, str | None]: |
| | return page.evaluate( |
| | """() => { |
| | const heading = document.querySelector('#header_title h2'); |
| | const field = document.querySelector('#font_probe textarea, #font_probe input'); |
| | const prose = document.querySelector('#body_probe p'); |
| | const readSize = (node) => (node ? getComputedStyle(node).fontSize : null); |
| | return { |
| | header: readSize(heading), |
| | field: readSize(field), |
| | body: readSize(prose), |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
| | def _read_theme_snapshot(page) -> dict[str, str | bool | None]: |
| | return page.evaluate( |
| | """() => { |
| | const html = document.documentElement; |
| | const body = document.body; |
| | const overlay = document.getElementById('loading_overlay_group'); |
| | const readStore = (store, key) => { |
| | try { |
| | return store.getItem(key); |
| | } catch (error) { |
| | return null; |
| | } |
| | }; |
| | return { |
| | htmlHasDark: html ? html.classList.contains('dark') : null, |
| | bodyHasDark: body ? body.classList.contains('dark') : null, |
| | htmlTheme: html ? html.dataset.theme || null : null, |
| | bodyTheme: body ? body.dataset.theme || null : null, |
| | htmlInlineColorScheme: html ? html.style.colorScheme || null : null, |
| | bodyInlineColorScheme: body ? body.style.colorScheme || null : null, |
| | htmlColorScheme: html ? getComputedStyle(html).colorScheme : null, |
| | bodyColorScheme: body ? getComputedStyle(body).colorScheme : null, |
| | overlayBackground: overlay ? getComputedStyle(overlay).backgroundColor : null, |
| | storedTheme: readStore(window.localStorage, 'theme'), |
| | storedGradioTheme: readStore(window.localStorage, 'gradio-theme'), |
| | }; |
| | }""" |
| | ) |
| |
|
| |
|
@pytest.fixture
def font_size_probe_ui_url(monkeypatch):
    """Launch a tiny Gradio app styled with ``ui_layout.CSS`` and yield its URL.

    ``config`` is reloaded and its ``UI_GLOBAL_FONT_SIZE`` patched to
    ``"32px"`` before ``ui_layout`` is reloaded (presumably ``ui_layout``
    derives its CSS from that setting at import time; confirm against
    ``ui_layout``). The app holds a heading, a textbox and a markdown
    paragraph whose element ids match the font-probe selectors used by the
    tests.

    Yields:
        The root URL reported by ``demo.launch``.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    monkeypatch.setattr(config_module, "UI_GLOBAL_FONT_SIZE", "32px")
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    # Use the shared helper instead of repeating the bind-to-port-0 logic.
    port = _free_port()

    with gr.Blocks(title="Native font size probe test") as demo:
        gr.Markdown("## RoboMME Human Evaluation", elem_id="header_title")
        gr.Textbox(value="font probe", label="Probe", elem_id="font_probe")
        gr.Markdown("Probe body text", elem_id="body_probe")

    _app, root_url, _share_url = demo.launch(
        server_name="127.0.0.1",
        server_port=port,
        prevent_thread_lock=True,
        quiet=True,
        show_error=True,
        ssr_mode=False,
        css=ui_layout.CSS,
    )

    try:
        # Waiting inside the try block ensures the launched server is closed
        # even when it never becomes ready.
        _wait_http_ready(root_url)
        yield root_url
    finally:
        demo.close()
| |
|
| |
|
@pytest.fixture
def phase_machine_ui_url():
    """Serve a miniature phase-machine UI and yield ``(root_url, state)``.

    The Blocks app has a login button, a demo-video phase, an execution-video
    phase, an action phase and a control panel, wired with stub callbacks
    that only toggle visibility/interactivity and set ``phase_state``. The
    app is mounted on a FastAPI instance and served by uvicorn in a daemon
    thread.

    Yields:
        A tuple of the root URL and the mutable ``state`` dict, whose
        ``precheck_calls`` and ``play_clicks`` counters are incremented by
        the callbacks.

    NOTE(review): the nesting of the layout containers below was
    reconstructed from a copy of this file that had lost its indentation;
    confirm it against the intended layout.
    """
    state = {"precheck_calls": 0, "play_clicks": 0}
    demo_video_url = "https://interactive-examples.mdn.mozilla.net/media/cc0-videos/flower.mp4"
    execution_video_path = gr.get_video("world.mp4")
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    with gr.Blocks(title="Native phase machine test") as demo:
        gr.HTML(f"<style>{ui_layout.CSS}</style>")
        phase_state = gr.State("init")
        post_execute_exec_state = gr.State(True)

        with gr.Column(visible=True, elem_id="login_group") as login_group:
            login_btn = gr.Button("Login", elem_id="login_btn")

        with gr.Column(visible=False, elem_id="main_interface") as main_interface:
            with gr.Column(visible=False, elem_id="video_phase_group") as video_phase_group:
                video_display = gr.Video(value=None, elem_id="demo_video", autoplay=False)
                watch_demo_video_btn = gr.Button(
                    "Watch Video Input🎬",
                    elem_id="watch_demo_video_btn",
                    interactive=False,
                    visible=False,
                )

            with gr.Column(visible=False, elem_id="execution_video_group") as execution_video_group:
                execute_video_display = gr.Video(value=None, elem_id="execute_video", autoplay=True)

            with gr.Column(visible=False, elem_id="action_phase_group") as action_phase_group:
                img_display = gr.Image(value=np.zeros((24, 24, 3), dtype=np.uint8), elem_id="live_obs")

            with gr.Column(visible=False, elem_id="control_panel_group") as control_panel_group:
                options_radio = gr.Radio(choices=[("pick", 0)], value=0, elem_id="action_radio")
                coords_box = gr.Textbox(value="please click the point selection image", elem_id="coords_box")
                with gr.Column(visible=False, elem_id="action_buttons_row") as action_buttons_row:
                    exec_btn = gr.Button("EXECUTE", elem_id="exec_btn")
                    reference_action_btn = gr.Button(
                        "Ground Truth Action",
                        elem_id="reference_action_btn",
                        interactive=False,
                    )
                    next_task_btn = gr.Button("Next Task", elem_id="next_task_btn")
                    task_hint_display = gr.Textbox(value="hint", interactive=True, elem_id="task_hint_display")

        # NOTE(review): placed at Blocks level; the original nesting of these
        # two components could not be recovered - confirm.
        log_output = gr.Markdown("", elem_id="log_output")
        simulate_stop_btn = gr.Button("Simulate Stop", elem_id="simulate_stop_btn")

        demo.load(
            fn=None,
            js=ui_layout.DEMO_VIDEO_PLAY_BINDING_JS,
            queue=False,
        )

        def login_fn():
            # One update per entry of the login_btn.click outputs list below.
            return (
                gr.update(visible=False),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(value=demo_video_url, visible=True),
                gr.update(visible=True, interactive=True),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(interactive=False),
                gr.update(value="please click the point selection image"),
                gr.update(visible=False),
                "demo_video",
            )

        def on_play_demo_fn():
            state["play_clicks"] += 1
            return gr.update(visible=True, interactive=False)

        def on_simulate_stop_fn():
            return "stopped"

        def on_video_end_fn():
            # One update per entry of video_end_outputs below.
            return (
                gr.update(visible=False),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(interactive=True),
                gr.update(visible=False, interactive=False),
                "action_point",
            )

        def on_execute_video_end_fn(exec_enabled):
            # One update per entry of execute_video_end_outputs below.
            return (
                gr.update(visible=False),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(interactive=True),
                gr.update(interactive=bool(exec_enabled)),
                gr.update(interactive=True),
                gr.update(interactive=False),
                gr.update(interactive=True),
                gr.update(interactive=True),
                "action_point",
            )

        def precheck_fn(_option_idx, _coords):
            # Only the very first call fails.
            state["precheck_calls"] += 1
            if state["precheck_calls"] == 1:
                raise gr.Error("please click the point selection image before execute!")

        def to_execute_fn():
            return (
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(interactive=False),
            )

        def execute_fn():
            # Short delay before the execution results are returned.
            time.sleep(0.8)
            return (
                "executed",
                gr.update(visible=False),
                gr.update(interactive=False),
                gr.update(interactive=False),
                gr.update(value=execution_video_path, visible=True, playback_position=0),
                gr.update(visible=True),
                gr.update(interactive=False),
                "No need for coordinates",
                gr.update(interactive=False),
                gr.update(interactive=False),
                True,
                "execution_video",
            )

        login_btn.click(
            fn=login_fn,
            outputs=[
                login_group,
                main_interface,
                video_phase_group,
                video_display,
                watch_demo_video_btn,
                action_phase_group,
                control_panel_group,
                action_buttons_row,
                reference_action_btn,
                coords_box,
                execution_video_group,
                phase_state,
            ],
            queue=False,
        )

        watch_demo_video_btn.click(
            fn=on_play_demo_fn,
            outputs=[watch_demo_video_btn],
            queue=False,
        )

        # The "end" and "stop" events of each video share one handler and one
        # outputs list, so the list is written once and copied per binding.
        video_end_outputs = [
            video_phase_group,
            action_phase_group,
            control_panel_group,
            action_buttons_row,
            reference_action_btn,
            watch_demo_video_btn,
            phase_state,
        ]
        video_display.end(
            fn=on_video_end_fn,
            outputs=list(video_end_outputs),
            queue=False,
        )
        video_display.stop(
            fn=on_video_end_fn,
            outputs=list(video_end_outputs),
            queue=False,
        )

        execute_video_end_outputs = [
            execution_video_group,
            action_phase_group,
            control_panel_group,
            options_radio,
            exec_btn,
            next_task_btn,
            img_display,
            reference_action_btn,
            task_hint_display,
            phase_state,
        ]
        execute_video_display.end(
            fn=on_execute_video_end_fn,
            inputs=[post_execute_exec_state],
            outputs=list(execute_video_end_outputs),
            queue=False,
        )
        execute_video_display.stop(
            fn=on_execute_video_end_fn,
            inputs=[post_execute_exec_state],
            outputs=list(execute_video_end_outputs),
            queue=False,
        )

        simulate_stop_btn.click(
            fn=on_simulate_stop_fn,
            outputs=[log_output],
            js="""() => {
                const show = (id, visible) => {
                    const el = document.getElementById(id);
                    if (!el) return;
                    el.style.display = visible ? '' : 'none';
                };
                show('video_phase_group', false);
                show('demo_video', false);
                show('execution_video_group', false);
                show('execute_video', false);
                show('action_phase_group', true);
                show('live_obs', true);
                show('control_panel_group', true);
                show('action_radio', true);
                show('action_buttons_row', true);
                show('watch_demo_video_btn', false);
                const refBtn =
                    document.querySelector('#reference_action_btn button') ||
                    document.querySelector('button#reference_action_btn');
                if (refBtn) {
                    refBtn.disabled = false;
                }
                return [];
            }""",
            queue=False,
        )

        exec_btn.click(
            fn=precheck_fn,
            inputs=[options_radio, coords_box],
            outputs=[],
            queue=False,
        ).then(
            fn=to_execute_fn,
            outputs=[
                options_radio,
                exec_btn,
                next_task_btn,
                img_display,
                reference_action_btn,
                task_hint_display,
            ],
            queue=False,
        ).then(
            fn=execute_fn,
            outputs=[
                log_output,
                action_phase_group,
                exec_btn,
                next_task_btn,
                execute_video_display,
                execution_video_group,
                options_radio,
                coords_box,
                reference_action_btn,
                task_hint_display,
                post_execute_exec_state,
                phase_state,
            ],
            queue=False,
        )

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="native-phase-machine-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # Waiting inside the try block ensures the server thread is asked to
        # exit and the demo is closed even when startup never completes.
        _wait_http_ready(root_url)
        yield root_url, state
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
def test_global_font_size_applies_except_header_title(font_size_probe_ui_url):
    """The heading renders at 26px while the field and body text render at 32px.

    Opens the probe app in headless Chromium, waits for the three probe
    nodes and for their computed font sizes to reach the expected values,
    then asserts on the snapshot read back from the page.
    """
    probe_selectors = (
        "#header_title h2",
        "#font_probe textarea, #font_probe input",
        "#body_probe p",
    )
    fonts_settled_js = """() => {
        const heading = document.querySelector('#header_title h2');
        const field = document.querySelector('#font_probe textarea, #font_probe input');
        const prose = document.querySelector('#body_probe p');
        if (!heading || !field || !prose) return false;
        return (
            getComputedStyle(heading).fontSize === '26px' &&
            getComputedStyle(field).fontSize === '32px' &&
            getComputedStyle(prose).fontSize === '32px'
        );
    }"""

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(font_size_probe_ui_url, wait_until="domcontentloaded")

        for selector in probe_selectors:
            page.wait_for_selector(selector, timeout=10000)
        page.wait_for_function(fonts_settled_js, timeout=10000)

        sizes = _read_font_probe_snapshot(page)
        assert sizes["header"] == "26px"
        assert sizes["field"] == "32px"
        assert sizes["body"] == "32px"
        assert sizes["header"] != sizes["field"]

        browser.close()
| |
|
| |
|
def test_create_ui_blocks_stays_light_under_dark_system_preference(monkeypatch):
    """The real UI stays in light mode even when the browser asks for dark.

    ``ui_layout.init_app`` is replaced with a stub returning fixed values.
    The app built by ``create_ui_blocks`` is launched with the module's
    theme, CSS and head snippet. A Chromium context with
    ``color_scheme="dark"`` and ``localStorage`` theme keys pre-set to
    ``dark`` then loads the page. The test waits for
    ``window.__robommeForceLightTheme`` to exist and the light-theme markers
    to be applied, asserts on the theme snapshot (including the stored theme
    keys being ``light``), reloads, and asserts again.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(_request):
        # NOTE(review): arity and order presumably mirror the outputs the
        # real init_app feeds in ui_layout - confirm there.
        return (
            "uid-1",
            gr.update(visible=True),
            fake_obs_img,
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PickXtimes (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(visible=False),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)

    demo = ui_layout.create_ui_blocks()
    port = _free_port()
    _app, root_url, _share_url = demo.launch(
        server_name="127.0.0.1",
        server_port=port,
        prevent_thread_lock=True,
        quiet=True,
        show_error=True,
        ssr_mode=False,
        theme=ui_layout.APP_THEME,
        css=ui_layout.CSS,
        head=ui_layout.THEME_LOCK_HEAD,
    )

    try:
        # Waiting inside the try block ensures the launched server is closed
        # even when it never becomes ready.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                viewport={"width": 1280, "height": 900},
                color_scheme="dark",
            )
            # Runs before any page script, so the app starts with dark
            # theme preferences already stored.
            context.add_init_script(
                """
                window.localStorage.setItem('theme', 'dark');
                window.localStorage.setItem('gradio-theme', 'dark');
                """
            )
            page = context.new_page()
            page.goto(root_url, wait_until="domcontentloaded")

            page.wait_for_function(
                """() => {
                    const html = document.documentElement;
                    const body = document.body;
                    if (!html || !body) return false;
                    return (
                        typeof window.__robommeForceLightTheme === 'function' &&
                        !html.classList.contains('dark') &&
                        !body.classList.contains('dark') &&
                        html.dataset.theme === 'light' &&
                        body.dataset.theme === 'light' &&
                        html.style.colorScheme === 'light' &&
                        body.style.colorScheme === 'light'
                    );
                }""",
                timeout=15000,
            )

            snapshot = _read_theme_snapshot(page)
            assert snapshot["htmlHasDark"] is False
            assert snapshot["bodyHasDark"] is False
            assert snapshot["htmlTheme"] == "light"
            assert snapshot["bodyTheme"] == "light"
            assert snapshot["htmlInlineColorScheme"] == "light"
            assert snapshot["bodyInlineColorScheme"] == "light"
            assert snapshot["storedTheme"] == "light"
            assert snapshot["storedGradioTheme"] == "light"

            page.reload(wait_until="domcontentloaded")
            page.wait_for_function(
                """() => {
                    const html = document.documentElement;
                    const body = document.body;
                    return (
                        !!html &&
                        !!body &&
                        typeof window.__robommeForceLightTheme === 'function' &&
                        !html.classList.contains('dark') &&
                        !body.classList.contains('dark') &&
                        html.dataset.theme === 'light' &&
                        body.dataset.theme === 'light'
                    );
                }""",
                timeout=15000,
            )

            reloaded_snapshot = _read_theme_snapshot(page)
            assert reloaded_snapshot["htmlHasDark"] is False
            assert reloaded_snapshot["bodyHasDark"] is False
            assert reloaded_snapshot["htmlInlineColorScheme"] == "light"
            assert reloaded_snapshot["bodyInlineColorScheme"] == "light"
            assert reloaded_snapshot["storedTheme"] == "light"
            assert reloaded_snapshot["storedGradioTheme"] == "light"

            context.close()
            browser.close()
    finally:
        demo.close()
| |
|
| |
|
| | def test_phase_machine_runtime_flow_and_execute_precheck(phase_machine_ui_url): |
| | root_url, state = phase_machine_ui_url |
| |
|
| | with sync_playwright() as p: |
| | browser = p.chromium.launch(headless=True) |
| | page = browser.new_page(viewport={"width": 1280, "height": 900}) |
| | page.goto(root_url, wait_until="domcontentloaded") |
| |
|
| | page.wait_for_timeout(2500) |
| | page.wait_for_selector("#login_btn", timeout=20000) |
| | page.click("#login_btn") |
| |
|
| | page.wait_for_function( |
| | """() => { |
| | const el = document.getElementById('demo_video'); |
| | return !!el && getComputedStyle(el).display !== 'none'; |
| | }""" |
| | ) |
| |
|
| | phase_after_login = page.evaluate( |
| | """() => { |
| | const visible = (id) => { |
| | const el = document.getElementById(id); |
| | if (!el) return false; |
| | const st = getComputedStyle(el); |
| | return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0; |
| | }; |
| | return { |
| | video: visible('demo_video'), |
| | watchButton: visible('watch_demo_video_btn'), |
| | action: visible('live_obs'), |
| | control: visible('action_radio'), |
| | }; |
| | }""" |
| | ) |
| | assert phase_after_login == { |
| | "video": True, |
| | "watchButton": True, |
| | "action": False, |
| | "control": False, |
| | } |
| |
|
| | page.wait_for_selector("#demo_video video", timeout=5000) |
| | page.wait_for_function( |
| | """() => { |
| | const videoEl = document.querySelector('#demo_video video'); |
| | const button = |
| | document.querySelector('#watch_demo_video_btn button') || |
| | document.querySelector('button#watch_demo_video_btn'); |
| | return !!videoEl && !!videoEl.currentSrc && !!button && button.disabled === false && videoEl.paused === true; |
| | }""", |
| | timeout=10000, |
| | ) |
| | controls_after_login = _read_demo_video_controls(page) |
| | assert controls_after_login["videoVisible"] is True |
| | assert controls_after_login["buttonVisible"] is True |
| | assert controls_after_login["buttonDisabled"] is False |
| | assert controls_after_login["autoplay"] is False |
| | assert controls_after_login["paused"] is True |
| |
|
| | _click_demo_video_button(page) |
| | page.wait_for_function( |
| | """() => { |
| | const videoEl = document.querySelector('#demo_video video'); |
| | const button = |
| | document.querySelector('#watch_demo_video_btn button') || |
| | document.querySelector('button#watch_demo_video_btn'); |
| | if (!videoEl || !button) return false; |
| | return button.disabled === true && (videoEl.paused === false || videoEl.currentTime > 0); |
| | }""", |
| | timeout=10000, |
| | ) |
| | controls_after_click = _read_demo_video_controls(page) |
| | assert controls_after_click["buttonDisabled"] is True |
| | assert controls_after_click["paused"] is False |
| |
|
| | did_dispatch_end = _dispatch_video_event(page, "ended") |
| | assert did_dispatch_end |
| |
|
| | page.wait_for_function( |
| | """() => { |
| | const action = document.getElementById('live_obs'); |
| | const control = document.getElementById('action_radio'); |
| | const watchButton = document.getElementById('watch_demo_video_btn'); |
| | if (!action || !control || !watchButton) return false; |
| | return ( |
| | getComputedStyle(action).display !== 'none' && |
| | getComputedStyle(control).display !== 'none' && |
| | getComputedStyle(watchButton).display === 'none' |
| | ); |
| | }""" |
| | ) |
| |
|
| | did_click_exec = page.evaluate( |
| | """() => { |
| | const btn = document.getElementById('exec_btn'); |
| | if (!btn) return false; |
| | btn.click(); |
| | return true; |
| | }""" |
| | ) |
| | assert did_click_exec |
| | page.wait_for_timeout(300) |
| |
|
| | phase_after_failed_precheck = page.evaluate( |
| | """() => { |
| | const visible = (id) => { |
| | const el = document.getElementById(id); |
| | if (!el) return false; |
| | return getComputedStyle(el).display !== 'none'; |
| | }; |
| | return { |
| | action: visible('live_obs'), |
| | }; |
| | }""" |
| | ) |
| | assert phase_after_failed_precheck == {"action": True} |
| |
|
| | did_click_exec = page.evaluate( |
| | """() => { |
| | const btn = document.getElementById('exec_btn'); |
| | if (!btn) return false; |
| | btn.click(); |
| | return true; |
| | }""" |
| | ) |
| | assert did_click_exec |
| |
|
| | page.wait_for_function( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const execBtn = resolveButton('exec_btn'); |
| | const nextBtn = resolveButton('next_task_btn'); |
| | return !!execBtn && !!nextBtn && execBtn.disabled === true && nextBtn.disabled === true; |
| | }""" |
| | ) |
| |
|
| | interactive_snapshot = page.evaluate( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const execBtn = resolveButton('exec_btn'); |
| | const nextBtn = resolveButton('next_task_btn'); |
| | return { |
| | execDisabled: execBtn ? execBtn.disabled : null, |
| | nextDisabled: nextBtn ? nextBtn.disabled : null, |
| | }; |
| | }""" |
| | ) |
| | assert interactive_snapshot["execDisabled"] is True |
| | assert interactive_snapshot["nextDisabled"] is True |
| |
|
| | page.wait_for_function( |
| | """() => { |
| | const videoEl = document.querySelector('#execute_video video'); |
| | return !!videoEl && videoEl.autoplay === true && (videoEl.paused === false || videoEl.currentTime > 0); |
| | }""", |
| | timeout=6000, |
| | ) |
| | execute_video_controls = _read_demo_video_controls(page, elem_id="execute_video", button_elem_id=None) |
| | assert execute_video_controls["autoplay"] is True |
| | assert execute_video_controls["paused"] is False |
| | execute_phase_snapshot = _read_phase_visibility(page) |
| | assert execute_phase_snapshot["actionPhase"] is False |
| | assert execute_phase_snapshot["controlPhase"] is True |
| | panel_snapshot = page.evaluate( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const radio = document.querySelector('#action_radio input[type="radio"]'); |
| | const refBtn = resolveButton('reference_action_btn'); |
| | const hint = document.querySelector('#task_hint_display textarea, #task_hint_display input'); |
| | return { |
| | radioDisabled: radio ? radio.disabled : null, |
| | refDisabled: refBtn ? refBtn.disabled : null, |
| | hintDisabled: hint ? hint.disabled : null, |
| | }; |
| | }""" |
| | ) |
| | assert panel_snapshot["radioDisabled"] is True |
| | assert panel_snapshot["refDisabled"] is True |
| | assert panel_snapshot["hintDisabled"] is True |
| |
|
| | did_dispatch_end = _dispatch_video_event(page, "ended", elem_id="execute_video") |
| | assert did_dispatch_end |
| |
|
| | page.wait_for_function( |
| | """() => { |
| | const execBtn = document.querySelector('button#exec_btn') || document.querySelector('#exec_btn button'); |
| | const action = document.getElementById('live_obs'); |
| | if (!execBtn || !action) return false; |
| | return execBtn.disabled === false && getComputedStyle(action).display !== 'none'; |
| | }""", |
| | timeout=6000, |
| | ) |
| |
|
| | final_interactive_snapshot = page.evaluate( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const execBtn = resolveButton('exec_btn'); |
| | const nextBtn = resolveButton('next_task_btn'); |
| | return { |
| | execDisabled: execBtn ? execBtn.disabled : null, |
| | nextDisabled: nextBtn ? nextBtn.disabled : null, |
| | }; |
| | }""" |
| | ) |
| | assert final_interactive_snapshot["execDisabled"] is False |
| | assert final_interactive_snapshot["nextDisabled"] is False |
| |
|
| | browser.close() |
| |
|
| | assert state["precheck_calls"] >= 2 |
| | assert state["play_clicks"] == 1 |
| |
|
| |
|
def test_reference_action_button_is_green_only_when_interactive(phase_machine_ui_url):
    """Verify the reference-action button is painted green only once it is enabled.

    Flow: open the page, click ``#login_btn`` and, if the reference-action
    button is already in the DOM, check that it is disabled and not green.
    Then click the demo-video button, wait for the watch button to become
    disabled, call ``_dispatch_video_event(page, "ended")`` and assert that the
    reference-action button ends up enabled with a green background and
    border and white text.
    """
    root_url, _state = phase_machine_ui_url

    reference_btn_id = "reference_action_btn"
    interactive_green = "rgb(31, 139, 76)"

    # Browser-side predicate: the "watch demo video" button exists and is disabled.
    watch_button_is_disabled = """() => {
        const button =
            document.querySelector('#watch_demo_video_btn button') ||
            document.querySelector('button#watch_demo_video_btn');
        return !!button && button.disabled === true;
    }"""

    # Browser-side predicate: the reference-action button exists and is enabled.
    reference_button_is_enabled = """() => {
        const button = document.querySelector('#reference_action_btn button') || document.querySelector('button#reference_action_btn');
        return !!button && button.disabled === false;
    }"""

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(root_url, wait_until="domcontentloaded")

        # Fixed pause before the login button is looked up.
        page.wait_for_timeout(2500)
        page.wait_for_selector("#login_btn", timeout=20000)
        page.click("#login_btn")

        # The button may not be rendered yet; only check it when it is present.
        before_video = _resolve_button_snapshot(page, reference_btn_id)
        if before_video["found"]:
            assert before_video["disabled"] is True
            assert before_video["backgroundColor"] != interactive_green

        page.wait_for_selector("#demo_video video", timeout=5000)
        _click_demo_video_button(page)
        page.wait_for_function(watch_button_is_disabled, timeout=5000)
        ended_dispatched = _dispatch_video_event(page, "ended")
        assert ended_dispatched

        page.wait_for_function(reference_button_is_enabled, timeout=6000)

        after_video = _resolve_button_snapshot(page, reference_btn_id)
        assert after_video["found"] is True
        assert after_video["disabled"] is False
        assert after_video["backgroundColor"] == interactive_green
        assert after_video["borderColor"] == interactive_green
        assert after_video["color"] == "rgb(255, 255, 255)"

        browser.close()
| |
|
| |
|
@pytest.mark.xfail(
    reason="Gradio 6.9.0 output video stop path is not reliably triggerable in headless Chromium; transition contract is covered by unit tests.",
    strict=False,
)
def test_demo_video_stop_event_transitions_and_hides_button(phase_machine_ui_url):
    """Check the UI phase change after the simulated stop button is clicked.

    After login the demo-video button is clicked and the watch button must
    become disabled. Clicking ``#simulate_stop_btn`` must then hide
    ``#watch_demo_video_btn`` and ``#demo_video`` while ``#live_obs`` and
    ``#action_radio`` are visible. The fixture state must record exactly one
    play click. Marked ``xfail`` (non-strict) for the reason given in the
    decorator.
    """
    root_url, state = phase_machine_ui_url

    stop_button_selector = "#simulate_stop_btn button, button#simulate_stop_btn"

    # Browser-side predicate: the "watch demo video" button exists and is disabled.
    watch_button_is_disabled = """() => {
        const button =
            document.querySelector('#watch_demo_video_btn button') ||
            document.querySelector('button#watch_demo_video_btn');
        return !!button && button.disabled === true;
    }"""

    # Browser-side predicate: video widgets are hidden, action widgets are shown.
    action_phase_is_showing = """() => {
        const visible = (id) => {
            const el = document.getElementById(id);
            if (!el) return false;
            const st = getComputedStyle(el);
            return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
        };
        return (
            !visible('watch_demo_video_btn') &&
            !visible('demo_video') &&
            visible('live_obs') &&
            visible('action_radio')
        );
    }"""

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": 1280, "height": 900})
        page.goto(root_url, wait_until="domcontentloaded")

        # Fixed pause before the login button is looked up.
        page.wait_for_timeout(2500)
        page.wait_for_selector("#login_btn", timeout=20000)
        page.click("#login_btn")
        page.wait_for_selector("#demo_video video", timeout=5000)

        _click_demo_video_button(page)
        page.wait_for_function(watch_button_is_disabled, timeout=5000)

        stop_button = page.locator(stop_button_selector).first
        stop_button.click()

        page.wait_for_function(action_phase_is_showing, timeout=5000)

        browser.close()

    assert state["play_clicks"] == 1
| |
|
| |
|
def test_unified_loading_overlay_init_flow(monkeypatch):
    """Check the loading copy shown while ``init_app`` runs on first page load.

    ``ui_layout.init_app`` is replaced by a slow fake. While it runs, the
    native ``.progress-text`` element must be visible and show exactly the
    canonical copy, with no markdown-based copy, no legacy overlay group, and
    none of the older wording. Once loading finishes, the progress text must
    disappear, ``#main_interface_root`` must be visible, and the header task
    must read ``PickXtimes``.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    monkeypatch.setattr(config_module, "UI_GLOBAL_FONT_SIZE", "32px")
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    canonical_copy = "The episode is loading..."
    legacy_copy = "Loading environment, please wait..."
    superseded_copy = "Logging in and setting up environment... Please wait."
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)
    calls = {"init": 0}

    def fake_init_app(_request=None):
        """Stand-in for ``ui_layout.init_app`` that counts calls and returns fixed outputs."""
        calls["init"] += 1
        # Presumably long enough for the browser to observe the progress text
        # before the load completes.
        time.sleep(0.8)
        return (
            "uid-init",
            gr.update(visible=True),
            gr.update(value=fake_obs_img, interactive=False),
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PickXtimes (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    _app, root_url, _share_url = demo.launch(
        server_name=host,
        server_port=port,
        prevent_thread_lock=True,
        quiet=True,
        show_error=True,
        ssr_mode=False,
        theme=ui_layout.APP_THEME,
        css=ui_layout.CSS,
        head=ui_layout.THEME_LOCK_HEAD,
    )

    try:
        # The readiness wait is inside the try so that a server that never
        # becomes ready is still closed by the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")

            # Wait until the progress text is rendered, visible, and carries the copy.
            page.wait_for_function(
                """() => {
                    const node = document.querySelector('.progress-text');
                    if (!node) return false;
                    const style = getComputedStyle(node);
                    const rect = node.getBoundingClientRect();
                    return (
                        style.display !== 'none' &&
                        style.visibility !== 'hidden' &&
                        Number.parseFloat(style.opacity || '1') > 0 &&
                        rect.width > 0 &&
                        rect.height > 0 &&
                        (node.textContent || '').includes('The episode is loading...')
                    );
                }""",
                timeout=5000,
            )
            progress_snapshot = _read_progress_text_snapshot(page)
            markdown_snapshot = _read_progress_markdown_snapshot(page)

            assert progress_snapshot["present"] is True
            assert progress_snapshot["visible"] is True
            assert progress_snapshot["text"] == canonical_copy
            assert progress_snapshot["x"] is not None and progress_snapshot["x"] < 500
            assert progress_snapshot["y"] is not None and progress_snapshot["y"] > 300
            assert markdown_snapshot["text"] is None
            assert markdown_snapshot["pendingVisible"] is False
            assert markdown_snapshot["markdownVisible"] is False
            assert page.locator("#robomme_episode_loading_copy").count() == 0
            assert superseded_copy not in str(progress_snapshot["text"] or "")
            assert legacy_copy not in page.content()
            assert page.locator("#loading_overlay_group").count() == 0

            # The progress text must go away once the fake init has returned.
            page.wait_for_function(
                """() => !document.querySelector('.progress-text')""",
                timeout=15000,
            )
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_task');
                    if (!root) return false;
                    const input = root.querySelector('input');
                    if (input && typeof input.value === 'string' && input.value.trim() === 'PickXtimes') {
                        return true;
                    }
                    const selected = root.querySelector('.single-select');
                    return !!selected && (selected.textContent || '').trim() === 'PickXtimes';
                }""",
                timeout=15000,
            )
            assert _read_header_task_value(page) == "PickXtimes"

            browser.close()
    finally:
        demo.close()

    assert calls["init"] >= 1
| |
|
| |
|
def test_episode_loading_copy_after_change_episode(monkeypatch):
    """Check the loading copy shown while the next episode is being loaded.

    ``ui_layout.init_app`` and ``ui_layout.load_next_task_wrapper`` are
    replaced by fakes; the latter is slow. After the first load completes,
    clicking ``#next_task_btn`` must show the native ``.progress-text`` with
    exactly ``The episode is loading...`` and no markdown-based copy. The test
    then polls the log output until it reads ``ready-2`` and checks that each
    fake was called exactly once.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)
    calls = {"init": 0, "next": 0}

    def _load_result(uid: str, episode_idx: int, log_text: str):
        """Build the output tuple shared by the two fake loaders."""
        return (
            uid,
            gr.update(visible=True),
            gr.update(value=fake_obs_img.copy(), interactive=False),
            log_text,
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            f"PickXtimes (Episode {episode_idx})",
            f"Completed: {episode_idx - 1}",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    def fake_init_app(_request=None):
        """Stand-in for ``ui_layout.init_app``: counts the call, returns episode 1."""
        calls["init"] += 1
        return _load_result("uid-next-episode", 1, "ready-1")

    def fake_load_next_task_wrapper(uid):
        """Stand-in for ``ui_layout.load_next_task_wrapper``: slow load of episode 2."""
        calls["next"] += 1
        # Presumably long enough for the browser to observe the progress text
        # before the load completes.
        time.sleep(0.8)
        return _load_result(uid, 2, "ready-2")

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(ui_layout, "load_next_task_wrapper", fake_load_next_task_wrapper)

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="episode-loading-copy-change-episode-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait is inside the try so that a server that never
        # becomes ready is still shut down by the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_task');
                    if (!root) return false;
                    const input = root.querySelector('input');
                    if (input && typeof input.value === 'string' && input.value.trim() === 'PickXtimes') {
                        return true;
                    }
                    const selected = root.querySelector('.single-select');
                    return !!selected && (selected.textContent || '').trim() === 'PickXtimes';
                }""",
                timeout=5000,
            )
            # Before the click, the markdown inside the progress host must be empty.
            page.wait_for_function(
                """() => {
                    const host = document.getElementById('native_progress_host');
                    const markdown = host?.querySelector('[data-testid="markdown"]');
                    const prose = markdown ? markdown.querySelector('.prose, .md') || markdown : null;
                    const text = prose ? ((prose.innerText || prose.textContent || '').trim()) : '';
                    return text === '';
                }""",
                timeout=5000,
            )

            page.locator("#next_task_btn button, button#next_task_btn").first.click()

            page.wait_for_function(
                """() => {
                    const node = document.querySelector('.progress-text');
                    if (!node) return false;
                    const style = getComputedStyle(node);
                    const rect = node.getBoundingClientRect();
                    return (
                        style.display !== 'none' &&
                        style.visibility !== 'hidden' &&
                        Number.parseFloat(style.opacity || '1') > 0 &&
                        rect.width > 0 &&
                        rect.height > 0 &&
                        (node.textContent || '').trim() === 'The episode is loading...'
                    );
                }""",
                timeout=5000,
            )

            progress_snapshot = _read_progress_text_snapshot(page)
            markdown_snapshot = _read_progress_markdown_snapshot(page)
            assert progress_snapshot["present"] is True
            assert progress_snapshot["visible"] is True
            assert progress_snapshot["text"] == "The episode is loading..."
            assert markdown_snapshot["text"] is None
            assert markdown_snapshot["pendingVisible"] is False
            assert markdown_snapshot["markdownVisible"] is False
            assert page.locator("#robomme_episode_loading_copy").count() == 0

            # Poll the log output until the second load has been applied. The
            # deadline uses a monotonic clock so wall-clock adjustments cannot
            # shorten or extend it.
            deadline = time.monotonic() + 15.0
            while time.monotonic() < deadline:
                if _read_log_output_value(page) == "ready-2":
                    break
                # Pause through Playwright instead of time.sleep so the sync
                # API keeps processing browser events while waiting.
                page.wait_for_timeout(100)
            else:
                raise AssertionError("next episode load did not complete")
            assert page.locator("#robomme_episode_loading_copy").count() == 0

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()

    assert calls == {"init": 1, "next": 1}
| |
|
| |
|
def test_no_video_task_hides_manual_demo_button(monkeypatch):
    """Check that a task loaded without a demo video skips the video phase.

    ``ui_layout.init_app`` is replaced by a fake whose outputs carry no video
    value. After the page loads, ``#video_phase_group``, ``#demo_video`` and
    ``#watch_demo_video_btn`` must not be visible, while
    ``#action_phase_group`` and ``#control_panel_group`` must be.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(_request=None):
        """Stand-in for ``ui_layout.init_app`` returning fixed outputs."""
        return (
            "uid-no-video",
            gr.update(visible=True),
            gr.update(value=fake_obs_img.copy(), interactive=False),
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PickXtimes (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="native-no-video-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait is inside the try so that a server that never
        # becomes ready is still shut down by the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)

            # Wait for the layout to settle into the action phase.
            page.wait_for_function(
                """() => {
                    const visible = (id) => {
                        const el = document.getElementById(id);
                        if (!el) return false;
                        const st = getComputedStyle(el);
                        return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
                    };
                    return (
                        !visible('video_phase_group') &&
                        !visible('demo_video') &&
                        !visible('watch_demo_video_btn') &&
                        visible('action_phase_group') &&
                        visible('control_panel_group')
                    );
                }""",
                timeout=5000,
            )

            phase_snapshot = _read_phase_visibility(page)
            controls_snapshot = _read_demo_video_controls(page)
            assert phase_snapshot["videoPhase"] is False
            assert phase_snapshot["video"] is False
            assert phase_snapshot["watchButton"] is False
            assert phase_snapshot["actionPhase"] is True
            assert phase_snapshot["controlPhase"] is True
            assert controls_snapshot["buttonVisible"] is False

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
def test_point_wait_state_pulses_live_obs_and_updates_system_log(monkeypatch):
    """Check the point-selection wait state of ``#live_obs`` end to end.

    Checking the first action radio must add the point-wait class to
    ``#live_obs``, show the ``#media_card::after`` ring animation, and put the
    point prompts into the coords box and the log, all without moving,
    resizing or transforming the image and its frame. Clicking the image must
    then remove the wait class, stop the animation, and record the clicked
    coordinates in the coords box and the log.
    """
    config_module = importlib.reload(importlib.import_module("config"))
    callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    # 48 px wide, 24 px high, uniformly coloured observation image.
    fake_obs = np.zeros((24, 48, 3), dtype=np.uint8)
    fake_obs[:, :] = [15, 20, 25]
    fake_obs_img = Image.fromarray(fake_obs)

    class FakeSession:
        """Minimal session stand-in returned by the patched ``get_session``."""

        # NOTE(review): option "a" carries a non-empty ``available`` list, which
        # presumably makes the callback ask for a point; confirm against
        # gradio_callbacks.
        raw_solve_options = [
            {"label": "a", "available": [object()]},
            {"label": "b", "available": False},
        ]

        def get_pil_image(self, use_segmented=False):
            """Return a copy of the fake observation; ``use_segmented`` is ignored."""
            _ = use_segmented
            return fake_obs_img.copy()

    def fake_init_app(_request=None):
        """Stand-in for ``ui_layout.init_app`` returning fixed outputs."""
        return (
            "uid-point-wait",
            gr.update(visible=True),
            gr.update(
                value=fake_obs_img.copy(),
                interactive=False,
                elem_classes=config_module.get_live_obs_elem_classes(),
            ),
            config_module.UI_TEXT["log"]["action_selection_prompt"],
            gr.update(choices=[("pick", 0), ("skip", 1)], value=None),
            "goal",
            gr.update(
                value=config_module.UI_TEXT["coords"]["not_needed"],
                visible=True,
                interactive=False,
            ),
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PointEnv (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(callbacks, "get_session", lambda uid: FakeSession())

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="point-wait-state-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # The readiness wait is inside the try so that a server that never
        # becomes ready is still shut down by the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.add_style_tag(content=ui_layout.CSS)
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_selector("#live_obs img", timeout=15000)

            # Baseline: no wait class, no ring animation, no transforms.
            initial_classes = _read_elem_classes(page, "live_obs")
            assert initial_classes is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS not in initial_classes
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["action_selection_prompt"]
            initial_card_wait = _read_media_card_wait_snapshot(page)
            initial_transforms = _read_live_obs_transform_snapshot(page)
            initial_img_box = page.locator("#live_obs img").bounding_box()
            initial_frame_box = page.locator("#live_obs .image-frame").bounding_box()
            assert initial_card_wait["opacity"] == 0
            assert initial_card_wait["animationName"] == "none"
            assert initial_transforms["imgTransform"] == "none"
            assert initial_transforms["frameTransform"] == "none"
            assert initial_img_box is not None
            assert initial_frame_box is not None

            page.locator("#action_radio input[type='radio']").first.check(force=True)

            # Wait for the wait state: class, ring animation, and both prompts.
            page.wait_for_function(
                """(state) => {
                    const liveObs = document.getElementById('live_obs');
                    const coordsRoot = document.getElementById('coords_box');
                    const coordsField = coordsRoot?.querySelector('textarea, input');
                    const logRoot = document.getElementById('log_output');
                    const logField = logRoot?.querySelector('textarea, input');
                    const mediaCard = document.getElementById('media_card');
                    const mediaAfter = mediaCard ? getComputedStyle(mediaCard, '::after') : null;
                    const coordsValue = coordsField ? coordsField.value.trim() : '';
                    const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
                    return (
                        !!liveObs &&
                        liveObs.classList.contains(state.waitClass) &&
                        !!mediaAfter &&
                        Number.parseFloat(mediaAfter.opacity || '0') > 0.5 &&
                        mediaAfter.animationName === state.cardAnimation &&
                        coordsValue === state.coordsPrompt &&
                        logValue === state.waitLog
                    );
                }""",
                arg={
                    "cardAnimation": "media-card-point-ring",
                    "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS,
                    "coordsPrompt": config_module.UI_TEXT["coords"]["select_point"],
                    "waitLog": config_module.UI_TEXT["log"]["point_selection_prompt"],
                },
                timeout=5000,
            )

            wait_classes = _read_elem_classes(page, "live_obs")
            assert wait_classes is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS in wait_classes
            assert _read_coords_box_value(page) == config_module.UI_TEXT["coords"]["select_point"]
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["point_selection_prompt"]
            wait_card = _read_media_card_wait_snapshot(page)
            wait_transforms = _read_live_obs_transform_snapshot(page)
            wait_img_box = page.locator("#live_obs img").bounding_box()
            wait_frame_box = page.locator("#live_obs .image-frame").bounding_box()
            assert wait_card["opacity"] is not None and wait_card["opacity"] > 0.5
            assert wait_card["animationName"] == "media-card-point-ring"
            assert wait_card["borderColor"] != "rgba(225, 29, 72, 0)"
            assert wait_transforms["imgTransform"] == "none"
            assert wait_transforms["frameTransform"] == "none"
            assert wait_img_box is not None
            assert wait_frame_box is not None
            # The image and its frame must stay where they were (within 1 px).
            assert wait_img_box["x"] == pytest.approx(initial_img_box["x"], abs=1.0)
            assert wait_img_box["y"] == pytest.approx(initial_img_box["y"], abs=1.0)
            assert wait_img_box["width"] == pytest.approx(initial_img_box["width"], abs=1.0)
            assert wait_img_box["height"] == pytest.approx(initial_img_box["height"], abs=1.0)
            assert wait_frame_box["x"] == pytest.approx(initial_frame_box["x"], abs=1.0)
            assert wait_frame_box["y"] == pytest.approx(initial_frame_box["y"], abs=1.0)
            assert wait_frame_box["width"] == pytest.approx(initial_frame_box["width"], abs=1.0)
            assert wait_frame_box["height"] == pytest.approx(initial_frame_box["height"], abs=1.0)

            # Click the centre of image pixel (24, 8) of the 48x24 observation.
            box = page.locator("#live_obs img").bounding_box()
            assert box is not None
            target_x = box["x"] + ((24.5) / 48.0) * box["width"]
            target_y = box["y"] + ((8.5) / 24.0) * box["height"]
            page.mouse.click(target_x, target_y)

            # Wait for the wait class to clear and the coords/log to be filled.
            page.wait_for_function(
                """(state) => {
                    const liveObs = document.getElementById('live_obs');
                    const coordsRoot = document.getElementById('coords_box');
                    const coordsField = coordsRoot?.querySelector('textarea, input');
                    const logRoot = document.getElementById('log_output');
                    const logField = logRoot?.querySelector('textarea, input');
                    const coordsValue = coordsField ? coordsField.value.trim() : '';
                    const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim();
                    return (
                        !!liveObs &&
                        !liveObs.classList.contains(state.waitClass) &&
                        /^\\d+\\s*,\\s*\\d+$/.test(coordsValue) &&
                        logValue === `Select: ${state.label} | point <${coordsValue}>`
                    );
                }""",
                arg={
                    "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS,
                    "label": "A",
                },
                timeout=5000,
            )

            coords_value = _read_coords_box_value(page)
            assert coords_value is not None
            coord_x, coord_y = [int(part.strip()) for part in coords_value.split(",", 1)]
            # Allow one pixel of rounding slack around the targeted pixel.
            assert abs(coord_x - 24) <= 1
            assert abs(coord_y - 8) <= 1
            final_classes = _read_elem_classes(page, "live_obs")
            assert final_classes is not None
            assert config_module.LIVE_OBS_POINT_WAIT_CLASS not in final_classes
            assert config_module.LIVE_OBS_BASE_CLASS in final_classes
            assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["point_selected_message"].format(
                label="A",
                x=coord_x,
                y=coord_y,
            )
            final_card_wait = _read_media_card_wait_snapshot(page)
            final_transforms = _read_live_obs_transform_snapshot(page)
            assert final_card_wait["opacity"] == 0
            assert final_card_wait["animationName"] == "none"
            assert final_transforms["imgTransform"] == "none"
            assert final_transforms["frameTransform"] == "none"

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
| | def test_reference_action_single_click_applies_coords_without_wait_state(monkeypatch): |
| | config_module = importlib.reload(importlib.import_module("config")) |
| | callbacks = importlib.reload(importlib.import_module("gradio_callbacks")) |
| | ui_layout = importlib.reload(importlib.import_module("ui_layout")) |
| |
|
| | fake_obs = np.zeros((24, 48, 3), dtype=np.uint8) |
| | fake_obs[:, :] = [15, 20, 25] |
| | fake_obs_img = Image.fromarray(fake_obs) |
| |
|
| | class FakeSession: |
| | env_id = "BinFill" |
| | raw_solve_options = [ |
| | {"label": "a", "action": "pick the left cube", "available": [object()]}, |
| | {"label": "b", "action": "pick the right cube", "available": [object()]}, |
| | ] |
| | available_options = [ |
| | ("a. pick the left cube", 0), |
| | ("b. pick the right cube", 1), |
| | ] |
| |
|
| | def get_pil_image(self, use_segmented=False): |
| | _ = use_segmented |
| | return fake_obs_img.copy() |
| |
|
| | def get_reference_action(self): |
| | return { |
| | "ok": True, |
| | "option_idx": 0, |
| | "option_label": "a", |
| | "option_action": "pick the left cube", |
| | "need_coords": True, |
| | "coords_xy": [5, 6], |
| | "message": "ok", |
| | } |
| |
|
| | def fake_init_app(_request=None): |
| | return ( |
| | "uid-reference-action", |
| | gr.update(visible=True), |
| | gr.update( |
| | value=fake_obs_img.copy(), |
| | interactive=False, |
| | elem_classes=config_module.get_live_obs_elem_classes(), |
| | ), |
| | config_module.UI_TEXT["log"]["action_selection_prompt"], |
| | gr.update( |
| | choices=[ |
| | ("a. pick the left cube", 0), |
| | ("b. pick the right cube", 1), |
| | ], |
| | value=None, |
| | ), |
| | "goal", |
| | gr.update( |
| | value=config_module.UI_TEXT["coords"]["not_needed"], |
| | visible=True, |
| | interactive=False, |
| | ), |
| | gr.update(value=None, visible=False), |
| | gr.update(visible=False, interactive=False), |
| | "BinFill (Episode 1)", |
| | "Completed: 0", |
| | gr.update(interactive=True), |
| | gr.update(interactive=True), |
| | gr.update(interactive=True), |
| | gr.update(visible=False), |
| | gr.update(visible=True), |
| | gr.update(visible=True), |
| | gr.update(value="hint"), |
| | gr.update(interactive=True), |
| | ) |
| |
|
| | monkeypatch.setattr(ui_layout, "init_app", fake_init_app) |
| | monkeypatch.setattr(callbacks, "get_session", lambda uid: FakeSession()) |
| |
|
| | demo = ui_layout.create_ui_blocks() |
| |
|
| | port = _free_port() |
| | host = "127.0.0.1" |
| | root_url = f"http://{host}:{port}/" |
| |
|
| | app = FastAPI(title="reference-action-single-click-test") |
| | app = gr.mount_gradio_app(app, demo, path="/") |
| |
|
| | config = uvicorn.Config(app, host=host, port=port, log_level="error") |
| | server = uvicorn.Server(config) |
| | thread = threading.Thread(target=server.run, daemon=True) |
| | thread.start() |
| | _wait_http_ready(root_url) |
| |
|
| | try: |
| | with sync_playwright() as p: |
| | browser = p.chromium.launch(headless=True) |
| | page = browser.new_page(viewport={"width": 1280, "height": 900}) |
| | page.goto(root_url, wait_until="domcontentloaded") |
| | page.wait_for_selector("#main_interface_root", state="visible", timeout=15000) |
| | page.wait_for_selector("#live_obs img", timeout=15000) |
| | page.wait_for_selector("#reference_action_btn button, button#reference_action_btn", timeout=15000) |
| |
|
| | expected_reference_log = config_module.UI_TEXT["log"]["reference_action_message_with_coords"].format( |
| | option_label="A", |
| | coords_text="5, 6", |
| | ) |
| | page.locator("#reference_action_btn button, button#reference_action_btn").first.click() |
| |
|
| | page.wait_for_function( |
| | """(state) => { |
| | const coordsRoot = document.getElementById('coords_box'); |
| | const coordsField = coordsRoot?.querySelector('textarea, input'); |
| | const logRoot = document.getElementById('log_output'); |
| | const logField = logRoot?.querySelector('textarea, input'); |
| | const liveObs = document.getElementById('live_obs'); |
| | const checked = document.querySelector('#action_radio input[type="radio"]:checked'); |
| | const coordsValue = coordsField ? coordsField.value.trim() : ''; |
| | const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim(); |
| | return ( |
| | !!checked && |
| | checked.value === state.checkedValue && |
| | coordsValue === state.coordsValue && |
| | logValue === state.logValue && |
| | !!liveObs && |
| | !liveObs.classList.contains(state.waitClass) |
| | ); |
| | }""", |
| | arg={ |
| | "checkedValue": "0", |
| | "coordsValue": "5, 6", |
| | "logValue": expected_reference_log, |
| | "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS, |
| | }, |
| | timeout=5000, |
| | ) |
| |
|
| | classes_after_reference = _read_elem_classes(page, "live_obs") |
| | assert classes_after_reference is not None |
| | assert config_module.LIVE_OBS_POINT_WAIT_CLASS not in classes_after_reference |
| | assert _read_coords_box_value(page) == "5, 6" |
| | assert _read_log_output_value(page) == expected_reference_log |
| |
|
| | page.locator("#action_radio input[type='radio']").nth(1).check(force=True) |
| | page.wait_for_function( |
| | """(state) => { |
| | const coordsRoot = document.getElementById('coords_box'); |
| | const coordsField = coordsRoot?.querySelector('textarea, input'); |
| | const logRoot = document.getElementById('log_output'); |
| | const logField = logRoot?.querySelector('textarea, input'); |
| | const liveObs = document.getElementById('live_obs'); |
| | const checked = document.querySelector('#action_radio input[type="radio"]:checked'); |
| | const coordsValue = coordsField ? coordsField.value.trim() : ''; |
| | const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim(); |
| | return ( |
| | !!checked && |
| | checked.value === state.checkedValue && |
| | coordsValue === state.coordsValue && |
| | logValue === state.logValue && |
| | !!liveObs && |
| | liveObs.classList.contains(state.waitClass) |
| | ); |
| | }""", |
| | arg={ |
| | "checkedValue": "1", |
| | "coordsValue": config_module.UI_TEXT["coords"]["select_point"], |
| | "logValue": config_module.UI_TEXT["log"]["point_selection_prompt"], |
| | "waitClass": config_module.LIVE_OBS_POINT_WAIT_CLASS, |
| | }, |
| | timeout=5000, |
| | ) |
| |
|
| | classes_after_manual_change = _read_elem_classes(page, "live_obs") |
| | assert classes_after_manual_change is not None |
| | assert config_module.LIVE_OBS_POINT_WAIT_CLASS in classes_after_manual_change |
| | assert _read_coords_box_value(page) == config_module.UI_TEXT["coords"]["select_point"] |
| | assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["point_selection_prompt"] |
| |
|
| | browser.close() |
| | finally: |
| | server.should_exit = True |
| | thread.join(timeout=10) |
| | demo.close() |
| |
|
| |
|
def test_live_obs_client_resize_fills_width_and_keeps_click_mapping(monkeypatch):
    """Verify the live observation image fills its container and maps clicks after a resize.

    Serves the ``ui_layout`` Blocks app with ``init_app`` and ``get_session``
    replaced by fakes, then drives it with headless Chromium:

    1. The ``#live_obs`` image must be wider than 200px, match the width of its
       container, upload container and frame within 2px, and keep the 2:1
       aspect ratio of the 48x24 fake frame.
    2. After the viewport shrinks from 1280px to 1024px wide, the image must be
       more than 20px narrower and still match its container.
    3. A click at source pixel (36.5, 12.5) must write coordinates within one
       pixel of (36, 12) into ``#coords_box``.
    """
    callbacks = importlib.reload(importlib.import_module("gradio_callbacks"))
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    # 24 rows x 48 columns: the rendered image is expected to stay 2:1.
    fake_obs = np.zeros((24, 48, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    class FakeSession:
        raw_solve_options = [{"available": True}]

        def get_pil_image(self, use_segmented=False):
            _ = use_segmented
            return fake_obs_img.copy()

    def fake_init_app(_request=None):
        # Positional outputs consumed by the UI's init wiring; order kept verbatim.
        return (
            "uid-live-obs-resize",
            gr.update(visible=True),
            gr.update(value=fake_obs_img.copy(), interactive=False),
            "ready",
            gr.update(choices=[("pick", 0)], value=0),
            "goal",
            gr.update(
                value="please click the point selection image",
                visible=True,
                interactive=False,
            ),
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "ResizeEnv (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(callbacks, "get_session", lambda uid: FakeSession())

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="live-obs-client-resize-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # Wait inside the try block so a readiness timeout still stops the
        # server thread and closes the Blocks app in the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_selector("#live_obs img", timeout=15000)
            page.wait_for_selector("#coords_box textarea, #coords_box input", timeout=15000)
            # Block until the image has been stretched to the container width.
            page.wait_for_function(
                """() => {
                    const container = document.querySelector('#live_obs .image-container');
                    const img = document.querySelector('#live_obs img');
                    if (!container || !img) return false;
                    const containerRect = container.getBoundingClientRect();
                    const imgRect = img.getBoundingClientRect();
                    return imgRect.width > 200 && Math.abs(containerRect.width - imgRect.width) <= 2;
                }""",
                timeout=10000,
            )

            initial_geometry = _read_live_obs_geometry(page)
            assert initial_geometry["container"] is not None
            assert initial_geometry["img"] is not None
            assert initial_geometry["uploadContainer"] is not None
            assert initial_geometry["frame"] is not None
            assert initial_geometry["img"]["width"] > 200
            assert abs(initial_geometry["container"]["width"] - initial_geometry["img"]["width"]) <= 2
            assert abs(initial_geometry["uploadContainer"]["width"] - initial_geometry["img"]["width"]) <= 2
            assert abs(initial_geometry["frame"]["width"] - initial_geometry["img"]["width"]) <= 2
            assert initial_geometry["img"]["width"] / initial_geometry["img"]["height"] == pytest.approx(2.0, rel=0.02)

            # Shrink the viewport and wait for the image to follow its container.
            page.set_viewport_size({"width": 1024, "height": 900})
            page.wait_for_function(
                """(prevWidth) => {
                    const container = document.querySelector('#live_obs .image-container');
                    const img = document.querySelector('#live_obs img');
                    if (!container || !img) return false;
                    const containerRect = container.getBoundingClientRect();
                    const imgRect = img.getBoundingClientRect();
                    return imgRect.width < prevWidth - 20 && Math.abs(containerRect.width - imgRect.width) <= 2;
                }""",
                arg=initial_geometry["img"]["width"],
                timeout=10000,
            )

            resized_geometry = _read_live_obs_geometry(page)
            assert resized_geometry["img"] is not None
            assert resized_geometry["container"] is not None
            assert resized_geometry["img"]["width"] < initial_geometry["img"]["width"] - 20
            assert abs(resized_geometry["container"]["width"] - resized_geometry["img"]["width"]) <= 2
            assert resized_geometry["img"]["width"] / resized_geometry["img"]["height"] == pytest.approx(2.0, rel=0.02)

            # Click the centre of source pixel (36, 12), scaled to the rendered box.
            box = page.locator("#live_obs img").bounding_box()
            assert box is not None
            target_x = box["x"] + ((36.5) / 48.0) * box["width"]
            target_y = box["y"] + ((12.5) / 24.0) * box["height"]
            page.mouse.click(target_x, target_y)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('coords_box');
                    const field = root?.querySelector('textarea, input');
                    return !!field && /^\\d+\\s*,\\s*\\d+$/.test(field.value.trim());
                }""",
                timeout=5000,
            )
            coords_value = _read_coords_box_value(page)
            assert coords_value is not None
            coord_x, coord_y = [int(part.strip()) for part in coords_value.split(",", 1)]
            assert abs(coord_x - 36) <= 1
            assert abs(coord_y - 12) <= 1

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
def test_live_obs_client_resize_after_hidden_phase_becomes_visible(tmp_path):
    """Verify the live observation image is sized correctly when its column starts hidden.

    Builds a minimal Blocks app that reuses ``ui_layout.CSS`` and
    ``ui_layout.LIVE_OBS_CLIENT_RESIZE_JS``. The ``#live_obs`` image sits in a
    column that is invisible until ``#show_btn`` is clicked. Once it is shown,
    the image must be wider than 300px, match its container width within 2px,
    and report ``object-fit: contain``. A screenshot of the image element is
    then sampled on its middle row (centre, ~10% and ~90% of the width) and
    every sample must be red, the colour of the solid source image.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    # Solid red frame, so each sampled screenshot pixel has a known colour.
    full_red = np.zeros((256, 256, 3), dtype=np.uint8)
    full_red[:, :] = [255, 0, 0]

    with gr.Blocks() as demo:
        demo.css = ui_layout.CSS

        show_btn = gr.Button("Show", elem_id="show_btn")

        with gr.Column(visible=False, elem_id="action_phase_group") as action_phase_group:
            gr.Image(
                value=full_red,
                elem_id="live_obs",
                elem_classes=["live-obs-resizable"],
                buttons=[],
                sources=[],
            )

        demo.load(
            fn=None,
            js=ui_layout.LIVE_OBS_CLIENT_RESIZE_JS,
            queue=False,
        )

        show_btn.click(
            fn=lambda: gr.update(visible=True),
            outputs=[action_phase_group],
            queue=False,
        )

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="live-obs-hidden-phase-resize-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    screenshot_path = tmp_path / "live_obs_hidden_phase.png"

    try:
        # Wait inside the try block so a readiness timeout still stops the
        # server thread and closes the Blocks app in the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1440, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            # The resize script sets this flag on window once it is installed.
            page.wait_for_function(
                "() => !!window.__robommeLiveObsResizerInstalled",
                timeout=5000,
            )

            page.click("#show_btn")
            page.wait_for_selector("#live_obs img", timeout=10000)
            page.wait_for_function(
                """() => {
                    const container = document.querySelector('#live_obs .image-container');
                    const img = document.querySelector('#live_obs img');
                    if (!container || !img) return false;
                    const containerRect = container.getBoundingClientRect();
                    const imgRect = img.getBoundingClientRect();
                    return imgRect.width > 300 && Math.abs(containerRect.width - imgRect.width) <= 2;
                }""",
                timeout=10000,
            )

            geometry = _read_live_obs_geometry(page)
            assert geometry["container"] is not None
            assert geometry["img"] is not None
            assert geometry["img"]["width"] > 300
            assert abs(geometry["container"]["width"] - geometry["img"]["width"]) <= 2
            object_fit = page.evaluate(
                """() => getComputedStyle(document.querySelector('#live_obs img')).objectFit"""
            )
            assert object_fit == "contain"

            page.locator("#live_obs img").screenshot(path=str(screenshot_path))

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()

    # Image.open keeps the file handle open until the image is closed;
    # convert() returns an independent copy, so close the source right away.
    with Image.open(screenshot_path) as raw_screenshot:
        screenshot = raw_screenshot.convert("RGB")
    width, height = screenshot.size
    # Sample the middle row at the centre and near both horizontal edges.
    samples = [
        screenshot.getpixel((width // 2, height // 2)),
        screenshot.getpixel((max(1, width // 10), height // 2)),
        screenshot.getpixel((min(width - 2, (width * 9) // 10), height // 2)),
    ]
    for pixel in samples:
        assert pixel[0] > 200
        assert pixel[1] < 30
        assert pixel[2] < 30
| |
|
| |
|
def test_header_task_shows_env_after_init(monkeypatch):
    """Verify the header shows the environment name and goal after init.

    ``init_app`` is replaced by a fake that reports the task info
    ``"PickXtimes (Episode 1)"`` and the goal ``"goal"``. After the page is
    loaded with ``?user=user1``, the ``#header_task`` input must read
    ``"PickXtimes"`` and the header goal helper must return ``"Goal"``.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(request=None):
        _ = request
        # Positional outputs consumed by the UI's init wiring; order kept verbatim.
        return (
            "uid-auto",
            gr.update(visible=True),
            gr.update(value=fake_obs_img, interactive=False),
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PickXtimes (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="header-task-url-auto-login-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # Wait inside the try block so a readiness timeout still stops the
        # server thread and closes the Blocks app in the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(f"{root_url}?user=user1", wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_task');
                    const input = root ? root.querySelector('input') : null;
                    return !!input && input.value.trim() === 'PickXtimes';
                }""",
                timeout=5000,
            )
            assert _read_header_task_value(page) == "PickXtimes"
            assert _read_header_goal_value(page) == "Goal"
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
def test_header_goal_capitalizes_displayed_value_after_init(monkeypatch):
    """Verify the header goal is displayed with its first letter capitalised.

    ``init_app`` is replaced by a fake that reports the goal
    ``"place cube on target"``. After the page loads, the ``#header_goal``
    field must read ``"Place cube on target"``.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(request=None):
        _ = request
        # Positional outputs consumed by the UI's init wiring; order kept verbatim.
        return (
            "uid-auto",
            gr.update(visible=True),
            gr.update(value=fake_obs_img, interactive=False),
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "place cube on target",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            "PickXtimes (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="header-goal-capitalization-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # Wait inside the try block so a readiness timeout still stops the
        # server thread and closes the Blocks app in the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_goal');
                    const input = root ? root.querySelector('textarea, input') : null;
                    return !!input && input.value.trim() === 'Place cube on target';
                }""",
                timeout=5000,
            )
            assert _read_header_goal_value(page) == "Place cube on target"
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
@pytest.mark.parametrize(
    "task_info_text,expected_header_value",
    [
        ("pickxtimes (Episode 1)", "PickXtimes"),
        ("EnvFromSessionOnly (Episode 1)", "EnvFromSessionOnly"),
    ],
)
def test_header_task_env_normalization_and_fallback(monkeypatch, task_info_text, expected_header_value):
    """Verify how the header task value is derived from the task info text.

    Two cases are exercised through a fake ``init_app``:

    * ``"pickxtimes (Episode 1)"`` must be shown as ``"PickXtimes"``.
    * ``"EnvFromSessionOnly (Episode 1)"`` must be shown unchanged as
      ``"EnvFromSessionOnly"``.

    Args:
        monkeypatch: pytest fixture used to replace ``ui_layout.init_app``.
        task_info_text: Task info string returned by the fake ``init_app``.
        expected_header_value: Text expected in the ``#header_task`` input.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)

    def fake_init_app(_request=None):
        # Positional outputs consumed by the UI's init wiring; order kept verbatim.
        return (
            "uid-auto",
            gr.update(visible=True),
            gr.update(value=fake_obs_img, interactive=False),
            "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "goal",
            "No need for coordinates",
            gr.update(value=None, visible=False),
            gr.update(visible=False, interactive=False),
            task_info_text,
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="hint"),
            gr.update(interactive=True),
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="header-task-normalization-fallback-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # Wait inside the try block so a readiness timeout still stops the
        # server thread and closes the Blocks app in the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """(expectedValue) => {
                    const root = document.getElementById('header_task');
                    const input = root ? root.querySelector('input') : null;
                    return !!input && input.value.trim() === expectedValue;
                }""",
                arg=expected_header_value,
                timeout=5000,
            )
            assert _read_header_task_value(page) == expected_header_value
            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
def test_header_task_switch_to_video_task_shows_demo_phase(monkeypatch):
    """Verify that switching the header task to a video task shows the demo-video phase.

    ``init_app`` and ``switch_env_wrapper`` are replaced by fakes, and the
    environment choices are limited to ``PickXtimes`` and ``VideoPlaceButton``.
    The browser flow checks that:

    1. The page starts on ``PickXtimes`` without any switch call recorded.
    2. Choosing ``VideoPlaceButton`` in ``#header_task`` shows the video phase
       (video loaded, paused, not autoplaying, watch button enabled) and hides
       the action and control phases.
    3. ``switch_env_wrapper`` is called exactly once, and still only once
       after a 1.5 s pause.
    4. Clicking the watch button disables it, and dispatching ``ended`` on the
       video returns the UI to the action and control phases.
    """
    ui_layout = importlib.reload(importlib.import_module("ui_layout"))

    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    fake_obs_img = Image.fromarray(fake_obs)
    demo_video_path = gr.get_video("world.mp4")
    # Records every (uid, selected_env) pair passed to the fake switch wrapper.
    switch_calls = []

    def _pick_task_response(uid, task_name, show_video):
        # Shared positional response for init and switch; show_video toggles
        # between the video-phase and action-phase variants of each value.
        return (
            uid,
            gr.update(visible=True),
            gr.update(value=fake_obs_img, interactive=False),
            "demo prompt" if show_video else "ready",
            gr.update(choices=[("pick", 0)], value=None),
            "video goal" if show_video else "goal",
            "No need for coordinates",
            gr.update(value=demo_video_path if show_video else None, visible=show_video),
            gr.update(visible=show_video, interactive=show_video),
            f"{task_name} (Episode 1)",
            "Completed: 0",
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(interactive=True),
            gr.update(visible=show_video),
            gr.update(visible=not show_video),
            gr.update(visible=not show_video),
            gr.update(value="video hint" if show_video else "hint"),
            gr.update(interactive=True),
        )

    def fake_init_app(request=None):
        _ = request
        return _pick_task_response("uid-header-video", "PickXtimes", show_video=False)

    def fake_switch_env_wrapper(uid, selected_env):
        switch_calls.append((uid, selected_env))
        return _pick_task_response(
            uid,
            selected_env,
            show_video=selected_env == "VideoPlaceButton",
        )

    monkeypatch.setattr(ui_layout, "init_app", fake_init_app)
    monkeypatch.setattr(ui_layout, "switch_env_wrapper", fake_switch_env_wrapper)
    monkeypatch.setattr(ui_layout.user_manager, "env_choices", ["PickXtimes", "VideoPlaceButton"])

    demo = ui_layout.create_ui_blocks()

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="header-task-switch-video-phase-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    try:
        # Wait inside the try block so a readiness timeout still stops the
        # server thread and closes the Blocks app in the finally clause.
        _wait_http_ready(root_url)

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(viewport={"width": 1280, "height": 900})
            page.goto(root_url, wait_until="domcontentloaded")
            page.wait_for_selector("#main_interface_root", state="visible", timeout=15000)
            page.wait_for_function(
                """() => {
                    const root = document.getElementById('header_task');
                    const input = root ? root.querySelector('input') : null;
                    return !!input && input.value.trim() === 'PickXtimes';
                }""",
                timeout=5000,
            )
            # Initial load alone must not trigger an environment switch.
            assert switch_calls == []

            page.click("#header_task input")
            page.get_by_role("option", name="VideoPlaceButton").click()

            page.wait_for_function(
                """() => {
                    const visible = (id) => {
                        const el = document.getElementById(id);
                        if (!el) return false;
                        const st = getComputedStyle(el);
                        return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
                    };
                    const videoEl = document.querySelector('#demo_video video');
                    const button =
                        document.querySelector('#watch_demo_video_btn button') ||
                        document.querySelector('button#watch_demo_video_btn');
                    return (
                        visible('video_phase_group') &&
                        visible('demo_video') &&
                        visible('watch_demo_video_btn') &&
                        !visible('action_phase_group') &&
                        !visible('control_panel_group') &&
                        !!(videoEl && videoEl.currentSrc) &&
                        !!button &&
                        button.disabled === false &&
                        videoEl.paused === true &&
                        videoEl.autoplay === false
                    );
                }""",
                timeout=10000,
            )

            phase_after_switch = _read_phase_visibility(page)
            assert phase_after_switch["videoPhase"] is True
            assert phase_after_switch["video"] is True
            assert phase_after_switch["watchButton"] is True
            assert phase_after_switch["actionPhase"] is False
            assert phase_after_switch["controlPhase"] is False
            assert phase_after_switch["currentSrc"]
            assert switch_calls == [("uid-header-video", "VideoPlaceButton")]

            # Pause, then confirm no further switch call was recorded.
            page.wait_for_timeout(1500)
            assert switch_calls == [("uid-header-video", "VideoPlaceButton")]
            assert _read_header_task_value(page) == "VideoPlaceButton"

            _click_demo_video_button(page)
            page.wait_for_function(
                """() => {
                    const button =
                        document.querySelector('#watch_demo_video_btn button') ||
                        document.querySelector('button#watch_demo_video_btn');
                    return !!button && button.disabled === true;
                }""",
                timeout=5000,
            )

            # Dispatch "ended" on the video instead of waiting for real playback.
            did_dispatch_end = _dispatch_video_event(page, "ended")
            assert did_dispatch_end

            page.wait_for_function(
                """() => {
                    const visible = (id) => {
                        const el = document.getElementById(id);
                        if (!el) return false;
                        const st = getComputedStyle(el);
                        return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
                    };
                    return (
                        !visible('video_phase_group') &&
                        !visible('demo_video') &&
                        !visible('watch_demo_video_btn') &&
                        visible('action_phase_group') &&
                        visible('control_panel_group') &&
                        visible('live_obs') &&
                        visible('action_radio')
                    );
                }""",
                timeout=5000,
            )

            phase_after_end = _read_phase_visibility(page)
            assert phase_after_end["videoPhase"] is False
            assert phase_after_end["video"] is False
            assert phase_after_end["watchButton"] is False
            assert phase_after_end["actionPhase"] is True
            assert phase_after_end["action"] is True
            assert phase_after_end["controlPhase"] is True
            assert phase_after_end["control"] is True

            browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|
| |
|
| | def _run_local_execute_video_transition_test( |
| | *, |
| | status_text, |
| | done, |
| | expect_terminal_buttons_disabled, |
| | expected_terminal_log=None, |
| | ): |
| | import gradio_callbacks as cb |
| | import config as config_module |
| |
|
| | ui_layout = importlib.reload(importlib.import_module("ui_layout")) |
| | demo_video_path = gr.get_video("world.mp4") |
| | fake_obs = np.zeros((24, 24, 3), dtype=np.uint8) |
| |
|
| | class FakeSession: |
| | def __init__(self): |
| | self.env_id = "BinFill" |
| | self.episode_idx = 1 |
| | self.language_goal = "place cube on target" |
| | self.available_options = [("pick", 0), ("point", 1)] |
| | self.raw_solve_options = [ |
| | {"label": "a", "action": "pick", "available": False}, |
| | {"label": "b", "action": "point", "available": [object()]}, |
| | ] |
| | self.demonstration_frames = [] |
| | self.last_execution_frames = [] |
| | self.base_frames = [fake_obs.copy()] |
| | self.non_demonstration_task_length = None |
| | self.difficulty = "easy" |
| | self.seed = 123 |
| |
|
| | def get_pil_image(self, use_segmented=False): |
| | _ = use_segmented |
| | return fake_obs.copy() |
| |
|
| | def update_observation(self, use_segmentation=False): |
| | _ = use_segmentation |
| | return None |
| |
|
| | def execute_action(self, option_idx, click_coords): |
| | _ = option_idx, click_coords |
| | self.last_execution_frames = [fake_obs.copy() for _ in range(3)] |
| | self.base_frames.extend(self.last_execution_frames) |
| | return fake_obs.copy(), status_text, done |
| |
|
| | originals = { |
| | "get_session": cb.get_session, |
| | "increment_execute_count": cb.increment_execute_count, |
| | "save_video": cb.save_video, |
| | } |
| |
|
| | fake_session = FakeSession() |
| |
|
| | cb.get_session = lambda uid: fake_session |
| | cb.increment_execute_count = lambda uid, env_id, ep_num: 1 |
| | cb.save_video = lambda frames, suffix="": demo_video_path |
| |
|
| | try: |
| | with gr.Blocks(title="Native phase machine local video test") as demo: |
| | uid_state = gr.State(value="uid-local-video") |
| | phase_state = gr.State(value="action_point") |
| | post_execute_controls_state = gr.State( |
| | value={ |
| | "exec_btn_interactive": True, |
| | "reference_action_interactive": True, |
| | } |
| | ) |
| | post_execute_log_state = gr.State( |
| | value={ |
| | "preserve_terminal_log": False, |
| | "terminal_log_value": None, |
| | "preserve_execute_video_log": False, |
| | "execute_video_log_value": None, |
| | } |
| | ) |
| | suppress_state = gr.State(value=False) |
| | with gr.Column(visible=True, elem_id="main_interface") as main_interface: |
| | with gr.Column(visible=False, elem_id="video_phase_group") as video_phase_group: |
| | video_display = gr.Video(value=None, elem_id="demo_video", autoplay=False) |
| | watch_demo_video_btn = gr.Button( |
| | "Watch Video Input🎬", |
| | elem_id="watch_demo_video_btn", |
| | interactive=False, |
| | visible=False, |
| | ) |
| |
|
| | with gr.Column(visible=False, elem_id="execution_video_group") as execution_video_group: |
| | execute_video_display = gr.Video(value=None, elem_id="execute_video", autoplay=True) |
| |
|
| | with gr.Column(visible=True, elem_id="action_phase_group") as action_phase_group: |
| | img_display = gr.Image(value=fake_obs.copy(), elem_id="live_obs") |
| |
|
| | with gr.Column(visible=True, elem_id="control_panel_group") as control_panel_group: |
| | options_radio = gr.Radio(choices=[("pick", 0), ("point", 1)], value=None, elem_id="action_radio") |
| | coords_box = gr.Textbox(config_module.UI_TEXT["coords"]["not_needed"], elem_id="coords_box") |
| | exec_btn = gr.Button("execute", interactive=True, elem_id="exec_btn") |
| | reference_action_btn = gr.Button("reference", interactive=True, elem_id="reference_action_btn") |
| | restart_episode_btn = gr.Button("restart", interactive=True, elem_id="restart_episode_btn") |
| | next_task_btn = gr.Button("next", interactive=True, elem_id="next_task_btn") |
| | task_hint_display = gr.Textbox("hint", interactive=True, elem_id="task_hint_display") |
| |
|
| | log_output = gr.Markdown("", elem_id="log_output") |
| | task_info_box = gr.Textbox("") |
| | progress_info_box = gr.Textbox("") |
| |
|
| | exec_btn.click( |
| | fn=cb.precheck_execute_inputs, |
| | inputs=[uid_state, options_radio, coords_box], |
| | outputs=[], |
| | queue=False, |
| | ).then( |
| | fn=cb.switch_to_execute_phase, |
| | inputs=[uid_state], |
| | outputs=[ |
| | options_radio, |
| | exec_btn, |
| | restart_episode_btn, |
| | next_task_btn, |
| | img_display, |
| | reference_action_btn, |
| | task_hint_display, |
| | ], |
| | queue=False, |
| | ).then( |
| | fn=cb.execute_step, |
| | inputs=[uid_state, options_radio, coords_box], |
| | outputs=[ |
| | img_display, |
| | log_output, |
| | task_info_box, |
| | progress_info_box, |
| | restart_episode_btn, |
| | next_task_btn, |
| | exec_btn, |
| | execute_video_display, |
| | action_phase_group, |
| | control_panel_group, |
| | execution_video_group, |
| | options_radio, |
| | coords_box, |
| | reference_action_btn, |
| | task_hint_display, |
| | post_execute_controls_state, |
| | post_execute_log_state, |
| | phase_state, |
| | ], |
| | queue=False, |
| | ) |
| | options_radio.change( |
| | fn=cb.on_option_select, |
| | inputs=[uid_state, options_radio, coords_box, suppress_state, post_execute_log_state], |
| | outputs=[coords_box, img_display, log_output, suppress_state, post_execute_log_state], |
| | queue=False, |
| | ) |
| | img_display.select( |
| | fn=cb.on_map_click, |
| | inputs=[uid_state, options_radio], |
| | outputs=[img_display, coords_box, log_output], |
| | queue=False, |
| | ) |
| |
|
| | execute_video_display.end( |
| | fn=cb.on_execute_video_end_transition, |
| | inputs=[uid_state, post_execute_controls_state, post_execute_log_state], |
| | outputs=[ |
| | execution_video_group, |
| | action_phase_group, |
| | control_panel_group, |
| | options_radio, |
| | exec_btn, |
| | restart_episode_btn, |
| | next_task_btn, |
| | img_display, |
| | log_output, |
| | reference_action_btn, |
| | task_hint_display, |
| | post_execute_log_state, |
| | phase_state, |
| | ], |
| | queue=False, |
| | ) |
| | execute_video_display.stop( |
| | fn=cb.on_execute_video_end_transition, |
| | inputs=[uid_state, post_execute_controls_state, post_execute_log_state], |
| | outputs=[ |
| | execution_video_group, |
| | action_phase_group, |
| | control_panel_group, |
| | options_radio, |
| | exec_btn, |
| | restart_episode_btn, |
| | next_task_btn, |
| | img_display, |
| | log_output, |
| | reference_action_btn, |
| | task_hint_display, |
| | post_execute_log_state, |
| | phase_state, |
| | ], |
| | queue=False, |
| | ) |
| |
|
| | port = _free_port() |
| | host = "127.0.0.1" |
| | root_url = f"http://{host}:{port}/" |
| |
|
| | app = FastAPI(title="native-phase-machine-local-video-test") |
| | app = gr.mount_gradio_app(app, demo, path="/") |
| |
|
| | config = uvicorn.Config(app, host=host, port=port, log_level="error") |
| | server = uvicorn.Server(config) |
| | thread = threading.Thread(target=server.run, daemon=True) |
| | thread.start() |
| | _wait_http_ready(root_url) |
| |
|
| | try: |
| | with sync_playwright() as p: |
| | browser = p.chromium.launch(headless=True) |
| | page = browser.new_page(viewport={"width": 1280, "height": 900}) |
| | page.goto(root_url, wait_until="domcontentloaded") |
| | page.wait_for_selector("#main_interface", state="visible", timeout=20000) |
| | page.locator("#action_radio input[type='radio']").nth(1).check(force=True) |
| | page.wait_for_function( |
| | """(state) => { |
| | const coordsRoot = document.getElementById('coords_box'); |
| | const coordsField = coordsRoot?.querySelector('textarea, input'); |
| | const logRoot = document.getElementById('log_output'); |
| | const logField = logRoot?.querySelector('textarea, input'); |
| | const coordsValue = coordsField ? coordsField.value.trim() : ''; |
| | const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim(); |
| | return coordsValue === state.coordsPrompt && logValue === state.waitLog; |
| | }""", |
| | arg={ |
| | "coordsPrompt": config_module.UI_TEXT["coords"]["select_point"], |
| | "waitLog": config_module.UI_TEXT["log"]["point_selection_prompt"], |
| | }, |
| | timeout=5000, |
| | ) |
| | box = page.locator("#live_obs img").bounding_box() |
| | assert box is not None |
| | page.mouse.click(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2) |
| | page.wait_for_function( |
| | """(state) => { |
| | const coordsRoot = document.getElementById('coords_box'); |
| | const coordsField = coordsRoot?.querySelector('textarea, input'); |
| | const logRoot = document.getElementById('log_output'); |
| | const logField = logRoot?.querySelector('textarea, input'); |
| | const coordsValue = coordsField ? coordsField.value.trim() : ''; |
| | const logValue = logField ? logField.value.trim() : (logRoot?.textContent || '').trim(); |
| | return /^\\d+\\s*,\\s*\\d+$/.test(coordsValue) && logValue === `Select: ${state.label} | point <${coordsValue}>`; |
| | }""", |
| | arg={"label": "B"}, |
| | timeout=5000, |
| | ) |
| | selected_coords = _read_coords_box_value(page) |
| | assert selected_coords is not None |
| | page.locator("#exec_btn button, button#exec_btn").first.click() |
| | page.wait_for_selector("#execute_video video", timeout=5000) |
| | page.wait_for_function( |
| | """() => { |
| | const visible = (id) => { |
| | const el = document.getElementById(id); |
| | if (!el) return false; |
| | const st = getComputedStyle(el); |
| | return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0; |
| | }; |
| | const videoEl = document.querySelector('#execute_video video'); |
| | return ( |
| | visible('execution_video_group') && |
| | visible('execute_video') && |
| | !visible('action_phase_group') && |
| | visible('control_panel_group') && |
| | !!videoEl && |
| | videoEl.autoplay === true && |
| | (videoEl.paused === false || videoEl.currentTime > 0) |
| | ); |
| | }""", |
| | timeout=10000, |
| | ) |
| | if not done: |
| | execution_log = _read_log_output_value(page) |
| | assert execution_log == config_module.UI_TEXT["log"]["execute_action_prompt_with_coords"].format( |
| | label="B", |
| | coords_text=selected_coords, |
| | ) |
| | assert execution_log != config_module.UI_TEXT["log"]["point_selection_prompt"] |
| | controls_after_execute = _read_demo_video_controls(page, elem_id="execute_video", button_elem_id=None) |
| | assert controls_after_execute["autoplay"] is True |
| | assert controls_after_execute["paused"] is False |
| | panel_snapshot = page.evaluate( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const radio = document.querySelector('#action_radio input[type="radio"]'); |
| | const refBtn = resolveButton('reference_action_btn'); |
| | const restartBtn = resolveButton('restart_episode_btn'); |
| | const nextBtn = resolveButton('next_task_btn'); |
| | const hint = document.querySelector('#task_hint_display textarea, #task_hint_display input'); |
| | return { |
| | radioDisabled: radio ? radio.disabled : null, |
| | refDisabled: refBtn ? refBtn.disabled : null, |
| | restartDisabled: restartBtn ? restartBtn.disabled : null, |
| | nextDisabled: nextBtn ? nextBtn.disabled : null, |
| | hintDisabled: hint ? hint.disabled : null, |
| | }; |
| | }""" |
| | ) |
| | assert panel_snapshot == { |
| | "radioDisabled": True, |
| | "refDisabled": True, |
| | "restartDisabled": True, |
| | "nextDisabled": True, |
| | "hintDisabled": True, |
| | } |
| |
|
| | did_dispatch_end = _dispatch_video_event(page, "ended", elem_id="execute_video") |
| | assert did_dispatch_end |
| |
|
| | page.wait_for_function( |
| | """() => { |
| | const visible = (id) => { |
| | const el = document.getElementById(id); |
| | if (!el) return false; |
| | const st = getComputedStyle(el); |
| | return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0; |
| | }; |
| | return ( |
| | visible('live_obs') && |
| | visible('action_radio') && |
| | !visible('execute_video') && |
| | visible('control_panel_group') |
| | ); |
| | }""", |
| | timeout=2000, |
| | ) |
| | if expect_terminal_buttons_disabled: |
| | button_snapshot = page.evaluate( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const execBtn = resolveButton('exec_btn'); |
| | const refBtn = resolveButton('reference_action_btn'); |
| | return { |
| | execDisabled: execBtn ? execBtn.disabled : null, |
| | refDisabled: refBtn ? refBtn.disabled : null, |
| | }; |
| | }""" |
| | ) |
| | assert button_snapshot == { |
| | "execDisabled": True, |
| | "refDisabled": True, |
| | } |
| | terminal_log_before = _read_log_output_value(page) |
| | assert terminal_log_before is not None |
| | assert expected_terminal_log is not None |
| | assert expected_terminal_log in terminal_log_before |
| | page.locator("#action_radio input[type='radio']").nth(1).check(force=True) |
| | page.wait_for_timeout(300) |
| | assert _read_log_output_value(page) == terminal_log_before |
| | else: |
| | button_snapshot = page.evaluate( |
| | """() => { |
| | const resolveButton = (id) => { |
| | return document.querySelector(`#${id} button`) || document.querySelector(`button#${id}`); |
| | }; |
| | const execBtn = resolveButton('exec_btn'); |
| | const refBtn = resolveButton('reference_action_btn'); |
| | return { |
| | execDisabled: execBtn ? execBtn.disabled : null, |
| | refDisabled: refBtn ? refBtn.disabled : null, |
| | }; |
| | }""" |
| | ) |
| | assert button_snapshot == { |
| | "execDisabled": False, |
| | "refDisabled": False, |
| | } |
| | assert _read_log_output_value(page) == config_module.UI_TEXT["log"]["action_selection_prompt"] |
| |
|
| | browser.close() |
| | finally: |
| | server.should_exit = True |
| | thread.join(timeout=10) |
| | demo.close() |
| | finally: |
| | for name, value in originals.items(): |
| | setattr(cb, name, value) |
| |
|
| |
|
def test_phase_machine_runtime_local_video_path_end_transition():
    """Run the local execute-video transition scenario for a non-terminal step.

    With ``done=False`` and ``expect_terminal_buttons_disabled=False`` the
    shared helper checks the execution log while the video plays and, once the
    video has ended, expects the execute/reference buttons to be enabled again
    and the log to show the action-selection prompt.
    """
    scenario = {
        "status_text": "Executing: pick",
        "done": False,
        "expect_terminal_buttons_disabled": False,
    }
    _run_local_execute_video_transition_test(**scenario)
| |
|
| |
|
def test_phase_machine_runtime_local_video_path_end_transition_terminal_success():
    """Run the local execute-video transition scenario for a successful episode end.

    With ``expect_terminal_buttons_disabled=True`` the shared helper expects
    the execute/reference buttons to stay disabled after the video ends, the
    log to contain ``"episode success"``, and that log to be left unchanged
    when a radio option is selected afterwards.
    """
    scenario = {
        "status_text": "SUCCESS",
        "done": True,
        "expect_terminal_buttons_disabled": True,
        "expected_terminal_log": "episode success",
    }
    _run_local_execute_video_transition_test(**scenario)
| |
|
| |
|
def test_phase_machine_runtime_local_video_path_end_transition_terminal_failed():
    """Run the local execute-video transition scenario for a failed episode end.

    With ``expect_terminal_buttons_disabled=True`` the shared helper expects
    the execute/reference buttons to stay disabled after the video ends, the
    log to contain ``"episode failed"``, and that log to be left unchanged
    when a radio option is selected afterwards.
    """
    scenario = {
        "status_text": "Executing: pick | FAILED",
        "done": True,
        "expect_terminal_buttons_disabled": True,
        "expected_terminal_log": "episode failed",
    }
    _run_local_execute_video_transition_test(**scenario)
| |
|
| |
|
def test_phase_machine_runtime_stopcube_remain_static_merges_short_tail(monkeypatch):
    """Drive the StopCube execute flow twice in a browser and check the hold targets.

    A minimal Gradio app is wired to the real ``gradio_callbacks`` handlers, but
    with a fake session whose options come from
    ``vqa_options_override.get_vqa_options`` and with
    ``solve_hold_obj_absTimestep`` replaced by a spy.  The second radio option
    is selected and executed two times in a row; the spy must have been called
    with absolute timesteps ``100`` and then ``240``.

    NOTE(review): 240 equals ``steps_press - interval`` (270 - 30) of the fake
    env, so presumably the second hold is merged with the short remaining tail
    instead of stopping earlier -- confirm against ``vqa_options_override``.
    """
    import gradio_callbacks as cb
    import config as config_module
    import vqa_options_override as override

    # Any playable file works here: ``save_video`` is stubbed below to return
    # this path for every execution.
    demo_video_path = gr.get_video("world.mp4")
    fake_obs = np.zeros((24, 24, 3), dtype=np.uint8)
    hold_calls = []

    def _hold_spy(env, planner, absTimestep):
        # Record the requested absolute timestep and fast-forward the fake env
        # to it so the next option lookup sees the advanced clock.
        _ = planner
        hold_calls.append(int(absTimestep))
        env.elapsed_steps = int(absTimestep)
        return None

    monkeypatch.setattr(override, "solve_hold_obj_absTimestep", _hold_spy)

    class FakeBase:
        def __init__(self):
            self.steps_press = 270
            self.interval = 30
            self.button = object()

    class FakeEnv:
        def __init__(self):
            self.unwrapped = FakeBase()
            self.elapsed_steps = 0

    class FakeSession:
        def __init__(self):
            self.env_id = "StopCube"
            self.episode_idx = 1
            self.language_goal = "stop the moving cube"
            self.difficulty = "easy"
            self.seed = 123
            self.non_demonstration_task_length = None
            self.demonstration_frames = []
            self.last_execution_frames = []
            self.base_frames = [fake_obs.copy()]
            self.env = FakeEnv()
            self.planner = object()
            self.raw_solve_options = override.get_vqa_options(
                self.env,
                self.planner,
                {"obj": None},
                self.env_id,
            )
            self.available_options = [
                (f"{opt['label']}. {opt['action']}", idx)
                for idx, opt in enumerate(self.raw_solve_options)
            ]

        def get_pil_image(self, use_segmented=False):
            _ = use_segmented
            return fake_obs.copy()

        def update_observation(self, use_segmentation=False):
            _ = use_segmentation
            return None

        def execute_action(self, option_idx, click_coords):
            _ = click_coords
            # Options are rebuilt on every execution so they reflect the
            # current ``env.elapsed_steps`` set by the spy.
            current_options = override.get_vqa_options(
                self.env,
                self.planner,
                {"obj": None},
                self.env_id,
            )
            current_options[option_idx]["solve"]()

            frame_value = hold_calls[-1] if hold_calls else 0
            frame = np.full((24, 24, 3), frame_value, dtype=np.uint8)
            self.last_execution_frames = [frame.copy(), frame.copy()]
            self.base_frames.extend(self.last_execution_frames)
            return frame.copy(), f"Executing: {current_options[option_idx]['label']}", False

    fake_session = FakeSession()
    monkeypatch.setattr(cb, "get_session", lambda uid: fake_session)
    monkeypatch.setattr(cb, "increment_execute_count", lambda uid, env_id, ep_num: 1)
    monkeypatch.setattr(cb, "save_video", lambda frames, suffix="": demo_video_path)
    monkeypatch.setattr(cb, "concatenate_frames_horizontally", lambda frames, env_id=None: list(frames))
    # NOTE(review): ``cb.os.path`` is presumably the shared ``os.path`` module,
    # so these two stubs apply process-wide until monkeypatch undoes them.
    monkeypatch.setattr(cb.os.path, "exists", lambda path: True)
    monkeypatch.setattr(cb.os.path, "getsize", lambda path: 10)

    with gr.Blocks(title="Native StopCube merge test") as demo:
        uid_state = gr.State(value="uid-stopcube-merge")
        phase_state = gr.State(value="action_point")
        post_execute_controls_state = gr.State(
            value={
                "exec_btn_interactive": True,
                "reference_action_interactive": True,
            }
        )
        post_execute_log_state = gr.State(
            value={
                "preserve_terminal_log": False,
                "terminal_log_value": None,
                "preserve_execute_video_log": False,
                "execute_video_log_value": None,
            }
        )
        suppress_state = gr.State(value=False)
        with gr.Column(visible=True, elem_id="main_interface"):
            # The demo-video widgets are created but not wired to any event in
            # this test, so they are left unnamed.
            with gr.Column(visible=False, elem_id="video_phase_group"):
                gr.Video(value=None, elem_id="demo_video", autoplay=False)
                gr.Button(
                    "Watch Video Input🎬",
                    elem_id="watch_demo_video_btn",
                    interactive=False,
                    visible=False,
                )

            with gr.Column(visible=False, elem_id="execution_video_group") as execution_video_group:
                execute_video_display = gr.Video(value=None, elem_id="execute_video", autoplay=True)

            with gr.Column(visible=True, elem_id="action_phase_group") as action_phase_group:
                img_display = gr.Image(value=fake_obs.copy(), elem_id="live_obs")

            with gr.Column(visible=True, elem_id="control_panel_group") as control_panel_group:
                options_radio = gr.Radio(
                    choices=fake_session.available_options,
                    value=None,
                    elem_id="action_radio",
                )
                coords_box = gr.Textbox(config_module.UI_TEXT["coords"]["not_needed"], elem_id="coords_box")
                exec_btn = gr.Button("execute", interactive=True, elem_id="exec_btn")
                reference_action_btn = gr.Button("reference", interactive=True, elem_id="reference_action_btn")
                restart_episode_btn = gr.Button("restart", interactive=True, elem_id="restart_episode_btn")
                next_task_btn = gr.Button("next", interactive=True, elem_id="next_task_btn")
                task_hint_display = gr.Textbox("hint", interactive=True, elem_id="task_hint_display")

            log_output = gr.Markdown("", elem_id="log_output")
            task_info_box = gr.Textbox("")
            progress_info_box = gr.Textbox("")

        # Execute click: validate inputs, lock the controls, then run the step.
        exec_btn.click(
            fn=cb.precheck_execute_inputs,
            inputs=[uid_state, options_radio, coords_box],
            outputs=[],
            queue=False,
        ).then(
            fn=cb.switch_to_execute_phase,
            inputs=[uid_state],
            outputs=[
                options_radio,
                exec_btn,
                restart_episode_btn,
                next_task_btn,
                img_display,
                reference_action_btn,
                task_hint_display,
            ],
            queue=False,
        ).then(
            fn=cb.execute_step,
            inputs=[uid_state, options_radio, coords_box],
            outputs=[
                img_display,
                log_output,
                task_info_box,
                progress_info_box,
                restart_episode_btn,
                next_task_btn,
                exec_btn,
                execute_video_display,
                action_phase_group,
                control_panel_group,
                execution_video_group,
                options_radio,
                coords_box,
                reference_action_btn,
                task_hint_display,
                post_execute_controls_state,
                post_execute_log_state,
                phase_state,
            ],
            queue=False,
        )
        options_radio.change(
            fn=cb.on_option_select,
            inputs=[uid_state, options_radio, coords_box, suppress_state, post_execute_log_state],
            outputs=[coords_box, img_display, log_output, suppress_state, post_execute_log_state],
            queue=False,
        )

        # The video's ``end`` and ``stop`` events share one handler and one
        # input/output wiring; register both from a single definition.
        video_end_inputs = (uid_state, post_execute_controls_state, post_execute_log_state)
        video_end_outputs = (
            execution_video_group,
            action_phase_group,
            control_panel_group,
            options_radio,
            exec_btn,
            restart_episode_btn,
            next_task_btn,
            img_display,
            log_output,
            reference_action_btn,
            task_hint_display,
            post_execute_log_state,
            phase_state,
        )
        for register_video_event in (execute_video_display.end, execute_video_display.stop):
            register_video_event(
                fn=cb.on_execute_video_end_transition,
                inputs=list(video_end_inputs),
                outputs=list(video_end_outputs),
                queue=False,
            )

    port = _free_port()
    host = "127.0.0.1"
    root_url = f"http://{host}:{port}/"

    app = FastAPI(title="native-stopcube-merge-test")
    app = gr.mount_gradio_app(app, demo, path="/")

    config = uvicorn.Config(app, host=host, port=port, log_level="error")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    # Browser-side predicate: the execution video panel has replaced the
    # action panel and the <video> element is set to autoplay.
    execute_video_shown_js = """() => {
        const visible = (id) => {
            const el = document.getElementById(id);
            if (!el) return false;
            const st = getComputedStyle(el);
            return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
        };
        const videoEl = document.querySelector('#execute_video video');
        return (
            visible('execution_video_group') &&
            visible('execute_video') &&
            !visible('action_phase_group') &&
            !!videoEl &&
            videoEl.autoplay === true
        );
    }"""
    # Browser-side predicate: the action panel is back, the video is hidden
    # and the execute button is enabled again.
    action_phase_restored_js = """() => {
        const visible = (id) => {
            const el = document.getElementById(id);
            if (!el) return false;
            const st = getComputedStyle(el);
            return st.display !== 'none' && st.visibility !== 'hidden' && el.getClientRects().length > 0;
        };
        const execBtn = document.querySelector('#exec_btn button') || document.querySelector('button#exec_btn');
        return (
            visible('action_phase_group') &&
            visible('control_panel_group') &&
            !visible('execute_video') &&
            !!execBtn &&
            execBtn.disabled === false
        );
    }"""

    try:
        # Waiting inside the try guarantees the server thread is asked to exit
        # and the demo is closed even when the server never becomes ready.
        _wait_http_ready(root_url)
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page(viewport={"width": 1280, "height": 900})
                page.goto(root_url, wait_until="domcontentloaded")
                page.wait_for_selector("#main_interface", state="visible", timeout=20000)

                # Two consecutive executions of the second radio option.
                for _ in range(2):
                    page.locator("#action_radio input[type='radio']").nth(1).check(force=True)
                    page.locator("#exec_btn button, button#exec_btn").first.click()
                    page.wait_for_selector("#execute_video video", timeout=5000)
                    page.wait_for_function(execute_video_shown_js, timeout=10000)
                    # Fire a synthetic "ended" event instead of waiting for
                    # real playback to finish.
                    assert _dispatch_video_event(page, "ended", elem_id="execute_video")
                    page.wait_for_function(action_phase_restored_js, timeout=5000)

                assert hold_calls == [100, 240]
            finally:
                # Close the browser on failure as well as on success.
                browser.close()
    finally:
        server.should_exit = True
        thread.join(timeout=10)
        demo.close()
| |
|