"""Lightweight usage event logger for HF Space demos. Appends JSONL events to a local folder, which huggingface_hub's CommitScheduler periodically pushes to a private HF Dataset repo. Auto-enables on HF Spaces; disabled locally unless USAGE_ANALYTICS=1. """ from __future__ import annotations import io import json import os import sys import uuid from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING from log_utils import setup_logger if TYPE_CHECKING: from huggingface_hub import CommitInfo logger = setup_logger("UsageTracker") def _make_cleanup_scheduler(active_log_file: Path, **kwargs: object) -> object: """Build a CommitScheduler that deletes pushed files (except the active one). Prevents stale files from being re-uploaded under a different ``path_in_repo`` if the demo name changes between process restarts. """ from huggingface_hub import CommitScheduler class _Cleanup(CommitScheduler): def __init__(self, active_file: Path, **kw: object) -> None: self._active_file = active_file.resolve() super().__init__(**kw) def push_to_hub(self) -> CommitInfo | None: result = super().push_to_hub() if result is None: return None with self.lock: for path in sorted(self.folder_path.glob("**/*")): if path.is_file() and path.resolve() != self._active_file: path.unlink(missing_ok=True) self.last_uploaded.pop(path, None) return result return _Cleanup(active_file=active_log_file, **kwargs) class UsageTracker: """Append-only usage event logger backed by a private HF Dataset repo. Events are written as JSONL to a local folder. A background thread (CommitScheduler) batches and pushes them to the dataset repo every ``push_interval_minutes``. Thread-safe, never raises from ``log()``. Args: repo_id: Target HF dataset repo (e.g. ``"LatticeSemi/STAGING-Demo-Analytics-v1.0"``). push_interval_minutes: How often CommitScheduler pushes to HF Hub. enabled: Explicit on/off. ``None`` = auto-detect (on when ``SPACE_ID`` env var is set, off otherwise). The ``USAGE_ANALYTICS`` env var always takes precedence over both this parameter and ``SPACE_ID``. """ def __init__( self, repo_id: str, push_interval_minutes: int = 10, enabled: bool | None = None, ) -> None: self._enabled = False self._log_file: Path | None = None self._fh: io.TextIOWrapper | None = None self._scheduler: object | None = None should_enable = self._resolve_enabled(enabled) if not should_enable: logger.info("Usage analytics: disabled") return demo_name = self._resolve_demo_name() try: log_dir = Path("usage_logs") log_dir.mkdir(exist_ok=True) self._log_file = log_dir / f"events_{uuid.uuid4().hex[:12]}.jsonl" token = ( os.environ.get("ANALYTICS_TOKEN") or os.environ.get("HF_TOKEN") or os.environ.get("MODEL_ACCESS_TOKEN") ) self._fh = self._log_file.open("a") self._scheduler = _make_cleanup_scheduler( active_log_file=self._log_file, repo_id=repo_id, repo_type="dataset", folder_path=log_dir, path_in_repo=demo_name, every=push_interval_minutes, token=token, ) self._enabled = True logger.info( f"Usage analytics: enabled -> {repo_id}/{demo_name} " f"(push every {push_interval_minutes}m)" ) except Exception as exc: logger.warning(f"Usage analytics: init failed, continuing without ({exc})") @property def enabled(self) -> bool: return self._enabled def log(self, session_hash: str, event: str, **details: str | int | float | bool) -> None: """Append a usage event. Thread-safe, never raises. Args: session_hash: Gradio session hash (truncated to 8 chars for privacy). event: Event name (e.g. ``"session_start"``, ``"video_process"``). **details: Arbitrary key-value pairs stored alongside the event. """ if not self._enabled or self._fh is None or self._scheduler is None: return try: entry = { "ts": datetime.now(timezone.utc).isoformat(), "session": session_hash[:8] if session_hash else "unknown", "event": event, **details, } with self._scheduler.lock: self._fh.write(json.dumps(entry, separators=(",", ":")) + "\n") self._fh.flush() except Exception: pass def shutdown(self) -> None: """Flush and close the log file handle.""" if self._fh is not None: try: self._fh.flush() self._fh.close() except Exception: pass self._fh = None @staticmethod def _resolve_demo_name() -> str: """Derive demo name from the HF Space name, or ``"debug-"`` locally. The folder suffix keeps eve_hmi / eve_gmod / future demos from sharing the same ``debug/`` path_in_repo when they're run locally. """ space_id = os.environ.get("SPACE_ID", "") if "/" in space_id: return space_id.split("/", 1)[1] if space_id: return space_id try: folder = Path(sys.argv[0]).resolve().parent.name if folder: return f"debug-{folder}" except Exception: pass return "debug" @staticmethod def _resolve_enabled(explicit: bool | None) -> bool: """Determine whether analytics should be enabled. Priority: USAGE_ANALYTICS env var > explicit param > SPACE_ID auto-detect. """ env = os.environ.get("USAGE_ANALYTICS", "").strip().lower() if env in ("0", "false", "off"): return False if env in ("1", "true", "on"): return True if explicit is not None: return explicit return bool(os.environ.get("SPACE_ID"))