Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Comprehensive DAG Physics Validation for AntiAtropos training readiness. | |
| Verifies: | |
| A. DAG traffic routing (parent->child propagation) | |
| B. Task-2 scripted failure flows through DAG (not bypassed) | |
| C. Task-3 surge correct overlay on DAG | |
| D. Backpressure is temporary (not permanent capacity drain) | |
| E. Gradual recovery completes fully | |
| F. Graph-bounded cascades | |
| G. Reroute weights work with DAG | |
| H. Graph Lyapunov edge penalty | |
| I. Environment observation populates graph fields | |
| J. Reward components are non-degenerate across tasks | |
| Run: python validate_dag_physics.py | |
| """ | |
| import sys, os, math | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| from simulator import ( | |
| ClusterSimulator, NodeStatus, DEFAULT_CAPACITY, | |
| CLUSTER_TOPOLOGY, EXTERNAL_TRAFFIC_NODES, _TOPOLOGICAL_ORDER, | |
| DEFAULT_ROUTING_SPLIT, T1_INITIAL_LAMBDA, T2_INITIAL_LAMBDA, | |
| T3_INITIAL_LAMBDA, T3_SURGE_MAGNITUDE, | |
| COST_PER_CAPACITY_UNIT_PER_HOUR, FATAL_FAIL_THRESHOLD, | |
| NODE_RECOVERY_TICKS, BACKPRESSURE_THRESHOLD, | |
| ) | |
| from stability import ( | |
| compute_lyapunov, compute_lyapunov_graph, compute_reward, | |
| compute_barrier, normalize_reward, smooth_sla_penalty, compute_drift, | |
| BARRIER_NORM_SCALE, | |
| ) | |
# --- Test harness ---
PASS = "PASS"
FAIL = "FAIL"
# Every check appends a (name, status, detail) tuple here; main() summarises it.
results = []


def record(name, status, detail=""):
    """Store one check outcome and echo it to stdout.

    Args:
        name: Human-readable description of the check.
        status: PASS or FAIL.
        detail: Optional context, printed after a " -- " separator when non-empty.
    """
    results.append((name, status, detail))
    marker = "+" if status == PASS else "X"
    pieces = [f" [{marker}] {name}"]
    if detail:
        pieces.append(detail)
    print(" -- ".join(pieces))
| # ============================================================================ | |
| # A. DAG Traffic Routing | |
| # ============================================================================ | |
def test_A_dag_routing():
    """A: a single NO_OP tick must push traffic from the ingress nodes down the DAG.

    Checks that the ingress nodes (node-0, node-4), node-0's children
    (node-1, node-2) and node-2's child (node-3) all report non-zero incoming
    traffic, and that node-0 forwards a positive outflow.
    """
    print("\n=== A. DAG Traffic Routing ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Minimal duck-typed action object handed to apply_action().
    class _Action:
        pass

    noop = _Action()
    noop.action_type, noop.target_node_id, noop.parameter = "NO_OP", "node-0", 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {n.node_id: n for n in sim._nodes}
    expectations = (
        ("node-0", "node-0 (ingress) receives traffic"),
        ("node-4", "node-4 (ingress) receives traffic"),
        ("node-1", "node-1 receives from node-0 (DAG child)"),
        ("node-2", "node-2 receives from node-0 (DAG child)"),
        ("node-3", "node-3 receives from node-2 (DAG grandchild)"),
    )
    for node_id, label in expectations:
        rate = by_id[node_id].incoming_request_rate
        record(label, PASS if rate > 0 else FAIL, f"incoming={rate:.1f}")

    # Capacity is expected to exceed lambda at start, so node-0 should forward traffic.
    outflow = by_id["node-0"].outflow_rate
    record("node-0 has positive outflow_rate",
           PASS if outflow > 0 else FAIL,
           f"outflow={outflow:.1f}")
| # ============================================================================ | |
| # B. Task-2 Scripted Failure Flows Through DAG | |
| # ============================================================================ | |
def test_B_task2_dag():
    """B: the task-2 scripted failure must act through the DAG, not bypass it.

    Runs 60 NO_OP ticks and remembers the first node id the simulator reports
    in ``_failed_node_id``. It then checks that node's status. If the failed
    node is a child of node-0, it must still receive (and drop) DAG traffic.
    If the failed node is node-2, its child node-3 must be starved.
    """
    print("\n=== B. Task-2 Failure Through DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-2", seed=42)
    sim.reset(task_id="task-2", seed=42)

    # Minimal duck-typed action object handed to apply_action().
    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    # Run enough ticks for the scripted failure to trigger; keep the first id seen.
    failed_id = None
    for _ in range(60):
        sim.apply_action(a)
        sim.tick()
        if sim._failed_node_id and failed_id is None:
            failed_id = sim._failed_node_id
    record("Scripted failure was assigned",
           PASS if failed_id is not None else FAIL,
           f"failed_id={failed_id}")

    nodes = {n.node_id: n for n in sim._nodes}
    failed_node = nodes.get(failed_id)
    record("Failed node has FAILED status",
           PASS if failed_node and failed_node.status == NodeStatus.FAILED else FAIL,
           f"status={failed_node.status if failed_node else 'N/A'}")

    # A failed child of node-0 must still be fed by the DAG. Its own outflow
    # should be 0, but node-0 keeps sending it traffic, which shows up as
    # dropped requests. The check must be strictly positive; `>= 0` would
    # pass even if the flow were bypassed.
    if failed_node is not None and failed_id in CLUSTER_TOPOLOGY.get("node-0", []):
        incoming = failed_node.incoming_request_rate
        record("Failed child still receives DAG traffic (as dropped requests)",
               PASS if incoming > 0 else FAIL,
               f"incoming={incoming:.1f} dropped={failed_node.dropped_requests:.1f}")

    # node-2 is node-3's parent: when node-2 fails it stops emitting outflow,
    # so node-3 should see no incoming traffic.
    if failed_id == "node-2":
        n3 = nodes["node-3"]
        record("node-3 starved when parent node-2 fails",
               PASS if n3.incoming_request_rate == 0 else FAIL,
               f"node-3 incoming={n3.incoming_request_rate:.1f}")
| # ============================================================================ | |
| # C. Task-3 Surge Overlay on DAG | |
| # ============================================================================ | |
def test_C_task3_surge_dag():
    """C: the task-3 surge is layered on top of normal DAG routing.

    With the surge window forced open, node-1 and node-2 must each see more
    than T3_SURGE_MAGNITUDE of incoming traffic (surge plus their DAG share).
    Ingress node-0 is not a surge target and must stay below that magnitude.
    """
    print("\n=== C. Task-3 Surge Overlay ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-3", seed=7)
    sim.reset(task_id="task-3", seed=7)
    # Override the jittered surge window so it is active from the very first tick.
    sim._t3_surge_start, sim._t3_surge_end = 0, 999

    # Minimal duck-typed action object handed to apply_action().
    class _Action:
        pass

    noop = _Action()
    noop.action_type, noop.target_node_id, noop.parameter = "NO_OP", "node-0", 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {n.node_id: n for n in sim._nodes}
    for surged in ("node-1", "node-2"):
        rate = by_id[surged].incoming_request_rate
        record(f"{surged} receives surge + DAG traffic",
               PASS if rate > T3_SURGE_MAGNITUDE else FAIL,
               f"incoming={rate:.1f} (surge={T3_SURGE_MAGNITUDE})")

    ingress_rate = by_id["node-0"].incoming_request_rate
    record("node-0 gets base DAG traffic (surge is side-channel)",
           PASS if ingress_rate < T3_SURGE_MAGNITUDE else FAIL,
           f"node-0 incoming={ingress_rate:.1f}")
| # ============================================================================ | |
| # D. Backpressure Is Temporary | |
| # ============================================================================ | |
def test_D_backpressure_temporary():
    """D: backpressure from overloaded children must not permanently drain node-0.

    node-0's direct children (per CLUSTER_TOPOLOGY) are pushed above
    BACKPRESSURE_THRESHOLD. node-0's stored capacity is then compared with its
    pre-test value at three points: after one overloaded tick, after a second
    overloaded tick, and after a tick with the children's queues cleared.
    """
    print("\n=== D. Backpressure Temporary ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)
    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    original_cap = node0.capacity
    child_ids = set(CLUSTER_TOPOLOGY.get("node-0", []))

    def _set_children_queue(depth):
        """Set queue_depth on every direct child of node-0."""
        for n in sim._nodes:
            if n.node_id in child_ids:
                n.queue_depth = depth

    # Minimal duck-typed action object handed to apply_action().
    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    overload = BACKPRESSURE_THRESHOLD + 100.0  # well above threshold

    # Tick 1: backpressure may throttle node-0 for THIS tick only; its stored
    # capacity must come back unchanged.
    _set_children_queue(overload)
    sim.apply_action(a)
    sim.tick()
    cap_after_tick = node0.capacity
    record("node-0 capacity restored after backpressure tick",
           PASS if abs(cap_after_tick - original_cap) < 0.01 else FAIL,
           f"before={original_cap:.2f} after={cap_after_tick:.2f}")

    # Tick 2: re-apply the overload. The previous tick may have drained the
    # children's queues below the threshold, and then this tick would not
    # exercise backpressure at all.
    _set_children_queue(overload)
    sim.apply_action(a)
    sim.tick()
    cap_after_tick2 = node0.capacity
    record("node-0 capacity intact after multiple backpressure ticks",
           PASS if abs(cap_after_tick2 - original_cap) < 0.01 else FAIL,
           f"after 2 ticks={cap_after_tick2:.2f} original={original_cap:.2f}")

    # Tick 3: children cleared; capacity should still be the original value.
    _set_children_queue(0.0)
    sim.apply_action(a)
    sim.tick()
    cap_clear = node0.capacity
    record("node-0 capacity unchanged after children clear",
           PASS if abs(cap_clear - original_cap) < 0.01 else FAIL,
           f"capacity={cap_clear:.2f}")
| # ============================================================================ | |
| # E. Gradual Recovery Completes | |
| # ============================================================================ | |
def test_E_gradual_recovery():
    """E: a node forced into FAILED must recover gradually and completely.

    node-2's queue is set to 250 so that _update_statuses() marks it FAILED
    with capacity 0.5. After NODE_RECOVERY_TICKS + 5 NO_OP ticks it must be
    HEALTHY again, with capacity back at DEFAULT_CAPACITY.
    """
    print("\n=== E. Gradual Recovery ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)
    # Look the node up by id rather than list position, as the other tests do,
    # so the result does not depend on how the simulator orders its nodes.
    node = next(n for n in sim._nodes if n.node_id == "node-2")
    node.queue_depth = 250.0
    sim._update_statuses()
    record("Node becomes FAILED on overload",
           PASS if node.status == NodeStatus.FAILED else FAIL,
           f"status={node.status}")
    record("Capacity drops to 0.5 at failure",
           PASS if abs(node.capacity - 0.5) < 0.01 else FAIL,
           f"capacity={node.capacity}")

    # Minimal duck-typed action object handed to apply_action().
    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    # Tick through the full recovery window plus a small margin.
    for _ in range(NODE_RECOVERY_TICKS + 5):
        sim.apply_action(a)
        sim.tick()
    record("Node reaches HEALTHY after full recovery",
           PASS if node.status == NodeStatus.HEALTHY else FAIL,
           f"status={node.status}")

    # Capacity restarts at 0.5 and is expected to gain 0.5 on each recovery
    # tick. Over NODE_RECOVERY_TICKS ticks that overshoots DEFAULT_CAPACITY,
    # so the final value should sit exactly at the cap.
    record("Capacity recovered to DEFAULT_CAPACITY (capped)",
           PASS if abs(node.capacity - DEFAULT_CAPACITY) < 0.01 else FAIL,
           f"capacity={node.capacity:.2f} expected={DEFAULT_CAPACITY}")
| # ============================================================================ | |
| # F. Graph-Bounded Cascades | |
| # ============================================================================ | |
def test_F_graph_cascade():
    """F: overload cascades are bounded by graph adjacency.

    node-2 is forced to FAILED. An overloaded neighbour (node-3, node-2's
    child) should then be pushed to DEGRADED by _cascade_failures(). An
    equally overloaded node that is not adjacent to node-2 (node-4) should not.
    """
    print("\n=== F. Graph-Bounded Cascades ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)
    # Resolve nodes by id; positional indices only match ids if the simulator
    # happens to store nodes in id order.
    by_id = {n.node_id: n for n in sim._nodes}

    # Fail node-2 by overloading its queue.
    node2 = by_id["node-2"]
    node2.queue_depth = 250.0
    sim._update_statuses()
    # Precondition: without a FAILED node-2 the cascade checks below prove nothing.
    record("node-2 is FAILED before cascade checks",
           PASS if node2.status == NodeStatus.FAILED else FAIL,
           f"node-2 status={node2.status}")

    # node-3 is a child of node-2, so an overload there should cascade.
    node3 = by_id["node-3"]
    node3.queue_depth = FATAL_FAIL_THRESHOLD * 1.5  # > cascade threshold
    sim._cascade_failures()
    record("node-3 (child of failed node-2) cascades to DEGRADED",
           PASS if node3.status == NodeStatus.DEGRADED else FAIL,
           f"node-3 status={node3.status}")

    # node-4 is NOT adjacent to node-2, so the same overload must not cascade.
    node4 = by_id["node-4"]
    node4.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-4 (not adjacent to failed) does NOT cascade",
           PASS if node4.status != NodeStatus.DEGRADED else FAIL,
           f"node-4 status={node4.status}")
| # ============================================================================ | |
| # G. Reroute Weights with DAG | |
| # ============================================================================ | |
def test_G_reroute_with_dag():
    """G: REROUTE_TRAFFIC on node-0 must lower its traffic, and the weight must decay.

    A NO_OP tick first establishes a baseline. A full reroute
    (parameter=1.0) away from node-0 should then reduce both its incoming
    and its outgoing rate relative to that baseline. The stored reroute
    weight should already be below 1.0 after one tick.
    """
    print("\n=== G. Reroute Weights with DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Minimal duck-typed action object handed to apply_action().
    class _A:
        pass

    # Tick once to establish the baseline.
    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0
    sim.apply_action(a)
    sim.tick()
    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    baseline_n0_in = node0.incoming_request_rate
    # Compare outflow against the baseline OUTflow; comparing it against the
    # baseline incoming rate mixes two different quantities.
    baseline_n0_out = node0.outflow_rate

    # Reroute 100% of node-0 traffic away.
    a2 = _A()
    a2.action_type = "REROUTE_TRAFFIC"
    a2.target_node_id = "node-0"
    a2.parameter = 1.0
    sim.apply_action(a2)
    sim.tick()
    n0_in_after = node0.incoming_request_rate
    record("Reroute reduces node-0 incoming traffic",
           PASS if n0_in_after < baseline_n0_in else FAIL,
           f"before={baseline_n0_in:.1f} after={n0_in_after:.1f}")
    record("node-0 outflow reduced after reroute",
           PASS if node0.outflow_rate < baseline_n0_out else FAIL,
           f"before={baseline_n0_out:.1f} after={node0.outflow_rate:.1f}")

    # The reroute weight is expected to decay each tick (0.5 * previous value).
    w = sim._reroute_weights.get("node-0", 0.0)
    record("Reroute weight decays (0.5 * prev)",
           PASS if w < 1.0 else FAIL,
           f"weight after first decay={w:.3f}")
| # ============================================================================ | |
| # H. Graph Lyapunov Edge Penalty | |
| # ============================================================================ | |
def test_H_graph_lyapunov():
    """H: the graph Lyapunov function must penalise imbalance along DAG edges.

    Two synthetic cluster snapshots are compared. In the balanced one every
    queue is 50. In the imbalanced one node-0 holds 200 and the rest hold 10.
    The graph Lyapunov value must be larger for the imbalanced snapshot, and
    larger than the flat Lyapunov value of that same snapshot.
    """
    print("\n=== H. Graph Lyapunov Edge Penalty ===")
    node_order = ("node-0", "node-4", "node-1", "node-2", "node-3")

    def _snapshot(depths):
        """Build node dicts in node_order; node-0 carries importance weight 2.0."""
        return [
            {
                "node_id": node_id,
                "queue_depth": depth,
                "importance_weight": 2.0 if node_id == "node-0" else 1.0,
            }
            for node_id, depth in zip(node_order, depths)
        ]

    nodes_balanced = _snapshot((50.0, 50.0, 50.0, 50.0, 50.0))
    nodes_imbalanced = _snapshot((200.0, 10.0, 10.0, 10.0, 10.0))

    v_bal = compute_lyapunov_graph(nodes_balanced, CLUSTER_TOPOLOGY)
    v_imb = compute_lyapunov_graph(nodes_imbalanced, CLUSTER_TOPOLOGY)
    record("Graph Lyapunov: imbalanced > balanced",
           PASS if v_imb > v_bal else FAIL,
           f"balanced={v_bal:.1f} imbalanced={v_imb:.1f}")

    # The edge penalty should lift the graph value above the flat (per-node) value.
    v_flat_imb = compute_lyapunov(nodes_imbalanced)
    record("Graph Lyapunov > flat Lyapunov for imbalanced cluster",
           PASS if v_imb > v_flat_imb else FAIL,
           f"graph={v_imb:.1f} flat={v_flat_imb:.1f}")
| # ============================================================================ | |
| # I. Environment Observation Populates Graph Fields | |
| # ============================================================================ | |
def test_I_env_graph_fields():
    """I: environment observations must expose the DAG graph fields.

    Resets AntiAtroposEnvironment (task-1, simulated mode) and checks the
    graph-related attributes on node-0's observation. It then takes one
    SCALE_UP step and sanity-checks the reported reward and Lyapunov energy.
    If the environment cannot be imported, it records a single FAIL and
    returns early.
    """
    print("\n=== I. Environment Graph Fields ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Environment import", FAIL, "Cannot import AntiAtroposEnvironment")
        return

    def _fmt(value, spec):
        """Format a possibly-None numeric field for a record() detail string.

        The checks below treat None as FAIL. Formatting None with a float spec
        would raise TypeError before record() could run.
        """
        return "None" if value is None else format(value, spec)

    env = AntiAtroposEnvironment()
    obs = env.reset(task_id="task-1", mode="simulated", seed=42)
    # Check that the NodeObservations carry the graph fields.
    n0 = next((n for n in obs.nodes if n.node_id == "node-0"), None)
    record("Environment reset succeeds",
           PASS if n0 is not None else FAIL, "")
    if n0 is not None:
        record("node-0 has downstream_nodes",
               PASS if isinstance(n0.downstream_nodes, list) and len(n0.downstream_nodes) > 0 else FAIL,
               f"downstream={n0.downstream_nodes}")
        record("node-0 has upstream_nodes",
               PASS if isinstance(n0.upstream_nodes, list) else FAIL,
               f"upstream={n0.upstream_nodes}")
        record("node-0 has upstream_pressure",
               PASS if n0.upstream_pressure is not None else FAIL,
               f"pressure={_fmt(n0.upstream_pressure, '.3f')}")
        record("node-0 has outflow_rate",
               PASS if n0.outflow_rate is not None else FAIL,
               f"outflow={_fmt(n0.outflow_rate, '.3f')}")

    # Step once with SCALE_UP to exercise the reward computation.
    action = SREAction(action_type=ActionType.SCALE_UP, target_node_id="node-0", parameter=0.5)
    obs2 = env.step(action)
    reward = obs2.reward
    record("Step reward is non-zero",
           PASS if reward is not None and reward != 0.0 else FAIL,
           f"reward={_fmt(reward, '.4f')}")

    # Lyapunov energy can be 0 on the first tick if all queues are below
    # capacity, so only require a real, non-negative number (as the check's
    # name promises).
    energy = obs2.lyapunov_energy
    lyap_ok = energy is not None and not math.isnan(energy) and energy >= 0
    record("lyapunov_energy is valid (>=0, not NaN)",
           PASS if lyap_ok else FAIL,
           f"energy={energy}")
| # ============================================================================ | |
| # J. Reward Components Across Tasks | |
| # ============================================================================ | |
def test_J_reward_components():
    """J: reward sub-components must be non-degenerate across all three tasks.

    For each task, the environment is warmed up with NO_OP steps. The last
    observation must then have a non-zero reward_drift, a non-zero
    reward_cost and a non-NaN raw_reward. A separate one-step task-1 run
    checks the sign convention: drift and cost are penalties and must never
    be positive. If the environment cannot be imported, a single FAIL is
    recorded instead.
    """
    print("\n=== J. Reward Components ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Reward components", FAIL, "Cannot import environment")
        return

    for task_id, warmup_ticks in [("task-1", 30), ("task-2", 5), ("task-3", 5)]:
        env = AntiAtroposEnvironment()
        env.reset(task_id=task_id, mode="simulated", seed=42)
        # The task-3 surge may not be active in early ticks (its window is
        # jittered), so force the window wide open for validation.
        if task_id == "task-3":
            env._sim._t3_surge_start = 1
            env._sim._t3_surge_end = 999
        # Run enough ticks for queues to accumulate and drift to become non-zero.
        # Task-1 has a slow ramp (0.5/tick) and needs ~15+ ticks to exceed its
        # ingress capacity of 90 req/tick (starting from ~86).
        action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
        obs = None
        for _ in range(warmup_ticks):
            obs = env.step(action)
        # All sub-components should be present.
        has_drift = obs.reward_drift != 0.0
        has_cost = obs.reward_cost != 0.0
        record(f"{task_id}: reward_drift non-zero (after {warmup_ticks} ticks)",
               PASS if has_drift else FAIL,
               f"drift={obs.reward_drift:.4f}")
        record(f"{task_id}: reward_cost non-zero",
               PASS if has_cost else FAIL,
               f"cost={obs.reward_cost:.4f}")
        record(f"{task_id}: no NaN in raw_reward",
               PASS if not math.isnan(obs.raw_reward) else FAIL,
               f"raw={obs.raw_reward:.4f}")

    # Sign convention: drift and cost are penalties, so neither may be positive.
    env = AntiAtroposEnvironment()
    env.reset(task_id="task-1", mode="simulated", seed=42)
    action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
    obs = env.step(action)
    record("reward_drift <= 0 (penalty, not reward)",
           PASS if obs.reward_drift <= 0 else FAIL,
           f"drift={obs.reward_drift:.4f}")
    record("reward_cost <= 0 (penalty, not reward)",
           PASS if obs.reward_cost <= 0 else FAIL,
           f"cost={obs.reward_cost:.4f}")
| # ============================================================================ | |
| # Main | |
| # ============================================================================ | |
def main():
    """Run every validation section, print a summary and return an exit code.

    Returns:
        0 when every recorded check passed, 1 otherwise.
    """
    banner = "=" * 65
    print(banner)
    print("AntiAtropos DAG Physics Validation")
    print(banner)

    sections = (
        test_A_dag_routing,
        test_B_task2_dag,
        test_C_task3_surge_dag,
        test_D_backpressure_temporary,
        test_E_gradual_recovery,
        test_F_graph_cascade,
        test_G_reroute_with_dag,
        test_H_graph_lyapunov,
        test_I_env_graph_fields,
        test_J_reward_components,
    )
    for run_section in sections:
        run_section()

    failures = [(name, detail) for name, status, detail in results if status == FAIL]
    passed = sum(status == PASS for _, status, _ in results)
    failed = len(failures)
    total = len(results)

    print("\n" + banner)
    print(f"RESULTS: {passed}/{total} passed, {failed} failed")
    print(banner)
    if failures:
        print("\nFAILED TESTS:")
        for name, detail in failures:
            print(f" X {name}: {detail}")
    return 1 if failures else 0


if __name__ == "__main__":
    sys.exit(main())