Spaces:
Running
Running
File size: 4,258 Bytes
a36db1b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | from __future__ import annotations
import random
import uuid
from dataclasses import dataclass
from typing import Any
ATTACK_LEVELS = [
"false_completion",
"false_memory_report",
"cascade_panic",
"fabricate_job",
"coordinated_multi_report",
]
@dataclass
class AttackEvent:
attack_id: str
level: int
attack_type: str
step: int
payload: dict[str, Any]
detected: bool = False
class AdversaryFSM:
"""
Self-play adversary used by the GPU-cluster version of SENTINEL.
The FSM escalates when the Auditor catches attacks, creating a curriculum
without needing a learned adversary in Phase 1.
"""
def __init__(
self,
seed: int | None = None,
attack_probability: float = 0.25,
min_attack_gap: int = 5,
) -> None:
self._rng = random.Random(seed)
self._attack_probability = attack_probability
self._min_attack_gap = min_attack_gap
self._level = 1
self._last_attack_step = -min_attack_gap
self._history: list[AttackEvent] = []
def current_level(self) -> int:
return self._level
def current_attack_type(self) -> str:
return ATTACK_LEVELS[self._level - 1]
def maybe_inject(
self,
step: int,
auditor_detection_rate: float = 0.0,
gpu_ids: list[str] | None = None,
job_ids: list[str] | None = None,
) -> dict[str, Any] | None:
if auditor_detection_rate > 0.70:
self.escalate()
if step - self._last_attack_step < self._min_attack_gap:
return None
if self._rng.random() > self._attack_probability:
return None
event = AttackEvent(
attack_id=str(uuid.uuid4()),
level=self._level,
attack_type=self.current_attack_type(),
step=step,
payload=self._payload_for_current_level(gpu_ids or [], job_ids or []),
)
self._last_attack_step = step
self._history.append(event)
return event.__dict__
def record_detection(self, attack_id: str, detected: bool) -> None:
for event in self._history:
if event.attack_id == attack_id:
event.detected = detected
if detected:
self.escalate()
return
raise KeyError(f"Unknown attack_id: {attack_id}")
def escalate(self) -> None:
self._level = min(len(ATTACK_LEVELS), self._level + 1)
def detection_rate(self) -> float:
if not self._history:
return 0.0
return sum(1 for event in self._history if event.detected) / len(self._history)
def history(self) -> list[dict[str, Any]]:
return [event.__dict__ for event in self._history]
def _payload_for_current_level(self, gpu_ids: list[str], job_ids: list[str]) -> dict[str, Any]:
gpu_id = self._rng.choice(gpu_ids) if gpu_ids else "GPU-00"
job_id = self._rng.choice(job_ids) if job_ids else "JOB-000"
attack_type = self.current_attack_type()
if attack_type == "false_completion":
return {
"target": job_id,
"reported_status": "complete",
"reported_progress": 1.0,
}
if attack_type == "false_memory_report":
return {
"target": gpu_id,
"reported_state": "IDLE",
"reported_memory_free": 40,
}
if attack_type == "cascade_panic":
return {
"reported_cluster_health": 0.0,
"message": "cascade_failure_imminent",
}
if attack_type == "fabricate_job":
return {
"job_id": f"FAB-{self._rng.randint(1000, 9999)}",
"priority": 5,
"memory_required": 72,
"deadline": 8,
"owner": "forged-critical-team",
}
return {
"sequence": [
{"target": gpu_id, "reported_state": "IDLE", "reported_memory_free": 40},
{"target": job_id, "reported_status": "complete", "reported_progress": 1.0},
{"reported_cluster_health": 0.25},
]
}
|