movimento / queue_manager.py
rydlrKE's picture
Fix CUDA import order - import spaces before torch (commit e28bffd)
2a5255e verified
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""HF mode user queue and session time limit."""
import math
import threading
import time
from collections.abc import Callable
from typing import Any
import viser
from .config import DEMO_UI_QUICK_START_MODAL_MD, MAX_SESSION_MINUTES
# Link for "Duplicate this Space" on Hugging Face (used in queue and expiry modals).
DUPLICATE_SPACE_URL = "https://huggingface.co/spaces/nvidia/Kimodo?duplicate=true"
GITHUB_REPO_URL = "https://github.com/nv-tlabs/kimodo"
# How often to refresh queue modal content (position, total, estimated wait).
QUEUE_MODAL_REFRESH_INTERVAL_SEC = 15
class UserQueue:
"""Thread-safe queue: active users (with activation timestamp) and waiting queue."""
def __init__(self, max_active: int, max_minutes: float) -> None:
self._max_active = max_active
self._max_minutes = max_minutes
self._max_seconds = max_minutes * 60.0
self._active: dict[int, float] = {} # client_id -> activation timestamp
self._queued: list[int] = []
self._lock = threading.Lock()
def try_activate(self, client_id: int) -> bool:
"""If a slot is free, add client as active and return True.
Else return False.
"""
with self._lock:
if len(self._active) < self._max_active:
self._active[client_id] = time.time()
return True
return False
def enqueue(self, client_id: int) -> None:
with self._lock:
if client_id not in self._queued:
self._queued.append(client_id)
def remove(self, client_id: int) -> bool:
"""Remove from active or queue.
Returns True if was active.
"""
with self._lock:
was_active = client_id in self._active
self._active.pop(client_id, None)
if client_id in self._queued:
self._queued.remove(client_id)
return was_active
def promote_next(self) -> int | None:
"""If queue non-empty, pop first, activate them, return their client_id.
Else None.
"""
with self._lock:
if not self._queued:
return None
client_id = self._queued.pop(0)
self._active[client_id] = time.time()
return client_id
def get_queue_position(self, client_id: int) -> tuple[int, int] | None:
"""(1-based position, total_in_queue) or None if not queued."""
with self._lock:
if client_id not in self._queued:
return None
pos = self._queued.index(client_id)
return (pos + 1, len(self._queued))
def get_estimated_wait_seconds(self, client_id: int) -> float:
"""Estimated seconds until this queued client gets a slot."""
with self._lock:
if client_id not in self._queued:
return 0.0
pos = self._queued.index(client_id) + 1 # 1-based
# Expiry times of active users (when they free a slot)
now = time.time()
expiries = sorted(now + self._max_seconds - (now - t) for t in self._active.values())
if not expiries:
return 0.0
# Nth slot to free (1-indexed) wraps over expiries
idx = (pos - 1) % len(expiries)
cycles = (pos - 1) // len(expiries)
slot_free_time = expiries[idx] + cycles * self._max_seconds
return max(0.0, slot_free_time - now)
def is_active(self, client_id: int) -> bool:
with self._lock:
return client_id in self._active
def was_active(self, client_id: int) -> bool:
"""True if client is currently active (for use when already holding lock)."""
return client_id in self._active
def _format_wait(seconds: float) -> str:
if seconds < 60:
return "less than a minute"
mins = int(math.ceil(seconds / 60))
return f"~{mins} minute{'s' if mins != 1 else ''}"
def _queue_modal_markdown(position: int, total: int, estimated_wait_sec: float) -> str:
wait_str = _format_wait(estimated_wait_sec)
mins = int(MAX_SESSION_MINUTES) if MAX_SESSION_MINUTES == int(MAX_SESSION_MINUTES) else MAX_SESSION_MINUTES
return f"""## Kimodo Demo — Please Wait
This demo runs with limited capacity.
Each user gets **{mins} minute{"s" if mins != 1 else ""}** of interactive time.
**Your position in queue:** {position} / {total}
**Estimated wait:** {wait_str}
Please keep this tab open — the demo will start automatically when it's your turn.
---
*Want unlimited access? [Duplicate this Space]({DUPLICATE_SPACE_URL}) or clone the [GitHub repo]({GITHUB_REPO_URL}) to run locally!*
"""
def _welcome_modal_markdown() -> str:
mins = int(MAX_SESSION_MINUTES) if MAX_SESSION_MINUTES == int(MAX_SESSION_MINUTES) else MAX_SESSION_MINUTES
return f"""## Welcome to Kimodo Demo
You have been granted a **{mins}-minute** demo session.
Your session timer has started.
Click the button below to begin!
"""
def _expiry_modal_markdown() -> str:
mins = int(MAX_SESSION_MINUTES) if MAX_SESSION_MINUTES == int(MAX_SESSION_MINUTES) else MAX_SESSION_MINUTES
return f"""## Session Expired
Your {mins}-minute demo session has ended.
Thank you for trying Kimodo!
Refresh this page to rejoin the queue, or [duplicate this Space]({DUPLICATE_SPACE_URL}) for unlimited access.
"""
class QueueManager:
"""Orchestrates HF mode: queue modals, welcome modal, session timer, promotion."""
def __init__(
self,
queue: UserQueue,
server: viser.ViserServer,
setup_demo_for_client: Callable[[viser.ClientHandle], None],
cleanup_session: Callable[[int], None],
) -> None:
self._queue = queue
self._server = server
self._setup_demo_for_client = setup_demo_for_client
self._cleanup_session = cleanup_session
self._max_seconds = queue._max_seconds
self._queue_modal_handles: dict[int, tuple[Any, Any]] = {}
self._welcome_modal_handles: dict[int, Any] = {}
self._expiry_timers: dict[int, threading.Timer] = {}
self._lock = threading.Lock()
self._refresh_stop = threading.Event()
self._refresh_thread = threading.Thread(
target=self._queue_modal_refresh_loop,
name="queue-modal-refresh",
daemon=True,
)
self._refresh_thread.start()
def _queue_modal_refresh_loop(self) -> None:
"""Periodically refresh queue modals so position, total, and estimated wait stay current."""
while not self._refresh_stop.wait(timeout=QUEUE_MODAL_REFRESH_INTERVAL_SEC):
self._update_all_queue_modals()
def on_client_connect(self, client: viser.ClientHandle) -> None:
"""Handle new connection: activate if slot free, else enqueue and show queue modal."""
client_id = client.client_id
if self._queue.try_activate(client_id):
try:
self._setup_demo_for_client(client)
except RuntimeError as e:
if "CUDA error" in str(e):
print(f"CUDA error while setting up client {client_id}: {e}")
return
raise
self._start_session_timer(client_id)
self._show_welcome_modal(client)
else:
self._queue.enqueue(client_id)
self._show_queue_modal(client)
self._update_all_queue_modals()
def on_client_disconnect(self, client_id: int) -> None:
"""Remove from queue/active, cancel timer, promote next if was active.
Session/scene cleanup is done by the demo's on_client_disconnect.
"""
with self._lock:
self._expiry_timers.pop(client_id, None)
self._queue_modal_handles.pop(client_id, None)
self._welcome_modal_handles.pop(client_id, None)
was_active = self._queue.remove(client_id)
if was_active:
self._promote_next_user()
else:
self._update_all_queue_modals()
def _show_queue_modal(self, client: viser.ClientHandle) -> None:
client_id = client.client_id
pos, total = self._queue.get_queue_position(client_id) or (0, 0)
wait_sec = self._queue.get_estimated_wait_seconds(client_id)
md_content = _queue_modal_markdown(pos, total, wait_sec)
modal = client.gui.add_modal(
"Kimodo Demo — Please Wait",
size="xl",
show_close_button=False,
)
with modal:
md_handle = client.gui.add_markdown(md_content)
with self._lock:
self._queue_modal_handles[client_id] = (modal, md_handle)
def _show_quick_start_modal(self, client: viser.ClientHandle) -> None:
"""Show the quick start instructions modal (same as non-HF mode)."""
with client.gui.add_modal(
"Welcome — Quick Start",
size="xl",
show_close_button=True,
save_choice="kimodo.demo.quick_start_ack",
) as quick_start_modal:
client.gui.add_markdown(DEMO_UI_QUICK_START_MODAL_MD)
client.gui.add_button("Got it (don't remind me again)").on_click(lambda _: quick_start_modal.close())
def _show_welcome_modal(self, client: viser.ClientHandle) -> None:
client_id = client.client_id
def _on_start_demo(_: Any) -> None:
modal.close()
self._show_quick_start_modal(client)
modal = client.gui.add_modal(
"Welcome to Kimodo Demo",
size="xl",
show_close_button=True,
)
with modal:
client.gui.add_markdown(_welcome_modal_markdown())
client.gui.add_button("Start Demo").on_click(_on_start_demo)
with self._lock:
self._welcome_modal_handles[client_id] = modal
def _update_all_queue_modals(self) -> None:
with self._lock:
handles = list(self._queue_modal_handles.items())
for client_id, (modal, md_handle) in handles:
pos_total = self._queue.get_queue_position(client_id)
if pos_total is None:
continue
pos, total = pos_total
wait_sec = self._queue.get_estimated_wait_seconds(client_id)
try:
md_handle.content = _queue_modal_markdown(pos, total, wait_sec)
except Exception:
pass
def _promote_next_user(self) -> None:
promoted_id = self._queue.promote_next()
if promoted_id is None:
return
clients = self._server.get_clients()
client = clients.get(promoted_id)
if client is None:
return
with self._lock:
old = self._queue_modal_handles.pop(promoted_id, None)
if old is not None:
try:
old[0].close()
except Exception:
pass
try:
self._setup_demo_for_client(client)
except RuntimeError as e:
if "CUDA error" in str(e):
print(f"CUDA error while setting up client {promoted_id}: {e}")
return
raise
self._start_session_timer(promoted_id)
self._show_welcome_modal(client)
self._update_all_queue_modals()
def _start_session_timer(self, client_id: int) -> None:
def on_expiry() -> None:
self._on_session_expired(client_id)
t = threading.Timer(self._max_seconds, on_expiry)
t.daemon = True
with self._lock:
self._expiry_timers[client_id] = t
t.start()
def _on_session_expired(self, client_id: int) -> None:
with self._lock:
self._expiry_timers.pop(client_id, None)
if not self._queue.is_active(client_id):
return
self._queue.remove(client_id)
clients = self._server.get_clients()
client = clients.get(client_id)
if client is not None:
try:
with client.gui.add_modal(
"Session Expired",
size="lg",
show_close_button=False,
) as modal_ctx:
client.gui.add_markdown(_expiry_modal_markdown())
except Exception:
pass
self._cleanup_session(client_id)
self._promote_next_user()