mbok_dev / supabase_logger.py
Renecto's picture
Add structured Supabase logging with per-request user context
3291d37 verified
raw
history blame
2.33 kB
"""
Supabase structured logger with per-request user context.
Usage:
from supabase_logger import init_logger, log_event, set_user_context, get_user_context
# At startup
init_logger(supabase_client)
# In middleware / auth
set_user_context({"user_id": "...", "email": "...", "role": "..."})
# Anywhere
log_event("step_executed", "Step01 completed", metadata={"session_id": "abc"})
"""
from __future__ import annotations
import threading
import traceback
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from supabase import Client
_supabase: Optional[Client] = None
_current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
"current_user", default=None
)
def init_logger(client: Client) -> None:
global _supabase
_supabase = client
def set_user_context(user: Optional[Dict[str, Any]]) -> None:
_current_user.set(user)
def get_user_context() -> Optional[Dict[str, Any]]:
return _current_user.get()
def log_event(
event_type: str,
message: str,
*,
level: str = "INFO",
metadata: Optional[Dict[str, Any]] = None,
user_override: Optional[Dict[str, Any]] = None,
) -> None:
"""
Insert a structured log row into Supabase ``app_logs``.
Falls back to stdout if the insert fails so the main request is
never blocked by a logging error.
"""
user = user_override or _current_user.get()
row = {
"event_type": event_type,
"level": level,
"message": message,
"user_id": user.get("user_id") if user else None,
"user_email": user.get("email") if user else None,
"user_role": user.get("role") if user else None,
"metadata": metadata or {},
"created_at": datetime.now(timezone.utc).isoformat(),
}
def _insert():
try:
if _supabase is not None:
_supabase.table("app_logs").insert(row).execute()
except Exception:
print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}")
print(f"[SUPABASE_LOG_ERROR] row: {row}")
# Fire-and-forget so the request is never delayed
threading.Thread(target=_insert, daemon=True).start()