#!/usr/bin/env python3
"""
Comprehensive DAG Physics Validation for AntiAtropos training readiness.
Verifies:
A. DAG traffic routing (parent->child propagation)
B. Task-2 scripted failure flows through DAG (not bypassed)
C. Task-3 surge overlays correctly on the DAG
D. Backpressure is temporary (not permanent capacity drain)
E. Gradual recovery completes fully
F. Graph-bounded cascades
G. Reroute weights work with DAG
H. Graph Lyapunov edge penalty
I. Environment observation populates graph fields
J. Reward components are non-degenerate across tasks
Run: python validate_dag_physics.py
"""
import sys, os, math
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from simulator import (
ClusterSimulator, NodeStatus, DEFAULT_CAPACITY,
CLUSTER_TOPOLOGY, EXTERNAL_TRAFFIC_NODES, _TOPOLOGICAL_ORDER,
DEFAULT_ROUTING_SPLIT, T1_INITIAL_LAMBDA, T2_INITIAL_LAMBDA,
T3_INITIAL_LAMBDA, T3_SURGE_MAGNITUDE,
COST_PER_CAPACITY_UNIT_PER_HOUR, FATAL_FAIL_THRESHOLD,
NODE_RECOVERY_TICKS, BACKPRESSURE_THRESHOLD,
)
from stability import (
compute_lyapunov, compute_lyapunov_graph, compute_reward,
compute_barrier, normalize_reward, smooth_sla_penalty, compute_drift,
BARRIER_NORM_SCALE,
)
# --- Test harness ---
# Status labels attached to every recorded check.
PASS, FAIL = "PASS", "FAIL"
# One (name, status, detail) tuple per recorded check, in execution order.
results = []
def record(name, status, detail=""):
    """Store one check outcome in ``results`` and print a one-line summary.

    Args:
        name: Label of the check.
        status: Outcome label; ``PASS`` prints a ``+`` marker, anything
            else prints ``X``.
        detail: Optional extra text, appended after `` -- `` when non-empty.
    """
    results.append((name, status, detail))
    marker = "X" if status != PASS else "+"
    suffix = f" -- {detail}" if detail else ""
    print(f" [{marker}] {name}{suffix}")
