Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Mapping | |
| from backend.api.storage.rules_store import RulesStore | |
| from backend.mcp_server.common.tenant import TenantContext | |
| from backend.mcp_server.common.utils import ToolValidationError, tool_handler | |
| _rules_store = RulesStore() | |
async def admin_get_rules(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Return the active admin rules for the tenant.

    Args:
        context: Tenant context; only ``tenant_id`` is read.
        payload: Tool arguments. The optional ``detailed`` flag selects the
            store's ``get_rules_detailed`` call instead of ``get_rules``.
            It defaults to False.

    Returns:
        A dict with ``tenant_id``, the ``rules`` value returned by the store,
        and the resolved ``detailed`` flag.

    Raises:
        ToolValidationError: If ``detailed`` is a string that is not a
            recognised boolean spelling.
    """
    raw_detailed = payload.get("detailed", False)
    if isinstance(raw_detailed, str):
        # bool("false") is True in Python, so string flags are parsed
        # explicitly instead of relying on truthiness.
        token = raw_detailed.strip().lower()
        if token in ("true", "1", "yes", "on"):
            detailed = True
        elif token in ("", "false", "0", "no", "off"):
            detailed = False
        else:
            raise ToolValidationError("detailed must be a boolean")
    else:
        # Non-string values keep the original truthiness-based coercion.
        detailed = bool(raw_detailed)

    if detailed:
        rules = _rules_store.get_rules_detailed(context.tenant_id)
    else:
        rules = _rules_store.get_rules(context.tenant_id)
    return {"tenant_id": context.tenant_id, "rules": rules, "detailed": detailed}
async def admin_add_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Add a new governance rule.

    Args:
        context: Tenant context; only ``tenant_id`` is read.
        payload: Tool arguments:
            ``rule`` (required, non-empty string),
            ``pattern`` (optional string; blank is stored as None),
            ``severity`` (string, defaults to ``"medium"``),
            ``description`` (optional string; blank is stored as None),
            ``enabled`` (boolean flag, defaults to True).

    Returns:
        A dict with ``tenant_id``, the whitespace-stripped ``rule`` text and
        the resolved ``enabled`` flag.

    Raises:
        ToolValidationError: If a field has the wrong type, ``rule`` is
            blank, ``enabled`` is an unrecognised string, or the store
            reports that the rule was not added.
    """
    rule_text = payload.get("rule")
    if not isinstance(rule_text, str) or not rule_text.strip():
        raise ToolValidationError("rule must be a non-empty string")
    # Normalise once; the same value goes to the store and into the response.
    rule = rule_text.strip()

    pattern = payload.get("pattern")
    if pattern is not None and not isinstance(pattern, str):
        raise ToolValidationError("pattern must be a string if provided")

    severity = payload.get("severity", "medium")
    if not isinstance(severity, str):
        raise ToolValidationError("severity must be a string")

    description = payload.get("description")
    if description is not None and not isinstance(description, str):
        raise ToolValidationError("description must be a string if provided")

    raw_enabled = payload.get("enabled", True)
    if isinstance(raw_enabled, str):
        # bool("false") is True in Python, so a string flag would silently
        # create an enabled rule; parse the common spellings explicitly.
        token = raw_enabled.strip().lower()
        if token in ("true", "1", "yes", "on"):
            enabled = True
        elif token in ("", "false", "0", "no", "off"):
            enabled = False
        else:
            raise ToolValidationError("enabled must be a boolean")
    else:
        # Non-string values keep the original truthiness-based coercion.
        enabled = bool(raw_enabled)

    # Blank optional strings are passed to the store as None.
    success = _rules_store.add_rule(
        tenant_id=context.tenant_id,
        rule=rule,
        pattern=pattern.strip() if isinstance(pattern, str) and pattern.strip() else None,
        severity=severity.strip(),
        description=description.strip() if isinstance(description, str) and description.strip() else None,
        enabled=enabled,
    )
    if not success:
        raise ToolValidationError("rule already exists or could not be saved")

    return {
        "tenant_id": context.tenant_id,
        "rule": rule,
        "enabled": enabled,
    }
async def admin_delete_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Delete an existing rule by its text value.

    The ``rule`` entry of ``payload`` must be a non-blank string. It is
    stripped of surrounding whitespace before being handed to the store.

    Returns:
        A dict with ``tenant_id``, the stripped ``rule`` text and the
        ``deleted`` value returned by the store.

    Raises:
        ToolValidationError: If ``rule`` is missing, not a string, or blank.
    """
    candidate = payload.get("rule")
    if not (isinstance(candidate, str) and candidate.strip()):
        raise ToolValidationError("rule must be provided for deletion")

    tenant_id = context.tenant_id
    normalized_rule = candidate.strip()
    outcome = _rules_store.delete_rule(tenant_id, normalized_rule)

    response: dict[str, object] = {
        "tenant_id": tenant_id,
        "rule": normalized_rule,
        "deleted": outcome,
    }
    return response