mbok_dev / supabase_logger.py
Renecto's picture
feat: add /reset-password page
00c6951 verified
"""
Supabase structured logger with per-request user context.

Usage:

    from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source

    # At startup
    init_logger(supabase_client)

    # In middleware / auth
    set_user_context({"user_id": "...", "email": "..."})
    set_request_source({"source_domain": "dlpo-mbok-dev.hf.space"})

    # Anywhere
    log_event("user", "click_step01", session_id="abc123", metadata={"force": False})
"""
from __future__ import annotations
import threading
import traceback
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from supabase import Client
# Supabase client used for inserts; stays ``None`` until ``init_logger`` runs.
_supabase: Optional[Client] = None

# User info for the current execution context (``None`` when nothing is set).
_current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar("current_user", default=None)

# Request-origin info for the current execution context (``None`` when unset).
_request_source: ContextVar[Optional[Dict[str, str]]] = ContextVar("request_source", default=None)

# Domains whose events are never persisted (internal HF proxy health checks etc.)
_BLOCKED_SOURCE_DOMAINS = {"proxy.spaces.internal.huggingface.tech"}
def init_logger(client: Client) -> None:
    """Register the Supabase client used for log inserts.

    Stores ``client`` in the module-level ``_supabase`` slot; calling this
    again replaces the previously stored client.
    """
    global _supabase
    _supabase = client
def set_user_context(user: Optional[Dict[str, Any]]) -> None:
    """Store ``user`` as the current context's user info.

    Passing ``None`` stores ``None``, i.e. marks the context as having no user.
    """
    _current_user.set(user)
def get_user_context() -> Optional[Dict[str, Any]]:
    """Return the user info stored for the current context, or ``None`` if unset."""
    current = _current_user.get()
    return current
def set_request_source(source: Optional[Dict[str, str]]) -> None:
    """Set the request's origin (traffic source) info; called from FastAPI middleware."""
    _request_source.set(source)
def log_event(
    event_type: str,
    message: str,
    *,
    level: str = "INFO",
    metadata: Optional[Dict[str, Any]] = None,
    user_override: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    source: Optional[Dict[str, str]] = None,
) -> None:
    """
    Insert a structured log row into Supabase ``activity_logs``.

    Args:
        event_type: Value stored in the ``event_type`` column.
        message: Value stored in the ``message`` column.
        level: Value stored in the ``level`` column (default ``"INFO"``).
        metadata: Extra key/value pairs stored in the ``metadata`` column.
            The dict is copied, never mutated; the ``source_domain`` and
            legacy ``source_channel`` keys are stripped from the copy.
        user_override: User dict (``user_id`` / ``email`` keys) used instead
            of the context user when truthy.
        session_id: Value stored in the ``session_id`` column.
        source: Explicit source dict; its ``source_domain`` key is used.

    Source resolution priority:
      1. Explicit ``source`` argument
      2. ContextVar set by middleware (``set_request_source``)
      3. ``source_domain`` key inside ``metadata``

    If the resolved ``source_domain`` is an internal proxy domain the event is
    silently dropped (not persisted). If no client has been registered via
    ``init_logger`` there is nothing to insert into and the call is a no-op.

    The insert runs on a daemon thread. If the insert fails, or the thread
    cannot be started, the error and the row are printed to stdout so the
    main request is never blocked by a logging error.
    """
    user = user_override or _current_user.get()

    # Work on a shallow copy so the caller's dict is left untouched.
    clean_meta: Dict[str, Any] = dict(metadata) if metadata else {}
    # ``source_domain`` has its own column, so it is always removed from the
    # metadata; its value is only *used* as the lowest-priority fallback.
    meta_domain = clean_meta.pop("source_domain", None)
    # Also strip legacy source_channel if still present in older metadata.
    clean_meta.pop("source_channel", None)

    # --- source resolution (explicit arg > request context > metadata) ---
    context_source = _request_source.get()
    resolved_source: Dict[str, Any]
    if source:
        resolved_source = source
    elif context_source:
        resolved_source = context_source
    elif meta_domain:
        resolved_source = {"source_domain": meta_domain}
    else:
        resolved_source = {}
    source_domain = resolved_source.get("source_domain")

    # Drop events from internal proxy domains. The isinstance guard matters:
    # a value taken from metadata can be any type, and an unhashable one
    # (e.g. a list) would make the set membership test raise TypeError.
    if isinstance(source_domain, str) and source_domain in _BLOCKED_SOURCE_DOMAINS:
        return

    row = {
        "event_type": event_type,
        "level": level,
        "message": message,
        "user_id": user.get("user_id") if user else None,
        "user_email": user.get("email") if user else None,
        "session_id": session_id,
        "source_domain": source_domain,
        "metadata": clean_meta,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    client = _supabase
    if client is None:
        # Logger not initialised: avoid spawning a thread that would do nothing.
        return

    def _insert() -> None:
        try:
            client.table("activity_logs").insert(row).execute()
        except Exception:
            print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}")
            print(f"[SUPABASE_LOG_ERROR] row: {row}")

    # Fire-and-forget so the request is never delayed.
    try:
        threading.Thread(target=_insert, daemon=True).start()
    except RuntimeError:
        # Raised when the interpreter cannot start another thread; report it
        # the same way as an insert failure instead of failing the request.
        print(f"[SUPABASE_LOG_ERROR] could not start log thread: {traceback.format_exc()}")
        print(f"[SUPABASE_LOG_ERROR] row: {row}")