SwapnilPatil28's picture
Major Update 1 - Add server, domain, client, models, and tests
4058302 verified
"""Role-based permissions for the three specialist agents.
In a real incident-response organization different roles have different
authority. We encode that so the environment can reward or penalize actions
taken by the wrong specialist, and so downstream policies learn realistic
coordination patterns.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Iterable, Set
ALL_ROLES: tuple[str, ...] = (
"triage_agent",
"investigator_agent",
"ops_manager_agent",
)
ALL_ACTIONS: tuple[str, ...] = (
"inspect_logs",
"inspect_metrics",
"consult_kb",
"negotiate_handoff",
"apply_fix",
"close_incident",
"escalate",
"rollback",
"submit_postmortem",
)
@dataclass(frozen=True)
class RolePermissions:
"""Allowed actions per role and a list of role-gated actions."""
allowed: Dict[str, Set[str]]
def is_allowed(self, actor: str, action_type: str) -> bool:
allowed_set = self.allowed.get(actor, set())
return action_type in allowed_set
def allowed_actions(self, actor: str) -> Set[str]:
return set(self.allowed.get(actor, set()))
def default_role_permissions() -> RolePermissions:
"""Default policy used by the environment.
- triage_agent: first-line observability + initial handoff
- investigator_agent: deep diagnostics, knowledge base, fix proposals
- ops_manager_agent: coordination actions (handoff, escalate, rollback),
and is the only role authorized to close an incident or submit a
postmortem.
"""
allowed: Dict[str, Set[str]] = {
"triage_agent": {
"inspect_logs",
"inspect_metrics",
"consult_kb",
"negotiate_handoff",
},
"investigator_agent": {
"inspect_logs",
"inspect_metrics",
"consult_kb",
"apply_fix",
"rollback",
},
"ops_manager_agent": {
"negotiate_handoff",
"escalate",
"rollback",
"close_incident",
"submit_postmortem",
},
}
return RolePermissions(allowed=allowed)
def check_actor_allowed(
actor: str, action_type: str, permissions: RolePermissions | None = None
) -> bool:
"""Return True if `actor` is permitted to run `action_type`.
Returns False for unknown roles or actions so the caller can apply the
policy's wrong-actor penalty uniformly.
"""
if actor not in ALL_ROLES or action_type not in ALL_ACTIONS:
return False
permissions = permissions or default_role_permissions()
return permissions.is_allowed(actor, action_type)
def allowed_actors_for(action_type: str, permissions: RolePermissions | None = None) -> Iterable[str]:
permissions = permissions or default_role_permissions()
return tuple(
actor for actor in ALL_ROLES if permissions.is_allowed(actor, action_type)
)