File size: 4,258 Bytes
a36db1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from __future__ import annotations

import random
import uuid
from dataclasses import asdict, dataclass
from typing import Any


# Attack types ordered by escalation level. AdversaryFSM uses 1-based levels
# and looks up the type with ``ATTACK_LEVELS[level - 1]``, so the position in
# this list is significant: reordering it changes the curriculum.
ATTACK_LEVELS = [
    "false_completion",
    "false_memory_report",
    "cascade_panic",
    "fabricate_job",
    "coordinated_multi_report",
]


@dataclass
class AttackEvent:
    """Record of a single attack injected by ``AdversaryFSM``."""

    # Unique identifier; AdversaryFSM fills it with ``str(uuid.uuid4())``.
    attack_id: str
    # 1-based escalation level the adversary was at when the attack was created.
    level: int
    # Entry of ATTACK_LEVELS that corresponds to ``level``.
    attack_type: str
    # Step value passed to ``AdversaryFSM.maybe_inject`` when the attack fired.
    step: int
    # Attack-specific fields; the keys depend on ``attack_type``.
    payload: dict[str, Any]
    # Set through ``AdversaryFSM.record_detection``; starts out undetected.
    detected: bool = False


class AdversaryFSM:
    """
    Self-play adversary used by the GPU-cluster version of SENTINEL.

    The FSM escalates when the Auditor catches attacks, creating a curriculum
    without needing a learned adversary in Phase 1.
    """

    def __init__(
        self,
        seed: int | None = None,
        attack_probability: float = 0.25,
        min_attack_gap: int = 5,
    ) -> None:
        self._rng = random.Random(seed)
        self._attack_probability = attack_probability
        self._min_attack_gap = min_attack_gap
        self._level = 1
        self._last_attack_step = -min_attack_gap
        self._history: list[AttackEvent] = []

    def current_level(self) -> int:
        return self._level

    def current_attack_type(self) -> str:
        return ATTACK_LEVELS[self._level - 1]

    def maybe_inject(
        self,
        step: int,
        auditor_detection_rate: float = 0.0,
        gpu_ids: list[str] | None = None,
        job_ids: list[str] | None = None,
    ) -> dict[str, Any] | None:
        if auditor_detection_rate > 0.70:
            self.escalate()
        if step - self._last_attack_step < self._min_attack_gap:
            return None
        if self._rng.random() > self._attack_probability:
            return None

        event = AttackEvent(
            attack_id=str(uuid.uuid4()),
            level=self._level,
            attack_type=self.current_attack_type(),
            step=step,
            payload=self._payload_for_current_level(gpu_ids or [], job_ids or []),
        )
        self._last_attack_step = step
        self._history.append(event)
        return event.__dict__

    def record_detection(self, attack_id: str, detected: bool) -> None:
        for event in self._history:
            if event.attack_id == attack_id:
                event.detected = detected
                if detected:
                    self.escalate()
                return
        raise KeyError(f"Unknown attack_id: {attack_id}")

    def escalate(self) -> None:
        self._level = min(len(ATTACK_LEVELS), self._level + 1)

    def detection_rate(self) -> float:
        if not self._history:
            return 0.0
        return sum(1 for event in self._history if event.detected) / len(self._history)

    def history(self) -> list[dict[str, Any]]:
        return [event.__dict__ for event in self._history]

    def _payload_for_current_level(self, gpu_ids: list[str], job_ids: list[str]) -> dict[str, Any]:
        gpu_id = self._rng.choice(gpu_ids) if gpu_ids else "GPU-00"
        job_id = self._rng.choice(job_ids) if job_ids else "JOB-000"
        attack_type = self.current_attack_type()

        if attack_type == "false_completion":
            return {
                "target": job_id,
                "reported_status": "complete",
                "reported_progress": 1.0,
            }
        if attack_type == "false_memory_report":
            return {
                "target": gpu_id,
                "reported_state": "IDLE",
                "reported_memory_free": 40,
            }
        if attack_type == "cascade_panic":
            return {
                "reported_cluster_health": 0.0,
                "message": "cascade_failure_imminent",
            }
        if attack_type == "fabricate_job":
            return {
                "job_id": f"FAB-{self._rng.randint(1000, 9999)}",
                "priority": 5,
                "memory_required": 72,
                "deadline": 8,
                "owner": "forged-critical-team",
            }
        return {
            "sequence": [
                {"target": gpu_id, "reported_state": "IDLE", "reported_memory_free": 40},
                {"target": job_id, "reported_status": "complete", "reported_progress": 1.0},
                {"reported_cluster_health": 0.25},
            ]
        }