|
|
| """
|
| Comprehensive DAG Physics Validation for AntiAtropos training readiness.
|
|
|
| Verifies:
|
| A. DAG traffic routing (parent->child propagation)
|
| B. Task-2 scripted failure flows through DAG (not bypassed)
|
C. Task-3 surge is correctly overlaid on the DAG
|
| D. Backpressure is temporary (not permanent capacity drain)
|
| E. Gradual recovery completes fully
|
| F. Graph-bounded cascades
|
| G. Reroute weights work with DAG
|
| H. Graph Lyapunov edge penalty
|
| I. Environment observation populates graph fields
|
| J. Reward components are non-degenerate across tasks
|
|
|
| Run: python validate_dag_physics.py
|
| """
|
|
|
| import sys, os, math
|
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
| from simulator import (
|
| ClusterSimulator, NodeStatus, DEFAULT_CAPACITY,
|
| CLUSTER_TOPOLOGY, EXTERNAL_TRAFFIC_NODES, _TOPOLOGICAL_ORDER,
|
| DEFAULT_ROUTING_SPLIT, T1_INITIAL_LAMBDA, T2_INITIAL_LAMBDA,
|
| T3_INITIAL_LAMBDA, T3_SURGE_MAGNITUDE,
|
| COST_PER_CAPACITY_UNIT_PER_HOUR, FATAL_FAIL_THRESHOLD,
|
| NODE_RECOVERY_TICKS, BACKPRESSURE_THRESHOLD,
|
| )
|
| from stability import (
|
| compute_lyapunov, compute_lyapunov_graph, compute_reward,
|
| compute_barrier, normalize_reward, smooth_sla_penalty, compute_drift,
|
| BARRIER_NORM_SCALE,
|
| )
|
|
|
|
|
# Status labels passed to record(); any status other than PASS prints as a failure.
PASS = "PASS"
FAIL = "FAIL"
# Every recorded check as a (name, status, detail) tuple; summarised by main().
results = []


def record(name, status, detail=""):
    """Append one check outcome to ``results`` and echo it to stdout.

    Args:
        name: Human-readable description of the check.
        status: ``PASS`` or ``FAIL``; anything other than ``PASS`` is shown as ``X``.
        detail: Optional diagnostic text, appended after `` -- `` when non-empty.
    """
    results.append((name, status, detail))
    marker = "X"
    if status == PASS:
        marker = "+"
    suffix = f" -- {detail}" if detail else ""
    print(f" [{marker}] {name}{suffix}")
