File size: 3,353 Bytes
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import annotations

import logging
import os
from typing import Any, Dict, Optional

# Module-level logger for MCP tool instrumentation, configured once at import.
logger = logging.getLogger("integrachat.mcp")
# Only attach a handler if none exists yet, so re-imports (or an app-level
# logging config) don't produce duplicate log lines.
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "[%(asctime)s] %(levelname)s %(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# Level is environment-driven; defaults to INFO when LOG_LEVEL is unset.
logger.setLevel(os.getenv("LOG_LEVEL", "INFO").upper())

# The analytics store is optional: if the backend package (or its deps) is
# unavailable — e.g. during isolated tests — fall back to a None sentinel and
# every log_* helper below silently skips persistence.
try:
    from backend.api.storage.analytics_store import AnalyticsStore
except Exception:  # pragma: no cover - analytics storage is optional during tests
    AnalyticsStore = None  # type: ignore
    _analytics_store = None
else:
    _analytics_store = AnalyticsStore()


def log_tool_usage(
    tool_name: str,
    tenant_id: Optional[str],
    *,
    success: bool,
    latency_ms: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    error_message: Optional[str] = None,
    user_id: Optional[str] = None,
):
    """Record a single tool invocation to the logger and analytics store.

    Emits an INFO record on success and a WARNING record on failure.
    Persistence to the analytics store is best-effort: it happens only when
    the store was importable and a ``tenant_id`` is present, and any storage
    exception is demoted to a DEBUG log so tool execution is never disrupted.
    """
    payload: Dict[str, Any] = {
        "tool": tool_name,
        "tenant_id": tenant_id,
        "success": success,
        "latency_ms": latency_ms,
        "user_id": user_id,
        "metadata": metadata or {},
    }
    # Only attach the error field when there is something to report.
    if error_message:
        payload["error"] = error_message

    # Success and failure use distinct event names and levels for filtering.
    emit, event = (
        (logger.info, "tool_completed %s")
        if success
        else (logger.warning, "tool_failed %s")
    )
    emit(event, payload)

    if _analytics_store and tenant_id:
        try:
            _analytics_store.log_tool_usage(
                tenant_id=tenant_id,
                tool_name=tool_name,
                latency_ms=latency_ms,
                success=success,
                error_message=error_message,
                metadata=metadata,
                user_id=user_id,
            )
        except Exception as exc:  # pragma: no cover - analytics failures shouldn't crash tools
            logger.debug("analytics logging failed: %s", exc)


def log_rag_search_metrics(
    tenant_id: str,
    query: str,
    hits_count: int,
    avg_score: Optional[float],
    top_score: Optional[float],
    latency_ms: Optional[int] = None,
):
    """Persist RAG search quality metrics to the analytics store.

    Best-effort only: a no-op when the analytics store is unavailable, and
    any storage exception is swallowed (logged at DEBUG) so search flows
    are never interrupted by analytics failures.
    """
    # Guard clause: nothing to do without a backing store.
    if not _analytics_store:
        return
    try:
        _analytics_store.log_rag_search(
            tenant_id=tenant_id,
            query=query,
            hits_count=hits_count,
            avg_score=avg_score,
            top_score=top_score,
            latency_ms=latency_ms,
        )
    except Exception as exc:  # pragma: no cover
        logger.debug("rag analytics logging failed: %s", exc)


def log_redflag_violation(
    tenant_id: str,
    rule_id: str,
    rule_pattern: str,
    severity: str,
    matched_text: str,
    *,
    confidence: Optional[float] = None,
    message_preview: Optional[str] = None,
    user_id: Optional[str] = None,
):
    """Persist a red-flag rule violation to the analytics store.

    Best-effort only: a no-op when the analytics store is unavailable, and
    any storage exception is swallowed (logged at DEBUG) so the calling
    moderation path is never interrupted by analytics failures.
    """
    # Guard clause: nothing to do without a backing store.
    if not _analytics_store:
        return
    try:
        _analytics_store.log_redflag_violation(
            tenant_id=tenant_id,
            rule_id=rule_id,
            rule_pattern=rule_pattern,
            severity=severity,
            matched_text=matched_text,
            confidence=confidence,
            message_preview=message_preview,
            user_id=user_id,
        )
    except Exception as exc:  # pragma: no cover
        logger.debug("redflag logging failed: %s", exc)