File size: 2,868 Bytes
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import annotations

from typing import Mapping

from backend.api.storage.rules_store import RulesStore
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler

_rules_store = RulesStore()


@tool_handler("admin.getRules")
async def admin_get_rules(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Return the active admin rules for the tenant.
    """

    detailed = bool(payload.get("detailed", False))
    rules = (
        _rules_store.get_rules_detailed(context.tenant_id)
        if detailed
        else _rules_store.get_rules(context.tenant_id)
    )
    return {"tenant_id": context.tenant_id, "rules": rules, "detailed": detailed}


@tool_handler("admin.addRule")
async def admin_add_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Add a new governance rule.
    """

    rule_text = payload.get("rule")
    if not isinstance(rule_text, str) or not rule_text.strip():
        raise ToolValidationError("rule must be a non-empty string")

    pattern = payload.get("pattern")
    if pattern is not None and not isinstance(pattern, str):
        raise ToolValidationError("pattern must be a string if provided")

    severity = payload.get("severity", "medium")
    if not isinstance(severity, str):
        raise ToolValidationError("severity must be a string")

    description = payload.get("description")
    if description is not None and not isinstance(description, str):
        raise ToolValidationError("description must be a string if provided")

    enabled = bool(payload.get("enabled", True))

    success = _rules_store.add_rule(
        tenant_id=context.tenant_id,
        rule=rule_text.strip(),
        pattern=pattern.strip() if isinstance(pattern, str) and pattern.strip() else None,
        severity=severity.strip(),
        description=description.strip() if isinstance(description, str) and description.strip() else None,
        enabled=enabled,
    )

    if not success:
        raise ToolValidationError("rule already exists or could not be saved")

    return {
        "tenant_id": context.tenant_id,
        "rule": rule_text.strip(),
        "enabled": enabled,
    }


@tool_handler("admin.deleteRule")
async def admin_delete_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Delete an existing rule by its text value.
    """

    rule_text = payload.get("rule")
    if not isinstance(rule_text, str) or not rule_text.strip():
        raise ToolValidationError("rule must be provided for deletion")

    deleted = _rules_store.delete_rule(context.tenant_id, rule_text.strip())
    return {
        "tenant_id": context.tenant_id,
        "rule": rule_text.strip(),
        "deleted": deleted,
    }