| """Incident Command Center environment (OpenEnv compliant). |
| |
| This module wires the transport-agnostic domain logic (incidents, rewards, |
| role permissions) into OpenEnv's `Environment` contract. |
| |
| Key design notes: |
| |
| - **Deterministic**: every reset derives per-incident randomness from a |
| seeded RNG so results are reproducible and debuggable. |
| - **Role-aware**: actions run by the wrong specialist incur a small |
| penalty but are still allowed, mirroring real-world process friction. |
| - **Transparent rewards**: every step attaches a `reward_components` dict |
| to the observation so agents, evaluators, and humans can see *why* a |
| step was scored the way it was. |
| - **Safe serialization**: only wire types ever leave this module; the |
| runtime `Incident` dataclass stays server-side. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import uuid |
| from typing import Dict, List, Optional |
|
|
| from openenv.core.env_server import Environment |
|
|
| from models import IncidentAction, IncidentObservation, IncidentState |
| from server.config import EnvConfig |
| from server.domain import ( |
| Incident, |
| IncidentLibrary, |
| SeededRNG, |
| build_incident_library, |
| check_actor_allowed, |
| ) |
| from server.domain.incidents import instantiate_incident |
| from server.domain.reward import RewardBreakdown, RewardEngine |
| from server.domain.roles import ( |
| ALL_ACTIONS, |
| ALL_ROLES, |
| allowed_actors_for, |
| default_role_permissions, |
| ) |
| from server.logging_utils import configure_logging, log_event |
|
|
# Module-level logger shared by every environment instance; a dedicated
# "icc.env" name lets logging configuration address this component alone.
_LOG: logging.Logger = logging.getLogger("icc.env")
|
|
|
|
class IncidentCommandCenterEnvironment(Environment):
    """Multi-agent incident response simulation.

    The environment maintains a sequential queue of incidents per task. A
    single action progresses the currently active incident. Closure advances
    to the next incident; the episode ends when all incidents are closed,
    when the investigation budget is exhausted, or when the global SLA
    minute budget hits zero.

    Once the episode has ended, further `step()` calls are answered with an
    end-of-episode observation and zero reward. They do not consume budget
    or SLA minutes and do not re-apply terminal penalties; call `reset()` to
    start a new episode.
    """

    # Paging thresholds used by `escalate`: escalation is considered
    # justified when either threshold is met or a postmortem is required.
    # Class attributes so subclasses/tests can tune them.
    ESCALATION_MIN_AFFECTED_USERS = 50_000
    ESCALATION_MIN_REVENUE_USD_PER_MIN = 800

    def __init__(
        self,
        config: Optional[EnvConfig] = None,
        library: Optional[IncidentLibrary] = None,
    ) -> None:
        """Build the environment.

        Parameters
        ----------
        config:
            Runtime configuration; defaults to `EnvConfig.from_env()`.
        library:
            Incident templates; defaults to `build_incident_library()`.
        """
        super().__init__()
        self.config = config or EnvConfig.from_env()
        self.library = library or build_incident_library()
        self.reward_engine = RewardEngine()
        self.permissions = default_role_permissions()

        configure_logging(
            level=self.config.log_level,
            structured=self.config.structured_logging,
        )
        log_event(
            _LOG,
            "environment_boot",
            env=self.config.name,
            version=self.config.version,
            tasks=self.library.tasks(),
            incidents=self.library.total_incidents(),
        )

        # Placeholder episode until the first reset(). The incident queue is
        # empty, so a step() issued before reset() takes the
        # "already_completed" path.
        self._incidents: List[Incident] = []
        self._episode_seed: int = self.config.default_seed
        self._state = IncidentState(
            episode_id=str(uuid.uuid4()),
            task_id="easy",
            seed=self._episode_seed,
            version=self.config.version,
        )

    # ------------------------------------------------------------------
    # OpenEnv contract
    # ------------------------------------------------------------------

    def reset(
        self,
        task_name: str = "easy",
        seed: Optional[int] = None,
    ) -> IncidentObservation:
        """Prepare a new episode.

        Parameters
        ----------
        task_name:
            One of `easy`, `medium`, `hard`. Unknown task names fall back to
            `easy` rather than raising, to maximize client robustness.
        seed:
            Optional seed for deterministic incident ordering and noise.
            Falls back to `EnvConfig.default_seed` when omitted.
        """
        selected = task_name if task_name in self.library.tasks() else "easy"
        self._episode_seed = int(seed) if seed is not None else self.config.default_seed

        # Per-task child RNG: the same seed yields the same incidents for a
        # given task regardless of which other tasks were played before.
        rng = SeededRNG(self._episode_seed).child(f"task:{selected}")
        templates = self.library.templates_for(selected)
        self._incidents = [instantiate_incident(t, rng) for t in templates]

        self._state = IncidentState(
            episode_id=str(uuid.uuid4()),
            task_id=selected,
            seed=self._episode_seed,
            version=self.config.version,
            current_incident_index=0,
            budget_remaining=self.config.budget_for(selected),
            sla_minutes_remaining=self.config.sla_for(selected),
        )

        log_event(
            _LOG,
            "episode_start",
            episode_id=self._state.episode_id,
            task=selected,
            seed=self._episode_seed,
            incidents=[i.id for i in self._incidents],
        )

        return self._observation(
            reward=0.0,
            reward_components={},
            notes=["episode_started"],
            terminal_output=(
                "Incident Command Center initialized. "
                "Coordinate triage_agent, investigator_agent and "
                "ops_manager_agent to resolve the incident queue."
            ),
            done=False,
        )

    def step(self, action: IncidentAction) -> IncidentObservation:
        """Advance one turn.

        Returns an observation whose `reward_components` dict explains how
        the step reward was composed.

        If the episode has already terminated, the action is not processed:
        the clock does not tick and a zero-reward end-of-episode observation
        is returned.
        """
        # Post-terminal guards run *before* the clock tick. Without them a
        # terminated episode kept draining budget/SLA and re-applied the
        # exhaustion penalty (and, for SLA, incidents_failed) on every call.
        previous_reason = getattr(self._state, "terminated_reason", None)
        if previous_reason:
            return self._eof_observation(
                reward=0.0,
                reward_components={},
                notes=["episode_already_terminated"],
                terminal_output=(
                    f"Episode already terminated ({previous_reason}). "
                    "Call reset() to start a new episode."
                ),
                title="Episode ended",
                description="No further actions accepted.",
                incidents_remaining=self._incidents_left(),
            )

        if self._state.current_incident_index >= len(self._incidents):
            return self._terminate(
                reason="already_completed",
                reward=0.0,
                breakdown=RewardBreakdown(),
                terminal_output="All incidents already resolved.",
            )

        # Every accepted action costs one budget unit and one SLA tick.
        self._state.step_count += 1
        self._state.sla_minutes_remaining = max(
            0, self._state.sla_minutes_remaining - self.config.sla_tick_minutes
        )
        self._state.budget_remaining -= 1

        if self._state.budget_remaining < 0:
            breakdown = self.reward_engine.budget_exhausted()
            return self._terminate(
                reason="budget_exhausted",
                reward=breakdown.total(),
                breakdown=breakdown,
                terminal_output="Episode terminated: investigation budget exhausted.",
            )

        if self._state.sla_minutes_remaining <= 0:
            current = self._incidents[self._state.current_incident_index]
            breakdown = self.reward_engine.sla_exhaustion(current)
            self._state.incidents_failed += 1
            return self._terminate(
                reason="sla_exhausted",
                reward=breakdown.total(),
                breakdown=breakdown,
                terminal_output="Episode terminated: global SLA budget reached zero.",
            )

        incident = self._incidents[self._state.current_incident_index]
        incident_id = incident.id
        self._state.per_incident_steps[incident_id] = (
            self._state.per_incident_steps.get(incident_id, 0) + 1
        )
        trace_line = f"{action.actor}:{action.action_type}:{action.target or '-'}"
        self._state.action_trace.append(trace_line)

        breakdown = RewardBreakdown()
        breakdown.merge(self.reward_engine.step_cost(action.action_type))

        # Wrong-role actions are penalised but still executed (see module doc).
        actor_allowed = check_actor_allowed(
            action.actor, action.action_type, self.permissions
        )
        breakdown.merge(
            self.reward_engine.wrong_actor(action.actor, action.action_type, actor_allowed)
        )

        episode_done = False
        handler = self._handlers().get(action.action_type)
        if handler is None:
            breakdown.merge(self.reward_engine.invalid_action(action.action_type))
            terminal_output = f"Unsupported action_type: {action.action_type}"
        else:
            terminal_output, episode_done = handler(action, incident, breakdown)

        reward = breakdown.total()
        self._state.cumulative_reward = round(
            self._state.cumulative_reward + reward, 6
        )
        # Bounded trace keeps long episodes from growing state without limit.
        if len(self._state.reward_trace) < self.config.max_reward_trace_len:
            self._state.reward_trace.append(breakdown.to_public_dict())

        log_event(
            _LOG,
            "step",
            episode_id=self._state.episode_id,
            action=trace_line,
            reward=reward,
            components=breakdown.to_public_dict(),
            cumulative_reward=self._state.cumulative_reward,
            budget_remaining=self._state.budget_remaining,
            sla_minutes_remaining=self._state.sla_minutes_remaining,
        )

        return self._observation(
            reward=reward,
            reward_components=breakdown.to_public_dict(),
            notes=breakdown.notes,
            terminal_output=terminal_output,
            done=episode_done,
        )

    @property
    def state(self) -> IncidentState:
        """Current episode state (the live object, not a copy)."""
        return self._state

    # ------------------------------------------------------------------
    # Action dispatch
    # ------------------------------------------------------------------

    def _handlers(self):
        """Map each supported `action_type` to its bound handler.

        Every handler takes `(action, incident, breakdown)`, mutates
        `breakdown` in place and returns `(terminal_output, episode_done)`.
        """
        return {
            "inspect_logs": self._handle_inspect_logs,
            "inspect_metrics": self._handle_inspect_metrics,
            "consult_kb": self._handle_consult_kb,
            "negotiate_handoff": self._handle_handoff,
            "apply_fix": self._handle_apply_fix,
            "escalate": self._handle_escalate,
            "rollback": self._handle_rollback,
            "submit_postmortem": self._handle_postmortem,
            "close_incident": self._handle_close,
        }

    # --- investigation -------------------------------------------------

    def _handle_inspect_logs(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Return the incident's log excerpt for `action.target`."""
        lookup = (action.target or "").strip()
        return self._inspect(
            incident.logs,
            lookup,
            f"No logs found for target '{lookup}'.",
            incident,
            breakdown,
            scope="logs",
        )

    def _handle_inspect_metrics(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Return the incident's metric snapshot for `action.target`."""
        lookup = (action.target or "").strip()
        return self._inspect(
            incident.metrics,
            lookup,
            f"No metrics found for target '{lookup}'.",
            incident,
            breakdown,
            scope="metrics",
        )

    def _handle_consult_kb(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Return the KB article stored under `action.target`."""
        lookup = (action.target or "").strip()
        return self._inspect(
            incident.kb,
            lookup,
            f"No KB article found for key '{lookup}'.",
            incident,
            breakdown,
            scope="kb",
        )

    def _inspect(
        self,
        source: Dict[str, str],
        lookup: str,
        missing_message: str,
        incident: Incident,
        breakdown: RewardBreakdown,
        scope: str,
    ) -> tuple[str, bool]:
        """Shared body of the read-only investigation actions.

        Returns the stored text (or `missing_message`) and `False` (never
        ends the episode). Only text that really exists in the incident's
        data is scored as evidence: the not-found message echoes the
        agent-supplied target, so scoring it could let keyword matching
        reward text the agent wrote itself.
        """
        found = lookup in source
        text = source[lookup] if found else missing_message
        self._award_clue(incident, lookup, text if found else "", breakdown, scope=scope)
        return text, False

    def _award_clue(
        self,
        incident: Incident,
        lookup_key: str,
        text: str,
        breakdown: RewardBreakdown,
        scope: str,
    ) -> None:
        """Score `text` as evidence and record which source was investigated.

        A newly matched clue keyword is appended to `clue_keywords_used`.
        The `"<scope>:<lookup_key>"` pair is recorded once in
        `investigation_keys_used`.
        """
        used_keywords = self._state.clue_keywords_used
        clue_breakdown, was_new, matched = self.reward_engine.clue_reward(
            incident,
            text,
            already_used_keys=used_keywords,
            current_clue_count=len(used_keywords),
        )
        breakdown.merge(clue_breakdown)
        if was_new and matched is not None:
            used_keywords.append(matched)

        scoped_key = f"{scope}:{lookup_key}"
        if scoped_key not in self._state.investigation_keys_used:
            self._state.investigation_keys_used.append(scoped_key)

    # --- coordination / mitigation ------------------------------------

    def _handle_handoff(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Hand the incident to the team named in `action.target`."""
        team = (action.target or "").strip()
        self._state.handoff_history.append(team)
        breakdown.merge(self.reward_engine.handoff(incident, team))
        if team == incident.good_handoff:
            text = f"Handoff accepted by {team}. Hypothesis confidence increased."
        else:
            text = (
                f"Handoff to {team} introduced delay. "
                f"Expected owner: {incident.good_handoff}."
            )
        return text, False

    def _handle_apply_fix(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Grade `resolution_summary` as a mitigation for the incident."""
        mitigation_breakdown, is_good = self.reward_engine.mitigation(
            incident, action.resolution_summary or ""
        )
        breakdown.merge(mitigation_breakdown)
        if is_good:
            self._state.mitigation_applied = True
            text = "Mitigation accepted. Error rate is stabilizing."
        else:
            text = "Applied mitigation appears ineffective; diagnostics continue."
        return text, False

    def _handle_escalate(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Page leadership if the incident's impact justifies it."""
        # NOTE(review): the decision uses the *template* impact figures,
        # while observations expose the instantiated `incident.*` values;
        # confirm which of the two the agent is expected to reason about.
        template = incident.template
        scope_limit = (
            template.affected_users_estimate >= self.ESCALATION_MIN_AFFECTED_USERS
            or template.revenue_impact_usd_per_min >= self.ESCALATION_MIN_REVENUE_USD_PER_MIN
            or template.postmortem_required
        )
        breakdown.merge(self.reward_engine.escalation(incident, scope_limit))
        if scope_limit:
            text = "Escalation paged: leadership channel opened; war room requested."
        else:
            text = "Escalation declined: impact below paging threshold."
        return text, False

    def _handle_rollback(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Revert the last change; effective only when the playbook calls for it.

        The rollback counts as a mitigation when `resolution_summary`
        contains one of the incident's accepted-fix tokens that is itself
        rollback-related ("rollback" / "roll back"). Matching is
        case-insensitive on both sides.
        """
        summary = (action.resolution_summary or "").lower()
        rollback_tokens = [
            token.lower()
            for keyword_set in incident.accepted_fix_keywords
            for token in keyword_set
            if "rollback" in token.lower() or "roll back" in token.lower()
        ]
        if any(token in summary for token in rollback_tokens):
            breakdown.add("rollback_effective", 0.20, "rollback aligned with playbook")
            self._state.mitigation_applied = True
            return "Rollback applied: change reverted to last known good.", False

        breakdown.add("rollback_ineffective", -0.15, "rollback did not match accepted fix")
        return "Rollback attempted but incident not stabilized.", False

    def _handle_postmortem(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Record a postmortem note; an empty note is penalised and rejected."""
        note = (action.postmortem_note or "").strip()
        if not note:
            breakdown.add(
                "postmortem_empty", -0.10, "submit_postmortem without postmortem_note"
            )
            return "Postmortem rejected: note missing.", False

        self._state.postmortem_submitted = True
        breakdown.add(
            "postmortem_logged",
            0.05,
            f"postmortem stored ({len(note)} chars)",
        )
        return "Postmortem filed for review.", False

    # --- closure -------------------------------------------------------

    def _handle_close(
        self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
    ) -> tuple[str, bool]:
        """Grade the root-cause guess, then advance to the next incident.

        The incident advances whether or not the guess is correct. The
        returned flag is True when no incidents remain.
        """
        guess = (action.root_cause or "").strip()
        steps = self._state.per_incident_steps.get(incident.id, 1)
        clues = len(self._state.clue_keywords_used)
        postmortem = self._state.postmortem_submitted

        closure_breakdown, correct = self.reward_engine.closure(
            incident,
            predicted_root_cause=guess,
            mitigation_applied=self._state.mitigation_applied,
            clues_count=clues,
            steps_on_incident=steps,
            postmortem_submitted=postmortem,
        )
        breakdown.merge(closure_breakdown)

        if correct:
            self._state.incidents_resolved += 1
            outcome_text = (
                "Incident resolved successfully. "
                f"Root cause acknowledged: {incident.root_cause}."
            )
        else:
            self._state.incidents_failed += 1
            outcome_text = (
                "Incident closure rejected by postmortem checker. "
                f"Prediction '{guess or 'unknown'}' did not match ground truth."
            )

        self._advance_incident()
        episode_done = self._state.current_incident_index >= len(self._incidents)
        if episode_done:
            outcome_text += " All assigned incidents processed."
        else:
            next_id = self._incidents[self._state.current_incident_index].id
            outcome_text += f" Next incident: {next_id}."
        return outcome_text, episode_done

    # ------------------------------------------------------------------
    # Bookkeeping / observation helpers
    # ------------------------------------------------------------------

    def _advance_incident(self) -> None:
        """Move to the next incident and clear per-incident progress flags."""
        self._state.current_incident_index += 1
        self._state.mitigation_applied = False
        self._state.postmortem_submitted = False
        self._state.clue_keywords_used = []
        self._state.investigation_keys_used = []

    def _incidents_left(self) -> int:
        """Number of incidents not yet processed (never negative)."""
        return max(len(self._incidents) - self._state.current_incident_index, 0)

    def _terminate(
        self,
        reason: str,
        reward: float,
        breakdown: RewardBreakdown,
        terminal_output: str,
    ) -> IncidentObservation:
        """End the episode early: record the reason, bank the reward, log it."""
        self._state.terminated_reason = reason
        self._state.cumulative_reward = round(
            self._state.cumulative_reward + reward, 6
        )
        log_event(
            _LOG,
            "episode_terminate",
            episode_id=self._state.episode_id,
            reason=reason,
            cumulative_reward=self._state.cumulative_reward,
            incidents_resolved=self._state.incidents_resolved,
            incidents_failed=self._state.incidents_failed,
        )
        return self._eof_observation(
            reward=reward,
            reward_components=breakdown.to_public_dict(),
            notes=breakdown.notes,
            terminal_output=terminal_output,
            title="Episode ended",
            description="No further actions accepted.",
            incidents_remaining=self._incidents_left(),
        )

    def _eof_observation(
        self,
        *,
        reward: float,
        reward_components: Dict[str, float],
        notes: List[str],
        terminal_output: str,
        title: str,
        description: str,
        incidents_remaining: int,
    ) -> IncidentObservation:
        """Build the `done=True` observation shared by every end-of-episode path.

        Incident-specific fields are blanked. The "EOF" id marks the absence
        of an active incident, and no actions are advertised.
        """
        return IncidentObservation(
            done=True,
            reward=reward,
            incident_id="EOF",
            incident_title=title,
            incident_description=description,
            incident_category="",
            incident_difficulty=self._state.task_id,
            customer_tier="standard",
            affected_users_estimate=0,
            revenue_impact_usd_per_min=0,
            postmortem_required=False,
            available_actions=[],
            available_teams=list(ALL_ROLES),
            allowed_actors_by_action={},
            visible_signals=[],
            investigation_targets={},
            playbook_hints=[],
            terminal_output=terminal_output,
            budget_remaining=max(self._state.budget_remaining, 0),
            sla_minutes_remaining=self._state.sla_minutes_remaining,
            incidents_remaining=incidents_remaining,
            episode_step=self._state.step_count,
            incident_step=0,
            clues_found=len(self._state.clue_keywords_used),
            mitigation_applied=self._state.mitigation_applied,
            postmortem_submitted=self._state.postmortem_submitted,
            reward_components=reward_components,
            last_action_notes=notes,
        )

    def _observation(
        self,
        reward: float,
        reward_components: Dict[str, float],
        notes: List[str],
        terminal_output: str,
        done: bool,
    ) -> IncidentObservation:
        """Project the current state into a wire-safe observation.

        When the episode is done (or the queue is exhausted), returns the
        end-of-episode observation. Otherwise it describes the active
        incident and what can be investigated.
        """
        if done or self._state.current_incident_index >= len(self._incidents):
            return self._eof_observation(
                reward=reward,
                reward_components=reward_components,
                notes=notes,
                terminal_output=terminal_output,
                title="All incidents completed",
                description="Episode ended.",
                incidents_remaining=0,
            )

        incident = self._incidents[self._state.current_incident_index]
        # Only lookup keys are exposed; the contents stay server-side until
        # an investigation action retrieves them.
        investigation_targets = {
            "logs": list(incident.logs.keys()),
            "metrics": list(incident.metrics.keys()),
            "kb": list(incident.kb.keys()),
        }
        allowed_actors_by_action = {
            action_type: list(allowed_actors_for(action_type, self.permissions))
            for action_type in ALL_ACTIONS
        }
        incident_step = self._state.per_incident_steps.get(incident.id, 0)

        return IncidentObservation(
            done=False,
            reward=reward,
            incident_id=incident.id,
            incident_title=incident.title,
            incident_description=incident.description,
            incident_category=incident.template.category,
            incident_difficulty=incident.template.difficulty,
            customer_tier=incident.customer_tier,
            affected_users_estimate=incident.affected_users_estimate,
            revenue_impact_usd_per_min=incident.revenue_impact_usd_per_min,
            postmortem_required=incident.postmortem_required,
            available_actions=list(ALL_ACTIONS),
            available_teams=list(ALL_ROLES),
            allowed_actors_by_action=allowed_actors_by_action,
            visible_signals=list(incident.signals),
            investigation_targets=investigation_targets,
            playbook_hints=list(incident.playbook_hints),
            terminal_output=terminal_output,
            budget_remaining=max(self._state.budget_remaining, 0),
            sla_minutes_remaining=self._state.sla_minutes_remaining,
            incidents_remaining=len(self._incidents) - self._state.current_incident_index,
            episode_step=self._state.step_count,
            incident_step=incident_step,
            clues_found=len(self._state.clue_keywords_used),
            mitigation_applied=self._state.mitigation_applied,
            postmortem_submitted=self._state.postmortem_submitted,
            reward_components=reward_components,
            last_action_notes=notes,
        )
|
|