File size: 4,057 Bytes
00c6951
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
Supabase structured logger with per-request user context.

Usage:
    from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source

    # At startup
    init_logger(supabase_client)

    # In middleware / auth
    set_user_context({"user_id": "...", "email": "..."})
    set_request_source({"source_domain": "dlpo-mbok-dev.hf.space"})

    # Anywhere
    log_event("user", "click_step01", session_id="abc123", metadata={"force": False})
"""

from __future__ import annotations

import threading
import traceback
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from supabase import Client

# Supabase client used for inserts. Stays None until init_logger() is called;
# while it is None, log_event() performs no insert.
_supabase: Optional[Client] = None

# User mapping for the current context. Written by set_user_context(), read by
# get_user_context() and by log_event() (which uses its "user_id" and "email"
# keys). Defaults to None when nothing has been set.
_current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
    "current_user", default=None
)
# Request-origin mapping for the current context. Written by
# set_request_source(); log_event() reads its "source_domain" key when no
# explicit ``source`` argument is passed. Defaults to None.
_request_source: ContextVar[Optional[Dict[str, str]]] = ContextVar(
    "request_source", default=None
)

# Domains whose events are never persisted (internal HF proxy health checks etc.).
# log_event() returns without inserting when the resolved source_domain is in
# this set.
_BLOCKED_SOURCE_DOMAINS = {
    "proxy.spaces.internal.huggingface.tech",
}


def init_logger(client: Client) -> None:
    """Register the Supabase client that ``log_event`` uses for inserts.

    Stores ``client`` in the module-level ``_supabase`` variable, replacing
    whatever was stored there before. Until this has been called,
    ``_supabase`` is ``None`` and ``log_event`` inserts nothing.
    """
    global _supabase
    _supabase = client


def set_user_context(user: Optional[Dict[str, Any]]) -> None:
    """Store ``user`` in the ``_current_user`` context variable.

    ``log_event`` reads the ``"user_id"`` and ``"email"`` keys of this mapping
    when it is called without ``user_override``. Passing ``None`` (or an
    empty mapping) makes ``log_event`` record no user.
    """
    _current_user.set(user)


def get_user_context() -> Optional[Dict[str, Any]]:
    """Return the current value of the ``_current_user`` context variable.

    Returns ``None`` (the variable's default) when no user has been set.
    """
    return _current_user.get()


def set_request_source(source: Optional[Dict[str, str]]) -> None:
    """Set the request's origin information (call this from FastAPI middleware).

    Stores ``source`` in the ``_request_source`` context variable.
    ``log_event`` reads its ``"source_domain"`` key when no explicit
    ``source`` argument is passed.
    """
    _request_source.set(source)


def log_event(
    event_type: str,
    message: str,
    *,
    level: str = "INFO",
    metadata: Optional[Dict[str, Any]] = None,
    user_override: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    source: Optional[Dict[str, str]] = None,
) -> None:
    """
    Insert a structured log row into Supabase ``activity_logs``.

    Args:
        event_type: Value stored in the ``event_type`` column.
        message: Value stored in the ``message`` column.
        level: Value stored in the ``level`` column (default ``"INFO"``).
        metadata: Extra key/value pairs stored in the ``metadata`` column.
            The dict is copied, never mutated; the ``source_domain`` and
            legacy ``source_channel`` keys are stripped from the copy.
        user_override: Mapping used instead of the context user; its
            ``"user_id"`` and ``"email"`` keys fill ``user_id`` / ``user_email``.
        session_id: Value stored in the ``session_id`` column.
        source: Mapping whose ``"source_domain"`` key identifies the origin.

    Source resolution priority:
      1. Explicit ``source`` argument
      2. ContextVar set by middleware (``set_request_source``)
      3. ``source_domain`` key inside ``metadata``

    If the resolved ``source_domain`` is an internal proxy domain the event is
    silently dropped (not persisted). If no client has been registered with
    ``init_logger`` the call is a no-op.

    The insert runs on a daemon thread. If the insert fails, or the thread
    cannot be started, the error and the row are printed to stdout instead,
    so the main request is never blocked by a logging error.
    """
    # Capture the client once: with nothing registered there is nowhere to
    # write, so skip building the row and spawning a thread that would do
    # nothing. The worker below uses this same reference.
    client = _supabase
    if client is None:
        return

    # Work on a copy so the caller's dict is left untouched. Both keys are
    # removed unconditionally: source_domain gets its own column, and
    # source_channel is a legacy key that may linger in older metadata.
    clean_meta: Dict[str, Any] = dict(metadata) if metadata else {}
    meta_domain = clean_meta.pop("source_domain", None)
    clean_meta.pop("source_channel", None)

    # --- source resolution ---
    # Empty mappings count as "not provided" and fall through to the next tier.
    resolved_source: Dict[str, Any] = source or _request_source.get() or {}
    if not resolved_source and meta_domain:
        resolved_source = {"source_domain": meta_domain}

    source_domain = resolved_source.get("source_domain")

    # Drop events from internal proxy domains
    if source_domain in _BLOCKED_SOURCE_DOMAINS:
        return

    user = user_override or _current_user.get()

    # The row is fully built here, on the caller's side, because the
    # ContextVars are read in this context rather than inside the worker.
    row = {
        "event_type": event_type,
        "level": level,
        "message": message,
        "user_id": user.get("user_id") if user else None,
        "user_email": user.get("email") if user else None,
        "session_id": session_id,
        "source_domain": source_domain,
        "metadata": clean_meta,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    def _insert() -> None:
        # Broad catch is deliberate: this is the top of a background thread
        # and logging must never raise; the failure is reported on stdout.
        try:
            client.table("activity_logs").insert(row).execute()
        except Exception:
            print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}")
            print(f"[SUPABASE_LOG_ERROR] row: {row}")

    # Fire-and-forget so the request is never delayed
    try:
        threading.Thread(target=_insert, daemon=True).start()
    except RuntimeError:
        # Thread.start() raises RuntimeError when no new thread can be
        # created; honour the stdout fallback rather than failing the caller.
        print(
            "[SUPABASE_LOG_ERROR] could not start insert thread: "
            f"{traceback.format_exc()}"
        )
        print(f"[SUPABASE_LOG_ERROR] row: {row}")