# AntiAtropos / validate_dag_physics.py
# PranavKK1201
# graph modelling
# b5e5650
#!/usr/bin/env python3
"""
Comprehensive DAG Physics Validation for AntiAtropos training readiness.
Verifies:
A. DAG traffic routing (parent->child propagation)
B. Task-2 scripted failure flows through DAG (not bypassed)
C. Task-3 surge is correctly overlaid on the DAG
D. Backpressure is temporary (not permanent capacity drain)
E. Gradual recovery completes fully
F. Graph-bounded cascades
G. Reroute weights work with DAG
H. Graph Lyapunov edge penalty
I. Environment observation populates graph fields
J. Reward components are non-degenerate across tasks
Run: python validate_dag_physics.py
"""
import sys, os, math
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from simulator import (
ClusterSimulator, NodeStatus, DEFAULT_CAPACITY,
CLUSTER_TOPOLOGY, EXTERNAL_TRAFFIC_NODES, _TOPOLOGICAL_ORDER,
DEFAULT_ROUTING_SPLIT, T1_INITIAL_LAMBDA, T2_INITIAL_LAMBDA,
T3_INITIAL_LAMBDA, T3_SURGE_MAGNITUDE,
COST_PER_CAPACITY_UNIT_PER_HOUR, FATAL_FAIL_THRESHOLD,
NODE_RECOVERY_TICKS, BACKPRESSURE_THRESHOLD,
)
from stability import (
compute_lyapunov, compute_lyapunov_graph, compute_reward,
compute_barrier, normalize_reward, smooth_sla_penalty, compute_drift,
BARRIER_NORM_SCALE,
)
# --- Test harness ---
PASS = "PASS"
FAIL = "FAIL"
# Every record() call appends a (name, status, detail) tuple here;
# main() tallies the list once all sections have run.
results = []


def record(name, status, detail=""):
    """Store one check result and echo it to stdout.

    Args:
        name: Human-readable label of the check.
        status: PASS or FAIL.
        detail: Optional context; printed after " -- " only when non-empty.
    """
    results.append((name, status, detail))
    if status == PASS:
        marker = "+"
    else:
        marker = "X"
    tail = f" -- {detail}" if detail else ""
    print(f" [{marker}] {name}{tail}")
