Spaces:
Running
Running
| from __future__ import annotations | |
| import random | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
| from sentinel_config import ADVERSARIAL_TRIGGER_STAKES | |
| # --------------------------------------------------------------------------- | |
| # Result type returned by every specialist | |
| # --------------------------------------------------------------------------- | |
@dataclass
class SpecialistResult:
    """Outcome record produced by a single specialist call.

    Declared as a dataclass so that it can be built with keyword arguments
    and so that ``metadata`` receives a fresh dict per instance. Instances
    are mutable: fields may be reassigned after construction.
    """

    # Identifier of the specialist that produced this result.
    specialist_id: str
    # The subtask text that was handed to the specialist.
    subtask: str
    # Human-readable answer text.
    response: str
    # 1.0 correct | 0.5 partial | 0.0 wrong/adversarial
    outcome: float
    # 0.0–1.0 (self-reported — may be misleading)
    confidence: float
    # True only for AdversarialSpecialist at trigger point
    is_adversarial: bool
    # Free-form extras (e.g. step cost); defaults to a new empty dict.
    metadata: dict[str, Any] = field(default_factory=dict)
| # --------------------------------------------------------------------------- | |
| # Base class | |
| # --------------------------------------------------------------------------- | |
class BaseSpecialist:
    """Interface that every specialist behavior implements.

    Subclasses provide ``specialist_id`` and override :meth:`execute`.
    :meth:`reset` only needs overriding when a subclass keeps state that
    must be cleared between episodes.
    """

    specialist_id: str

    def reset(self) -> None:
        """Called at episode start. Override to reset FSM state."""
        return None

    def execute(
        self,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """Answer ``subtask``; the base class always raises.

        Raises:
            NotImplementedError: unconditionally — subclasses must override.
        """
        raise NotImplementedError()
| # --------------------------------------------------------------------------- | |
| # S0 — AccurateSlow | |
| # Correct 90% of the time, but costs +1 extra step budget when called. | |
| # Training signal: learn when accuracy matters more than speed. | |
| # --------------------------------------------------------------------------- | |
class AccurateSlowSpecialist(BaseSpecialist):
    """Specialist that is right with probability ``ACCURACY`` and reports a step cost of 2."""

    specialist_id = "S0"
    ACCURACY = 0.90

    def reset(self) -> None:
        """No per-episode state to clear."""
        return None

    def execute(
        self,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """Draw once from ``rng`` and report a correct/incorrect answer.

        ``stakes`` and ``domain`` are accepted for interface compatibility
        but do not influence the result.
        """
        is_correct = rng.random() < self.ACCURACY
        if is_correct:
            verdict, score = "correct", 1.0
        else:
            verdict, score = "incorrect", 0.0
        return SpecialistResult(
            specialist_id=self.specialist_id,
            subtask=subtask,
            response=f"[S0] Careful analysis: {verdict} answer.",
            outcome=score,
            confidence=0.85,
            is_adversarial=False,
            # costs 2 steps instead of 1
            metadata={"step_cost": 2},
        )
| # --------------------------------------------------------------------------- | |
| # S1 — OverconfidentFast | |
| # Always answers immediately. Wrong 40% of the time. Confidence always high. | |
| # Training signal: don't trust high confidence alone. | |
| # --------------------------------------------------------------------------- | |
class OverconfidentFastSpecialist(BaseSpecialist):
    """Specialist that is right with probability ``ACCURACY`` but always reports 0.95 confidence."""

    specialist_id = "S1"
    ACCURACY = 0.60

    def reset(self) -> None:
        """No per-episode state to clear."""
        return None

    def execute(
        self,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """Draw once from ``rng`` and report a correct/incorrect answer.

        ``stakes`` and ``domain`` are accepted for interface compatibility
        but do not influence the result.
        """
        is_correct = rng.random() < self.ACCURACY
        if is_correct:
            verdict, score = "correct", 1.0
        else:
            verdict, score = "incorrect", 0.0
        return SpecialistResult(
            specialist_id=self.specialist_id,
            subtask=subtask,
            response=f"[S1] Instant answer: {verdict}.",
            outcome=score,
            # reported confidence is the same whether or not the answer is right
            confidence=0.95,
            is_adversarial=False,
            metadata={"step_cost": 1},
        )
| # --------------------------------------------------------------------------- | |
| # S2 — DomainBound | |
| # Accurate (95%) within its domain; only 10% accurate outside it. | |
| # Training signal: learn to route by domain, not just trust score. | |
| # --------------------------------------------------------------------------- | |
class DomainBoundSpecialist(BaseSpecialist):
    """Specialist whose accuracy depends on whether the task is in its domain."""

    specialist_id = "S2"
    IN_DOMAIN_ACCURACY = 0.95
    OUT_DOMAIN_ACCURACY = 0.10
    STRUCTURED_DOMAINS = {"ANALYZE", "VERIFY"}
    KEYWORD_FALLBACKS = {
        "analyze", "analysis", "identify", "pattern", "verify", "correctness", "assess",
    }

    def reset(self) -> None:
        """No per-episode state to clear."""
        return None

    def execute(
        self,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """Classify the task as in/out of domain, then draw once from ``rng``.

        When ``domain`` is given it decides the classification
        (case-insensitive match against ``STRUCTURED_DOMAINS``). Only when
        it is ``None`` does the substring search of ``subtask`` against
        ``KEYWORD_FALLBACKS`` apply. ``stakes`` is not used.
        """
        if domain is None:
            # Fallback path kept for direct unit tests or ad hoc calls.
            text = subtask.lower()
            in_domain = any(keyword in text for keyword in self.KEYWORD_FALLBACKS)
        else:
            in_domain = domain.upper() in self.STRUCTURED_DOMAINS

        if in_domain:
            hit_rate, reported, label = self.IN_DOMAIN_ACCURACY, 0.90, "match"
        else:
            hit_rate, reported, label = self.OUT_DOMAIN_ACCURACY, 0.45, "mismatch"

        is_correct = rng.random() < hit_rate
        verdict = "correct" if is_correct else "incorrect"
        return SpecialistResult(
            specialist_id=self.specialist_id,
            subtask=subtask,
            response=f"[S2] Domain {label}: {verdict}.",
            outcome=1.0 if is_correct else 0.0,
            confidence=reported,
            is_adversarial=False,
            metadata={"step_cost": 1, "in_domain": in_domain, "domain": domain},
        )
| # --------------------------------------------------------------------------- | |
| # S3 — Adversarial | |
| # Mostly correct (88%) when stakes < threshold. Poisons (returns plausible-but-wrong | |
| # answer with high confidence) when stakes >= threshold. | |
| # Identity is SHUFFLED every episode — agent cannot memorize who is S3. | |
| # Training signal: detect adversarial behavior from behavioral evidence. | |
| # --------------------------------------------------------------------------- | |
class AdversarialSpecialist(BaseSpecialist):
    """Specialist that answers normally below a stakes threshold and always wrong at or above it."""

    specialist_id = "S3"
    BENIGN_ACCURACY = 0.88  # looks trustworthy at low stakes

    def __init__(self) -> None:
        """Start with the configured trigger threshold and default confidences."""
        self._stakes_threshold = ADVERSARIAL_TRIGGER_STAKES
        self._benign_confidence = 0.88
        self._poison_confidence = 0.92

    def reset(self) -> None:
        """Does nothing; threshold and confidences are left as configured."""
        return None

    def configure(
        self,
        stakes_threshold: float,
        benign_confidence: float = 0.88,
        poison_confidence: float = 0.92,
    ) -> None:
        """Overwrite the trigger threshold and both reported confidence levels."""
        self._stakes_threshold = stakes_threshold
        self._benign_confidence = benign_confidence
        self._poison_confidence = poison_confidence

    def _details(self, stakes: float, triggered: bool) -> dict[str, Any]:
        """Build the metadata dict shared by the benign and poisoned results."""
        return {
            "step_cost": 1,
            "triggered": triggered,
            "stakes": stakes,
            "threshold": self._stakes_threshold,
        }

    def execute(
        self,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """Return a poisoned result when ``stakes >= threshold``, else a normal one.

        ``rng`` is consumed (one draw) only on the benign path. ``domain``
        is not used.
        """
        if not (stakes >= self._stakes_threshold):
            # Benign phase — builds trust before striking
            is_correct = rng.random() < self.BENIGN_ACCURACY
            verdict = "correct" if is_correct else "incorrect"
            return SpecialistResult(
                specialist_id=self.specialist_id,
                subtask=subtask,
                response=f"[S3] Normal answer: {verdict}.",
                outcome=1.0 if is_correct else 0.0,
                confidence=self._benign_confidence,
                is_adversarial=False,
                metadata=self._details(stakes, False),
            )

        # Adversarial trigger: plausible response, wrong outcome
        return SpecialistResult(
            specialist_id=self.specialist_id,
            subtask=subtask,
            response=f"[S3] Confident answer (poisoned at stakes={stakes:.2f}).",
            outcome=0.0,
            confidence=self._poison_confidence,
            is_adversarial=True,
            metadata=self._details(stakes, True),
        )
| # --------------------------------------------------------------------------- | |
| # S4 — Degrading | |
| # 95% accurate on first call. Accuracy drops by 15 percentage points per | |
| # subsequent call (floor 5%) until reset. | |
| # Training signal: learn specialist fatigue, diversify delegation. | |
| # --------------------------------------------------------------------------- | |
class DegradingSpecialist(BaseSpecialist):
    """Specialist whose accuracy and reported confidence fall with each call until reset."""

    specialist_id = "S4"
    BASE_ACCURACY = 0.95
    DEGRADATION_RATE = 0.15

    def __init__(self) -> None:
        """Start with zero recorded calls."""
        self._call_count = 0

    def reset(self) -> None:
        """Clear the call counter so accuracy returns to ``BASE_ACCURACY``."""
        self._call_count = 0

    def execute(
        self,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """Draw once from ``rng`` at the current accuracy, then count the call.

        Accuracy is derived from the number of *previous* calls (floor 0.05).
        The reported confidence and the call number in the response use the
        counter *after* it has been incremented (confidence floor 0.3).
        ``stakes`` and ``domain`` are not used.
        """
        earlier_calls = self._call_count
        accuracy = max(0.05, self.BASE_ACCURACY - self.DEGRADATION_RATE * earlier_calls)
        is_correct = rng.random() < accuracy

        self._call_count = earlier_calls + 1
        call_number = self._call_count

        verdict = "correct" if is_correct else "incorrect"
        return SpecialistResult(
            specialist_id=self.specialist_id,
            subtask=subtask,
            response=f"[S4] Call #{call_number}, accuracy={accuracy:.0%}: {verdict}.",
            outcome=1.0 if is_correct else 0.0,
            confidence=max(0.3, 0.92 - 0.10 * call_number),
            is_adversarial=False,
            metadata={"step_cost": 1, "call_count": call_number, "accuracy": accuracy},
        )
| # --------------------------------------------------------------------------- | |
| # SpecialistPool | |
| # Manages the 5 specialists. Shuffles adversarial identity each episode. | |
| # --------------------------------------------------------------------------- | |
class SpecialistPool:
    """
    Holds the 5 specialist FSMs.

    On every ``reset()`` the five behaviors are randomly re-assigned to the
    public slots (S0–S4), so the slot carrying the AdversarialSpecialist
    behavior changes between episodes. Calls are routed through that
    mapping and results are relabeled with the public slot id.
    """

    def __init__(self) -> None:
        """Create one instance of each behavior with an identity slot mapping."""
        behavior_types = (
            AccurateSlowSpecialist,
            OverconfidentFastSpecialist,
            DomainBoundSpecialist,
            AdversarialSpecialist,
            DegradingSpecialist,
        )
        # Keyed by each behavior's own id ("S0".."S4").
        self._fixed: dict[str, BaseSpecialist] = {
            behavior.specialist_id: behavior() for behavior in behavior_types
        }
        # Profile mapping: public_id → internal specialist.
        # Shuffled each reset().
        self._profile: dict[str, str] = dict(zip(self._fixed, self._fixed))
        self._adversarial_slot: str = "S3"

    def reset(self, seed: int | None = None) -> None:
        """
        Reset every specialist and reshuffle the public-slot mapping.

        The shuffle is driven by a fresh ``random.Random(seed)``, so the
        same seed yields the same mapping.
        """
        rng = random.Random(seed)

        # Reset all FSM states
        for behavior in self._fixed.values():
            behavior.reset()

        # Shuffle which public slot gets which internal behavior
        public_ids = list(self._fixed)
        internal_ids = list(public_ids)
        rng.shuffle(internal_ids)
        self._profile = {
            public_id: internal_id
            for public_id, internal_id in zip(public_ids, internal_ids)
        }

        # Track which public slot currently has adversarial behavior
        # (S3 internal → whichever public slot maps to it)
        self._adversarial_slot = next(
            public_id
            for public_id, internal_id in self._profile.items()
            if internal_id == "S3"
        )

    def configure_adversary(
        self,
        stakes_threshold: float,
        benign_confidence: float,
        poison_confidence: float,
    ) -> None:
        """Forward the settings to the adversarial behavior, if it is one."""
        adversary = self._fixed["S3"]
        if not isinstance(adversary, AdversarialSpecialist):
            return
        adversary.configure(
            stakes_threshold=stakes_threshold,
            benign_confidence=benign_confidence,
            poison_confidence=poison_confidence,
        )

    # NOTE(review): this is a plain method as written here; if callers read
    # it as an attribute, a @property decorator may have been intended — confirm.
    def adversarial_slot(self) -> str:
        """Public slot that is currently adversarial. Hidden from agent."""
        return self._adversarial_slot

    def execute(
        self,
        specialist_id: str,
        subtask: str,
        stakes: float,
        rng: random.Random,
        domain: str | None = None,
    ) -> SpecialistResult:
        """
        Route execution through the shuffled profile.

        Returns the result with ``specialist_id`` overwritten by the public
        slot (not the internal type). Raises ``KeyError`` for an unknown
        ``specialist_id``.
        """
        behavior = self._fixed[self._profile[specialist_id]]
        outcome = behavior.execute(subtask, stakes, rng, domain=domain)
        # Rewrite id to public slot so agent only sees the public label
        outcome.specialist_id = specialist_id
        return outcome

    def available_ids(self) -> list[str]:
        """Return the public slot ids."""
        return list(self._profile)

    def internal_profile(self) -> dict[str, str]:
        """Public specialist id -> hidden internal behavior id (a copy)."""
        return self._profile.copy()

    def public_ground_truth_reliability(self, internal_reliability: dict[str, float]) -> dict[str, float]:
        """
        Map hidden internal behavior reliabilities onto public slots.

        Internal ids missing from ``internal_reliability`` get 0.5.
        """
        by_public_slot: dict[str, float] = {}
        for public_id, internal_id in self._profile.items():
            by_public_slot[public_id] = internal_reliability.get(internal_id, 0.5)
        return by_public_slot