""" Supabase structured logger with per-request user context. Usage: from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source # At startup init_logger(supabase_client) # In middleware / auth set_user_context({"user_id": "...", "email": "..."}) set_request_source({"source_domain": "dlpo-mbok-dev.hf.space"}) # Anywhere log_event("user", "click_step01", session_id="abc123", metadata={"force": False}) """ from __future__ import annotations import threading import traceback from contextvars import ContextVar from datetime import datetime, timezone from typing import Any, Dict, Optional from supabase import Client _supabase: Optional[Client] = None _current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar( "current_user", default=None ) _request_source: ContextVar[Optional[Dict[str, str]]] = ContextVar( "request_source", default=None ) # Domains whose events are never persisted (internal HF proxy health checks etc.) _BLOCKED_SOURCE_DOMAINS = { "proxy.spaces.internal.huggingface.tech", } def init_logger(client: Client) -> None: global _supabase _supabase = client def set_user_context(user: Optional[Dict[str, Any]]) -> None: _current_user.set(user) def get_user_context() -> Optional[Dict[str, Any]]: return _current_user.get() def set_request_source(source: Optional[Dict[str, str]]) -> None: """リクエストの流入元情報をセット(FastAPI middleware から呼ぶ)""" _request_source.set(source) def log_event( event_type: str, message: str, *, level: str = "INFO", metadata: Optional[Dict[str, Any]] = None, user_override: Optional[Dict[str, Any]] = None, session_id: Optional[str] = None, source: Optional[Dict[str, str]] = None, ) -> None: """ Insert a structured log row into Supabase ``activity_logs``. Source resolution priority: 1. Explicit ``source`` argument 2. ContextVar set by middleware (``set_request_source``) 3. ``source_domain`` key inside ``metadata`` If the resolved ``source_domain`` is an internal proxy domain the event is silently dropped (not persisted). Falls back to stdout if the insert fails so the main request is never blocked by a logging error. """ user = user_override or _current_user.get() # --- source resolution --- clean_meta: Dict[str, Any] = dict(metadata) if metadata else {} resolved_source: Dict[str, Any] = {} if source: resolved_source = source elif _request_source.get(): resolved_source = _request_source.get() # type: ignore[assignment] else: # Fall back to key embedded in metadata dm = clean_meta.pop("source_domain", None) if dm: resolved_source = {"source_domain": dm} # Strip source_domain from metadata to avoid duplication clean_meta.pop("source_domain", None) # Also strip legacy source_channel if still present in older metadata clean_meta.pop("source_channel", None) source_domain = resolved_source.get("source_domain") # Drop events from internal proxy domains if source_domain in _BLOCKED_SOURCE_DOMAINS: return row = { "event_type": event_type, "level": level, "message": message, "user_id": user.get("user_id") if user else None, "user_email": user.get("email") if user else None, "session_id": session_id, "source_domain": source_domain, "metadata": clean_meta, "created_at": datetime.now(timezone.utc).isoformat(), } def _insert(): try: if _supabase is not None: _supabase.table("activity_logs").insert(row).execute() except Exception: print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}") print(f"[SUPABASE_LOG_ERROR] row: {row}") # Fire-and-forget so the request is never delayed threading.Thread(target=_insert, daemon=True).start()