| """ |
| Supabase structured logger with per-request user context. |
| |
| Usage: |
| from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source |
| |
| # At startup |
| init_logger(supabase_client) |
| |
| # In middleware / auth |
| set_user_context({"user_id": "...", "email": "..."}) |
| set_request_source({"source_domain": "dlpo-mbok-dev.hf.space"}) |
| |
| # Anywhere |
| log_event("user", "click_step01", session_id="abc123", metadata={"force": False}) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import threading |
| import traceback |
| from contextvars import ContextVar |
| from datetime import datetime, timezone |
| from typing import Any, Dict, Optional |
|
|
| from supabase import Client |
|
|
# Events whose resolved source_domain appears here are dropped by log_event
# instead of being persisted.
_BLOCKED_SOURCE_DOMAINS = {"proxy.spaces.internal.huggingface.tech"}

# Supabase client used for inserts; stays None until init_logger() stores one.
_supabase: Optional[Client] = None

# Per-context user mapping; log_event reads its "user_id" and "email" keys.
_current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
    "current_user",
    default=None,
)

# Per-context request origin; log_event reads its "source_domain" key.
_request_source: ContextVar[Optional[Dict[str, str]]] = ContextVar(
    "request_source",
    default=None,
)
|
|
|
|
def init_logger(client: Client) -> None:
    """Register the Supabase client that ``log_event`` inserts rows with.

    Args:
        client: Supabase client to keep at module level. Calling this again
            replaces the previously stored client.
    """
    global _supabase
    _supabase = client
|
|
|
|
def set_user_context(user: Optional[Dict[str, Any]]) -> None:
    """Store ``user`` as the current user for the active context.

    Args:
        user: Mapping describing the user, or ``None`` to clear it.
            ``log_event`` reads the ``"user_id"`` and ``"email"`` keys.
    """
    _current_user.set(user)
|
|
|
|
def get_user_context() -> Optional[Dict[str, Any]]:
    """Return the user mapping stored for the active context, or ``None``."""
    current = _current_user.get()
    return current
|
|
|
|
def set_request_source(source: Optional[Dict[str, str]]) -> None:
    """Set the request's origin information (called from FastAPI middleware).

    Args:
        source: Mapping describing where the request came from, or ``None``
            to clear it. ``log_event`` reads the ``"source_domain"`` key.
    """
    _request_source.set(source)
|
|
|
|
def log_event(
    event_type: str,
    message: str,
    *,
    level: str = "INFO",
    metadata: Optional[Dict[str, Any]] = None,
    user_override: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    source: Optional[Dict[str, str]] = None,
) -> None:
    """Insert a structured log row into Supabase ``activity_logs``.

    Source resolution priority:
      1. Explicit ``source`` argument
      2. ContextVar set by middleware (``set_request_source``)
      3. ``source_domain`` key inside ``metadata``

    If the resolved ``source_domain`` is listed in ``_BLOCKED_SOURCE_DOMAINS``
    (internal proxy domains) the event is silently dropped. Nothing is
    persisted either while no client has been registered via ``init_logger``.

    The insert runs on a daemon thread. If the insert fails, or the thread
    cannot be started, the error and the row are printed to stdout so the
    calling request is never interrupted by a logging error.

    Args:
        event_type: Value stored in the ``event_type`` column.
        message: Value stored in the ``message`` column.
        level: Value stored in the ``level`` column.
        metadata: Extra data for the ``metadata`` column. It is copied, and
            the ``source_domain`` / ``source_channel`` keys are removed from
            the copy before it is stored.
        user_override: When truthy, used instead of the context user as the
            source of ``user_id`` and ``email``.
        session_id: Value stored in the ``session_id`` column.
        source: Explicit request origin; its ``source_domain`` key is used.
    """
    user = user_override or _current_user.get()

    # Work on a copy so the caller's metadata dict is never mutated.
    clean_meta: Dict[str, Any] = dict(metadata) if metadata else {}

    # Priority 1 and 2: explicit argument, then the middleware ContextVar.
    resolved_source: Dict[str, Any] = source or _request_source.get() or {}
    if not resolved_source:
        # Priority 3: fall back to a source_domain carried inside metadata.
        meta_domain = clean_meta.get("source_domain")
        if meta_domain:
            resolved_source = {"source_domain": meta_domain}

    # These keys are never stored inside the metadata column.
    clean_meta.pop("source_domain", None)
    clean_meta.pop("source_channel", None)

    source_domain = resolved_source.get("source_domain")

    # Only strings can match a blocked domain. The isinstance guard keeps an
    # unhashable value (e.g. a list) from raising TypeError into the caller.
    if isinstance(source_domain, str) and source_domain in _BLOCKED_SOURCE_DOMAINS:
        return

    # Without a registered client there is nothing to insert into, so skip
    # building the row and spawning a thread.
    client = _supabase
    if client is None:
        return

    row = {
        "event_type": event_type,
        "level": level,
        "message": message,
        "user_id": user.get("user_id") if user else None,
        "user_email": user.get("email") if user else None,
        "session_id": session_id,
        "source_domain": source_domain,
        "metadata": clean_meta,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    def _insert() -> None:
        # Best-effort insert: any failure is reported to stdout only.
        try:
            client.table("activity_logs").insert(row).execute()
        except Exception:
            print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}")
            print(f"[SUPABASE_LOG_ERROR] row: {row}")

    try:
        # Fire-and-forget so the request does not wait on the network call.
        threading.Thread(target=_insert, daemon=True).start()
    except RuntimeError:
        # start() raises RuntimeError when no new thread can be created;
        # report it like an insert failure instead of failing the request.
        print(
            "[SUPABASE_LOG_ERROR] could not start insert thread: "
            f"{traceback.format_exc()}"
        )
        print(f"[SUPABASE_LOG_ERROR] row: {row}")
|
|