|
|
|
|
|
|
|
|
|
def test_A_dag_routing():
    """A. Check that one NO_OP tick routes traffic from the ingress nodes down the DAG.

    After a single tick on task-1, the ingress nodes (node-0, node-4), node-0's
    children (node-1, node-2) and node-2's child (node-3) must all report a
    positive incoming_request_rate, and node-0 must report a positive outflow_rate.
    """
    print("\n=== A. DAG Traffic Routing ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    class _Action:
        pass

    noop = _Action()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {node.node_id: node for node in sim._nodes}

    incoming_checks = (
        ("node-0", "node-0 (ingress) receives traffic"),
        ("node-4", "node-4 (ingress) receives traffic"),
        ("node-1", "node-1 receives from node-0 (DAG child)"),
        ("node-2", "node-2 receives from node-0 (DAG child)"),
        ("node-3", "node-3 receives from node-2 (DAG grandchild)"),
    )
    for node_id, label in incoming_checks:
        rate = by_id[node_id].incoming_request_rate
        record(label, PASS if rate > 0 else FAIL, f"incoming={rate:.1f}")

    outflow = by_id["node-0"].outflow_rate
    record("node-0 has positive outflow_rate",
           PASS if outflow > 0 else FAIL,
           f"outflow={outflow:.1f}")
|
|
|
|
|
|
|
|
|
|
|
def test_B_task2_dag():
    """B. Check that the task-2 scripted failure flows through the DAG.

    Runs 60 NO_OP ticks and remembers the first non-empty value of
    ``sim._failed_node_id``. It then checks that node's status. The neighbourhood
    checks run only when the failed node is a child of node-0 (it must still be
    sent traffic) or is node-2 (its child node-3 must be starved).
    """
    print("\n=== B. Task-2 Failure Through DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-2", seed=42)
    sim.reset(task_id="task-2", seed=42)

    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    failed_id = None
    for _ in range(60):
        sim.apply_action(a)
        sim.tick()
        if sim._failed_node_id and failed_id is None:
            failed_id = sim._failed_node_id

    record("Scripted failure was assigned",
           PASS if failed_id is not None else FAIL,
           f"failed_id={failed_id}")

    nodes = {n.node_id: n for n in sim._nodes}
    failed_node = nodes.get(failed_id)
    record("Failed node has FAILED status",
           PASS if failed_node and failed_node.status == NodeStatus.FAILED else FAIL,
           f"status={failed_node.status if failed_node else 'N/A'}")

    if failed_node is not None and failed_id in CLUSTER_TOPOLOGY.get("node-0", []):
        # The parent keeps routing to its failed child, which then drops it.
        # A strictly positive rate is required: ">= 0" holds for any
        # non-negative rate, so the old check could never fail.
        record("Failed child still receives DAG traffic (as dropped requests)",
               PASS if failed_node.incoming_request_rate > 0 else FAIL,
               f"incoming={failed_node.incoming_request_rate:.1f} dropped={failed_node.dropped_requests:.1f}")

    if failed_id == "node-2":
        n3 = nodes["node-3"]
        record("node-3 starved when parent node-2 fails",
               PASS if n3.incoming_request_rate == 0 else FAIL,
               f"node-3 incoming={n3.incoming_request_rate:.1f}")
|
|
|
|
|
|
|
|
|
|
|
def test_C_task3_surge_dag():
    """C. Check that the task-3 surge is layered on top of normal DAG routing.

    The surge window is forced open (ticks 0 through 999) before a single NO_OP
    tick. node-1 and node-2 must then exceed T3_SURGE_MAGNITUDE (surge plus DAG
    traffic), while node-0 must stay below it.
    """
    print("\n=== C. Task-3 Surge Overlay ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-3", seed=7)
    sim.reset(task_id="task-3", seed=7)

    # Force the surge window to cover the tick below.
    sim._t3_surge_start, sim._t3_surge_end = 0, 999

    class _Action:
        pass

    noop = _Action()
    noop.action_type, noop.target_node_id, noop.parameter = "NO_OP", "node-0", 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {node.node_id: node for node in sim._nodes}

    for node_id in ("node-1", "node-2"):
        rate = by_id[node_id].incoming_request_rate
        record(f"{node_id} receives surge + DAG traffic",
               PASS if rate > T3_SURGE_MAGNITUDE else FAIL,
               f"incoming={rate:.1f} (surge={T3_SURGE_MAGNITUDE})")

    ingress_rate = by_id["node-0"].incoming_request_rate
    record("node-0 gets base DAG traffic (surge is side-channel)",
           PASS if ingress_rate < T3_SURGE_MAGNITUDE else FAIL,
           f"node-0 incoming={ingress_rate:.1f}")
|
|
|
|
|
|
|
|
|
|
|
def test_D_backpressure_temporary():
    """D. Check that child backpressure never permanently lowers node-0's capacity.

    Every child of node-0 (per CLUSTER_TOPOLOGY) is pushed above
    BACKPRESSURE_THRESHOLD before each of two NO_OP ticks. The children are then
    cleared before a third tick. node-0's capacity must equal its post-reset
    value after every tick.
    """
    print("\n=== D. Backpressure Temporary ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    original_cap = node0.capacity
    children = set(CLUSTER_TOPOLOGY.get("node-0", []))
    overload_depth = BACKPRESSURE_THRESHOLD + 100.0

    def _set_children_queue(depth):
        """Set queue_depth on every child of node-0."""
        for n in sim._nodes:
            if n.node_id in children:
                n.queue_depth = depth

    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    _set_children_queue(overload_depth)
    sim.apply_action(a)
    sim.tick()

    cap_after_tick = node0.capacity
    record("node-0 capacity restored after backpressure tick",
           PASS if abs(cap_after_tick - original_cap) < 0.01 else FAIL,
           f"before={original_cap:.2f} after={cap_after_tick:.2f}")

    # Re-apply the overload. The first tick may have drained the children below
    # BACKPRESSURE_THRESHOLD, in which case this second tick would not
    # exercise backpressure at all.
    _set_children_queue(overload_depth)
    sim.apply_action(a)
    sim.tick()
    cap_after_tick2 = node0.capacity
    record("node-0 capacity intact after multiple backpressure ticks",
           PASS if abs(cap_after_tick2 - original_cap) < 0.01 else FAIL,
           f"after 2 ticks={cap_after_tick2:.2f} original={original_cap:.2f}")

    _set_children_queue(0.0)
    sim.apply_action(a)
    sim.tick()
    cap_clear = node0.capacity
    record("node-0 capacity unchanged after children clear",
           PASS if abs(cap_clear - original_cap) < 0.01 else FAIL,
           f"capacity={cap_clear:.2f}")
|
|
|
|
|
|
|
|
|
|
|
def test_E_gradual_recovery():
    """E. Check that a failed node recovers all the way back to DEFAULT_CAPACITY.

    node-2 is given a queue of 250 and ``_update_statuses()`` is called. The node
    must become FAILED with capacity 0.5. After NODE_RECOVERY_TICKS + 5 NO_OP
    ticks it must be HEALTHY with capacity equal to DEFAULT_CAPACITY.
    """
    print("\n=== E. Gradual Recovery ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Look the node up by id, as the other sections do, rather than relying on
    # the position of node-2 inside sim._nodes.
    node = next(n for n in sim._nodes if n.node_id == "node-2")
    node.queue_depth = 250.0
    sim._update_statuses()

    record("Node becomes FAILED on overload",
           PASS if node.status == NodeStatus.FAILED else FAIL,
           f"status={node.status}")
    record("Capacity drops to 0.5 at failure",
           PASS if abs(node.capacity - 0.5) < 0.01 else FAIL,
           f"capacity={node.capacity}")

    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0
    # A few extra ticks beyond NODE_RECOVERY_TICKS give recovery headroom.
    for _ in range(NODE_RECOVERY_TICKS + 5):
        sim.apply_action(a)
        sim.tick()

    record("Node reaches HEALTHY after full recovery",
           PASS if node.status == NodeStatus.HEALTHY else FAIL,
           f"status={node.status}")
    record("Capacity recovered to DEFAULT_CAPACITY (capped)",
           PASS if abs(node.capacity - DEFAULT_CAPACITY) < 0.01 else FAIL,
           f"capacity={node.capacity:.2f} expected={DEFAULT_CAPACITY}")
|
|
|
|
|
|
|
|
|
|
|
def test_F_graph_cascade():
    """F. Check that cascades only spread along DAG edges from a failed node.

    node-2 is forced to fail (queue 250, then ``_update_statuses()``). Next,
    node-3 (node-2's child) and then node-4 (not adjacent to node-2) each get a
    queue of 1.5 * FATAL_FAIL_THRESHOLD before ``_cascade_failures()`` is called.
    Only node-3 should become DEGRADED.
    """
    print("\n=== F. Graph-Bounded Cascades ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    # Look nodes up by id. The record labels below name specific nodes, and the
    # ordering of sim._nodes is not established anywhere in this script.
    nodes = {n.node_id: n for n in sim._nodes}
    node2 = nodes["node-2"]
    node3 = nodes["node-3"]
    node4 = nodes["node-4"]

    node2.queue_depth = 250.0
    sim._update_statuses()

    node3.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-3 (child of failed node-2) cascades to DEGRADED",
           PASS if node3.status == NodeStatus.DEGRADED else FAIL,
           f"node-3 status={node3.status}")

    node4.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-4 (not adjacent to failed) does NOT cascade",
           PASS if node4.status != NodeStatus.DEGRADED else FAIL,
           f"node-4 status={node4.status}")
|
|
|
|
|
|
|
|
|
|
|
def test_G_reroute_with_dag():
    """G. Check that REROUTE_TRAFFIC on node-0 sheds load and then decays.

    After one NO_OP baseline tick, REROUTE_TRAFFIC (parameter=1.0) targets
    node-0 for one tick. node-0's incoming and outgoing rates must each drop
    below their own baseline. A reroute weight for node-0 must exist and be
    strictly between 0 and 1.
    """
    print("\n=== G. Reroute Weights with DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0
    sim.apply_action(a)
    sim.tick()

    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    baseline_n0_in = node0.incoming_request_rate
    # Outflow must be compared with an outflow baseline; comparing it with the
    # incoming baseline is not a like-for-like check.
    baseline_n0_out = node0.outflow_rate

    a2 = _A()
    a2.action_type = "REROUTE_TRAFFIC"
    a2.target_node_id = "node-0"
    a2.parameter = 1.0
    sim.apply_action(a2)
    sim.tick()

    n0_in_after = node0.incoming_request_rate
    record("Reroute reduces node-0 incoming traffic",
           PASS if n0_in_after < baseline_n0_in else FAIL,
           f"before={baseline_n0_in:.1f} after={n0_in_after:.1f}")

    n0_out_after = node0.outflow_rate
    record("node-0 outflow reduced after reroute",
           PASS if n0_out_after < baseline_n0_out else FAIL,
           f"before={baseline_n0_out:.1f} after={n0_out_after:.1f}")

    # No default: a missing entry means no weight was recorded. That must FAIL
    # rather than pass vacuously as 0.0 < 1.0.
    w = sim._reroute_weights.get("node-0")
    w_text = "None" if w is None else f"{w:.3f}"
    record("Reroute weight decays (0.5 * prev)",
           PASS if w is not None and 0.0 < w < 1.0 else FAIL,
           f"weight after first decay={w_text}")
|
|
|
|
|
|
|
|
|
|
|
def test_H_graph_lyapunov():
    """H. Check the edge penalty added by compute_lyapunov_graph.

    Builds two five-node snapshots: one with equal queues, and one where node-0
    holds a far deeper queue than the rest. The imbalanced snapshot must score
    higher than the balanced one on the graph Lyapunov function. It must also
    score higher on the graph function than on the flat compute_lyapunov.
    """
    print("\n=== H. Graph Lyapunov Edge Penalty ===")
    order = ("node-0", "node-4", "node-1", "node-2", "node-3")

    def _snapshot(depths):
        # node-0 carries importance 2.0; every other node carries 1.0.
        return [
            {
                "node_id": nid,
                "queue_depth": depth,
                "importance_weight": 2.0 if nid == "node-0" else 1.0,
            }
            for nid, depth in zip(order, depths)
        ]

    nodes_balanced = _snapshot([50.0, 50.0, 50.0, 50.0, 50.0])
    nodes_imbalanced = _snapshot([200.0, 10.0, 10.0, 10.0, 10.0])

    energy_balanced = compute_lyapunov_graph(nodes_balanced, CLUSTER_TOPOLOGY)
    energy_imbalanced = compute_lyapunov_graph(nodes_imbalanced, CLUSTER_TOPOLOGY)
    record("Graph Lyapunov: imbalanced > balanced",
           PASS if energy_imbalanced > energy_balanced else FAIL,
           f"balanced={energy_balanced:.1f} imbalanced={energy_imbalanced:.1f}")

    flat_imbalanced = compute_lyapunov(nodes_imbalanced)
    record("Graph Lyapunov > flat Lyapunov for imbalanced cluster",
           PASS if energy_imbalanced > flat_imbalanced else FAIL,
           f"graph={energy_imbalanced:.1f} flat={flat_imbalanced:.1f}")
|
|
|
|
|
|
|
|
|
|
|
def test_I_env_graph_fields():
    """I. Check that environment observations expose the DAG graph fields.

    Resets AntiAtroposEnvironment on task-1 (simulated, seed 42) and inspects
    node-0's downstream/upstream/pressure/outflow fields. It then takes one
    SCALE_UP step and checks the reward and lyapunov_energy. A missing (None)
    field is recorded as FAIL instead of raising while the detail string is
    formatted.
    """
    print("\n=== I. Environment Graph Fields ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Environment import", FAIL, "Cannot import AntiAtroposEnvironment")
        return

    def _fmt(value, spec):
        """Format value with spec, or return 'None' when the field is absent."""
        return "None" if value is None else format(value, spec)

    env = AntiAtroposEnvironment()
    obs = env.reset(task_id="task-1", mode="simulated", seed=42)

    n0 = next((n for n in obs.nodes if n.node_id == "node-0"), None)
    record("Environment reset succeeds",
           PASS if n0 is not None else FAIL, "")

    if n0 is not None:
        record("node-0 has downstream_nodes",
               PASS if isinstance(n0.downstream_nodes, list) and len(n0.downstream_nodes) > 0 else FAIL,
               f"downstream={n0.downstream_nodes}")
        record("node-0 has upstream_nodes",
               PASS if isinstance(n0.upstream_nodes, list) else FAIL,
               f"upstream={n0.upstream_nodes}")
        record("node-0 has upstream_pressure",
               PASS if n0.upstream_pressure is not None else FAIL,
               f"pressure={_fmt(n0.upstream_pressure, '.3f')}")
        record("node-0 has outflow_rate",
               PASS if n0.outflow_rate is not None else FAIL,
               f"outflow={_fmt(n0.outflow_rate, '.3f')}")

    action = SREAction(action_type=ActionType.SCALE_UP, target_node_id="node-0", parameter=0.5)
    obs2 = env.step(action)

    record("Step reward is non-zero",
           PASS if obs2.reward is not None and obs2.reward != 0.0 else FAIL,
           f"reward={_fmt(obs2.reward, '.4f')}")

    # The label promises ">=0, not NaN", so both conditions are checked.
    energy = obs2.lyapunov_energy
    lyap_ok = energy is not None and not math.isnan(energy) and energy >= 0
    record("lyapunov_energy is valid (>=0, not NaN)",
           PASS if lyap_ok else FAIL,
           f"energy={energy}")
|
|
|
|
|
|
|
|
|
|
|
def test_J_reward_components():
    """J. Check that reward components are populated for every task.

    For each of task-1/2/3, NO_OP is run for a per-task number of warm-up ticks.
    After warm-up, reward_drift and reward_cost must be non-zero and raw_reward
    must not be NaN. Finally, one task-1 step from reset must yield
    reward_drift <= 0. A missing (None) component is recorded as FAIL instead of
    passing vacuously or raising while formatting.
    """
    print("\n=== J. Reward Components ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Reward components", FAIL, "Cannot import environment")
        return

    def _fmt(value):
        """Format a reward component to 4 decimals, tolerating None."""
        return "None" if value is None else f"{value:.4f}"

    for task_id, warmup_ticks in [("task-1", 30), ("task-2", 5), ("task-3", 5)]:
        env = AntiAtroposEnvironment()
        env.reset(task_id=task_id, mode="simulated", seed=42)

        if task_id == "task-3":
            # Open the surge window from tick 1 so the warm-up runs under surge.
            env._sim._t3_surge_start = 1
            env._sim._t3_surge_end = 999

        action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
        obs = None
        for _ in range(warmup_ticks):
            obs = env.step(action)

        drift, cost, raw = obs.reward_drift, obs.reward_cost, obs.raw_reward
        record(f"{task_id}: reward_drift non-zero (after {warmup_ticks} ticks)",
               PASS if drift is not None and drift != 0.0 else FAIL,
               f"drift={_fmt(drift)}")
        record(f"{task_id}: reward_cost non-zero",
               PASS if cost is not None and cost != 0.0 else FAIL,
               f"cost={_fmt(cost)}")
        record(f"{task_id}: no NaN in raw_reward",
               PASS if raw is not None and not math.isnan(raw) else FAIL,
               f"raw={_fmt(raw)}")

    # A single step from reset: drift is meant to be a penalty, so it must be <= 0.
    env = AntiAtroposEnvironment()
    env.reset(task_id="task-1", mode="simulated", seed=42)
    action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
    obs = env.step(action)
    drift = obs.reward_drift
    record("reward_drift <= 0 (penalty, not reward)",
           PASS if drift is not None and drift <= 0 else FAIL,
           f"drift={_fmt(drift)}")
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Run every validation section, print a summary, and return an exit code.

    Returns:
        0 when no recorded check failed, otherwise 1.
    """
    banner = "=" * 65
    print(banner)
    print("AntiAtropos DAG Physics Validation")
    print(banner)

    sections = (
        test_A_dag_routing,
        test_B_task2_dag,
        test_C_task3_surge_dag,
        test_D_backpressure_temporary,
        test_E_gradual_recovery,
        test_F_graph_cascade,
        test_G_reroute_with_dag,
        test_H_graph_lyapunov,
        test_I_env_graph_fields,
        test_J_reward_components,
    )
    for section in sections:
        section()

    statuses = [status for _, status, _ in results]
    passed = statuses.count(PASS)
    failed = statuses.count(FAIL)
    total = len(results)

    print("\n" + banner)
    print(f"RESULTS: {passed}/{total} passed, {failed} failed")
    print(banner)

    if failed:
        print("\nFAILED TESTS:")
        failures = ((name, detail) for name, status, detail in results if status == FAIL)
        for name, detail in failures:
            print(f" X {name}: {detail}")

    return 1 if failed else 0
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s 0/1 result as the process exit status.
    raise SystemExit(main())
|
|
|