File size: 5,913 Bytes
a36db1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from __future__ import annotations

import unittest

from cluster_trust_env import ClusterTrustEnv


class ClusterTrustEnvTests(unittest.TestCase):
    """Behavioural checks for the ``ClusterTrustEnv`` reset/step/reporting surface.

    Each test gets a fresh environment from ``setUp`` and drives it only through
    ``reset``, ``step``, ``state``, ``reward_report`` and ``stream_snapshot``.
    """

    def setUp(self) -> None:
        """Build a new environment instance for every test method."""
        self.env = ClusterTrustEnv()

    def _allocate(self, obs: dict, job_id: str, gpu_id: str, worker_id: str) -> dict:
        """Submit a fully specified ``allocate`` action and return the step result."""
        return self.env.step(
            {
                "session_id": obs["session_id"],
                "action_type": "allocate",
                "job_id": job_id,
                "gpu_id": gpu_id,
                "worker_id": worker_id,
            }
        )

    def test_reset_returns_gpu_cluster_observation(self) -> None:
        """Reset for ``task3`` exposes the expected pool, queue, trust and coverage fields."""
        obs = self.env.reset(task_type="task3", seed=42)["observation"]

        self.assertEqual(obs["task_type"], "task3")
        self.assertEqual(len(obs["gpu_pool"]), 16)
        self.assertEqual(len(obs["job_queue_summary"]), 30)
        self.assertEqual(obs["trust_snapshot"], {"S0": 0.5, "S1": 0.5, "S2": 0.5, "S3": 0.5, "S4": 0.5})
        self.assertIn("ai_failure_coverage", obs)
        self.assertTrue(obs["ai_failure_coverage"]["reward_hacking"]["covered"])

    def test_allocate_updates_gpu_jobs_trust_and_reward(self) -> None:
        """One allocation to ``S0`` yields positive reward, a running job and higher trust."""
        obs = self.env.reset(task_type="task3", seed=42)["observation"]
        job_id = obs["job_queue_summary"][0]["job_id"]

        result = self._allocate(obs, job_id, "GPU-00", "S0")
        state = self.env.state()

        self.assertFalse(result["done"])
        self.assertGreater(result["reward"]["value"], 0.0)
        self.assertEqual(state["jobs"]["statuses"]["running"], 1)
        self.assertGreaterEqual(state["cluster"]["memory_used"], 10)
        self.assertGreater(state["trust_snapshot"]["S0"], 0.5)

    def test_overreporter_false_report_lowers_worker_trust(self) -> None:
        """Allocating to the hidden ``overreporter`` worker drops its trust below 0.5."""
        obs = self.env.reset(task_type="task3", seed=3)["observation"]
        profile = self.env.state()["worker_profile_hidden"]
        # Use a default so a missing worker is reported as a readable assertion
        # failure instead of an opaque StopIteration error.
        overreporter = next(
            (public_id for public_id, behavior in profile.items() if behavior == "overreporter"),
            None,
        )
        self.assertIsNotNone(overreporter, "seed 3 produced no 'overreporter' worker to test against")
        job_id = obs["job_queue_summary"][0]["job_id"]

        result = self._allocate(obs, job_id, "GPU-00", overreporter)
        state = self.env.state()

        self.assertIn("trust_snapshot", state)
        self.assertLess(state["trust_snapshot"][overreporter], 0.5)
        self.assertGreaterEqual(result["reward"]["signal_breakdown"]["trust_calibration"], 0.0)

    def test_reward_report_contains_cluster_health_and_trust(self) -> None:
        """After one step the reward report holds one event with health, trust and signals."""
        obs = self.env.reset(task_type="task2", seed=9)["observation"]
        # Only the recorded event matters here; the step result itself is not inspected.
        self.env.step({"session_id": obs["session_id"], "action_type": "allocate"})
        report = self.env.reward_report()

        self.assertEqual(report["reward_events"], 1)
        first_event = report["events"][0]
        self.assertIn("cluster_health", first_event)
        self.assertIn("trust_snapshot", first_event)
        self.assertIn("global", first_event["signal_breakdown"])
        self.assertIn("ai_reliability", first_event["signal_breakdown"])

    def test_stream_snapshot_contains_live_dashboard_fields(self) -> None:
        """The stream snapshot reports cluster mode and carries the dashboard sections."""
        obs = self.env.reset(task_type="task3", seed=42)["observation"]
        self.env.step({"session_id": obs["session_id"], "action_type": "allocate"})

        snapshot = self.env.stream_snapshot()

        self.assertEqual(snapshot["environment_mode"], "cluster")
        self.assertIn("cluster", snapshot)
        self.assertIn("jobs", snapshot)
        self.assertIn("ai_failure_coverage", snapshot)
        self.assertIn("attack_attempts", snapshot)

    def test_adaptive_reset_exposes_cluster_difficulty_profile(self) -> None:
        """Resetting with ``adaptive=True`` surfaces an adaptive difficulty profile."""
        obs = self.env.reset(task_type="task3", seed=42, adaptive=True)["observation"]

        self.assertTrue(obs["difficulty_profile"]["adaptive"])
        self.assertIn("adversarial_threshold", obs["difficulty_profile"])

    def test_repeated_failed_actions_trigger_loop_and_context_signals(self) -> None:
        """Four identical ``verify`` calls on an unknown job raise loop and drift counters."""
        obs = self.env.reset(task_type="task3", seed=11)["observation"]
        bad_action = {
            "session_id": obs["session_id"],
            "action_type": "verify",
            "job_id": "missing-job",
        }

        for _ in range(4):
            # Pass a fresh copy each time, matching the original per-iteration dict literal.
            self.env.step(dict(bad_action))

        coverage = self.env.state()["ai_failure_coverage"]

        self.assertGreater(coverage["agent_loop_reliability"]["loop_events"], 0)
        self.assertGreater(coverage["context_memory_loss"]["drift_events"], 0)
        self.assertLess(coverage["ai_reliability_modifier"], 1.0)

    def test_accepted_reward_hack_reduces_global_modifier(self) -> None:
        """Allocations via ``S0`` then ``S1`` record a poisoning and sub-1.0 hack scores."""
        result = self.env.reset(task_type="task3", seed=42)

        for worker_id in ("S0", "S1"):
            # Re-read the observation each round so the second allocation sees
            # the queue and GPU pool as updated by the first.
            obs = result["observation"]
            job = next(job for job in obs["job_queue_summary"] if job["status"] == "queued")
            gpu = max(obs["gpu_pool"], key=lambda row: row["memory_free"])
            result = self._allocate(obs, job["job_id"], gpu["id"], worker_id)

        breakdown = result["reward"]["signal_breakdown"]
        state = self.env.state()
        coverage = state["ai_failure_coverage"]

        self.assertGreater(state["attack_poisonings"], 0)
        self.assertLess(breakdown["reward_hack_resistance"], 1.0)
        self.assertLess(coverage["reward_hacking"]["score"], 1.0)


# Run this module's tests through unittest's runner when the file is executed directly.
if __name__ == "__main__":
    unittest.main()