nothingworry's picture
Update the backend
e44e5dd
raw
history blame
2.34 kB
from __future__ import annotations
from typing import Mapping, Optional
from backend.mcp_server.common.logging import log_redflag_violation
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler
@tool_handler("admin.logViolation")
async def log_violation(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
"""
Persist a red-flag violation for analytics and auditing.
"""
rule_id = payload.get("rule_id") or payload.get("ruleId")
if not isinstance(rule_id, str) or not rule_id.strip():
raise ToolValidationError("rule_id must be provided")
rule_pattern = payload.get("rule_pattern") or payload.get("rulePattern") or rule_id
if not isinstance(rule_pattern, str):
raise ToolValidationError("rule_pattern must be a string")
severity = payload.get("severity", "medium")
if not isinstance(severity, str):
raise ToolValidationError("severity must be a string")
matched_text = payload.get("matched_text") or payload.get("matchedText")
if not isinstance(matched_text, str) or not matched_text.strip():
raise ToolValidationError("matched_text is required")
confidence = payload.get("confidence")
if confidence is not None:
try:
confidence_value: Optional[float] = float(confidence)
except (TypeError, ValueError):
raise ToolValidationError("confidence must be numeric")
else:
confidence_value = None
message_preview = payload.get("message_preview") or payload.get("messagePreview")
if message_preview is not None and not isinstance(message_preview, str):
raise ToolValidationError("message_preview must be a string if provided")
log_redflag_violation(
tenant_id=context.tenant_id,
rule_id=rule_id.strip(),
rule_pattern=rule_pattern.strip(),
severity=severity.strip(),
matched_text=matched_text.strip(),
confidence=confidence_value,
message_preview=message_preview.strip() if isinstance(message_preview, str) else None,
user_id=context.user_id,
)
return {
"tenant_id": context.tenant_id,
"rule_id": rule_id.strip(),
"severity": severity.strip(),
"logged": True,
}