| |
| |
| """HF mode user queue and session time limit.""" |
|
|
| import math |
| import threading |
| import time |
| from collections.abc import Callable |
| from typing import Any |
|
|
| import viser |
|
|
| from .config import DEMO_UI_QUICK_START_MODAL_MD, MAX_SESSION_MINUTES |
|
|
| |
| DUPLICATE_SPACE_URL = "https://huggingface.co/spaces/nvidia/Kimodo?duplicate=true" |
| GITHUB_REPO_URL = "https://github.com/nv-tlabs/kimodo" |
|
|
| |
| QUEUE_MODAL_REFRESH_INTERVAL_SEC = 15 |
|
|
|
|
| class UserQueue: |
| """Thread-safe queue: active users (with activation timestamp) and waiting queue.""" |
|
|
| def __init__(self, max_active: int, max_minutes: float) -> None: |
| self._max_active = max_active |
| self._max_minutes = max_minutes |
| self._max_seconds = max_minutes * 60.0 |
| self._active: dict[int, float] = {} |
| self._queued: list[int] = [] |
| self._lock = threading.Lock() |
|
|
| def try_activate(self, client_id: int) -> bool: |
| """If a slot is free, add client as active and return True. |
| |
| Else return False. |
| """ |
| with self._lock: |
| if len(self._active) < self._max_active: |
| self._active[client_id] = time.time() |
| return True |
| return False |
|
|
| def enqueue(self, client_id: int) -> None: |
| with self._lock: |
| if client_id not in self._queued: |
| self._queued.append(client_id) |
|
|
| def remove(self, client_id: int) -> bool: |
| """Remove from active or queue. |
| |
| Returns True if was active. |
| """ |
| with self._lock: |
| was_active = client_id in self._active |
| self._active.pop(client_id, None) |
| if client_id in self._queued: |
| self._queued.remove(client_id) |
| return was_active |
|
|
| def promote_next(self) -> int | None: |
| """If queue non-empty, pop first, activate them, return their client_id. |
| |
| Else None. |
| """ |
| with self._lock: |
| if not self._queued: |
| return None |
| client_id = self._queued.pop(0) |
| self._active[client_id] = time.time() |
| return client_id |
|
|
| def get_queue_position(self, client_id: int) -> tuple[int, int] | None: |
| """(1-based position, total_in_queue) or None if not queued.""" |
| with self._lock: |
| if client_id not in self._queued: |
| return None |
| pos = self._queued.index(client_id) |
| return (pos + 1, len(self._queued)) |
|
|
| def get_estimated_wait_seconds(self, client_id: int) -> float: |
| """Estimated seconds until this queued client gets a slot.""" |
| with self._lock: |
| if client_id not in self._queued: |
| return 0.0 |
| pos = self._queued.index(client_id) + 1 |
| |
| now = time.time() |
| expiries = sorted(now + self._max_seconds - (now - t) for t in self._active.values()) |
| if not expiries: |
| return 0.0 |
| |
| idx = (pos - 1) % len(expiries) |
| cycles = (pos - 1) // len(expiries) |
| slot_free_time = expiries[idx] + cycles * self._max_seconds |
| return max(0.0, slot_free_time - now) |
|
|
| def is_active(self, client_id: int) -> bool: |
| with self._lock: |
| return client_id in self._active |
|
|
| def was_active(self, client_id: int) -> bool: |
| """True if client is currently active (for use when already holding lock).""" |
| return client_id in self._active |
|
|
|
|
| def _format_wait(seconds: float) -> str: |
| if seconds < 60: |
| return "less than a minute" |
| mins = int(math.ceil(seconds / 60)) |
| return f"~{mins} minute{'s' if mins != 1 else ''}" |
|
|
|
|
| def _queue_modal_markdown(position: int, total: int, estimated_wait_sec: float) -> str: |
| wait_str = _format_wait(estimated_wait_sec) |
| mins = int(MAX_SESSION_MINUTES) if MAX_SESSION_MINUTES == int(MAX_SESSION_MINUTES) else MAX_SESSION_MINUTES |
| return f"""## Kimodo Demo — Please Wait |
| |
| This demo runs with limited capacity. |
| Each user gets **{mins} minute{"s" if mins != 1 else ""}** of interactive time. |
| |
| **Your position in queue:** {position} / {total} |
| |
| **Estimated wait:** {wait_str} |
| |
| Please keep this tab open — the demo will start automatically when it's your turn. |
| |
| --- |
| *Want unlimited access? [Duplicate this Space]({DUPLICATE_SPACE_URL}) or clone the [GitHub repo]({GITHUB_REPO_URL}) to run locally!* |
| """ |
|
|
|
|
| def _welcome_modal_markdown() -> str: |
| mins = int(MAX_SESSION_MINUTES) if MAX_SESSION_MINUTES == int(MAX_SESSION_MINUTES) else MAX_SESSION_MINUTES |
| return f"""## Welcome to Kimodo Demo |
| |
| You have been granted a **{mins}-minute** demo session. |
| Your session timer has started. |
| |
| Click the button below to begin! |
| """ |
|
|
|
|
| def _expiry_modal_markdown() -> str: |
| mins = int(MAX_SESSION_MINUTES) if MAX_SESSION_MINUTES == int(MAX_SESSION_MINUTES) else MAX_SESSION_MINUTES |
| return f"""## Session Expired |
| |
| Your {mins}-minute demo session has ended. |
| Thank you for trying Kimodo! |
| |
| Refresh this page to rejoin the queue, or [duplicate this Space]({DUPLICATE_SPACE_URL}) for unlimited access. |
| """ |
|
|
|
|
| class QueueManager: |
| """Orchestrates HF mode: queue modals, welcome modal, session timer, promotion.""" |
|
|
| def __init__( |
| self, |
| queue: UserQueue, |
| server: viser.ViserServer, |
| setup_demo_for_client: Callable[[viser.ClientHandle], None], |
| cleanup_session: Callable[[int], None], |
| ) -> None: |
| self._queue = queue |
| self._server = server |
| self._setup_demo_for_client = setup_demo_for_client |
| self._cleanup_session = cleanup_session |
| self._max_seconds = queue._max_seconds |
|
|
| self._queue_modal_handles: dict[int, tuple[Any, Any]] = {} |
| self._welcome_modal_handles: dict[int, Any] = {} |
| self._expiry_timers: dict[int, threading.Timer] = {} |
| self._lock = threading.Lock() |
| self._refresh_stop = threading.Event() |
| self._refresh_thread = threading.Thread( |
| target=self._queue_modal_refresh_loop, |
| name="queue-modal-refresh", |
| daemon=True, |
| ) |
| self._refresh_thread.start() |
|
|
| def _queue_modal_refresh_loop(self) -> None: |
| """Periodically refresh queue modals so position, total, and estimated wait stay current.""" |
| while not self._refresh_stop.wait(timeout=QUEUE_MODAL_REFRESH_INTERVAL_SEC): |
| self._update_all_queue_modals() |
|
|
| def on_client_connect(self, client: viser.ClientHandle) -> None: |
| """Handle new connection: activate if slot free, else enqueue and show queue modal.""" |
| client_id = client.client_id |
| if self._queue.try_activate(client_id): |
| try: |
| self._setup_demo_for_client(client) |
| except RuntimeError as e: |
| if "CUDA error" in str(e): |
| print(f"CUDA error while setting up client {client_id}: {e}") |
| return |
| raise |
| self._start_session_timer(client_id) |
| self._show_welcome_modal(client) |
| else: |
| self._queue.enqueue(client_id) |
| self._show_queue_modal(client) |
| self._update_all_queue_modals() |
|
|
| def on_client_disconnect(self, client_id: int) -> None: |
| """Remove from queue/active, cancel timer, promote next if was active. |
| |
| Session/scene cleanup is done by the demo's on_client_disconnect. |
| """ |
| with self._lock: |
| self._expiry_timers.pop(client_id, None) |
| self._queue_modal_handles.pop(client_id, None) |
| self._welcome_modal_handles.pop(client_id, None) |
| was_active = self._queue.remove(client_id) |
| if was_active: |
| self._promote_next_user() |
| else: |
| self._update_all_queue_modals() |
|
|
| def _show_queue_modal(self, client: viser.ClientHandle) -> None: |
| client_id = client.client_id |
| pos, total = self._queue.get_queue_position(client_id) or (0, 0) |
| wait_sec = self._queue.get_estimated_wait_seconds(client_id) |
| md_content = _queue_modal_markdown(pos, total, wait_sec) |
|
|
| modal = client.gui.add_modal( |
| "Kimodo Demo — Please Wait", |
| size="xl", |
| show_close_button=False, |
| ) |
| with modal: |
| md_handle = client.gui.add_markdown(md_content) |
| with self._lock: |
| self._queue_modal_handles[client_id] = (modal, md_handle) |
|
|
| def _show_quick_start_modal(self, client: viser.ClientHandle) -> None: |
| """Show the quick start instructions modal (same as non-HF mode).""" |
| with client.gui.add_modal( |
| "Welcome — Quick Start", |
| size="xl", |
| show_close_button=True, |
| save_choice="kimodo.demo.quick_start_ack", |
| ) as quick_start_modal: |
| client.gui.add_markdown(DEMO_UI_QUICK_START_MODAL_MD) |
| client.gui.add_button("Got it (don't remind me again)").on_click(lambda _: quick_start_modal.close()) |
|
|
| def _show_welcome_modal(self, client: viser.ClientHandle) -> None: |
| client_id = client.client_id |
|
|
| def _on_start_demo(_: Any) -> None: |
| modal.close() |
| self._show_quick_start_modal(client) |
|
|
| modal = client.gui.add_modal( |
| "Welcome to Kimodo Demo", |
| size="xl", |
| show_close_button=True, |
| ) |
| with modal: |
| client.gui.add_markdown(_welcome_modal_markdown()) |
| client.gui.add_button("Start Demo").on_click(_on_start_demo) |
| with self._lock: |
| self._welcome_modal_handles[client_id] = modal |
|
|
| def _update_all_queue_modals(self) -> None: |
| with self._lock: |
| handles = list(self._queue_modal_handles.items()) |
| for client_id, (modal, md_handle) in handles: |
| pos_total = self._queue.get_queue_position(client_id) |
| if pos_total is None: |
| continue |
| pos, total = pos_total |
| wait_sec = self._queue.get_estimated_wait_seconds(client_id) |
| try: |
| md_handle.content = _queue_modal_markdown(pos, total, wait_sec) |
| except Exception: |
| pass |
|
|
| def _promote_next_user(self) -> None: |
| promoted_id = self._queue.promote_next() |
| if promoted_id is None: |
| return |
| clients = self._server.get_clients() |
| client = clients.get(promoted_id) |
| if client is None: |
| return |
| with self._lock: |
| old = self._queue_modal_handles.pop(promoted_id, None) |
| if old is not None: |
| try: |
| old[0].close() |
| except Exception: |
| pass |
| try: |
| self._setup_demo_for_client(client) |
| except RuntimeError as e: |
| if "CUDA error" in str(e): |
| print(f"CUDA error while setting up client {promoted_id}: {e}") |
| return |
| raise |
| self._start_session_timer(promoted_id) |
| self._show_welcome_modal(client) |
| self._update_all_queue_modals() |
|
|
| def _start_session_timer(self, client_id: int) -> None: |
| def on_expiry() -> None: |
| self._on_session_expired(client_id) |
|
|
| t = threading.Timer(self._max_seconds, on_expiry) |
| t.daemon = True |
| with self._lock: |
| self._expiry_timers[client_id] = t |
| t.start() |
|
|
| def _on_session_expired(self, client_id: int) -> None: |
| with self._lock: |
| self._expiry_timers.pop(client_id, None) |
| if not self._queue.is_active(client_id): |
| return |
| self._queue.remove(client_id) |
| clients = self._server.get_clients() |
| client = clients.get(client_id) |
| if client is not None: |
| try: |
| with client.gui.add_modal( |
| "Session Expired", |
| size="lg", |
| show_close_button=False, |
| ) as modal_ctx: |
| client.gui.add_markdown(_expiry_modal_markdown()) |
| except Exception: |
| pass |
| self._cleanup_session(client_id) |
| self._promote_next_user() |
|
|