File size: 4,057 Bytes
"""
Supabase structured logger with per-request user context.

Usage:
    from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source

    # At startup
    init_logger(supabase_client)

    # In middleware / auth
    set_user_context({"user_id": "...", "email": "..."})
    set_request_source({"source_domain": "dlpo-mbok-dev.hf.space"})

    # Anywhere
    log_event("user", "click_step01", session_id="abc123", metadata={"force": False})
"""
from __future__ import annotations
import threading
import traceback
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from supabase import Client
# Supabase client used for inserts; stays None until init_logger() is called.
_supabase: Optional[Client] = None

# Context-local user info; log_event() reads "user_id" and "email" from it.
_current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar("current_user", default=None)

# Context-local request origin; log_event() reads its "source_domain" key.
_request_source: ContextVar[Optional[Dict[str, str]]] = ContextVar("request_source", default=None)

# Domains whose events are never persisted (internal HF proxy health checks etc.)
_BLOCKED_SOURCE_DOMAINS = {"proxy.spaces.internal.huggingface.tech"}
def init_logger(client: Client) -> None:
    """Register the Supabase client that ``log_event`` will write through.

    Args:
        client: Supabase client stored in the module-level ``_supabase``
            variable, replacing any previously registered client.
    """
    global _supabase
    _supabase = client
def set_user_context(user: Optional[Dict[str, Any]]) -> None:
    """Store ``user`` in the ``_current_user`` context variable.

    Args:
        user: User info dict (``log_event`` reads its ``"user_id"`` and
            ``"email"`` keys), or ``None`` to clear the current user.
    """
    _current_user.set(user)
def get_user_context() -> Optional[Dict[str, Any]]:
    """Return the value held by the ``_current_user`` context variable.

    Returns:
        The dict last passed to ``set_user_context`` in the current context,
        or ``None`` (the variable's default) when nothing has been set.
    """
    current = _current_user.get()
    return current
def set_request_source(source: Optional[Dict[str, str]]) -> None:
    """Set the request's origin information (called from the FastAPI middleware).

    Args:
        source: Dict describing where the request came from (``log_event``
            reads its ``"source_domain"`` key), or ``None`` to clear it.
    """
    _request_source.set(source)
def log_event(
    event_type: str,
    message: str,
    *,
    level: str = "INFO",
    metadata: Optional[Dict[str, Any]] = None,
    user_override: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    source: Optional[Dict[str, str]] = None,
) -> None:
    """Queue one structured log row for the Supabase ``activity_logs`` table.

    The ``source_domain`` column is resolved from the first truthy candidate:

    1. the explicit ``source`` argument,
    2. the context variable populated by ``set_request_source``,
    3. a ``source_domain`` key embedded in ``metadata``.

    Events whose resolved domain is listed in ``_BLOCKED_SOURCE_DOMAINS`` are
    dropped silently. Otherwise the row is inserted from a daemon thread, so
    the caller never waits on the insert; if the insert raises, the traceback
    and the row are printed to stdout instead.

    Args:
        event_type: Value for the ``event_type`` column.
        message: Value for the ``message`` column.
        level: Value for the ``level`` column.
        metadata: Extra fields stored in the ``metadata`` column. The dict is
            copied; ``source_domain`` and ``source_channel`` keys are removed
            from the copy before it is stored.
        user_override: User dict to use instead of the context user when truthy.
        session_id: Value for the ``session_id`` column.
        source: Explicit origin dict; its ``"source_domain"`` key is used.
    """
    actor = user_override or _current_user.get()

    # Work on a copy so the caller's metadata dict is never mutated.
    extras: Dict[str, Any] = dict(metadata) if metadata else {}
    # source_domain is stored in its own column, and the legacy
    # source_channel key is discarded, so neither stays in the metadata.
    embedded_domain = extras.pop("source_domain", None)
    extras.pop("source_channel", None)

    # Explicit argument wins over the middleware-provided context value.
    origin = source or _request_source.get()
    if origin:
        source_domain = origin.get("source_domain")
    else:
        # Last resort: the domain that was embedded in the metadata, if any.
        source_domain = embedded_domain or None

    # Drop events from internal proxy domains.
    if source_domain in _BLOCKED_SOURCE_DOMAINS:
        return

    if actor:
        user_id, user_email = actor.get("user_id"), actor.get("email")
    else:
        user_id = user_email = None

    row = {
        "event_type": event_type,
        "level": level,
        "message": message,
        "user_id": user_id,
        "user_email": user_email,
        "session_id": session_id,
        "source_domain": source_domain,
        "metadata": extras,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    def _insert():
        # Nothing to write to until init_logger() has registered a client.
        if _supabase is None:
            return
        try:
            _supabase.table("activity_logs").insert(row).execute()
        except Exception:
            # A logging failure must never propagate; report it on stdout.
            print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}")
            print(f"[SUPABASE_LOG_ERROR] row: {row}")

    # Fire-and-forget so the request is never delayed.
    worker = threading.Thread(target=_insert, daemon=True)
    worker.start()
|