File size: 2,338 Bytes
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from __future__ import annotations

import math
from typing import Mapping, Optional

from backend.mcp_server.common.logging import log_redflag_violation
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler


@tool_handler("admin.logViolation")
async def log_violation(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Persist a red-flag violation for analytics and auditing.

    Most payload fields are accepted under either a snake_case or a camelCase
    key; the snake_case key is used when it holds a truthy value.

    Args:
        context: Tenant context. Its ``tenant_id`` and ``user_id`` are
            forwarded to ``log_redflag_violation``.
        payload: Mapping with the following keys:
            - ``rule_id`` / ``ruleId``: required non-blank string.
            - ``rule_pattern`` / ``rulePattern``: optional string; falls back
              to the rule id when missing or blank.
            - ``severity``: optional string; ``"medium"`` when missing,
              ``None`` or blank.
            - ``matched_text`` / ``matchedText``: required non-blank string.
            - ``confidence``: optional; anything ``float()`` can convert to a
              finite number.
            - ``message_preview`` / ``messagePreview``: optional string.

    Returns:
        A dict with ``tenant_id``, the stripped ``rule_id``, the effective
        ``severity`` and ``logged`` set to ``True``.

    Raises:
        ToolValidationError: If a required field is missing or blank, a field
            has the wrong type, or ``confidence`` is not a finite number.
    """

    default_severity = "medium"

    rule_id = payload.get("rule_id") or payload.get("ruleId")
    if not isinstance(rule_id, str) or not rule_id.strip():
        raise ToolValidationError("rule_id must be provided")
    clean_rule_id = rule_id.strip()

    rule_pattern = payload.get("rule_pattern") or payload.get("rulePattern") or clean_rule_id
    if not isinstance(rule_pattern, str):
        raise ToolValidationError("rule_pattern must be a string")
    # A whitespace-only pattern is truthy and slips past the `or` chain above,
    # so apply the same rule-id fallback after stripping.
    clean_rule_pattern = rule_pattern.strip() or clean_rule_id

    severity = payload.get("severity")
    if severity is None:
        # An explicit null is treated like a missing key.
        severity = default_severity
    if not isinstance(severity, str):
        raise ToolValidationError("severity must be a string")
    # Never record an empty severity; a blank value means "use the default".
    clean_severity = severity.strip() or default_severity

    matched_text = payload.get("matched_text") or payload.get("matchedText")
    if not isinstance(matched_text, str) or not matched_text.strip():
        raise ToolValidationError("matched_text is required")
    clean_matched_text = matched_text.strip()

    confidence = payload.get("confidence")
    confidence_value: Optional[float] = None
    if confidence is not None:
        try:
            confidence_value = float(confidence)
        except (TypeError, ValueError, OverflowError) as exc:
            # OverflowError: float() raises it for integers outside float range.
            raise ToolValidationError("confidence must be numeric") from exc
        if not math.isfinite(confidence_value):
            # float() happily parses "nan" / "inf"; neither is a usable score.
            raise ToolValidationError("confidence must be a finite number")

    message_preview = payload.get("message_preview") or payload.get("messagePreview")
    if message_preview is not None and not isinstance(message_preview, str):
        raise ToolValidationError("message_preview must be a string if provided")
    clean_preview = message_preview.strip() if isinstance(message_preview, str) else None

    log_redflag_violation(
        tenant_id=context.tenant_id,
        rule_id=clean_rule_id,
        rule_pattern=clean_rule_pattern,
        severity=clean_severity,
        matched_text=clean_matched_text,
        confidence=confidence_value,
        message_preview=clean_preview,
        user_id=context.user_id,
    )

    return {
        "tenant_id": context.tenant_id,
        "rule_id": clean_rule_id,
        "severity": clean_severity,
        "logged": True,
    }