"""Per-session activity tracking for EVE-based Gradio demos. Centralises three concerns the apps used to manage by hand: 1. Counting connected sessions (mirrored on ``EveWorkerPool.session_count`` so log lines and rendered overlays can read it cheaply). 2. Emitting ``session_start`` / ``session_end`` analytics events. 3. Reviving silently-reaped sessions when the user comes back, so the timeline in analytics doesn't show ``session_end`` followed by orphaned activity. The class wires together ``UsageTracker``, ``EveWorkerPool`` and a ``SessionReaper`` so demo apps don't need to repeat the bookkeeping. """ from __future__ import annotations import logging import threading from typing import TYPE_CHECKING import gradio as gr # runtime import: Gradio resolves type hints at wire time from session_reaper import SessionReaper if TYPE_CHECKING: from eve_worker_pool import EveWorkerPool from usage_analytics import UsageTracker DEFAULT_IDLE_TIMEOUT_S = 600 # 10 minutes class SessionTracker: """Track Gradio sessions and forward activity to analytics + reaper. Args: pool: Worker pool whose ``session_count`` counter is mirrored. tracker: Analytics emitter (``log("session_start")`` etc.). logger: Logger used for connect/disconnect lines. idle_timeout_s: Seconds of cross-tab inactivity before a session is reaped. ``None`` disables the reaper entirely. """ def __init__( self, pool: EveWorkerPool, tracker: UsageTracker, logger: logging.Logger, idle_timeout_s: float | None = DEFAULT_IDLE_TIMEOUT_S, ) -> None: self._pool = pool self._tracker = tracker self._logger = logger self._active: set[str] = set() self._lock = threading.Lock() self._reaper: SessionReaper | None = None if idle_timeout_s is not None: self._reaper = SessionReaper( timeout_s=idle_timeout_s, on_expired=lambda sh: self.end(sh, reason="idle-timeout"), ) # ----- session lifecycle ------------------------------------------------- def on_load(self, request: gr.Request) -> str: """Gradio ``demo.load`` handler — register the session and emit start.""" with self._lock: self._pool.session_count += 1 self._active.add(request.session_hash) if self._reaper: self._reaper.add(request.session_hash) self._logger.info(f"[session] connected [sessions={self._pool.session_count}]") self._tracker.log(request.session_hash, "session_start") return request.session_hash def on_unload(self, request: gr.Request) -> None: """Gradio ``demo.unload`` handler — close the session normally.""" self.end(request.session_hash, reason="disconnect") def end(self, session_hash: str, reason: str) -> None: """Close a session. Safe to call for an unknown / already-removed hash.""" with self._lock: if session_hash not in self._active: return self._active.discard(session_hash) self._pool.session_count = max(0, self._pool.session_count - 1) if self._reaper: self._reaper.remove(session_hash) self._logger.info(f"[session] ended ({reason}) [sessions={self._pool.session_count}]") self._tracker.log(session_hash, "session_end", reason=reason) # ----- per-event tracking ------------------------------------------------ def track(self, session_hash: str, event: str, **details: str | int | float | bool) -> None: """Emit an analytics event and refresh the reaper's last-activity stamp. If the reaper had already evicted this session, silently revive it (re-emit ``session_start`` first) so the timeline stays consistent. """ if self._reaper and not self._reaper.touch(session_hash) and event != "session_start": with self._lock: self._active.add(session_hash) self._pool.session_count += 1 self._reaper.add(session_hash) self._logger.info( f"[session] revived after idle reap [sessions={self._pool.session_count}]" ) self._tracker.log(session_hash, "session_start") self._tracker.log(session_hash, event, **details) def on_tab_switch(self, evt: gr.SelectData, request: gr.Request) -> None: """Gradio tabs ``select`` handler — record which tab the user opened.""" self.track(request.session_hash, "tab_switch", tab=evt.value) # ----- shutdown ---------------------------------------------------------- def shutdown(self, reason: str = "shutdown") -> None: """Stop the reaper and close every still-active session.""" if self._reaper: self._reaper.shutdown() with self._lock: remaining = list(self._active) for sh in remaining: self.end(sh, reason=reason)