# ============================================================================
# A. DAG Traffic Routing
# ============================================================================
def test_A_dag_routing():
    """Check that a single NO_OP tick on task-1 puts traffic on every DAG node.

    Records one check per node for a positive ``incoming_request_rate`` and
    one check for a positive ``outflow_rate`` on node-0.
    """
    print("\n=== A. DAG Traffic Routing ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Bare attribute holder standing in for an action object.
    class _A:
        pass

    noop = _A()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0

    # Advance the simulation by exactly one tick.
    sim.apply_action(noop)
    sim.tick()

    by_id = {n.node_id: n for n in sim._nodes}

    # (node id, check label): ingress nodes first, then node-0's children,
    # then the grandchild reached through node-2.
    incoming_checks = (
        ("node-0", "node-0 (ingress) receives traffic"),
        ("node-4", "node-4 (ingress) receives traffic"),
        ("node-1", "node-1 receives from node-0 (DAG child)"),
        ("node-2", "node-2 receives from node-0 (DAG child)"),
        ("node-3", "node-3 receives from node-2 (DAG grandchild)"),
    )
    for node_id, label in incoming_checks:
        rate = by_id[node_id].incoming_request_rate
        record(label, PASS if rate > 0 else FAIL, f"incoming={rate:.1f}")

    # node-0 must also forward traffic downstream.
    ingress_outflow = by_id["node-0"].outflow_rate
    record(
        "node-0 has positive outflow_rate",
        PASS if ingress_outflow > 0 else FAIL,
        f"outflow={ingress_outflow:.1f}",
    )
# ============================================================================
# B. Task-2 Scripted Failure Flows Through DAG
# ============================================================================
def test_B_task2_dag():
    """Run task-2 for 60 NO_OP ticks and check the scripted failure on the DAG.

    Records:
      * whether ``sim._failed_node_id`` was ever set during the run;
      * whether that node ends with ``NodeStatus.FAILED``;
      * if the failed node is a child of node-0, whether traffic still
        reaches it (as incoming or as dropped requests);
      * if the failed node is node-2, whether node-3 has zero incoming rate.
    """
    print("\n=== B. Task-2 Failure Through DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-2", seed=42)
    sim.reset(task_id="task-2", seed=42)

    # Bare attribute holder standing in for an action object.
    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    # Run enough ticks for the failure to trigger; remember the first id seen.
    failed_id = None
    for _ in range(60):
        sim.apply_action(a)
        sim.tick()
        if sim._failed_node_id and failed_id is None:
            failed_id = sim._failed_node_id
    record("Scripted failure was assigned",
           PASS if failed_id is not None else FAIL,
           f"failed_id={failed_id}")

    nodes = {n.node_id: n for n in sim._nodes}
    failed_node = nodes.get(failed_id)
    record("Failed node has FAILED status",
           PASS if failed_node and failed_node.status == NodeStatus.FAILED else FAIL,
           f"status={failed_node.status if failed_node else 'N/A'}")

    # Nothing further can be inspected without a failed node object.
    if failed_node is None:
        return

    # If the failed node is a child of node-0, node-0 should still be sending
    # traffic to it (flow not bypassed).
    if failed_id in CLUSTER_TOPOLOGY.get("node-0", []):
        # A bare ">= 0" comparison can never fail for a non-negative rate, so
        # require the traffic to be visible as incoming or dropped requests.
        traffic_arrived = (
            failed_node.incoming_request_rate > 0
            or failed_node.dropped_requests > 0
        )
        record("Failed child still receives DAG traffic (as dropped requests)",
               PASS if traffic_arrived else FAIL,
               f"incoming={failed_node.incoming_request_rate:.1f} dropped={failed_node.dropped_requests:.1f}")

    # If the failed node was node-2, node-3 should be starved.
    if failed_id == "node-2":
        n3 = nodes["node-3"]
        record("node-3 starved when parent node-2 fails",
               PASS if n3.incoming_request_rate == 0 else FAIL,
               f"node-3 incoming={n3.incoming_request_rate:.1f}")
# ============================================================================
# C. Task-3 Surge Overlay on DAG
# ============================================================================
def test_C_task3_surge_dag():
    """Check that the task-3 surge adds to DAG traffic on node-1 and node-2.

    The surge window is forced open by overwriting ``_t3_surge_start`` and
    ``_t3_surge_end``, then a single NO_OP tick is run. node-1 and node-2
    must exceed ``T3_SURGE_MAGNITUDE``; node-0 must stay below it.
    """
    print("\n=== C. Task-3 Surge Overlay ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-3", seed=7)
    sim.reset(task_id="task-3", seed=7)

    # Force the surge window to be active immediately.
    sim._t3_surge_start = 0
    sim._t3_surge_end = 999

    # Bare attribute holder standing in for an action object.
    class _A:
        pass

    noop = _A()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {n.node_id: n for n in sim._nodes}

    # Surge targets should carry the surge on top of their DAG inflow.
    for node_id in ("node-1", "node-2"):
        rate = by_id[node_id].incoming_request_rate
        record(
            f"{node_id} receives surge + DAG traffic",
            PASS if rate > T3_SURGE_MAGNITUDE else FAIL,
            f"incoming={rate:.1f} (surge={T3_SURGE_MAGNITUDE})",
        )

    # node-0 should only see base traffic, i.e. less than the surge magnitude.
    ingress_rate = by_id["node-0"].incoming_request_rate
    record(
        "node-0 gets base DAG traffic (surge is side-channel)",
        PASS if ingress_rate < T3_SURGE_MAGNITUDE else FAIL,
        f"node-0 incoming={ingress_rate:.1f}",
    )
# ============================================================================
# D. Backpressure Is Temporary
# ============================================================================
def test_D_backpressure_temporary():
    """Check that node-0's capacity is not permanently changed by backpressure.

    node-0's children get their ``queue_depth`` pushed above
    ``BACKPRESSURE_THRESHOLD``. After one tick, after a second tick, and
    after the children are cleared and ticked again, ``node0.capacity`` must
    be within 0.01 of the value read before any tick.
    """
    print("\n=== D. Backpressure Temporary ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)
    node0 = next(filter(lambda n: n.node_id == "node-0", sim._nodes))
    original_cap = node0.capacity

    # Bare attribute holder standing in for an action object.
    class _A:
        pass

    noop = _A()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0

    def _set_child_queues(depth):
        # Overwrite queue_depth on every direct child of node-0.
        children = CLUSTER_TOPOLOGY.get("node-0", [])
        for n in sim._nodes:
            if n.node_id in children:
                n.queue_depth = depth

    def _tick_and_read_capacity():
        # Advance one NO_OP tick and return node-0's capacity afterwards.
        sim.apply_action(noop)
        sim.tick()
        return node0.capacity

    def _verdict(capacity):
        return PASS if abs(capacity - original_cap) < 0.01 else FAIL

    # Overload the children well above the threshold, then tick once.
    _set_child_queues(BACKPRESSURE_THRESHOLD + 100.0)
    cap_first = _tick_and_read_capacity()
    record(
        "node-0 capacity restored after backpressure tick",
        _verdict(cap_first),
        f"before={original_cap:.2f} after={cap_first:.2f}",
    )

    # Second tick without touching the children's queues.
    cap_second = _tick_and_read_capacity()
    record(
        "node-0 capacity intact after multiple backpressure ticks",
        _verdict(cap_second),
        f"after 2 ticks={cap_second:.2f} original={original_cap:.2f}",
    )

    # Clear the children's queues and tick once more.
    _set_child_queues(0.0)
    cap_cleared = _tick_and_read_capacity()
    record(
        "node-0 capacity unchanged after children clear",
        _verdict(cap_cleared),
        f"capacity={cap_cleared:.2f}",
    )
# ============================================================================
# E. Gradual Recovery Completes
# ============================================================================
def test_E_gradual_recovery():
    """Check that an overloaded node fails and then recovers completely.

    The node at index 2 is given ``queue_depth = 250.0`` and
    ``_update_statuses()`` is called; the node must be FAILED with capacity
    near 0.5. After ``NODE_RECOVERY_TICKS + 5`` NO_OP ticks it must be HEALTHY
    with capacity within 0.01 of ``DEFAULT_CAPACITY``.
    """
    print("\n=== E. Gradual Recovery ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    victim = sim._nodes[2]
    victim.queue_depth = 250.0
    sim._update_statuses()

    is_failed = victim.status == NodeStatus.FAILED
    record(
        "Node becomes FAILED on overload",
        PASS if is_failed else FAIL,
        f"status={victim.status}",
    )
    capacity_dropped = abs(victim.capacity - 0.5) < 0.01
    record(
        "Capacity drops to 0.5 at failure",
        PASS if capacity_dropped else FAIL,
        f"capacity={victim.capacity}",
    )

    # Bare attribute holder standing in for an action object.
    class _A:
        pass

    noop = _A()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0

    # Tick through the full recovery window plus a small margin.
    remaining = NODE_RECOVERY_TICKS + 5
    while remaining > 0:
        sim.apply_action(noop)
        sim.tick()
        remaining -= 1

    is_healthy = victim.status == NodeStatus.HEALTHY
    record(
        "Node reaches HEALTHY after full recovery",
        PASS if is_healthy else FAIL,
        f"status={victim.status}",
    )
    # Capacity is expected to have ramped back up to the default value.
    capacity_restored = abs(victim.capacity - DEFAULT_CAPACITY) < 0.01
    record(
        "Capacity recovered to DEFAULT_CAPACITY (capped)",
        PASS if capacity_restored else FAIL,
        f"capacity={victim.capacity:.2f} expected={DEFAULT_CAPACITY}",
    )
# ============================================================================
# F. Graph-Bounded Cascades
# ============================================================================
def test_F_graph_cascade():
    """Check that cascading failures stay within the failed node's neighbours.

    The node at index 2 is overloaded and ``_update_statuses()`` is called.
    The nodes at indices 3 and 4 are then overloaded in turn, each followed by
    ``_cascade_failures()``. Index 3 (labelled a child of node-2) must become
    DEGRADED; index 4 (labelled non-adjacent) must not.
    """
    print("\n=== F. Graph-Bounded Cascades ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Fail node-2 by overloading it.
    node2 = sim._nodes[2]
    node2.queue_depth = 250.0
    sim._update_statuses()

    # Overload node-3 beyond the fatal threshold and run the cascade pass.
    node3 = sim._nodes[3]
    node3.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-3 (child of failed node-2) cascades to DEGRADED",
           PASS if node3.status == NodeStatus.DEGRADED else FAIL,
           f"node-3 status={node3.status}")

    # Same overload on node-4, which the label says is not adjacent to node-2.
    node4 = sim._nodes[4]
    node4.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-4 (not adjacent to failed) does NOT cascade",
           PASS if node4.status != NodeStatus.DEGRADED else FAIL,
           f"node-4 status={node4.status}")
# ============================================================================
# G. Reroute Weights with DAG
# ============================================================================
def test_G_reroute_with_dag():
    """Check that REROUTE_TRAFFIC on node-0 lowers its inflow and outflow.

    A NO_OP tick establishes baseline incoming and outflow rates for node-0.
    A ``REROUTE_TRAFFIC`` action with parameter 1.0 is then applied and
    ticked. Incoming and outflow must each drop below their own baseline, and
    the stored reroute weight for node-0 must be below 1.0.
    """
    print("\n=== G. Reroute Weights with DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Bare attribute holder standing in for an action object.
    class _A:
        pass

    # Tick once to establish the baseline.
    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0
    sim.apply_action(a)
    sim.tick()

    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    baseline_n0_in = node0.incoming_request_rate
    # Capture outflow too, so the outflow check compares like with like.
    baseline_n0_out = node0.outflow_rate

    # Reroute 100% of node-0 traffic away.
    a2 = _A()
    a2.action_type = "REROUTE_TRAFFIC"
    a2.target_node_id = "node-0"
    a2.parameter = 1.0
    sim.apply_action(a2)
    sim.tick()

    n0_in_after = node0.incoming_request_rate
    record("Reroute reduces node-0 incoming traffic",
           PASS if n0_in_after < baseline_n0_in else FAIL,
           f"before={baseline_n0_in:.1f} after={n0_in_after:.1f}")

    # Outflow is compared with baseline outflow, not baseline incoming.
    record("node-0 outflow reduced after reroute",
           PASS if node0.outflow_rate < baseline_n0_out else FAIL,
           f"outflow={node0.outflow_rate:.1f}")

    # The stored weight is read after the tick; a missing entry counts as 0.0.
    w = sim._reroute_weights.get("node-0", 0.0)
    record("Reroute weight decays (0.5 * prev)",
           PASS if w < 1.0 else FAIL,
           f"weight after first decay={w:.3f}")
# ============================================================================
# H. Graph Lyapunov Edge Penalty
# ============================================================================
def test_H_graph_lyapunov():
    """Compare graph Lyapunov energy for balanced and imbalanced queue depths.

    Two five-node clusters are built as plain dicts: one with every queue at
    50.0, one with node-0 at 200.0 and the rest at 10.0. The imbalanced
    cluster must score higher under ``compute_lyapunov_graph``, and higher
    than its own flat ``compute_lyapunov`` value.
    """
    print("\n=== H. Graph Lyapunov Edge Penalty ===")

    # Node order matches the literal lists this replaces.
    node_ids = ("node-0", "node-4", "node-1", "node-2", "node-3")

    def _cluster(depths):
        # node-0 carries importance weight 2.0; every other node 1.0.
        return [
            {
                "node_id": node_id,
                "queue_depth": depth,
                "importance_weight": 2.0 if node_id == "node-0" else 1.0,
            }
            for node_id, depth in zip(node_ids, depths)
        ]

    nodes_balanced = _cluster((50.0, 50.0, 50.0, 50.0, 50.0))
    nodes_imbalanced = _cluster((200.0, 10.0, 10.0, 10.0, 10.0))

    v_bal = compute_lyapunov_graph(nodes_balanced, CLUSTER_TOPOLOGY)
    v_imb = compute_lyapunov_graph(nodes_imbalanced, CLUSTER_TOPOLOGY)
    imbalance_costs_more = v_imb > v_bal
    record(
        "Graph Lyapunov: imbalanced > balanced",
        PASS if imbalance_costs_more else FAIL,
        f"balanced={v_bal:.1f} imbalanced={v_imb:.1f}",
    )

    # The graph variant should exceed the flat one for the same cluster.
    v_flat_imb = compute_lyapunov(nodes_imbalanced)
    graph_exceeds_flat = v_imb > v_flat_imb
    record(
        "Graph Lyapunov > flat Lyapunov for imbalanced cluster",
        PASS if graph_exceeds_flat else FAIL,
        f"graph={v_imb:.1f} flat={v_flat_imb:.1f}",
    )
# ============================================================================
# I. Environment Observation Populates Graph Fields
# ============================================================================
def test_I_env_graph_fields():
    """Check that environment observations expose the graph-related fields.

    Imports the environment lazily; an ``ImportError`` is recorded as a FAIL
    and the test returns early. After ``reset`` it inspects node-0's
    ``downstream_nodes``, ``upstream_nodes``, ``upstream_pressure`` and
    ``outflow_rate``, then performs one SCALE_UP step and checks the reward
    and ``lyapunov_energy``.
    """
    print("\n=== I. Environment Graph Fields ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Environment import", FAIL, "Cannot import AntiAtroposEnvironment")
        return

    def _fmt(value, spec):
        # A None value cannot take a float format spec (TypeError); render it
        # as text so the check is recorded as FAIL instead of crashing.
        return "None" if value is None else format(value, spec)

    env = AntiAtroposEnvironment()
    obs = env.reset(task_id="task-1", mode="simulated", seed=42)

    # Check that the node observations carry graph fields.
    n0 = next((n for n in obs.nodes if n.node_id == "node-0"), None)
    record("Environment reset succeeds",
           PASS if n0 is not None else FAIL, "")
    if n0:
        record("node-0 has downstream_nodes",
               PASS if isinstance(n0.downstream_nodes, list) and len(n0.downstream_nodes) > 0 else FAIL,
               f"downstream={n0.downstream_nodes}")
        record("node-0 has upstream_nodes",
               PASS if isinstance(n0.upstream_nodes, list) else FAIL,
               f"upstream={n0.upstream_nodes}")
        record("node-0 has upstream_pressure",
               PASS if n0.upstream_pressure is not None else FAIL,
               f"pressure={_fmt(n0.upstream_pressure, '.3f')}")
        record("node-0 has outflow_rate",
               PASS if n0.outflow_rate is not None else FAIL,
               f"outflow={_fmt(n0.outflow_rate, '.3f')}")

    # Step once with SCALE_UP to verify reward computation.
    action = SREAction(action_type=ActionType.SCALE_UP, target_node_id="node-0", parameter=0.5)
    obs2 = env.step(action)
    reward = obs2.reward
    record("Step reward is non-zero",
           PASS if reward is not None and reward != 0.0 else FAIL,
           f"reward={_fmt(reward, '.4f')}")

    # Energy may legitimately be 0; it must be a real, non-negative number,
    # as the check label states.
    energy = obs2.lyapunov_energy
    lyap_ok = energy is not None and not math.isnan(energy) and energy >= 0
    record("lyapunov_energy is valid (>=0, not NaN)",
           PASS if lyap_ok else FAIL,
           f"energy={energy}")
# ============================================================================
# J. Reward Components Across Tasks
# ============================================================================
def test_J_reward_components():
    """Check that reward sub-components are populated for every task.

    Imports the environment lazily; an ``ImportError`` is recorded as a FAIL
    and the test returns early. For each task the environment is reset and
    stepped with NO_OP for a task-specific number of warm-up ticks; the last
    observation must have non-zero ``reward_drift`` and ``reward_cost`` and a
    non-NaN ``raw_reward``. Finally a single task-1 step must give
    ``reward_drift <= 0``.
    """
    print("\n=== J. Reward Components ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Reward components", FAIL, "Cannot import environment")
        return

    for task_id, warmup_ticks in [("task-1", 30), ("task-2", 5), ("task-3", 5)]:
        env = AntiAtroposEnvironment()
        env.reset(task_id=task_id, mode="simulated", seed=42)

        # The task-3 surge may not be active in early ticks, so force the
        # surge window wide open for validation.
        if task_id == "task-3":
            env._sim._t3_surge_start = 1
            env._sim._t3_surge_end = 999

        # Run enough ticks for queues to accumulate and drift to become
        # non-zero; task-1 ramps slowly, hence its longer warm-up.
        action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
        obs = None
        for _ in range(warmup_ticks):
            obs = env.step(action)

        # All sub-components should be present on the last observation.
        has_drift = obs.reward_drift != 0.0
        has_cost = obs.reward_cost != 0.0
        record(f"{task_id}: reward_drift non-zero (after {warmup_ticks} ticks)",
               PASS if has_drift else FAIL,
               f"drift={obs.reward_drift:.4f}")
        record(f"{task_id}: reward_cost non-zero",
               PASS if has_cost else FAIL,
               f"cost={obs.reward_cost:.4f}")
        record(f"{task_id}: no NaN in raw_reward",
               PASS if not math.isnan(obs.raw_reward) else FAIL,
               f"raw={obs.raw_reward:.4f}")

    # Quick sign check: drift is a penalty, so it should not be positive.
    # Only drift is checked here; the sign of reward_cost is not verified.
    env = AntiAtroposEnvironment()
    env.reset(task_id="task-1", mode="simulated", seed=42)
    action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
    obs = env.step(action)
    record("reward_drift <= 0 (penalty, not reward)",
           PASS if obs.reward_drift <= 0 else FAIL,
           f"drift={obs.reward_drift:.4f}")
# ============================================================================
# Main
# ============================================================================
def main():
    """Run every validation group in order and print a summary.

    Returns:
        0 when no check recorded ``FAIL``, otherwise 1.
    """
    banner = "=" * 65
    print(banner)
    print("AntiAtropos DAG Physics Validation")
    print(banner)

    # Groups run in alphabetical (A..J) order.
    suite = (
        test_A_dag_routing,
        test_B_task2_dag,
        test_C_task3_surge_dag,
        test_D_backpressure_temporary,
        test_E_gradual_recovery,
        test_F_graph_cascade,
        test_G_reroute_with_dag,
        test_H_graph_lyapunov,
        test_I_env_graph_fields,
        test_J_reward_components,
    )
    for run_group in suite:
        run_group()

    failures = [(name, detail) for name, status, detail in results if status == FAIL]
    passed = sum(1 for _, status, _ in results if status == PASS)
    failed = len(failures)
    total = len(results)

    print("\n" + banner)
    print(f"RESULTS: {passed}/{total} passed, {failed} failed")
    print(banner)

    if not failures:
        return 0

    print("\nFAILED TESTS:")
    for name, detail in failures:
        print(f" X {name}: {detail}")
    return 1
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
|