# ============================================================================
# A. DAG Traffic Routing
# ============================================================================
def test_A_dag_routing():
    """Check that one NO_OP tick pushes traffic through every DAG level.

    After a single tick of a fresh task-1 cluster (seed 1):
    - the ingress nodes node-0 and node-4 must show incoming traffic;
    - node-1 and node-2 (children of node-0) must show incoming traffic;
    - node-3 (child of node-2) must show incoming traffic;
    - node-0 must report a positive ``outflow_rate``.
    """
    print("\n=== A. DAG Traffic Routing ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    class _Action:
        pass

    noop = _Action()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {member.node_id: member for member in sim._nodes}

    # (node, label) pairs, ordered ingress -> child -> grandchild.
    expectations = (
        ("node-0", "node-0 (ingress) receives traffic"),
        ("node-4", "node-4 (ingress) receives traffic"),
        ("node-1", "node-1 receives from node-0 (DAG child)"),
        ("node-2", "node-2 receives from node-0 (DAG child)"),
        ("node-3", "node-3 receives from node-2 (DAG grandchild)"),
    )
    for node_id, label in expectations:
        rate = by_id[node_id].incoming_request_rate
        record(label, PASS if rate > 0 else FAIL, f"incoming={rate:.1f}")

    # The ingress node must also forward work downstream.
    ingress_out = by_id["node-0"].outflow_rate
    record("node-0 has positive outflow_rate",
           PASS if ingress_out > 0 else FAIL,
           f"outflow={ingress_out:.1f}")
# ============================================================================
# B. Task-2 Scripted Failure Flows Through DAG
# ============================================================================
def test_B_task2_dag():
    """Check that the task-2 scripted failure is routed through the DAG.

    Runs 60 NO_OP ticks on a task-2 cluster (seed 42) and remembers the
    first value the simulator exposes in ``_failed_node_id``. It then checks:
    - a failure was assigned and that node's status is FAILED;
    - if the failed node is a DAG child of node-0, it still has non-zero
      incoming traffic, i.e. node-0 did not bypass it;
    - if the failed node is node-2, its child node-3 receives no traffic.
    """
    print("\n=== B. Task-2 Failure Through DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-2", seed=42)
    sim.reset(task_id="task-2", seed=42)

    class _A:
        pass

    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0

    # Run enough ticks for the scripted failure to trigger, keeping the
    # first node the simulator marks as failed.
    failed_id = None
    for _ in range(60):
        sim.apply_action(a)
        sim.tick()
        if sim._failed_node_id and failed_id is None:
            failed_id = sim._failed_node_id
    record("Scripted failure was assigned",
           PASS if failed_id is not None else FAIL,
           f"failed_id={failed_id}")

    nodes = {n.node_id: n for n in sim._nodes}
    failed_node = nodes.get(failed_id)
    record("Failed node has FAILED status",
           PASS if failed_node and failed_node.status == NodeStatus.FAILED else FAIL,
           f"status={failed_node.status if failed_node else 'N/A'}")

    # If the failed node is a child of node-0 (e.g. node-1 or node-2), node-0
    # should still send traffic to it (flow not bypassed). The failed node is
    # expected to serve nothing, so that traffic should surface as drops.
    if failed_node is not None and failed_id in CLUSTER_TOPOLOGY.get("node-0", []):
        # Require strictly positive traffic. A ``>= 0`` comparison would
        # also pass when node-0 bypasses the failed child entirely.
        record("Failed child still receives DAG traffic (as dropped requests)",
               PASS if failed_node.incoming_request_rate > 0 else FAIL,
               f"incoming={failed_node.incoming_request_rate:.1f} dropped={failed_node.dropped_requests:.1f}")

    # If node-2 failed, its only DAG child node-3 should be starved.
    if failed_id == "node-2":
        n3 = nodes["node-3"]
        record("node-3 starved when parent node-2 fails",
               PASS if n3.incoming_request_rate == 0 else FAIL,
               f"node-3 incoming={n3.incoming_request_rate:.1f}")
# ============================================================================
# C. Task-3 Surge Overlay on DAG
# ============================================================================
def test_C_task3_surge_dag():
    """Check that the task-3 surge is layered on top of DAG routing.

    The surge window is forced open (ticks 0-999) and one NO_OP tick is run.
    node-1 and node-2 must then receive more than T3_SURGE_MAGNITUDE, i.e.
    surge plus DAG traffic. node-0 must stay below T3_SURGE_MAGNITUDE because
    the surge does not target it.
    """
    print("\n=== C. Task-3 Surge Overlay ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-3", seed=7)
    sim.reset(task_id="task-3", seed=7)
    # Widen the surge window so the very first tick falls inside it.
    sim._t3_surge_start = 0
    sim._t3_surge_end = 999

    class _Action:
        pass

    noop = _Action()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0
    sim.apply_action(noop)
    sim.tick()

    by_id = {member.node_id: member for member in sim._nodes}

    # Surge targets: expect the side-channel surge stacked on DAG inflow.
    for target in ("node-1", "node-2"):
        rate = by_id[target].incoming_request_rate
        record(f"{target} receives surge + DAG traffic",
               PASS if rate > T3_SURGE_MAGNITUDE else FAIL,
               f"incoming={rate:.1f} (surge={T3_SURGE_MAGNITUDE})")

    # Ingress node-0 only sees its base DAG traffic.
    ingress_rate = by_id["node-0"].incoming_request_rate
    record("node-0 gets base DAG traffic (surge is side-channel)",
           PASS if ingress_rate < T3_SURGE_MAGNITUDE else FAIL,
           f"node-0 incoming={ingress_rate:.1f}")
# ============================================================================
# D. Backpressure Is Temporary
# ============================================================================
def test_D_backpressure_temporary():
    """Check that backpressure never permanently lowers node-0's capacity.

    Every direct DAG child of node-0 (per CLUSTER_TOPOLOGY) gets a queue
    100 units above BACKPRESSURE_THRESHOLD. The cluster is ticked twice, the
    children's queues are then zeroed, and it is ticked once more.

    After each tick, node-0's ``capacity`` attribute must still be within
    0.01 of its value before the first tick. Any backpressure throttling is
    expected to apply to a single tick, not be written back into ``capacity``.
    """
    print("\n=== D. Backpressure Temporary ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)
    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    original_cap = node0.capacity

    def _set_child_queues(depth):
        # Overwrite queue_depth on every direct DAG child of node-0.
        child_ids = CLUSTER_TOPOLOGY.get("node-0", [])
        for member in sim._nodes:
            if member.node_id in child_ids:
                member.queue_depth = depth

    class _Action:
        pass

    noop = _Action()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0

    def _step_capacity():
        # Advance one NO_OP tick and report node-0's stored capacity.
        sim.apply_action(noop)
        sim.tick()
        return node0.capacity

    def _verdict(cap):
        return PASS if abs(cap - original_cap) < 0.01 else FAIL

    # Push every child well above the backpressure threshold.
    _set_child_queues(BACKPRESSURE_THRESHOLD + 100.0)

    cap_after_tick = _step_capacity()
    record("node-0 capacity restored after backpressure tick",
           _verdict(cap_after_tick),
           f"before={original_cap:.2f} after={cap_after_tick:.2f}")

    # Second tick without touching the child queues again.
    cap_after_tick2 = _step_capacity()
    record("node-0 capacity intact after multiple backpressure ticks",
           _verdict(cap_after_tick2),
           f"after 2 ticks={cap_after_tick2:.2f} original={original_cap:.2f}")

    # Drain the children and tick once more.
    _set_child_queues(0.0)
    cap_clear = _step_capacity()
    record("node-0 capacity unchanged after children clear",
           _verdict(cap_clear),
           f"capacity={cap_clear:.2f}")
# ============================================================================
# E. Gradual Recovery Completes
# ============================================================================
def test_E_gradual_recovery():
    """Check that a node failed by overload recovers fully.

    ``sim._nodes[2]`` is given a queue depth of 250 and ``_update_statuses``
    is called. The node must then be FAILED with capacity 0.5. After
    NODE_RECOVERY_TICKS + 5 NO_OP ticks it must be HEALTHY again, with
    capacity back at DEFAULT_CAPACITY (within 0.01).
    """
    print("\n=== E. Gradual Recovery ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)
    patient = sim._nodes[2]
    patient.queue_depth = 250.0
    sim._update_statuses()

    failed_now = patient.status == NodeStatus.FAILED
    record("Node becomes FAILED on overload",
           PASS if failed_now else FAIL,
           f"status={patient.status}")
    cap_at_failure = patient.capacity
    record("Capacity drops to 0.5 at failure",
           PASS if abs(cap_at_failure - 0.5) < 0.01 else FAIL,
           f"capacity={cap_at_failure}")

    class _Action:
        pass

    noop = _Action()
    noop.action_type = "NO_OP"
    noop.target_node_id = "node-0"
    noop.parameter = 0.0

    # Full recovery window plus a 5-tick safety margin.
    for _tick in range(NODE_RECOVERY_TICKS + 5):
        sim.apply_action(noop)
        sim.tick()

    healthy_now = patient.status == NodeStatus.HEALTHY
    record("Node reaches HEALTHY after full recovery",
           PASS if healthy_now else FAIL,
           f"status={patient.status}")
    # The capacity ramp lives in the simulator. This check only requires that
    # the ramp has finished and landed on DEFAULT_CAPACITY.
    final_cap = patient.capacity
    record("Capacity recovered to DEFAULT_CAPACITY (capped)",
           PASS if abs(final_cap - DEFAULT_CAPACITY) < 0.01 else FAIL,
           f"capacity={final_cap:.2f} expected={DEFAULT_CAPACITY}")
# ============================================================================
# F. Graph-Bounded Cascades
# ============================================================================
def test_F_graph_cascade():
    """Check that cascading degradation follows graph adjacency.

    node-2 is forced into FAILED via an oversized queue. node-3, its DAG
    child, is then overloaded, and ``_cascade_failures`` must mark it
    DEGRADED. node-4 gets the same overload and must NOT become DEGRADED;
    this assumes node-4 is not adjacent to node-2 in CLUSTER_TOPOLOGY.
    """
    print("\n=== F. Graph-Bounded Cascades ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    def _node(node_id):
        # Look up by id rather than list position so each check targets the
        # node named in its label even if ``_nodes`` ordering changes.
        return next(n for n in sim._nodes if n.node_id == node_id)

    # Fail node-2 by pushing its queue far past the failure point.
    node2 = _node("node-2")
    node2.queue_depth = 250.0
    sim._update_statuses()

    # Overload node-3 (child of failed node-2) above the cascade threshold.
    node3 = _node("node-3")
    node3.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-3 (child of failed node-2) cascades to DEGRADED",
           PASS if node3.status == NodeStatus.DEGRADED else FAIL,
           f"node-3 status={node3.status}")

    # Same overload on node-4, which is not a neighbour of node-2: the
    # cascade must stay bounded to the graph.
    node4 = _node("node-4")
    node4.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
    sim._cascade_failures()
    record("node-4 (not adjacent to failed) does NOT cascade",
           PASS if node4.status != NodeStatus.DEGRADED else FAIL,
           f"node-4 status={node4.status}")
# ============================================================================
# G. Reroute Weights with DAG
# ============================================================================
def test_G_reroute_with_dag():
    """Check that REROUTE_TRAFFIC on node-0 interacts correctly with the DAG.

    After one NO_OP baseline tick, a REROUTE_TRAFFIC action with parameter
    1.0 is applied to node-0 and the cluster is ticked again. node-0's
    incoming rate and outflow rate must each fall below their own baseline
    values. The stored reroute weight for node-0 must be below the requested
    1.0 after the tick.
    """
    print("\n=== G. Reroute Weights with DAG ===")
    sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
    sim.reset(task_id="task-1", seed=1)

    class _A:
        pass

    # Tick once to establish a baseline.
    a = _A()
    a.action_type = "NO_OP"
    a.target_node_id = "node-0"
    a.parameter = 0.0
    sim.apply_action(a)
    sim.tick()
    node0 = next(n for n in sim._nodes if n.node_id == "node-0")
    baseline_n0_in = node0.incoming_request_rate
    # Capture the baseline outflow too, so the outflow check compares
    # outflow against outflow rather than against the incoming rate.
    baseline_n0_out = node0.outflow_rate

    # Reroute 100% of node-0 traffic away.
    a2 = _A()
    a2.action_type = "REROUTE_TRAFFIC"
    a2.target_node_id = "node-0"
    a2.parameter = 1.0
    sim.apply_action(a2)
    sim.tick()

    n0_in_after = node0.incoming_request_rate
    record("Reroute reduces node-0 incoming traffic",
           PASS if n0_in_after < baseline_n0_in else FAIL,
           f"before={baseline_n0_in:.1f} after={n0_in_after:.1f}")

    # Lower incoming traffic should translate into lower outflow.
    n0_out_after = node0.outflow_rate
    record("node-0 outflow reduced after reroute",
           PASS if n0_out_after < baseline_n0_out else FAIL,
           f"before={baseline_n0_out:.1f} after={n0_out_after:.1f}")

    # The label expects a 0.5-per-tick decay. This only verifies that the
    # weight has dropped below the 1.0 that was requested.
    w = sim._reroute_weights.get("node-0", 0.0)
    record("Reroute weight decays (0.5 * prev)",
           PASS if w < 1.0 else FAIL,
           f"weight after first decay={w:.3f}")
# ============================================================================
# H. Graph Lyapunov Edge Penalty
# ============================================================================
def test_H_graph_lyapunov():
    """Check that the graph Lyapunov function penalizes queue imbalance.

    Builds two 5-node snapshots in which node-0 has importance weight 2.0 and
    every other node 1.0: one with all queues at 50, one with node-0 at 200
    and the rest at 10. ``compute_lyapunov_graph`` must score the imbalanced
    snapshot above the balanced one, and above the flat ``compute_lyapunov``
    of the same imbalanced snapshot.
    """
    print("\n=== H. Graph Lyapunov Edge Penalty ===")
    node_order = ("node-0", "node-4", "node-1", "node-2", "node-3")

    def _snapshot(queue_depths):
        # One dict per node, in node_order; only node-0 is weighted 2.0.
        return [
            {
                "node_id": nid,
                "queue_depth": depth,
                "importance_weight": 2.0 if nid == "node-0" else 1.0,
            }
            for nid, depth in zip(node_order, queue_depths)
        ]

    nodes_balanced = _snapshot([50.0, 50.0, 50.0, 50.0, 50.0])
    nodes_imbalanced = _snapshot([200.0, 10.0, 10.0, 10.0, 10.0])

    energy_balanced = compute_lyapunov_graph(nodes_balanced, CLUSTER_TOPOLOGY)
    energy_imbalanced = compute_lyapunov_graph(nodes_imbalanced, CLUSTER_TOPOLOGY)
    record("Graph Lyapunov: imbalanced > balanced",
           PASS if energy_imbalanced > energy_balanced else FAIL,
           f"balanced={energy_balanced:.1f} imbalanced={energy_imbalanced:.1f}")

    # The edge term should lift the graph variant above the flat one.
    energy_flat = compute_lyapunov(nodes_imbalanced)
    record("Graph Lyapunov > flat Lyapunov for imbalanced cluster",
           PASS if energy_imbalanced > energy_flat else FAIL,
           f"graph={energy_imbalanced:.1f} flat={energy_flat:.1f}")
# ============================================================================
# I. Environment Observation Populates Graph Fields
# ============================================================================
def test_I_env_graph_fields():
    """Check that the environment exposes DAG fields and a usable first step.

    Resets ``AntiAtroposEnvironment`` on task-1 (simulated mode, seed 42) and
    inspects node-0's observation:
    - ``downstream_nodes`` must be a non-empty list;
    - ``upstream_nodes`` must be a list;
    - ``upstream_pressure`` and ``outflow_rate`` must not be None.

    One SCALE_UP step on node-0 must then produce a non-zero ``reward`` and
    a ``lyapunov_energy`` that is a non-negative, non-NaN number.

    If the environment modules cannot be imported, records a single FAIL and
    returns early.
    """
    print("\n=== I. Environment Graph Fields ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Environment import", FAIL, "Cannot import AntiAtroposEnvironment")
        return

    def _fmt(value, spec):
        # Format a possibly-missing metric. Applying a numeric format spec to
        # None raises TypeError, which would crash the run before record()
        # could report FAIL.
        return "None" if value is None else format(value, spec)

    env = AntiAtroposEnvironment()
    obs = env.reset(task_id="task-1", mode="simulated", seed=42)
    # Check that NodeObservations carry the graph fields.
    n0 = next((n for n in obs.nodes if n.node_id == "node-0"), None)
    record("Environment reset succeeds",
           PASS if n0 is not None else FAIL, "")
    if n0:
        record("node-0 has downstream_nodes",
               PASS if isinstance(n0.downstream_nodes, list) and len(n0.downstream_nodes) > 0 else FAIL,
               f"downstream={n0.downstream_nodes}")
        record("node-0 has upstream_nodes",
               PASS if isinstance(n0.upstream_nodes, list) else FAIL,
               f"upstream={n0.upstream_nodes}")
        record("node-0 has upstream_pressure",
               PASS if n0.upstream_pressure is not None else FAIL,
               f"pressure={_fmt(n0.upstream_pressure, '.3f')}")
        record("node-0 has outflow_rate",
               PASS if n0.outflow_rate is not None else FAIL,
               f"outflow={_fmt(n0.outflow_rate, '.3f')}")

    # Step once with SCALE_UP to exercise the reward computation.
    action = SREAction(action_type=ActionType.SCALE_UP, target_node_id="node-0", parameter=0.5)
    obs2 = env.step(action)
    record("Step reward is non-zero",
           PASS if obs2.reward is not None and obs2.reward != 0.0 else FAIL,
           f"reward={_fmt(obs2.reward, '.4f')}")

    # Lyapunov energy can be 0 on the first tick if all queues are below
    # capacity, so only require a valid, non-negative number.
    energy = obs2.lyapunov_energy
    lyap_ok = energy is not None and not math.isnan(energy) and energy >= 0
    record("lyapunov_energy is valid (>=0, not NaN)",
           PASS if lyap_ok else FAIL,
           f"energy={energy}")
# ============================================================================
# J. Reward Components Across Tasks
# ============================================================================
def test_J_reward_components():
    """Check that reward sub-components are live for every task.

    For each of task-1, task-2 and task-3, a fresh simulated environment
    (seed 42) is reset and the task-3 surge window is forced open. After a
    per-task number of NO_OP warm-up steps, the check requires non-zero
    ``reward_drift`` and ``reward_cost`` and a non-NaN ``raw_reward``.

    Finally, on a fresh task-1 environment, the first step's
    ``reward_drift`` must be <= 0.

    If the environment modules cannot be imported, records a single FAIL and
    returns early.
    """
    print("\n=== J. Reward Components ===")
    try:
        from server.AntiAtropos_environment import AntiAtroposEnvironment
        from models import SREAction, ActionType
    except ImportError:
        record("Reward components", FAIL, "Cannot import environment")
        return

    for task_id, warmup_ticks in [("task-1", 30), ("task-2", 5), ("task-3", 5)]:
        env = AntiAtroposEnvironment()
        env.reset(task_id=task_id, mode="simulated", seed=42)
        # The task-3 surge may not be active in early ticks (its window
        # depends on jitter), so force the window wide open for validation.
        if task_id == "task-3":
            env._sim._t3_surge_start = 1
            env._sim._t3_surge_end = 999
        # Run enough ticks for queues to accumulate and drift to become
        # non-zero. Task-1 ramps slowly (0.5/tick) and needs ~15+ ticks to
        # exceed its ingress capacity of 90 req/tick (starting from ~86).
        action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
        obs = None
        for _ in range(warmup_ticks):
            obs = env.step(action)
        # Both penalty sub-components should be active.
        has_drift = obs.reward_drift != 0.0
        has_cost = obs.reward_cost != 0.0
        record(f"{task_id}: reward_drift non-zero (after {warmup_ticks} ticks)",
               PASS if has_drift else FAIL,
               f"drift={obs.reward_drift:.4f}")
        record(f"{task_id}: reward_cost non-zero",
               PASS if has_cost else FAIL,
               f"cost={obs.reward_cost:.4f}")
        record(f"{task_id}: no NaN in raw_reward",
               PASS if not math.isnan(obs.raw_reward) else FAIL,
               f"raw={obs.raw_reward:.4f}")

    # Sign check on drift only: it is a penalty and must never be positive.
    env = AntiAtroposEnvironment()
    env.reset(task_id="task-1", mode="simulated", seed=42)
    action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
    obs = env.step(action)
    record("reward_drift <= 0 (penalty, not reward)",
           PASS if obs.reward_drift <= 0 else FAIL,
           f"drift={obs.reward_drift:.4f}")
# ============================================================================
# Main
# ============================================================================
def main():
    """Run every validation section, print a summary, and return an exit code.

    Returns:
        0 when no check recorded FAIL, otherwise 1.
    """
    banner = "=" * 65
    print(banner)
    print("AntiAtropos DAG Physics Validation")
    print(banner)

    sections = (
        test_A_dag_routing,
        test_B_task2_dag,
        test_C_task3_surge_dag,
        test_D_backpressure_temporary,
        test_E_gradual_recovery,
        test_F_graph_cascade,
        test_G_reroute_with_dag,
        test_H_graph_lyapunov,
        test_I_env_graph_fields,
        test_J_reward_components,
    )
    for run_section in sections:
        run_section()

    statuses = [status for _, status, _ in results]
    passed = statuses.count(PASS)
    failed = statuses.count(FAIL)
    total = len(results)

    print("\n" + banner)
    print(f"RESULTS: {passed}/{total} passed, {failed} failed")
    print(banner)

    if failed:
        print("\nFAILED TESTS:")
        for name, status, detail in results:
            if status == FAIL:
                print(f" X {name}: {detail}")
    return 1 if failed else 0
if __name__ == "__main__":
    # Use main()'s 0/1 return value as the process exit status.
    raise SystemExit(main())