SwapnilPatil28's picture
Major Update 1 - Add server, domain, client, models, and tests
4058302 verified
"""Incident Command Center environment (OpenEnv compliant).
This module wires the transport-agnostic domain logic (incidents, rewards,
role permissions) into OpenEnv's `Environment` contract.
Key design notes:
- **Deterministic**: every reset derives per-incident randomness from a
seeded RNG so results are reproducible and debuggable.
- **Role-aware**: actions run by the wrong specialist incur a small
penalty but are still allowed, mirroring real-world process friction.
- **Transparent rewards**: every step attaches a `reward_components` dict
to the observation so agents, evaluators, and humans can see *why* a
step was scored the way it was.
- **Safe serialization**: only wire types ever leave this module; the
runtime `Incident` dataclass stays server-side.
"""
from __future__ import annotations
import logging
import uuid
from typing import Dict, List, Optional
from openenv.core.env_server import Environment
from models import IncidentAction, IncidentObservation, IncidentState
from server.config import EnvConfig
from server.domain import (
Incident,
IncidentLibrary,
SeededRNG,
build_incident_library,
check_actor_allowed,
)
from server.domain.incidents import instantiate_incident
from server.domain.reward import RewardBreakdown, RewardEngine
from server.domain.roles import (
ALL_ACTIONS,
ALL_ROLES,
allowed_actors_for,
default_role_permissions,
)
from server.logging_utils import configure_logging, log_event
_LOG = logging.getLogger("icc.env")
class IncidentCommandCenterEnvironment(Environment):
"""Multi-agent incident response simulation.
The environment maintains a sequential queue of incidents per task. A
single action progresses the currently active incident. Closure advances
to the next incident; the episode ends when all incidents are closed,
when the investigation budget is exhausted, or when the global SLA
minute budget hits zero.
"""
def __init__(
self,
config: Optional[EnvConfig] = None,
library: Optional[IncidentLibrary] = None,
) -> None:
super().__init__()
self.config = config or EnvConfig.from_env()
self.library = library or build_incident_library()
self.reward_engine = RewardEngine()
self.permissions = default_role_permissions()
configure_logging(
level=self.config.log_level,
structured=self.config.structured_logging,
)
log_event(
_LOG,
"environment_boot",
env=self.config.name,
version=self.config.version,
tasks=self.library.tasks(),
incidents=self.library.total_incidents(),
)
# Runtime containers — populated by `reset`.
self._incidents: List[Incident] = []
self._episode_seed: int = self.config.default_seed
self._state = IncidentState(
episode_id=str(uuid.uuid4()),
task_id="easy",
seed=self._episode_seed,
version=self.config.version,
)
# ------------------------------------------------------------------
# OpenEnv Environment contract
# ------------------------------------------------------------------
def reset(
self,
task_name: str = "easy",
seed: Optional[int] = None,
) -> IncidentObservation:
"""Prepare a new episode.
Parameters
----------
task_name:
One of `easy`, `medium`, `hard`. Unknown task names fall back to
`easy` rather than raising, to maximize client robustness.
seed:
Optional seed for deterministic incident ordering and noise.
Falls back to `EnvConfig.default_seed` when omitted.
"""
selected = task_name if task_name in self.library.tasks() else "easy"
self._episode_seed = int(seed) if seed is not None else self.config.default_seed
rng = SeededRNG(self._episode_seed).child(f"task:{selected}")
templates = self.library.templates_for(selected)
self._incidents = [instantiate_incident(t, rng) for t in templates]
self._state = IncidentState(
episode_id=str(uuid.uuid4()),
task_id=selected,
seed=self._episode_seed,
version=self.config.version,
current_incident_index=0,
budget_remaining=self.config.budget_for(selected),
sla_minutes_remaining=self.config.sla_for(selected),
)
log_event(
_LOG,
"episode_start",
episode_id=self._state.episode_id,
task=selected,
seed=self._episode_seed,
incidents=[i.id for i in self._incidents],
)
return self._observation(
reward=0.0,
reward_components={},
notes=["episode_started"],
terminal_output=(
"Incident Command Center initialized. "
"Coordinate triage_agent, investigator_agent and "
"ops_manager_agent to resolve the incident queue."
),
done=False,
)
def step(self, action: IncidentAction) -> IncidentObservation:
"""Advance one turn.
Returns an observation whose `reward_components` dict explains how
the step reward was composed.
"""
self._state.step_count += 1
self._state.sla_minutes_remaining = max(
0, self._state.sla_minutes_remaining - self.config.sla_tick_minutes
)
self._state.budget_remaining -= 1
# Episode-level terminations -------------------------------------
if self._state.current_incident_index >= len(self._incidents):
return self._terminate(
reason="already_completed",
reward=0.0,
breakdown=RewardBreakdown(),
terminal_output="All incidents already resolved.",
)
if self._state.budget_remaining < 0:
breakdown = self.reward_engine.budget_exhausted()
return self._terminate(
reason="budget_exhausted",
reward=breakdown.total(),
breakdown=breakdown,
terminal_output="Episode terminated: investigation budget exhausted.",
)
if self._state.sla_minutes_remaining <= 0:
current = self._incidents[self._state.current_incident_index]
breakdown = self.reward_engine.sla_exhaustion(current)
self._state.incidents_failed += 1
return self._terminate(
reason="sla_exhausted",
reward=breakdown.total(),
breakdown=breakdown,
terminal_output="Episode terminated: global SLA budget reached zero.",
)
# Per-turn scoring -----------------------------------------------
incident = self._incidents[self._state.current_incident_index]
incident_id = incident.id
self._state.per_incident_steps[incident_id] = (
self._state.per_incident_steps.get(incident_id, 0) + 1
)
trace_line = f"{action.actor}:{action.action_type}:{action.target or '-'}"
self._state.action_trace.append(trace_line)
breakdown = RewardBreakdown()
breakdown.merge(self.reward_engine.step_cost(action.action_type))
actor_allowed = check_actor_allowed(
action.actor, action.action_type, self.permissions
)
breakdown.merge(
self.reward_engine.wrong_actor(action.actor, action.action_type, actor_allowed)
)
terminal_output = ""
episode_done = False
handler = self._handlers().get(action.action_type)
if handler is None:
breakdown.merge(self.reward_engine.invalid_action(action.action_type))
terminal_output = f"Unsupported action_type: {action.action_type}"
else:
terminal_output, episode_done = handler(action, incident, breakdown)
reward = breakdown.total()
self._state.cumulative_reward = round(
self._state.cumulative_reward + reward, 6
)
if len(self._state.reward_trace) < self.config.max_reward_trace_len:
self._state.reward_trace.append(breakdown.to_public_dict())
log_event(
_LOG,
"step",
episode_id=self._state.episode_id,
action=trace_line,
reward=reward,
components=breakdown.to_public_dict(),
cumulative_reward=self._state.cumulative_reward,
budget_remaining=self._state.budget_remaining,
sla_minutes_remaining=self._state.sla_minutes_remaining,
)
return self._observation(
reward=reward,
reward_components=breakdown.to_public_dict(),
notes=breakdown.notes,
terminal_output=terminal_output,
done=episode_done,
)
@property
def state(self) -> IncidentState:
return self._state
# ------------------------------------------------------------------
# Action handlers
# ------------------------------------------------------------------
def _handlers(self):
return {
"inspect_logs": self._handle_inspect_logs,
"inspect_metrics": self._handle_inspect_metrics,
"consult_kb": self._handle_consult_kb,
"negotiate_handoff": self._handle_handoff,
"apply_fix": self._handle_apply_fix,
"escalate": self._handle_escalate,
"rollback": self._handle_rollback,
"submit_postmortem": self._handle_postmortem,
"close_incident": self._handle_close,
}
# -- inspection actions --------------------------------------------
def _handle_inspect_logs(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
lookup = (action.target or "").strip()
text = incident.logs.get(lookup, f"No logs found for target '{lookup}'.")
self._award_clue(incident, lookup, text, breakdown, scope="logs")
return text, False
def _handle_inspect_metrics(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
lookup = (action.target or "").strip()
text = incident.metrics.get(lookup, f"No metrics found for target '{lookup}'.")
self._award_clue(incident, lookup, text, breakdown, scope="metrics")
return text, False
def _handle_consult_kb(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
lookup = (action.target or "").strip()
text = incident.kb.get(lookup, f"No KB article found for key '{lookup}'.")
self._award_clue(incident, lookup, text, breakdown, scope="kb")
return text, False
def _award_clue(
self,
incident: Incident,
lookup_key: str,
text: str,
breakdown: RewardBreakdown,
scope: str,
) -> None:
scoped_key = f"{scope}:{lookup_key}"
clue_breakdown, was_new, _matched = self.reward_engine.clue_reward(
incident,
text,
already_used_keys=self._state.clue_keywords_used,
current_clue_count=len([k for k in self._state.clue_keywords_used]),
)
breakdown.merge(clue_breakdown)
if was_new and _matched is not None:
self._state.clue_keywords_used.append(_matched)
if scoped_key not in self._state.investigation_keys_used:
self._state.investigation_keys_used.append(scoped_key)
# -- coordination actions ------------------------------------------
def _handle_handoff(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
team = (action.target or "").strip()
self._state.handoff_history.append(team)
breakdown.merge(self.reward_engine.handoff(incident, team))
if team == incident.good_handoff:
text = f"Handoff accepted by {team}. Hypothesis confidence increased."
else:
text = (
f"Handoff to {team} introduced delay. "
f"Expected owner: {incident.good_handoff}."
)
return text, False
def _handle_apply_fix(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
mitigation_breakdown, is_good = self.reward_engine.mitigation(
incident, action.resolution_summary or ""
)
breakdown.merge(mitigation_breakdown)
if is_good:
self._state.mitigation_applied = True
text = "Mitigation accepted. Error rate is stabilizing."
else:
text = "Applied mitigation appears ineffective; diagnostics continue."
return text, False
def _handle_escalate(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
scope_limit = (
incident.template.affected_users_estimate >= 50_000
or incident.template.revenue_impact_usd_per_min >= 800
or incident.template.postmortem_required
)
breakdown.merge(self.reward_engine.escalation(incident, scope_limit))
if scope_limit:
text = "Escalation paged: leadership channel opened; war room requested."
else:
text = "Escalation declined: impact below paging threshold."
return text, False
def _handle_rollback(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
text = (action.resolution_summary or "").lower()
if any(
token in text
for keyword_set in incident.accepted_fix_keywords
for token in keyword_set
if "rollback" in token or "roll back" in token
):
breakdown.add("rollback_effective", 0.20, "rollback aligned with playbook")
self._state.mitigation_applied = True
output = "Rollback applied: change reverted to last known good."
else:
breakdown.add("rollback_ineffective", -0.15, "rollback did not match accepted fix")
output = "Rollback attempted but incident not stabilized."
return output, False
def _handle_postmortem(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
note = (action.postmortem_note or "").strip()
if not note:
breakdown.add(
"postmortem_empty", -0.10, "submit_postmortem without postmortem_note"
)
return "Postmortem rejected: note missing.", False
self._state.postmortem_submitted = True
breakdown.add(
"postmortem_logged",
0.05,
f"postmortem stored ({len(note)} chars)",
)
return "Postmortem filed for review.", False
# -- closure --------------------------------------------------------
def _handle_close(
self, action: IncidentAction, incident: Incident, breakdown: RewardBreakdown
) -> tuple[str, bool]:
guess = (action.root_cause or "").strip()
steps = self._state.per_incident_steps.get(incident.id, 1)
clues = len(self._state.clue_keywords_used)
postmortem = self._state.postmortem_submitted
closure_breakdown, correct = self.reward_engine.closure(
incident,
predicted_root_cause=guess,
mitigation_applied=self._state.mitigation_applied,
clues_count=clues,
steps_on_incident=steps,
postmortem_submitted=postmortem,
)
breakdown.merge(closure_breakdown)
if correct:
self._state.incidents_resolved += 1
outcome_text = (
"Incident resolved successfully. "
f"Root cause acknowledged: {incident.root_cause}."
)
else:
self._state.incidents_failed += 1
outcome_text = (
"Incident closure rejected by postmortem checker. "
f"Prediction '{guess or 'unknown'}' did not match ground truth."
)
self._advance_incident()
episode_done = self._state.current_incident_index >= len(self._incidents)
if episode_done:
outcome_text += " All assigned incidents processed."
else:
outcome_text += f" Next incident: {self._incidents[self._state.current_incident_index].id}."
return outcome_text, episode_done
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _advance_incident(self) -> None:
self._state.current_incident_index += 1
self._state.mitigation_applied = False
self._state.postmortem_submitted = False
self._state.clue_keywords_used = []
self._state.investigation_keys_used = []
def _terminate(
self,
reason: str,
reward: float,
breakdown: RewardBreakdown,
terminal_output: str,
) -> IncidentObservation:
self._state.terminated_reason = reason
self._state.cumulative_reward = round(
self._state.cumulative_reward + reward, 6
)
log_event(
_LOG,
"episode_terminate",
episode_id=self._state.episode_id,
reason=reason,
cumulative_reward=self._state.cumulative_reward,
incidents_resolved=self._state.incidents_resolved,
incidents_failed=self._state.incidents_failed,
)
return IncidentObservation(
done=True,
reward=reward,
incident_id="EOF",
incident_title="Episode ended",
incident_description="No further actions accepted.",
incident_category="",
incident_difficulty=self._state.task_id,
customer_tier="standard",
affected_users_estimate=0,
revenue_impact_usd_per_min=0,
postmortem_required=False,
available_actions=[],
available_teams=list(ALL_ROLES),
allowed_actors_by_action={},
visible_signals=[],
investigation_targets={},
playbook_hints=[],
terminal_output=terminal_output,
budget_remaining=max(self._state.budget_remaining, 0),
sla_minutes_remaining=self._state.sla_minutes_remaining,
incidents_remaining=max(
len(self._incidents) - self._state.current_incident_index, 0
),
episode_step=self._state.step_count,
incident_step=0,
clues_found=len(self._state.clue_keywords_used),
mitigation_applied=self._state.mitigation_applied,
postmortem_submitted=self._state.postmortem_submitted,
reward_components=breakdown.to_public_dict(),
last_action_notes=breakdown.notes,
)
def _observation(
self,
reward: float,
reward_components: Dict[str, float],
notes: List[str],
terminal_output: str,
done: bool,
) -> IncidentObservation:
if done or self._state.current_incident_index >= len(self._incidents):
return IncidentObservation(
done=True,
reward=reward,
incident_id="EOF",
incident_title="All incidents completed",
incident_description="Episode ended.",
incident_category="",
incident_difficulty=self._state.task_id,
customer_tier="standard",
affected_users_estimate=0,
revenue_impact_usd_per_min=0,
postmortem_required=False,
available_actions=[],
available_teams=list(ALL_ROLES),
allowed_actors_by_action={},
visible_signals=[],
investigation_targets={},
playbook_hints=[],
terminal_output=terminal_output,
budget_remaining=max(self._state.budget_remaining, 0),
sla_minutes_remaining=self._state.sla_minutes_remaining,
incidents_remaining=0,
episode_step=self._state.step_count,
incident_step=0,
clues_found=len(self._state.clue_keywords_used),
mitigation_applied=self._state.mitigation_applied,
postmortem_submitted=self._state.postmortem_submitted,
reward_components=reward_components,
last_action_notes=notes,
)
incident = self._incidents[self._state.current_incident_index]
investigation_targets = {
"logs": list(incident.logs.keys()),
"metrics": list(incident.metrics.keys()),
"kb": list(incident.kb.keys()),
}
allowed_actors_by_action = {
action_type: list(allowed_actors_for(action_type, self.permissions))
for action_type in ALL_ACTIONS
}
incident_step = self._state.per_incident_steps.get(incident.id, 0)
return IncidentObservation(
done=False,
reward=reward,
incident_id=incident.id,
incident_title=incident.title,
incident_description=incident.description,
incident_category=incident.template.category,
incident_difficulty=incident.template.difficulty,
customer_tier=incident.customer_tier,
affected_users_estimate=incident.affected_users_estimate,
revenue_impact_usd_per_min=incident.revenue_impact_usd_per_min,
postmortem_required=incident.postmortem_required,
available_actions=list(ALL_ACTIONS),
available_teams=list(ALL_ROLES),
allowed_actors_by_action=allowed_actors_by_action,
visible_signals=list(incident.signals),
investigation_targets=investigation_targets,
playbook_hints=list(incident.playbook_hints),
terminal_output=terminal_output,
budget_remaining=max(self._state.budget_remaining, 0),
sla_minutes_remaining=self._state.sla_minutes_remaining,
incidents_remaining=len(self._incidents) - self._state.current_incident_index,
episode_step=self._state.step_count,
incident_step=incident_step,
clues_found=len(self._state.clue_keywords_used),
mitigation_applied=self._state.mitigation_applied,
postmortem_submitted=self._state.postmortem_submitted,
reward_components=reward_components,
last_action_notes=notes,
)