beaupreda's picture
Upload sensAI-Generic-Object-Detection with upload_repo.py
13170f7 verified
Raw
History Blame Contribute Delete
5.05 kB
"""Per-session activity tracking for EVE-based Gradio demos.
Centralises three concerns the apps used to manage by hand:
1. Counting connected sessions (mirrored on ``EveWorkerPool.session_count``
so log lines and rendered overlays can read it cheaply).
2. Emitting ``session_start`` / ``session_end`` analytics events.
3. Reviving silently-reaped sessions when the user comes back, so the
timeline in analytics doesn't show ``session_end`` followed by orphaned
activity.
The class wires together ``UsageTracker``, ``EveWorkerPool`` and a
``SessionReaper`` so demo apps don't need to repeat the bookkeeping.
"""
from __future__ import annotations
import logging
import threading
from typing import TYPE_CHECKING
import gradio as gr # runtime import: Gradio resolves type hints at wire time
from session_reaper import SessionReaper
if TYPE_CHECKING:
from eve_worker_pool import EveWorkerPool
from usage_analytics import UsageTracker
DEFAULT_IDLE_TIMEOUT_S = 600 # 10 minutes
class SessionTracker:
"""Track Gradio sessions and forward activity to analytics + reaper.
Args:
pool: Worker pool whose ``session_count`` counter is mirrored.
tracker: Analytics emitter (``log("session_start")`` etc.).
logger: Logger used for connect/disconnect lines.
idle_timeout_s: Seconds of cross-tab inactivity before a session is
reaped. ``None`` disables the reaper entirely.
"""
def __init__(
self,
pool: EveWorkerPool,
tracker: UsageTracker,
logger: logging.Logger,
idle_timeout_s: float | None = DEFAULT_IDLE_TIMEOUT_S,
) -> None:
self._pool = pool
self._tracker = tracker
self._logger = logger
self._active: set[str] = set()
self._lock = threading.Lock()
self._reaper: SessionReaper | None = None
if idle_timeout_s is not None:
self._reaper = SessionReaper(
timeout_s=idle_timeout_s,
on_expired=lambda sh: self.end(sh, reason="idle-timeout"),
)
# ----- session lifecycle -------------------------------------------------
def on_load(self, request: gr.Request) -> str:
"""Gradio ``demo.load`` handler — register the session and emit start."""
with self._lock:
self._pool.session_count += 1
self._active.add(request.session_hash)
if self._reaper:
self._reaper.add(request.session_hash)
self._logger.info(f"[session] connected [sessions={self._pool.session_count}]")
self._tracker.log(request.session_hash, "session_start")
return request.session_hash
def on_unload(self, request: gr.Request) -> None:
"""Gradio ``demo.unload`` handler — close the session normally."""
self.end(request.session_hash, reason="disconnect")
def end(self, session_hash: str, reason: str) -> None:
"""Close a session. Safe to call for an unknown / already-removed hash."""
with self._lock:
if session_hash not in self._active:
return
self._active.discard(session_hash)
self._pool.session_count = max(0, self._pool.session_count - 1)
if self._reaper:
self._reaper.remove(session_hash)
self._logger.info(f"[session] ended ({reason}) [sessions={self._pool.session_count}]")
self._tracker.log(session_hash, "session_end", reason=reason)
# ----- per-event tracking ------------------------------------------------
def track(self, session_hash: str, event: str, **details: str | int | float | bool) -> None:
"""Emit an analytics event and refresh the reaper's last-activity stamp.
If the reaper had already evicted this session, silently revive it
(re-emit ``session_start`` first) so the timeline stays consistent.
"""
if self._reaper and not self._reaper.touch(session_hash) and event != "session_start":
with self._lock:
self._active.add(session_hash)
self._pool.session_count += 1
self._reaper.add(session_hash)
self._logger.info(
f"[session] revived after idle reap [sessions={self._pool.session_count}]"
)
self._tracker.log(session_hash, "session_start")
self._tracker.log(session_hash, event, **details)
def on_tab_switch(self, evt: gr.SelectData, request: gr.Request) -> None:
"""Gradio tabs ``select`` handler — record which tab the user opened."""
self.track(request.session_hash, "tab_switch", tab=evt.value)
# ----- shutdown ----------------------------------------------------------
def shutdown(self, reason: str = "shutdown") -> None:
"""Stop the reaper and close every still-active session."""
if self._reaper:
self._reaper.shutdown()
with self._lock:
remaining = list(self._active)
for sh in remaining:
self.end(sh, reason=reason)