Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Lightweight usage event logger for HF Space demos. | |
| Appends JSONL events to a local folder, which huggingface_hub's CommitScheduler | |
| periodically pushes to a private HF Dataset repo. Auto-enables on HF Spaces; | |
| disabled locally unless USAGE_ANALYTICS=1. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import sys | |
| import uuid | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING | |
| from log_utils import setup_logger | |
| if TYPE_CHECKING: | |
| from huggingface_hub import CommitInfo | |
| logger = setup_logger("UsageTracker") | |
def _make_cleanup_scheduler(active_log_file: Path, **kwargs: object) -> object:
    """Build a CommitScheduler that deletes pushed files (except the active one).

    Prevents stale files from being re-uploaded under a different ``path_in_repo``
    if the demo name changes between process restarts.

    Args:
        active_log_file: The log file this process is still appending to. It is
            never deleted by the cleanup pass.
        **kwargs: Forwarded unchanged to ``CommitScheduler.__init__``.

    Returns:
        An instance of a ``CommitScheduler`` subclass with post-push cleanup.
    """
    # Imported here rather than at module level: huggingface_hub is only needed
    # once a scheduler is actually being built.
    from huggingface_hub import CommitScheduler

    class _Cleanup(CommitScheduler):
        """CommitScheduler that prunes local files after each successful commit."""

        def __init__(self, active_file: Path, **kw: object) -> None:
            # Set before delegating to the parent constructor so the attribute
            # exists by the time push_to_hub() can first be invoked.
            self._active_file = active_file.resolve()
            super().__init__(**kw)

        def push_to_hub(self) -> CommitInfo | None:
            """Push via the parent, then delete every file but the active one.

            Returns:
                Whatever the parent ``push_to_hub()`` returned. When that is
                ``None`` no cleanup is attempted.
            """
            result = super().push_to_hub()
            if result is None:
                return None

            with self.lock:
                for path in sorted(self.folder_path.glob("**/*")):
                    try:
                        if not path.is_file() or path.resolve() == self._active_file:
                            continue
                        path.unlink(missing_ok=True)
                    except OSError as exc:
                        # The commit itself succeeded; one undeletable file must
                        # neither fail the push nor block cleanup of the others.
                        logger.warning(
                            f"Usage analytics: could not remove stale log {path} ({exc})"
                        )
                        continue
                    # Drop the parent's bookkeeping entry for the removed file.
                    self.last_uploaded.pop(path, None)
            return result

    return _Cleanup(active_file=active_log_file, **kwargs)
