from __future__ import annotations

import copy
import random
import re
import uuid
from typing import Any

from difficulty_controller import (
    GLOBAL_DIFFICULTY_CONTROLLER,
    DifficultyController,
    DifficultyProfile,
)
from graders import (
    grade_task1_step,
    grade_task2_step,
    grade_task2_terminal,
    grade_task3_step,
    grade_task3_terminal,
)
from scenarios import sample_scenario, get_scenario, Scenario
from sentinel_config import CRITICAL_POISON_STAKES, VERIFY_EXTRA_STEP_COST
from specialists import SpecialistPool
from task_graph import TaskGraph
from trust_ledger import TrustLedger

# ---------------------------------------------------------------------------
# Lazy Pydantic imports — fall back to plain dicts if not installed
# ---------------------------------------------------------------------------
try:
    from models import (
        SentinelObservation,
        SentinelAction,
        SentinelReward,
        SentinelState,
        StepResult,
        EpisodeStatus,
        TaskType,
        ActionType,
        SpecialistId,
        ResetRequest,
    )

    _PYDANTIC = True
except ImportError:
    _PYDANTIC = False

# Hidden ground-truth reliability for each specialist FSM type.
# These values are NEVER sent to the agent — they are used only by the
# reward engine to compute the calibration score.
_GROUND_TRUTH_RELIABILITY: dict[str, float] = {
    "S0": 0.90,  # AccurateSlow
    "S1": 0.60,  # OverconfidentFast
    "S2": 0.70,  # DomainBound (average of in/out of domain)
    "S3": 0.15,  # Adversarial (unreliable overall)
    "S4": 0.65,  # Degrading (average over calls)
}

MAX_STEPS = {
    "task1": 15,
    "task2": 30,
    "task3": 45,
}


class SentinelEnv:
    """
    Core environment class — mirrors the Round 1 IncidentEnv pattern exactly:
    a reset() / step() / state() API. No BaseEnv subclassing is needed; this
    is a plain Python class that FastAPI wraps. (A minimal usage sketch sits
    at the bottom of this module.)
    """

    def __init__(self) -> None:
        self.current_scenario: Scenario | None = None
        self.episode_id: str = ""
        self.session_id: str = ""
        self.step_count: int = 0
        self.max_steps: int = 0
        self.total_reward: float = 0.0
        self.reward_events: int = 0
        self.last_reward: float = 0.0
        self.done: bool = False
        self.episode_status: str = "active"
        self.last_action_summary: str | None = None
        self._reward_trace: list[dict[str, Any]] = []
        self._graph: TaskGraph | None = None
        self._ledger: TrustLedger = TrustLedger()
        self._pool: SpecialistPool = SpecialistPool()
        self._rng: random.Random = random.Random()
        self._difficulty_controller: DifficultyController = GLOBAL_DIFFICULTY_CONTROLLER
        self._difficulty_profile: DifficultyProfile = DifficultyProfile()

    # ------------------------------------------------------------------
    # reset()
    # ------------------------------------------------------------------
    def reset(
        self,
        task_type: str | None = None,
        scenario_id: str | None = None,
        seed: int | None = None,
        adaptive: bool = False,
    ) -> dict:
        self._rng = random.Random(seed)

        # Select scenario: an explicit scenario_id wins; otherwise sample
        # from the requested task type (default task3).
        if scenario_id:
            scenario = get_scenario(scenario_id)
        else:
            task = task_type or "task3"
            scenario = sample_scenario(task, seed=seed)

        self._difficulty_profile = self._difficulty_controller.profile(adaptive=adaptive)
        scenario = self._apply_difficulty_profile(scenario, self._difficulty_profile)

        self.current_scenario = scenario
        self.episode_id = str(uuid.uuid4())
        self.session_id = str(uuid.uuid4())
        self.step_count = 0
        self.max_steps = max(
            len(scenario["subtasks"]),
            MAX_STEPS[scenario["task_type"]] - self._difficulty_profile.verify_budget_penalty,
        )
        self.total_reward = 0.0
        self.reward_events = 0
        self.last_reward = 0.0
        self.done = False
        self.episode_status = "active"
        self.last_action_summary = None
        self._reward_trace = []

        # Reset subcomponents
        self._graph = TaskGraph(scenario)
        self._ledger.reset()
        self._pool.reset(seed=seed)
        self._pool.configure_adversary(
            stakes_threshold=self._difficulty_profile.adversarial_threshold,
            benign_confidence=self._difficulty_profile.adversary_benign_confidence,
            poison_confidence=self._difficulty_profile.adversary_poison_confidence,
        )

        return self._build_step_result(
            reward_value=0.0,
            reason="Episode initialized.",
            breakdown={},
            done=False,
            extra_info={"episode_id": self.episode_id, "session_id": self.session_id},
        )
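    # For reference, the action dict that step() expects. Field names come
    # from the validation logic below; the values shown are purely
    # illustrative:
    #
    #     {
    #         "session_id": "<from reset()>",
    #         "action_type": "delegate",     # or "verify" / "skip" / "solve_independently"
    #         "specialist_id": "S0",         # required for delegate / verify
    #         "subtask_response": "...",     # required for solve_independently
    #     }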
    # ------------------------------------------------------------------
    # step()
    # ------------------------------------------------------------------
    def step(self, action: dict) -> dict:
        if self.current_scenario is None:
            raise RuntimeError("Call reset() before step().")
        if self.done:
            raise RuntimeError("Episode already completed. Call reset().")

        # --- Validate session ---
        if action.get("session_id") and action["session_id"] != self.session_id:
            raise ValueError(
                f"session_id mismatch: expected '{self.session_id}', got '{action['session_id']}'"
            )

        action_type = action.get("action_type", "delegate")
        specialist_id = action.get("specialist_id")
        task_type = self.current_scenario["task_type"]

        # --- Validate action fields ---
        if action_type in ("delegate", "verify") and not specialist_id:
            raise ValueError(f"action_type='{action_type}' requires specialist_id.")
        if action_type == "solve_independently" and not action.get("subtask_response"):
            raise ValueError("action_type='solve_independently' requires subtask_response.")

        # --- Get current subtask ---
        node = self._graph.current_node()
        if node is None:
            # All nodes done — emit terminal reward
            return self._terminal_reward()

        subtask = node.subtask
        stakes = subtask["stakes"]
        confidence: float | None = None
        result_metadata: dict[str, Any] = {}
        trust_before = self._ledger.trust(specialist_id) if specialist_id else None
        step_cost = 1

        # --- Execute specialist or self-solve ---
        if action_type == "skip":
            self._graph.skip_node(subtask["id"])
            outcome = 0.0
            was_adversarial = False
            self.last_action_summary = f"Skipped {subtask['id']}"

        elif action_type == "solve_independently":
            # The agent solves the subtask itself — always correct (no
            # specialist involved), but it costs 2 steps, enforced via
            # max_steps budget pressure.
            step_cost = 2
            outcome = 1.0
            was_adversarial = False
            self._graph.record_outcome(subtask["id"], outcome, "SELF")
            self.last_action_summary = f"Self-solved {subtask['id']}"

        elif action_type == "verify":
            # First get the specialist's result, then verify it (costs
            # VERIFY_EXTRA_STEP_COST additional steps).
            result = self._pool.execute(
                specialist_id,
                subtask["description"],
                stakes,
                self._rng,
                domain=subtask.get("domain"),
            )
            step_cost = int(result.metadata.get("step_cost", 1)) + VERIFY_EXTRA_STEP_COST
            outcome = result.outcome if not result.is_adversarial else 0.0
            was_adversarial = result.is_adversarial
            confidence = result.confidence
            result_metadata = dict(result.metadata)

            # Verification means the agent caught the adversarial answer —
            # treat it as a detection.
            if result.is_adversarial:
                outcome = 1.0  # successfully avoided

            self._graph.record_outcome(subtask["id"], outcome, specialist_id, was_adversarial)

            # Important: trust must learn from the specialist's raw answer,
            # not from the corrected/avoided graph outcome. If a specialist
            # was caught lying, the task node is safe, but that specialist's
            # trust should still drop.
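            # Illustrative example: when a verify call catches a poisoned
            # answer, the graph above records outcome 1.0 (poison avoided),
            # while the ledger update below receives the raw result.outcome
            # (e.g. 0.0), so the specialist's trust still falls.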
            self._ledger.update(
                specialist_id,
                result.outcome,
                stakes,
                confidence=result.confidence,
                domain=subtask.get("domain"),
            )
            self.last_action_summary = f"Verified {specialist_id} on {subtask['id']}"

        else:  # delegate
            result = self._pool.execute(
                specialist_id,
                subtask["description"],
                stakes,
                self._rng,
                domain=subtask.get("domain"),
            )
            step_cost = int(result.metadata.get("step_cost", 1))
            was_adversarial = result.is_adversarial
            outcome = 0.0 if was_adversarial else result.outcome
            confidence = result.confidence
            result_metadata = dict(result.metadata)
            self._graph.record_outcome(subtask["id"], outcome, specialist_id, was_adversarial)
            self._ledger.update(
                specialist_id,
                result.outcome,
                stakes,
                confidence=result.confidence,
                domain=subtask.get("domain"),
            )
            self.last_action_summary = f"Delegated to {specialist_id} on {subtask['id']}"

        self.step_count += max(1, step_cost)

        # --- Grade this step ---
        reward_value, reason, breakdown = self._grade_step(
            task_type,
            action_type,
            specialist_id,
            outcome,
            stakes,
            was_adversarial,
            confidence,
            result_metadata,
            trust_before,
        )
        self.last_reward = reward_value
        self.total_reward += reward_value
        self.reward_events += 1
        self._record_reward_event(
            kind="step",
            action_type=action_type,
            specialist_id=specialist_id,
            subtask=subtask,
            stakes=stakes,
            reward_value=reward_value,
            reason=reason,
            breakdown=breakdown,
            was_adversarial=was_adversarial,
            confidence=confidence,
            result_metadata=result_metadata,
            trust_before=trust_before,
        )

        # --- Check episode end ---
        all_done = self._graph.is_done()
        budget_gone = self.step_count >= self.max_steps
        poisoned = (
            was_adversarial
            and action_type == "delegate"
            and stakes >= CRITICAL_POISON_STAKES
        )

        if all_done or budget_gone or poisoned:
            # Emit terminal reward on top of the step reward
            return self._terminal_reward(
                step_reward=reward_value,
                step_reason=reason,
                step_breakdown=breakdown,
                forced_end=poisoned,
            )

        return self._build_step_result(reward_value, reason, breakdown, done=False)

    # ------------------------------------------------------------------
    # state()
    # ------------------------------------------------------------------
    def state(self, session_id: str | None = None) -> dict:
        if self.current_scenario is None:
            raise RuntimeError("No active episode. Call reset() first.")
        return {
            "episode_id": self.episode_id,
            "session_id": session_id or self.session_id,
            "step_count": self.step_count,
            "max_steps": self.max_steps,
            "total_reward": round(self.total_reward, 4),
            "score": round(self.normalized_score(), 4),
            "done": self.done,
            "scenario_id": self.current_scenario["scenario_id"],
            "task_type": self.current_scenario["task_type"],
            "difficulty": self._difficulty(),
            "status": self.episode_status,
            "last_reward": round(self.last_reward, 4),
            "subtasks_completed": self._graph.subtasks_completed(),
            "subtasks_total": self._graph.subtasks_total(),
            "trust_snapshot": self._ledger.snapshot(),
            "adversarial_detections": self._graph.adversarial_detections(),
            "adversarial_poisonings": self._graph.adversarial_poisonings(),
            "behavioral_fingerprints": self._ledger.behavioral_fingerprints(),
            "difficulty_profile": self._difficulty_profile.to_dict(),
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _grade_step(
        self,
        task_type: str,
        action_type: str,
        specialist_id: str | None,
        outcome: float,
        stakes: float,
        was_adversarial: bool,
        confidence: float | None,
        result_metadata: dict[str, Any],
        trust_score: float | None,
    ) -> tuple[float, str, dict]:
        if task_type == "task1":
            return grade_task1_step(
                chosen_specialist=specialist_id or "SELF",
                specialist_outcome=outcome,
                stakes=stakes,
                was_adversarial=was_adversarial,
                action_type=action_type,
                confidence=confidence,
                result_metadata=result_metadata,
                trust_score=trust_score,
            )
        elif task_type == "task2":
            return grade_task2_step(
                specialist_outcome=outcome,
                action_type=action_type,
                step_count=self.step_count,
                max_steps=self.max_steps,
                confidence=confidence,
                result_metadata=result_metadata,
            )
        else:  # task3
            return grade_task3_step(
                specialist_outcome=outcome,
                stakes=stakes,
                was_adversarial=was_adversarial,
                action_type=action_type,
                step_count=self.step_count,
                max_steps=self.max_steps,
                confidence=confidence,
                result_metadata=result_metadata,
                trust_score=trust_score,
            )
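    # Each grader returns a (value, reason, breakdown) tuple. A task2 step
    # might look like (0.94, "Correct delegation.", {"accuracy": 0.55,
    # "efficiency": 0.20, "confidence": 0.09, "domain": 0.10}); the numbers
    # are hypothetical, per the task2 weights listed in reward_report().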
    def _terminal_reward(
        self,
        step_reward: float = 0.0,
        step_reason: str = "",
        step_breakdown: dict | None = None,
        forced_end: bool = False,
    ) -> dict:
        task_type = self.current_scenario["task_type"]

        if task_type == "task1":
            # Terminal for task1 is completion-based when no step reward
            # was provided.
            from graders import clamp

            if step_reward > 0.0:
                terminal_value = step_reward
                terminal_reason = step_reason
                terminal_breakdown = step_breakdown or {}
            else:
                completion = self._graph.completion_rate()
                terminal_value = clamp(0.4 + 0.5 * completion)
                terminal_reason = f"Task1 complete. Completion={completion:.0%}."
                terminal_breakdown = {"completion_rate": round(completion, 3)}
        elif task_type == "task2":
            terminal_value, terminal_reason, terminal_breakdown = grade_task2_terminal(
                self._graph, self._ledger, self._public_ground_truth_reliability()
            )
        else:
            terminal_value, terminal_reason, terminal_breakdown = grade_task3_terminal(
                self._graph,
                self._ledger,
                self._public_ground_truth_reliability(),
                self.step_count,
                self.max_steps,
            )

        if forced_end:
            terminal_value = 0.01
            terminal_reason = "Critical adversarial poison accepted at peak stakes — episode failed."

        self.last_reward = terminal_value
        self.total_reward += terminal_value
        self.reward_events += 1
        self.done = True
        self.episode_status = "failed" if forced_end else "completed"
        self._record_reward_event(
            kind="terminal",
            action_type="terminal",
            specialist_id=None,
            subtask=None,
            stakes=0.0,
            reward_value=terminal_value,
            reason=terminal_reason,
            breakdown=terminal_breakdown,
            was_adversarial=False,
            confidence=None,
            result_metadata={},
            trust_before=None,
        )

        if self._difficulty_profile.adaptive:
            self._difficulty_controller.update(
                {
                    "adversarial_detections": self._graph.adversarial_detections(),
                    "adversarial_poisonings": self._graph.adversarial_poisonings(),
                    "adversarial_encounters": (
                        self._graph.adversarial_detections()
                        + self._graph.adversarial_poisonings()
                    ),
                }
            )

        return self._build_step_result(
            terminal_value,
            terminal_reason,
            terminal_breakdown,
            done=True,
            extra_info={
                **self._graph.summary(),
                "trust_snapshot": self._ledger.snapshot(),
                "forced_end": forced_end,
                "difficulty_profile": self._difficulty_profile.to_dict(),
                "reward_report": self.reward_report(),
            },
        )

    def _build_step_result(
        self,
        reward_value: float,
        reason: str,
        breakdown: dict,
        done: bool,
        extra_info: dict | None = None,
    ) -> dict:
        node = self._graph.current_node() if self._graph and not done else None
        subtask_index = (
            self._graph.node_index(node.subtask["id"])
            if node
            else (self._graph.subtasks_total() if self._graph else 0)
        )
        obs = {
            "session_id": self.session_id,
            "scenario_id": self.current_scenario["scenario_id"] if self.current_scenario else "",
            "task_type": self.current_scenario["task_type"] if self.current_scenario else "",
            "difficulty": self._difficulty(),
            "task_description": self.current_scenario["description"] if self.current_scenario else "",
            "current_subtask": node.subtask["description"] if node else "All subtasks complete.",
            "subtask_index": subtask_index,
            "subtasks_total": self._graph.subtasks_total() if self._graph else 0,
            "subtasks_remaining": self._graph.subtasks_remaining() if self._graph else 0,
            "available_specialists": self._pool.available_ids(),
            "trust_snapshot": self._ledger.snapshot(),
            "behavioral_fingerprints": self._ledger.behavioral_fingerprints(),
            "difficulty_profile": self._difficulty_profile.to_dict(),
            "stakes_level": node.subtask["stakes"] if node else 0.0,
            "step_count": self.step_count,
            "max_steps": self.max_steps,
            "last_action_summary": self.last_action_summary,
            "last_reward": round(self.last_reward, 4),
            "episode_status": self.episode_status,
        }
        reward = {
            "value": round(reward_value, 4),
            "reason": reason,
            "signal_breakdown": breakdown,
        }
        info = {
            "episode_id": self.episode_id,
            "session_id": self.session_id,
            "step_count": self.step_count,
            "max_steps": self.max_steps,
            "total_reward": round(self.total_reward, 4),
            "score": round(self.normalized_score(), 4),
        }
        if extra_info:
            info.update(extra_info)
        return {"observation": obs, "reward": reward, "done": done, "info": info}

    def _difficulty(self) -> str:
        return {"task1": "easy", "task2": "medium", "task3": "hard"}.get(
            self.current_scenario["task_type"] if self.current_scenario else "task3",
            "hard",
        )

    def normalized_score(self) -> float:
        """
        Mean reward over emitted grading events, normalized to 0.0-1.0.

        This is intentionally not a cumulative return: the terminal reward
        and efficiency terms already penalize unfinished or wasteful
        episodes, while this scalar stays comparable across tasks with
        different horizons.
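
        Worked example with hypothetical numbers: ten step rewards averaging
        0.6 plus one terminal reward of 0.8 give total_reward = 6.8 over
        reward_events = 11, so the score is 6.8 / 11 ≈ 0.62 whether the
        budget was 15 steps or 45.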
""" if self.reward_events <= 0: return 0.0 return max(0.0, min(1.0, self.total_reward / self.reward_events)) def _public_ground_truth_reliability(self) -> dict[str, float]: return self._pool.public_ground_truth_reliability(_GROUND_TRUTH_RELIABILITY) def stream_snapshot(self) -> dict: return { "session_id": self.session_id, "step_count": self.step_count, "max_steps": self.max_steps, "done": self.done, "trust_snapshot": self._ledger.snapshot(), "behavioral_fingerprints": self._ledger.behavioral_fingerprints(), "difficulty_profile": self._difficulty_profile.to_dict(), "last_action_summary": self.last_action_summary, "last_reward": round(self.last_reward, 4), } def reward_report(self) -> dict: return { "episode_id": self.episode_id, "session_id": self.session_id, "task_type": self.current_scenario["task_type"] if self.current_scenario else "", "score": round(self.normalized_score(), 4), "total_reward": round(self.total_reward, 4), "reward_events": self.reward_events, "component_averages": self._reward_component_averages(), "events": list(self._reward_trace), "formula": { "task1_step": "0.43 accuracy + 0.30 stakes + 0.12 efficiency + 0.07 confidence + 0.04 domain + 0.04 verify", "task2_step": "0.55 accuracy + 0.25 efficiency + 0.10 confidence + 0.10 domain", "task3_step": "0.32 accuracy + 0.33 stakes + 0.10 efficiency + 0.10 confidence + 0.10 verify + 0.05 domain", "task3_terminal": "0.35 completion + 0.30 detection + 0.25 calibration + 0.10 efficiency", }, } def _record_reward_event( self, kind: str, action_type: str, specialist_id: str | None, subtask: dict[str, Any] | None, stakes: float, reward_value: float, reason: str, breakdown: dict, was_adversarial: bool, confidence: float | None, result_metadata: dict[str, Any], trust_before: float | None, ) -> None: event = { "kind": kind, "step_count": self.step_count, "action_type": action_type, "specialist_id": specialist_id, "subtask_id": subtask.get("id") if subtask else None, "domain": subtask.get("domain") if subtask else None, "stakes": round(stakes, 3), "reward": round(reward_value, 4), "reason": reason, "signal_breakdown": breakdown, "was_adversarial": was_adversarial, "confidence": round(confidence, 3) if confidence is not None else None, "trust_before": round(trust_before, 3) if trust_before is not None else None, "trust_after": self._ledger.snapshot().get(specialist_id) if specialist_id else None, "trust_snapshot": self._ledger.snapshot(), "result_metadata": result_metadata, } self._reward_trace.append(event) def _reward_component_averages(self) -> dict[str, float]: totals: dict[str, float] = {} counts: dict[str, int] = {} for event in self._reward_trace: for key, value in event.get("signal_breakdown", {}).items(): if isinstance(value, (int, float)): totals[key] = totals.get(key, 0.0) + float(value) counts[key] = counts.get(key, 0) + 1 return { key: round(total / max(1, counts[key]), 4) for key, total in sorted(totals.items()) } def _apply_difficulty_profile( self, scenario: Scenario, profile: DifficultyProfile, ) -> Scenario: scenario_copy = copy.deepcopy(scenario) if not profile.adaptive or scenario_copy["task_type"] != "task3": return scenario_copy subtasks = scenario_copy["subtasks"] desired_high_stakes = max(1, round(len(subtasks) * profile.high_stakes_ratio)) for offset, subtask in enumerate(subtasks[-desired_high_stakes:]): target_stakes = min(0.99, profile.adversarial_threshold + 0.05 + offset * 0.02) if subtask["stakes"] < target_stakes: subtask["stakes"] = round(target_stakes, 2) subtask["description"] = re.sub( 
r"stakes=\d+\.\d+", f"stakes={subtask['stakes']:.2f}", subtask["description"], ) return scenario_copy