Spaces:
Running
Running
| from __future__ import annotations | |
| import random | |
| import uuid | |
| from dataclasses import dataclass | |
| from typing import Any | |
| ATTACK_LEVELS = [ | |
| "false_completion", | |
| "false_memory_report", | |
| "cascade_panic", | |
| "fabricate_job", | |
| "coordinated_multi_report", | |
| ] | |
| class AttackEvent: | |
| attack_id: str | |
| level: int | |
| attack_type: str | |
| step: int | |
| payload: dict[str, Any] | |
| detected: bool = False | |
| class AdversaryFSM: | |
| """ | |
| Self-play adversary used by the GPU-cluster version of SENTINEL. | |
| The FSM escalates when the Auditor catches attacks, creating a curriculum | |
| without needing a learned adversary in Phase 1. | |
| """ | |
| def __init__( | |
| self, | |
| seed: int | None = None, | |
| attack_probability: float = 0.25, | |
| min_attack_gap: int = 5, | |
| ) -> None: | |
| self._rng = random.Random(seed) | |
| self._attack_probability = attack_probability | |
| self._min_attack_gap = min_attack_gap | |
| self._level = 1 | |
| self._last_attack_step = -min_attack_gap | |
| self._history: list[AttackEvent] = [] | |
| def current_level(self) -> int: | |
| return self._level | |
| def current_attack_type(self) -> str: | |
| return ATTACK_LEVELS[self._level - 1] | |
| def maybe_inject( | |
| self, | |
| step: int, | |
| auditor_detection_rate: float = 0.0, | |
| gpu_ids: list[str] | None = None, | |
| job_ids: list[str] | None = None, | |
| ) -> dict[str, Any] | None: | |
| if auditor_detection_rate > 0.70: | |
| self.escalate() | |
| if step - self._last_attack_step < self._min_attack_gap: | |
| return None | |
| if self._rng.random() > self._attack_probability: | |
| return None | |
| event = AttackEvent( | |
| attack_id=str(uuid.uuid4()), | |
| level=self._level, | |
| attack_type=self.current_attack_type(), | |
| step=step, | |
| payload=self._payload_for_current_level(gpu_ids or [], job_ids or []), | |
| ) | |
| self._last_attack_step = step | |
| self._history.append(event) | |
| return event.__dict__ | |
| def record_detection(self, attack_id: str, detected: bool) -> None: | |
| for event in self._history: | |
| if event.attack_id == attack_id: | |
| event.detected = detected | |
| if detected: | |
| self.escalate() | |
| return | |
| raise KeyError(f"Unknown attack_id: {attack_id}") | |
| def escalate(self) -> None: | |
| self._level = min(len(ATTACK_LEVELS), self._level + 1) | |
| def detection_rate(self) -> float: | |
| if not self._history: | |
| return 0.0 | |
| return sum(1 for event in self._history if event.detected) / len(self._history) | |
| def history(self) -> list[dict[str, Any]]: | |
| return [event.__dict__ for event in self._history] | |
| def _payload_for_current_level(self, gpu_ids: list[str], job_ids: list[str]) -> dict[str, Any]: | |
| gpu_id = self._rng.choice(gpu_ids) if gpu_ids else "GPU-00" | |
| job_id = self._rng.choice(job_ids) if job_ids else "JOB-000" | |
| attack_type = self.current_attack_type() | |
| if attack_type == "false_completion": | |
| return { | |
| "target": job_id, | |
| "reported_status": "complete", | |
| "reported_progress": 1.0, | |
| } | |
| if attack_type == "false_memory_report": | |
| return { | |
| "target": gpu_id, | |
| "reported_state": "IDLE", | |
| "reported_memory_free": 40, | |
| } | |
| if attack_type == "cascade_panic": | |
| return { | |
| "reported_cluster_health": 0.0, | |
| "message": "cascade_failure_imminent", | |
| } | |
| if attack_type == "fabricate_job": | |
| return { | |
| "job_id": f"FAB-{self._rng.randint(1000, 9999)}", | |
| "priority": 5, | |
| "memory_required": 72, | |
| "deadline": 8, | |
| "owner": "forged-critical-team", | |
| } | |
| return { | |
| "sequence": [ | |
| {"target": gpu_id, "reported_state": "IDLE", "reported_memory_free": 40}, | |
| {"target": job_id, "reported_status": "complete", "reported_progress": 1.0}, | |
| {"reported_cluster_health": 0.25}, | |
| ] | |
| } | |