Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Mapping, Optional | |
| from backend.mcp_server.common.logging import log_redflag_violation | |
| from backend.mcp_server.common.tenant import TenantContext | |
| from backend.mcp_server.common.utils import ToolValidationError, tool_handler | |
| async def log_violation(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]: | |
| """ | |
| Persist a red-flag violation for analytics and auditing. | |
| """ | |
| rule_id = payload.get("rule_id") or payload.get("ruleId") | |
| if not isinstance(rule_id, str) or not rule_id.strip(): | |
| raise ToolValidationError("rule_id must be provided") | |
| rule_pattern = payload.get("rule_pattern") or payload.get("rulePattern") or rule_id | |
| if not isinstance(rule_pattern, str): | |
| raise ToolValidationError("rule_pattern must be a string") | |
| severity = payload.get("severity", "medium") | |
| if not isinstance(severity, str): | |
| raise ToolValidationError("severity must be a string") | |
| matched_text = payload.get("matched_text") or payload.get("matchedText") | |
| if not isinstance(matched_text, str) or not matched_text.strip(): | |
| raise ToolValidationError("matched_text is required") | |
| confidence = payload.get("confidence") | |
| if confidence is not None: | |
| try: | |
| confidence_value: Optional[float] = float(confidence) | |
| except (TypeError, ValueError): | |
| raise ToolValidationError("confidence must be numeric") | |
| else: | |
| confidence_value = None | |
| message_preview = payload.get("message_preview") or payload.get("messagePreview") | |
| if message_preview is not None and not isinstance(message_preview, str): | |
| raise ToolValidationError("message_preview must be a string if provided") | |
| log_redflag_violation( | |
| tenant_id=context.tenant_id, | |
| rule_id=rule_id.strip(), | |
| rule_pattern=rule_pattern.strip(), | |
| severity=severity.strip(), | |
| matched_text=matched_text.strip(), | |
| confidence=confidence_value, | |
| message_preview=message_preview.strip() if isinstance(message_preview, str) else None, | |
| user_id=context.user_id, | |
| ) | |
| return { | |
| "tenant_id": context.tenant_id, | |
| "rule_id": rule_id.strip(), | |
| "severity": severity.strip(), | |
| "logged": True, | |
| } | |