releaseops-env / server /releaseops_environment.py
eastbrick's picture
Unify score normalization and add validator parity checks
140d024
"""ReleaseOps Environment: production change review simulator."""
import json
import sqlite3
from pathlib import Path
from typing import Any
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import Action, Observation, State
from releaseops_env.models import (
ReleaseAction,
ReleaseObservation,
ReleaseState,
RiskSignal,
ToolResult,
)
from releaseops_env.scoring import normalize_score
TASKS_DIR = Path(__file__).parent.parent / "tasks"
INCIDENTS_DB = Path(__file__).parent.parent / "data" / "incidents.db"
class ReleaseOpsEnvironment(Environment):
"""
Simulates production change review and rollout governance.
The agent investigates a proposed change using tools (inspect, query,
search) and makes a release decision. Scenarios are pre-authored JSON
files loaded at reset(). No live computation — all reactions are array
lookups on pre-authored telemetry curves.
"""
SUPPORTS_CONCURRENT_SESSIONS = True
def __init__(self):
super().__init__()
self._state = ReleaseState(episode_id=str(uuid4()))
self._scenario: dict = {}
# Tracks discovered RiskSignal objects (signal_id → RiskSignal)
self._discovered_risks: dict[str, RiskSignal] = {}
# ── RESET ──────────────────────────────────────────────
def reset(self, seed=None, episode_id=None, **kwargs) -> Observation:
task_id = kwargs.get("task_id", "easy_001")
self._scenario = self._load_scenario(task_id)
self._discovered_risks = {}
self._state = ReleaseState(
episode_id=episode_id or str(uuid4()),
step_count=0,
task_id=task_id,
rollout_phase="precheck",
)
return ReleaseObservation(
done=False,
reward=0.0,
task_id=task_id,
change_summary=self._scenario["change"]["description"],
known_risk_signals=[],
last_tool_result=None,
allowed_actions=self._get_allowed_actions(),
rollout_phase="precheck",
time_remaining=self._max_steps,
cumulative_reward=0.0,
metadata={"status": "ready", "task_id": task_id},
)
# ── STEP ───────────────────────────────────────────────
def step(self, action: Action, timeout_s=None, **kwargs) -> Observation:
if not isinstance(action, ReleaseAction):
return ReleaseObservation(
done=False,
reward=-0.05,
task_id=self._state.task_id,
change_summary=self._scenario.get("change", {}).get("description", ""),
allowed_actions=self._get_allowed_actions(),
rollout_phase=self._state.rollout_phase,
time_remaining=self._max_steps - self._state.step_count,
cumulative_reward=self._state.cumulative_reward,
metadata={
"error": f"Expected ReleaseAction, got {type(action).__name__}"
},
)
self._state.step_count += 1
self._state.steps_in_phase += 1
self._state.actions_taken.append(action.action_type)
reward = 0.0
tool_result = None
new_risks: list[RiskSignal] = []
try:
if action.action_type == "inspect_change":
reward, tool_result, new_risks = self._handle_inspect_change(action)
elif action.action_type == "inspect_services":
reward, tool_result, new_risks = self._handle_inspect_services(action)
elif action.action_type == "inspect_dependencies":
reward, tool_result, new_risks = self._handle_inspect_dependencies(
action
)
elif action.action_type == "search_incidents":
reward, tool_result, new_risks = self._handle_search_incidents(action)
elif action.action_type == "check_policy":
reward, tool_result, new_risks = self._handle_check_policy(action)
elif action.action_type == "query_telemetry":
reward, tool_result, new_risks = self._handle_query_telemetry(action)
elif action.action_type == "request_artifact":
reward, tool_result, new_risks = self._handle_request_artifact(action)
elif action.action_type == "control_rollout":
reward, tool_result, new_risks = self._handle_control_rollout(action)
elif action.action_type == "submit_decision":
return self._handle_submit_decision(action)
else:
reward = -0.05
tool_result = ToolResult(
tool_name=action.action_type,
success=False,
content=f"Unknown action type: {action.action_type}",
)
except Exception as e:
reward = -0.05
tool_result = ToolResult(
tool_name=action.action_type,
success=False,
content=f"Error handling action: {e}",
)
# Register new risk signals
for risk in new_risks:
if risk.signal_id not in self._discovered_risks:
self._discovered_risks[risk.signal_id] = risk
self._state.risk_signals_found.append(risk.signal_id)
self._state.cumulative_reward += reward
# Check max steps (timeout)
done = self._state.step_count >= self._max_steps
if done and not self._state.terminal:
self._state.terminal = True
self._state.rollout_phase = "terminal"
timeout_penalty = -0.20
reward += timeout_penalty
self._state.cumulative_reward += timeout_penalty
return ReleaseObservation(
done=done,
reward=reward,
task_id=self._state.task_id,
change_summary=self._scenario["change"]["description"],
known_risk_signals=self._get_visible_risks(),
last_tool_result=tool_result,
allowed_actions=self._get_allowed_actions(),
rollout_phase=self._state.rollout_phase,
time_remaining=max(0, self._max_steps - self._state.step_count),
cumulative_reward=self._state.cumulative_reward,
metadata={"step": self._state.step_count},
)
# ── STATE ──────────────────────────────────────────────
@property
def state(self) -> State:
return self._state
# ── CONSTANTS ──────────────────────────────────────────
@property
def _max_steps(self) -> int:
return self._scenario.get("max_steps", 12)
# ── SCENARIO LOADING ───────────────────────────────────
def _load_scenario(self, task_id: str) -> dict:
"""Load all 6 JSON files for a scenario into one dict."""
task_dir = TASKS_DIR / task_id
scenario: dict[str, Any] = {}
for name in [
"services",
"change",
"telemetry_curves",
"incidents",
"policy",
"ground_truth",
]:
with open(task_dir / f"{name}.json") as f:
scenario[name] = json.load(f)
scenario["max_steps"] = scenario.get("ground_truth", {}).get("max_steps", 12)
return scenario
# ── ACTION HANDLERS ────────────────────────────────────
# Each returns (reward, ToolResult, list[RiskSignal])
def _handle_inspect_change(self, action: ReleaseAction):
section = action.section or "diff"
evidence_key = f"inspected_{section}"
change = self._scenario["change"]
if evidence_key in self._state.evidence_gathered:
return (
-0.05,
ToolResult(
tool_name="inspect_change",
success=True,
content=f"Already inspected '{section}'. No new information.",
),
[],
)
self._state.evidence_gathered.append(evidence_key)
if section == "diff":
content = change.get("diff_summary", "No diff available.")
payload = {"diff": content, "files": change.get("files_changed", [])}
elif section == "tests":
content = json.dumps(change.get("test_results", {}), indent=2)
payload = change.get("test_results", {})
elif section == "approvals":
content = json.dumps(change.get("approvals", {}), indent=2)
payload = change.get("approvals", {})
elif section == "files_changed":
files = change.get("files_changed", [])
content = "\n".join(files)
payload = {"files": files}
else:
content = "Unknown section."
payload = {}
new_risks = self._discover_risks_from_section(section)
reward = 0.10 if evidence_key in self._get_required_evidence() else 0.0
return (
reward,
ToolResult(
tool_name="inspect_change",
success=True,
content=content,
structured_payload=payload,
),
new_risks,
)
def _handle_inspect_services(self, action: ReleaseAction):
service_name = action.service
if not service_name:
return (
-0.02,
ToolResult(
tool_name="inspect_services",
success=False,
content="Must specify service. Use inspect_services with a service name.",
),
[],
)
services = self._scenario["services"].get("services", {})
if service_name not in services:
available = list(services.keys())
return (
0.0,
ToolResult(
tool_name="inspect_services",
success=False,
content=f"Service '{service_name}' not found. Available: {available}",
),
[],
)
evidence_key = f"inspected_service_{service_name}"
is_new = evidence_key not in self._state.evidence_gathered
if is_new:
self._state.evidence_gathered.append(evidence_key)
# Also mark the generic "inspected_services" evidence key (first service inspected)
if "inspected_services" not in self._state.evidence_gathered:
self._state.evidence_gathered.append("inspected_services")
svc = services[service_name]
new_risks: list[RiskSignal] = []
if is_new:
if svc.get("current_health") == "degraded":
new_risks.append(
RiskSignal(
signal_id=f"degraded_{service_name}",
category="dependency_health",
severity="medium",
summary=f"{service_name} is currently degraded",
numeric_value=svc.get("metrics", {}).get("error_rate"),
)
)
# Detect connection capacity risk for database services
max_conn = svc.get("max_connections")
cur_conn = svc.get("current_connections")
if max_conn and cur_conn:
change_services = self._scenario["change"].get("services_touched", [])
# Find pool size increase in the diff
diff = self._scenario["change"].get("diff_summary", "").lower()
if ("connection pool" in diff or "max_connections" in diff) and any(
s in service_name for s in ["postgres", "mysql", "db", "database"]
):
replicas = None
for svc_name, svc_data in self._scenario["services"].get("services", {}).items():
if svc_data.get("replicas"):
replicas = svc_data["replicas"]
break
# Try to estimate total connections (pool_size × replicas)
new_risks.append(
RiskSignal(
signal_id="db_connection_exhaustion",
category="resource_saturation",
severity="high",
summary=(
f"{service_name} at {cur_conn}/{max_conn} connections "
f"({int(cur_conn/max_conn*100)}% capacity). "
f"Pool increase may exhaust server connection limit."
),
numeric_value=cur_conn,
threshold=max_conn,
)
)
reward = 0.05 if is_new else -0.05
return (
reward,
ToolResult(
tool_name="inspect_services",
success=True,
content=json.dumps(svc, indent=2),
structured_payload=svc,
),
new_risks,
)
def _handle_inspect_dependencies(self, action: ReleaseAction):
evidence_key = "inspected_dependencies"
is_new = evidence_key not in self._state.evidence_gathered
if is_new:
self._state.evidence_gathered.append(evidence_key)
deps = self._scenario["services"].get("dependencies", {})
blast = self._scenario["change"].get("blast_radius", [])
payload = {
"dependencies": deps,
"blast_radius": blast,
"services_touched": self._scenario["change"].get("services_touched", []),
}
new_risks: list[RiskSignal] = []
if is_new and len(blast) >= 3:
new_risks.append(
RiskSignal(
signal_id="large_blast_radius",
category="blast_radius",
severity="high",
summary=f"Change affects {len(blast)} services: {blast}",
)
)
reward = (
0.10
if is_new and "inspected_dependencies" in self._get_required_evidence()
else (0.02 if is_new else -0.05)
)
return (
reward,
ToolResult(
tool_name="inspect_dependencies",
success=True,
content=json.dumps(payload, indent=2),
structured_payload=payload,
),
new_risks,
)
def _handle_search_incidents(self, action: ReleaseAction):
keywords = action.keywords or []
evidence_key = "checked_incidents"
is_new = evidence_key not in self._state.evidence_gathered
if is_new:
self._state.evidence_gathered.append(evidence_key)
# ── Scenario incidents (used for risk signal emission) ──────────────
scenario_incidents = self._scenario["incidents"].get("incidents", [])
if keywords:
matched_scenario = [
inc
for inc in scenario_incidents
if any(kw.lower() in json.dumps(inc).lower() for kw in keywords)
]
else:
matched_scenario = scenario_incidents
# ── Real incident database (SQLite) ────────────────────────���────────
db_results = self._query_incident_db(keywords)
# Combine: scenario incidents first (with similarity scores), then
# real-world incidents from the database for broader context.
all_results = matched_scenario + db_results
content = (
json.dumps(all_results, indent=2) if all_results else "No matching incidents found."
)
new_risks: list[RiskSignal] = []
if is_new:
for inc in matched_scenario:
if inc.get("similarity_to_current", 0) > 0.6:
new_risks.append(
RiskSignal(
signal_id=f"similar_incident_{inc['incident_id']}",
category="incident_similarity",
severity="high",
summary=f"Similar incident: {inc['title']} (similarity: {inc['similarity_to_current']})",
)
)
reward = (
0.10
if is_new and "checked_incidents" in self._get_required_evidence()
else (0.02 if is_new else -0.05)
)
if new_risks and any(r.severity == "high" for r in new_risks):
reward += 0.05
return (
reward,
ToolResult(
tool_name="search_incidents",
success=True,
content=content,
structured_payload={
"scenario_incidents": matched_scenario,
"historical_incidents": db_results,
"count": len(all_results),
},
),
new_risks,
)
def _query_incident_db(self, keywords: list[str]) -> list[dict]:
"""Query the real incident database for historically similar incidents."""
if not INCIDENTS_DB.exists():
return []
try:
conn = sqlite3.connect(f"file:{INCIDENTS_DB}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
if not keywords:
rows = conn.execute(
"SELECT id, title, company, category, severity, description, root_cause, url "
"FROM incidents ORDER BY severity DESC LIMIT 5"
).fetchall()
else:
conditions = " OR ".join(
["title LIKE ? OR description LIKE ? OR keywords LIKE ?"] * len(keywords)
)
params: list[str] = []
for kw in keywords:
like = f"%{kw.lower()}%"
params.extend([like, like, like])
rows = conn.execute(
f"SELECT id, title, company, category, severity, description, root_cause, url "
f"FROM incidents WHERE {conditions} "
f"ORDER BY severity DESC LIMIT 8",
params,
).fetchall()
conn.close()
return [dict(r) for r in rows]
except Exception:
return []
def _handle_check_policy(self, action: ReleaseAction):
evidence_key = "checked_policy"
is_new = evidence_key not in self._state.evidence_gathered
if is_new:
self._state.evidence_gathered.append(evidence_key)
policy = self._scenario["policy"]
new_risks = self._evaluate_policy_violations(policy) if is_new else []
reward = (
0.10
if is_new and "checked_policy" in self._get_required_evidence()
else (0.02 if is_new else -0.05)
)
if new_risks and any(r.severity in ("high", "critical") for r in new_risks):
reward += 0.05
return (
reward,
ToolResult(
tool_name="check_policy",
success=True,
content=json.dumps(policy, indent=2),
structured_payload=policy,
),
new_risks,
)
def _handle_query_telemetry(self, action: ReleaseAction):
metric = action.metric or "p99"
service = action.service
window = action.window or "5m"
if not service:
return (
-0.02,
ToolResult(
tool_name="query_telemetry",
success=False,
content="Must specify service.",
),
[],
)
curves = self._scenario["telemetry_curves"]
phase = self._state.rollout_phase
curve_key = {
"precheck": "pre_canary",
"canary": "during_canary",
"promoted": "after_promotion",
"rolled_back": "after_rollback",
"paused": "during_canary",
}.get(phase, "pre_canary")
phase_data = curves.get(curve_key, {})
service_data = phase_data.get(service, {})
# Support both "p99" and "p99_ms" key variants in telemetry JSON
metric_data = service_data.get(metric, [])
if not metric_data and metric in ("p50", "p95", "p99"):
metric_data = service_data.get(f"{metric}_ms", [])
if not metric_data:
return (
0.0,
ToolResult(
tool_name="query_telemetry",
success=True,
content=f"No {metric} data for {service} in {phase} phase.",
structured_payload={
"metric": metric,
"service": service,
"values": [],
},
),
[],
)
idx = min(self._state.steps_in_phase, len(metric_data) - 1)
current_value = metric_data[idx]
evidence_key = f"telemetry_{service}_{metric}_{phase}"
is_new = evidence_key not in self._state.evidence_gathered
if is_new:
self._state.evidence_gathered.append(evidence_key)
new_risks: list[RiskSignal] = []
if is_new and isinstance(current_value, (int, float)):
baseline = metric_data[0]
if isinstance(baseline, (int, float)) and baseline > 0:
ratio = current_value / baseline
if ratio > 2.0:
new_risks.append(
RiskSignal(
signal_id=f"{metric}_breach_{service}",
category="latency" if "p" in metric else "error_rate",
severity="critical" if ratio > 3.0 else "high",
summary=f"{service} {metric} is {ratio:.1f}x baseline ({current_value} vs {baseline})",
numeric_value=current_value,
threshold=baseline * 2,
)
)
reward = 0.05 if is_new else -0.05
if new_risks:
reward += 0.15
return (
reward,
ToolResult(
tool_name="query_telemetry",
success=True,
content=f"{service} {metric} ({window}, {phase} phase): {current_value}",
structured_payload={
"metric": metric,
"service": service,
"window": window,
"phase": phase,
"current_value": current_value,
"history": metric_data[: idx + 1],
},
),
new_risks,
)
def _handle_request_artifact(self, action: ReleaseAction):
artifact_type = action.artifact_type
if not artifact_type:
return (
-0.02,
ToolResult(
tool_name="request_artifact",
success=False,
content="Must specify artifact_type.",
),
[],
)
change = self._scenario["change"]
if artifact_type == "load_test":
data = change.get("test_results", {}).get("load_test")
elif artifact_type == "approval":
data = change.get("approvals", {})
elif artifact_type == "rollback_plan":
data = change.get("rollback_plan")
else:
data = change.get(artifact_type)
is_new = artifact_type not in self._state.artifacts_requested
if is_new:
self._state.artifacts_requested.append(artifact_type)
if data is None:
missing_code = f"MISSING_{artifact_type.upper()}"
new_risks: list[RiskSignal] = []
if is_new:
new_risks.append(
RiskSignal(
signal_id=f"missing_{artifact_type}",
category=(
"policy_violation"
if artifact_type in ("load_test", "approval")
else "compliance"
),
severity="high",
summary=f"Required artifact '{artifact_type}' is not available",
)
)
reward = (
0.20
if is_new and missing_code in self._get_required_reason_codes()
else (0.05 if is_new else -0.05)
)
return (
reward,
ToolResult(
tool_name="request_artifact",
success=False,
content=f"Artifact '{artifact_type}' is not available.",
),
new_risks,
)
content = (
json.dumps(data, indent=2) if isinstance(data, (dict, list)) else str(data)
)
reward = 0.05 if is_new else -0.05
return (
reward,
ToolResult(
tool_name="request_artifact",
success=True,
content=content,
structured_payload={"artifact_type": artifact_type, "data": data},
),
[],
)
def _handle_control_rollout(self, action: ReleaseAction):
decision = action.decision
if not decision:
return (
-0.02,
ToolResult(
tool_name="control_rollout",
success=False,
content="Must specify decision.",
),
[],
)
phase = self._state.rollout_phase
valid_transitions = {
"precheck": ["start_canary"],
"canary": ["pause", "promote", "rollback"],
"paused": ["promote", "rollback"],
}
if decision not in valid_transitions.get(phase, []):
allowed = valid_transitions.get(phase, [])
return (
-0.10,
ToolResult(
tool_name="control_rollout",
success=False,
content=f"Cannot '{decision}' from phase '{phase}'. Allowed: {allowed}",
),
[],
)
prev_phase = phase
reward = 0.0
new_risks: list[RiskSignal] = []
if decision == "start_canary":
self._state.rollout_phase = "canary"
self._state.steps_in_phase = 0
gt = self._scenario["ground_truth"]
reward = (
0.25
if "CANARY_REQUIRED" in gt.get("required_reason_codes", [])
else 0.05
)
elif decision == "pause":
self._state.rollout_phase = "paused"
reward = 0.05
elif decision == "promote":
self._state.rollout_phase = "promoted"
self._state.steps_in_phase = 0
gt = self._scenario["ground_truth"]
if gt.get("optimal_decision") in ("block", "request_changes"):
reward = -0.40
new_risks.append(
RiskSignal(
signal_id="catastrophic_promotion",
category="blast_radius",
severity="critical",
summary="Promoted a change that should have been blocked!",
)
)
self._state.terminal = True
self._state.rollout_phase = "terminal"
else:
reward = 0.15
elif decision == "rollback":
self._state.rollout_phase = "rolled_back"
self._state.steps_in_phase = 0
if phase == "canary" and self._has_observed_bad_telemetry():
reward = 0.30
else:
reward = 0.05
return (
reward,
ToolResult(
tool_name="control_rollout",
success=True,
content=f"Rollout phase: {prev_phase}{self._state.rollout_phase}",
structured_payload={
"previous_phase": prev_phase,
"new_phase": self._state.rollout_phase,
},
),
new_risks,
)
def _handle_submit_decision(self, action: ReleaseAction) -> ReleaseObservation:
"""Terminal action — grades the full trajectory."""
final_decision = action.final_decision
reason_codes = action.reason_codes or []
if not final_decision:
return ReleaseObservation(
done=False,
reward=-0.05,
task_id=self._state.task_id,
change_summary=self._scenario["change"]["description"],
known_risk_signals=self._get_visible_risks(),
allowed_actions=self._get_allowed_actions(),
rollout_phase=self._state.rollout_phase,
time_remaining=max(0, self._max_steps - self._state.step_count),
cumulative_reward=self._state.cumulative_reward,
metadata={"error": "Must specify final_decision."},
)
gt = self._scenario["ground_truth"]
grader_result = self._grade(final_decision, reason_codes, gt)
self._state.terminal = True
self._state.rollout_phase = "terminal"
self._state.final_score = grader_result["score"]
decision_reward = (
0.40
if grader_result["decision_correct"]
else (0.20 if grader_result["decision_partial"] else -0.15)
)
decision_reward += 0.10 * grader_result["risk_signal_discovery"]
self._state.cumulative_reward += decision_reward
return ReleaseObservation(
done=True,
reward=decision_reward,
task_id=self._state.task_id,
change_summary=self._scenario["change"]["description"],
known_risk_signals=self._get_visible_risks(),
last_tool_result=ToolResult(
tool_name="submit_decision",
success=True,
content=f"Decision: {final_decision}. Score: {grader_result['score']:.3f}",
structured_payload=grader_result,
),
allowed_actions=[],
rollout_phase="terminal",
time_remaining=0,
cumulative_reward=self._state.cumulative_reward,
final_score=grader_result["score"],
grader_breakdown=grader_result["breakdown"],
metadata={"final_decision": final_decision, "reason_codes": reason_codes},
)
# ── GRADER ─────────────────────────────────────────────
def _grade(
self, final_decision: str, reason_codes: list[str], ground_truth: dict
) -> dict:
"""Deterministic grading formula. Returns score in [0.0, 1.0].
Components:
0.35 evidence_coverage — did the agent inspect the right sources?
0.25 risk_signal_discovery — did the agent trigger discovery of key signals?
0.30 decision_correctness — was the final decision correct?
0.10 efficiency — did the agent avoid wasted steps?
risk_signal_discovery is objective: it checks state.risk_signals_found
(signal_ids actually emitted by the environment during the episode)
against required_risk_signals in ground_truth. This mirrors the
Calendar env SQL verifier pattern — the environment measures what the
agent observed, not what strings the agent typed.
"""
# 1. Evidence coverage (weight 0.35)
required_ev = set(ground_truth.get("required_evidence", []))
gathered_ev = set(self._state.evidence_gathered)
evidence_coverage = len(required_ev & gathered_ev) / max(len(required_ev), 1)
# 2. Risk signal discovery (weight 0.25) — objective: checks env-emitted signal_ids
required_signals = set(ground_truth.get("required_risk_signals", []))
discovered_signals = set(self._state.risk_signals_found)
if required_signals:
signal_discovery = len(required_signals & discovered_signals) / len(required_signals)
else:
# No required signals (e.g. trivial approve scenario) — full credit
signal_discovery = 1.0
# 3. Decision correctness (weight 0.30)
optimal = ground_truth.get("optimal_decision", "")
acceptable = set(ground_truth.get("acceptable_decisions", [optimal]))
if final_decision == optimal:
decision_score = 1.0
elif final_decision in acceptable:
decision_score = 0.5
else:
decision_score = 0.0
# 4. Efficiency (weight 0.10)
usage = self._state.step_count / self._max_steps
if 0.3 <= usage <= 0.7:
efficiency = 1.0
elif usage < 0.3:
efficiency = usage / 0.3
else:
efficiency = max(0.0, 1.0 - (usage - 0.7) / 0.3)
# 5. Forbidden action penalty
forbidden = ground_truth.get("forbidden_actions", [])
took_forbidden = any(fa in self._state.actions_taken for fa in forbidden)
forbidden_penalty = 0.3 if took_forbidden else 0.0
raw_score = (
0.35 * evidence_coverage
+ 0.25 * signal_discovery
+ 0.30 * decision_score
+ 0.10 * efficiency
)
score = normalize_score(raw_score - forbidden_penalty)
return {
"score": round(score, 3),
"decision_correct": final_decision == optimal,
"decision_partial": final_decision in acceptable,
"risk_signal_discovery": round(signal_discovery, 3),
"breakdown": {
"evidence_coverage": round(evidence_coverage, 3),
"risk_signal_discovery": round(signal_discovery, 3),
"decision_correctness": round(decision_score, 3),
"efficiency": round(efficiency, 3),
"forbidden_penalty": round(forbidden_penalty, 3),
},
}
# ── HELPERS ─────────────────────────────────────────────
def _get_allowed_actions(self) -> list[str]:
phase = self._state.rollout_phase
if phase == "terminal":
return []
base = [
"inspect_change",
"inspect_services",
"inspect_dependencies",
"search_incidents",
"check_policy",
"query_telemetry",
"request_artifact",
"submit_decision",
]
if phase in ("precheck", "canary", "paused"):
base.append("control_rollout")
return base
def _get_visible_risks(self) -> list[RiskSignal]:
return list(self._discovered_risks.values())
def _get_required_evidence(self) -> set[str]:
return set(self._scenario.get("ground_truth", {}).get("required_evidence", []))
def _get_required_reason_codes(self) -> set[str]:
return set(
self._scenario.get("ground_truth", {}).get("required_reason_codes", [])
)
def _discover_risks_from_section(self, section: str) -> list[RiskSignal]:
"""Surface-level risks visible from inspecting a change section."""
risks: list[RiskSignal] = []
change = self._scenario["change"]
if section == "tests":
test_results = change.get("test_results", {})
integration = test_results.get("integration", {})
if integration.get("failed", 0) > 0:
risks.append(
RiskSignal(
signal_id="integration_test_failure",
category="test_coverage",
severity="high",
summary=f"Integration test failures: {integration.get('failures', [])}",
)
)
if test_results.get("load_test") is None:
risks.append(
RiskSignal(
signal_id="missing_load_test",
category="test_coverage",
severity="medium",
summary="No load test results available",
)
)
elif integration.get("failed", 0) == 0:
risks.append(
RiskSignal(
signal_id="all_tests_passed",
category="test_coverage",
severity="low",
summary="All unit, integration, and load tests passed",
)
)
elif section == "approvals":
approvals = change.get("approvals", {})
pending = [role for role, status in approvals.items() if status == "pending"]
for role in pending:
risks.append(
RiskSignal(
signal_id=f"missing_approval_{role}",
category="missing_approval",
severity="high",
summary=f"Approval pending: {role}",
)
)
if not pending and approvals:
risks.append(
RiskSignal(
signal_id="all_approvals_granted",
category="compliance",
severity="low",
summary="All required approvals are in place",
)
)
elif section == "diff":
diff = change.get("diff_summary", "").lower()
if any(kw in diff for kw in ("backward-compatible", "backward compatible", "concurrently", "no code changes")):
risks.append(
RiskSignal(
signal_id="backward_compatible_change",
category="compliance",
severity="low",
summary="Change is backward-compatible — existing interfaces and data are unaffected",
)
)
if any(
kw in diff for kw in ("hot path", "sync", "synchronous", "blocking")
):
risks.append(
RiskSignal(
signal_id="hot_path_sync_io",
category="latency",
severity="high",
summary="Change introduces synchronous I/O on a hot path",
)
)
if "retry" in diff and any(kw in diff for kw in ("concurrency", "queue", "consumer")):
risks.append(
RiskSignal(
signal_id="retry_amplification_risk",
category="resource_saturation",
severity="high",
summary="Increased retry attempts combined with higher concurrency may cause thundering herd under partial failure",
)
)
if any(kw in diff for kw in ("connection pool", "max_connections", "pool size")):
risks.append(
RiskSignal(
signal_id="missing_capacity_check",
category="resource_saturation",
severity="high",
summary="Connection pool increase without documented capacity validation against database server limits",
)
)
# Removing a protection mechanism (rate limiting, circuit breaker, etc.)
# without a replacement is categorically critical — not fixable by adding approvals
if "rate limit" in diff and any(kw in diff for kw in ("remov", "disabl", "delet")):
risks.append(
RiskSignal(
signal_id="protection_mechanism_removal",
category="policy_violation",
severity="critical",
summary="Change removes rate limiting without introducing a replacement throttling mechanism — unthrottled traffic can cascade to downstream services",
)
)
return risks
def _evaluate_policy_violations(self, policy: dict) -> list[RiskSignal]:
"""Check for policy violations given current scenario state."""
risks: list[RiskSignal] = []
change = self._scenario["change"]
services = self._scenario["services"].get("services", {})
for rule in policy.get("rules", []):
rule_text = rule.get("rule", "").lower()
rule_id = rule.get("id", "")
# Rule: critical-tier services need owner approval
if "critical" in rule_text and "approval" in rule_text:
for svc_name in change.get("services_touched", []):
svc = services.get(svc_name, {})
if svc.get("tier") == "critical":
if (
change.get("approvals", {}).get("service_owner")
== "pending"
):
risks.append(
RiskSignal(
signal_id="policy_missing_owner_approval",
category="policy_violation",
severity="high",
summary=f"{rule_id}: Critical service '{svc_name}' requires owner approval (pending)",
)
)
# Rule: peak traffic window
if "peak" in rule_text and "traffic" in rule_text:
if policy.get("traffic_window") == "peak":
risks.append(
RiskSignal(
signal_id="policy_peak_traffic",
category="policy_violation",
severity="high",
summary=f"{rule_id}: Current time is peak traffic window ({policy.get('current_time', '?')})",
)
)
# Rule: load test required
if "load test" in rule_text:
if change.get("test_results", {}).get("load_test") is None:
risks.append(
RiskSignal(
signal_id="policy_missing_load_test",
category="policy_violation",
severity="high",
summary=f"{rule_id}: Load test required but not provided",
)
)
# Rule: canary required for schema/migration or retry/queue changes
if "canary" in rule_text and any(kw in rule_text for kw in ("schema", "migration", "retry", "queue", "concurrency")):
diff = change.get("diff_summary", "").lower()
is_schema = any(kw in diff for kw in ("migration", "schema", "index", "create index", "alter table"))
is_retry_queue = any(kw in diff for kw in ("retry", "concurrency", "queue consumer"))
if is_schema or is_retry_queue:
risks.append(
RiskSignal(
signal_id="canary_required_policy",
category="policy_violation",
severity="high",
summary=f"{rule_id}: Change type requires canary deployment before full promotion",
)
)
return risks
def _has_observed_bad_telemetry(self) -> bool:
"""True if agent has seen a metric threshold breach during canary."""
return any("breach" in sig_id for sig_id in self._state.risk_signals_found)