| """Role-based permissions for the three specialist agents. |
| |
| In a real incident-response organization different roles have different |
| authority. We encode that so the environment can reward or penalize actions |
| taken by the wrong specialist, and so downstream policies learn realistic |
| coordination patterns. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Dict, Iterable, Set |
|
|
| ALL_ROLES: tuple[str, ...] = ( |
| "triage_agent", |
| "investigator_agent", |
| "ops_manager_agent", |
| ) |
|
|
| ALL_ACTIONS: tuple[str, ...] = ( |
| "inspect_logs", |
| "inspect_metrics", |
| "consult_kb", |
| "negotiate_handoff", |
| "apply_fix", |
| "close_incident", |
| "escalate", |
| "rollback", |
| "submit_postmortem", |
| ) |
|
|
|
|
| @dataclass(frozen=True) |
| class RolePermissions: |
| """Allowed actions per role and a list of role-gated actions.""" |
|
|
| allowed: Dict[str, Set[str]] |
|
|
| def is_allowed(self, actor: str, action_type: str) -> bool: |
| allowed_set = self.allowed.get(actor, set()) |
| return action_type in allowed_set |
|
|
| def allowed_actions(self, actor: str) -> Set[str]: |
| return set(self.allowed.get(actor, set())) |
|
|
|
|
| def default_role_permissions() -> RolePermissions: |
| """Default policy used by the environment. |
| |
| - triage_agent: first-line observability + initial handoff |
| - investigator_agent: deep diagnostics, knowledge base, fix proposals |
| - ops_manager_agent: coordination actions (handoff, escalate, rollback), |
| and is the only role authorized to close an incident or submit a |
| postmortem. |
| """ |
| allowed: Dict[str, Set[str]] = { |
| "triage_agent": { |
| "inspect_logs", |
| "inspect_metrics", |
| "consult_kb", |
| "negotiate_handoff", |
| }, |
| "investigator_agent": { |
| "inspect_logs", |
| "inspect_metrics", |
| "consult_kb", |
| "apply_fix", |
| "rollback", |
| }, |
| "ops_manager_agent": { |
| "negotiate_handoff", |
| "escalate", |
| "rollback", |
| "close_incident", |
| "submit_postmortem", |
| }, |
| } |
| return RolePermissions(allowed=allowed) |
|
|
|
|
| def check_actor_allowed( |
| actor: str, action_type: str, permissions: RolePermissions | None = None |
| ) -> bool: |
| """Return True if `actor` is permitted to run `action_type`. |
| |
| Returns False for unknown roles or actions so the caller can apply the |
| policy's wrong-actor penalty uniformly. |
| """ |
| if actor not in ALL_ROLES or action_type not in ALL_ACTIONS: |
| return False |
| permissions = permissions or default_role_permissions() |
| return permissions.is_allowed(actor, action_type) |
|
|
|
|
| def allowed_actors_for(action_type: str, permissions: RolePermissions | None = None) -> Iterable[str]: |
| permissions = permissions or default_role_permissions() |
| return tuple( |
| actor for actor in ALL_ROLES if permissions.is_allowed(actor, action_type) |
| ) |
|
|