Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Usage logger that pushes to a HF Dataset repo. | |
| Uses a ParquetScheduler (subclass of CommitScheduler) to buffer rows in memory | |
| and periodically write+upload parquet files with embedded audio to the Hub. | |
| Error logs use a separate CommitScheduler with JSONL files. | |
| Falls back to local-only logging if schedulers can't initialize. | |
| Scheduler creation is deferred to first use so that background threads don't | |
| interfere with ZeroGPU's startup function scan. | |
| """ | |
| import hashlib | |
| import io | |
| import json | |
| import tempfile | |
| import threading | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple, Union | |
| from uuid import uuid4 | |
| import numpy as np | |
# =========================================================================
# Directory setup
# =========================================================================
# Local on-disk layout: usage_logs/ for all logging output, with a nested
# errors/ folder holding the JSONL error logs.
LOG_DIR = Path("usage_logs")
ERROR_DIR = LOG_DIR / "errors"
LOG_DIR.mkdir(parents=True, exist_ok=True)
ERROR_DIR.mkdir(parents=True, exist_ok=True)
# A fresh random UUID per process gives every Space restart its own error
# log file, so files from different runs never collide.
ERROR_LOG_PATH = ERROR_DIR / f"error_log-{uuid4()}.jsonl"
| # ========================================================================= | |
| # ParquetScheduler class definition (no instances created at import time) | |
| # ========================================================================= | |
| _HAS_DEPS = False | |
| try: | |
| import pyarrow as pa | |
| import pyarrow.parquet as pq | |
| from huggingface_hub import CommitScheduler | |
| from config import USAGE_LOG_DATASET_REPO, USAGE_LOG_PUSH_INTERVAL_MINUTES | |
| _HAS_DEPS = True | |
| except Exception: | |
| pass | |
| # Schema for the datasets library (embedded in parquet metadata) | |
| _RECITATION_SCHEMA: Dict[str, Dict[str, str]] = { | |
| "audio": {"_type": "Audio"}, | |
| "timestamp": {"_type": "Value", "dtype": "string"}, | |
| "user_id": {"_type": "Value", "dtype": "string"}, | |
| "verse_ref": {"_type": "Value", "dtype": "string"}, | |
| "canonical_text": {"_type": "Value", "dtype": "string"}, | |
| "segments": {"_type": "Value", "dtype": "string"}, | |
| "multi_model": {"_type": "Value", "dtype": "bool"}, | |
| "settings": {"_type": "Value", "dtype": "string"}, | |
| "vad_timestamps": {"_type": "Value", "dtype": "string"}, | |
| } | |
if _HAS_DEPS:

    class ParquetScheduler(CommitScheduler):
        """Buffers rows in memory and uploads one parquet file per interval.

        Adapted from https://huggingface.co/spaces/Wauplin/space-to-dataset-parquet.

        Media columns (features typed ``Audio`` or ``Image``) hold local file
        paths in the buffered row dicts. On push, each file is read and
        embedded in the parquet as a ``{"path": <file name>, "bytes": <content>}``
        struct.

        Buffered rows are never mutated. When an upload fails they are
        re-queued unchanged, with their media files kept on disk, and retried
        on the next push. Media files are deleted only after their batch has
        been uploaded, or when the batch cannot be serialized at all and is
        dropped.
        """

        def __init__(
            self,
            *,
            repo_id: str,
            schema: Optional[Dict[str, Dict[str, str]]] = None,
            every: Union[int, float] = 5,
            path_in_repo: Optional[str] = "data",
            repo_type: Optional[str] = "dataset",
            private: bool = False,
        ) -> None:
            """Create the scheduler.

            Args:
                repo_id: Target Hub repository.
                schema: Predefined HF feature spec. Columns missing from it are
                    inferred on push via ``_infer_schema``.
                every: Push interval (minutes), forwarded to CommitScheduler.
                path_in_repo: Folder in the repo receiving the parquet files.
                repo_type: Hub repository type.
                private: Forwarded to CommitScheduler.
            """
            # Set up our own state *before* the base constructor. CommitScheduler
            # starts its background scheduler thread from __init__, and that
            # thread can call push_to_hub() before __init__ returns. It must
            # find ``_rows`` already in place.
            self._rows: List[Dict[str, Any]] = []
            self._schema = schema
            super().__init__(
                repo_id=repo_id,
                folder_path="dummy",  # not used – we upload directly
                every=every,
                path_in_repo=path_in_repo,
                repo_type=repo_type,
                private=private,
            )

        def append(self, row: Dict[str, Any]) -> None:
            """Add a new row to be uploaded on the next push."""
            with self.lock:
                self._rows.append(row)

        def push_to_hub(self) -> None:
            """Upload every buffered row as a single parquet file.

            Each batch has one of three outcomes:

            * upload succeeds: the embedded media files are deleted;
            * upload fails: the untouched rows are put back at the front of
              the buffer (media files kept) so the next push retries them;
            * the table cannot be built: the batch is dropped and its media
              files deleted, since retrying identical data would fail again.
            """
            with self.lock:
                rows, self._rows = self._rows, []
            if not rows:
                return
            print(f"[USAGE_LOG] Pushing {len(rows)} recitation row(s) to Hub.")

            # Start from the predefined features. Any extra column is typed
            # from the first value seen for it.
            schema: Dict[str, Dict] = dict(self._schema) if self._schema else {}
            for row in rows:
                for key, value in row.items():
                    if key not in schema:
                        schema[key] = _infer_schema(key, value)

            media_paths: List[Path] = []
            try:
                # Work on copies so ``rows`` keep their file paths for a retry.
                prepared = [self._prepare_row(row, schema, media_paths) for row in rows]
                table = pa.Table.from_pylist(prepared)
                table = table.replace_schema_metadata(
                    {"huggingface": json.dumps({"info": {"features": schema}})}
                )
            except Exception as e:
                print(
                    f"[USAGE_LOG] Failed to build parquet table; "
                    f"dropping {len(rows)} row(s): {e}"
                )
                self._delete_files(media_paths)
                return

            # Close the handle before writing, so pyarrow can reopen the path
            # on every platform. Windows refuses a second open of a file that
            # NamedTemporaryFile still holds.
            with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
                archive_path = Path(tmp.name)
            try:
                pq.write_table(table, str(archive_path))
                self.api.upload_file(
                    repo_id=self.repo_id,
                    repo_type=self.repo_type,
                    revision=self.revision,
                    path_in_repo=f"{self.path_in_repo}/{uuid4()}.parquet",
                    path_or_fileobj=str(archive_path),
                )
            except Exception as e:
                print(f"[USAGE_LOG] Failed to upload parquet: {e}")
                # Re-queue ahead of anything appended meanwhile, so ordering is
                # kept. Rows hold paths, not audio bytes, so an outage grows
                # memory slowly. The audio accumulates on disk until a push
                # succeeds.
                with self.lock:
                    self._rows[:0] = rows
                return
            finally:
                archive_path.unlink(missing_ok=True)

            print("[USAGE_LOG] Parquet commit completed.")
            self._delete_files(media_paths)

        @staticmethod
        def _prepare_row(
            row: Dict[str, Any],
            schema: Dict[str, Dict[str, str]],
            media_paths: List[Path],
        ) -> Dict[str, Any]:
            """Return an upload-ready copy of *row*; *row* itself is not modified.

            Media values (``Image``/``Audio`` features) that point to an
            existing file are replaced by ``{"path", "bytes"}``, and that file
            is appended to *media_paths*. Media values pointing nowhere become
            ``None``. Every feature in *schema* absent from *row* is added as
            ``None``.
            """
            prepared: Dict[str, Any] = {}
            for key, value in row.items():
                if value is not None and schema[key].get("_type") in ("Image", "Audio"):
                    file_path = Path(value)
                    if file_path.is_file():
                        value = {"path": file_path.name, "bytes": file_path.read_bytes()}
                        media_paths.append(file_path)
                    else:
                        value = None
                prepared[key] = value
            for feature in schema:
                prepared.setdefault(feature, None)
            return prepared

        @staticmethod
        def _delete_files(paths: List[Path]) -> None:
            """Delete the given temp media files, ignoring ones already gone."""
            for path in paths:
                path.unlink(missing_ok=True)
| def _infer_schema(key: str, value: Any) -> Dict[str, str]: | |
| """Infer HF datasets schema from a key/value pair.""" | |
| if "image" in key: | |
| return {"_type": "Image"} | |
| if "audio" in key: | |
| return {"_type": "Audio"} | |
| if isinstance(value, bool): | |
| return {"_type": "Value", "dtype": "bool"} | |
| if isinstance(value, int): | |
| return {"_type": "Value", "dtype": "int64"} | |
| if isinstance(value, float): | |
| return {"_type": "Value", "dtype": "float64"} | |
| if isinstance(value, bytes): | |
| return {"_type": "Value", "dtype": "binary"} | |
| return {"_type": "Value", "dtype": "string"} | |
| # ========================================================================= | |
| # Lazy scheduler initialization (deferred to first use) | |
| # ========================================================================= | |
| _recitation_scheduler = None | |
| _error_scheduler = None | |
| _schedulers_initialized = False | |
| _init_lock = threading.Lock() | |
| _fallback_lock = threading.Lock() | |
def _ensure_schedulers() -> None:
    """Create the scheduler instances on first call; later calls are no-ops.

    Uses a lock-free check of ``_schedulers_initialized`` and then re-checks it
    under ``_init_lock``. The flag is published only *after* the scheduler
    globals have been assigned. A concurrent caller taking the lock-free fast
    path therefore never sees "initialized" while the schedulers are still
    ``None`` mid-construction; it waits on the lock instead. Construction may
    be slow, for example if it contacts the Hub.

    Initialization is attempted once. If dependencies are missing or
    construction fails, a message is printed and the module stays in
    local-only mode.
    """
    global _recitation_scheduler, _error_scheduler, _schedulers_initialized
    if _schedulers_initialized:
        return
    with _init_lock:
        if _schedulers_initialized:
            return
        try:
            if not _HAS_DEPS:
                print("[USAGE_LOG] Dependencies missing (local-only mode).")
                return
            try:
                _recitation_scheduler = ParquetScheduler(
                    repo_id=USAGE_LOG_DATASET_REPO,
                    schema=_RECITATION_SCHEMA,
                    every=USAGE_LOG_PUSH_INTERVAL_MINUTES,
                    path_in_repo="data",
                    repo_type="dataset",
                    private=True,
                )
                _error_scheduler = CommitScheduler(
                    repo_id=USAGE_LOG_DATASET_REPO,
                    repo_type="dataset",
                    folder_path=ERROR_DIR,
                    path_in_repo="data/errors",
                    private=True,
                    every=USAGE_LOG_PUSH_INTERVAL_MINUTES,
                )
            except Exception as e:
                print(f"[USAGE_LOG] Scheduler init failed (local-only mode): {e}")
        finally:
            # Publish last: the scheduler globals above are already final.
            _schedulers_initialized = True
# =========================================================================
# Helpers
# =========================================================================
def _get_error_lock():
    """Return the lock that must be held while appending to ERROR_LOG_PATH.

    Makes sure initialization has been attempted first. When the error
    CommitScheduler exists, its own ``lock`` is returned, presumably so
    writes never interleave with the scheduler reading the folder for
    upload. Otherwise the module-level fallback lock is returned.
    """
    _ensure_schedulers()
    scheduler = _error_scheduler
    return _fallback_lock if scheduler is None else scheduler.lock
def get_user_id(request) -> str:
    """Derive a short pseudonymous user id from a request.

    The result is the first 12 hex characters of a SHA-256 digest of either:

    * the HF ``username`` attribute, for logged-in users, or
    * ``"<client ip>|<user agent>"`` for anonymous users. The IP is the first
      ``X-Forwarded-For`` entry, else ``X-Real-IP``.

    The hash is unsalted. Low-entropy inputs such as public usernames could be
    recovered by hashing candidates, so treat the id as pseudonymous rather
    than anonymous.

    Returns ``"unknown"`` if anything goes wrong, e.g. when ``request`` is
    ``None`` or has no ``headers``.
    """

    def _digest(text: str) -> str:
        return hashlib.sha256(text.encode()).hexdigest()[:12]

    try:
        username = getattr(request, "username", None)
        if not username:
            hdrs = request.headers
            forwarded = hdrs.get("x-forwarded-for", "").split(",")[0].strip()
            client_ip = forwarded or hdrs.get("x-real-ip", "") or ""
            user_agent = hdrs.get("user-agent", "")
            return _digest(f"{client_ip}|{user_agent}")
        return _digest(username)
    except Exception:
        return "unknown"
# =========================================================================
# Public logging API
# =========================================================================
def log_error(user_id: str, verse_ref: str, error_message: str) -> None:
    """Append one technical-error record to this process's JSONL error log.

    Best-effort: any ``Exception`` is reported on stdout and not raised, so a
    logging problem cannot break the caller.

    Args:
        user_id: Pseudonymous user id, e.g. from ``get_user_id``.
        verse_ref: Verse reference being analysed. ``None``/empty becomes "".
        error_message: Error description. Non-string values (such as an
            exception object) are converted with ``str()``. ``None``/empty
            becomes "".
    """
    try:
        # Serialize before touching the file. json.dump streams chunks, so a
        # value that fails to encode would leave a truncated, newline-less
        # record that also corrupts the next line appended after it.
        line = json.dumps({
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "verse_ref": verse_ref or "",
            "error_message": str(error_message) if error_message else "",
        })
        with _get_error_lock():
            with ERROR_LOG_PATH.open("a", encoding="utf-8") as f:
                f.write(line + "\n")
    except Exception as e:
        print(f"[USAGE_LOG] Failed to log error: {e}")
def _encode_audio_to_temp_flac(
    audio: Tuple[int, np.ndarray], verse_ref: str, user_id: str
) -> str:
    """Write *audio* to a uniquely named FLAC file under LOG_DIR/tmp_audio; return its path.

    Args:
        audio: ``(sample_rate, audio_array)`` tuple, passed to ``soundfile.write``.
        verse_ref: Used (sanitized) in the filename. Empty/``None`` becomes "unknown".
        user_id: Used (sanitized) in the filename.
    """
    import soundfile as sf

    def _safe(text: str) -> str:
        # Keep filenames portable: anything but letters, digits, "-" and "_"
        # (notably ":" and path separators) becomes "-".
        return "".join(c if c.isalnum() or c in "-_" else "-" for c in text)

    sample_rate, audio_array = audio
    ts = datetime.now().strftime("%Y%m%dT%H%M%S")
    safe_ref = _safe(verse_ref or "unknown")
    safe_user = _safe(str(user_id))
    # The random suffix stops two runs by the same user on the same verse
    # within one second from overwriting each other's audio before upload.
    filename = f"{ts}_{safe_ref}_{safe_user}_{uuid4().hex[:8]}.flac"
    tmp_dir = LOG_DIR / "tmp_audio"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    filepath = tmp_dir / filename
    try:
        sf.write(str(filepath), audio_array, sample_rate, format="FLAC")
    except Exception:
        # Do not leave a partially written file behind.
        filepath.unlink(missing_ok=True)
        raise
    return str(filepath)


def log_analysis(
    user_id: str,
    verse_ref: str,
    canonical_text: str,
    segments: List[dict],
    multi_model: bool = False,
    settings: Optional[dict] = None,
    audio: Optional[Tuple[int, np.ndarray]] = None,
    vad_timestamps: Optional[List[list]] = None,
) -> None:
    """Log a complete analysis run.

    With a recitation scheduler available, the row is buffered for its next
    push. If ``audio`` is given, it is first encoded to a temporary FLAC file.
    The row stores that file's path; the scheduler embeds the bytes in the
    parquet and is responsible for removing the file. If encoding fails, the
    row is still logged, without audio.

    In local-only mode (no scheduler) the row is appended to
    ``LOG_DIR/recitations_fallback.jsonl`` and audio is not persisted. It is
    not even encoded, so no orphan temp files are created.

    Best-effort: any ``Exception`` is printed with a ``[USAGE_LOG]`` prefix
    and not raised.

    Args:
        user_id: Pseudonymous user id, e.g. from ``get_user_id``.
        verse_ref: Verse reference. ``None``/empty is stored as "".
        canonical_text: Reference text. ``None``/empty is stored as "".
        segments: List of dicts with ``segment_ref``, ``canonical_phonemes``,
            ``detected_phonemes``; stored JSON-encoded.
        multi_model: Stored as-is.
        settings: Stored JSON-encoded (``None`` becomes ``{}``).
        audio: Optional (sample_rate, audio_array) tuple to embed.
        vad_timestamps: Stored JSON-encoded, or ``None`` when empty/missing.
    """
    try:
        _ensure_schedulers()
        scheduler = _recitation_scheduler
        row: Dict[str, Any] = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "verse_ref": verse_ref or "",
            "canonical_text": canonical_text or "",
            "segments": json.dumps(segments),
            "multi_model": multi_model,
            "settings": json.dumps(settings or {}),
            "vad_timestamps": json.dumps(vad_timestamps) if vad_timestamps else None,
        }

        if scheduler is None:
            # Local-only fallback: one JSON line per run, serialized before
            # opening the file so a failure cannot leave a partial record.
            fallback_path = LOG_DIR / "recitations_fallback.jsonl"
            line = json.dumps(row)
            with _fallback_lock:
                with fallback_path.open("a") as f:
                    f.write(line + "\n")
            return

        if audio is not None:
            try:
                row["audio"] = _encode_audio_to_temp_flac(audio, verse_ref, user_id)
            except Exception as e:
                print(f"[USAGE_LOG] Failed to encode audio, logging row without it: {e}")
        scheduler.append(row)
    except Exception as e:
        print(f"[USAGE_LOG] Failed to log analysis: {e}")