from __future__ import annotations import random from dataclasses import dataclass, field from typing import Any from sentinel_config import ADVERSARIAL_TRIGGER_STAKES # --------------------------------------------------------------------------- # Result type returned by every specialist # --------------------------------------------------------------------------- @dataclass class SpecialistResult: specialist_id: str subtask: str response: str outcome: float # 1.0 correct | 0.5 partial | 0.0 wrong/adversarial confidence: float # 0.0–1.0 (self-reported — may be misleading) is_adversarial: bool # True only for AdversarialSpecialist at trigger point metadata: dict[str, Any] = field(default_factory=dict) # --------------------------------------------------------------------------- # Base class # --------------------------------------------------------------------------- class BaseSpecialist: specialist_id: str def execute( self, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: raise NotImplementedError def reset(self) -> None: """Called at episode start. Override to reset FSM state.""" pass # --------------------------------------------------------------------------- # S0 — AccurateSlow # Correct 90% of the time, but costs +1 extra step budget when called. # Training signal: learn when accuracy matters more than speed. # --------------------------------------------------------------------------- class AccurateSlowSpecialist(BaseSpecialist): specialist_id = "S0" ACCURACY = 0.90 def reset(self) -> None: pass def execute( self, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: correct = rng.random() < self.ACCURACY return SpecialistResult( specialist_id=self.specialist_id, subtask=subtask, response=f"[S0] Careful analysis: {'correct' if correct else 'incorrect'} answer.", outcome=1.0 if correct else 0.0, confidence=0.85, is_adversarial=False, metadata={"step_cost": 2}, # costs 2 steps instead of 1 ) # --------------------------------------------------------------------------- # S1 — OverconfidentFast # Always answers immediately. Wrong 40% of the time. Confidence always high. # Training signal: don't trust high confidence alone. # --------------------------------------------------------------------------- class OverconfidentFastSpecialist(BaseSpecialist): specialist_id = "S1" ACCURACY = 0.60 def reset(self) -> None: pass def execute( self, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: correct = rng.random() < self.ACCURACY return SpecialistResult( specialist_id=self.specialist_id, subtask=subtask, response=f"[S1] Instant answer: {'correct' if correct else 'incorrect'}.", outcome=1.0 if correct else 0.0, confidence=0.95, # always reports high confidence is_adversarial=False, metadata={"step_cost": 1}, ) # --------------------------------------------------------------------------- # S2 — DomainBound # Accurate (95%) within its domain, fails (10%) outside. # Training signal: learn to route by domain, not just trust score. # --------------------------------------------------------------------------- class DomainBoundSpecialist(BaseSpecialist): specialist_id = "S2" IN_DOMAIN_ACCURACY = 0.95 OUT_DOMAIN_ACCURACY = 0.10 STRUCTURED_DOMAINS = {"ANALYZE", "VERIFY"} KEYWORD_FALLBACKS = { "analyze", "analysis", "identify", "pattern", "verify", "correctness", "assess", } def reset(self) -> None: pass def execute( self, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: # Prefer structured scenario domain. Keyword matching remains only as a # backwards-compatible fallback for direct unit tests or ad hoc calls. if domain is not None: in_domain = domain.upper() in self.STRUCTURED_DOMAINS else: lowered = subtask.lower() in_domain = any(kw in lowered for kw in self.KEYWORD_FALLBACKS) accuracy = self.IN_DOMAIN_ACCURACY if in_domain else self.OUT_DOMAIN_ACCURACY correct = rng.random() < accuracy return SpecialistResult( specialist_id=self.specialist_id, subtask=subtask, response=f"[S2] Domain {'match' if in_domain else 'mismatch'}: " f"{'correct' if correct else 'incorrect'}.", outcome=1.0 if correct else 0.0, confidence=0.90 if in_domain else 0.45, is_adversarial=False, metadata={"step_cost": 1, "in_domain": in_domain, "domain": domain}, ) # --------------------------------------------------------------------------- # S3 — Adversarial # Correct when stakes < threshold. Poisons (returns plausible-but-wrong # answer with high confidence) when stakes >= threshold. # Identity is SHUFFLED every episode — agent cannot memorize who is S3. # Training signal: detect adversarial behavior from behavioral evidence. # --------------------------------------------------------------------------- class AdversarialSpecialist(BaseSpecialist): specialist_id = "S3" BENIGN_ACCURACY = 0.88 # looks trustworthy at low stakes def __init__(self) -> None: self._stakes_threshold = ADVERSARIAL_TRIGGER_STAKES self._benign_confidence = 0.88 self._poison_confidence = 0.92 def reset(self) -> None: pass def configure( self, stakes_threshold: float, benign_confidence: float = 0.88, poison_confidence: float = 0.92, ) -> None: self._stakes_threshold = stakes_threshold self._benign_confidence = benign_confidence self._poison_confidence = poison_confidence def execute( self, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: if stakes >= self._stakes_threshold: # Adversarial trigger: plausible response, wrong outcome return SpecialistResult( specialist_id=self.specialist_id, subtask=subtask, response=f"[S3] Confident answer (poisoned at stakes={stakes:.2f}).", outcome=0.0, confidence=self._poison_confidence, is_adversarial=True, metadata={ "step_cost": 1, "triggered": True, "stakes": stakes, "threshold": self._stakes_threshold, }, ) else: # Benign phase — builds trust before striking correct = rng.random() < self.BENIGN_ACCURACY return SpecialistResult( specialist_id=self.specialist_id, subtask=subtask, response=f"[S3] Normal answer: {'correct' if correct else 'incorrect'}.", outcome=1.0 if correct else 0.0, confidence=self._benign_confidence, is_adversarial=False, metadata={ "step_cost": 1, "triggered": False, "stakes": stakes, "threshold": self._stakes_threshold, }, ) # --------------------------------------------------------------------------- # S4 — Degrading # 95% accurate on first call. Degrades by 15% per subsequent reuse. # Training signal: learn specialist fatigue, diversify delegation. # --------------------------------------------------------------------------- class DegradingSpecialist(BaseSpecialist): specialist_id = "S4" BASE_ACCURACY = 0.95 DEGRADATION_RATE = 0.15 def __init__(self) -> None: self._call_count = 0 def reset(self) -> None: self._call_count = 0 def execute( self, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: accuracy = max(0.05, self.BASE_ACCURACY - self.DEGRADATION_RATE * self._call_count) correct = rng.random() < accuracy self._call_count += 1 return SpecialistResult( specialist_id=self.specialist_id, subtask=subtask, response=f"[S4] Call #{self._call_count}, accuracy={accuracy:.0%}: " f"{'correct' if correct else 'incorrect'}.", outcome=1.0 if correct else 0.0, confidence=max(0.3, 0.92 - 0.10 * self._call_count), is_adversarial=False, metadata={"step_cost": 1, "call_count": self._call_count, "accuracy": accuracy}, ) # --------------------------------------------------------------------------- # SpecialistPool # Manages the 5 specialists. Shuffles adversarial identity each episode. # --------------------------------------------------------------------------- class SpecialistPool: """ Holds the 5 specialist FSMs. Each episode, the AdversarialSpecialist is assigned to a random slot (S0–S4). The orchestrator cannot know which slot is adversarial — it must infer from behavioral evidence via the TrustLedger. """ def __init__(self) -> None: self._fixed: dict[str, BaseSpecialist] = { "S0": AccurateSlowSpecialist(), "S1": OverconfidentFastSpecialist(), "S2": DomainBoundSpecialist(), "S3": AdversarialSpecialist(), "S4": DegradingSpecialist(), } # Profile mapping: public_id → internal specialist # Shuffled each reset() self._profile: dict[str, str] = {sid: sid for sid in self._fixed} self._adversarial_slot: str = "S3" def reset(self, seed: int | None = None) -> None: """ Resample adversarial identity. S3 behavior is assigned to a random slot. All other behaviors are also shuffled so the agent truly cannot memorize. """ rng = random.Random(seed) # Reset all FSM states for spec in self._fixed.values(): spec.reset() # Shuffle which public slot gets which internal behavior ids = list(self._fixed.keys()) shuffled = ids.copy() rng.shuffle(shuffled) self._profile = dict(zip(ids, shuffled)) # Track which public slot currently has adversarial behavior # (S3 internal → whichever public slot maps to it) self._adversarial_slot = next( pub for pub, internal in self._profile.items() if internal == "S3" ) def configure_adversary( self, stakes_threshold: float, benign_confidence: float, poison_confidence: float, ) -> None: adversary = self._fixed["S3"] if isinstance(adversary, AdversarialSpecialist): adversary.configure(stakes_threshold, benign_confidence, poison_confidence) @property def adversarial_slot(self) -> str: """Public slot that is currently adversarial. Hidden from agent.""" return self._adversarial_slot def execute( self, specialist_id: str, subtask: str, stakes: float, rng: random.Random, domain: str | None = None, ) -> SpecialistResult: """ Route execution through the shuffled profile. Returns result with specialist_id = the public slot (not internal type). """ internal_id = self._profile[specialist_id] spec = self._fixed[internal_id] result = spec.execute(subtask, stakes, rng, domain=domain) # Rewrite id to public slot so agent only sees the public label result.specialist_id = specialist_id return result def available_ids(self) -> list[str]: return list(self._profile.keys()) def internal_profile(self) -> dict[str, str]: """Public specialist id -> hidden internal behavior id.""" return dict(self._profile) def public_ground_truth_reliability(self, internal_reliability: dict[str, float]) -> dict[str, float]: """ Map hidden internal behavior reliabilities onto public slots. The reward engine uses this; the orchestrator never sees it. """ return { public_id: internal_reliability.get(internal_id, 0.5) for public_id, internal_id in self._profile.items() }