atlasops / config /runtime.py
Harikishanth R
fix: skip-kubectl + scroll + health β€” HF Space ready
7e9a520
"""Central runtime constants and reward contract helpers.
Keeping these values in one module avoids config drift across benchmark,
training, and evaluation pipelines.
"""
from __future__ import annotations
import json
import math
import random
from typing import Any
FROZEN_SCENARIOS = [
# single-fault
"single_fault/sf-001", "single_fault/sf-002", "single_fault/sf-003", "single_fault/sf-004",
"single_fault/sf-005", "single_fault/sf-006", "single_fault/sf-007", "single_fault/sf-008",
# cascade
"cascade/cs-001", "cascade/cs-002", "cascade/cs-003", "cascade/cs-004", "cascade/cs-005",
# multi-fault
"multi_fault/mf-001", "multi_fault/mf-002", "multi_fault/mf-003", "multi_fault/mf-004",
"multi_fault/mf-005",
# named replays
"named_replays/hist-cloudflare-2019", "named_replays/hist-aws-s3-2017",
"named_replays/hist-github-2018", "named_replays/hist-datadog-2023",
"named_replays/hist-discord-2022", "named_replays/hist-fastly-2021",
"named_replays/hist-facebook-bgp-2021", "named_replays/hist-slack-2022",
"named_replays/hist-azure-dns-2019", "named_replays/hist-knight-capital-2012",
]
EVAL_SCENARIOS_BY_TIER = {
"single_fault": [
"single_fault/sf-001", "single_fault/sf-002", "single_fault/sf-003",
"single_fault/sf-004", "single_fault/sf-005",
],
"cascade": ["cascade/cs-001", "cascade/cs-002", "cascade/cs-003"],
"named_replays": [
"named_replays/hist-cloudflare-2019",
"named_replays/hist-github-2018",
"named_replays/hist-discord-2022",
],
}
LEADERBOARD_SCENARIOS = [
("single_fault/sf-001", "single_fault"),
("single_fault/sf-002", "single_fault"),
("single_fault/sf-006", "single_fault"),
("cascade/cs-001", "cascade"),
("cascade/cs-002", "cascade"),
("named_replays/hist-cloudflare-2019", "named_replays"),
("named_replays/hist-github-2018", "named_replays"),
]
SCENARIOS_BY_TIER = {
"single_fault": [f"single_fault/sf-{i:03d}" for i in range(1, 9)],
"cascade": [f"cascade/cs-{i:03d}" for i in range(1, 6)],
"multi_fault": [f"multi_fault/mf-{i:03d}" for i in range(1, 6)],
"named_replays": [
"named_replays/hist-cloudflare-2019",
"named_replays/hist-github-2018",
"named_replays/hist-discord-2022",
"named_replays/hist-datadog-2023",
"named_replays/hist-aws-s3-2017",
],
}
TIER_SAMPLING_WEIGHTS = {
"single_fault": 0.20,
"cascade": 0.30,
"multi_fault": 0.25,
"named_replays": 0.25,
}
SPEED_MIDPOINTS = {
"warmup": 90.0,
"single_fault": 150.0,
"cascade": 240.0,
"multi_fault": 300.0,
"named_replays": 300.0,
"adversarial": 360.0,
}
BASE_REWARD_WEIGHTS = {
"r_resolve": 0.35,
"r_speed": 0.15,
"r_evidence": 0.20,
"r_safety": 0.20,
"r_comms": 0.10,
}
def clamp01(v: float) -> float:
return max(0.0, min(1.0, v))
def bounded_speed_score(seconds: float, tier: str) -> float:
midpoint = SPEED_MIDPOINTS.get(tier, 240.0)
slope = 40.0
return max(0.0, min(1.0, 1.0 / (1.0 + math.exp((seconds - midpoint) / slope))))
def evaluate_reward_contract(episode: dict[str, Any]) -> dict[str, Any]:
"""Tier-aware anti-gaming reward contract used across train/eval/bench.
Improvements over baseline:
+ Red herring bonus (+0.15): rewards agents that dismiss misleading symptoms
on multi-fault/adversarial/named-replay tiers (scored by 72B judge)
+ Phase skip penalty (-0.20): penalises agents that skip investigation phases
+ Lazy investigation penalty (-0.15): penalises fast resolution without evidence
gathering on hard tiers (likely guessed, not diagnosed)
"""
tier = str(episode.get("tier", "unknown"))
resolved = bool(episode.get("resolved", False))
outcome = str(episode.get("outcome", "unknown"))
turns = int(episode.get("total_turns", 0))
ttr = float(episode.get("time_to_resolve_s", 9999))
judge = episode.get("judge", {}) or {}
reasoning = float(judge.get("reasoning", 0.0))
correctness = float(judge.get("correctness", 0.0))
efficiency = float(judge.get("efficiency", 0.0))
red_herring = float(judge.get("red_herring_handling", 0.5))
r_resolve = 1.0 if resolved else (0.5 if outcome == "partial" else 0.0)
r_speed = bounded_speed_score(ttr, tier)
r_evidence = clamp01((reasoning + correctness) / 2.0)
r_safety = clamp01(efficiency)
r_comms = 1.0 if episode.get("postmortem_path") else 0.3
# ── Red herring bonus ─────────────────────────────────────────────────────
# Only awarded on tiers that contain misleading symptoms. Requires the 72B
# judge to score red_herring_handling >= 0.8 (correctly dismissed red herrings).
_rh_tiers = {"multi_fault", "named_replays", "adversarial"}
red_herring_bonus = 0.15 if (tier in _rh_tiers and red_herring >= 0.8) else 0.0
# ── Penalties ─────────────────────────────────────────────────────────────
penalties = {
"command_spam": 0.10 if turns > 40 else 0.0,
"false_resolution": 0.25 if (not resolved and outcome == "resolved") else 0.0,
"unsafe_shortcut": 0.20 if efficiency < 0.3 else 0.0,
"hallucinated_evidence": 0.20 if (reasoning < 0.25 and correctness < 0.5) else 0.0,
"over_silence": 0.10 if ("silence" in json.dumps(episode).lower() and not resolved) else 0.0,
# Phase ordering: too few turns without resolution = investigation was skipped
"phase_skip": 0.20 if (turns < 4 and not resolved) else 0.0,
# Lazy investigation: suspiciously fast resolution on hard tiers without
# enough tool calls suggests the agent guessed rather than diagnosed
"lazy_investigation": 0.15 if (
resolved and turns < 5
and tier not in ("warmup", "single_fault")
) else 0.0,
}
# ── Tier-specific weight adjustments ─────────────────────────────────────
weights = dict(BASE_REWARD_WEIGHTS)
if tier == "single_fault":
weights.update({"r_evidence": 0.25, "r_speed": 0.10})
elif tier == "cascade":
weights.update({"r_resolve": 0.30, "r_evidence": 0.25, "r_speed": 0.10})
elif tier == "multi_fault":
weights.update({"r_safety": 0.25, "r_evidence": 0.25, "r_speed": 0.10})
elif tier in ("adversarial", "named_replays"):
penalties = {k: v * 1.25 for k, v in penalties.items()}
weights.update({"r_safety": 0.25, "r_evidence": 0.25, "r_speed": 0.05})
weighted = (
weights["r_resolve"] * r_resolve
+ weights["r_speed"] * r_speed
+ weights["r_evidence"] * r_evidence
+ weights["r_safety"] * r_safety
+ weights["r_comms"] * r_comms
+ red_herring_bonus
)
penalty_total = sum(penalties.values())
total = clamp01(weighted - penalty_total)
return {
"components": {
"resolve": round(r_resolve, 4),
"speed": round(r_speed, 4),
"evidence": round(r_evidence, 4),
"safety": round(r_safety, 4),
"comms": round(r_comms, 4),
"red_herring_bonus": round(red_herring_bonus, 4),
},
"penalties": {k: round(v, 4) for k, v in penalties.items()},
"penalty_total": round(penalty_total, 4),
"total": round(total, 4),
}
# ── Spaced-Repetition Curriculum Manager ─────────────────────────────────────
class CurriculumManager:
"""Adaptive scenario selector with spaced repetition.
Priority score = +100 (novel) + 50Γ—(1βˆ’success_rate) + 30 (SR bonus) βˆ’ 20 (recency).
Graduated scenarios resurface at intervals [3, 6, 12, 24, 48] episodes;
pass β†’ double interval, fail β†’ reset to 3.
"""
SPACED_REP_INTERVALS = [3, 6, 12, 24, 48]
MASTERY_THRESHOLD = 0.7
MASTERY_WINDOW = 10
MASTERY_DECAY = 0.85
MIN_ATTEMPTS_FOR_MASTERY = 3
def __init__(self) -> None:
# scenario_id β†’ list of (episode_idx, reward) tuples
self._history: dict[str, list[tuple[int, float]]] = {}
# scenario_id β†’ index into SPACED_REP_INTERVALS
self._graduated: dict[str, int] = {}
# scenario_id β†’ episode number when it should resurface
self._next_resurface: dict[str, int] = {}
# last 2 scenario IDs (recency penalty)
self._recent: list[str] = []
self._episode_count = 0
def record(self, scenario_id: str, resolved: bool, reward: float) -> None:
"""Call after every episode to update mastery and spaced-rep state."""
self._history.setdefault(scenario_id, []).append(
(self._episode_count, reward)
)
self._recent = (self._recent + [scenario_id])[-2:]
if scenario_id in self._graduated:
# Already graduated β€” update interval based on result
idx = self._graduated[scenario_id]
if resolved:
idx = min(idx + 1, len(self.SPACED_REP_INTERVALS) - 1)
else:
idx = 0
self._graduated[scenario_id] = idx
elif self._is_mastered(scenario_id):
# Newly mastered β†’ enter spaced repetition
self._graduated[scenario_id] = 0
if scenario_id in self._graduated:
interval = self.SPACED_REP_INTERVALS[self._graduated[scenario_id]]
self._next_resurface[scenario_id] = self._episode_count + interval
self._episode_count += 1
def next_scenario(self, pool: list[tuple[str, str]]) -> tuple[str, str]:
"""Return (scenario_id, tier) chosen by priority scoring.
Uses soft-max over the top-3 candidates so training isn't fully
deterministic (avoids overfitting to the argmax scenario).
"""
if not pool:
pool = [(s, s.split("/")[0]) for s in FROZEN_SCENARIOS]
scored = sorted(
((self._priority_score(sid), sid, tier) for sid, tier in pool),
reverse=True,
)
top = scored[: min(3, len(scored))]
weights = [max(s, 0.01) for s, _, _ in top]
total_w = sum(weights)
probs = [w / total_w for w in weights]
chosen_idx = random.choices(range(len(top)), weights=probs)[0]
_, scenario_id, tier = top[chosen_idx]
return scenario_id, tier
def stats(self) -> dict[str, Any]:
return {
"total_episodes": self._episode_count,
"scenarios_tried": len(self._history),
"graduated": len(self._graduated),
"due_for_resurface": sum(
1 for sid, ep in self._next_resurface.items()
if self._episode_count >= ep
),
}
# ── internals ──
def _success_rate(self, scenario_id: str) -> float:
history = self._history.get(scenario_id, [])
if not history:
return 0.0
window = history[-self.MASTERY_WINDOW :]
weighted_sum = total_w = 0.0
for i, (_, reward) in enumerate(window):
w = self.MASTERY_DECAY ** (len(window) - 1 - i)
weighted_sum += w * reward
total_w += w
return weighted_sum / total_w if total_w else 0.0
def _is_mastered(self, scenario_id: str) -> bool:
history = self._history.get(scenario_id, [])
return (
len(history) >= self.MIN_ATTEMPTS_FOR_MASTERY
and self._success_rate(scenario_id) >= self.MASTERY_THRESHOLD
)
def _priority_score(self, scenario_id: str) -> float:
score = 0.0
history = self._history.get(scenario_id, [])
if not history:
score += 100.0 # novelty bonus
score += 50.0 * (1.0 - self._success_rate(scenario_id)) # weakness targeting
if (
scenario_id in self._next_resurface
and self._episode_count >= self._next_resurface[scenario_id]
):
score += 30.0 # spaced-rep resurface bonus
if scenario_id in self._recent:
score -= 20.0 # recency penalty
return score
# ── Dense Per-Step Reward Tracker ─────────────────────────────────────────────
class StepRewardTracker:
"""Accumulates dense per-tool-call rewards within one agent role's turn loop.
Base formula (progress-based):
progress_delta Γ— 0.8 + 0.1 if forward progress
Γ— 0.5 if tool failed
βˆ’ 0.1 per rollback
Tool category bonuses (per-action scoring):
+0.05 investigative tool success (evidence gathering rewarded)
+0.08 mutating tool success (remediation action rewarded)
βˆ’0.08 mutating tool failure (failed fix penalised harder)
βˆ’0.05 redundant call (exact same tool+args seen before)
Clamped to [βˆ’0.5, 0.99].
Partial progress = success_count / total_calls (monotonic β€” never decreases).
"""
_MUTATING = frozenset({
"argocd_rollback", "kubectl_rollout", "kubectl_scale", "alertmanager_silence",
})
_INVESTIGATIVE = frozenset({
"promql_query", "promql_query_range", "jaeger_search", "jaeger_get_trace",
"kubectl_logs", "kubectl_describe", "alertmanager_list_alerts",
"gcloud_logs_read", "cloud_monitoring_query",
})
def __init__(self) -> None:
self._calls: list[tuple[str, dict]] = []
self._success = 0
self._fail = 0
self._rollbacks = 0
self._partial_progress = 0.0
self._step_rewards: list[float] = []
def record(self, tool_name: str, args: dict, output: dict) -> float:
"""Record one tool call. Returns the per-step reward for this call."""
success = bool(output.get("success", True)) and "error" not in output
rollback = self._detect_rollback(tool_name, args)
if rollback:
self._rollbacks += 1
idempotent_retry = success and self._was_tried(tool_name, args)
self._calls.append((tool_name, args))
if success:
self._success += 1
else:
self._fail += 1
# Monotonic partial-progress
total = self._success + self._fail
prev = self._partial_progress
self._partial_progress = max(self._success / max(total, 1), prev)
delta = self._partial_progress - prev
r = delta * 0.8
if delta > 0:
r += 0.1
if not success:
r *= 0.5
if rollback:
r -= 0.1
if idempotent_retry:
r += 0.02
# ── Tool category bonuses ──────────────────────────────────────────
if success and tool_name in self._INVESTIGATIVE:
r += 0.05 # reward evidence gathering
if success and tool_name in self._MUTATING:
r += 0.08 # reward successful remediation
if not success and tool_name in self._MUTATING:
r -= 0.08 # extra penalty for failed mutating action
if not idempotent_retry and self._was_tried(tool_name, args):
r -= 0.05 # penalty for redundant call with same args
r = max(-0.5, min(0.99, r))
self._step_rewards.append(r)
return r
def total(self) -> float:
return sum(self._step_rewards)
def partial_progress(self) -> float:
return self._partial_progress
def summary(self) -> dict[str, Any]:
return {
"success_count": self._success,
"fail_count": self._fail,
"rollback_count": self._rollbacks,
"partial_progress": round(self._partial_progress, 4),
"dense_reward_total": round(self.total(), 4),
"step_rewards": [round(r, 4) for r in self._step_rewards],
}
def _detect_rollback(self, tool_name: str, args: dict) -> bool:
if tool_name not in self._MUTATING:
return False
resource = args.get("deployment") or args.get("app") or args.get("resource") or ""
return any(
t == tool_name
and (a.get("deployment") or a.get("app") or a.get("resource")) == resource
for t, a in self._calls
)
def _was_tried(self, tool_name: str, args: dict) -> bool:
return any(t == tool_name and a == args for t, a in self._calls[:-1])
# Module-level curriculum singleton shared across bench/train/eval
curriculum = CurriculumManager()