File size: 4,540 Bytes
e44e5dd
 
 
 
 
29116ed
 
 
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29116ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29116ed
 
e44e5dd
29116ed
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29116ed
 
e44e5dd
29116ed
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29116ed
 
e44e5dd
29116ed
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import annotations

import logging
import os
from typing import Any, Dict, Optional
from dotenv import load_dotenv

# Load environment variables from a local .env file before any config reads.
load_dotenv()

# Module-level logger for the MCP analytics helpers. A handler is attached
# only when none exists yet, so re-imports/reloads don't duplicate output.
logger = logging.getLogger("integrachat.mcp")
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "[%(asctime)s] %(levelname)s %(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# LOG_LEVEL env var controls verbosity; defaults to INFO.
logger.setLevel(os.getenv("LOG_LEVEL", "INFO").upper())

# The analytics backend is optional: if the package (or its dependencies,
# e.g. Supabase) cannot be imported, analytics is disabled rather than
# preventing the MCP server from starting.
try:
    from backend.api.storage.analytics_store import AnalyticsStore
except Exception:  # pragma: no cover - analytics storage is optional during tests
    AnalyticsStore = None  # type: ignore

# Lazily-created process-wide store plus flags recording disabled/failed state.
_analytics_store: Optional["AnalyticsStore"] = None
_analytics_failed = False
_analytics_disabled = os.getenv("ANALYTICS_DISABLED", "").lower() in {"1", "true", "yes"}


def _get_analytics_store() -> Optional["AnalyticsStore"]:
    """
    Return the process-wide analytics store, constructing it on first use.

    Lazy construction means missing Supabase credentials or an absent
    package never block server startup. Once initialization fails (or the
    ANALYTICS_DISABLED env flag is set), analytics stays off for the rest
    of the process and this function returns None.
    """

    global _analytics_store, _analytics_failed

    # Fast path: explicitly disabled, or a previous attempt already failed.
    if _analytics_disabled or _analytics_failed:
        return None

    # Build the singleton only when it does not exist yet.
    if _analytics_store is None:
        if AnalyticsStore is None:
            # The optional import at module load time failed; treat as fatal.
            _analytics_failed = True
        else:
            try:
                _analytics_store = AnalyticsStore()
            except RuntimeError as exc:
                logger.warning("Analytics disabled: %s", exc)
                _analytics_failed = True
                _analytics_store = None
            except Exception as exc:  # pragma: no cover - unexpected failures
                logger.debug("Unexpected analytics init failure: %s", exc)
                _analytics_failed = True
                _analytics_store = None

    return _analytics_store


def log_tool_usage(
    tool_name: str,
    tenant_id: Optional[str],
    *,
    success: bool,
    latency_ms: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    error_message: Optional[str] = None,
    user_id: Optional[str] = None,
):
    """Record one tool invocation to the logger and (best-effort) analytics.

    Always emits a structured log line (INFO on success, WARNING on
    failure). Persists to the analytics store only when the store is
    available and a tenant_id was supplied; storage errors are logged at
    DEBUG and never propagate to the caller.
    """
    payload: Dict[str, Any] = {
        "tool": tool_name,
        "tenant_id": tenant_id,
        "success": success,
        "latency_ms": latency_ms,
        "user_id": user_id,
        "metadata": metadata or {},
    }
    if error_message:
        payload["error"] = error_message

    # Successful calls log at INFO, failures at WARNING.
    emit = logger.info if success else logger.warning
    emit("tool_completed %s" if success else "tool_failed %s", payload)

    store = _get_analytics_store()
    if store is None or not tenant_id:
        return
    try:
        store.log_tool_usage(
            tenant_id=tenant_id,
            tool_name=tool_name,
            latency_ms=latency_ms,
            success=success,
            error_message=error_message,
            metadata=metadata,
            user_id=user_id,
        )
    except Exception as exc:  # pragma: no cover - analytics failures shouldn't crash tools
        logger.debug("analytics logging failed: %s", exc)


def log_rag_search_metrics(
    tenant_id: str,
    query: str,
    hits_count: int,
    avg_score: Optional[float],
    top_score: Optional[float],
    latency_ms: Optional[int] = None,
):
    """Persist RAG search quality metrics to the analytics store.

    Best-effort: when the store is unavailable this is a no-op, and any
    storage error is logged at DEBUG and otherwise ignored.
    """
    store = _get_analytics_store()
    if store is None:
        return
    try:
        store.log_rag_search(
            tenant_id=tenant_id,
            query=query,
            hits_count=hits_count,
            avg_score=avg_score,
            top_score=top_score,
            latency_ms=latency_ms,
        )
    except Exception as exc:  # pragma: no cover
        logger.debug("rag analytics logging failed: %s", exc)


def log_redflag_violation(
    tenant_id: str,
    rule_id: str,
    rule_pattern: str,
    severity: str,
    matched_text: str,
    *,
    confidence: Optional[float] = None,
    message_preview: Optional[str] = None,
    user_id: Optional[str] = None,
):
    """Persist a red-flag rule violation to the analytics store.

    Best-effort: a missing store makes this a no-op; storage errors are
    logged at DEBUG and never raised to the caller.
    """
    store = _get_analytics_store()
    if store is None:
        return
    try:
        store.log_redflag_violation(
            tenant_id=tenant_id,
            rule_id=rule_id,
            rule_pattern=rule_pattern,
            severity=severity,
            matched_text=matched_text,
            confidence=confidence,
            message_preview=message_preview,
            user_id=user_id,
        )
    except Exception as exc:  # pragma: no cover
        logger.debug("redflag logging failed: %s", exc)