Spaces:
Sleeping
Sleeping
File size: 2,868 Bytes
e44e5dd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
from __future__ import annotations
from typing import Mapping
from backend.api.storage.rules_store import RulesStore
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler
_rules_store = RulesStore()
@tool_handler("admin.getRules")
async def admin_get_rules(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
"""
Return the active admin rules for the tenant.
"""
detailed = bool(payload.get("detailed", False))
rules = (
_rules_store.get_rules_detailed(context.tenant_id)
if detailed
else _rules_store.get_rules(context.tenant_id)
)
return {"tenant_id": context.tenant_id, "rules": rules, "detailed": detailed}
@tool_handler("admin.addRule")
async def admin_add_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
"""
Add a new governance rule.
"""
rule_text = payload.get("rule")
if not isinstance(rule_text, str) or not rule_text.strip():
raise ToolValidationError("rule must be a non-empty string")
pattern = payload.get("pattern")
if pattern is not None and not isinstance(pattern, str):
raise ToolValidationError("pattern must be a string if provided")
severity = payload.get("severity", "medium")
if not isinstance(severity, str):
raise ToolValidationError("severity must be a string")
description = payload.get("description")
if description is not None and not isinstance(description, str):
raise ToolValidationError("description must be a string if provided")
enabled = bool(payload.get("enabled", True))
success = _rules_store.add_rule(
tenant_id=context.tenant_id,
rule=rule_text.strip(),
pattern=pattern.strip() if isinstance(pattern, str) and pattern.strip() else None,
severity=severity.strip(),
description=description.strip() if isinstance(description, str) and description.strip() else None,
enabled=enabled,
)
if not success:
raise ToolValidationError("rule already exists or could not be saved")
return {
"tenant_id": context.tenant_id,
"rule": rule_text.strip(),
"enabled": enabled,
}
@tool_handler("admin.deleteRule")
async def admin_delete_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
"""
Delete an existing rule by its text value.
"""
rule_text = payload.get("rule")
if not isinstance(rule_text, str) or not rule_text.strip():
raise ToolValidationError("rule must be provided for deletion")
deleted = _rules_store.delete_rule(context.tenant_id, rule_text.strip())
return {
"tenant_id": context.tenant_id,
"rule": rule_text.strip(),
"deleted": deleted,
}
|