import os import json import threading from datetime import datetime from pathlib import Path from typing import Optional from huggingface_hub import HfApi, upload_file, hf_hub_download class StatsManager: STATS_FILENAME = "stats.json" WRITE_INTERVAL_SECONDS = 60 def __init__(self, dataset_repo_id: str, hf_token: Optional[str] = None, local_backup: bool = True): self.dataset_repo_id = dataset_repo_id self.hf_token = hf_token or os.environ.get("HF_TOKEN") self.local_backup = local_backup self._lock = threading.Lock() self.api = HfApi(token=self.hf_token) if (self.dataset_repo_id and self.hf_token) else None if os.environ.get("SPACE_ID"): self.local_dir = Path("/tmp/feedback_data") else: self.local_dir = Path(__file__).parent / "feedback_data" self.local_dir.mkdir(exist_ok=True, parents=True) self.local_file = self.local_dir / self.STATS_FILENAME self._stats = self._load_stats() self._last_hf_write = datetime.min def _load_stats(self) -> dict: if self.api and self.dataset_repo_id: try: local_path = hf_hub_download( repo_id=self.dataset_repo_id, filename=self.STATS_FILENAME, repo_type="dataset", token=self.hf_token, force_download=True, cache_dir=str(self.local_dir) ) with open(local_path, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: if "404" not in str(e): print(f"[StatsManager] HF load warning: {e}") if self.local_file.exists(): try: with open(self.local_file, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: print(f"[StatsManager] Local load error: {e}") now = datetime.now().isoformat() return {"visit_count": 0, "detection_count": 0, "last_updated": now, "created_at": now} def _write_to_local(self): try: with open(self.local_file, "w", encoding="utf-8") as f: json.dump(self._stats, f, indent=2) except Exception as e: print(f"[StatsManager] Local write error: {e}") def _upload_to_hf(self): if not (self.api and self.dataset_repo_id): return try: upload_file( path_or_fileobj=str(self.local_file), path_in_repo=self.STATS_FILENAME, repo_id=self.dataset_repo_id, repo_type="dataset", token=self.hf_token, commit_message="stats: visits={} detections={}".format( self._stats["visit_count"], self._stats["detection_count"] ) ) except Exception as e: print(f"[StatsManager] HF upload error (non-fatal): {e}") def _maybe_flush(self): self._stats["last_updated"] = datetime.now().isoformat() self._write_to_local() elapsed = (datetime.now() - self._last_hf_write).total_seconds() if elapsed >= self.WRITE_INTERVAL_SECONDS: self._upload_to_hf() self._last_hf_write = datetime.now() def increment_visit(self): with self._lock: self._stats["visit_count"] += 1 self._maybe_flush() def increment_detection(self): with self._lock: self._stats["detection_count"] += 1 self._maybe_flush() @property def visit_count(self) -> int: return self._stats.get("visit_count", 0) @property def detection_count(self) -> int: return self._stats.get("detection_count", 0)