class UsageTracker:
    """Append-only usage event logger backed by a private HF Dataset repo.

    Events are written as JSONL to a local folder. A background thread
    (CommitScheduler) batches and pushes them to the dataset repo every
    ``push_interval_minutes``. Thread-safe, never raises from ``log()``.

    Args:
        repo_id: Target HF dataset repo
            (e.g. ``"LatticeSemi/STAGING-Demo-Analytics-v1.0"``).
        push_interval_minutes: How often CommitScheduler pushes to HF Hub.
        enabled: Explicit on/off. ``None`` = auto-detect (on when ``SPACE_ID``
            env var is set, off otherwise). The ``USAGE_ANALYTICS`` env var
            always takes precedence over both this parameter and ``SPACE_ID``.
    """

    def __init__(
        self,
        repo_id: str,
        push_interval_minutes: int = 10,
        enabled: bool | None = None,
    ) -> None:
        # Start fully disabled; the attributes below are only populated once
        # every initialisation step has succeeded.
        self._enabled = False
        self._log_file: Path | None = None
        self._fh: io.TextIOWrapper | None = None
        self._scheduler: object | None = None

        if not self._resolve_enabled(enabled):
            logger.info("Usage analytics: disabled")
            return

        demo_name = self._resolve_demo_name()
        try:
            log_dir = Path("usage_logs")
            log_dir.mkdir(exist_ok=True)
            # Random suffix gives every tracker instance its own file.
            self._log_file = log_dir / f"events_{uuid.uuid4().hex[:12]}.jsonl"
            # First non-empty token wins; may be None if none of them is set.
            token = (
                os.environ.get("ANALYTICS_TOKEN")
                or os.environ.get("HF_TOKEN")
                or os.environ.get("MODEL_ACCESS_TOKEN")
            )
            # Explicit encoding so the file does not depend on the locale.
            self._fh = self._log_file.open("a", encoding="utf-8")
            self._scheduler = _make_cleanup_scheduler(
                active_log_file=self._log_file,
                repo_id=repo_id,
                repo_type="dataset",
                folder_path=log_dir,
                path_in_repo=demo_name,
                every=push_interval_minutes,
                token=token,
            )
            self._enabled = True
            logger.info(
                f"Usage analytics: enabled -> {repo_id}/{demo_name} "
                f"(push every {push_interval_minutes}m)"
            )
        except Exception as exc:
            # Analytics must never take the demo down. If we failed before the
            # tracker became usable, release the already-opened file handle
            # instead of leaking it for the lifetime of the process.
            if not self._enabled:
                self.shutdown()
            logger.warning(f"Usage analytics: init failed, continuing without ({exc})")

    @property
    def enabled(self) -> bool:
        """Whether initialisation succeeded and events are being recorded."""
        return self._enabled

    def log(self, session_hash: str, event: str, **details: str | int | float | bool) -> None:
        """Append a usage event. Thread-safe, never raises.

        Args:
            session_hash: Gradio session hash (truncated to 8 chars for privacy).
            event: Event name (e.g. ``"session_start"``, ``"video_process"``).
            **details: Arbitrary key-value pairs stored alongside the event.
        """
        if not self._enabled or self._fh is None or self._scheduler is None:
            return
        try:
            entry = {
                "ts": datetime.now(timezone.utc).isoformat(),
                "session": session_hash[:8] if session_hash else "unknown",
                "event": event,
                **details,
            }
            # Serialise before taking the lock so it is held only for the write.
            line = json.dumps(entry, separators=(",", ":")) + "\n"
            # Share the scheduler's lock so a line is written atomically with
            # respect to whatever the scheduler does while holding it.
            with self._scheduler.lock:
                self._fh.write(line)
                self._fh.flush()
        except Exception:
            # Deliberately silent: a bad event (unserialisable value, closed
            # file, ...) must never break the calling request handler.
            pass

    def shutdown(self) -> None:
        """Flush and close the log file handle."""
        fh, self._fh = self._fh, None
        if fh is None:
            return
        try:
            # close() flushes buffered data itself and still releases the
            # descriptor if that flush fails.
            fh.close()
        except Exception:
            # Best-effort: shutdown must not raise.
            pass

    @staticmethod
    def _resolve_demo_name() -> str:
        """Derive demo name from the HF Space name, or ``"debug-<folder>"`` locally.

        The folder suffix keeps eve_hmi / eve_gmod / future demos from
        sharing the same ``debug/`` path_in_repo when they're run locally.

        Returns:
            The part of ``SPACE_ID`` after the first ``/`` (or all of it when
            there is no ``/``); otherwise ``"debug-<script folder>"``, falling
            back to plain ``"debug"``.
        """
        space_id = os.environ.get("SPACE_ID", "")
        _owner, slash, space_name = space_id.partition("/")
        if slash:
            return space_name
        if space_id:
            return space_id
        try:
            folder = Path(sys.argv[0]).resolve().parent.name
            if folder:
                return f"debug-{folder}"
        except Exception:
            # e.g. empty sys.argv; fall through to the generic name.
            pass
        return "debug"

    @staticmethod
    def _resolve_enabled(explicit: bool | None) -> bool:
        """Determine whether analytics should be enabled.

        Priority: USAGE_ANALYTICS env var > explicit param > SPACE_ID auto-detect.

        Args:
            explicit: The caller's ``enabled`` argument, or ``None`` for auto.

        Returns:
            ``True`` when events should be recorded.
        """
        env = os.environ.get("USAGE_ANALYTICS", "").strip().lower()
        if env in ("0", "false", "off"):
            return False
        if env in ("1", "true", "on"):
            return True
        # Any other value (including unset) defers to the caller / auto-detect.
        if explicit is not None:
            return explicit
        return bool(os.environ.get("SPACE_ID"))