PranavKK1201 committed on
Commit
b5e5650
·
1 Parent(s): 9db539d

graph modelling

Browse files
fix_colab_mcp.py DELETED
@@ -1,48 +0,0 @@
1
- import re
2
-
3
- path = r'C:\Users\kesha\AppData\Local\uv\cache\archive-v0\XqdAvZFy3eRi9W25WFXDP\Lib\site-packages\colab_mcp\session.py'
4
- with open(path, 'r') as f:
5
- content = f.read()
6
-
7
- # The corrupted version from the failed PowerShell replacement
8
- old = '''async def check_session_proxy_tool_fn(random_string: str = " \\, ctx: Context = CurrentContext()) -> bool:
9
- \\\\\\Opens a connection to a Google Colab browser session.
10
-
11
- Args:
12
- random_string: A dummy parameter required by some MCP clients for
13
- tools with no real arguments. This value is ignored.
14
- \\\\\\
15
- fe_connected'''
16
-
17
- new = '''async def check_session_proxy_tool_fn(random_string: str = "", ctx: Context = CurrentContext()) -> bool:
18
- """Opens a connection to a Google Colab browser session.
19
-
20
- Args:
21
- random_string: A dummy parameter required by some MCP clients for
22
- tools with no real arguments. This value is ignored.
23
- """
24
- fe_connected'''
25
-
26
- if old in content:
27
- content = content.replace(old, new)
28
- with open(path, 'w') as f:
29
- f.write(content)
30
- print('Fixed corrupted version successfully.')
31
- else:
32
- print('Old pattern not found. Checking current state...')
33
- # Check if the file still has the original version or corrupted version
34
- if 'async def check_session_proxy_tool_fn(ctx: Context = CurrentContext()) -> bool:' in content:
35
- print('Found original version, applying fix...')
36
- old2 = 'async def check_session_proxy_tool_fn(ctx: Context = CurrentContext()) -> bool:\n fe_connected'
37
- new2 = 'async def check_session_proxy_tool_fn(random_string: str = "", ctx: Context = CurrentContext()) -> bool:\n """Opens a connection to a Google Colab browser session.\n\n Args:\n random_string: A dummy parameter required by some MCP clients for\n tools with no real arguments. This value is ignored.\n """\n fe_connected'
38
- content = content.replace(old2, new2)
39
- with open(path, 'w') as f:
40
- f.write(content)
41
- print('Fixed original version successfully.')
42
- else:
43
- # Print the current function for debugging
44
- lines = content.split('\n')
45
- for i, line in enumerate(lines):
46
- if 'check_session_proxy_tool_fn' in line:
47
- print(f'Line {i}: {line}')
48
- print('Could not find expected pattern.')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
models.py CHANGED
@@ -111,6 +111,21 @@ class NodeObservation(BaseModel):
111
  description="How close this node is to SLA violation (0=safe, 1=violating).",
112
  )
113
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
  node_reward: float = Field(
115
  default=0.0,
116
  description="Per-node reward contribution for credit assignment.",
 
111
  description="How close this node is to SLA violation (0=safe, 1=violating).",
112
  )
113
 
114
+ outflow_rate: float = Field(
115
+ default=0.0,
116
+ ge=0.0,
117
+ le=1.0,
118
+ description="Normalised rate of requests dispatched downstream [0, 1].",
119
+ )
120
+ upstream_nodes: list[str] = Field(default_factory=list)
121
+ downstream_nodes: list[str] = Field(default_factory=list)
122
+ upstream_pressure: float = Field(
123
+ default=0.0,
124
+ ge=0.0,
125
+ le=1.0,
126
+ description="Mean queue depth of upstream parent nodes (normalised).",
127
+ )
128
+
129
  node_reward: float = Field(
130
  default=0.0,
131
  description="Per-node reward contribution for credit assignment.",
server/AntiAtropos_environment.py CHANGED
@@ -9,9 +9,10 @@ from openenv.core.env_server.types import State
9
 
10
  try:
11
  from ..models import SREAction, ClusterObservation, NodeObservation, NodeStatus, EnvironmentMode
12
- from ..simulator import ClusterSimulator, COST_PER_CAPACITY_UNIT_PER_HOUR
13
  from ..stability import (
14
  compute_lyapunov,
 
15
  compute_reward,
16
  compute_barrier,
17
  normalize_reward,
@@ -24,9 +25,10 @@ try:
24
  from ..control import KubernetesExecutor, ActionValidator
25
  except ImportError:
26
  from models import SREAction, ClusterObservation, NodeObservation, NodeStatus, EnvironmentMode # type: ignore[no-redef]
27
- from simulator import ClusterSimulator, COST_PER_CAPACITY_UNIT_PER_HOUR # type: ignore[no-redef]
28
  from stability import ( # type: ignore[no-redef]
29
  compute_lyapunov,
 
30
  compute_reward,
31
  compute_barrier,
32
  normalize_reward,
@@ -161,7 +163,7 @@ class AntiAtroposEnvironment(Environment):
161
 
162
  self._nodes_true = self._sim.state(for_agent=False)
163
  self._nodes_obs = self._sim.state(for_agent=True)
164
- self._prev_lyapunov = compute_lyapunov(self._nodes_true)
165
 
166
  return self._build_observation()
167
 
@@ -249,7 +251,7 @@ class AntiAtroposEnvironment(Environment):
249
  self._sla_violations += 1
250
 
251
  # 6. Compute Lyapunov stability metrics from Ground Truth
252
- current_lyapunov = compute_lyapunov(self._nodes_true)
253
 
254
  # 7. Compute scalar reward (with barrier function)
255
  cost = self._compute_cost(self._nodes_true)
@@ -438,6 +440,22 @@ class AntiAtroposEnvironment(Environment):
438
  node_latency_norm = min(1.0, max(0.0, float(n["latency_ms"]) / MAX_LATENCY_NORM))
439
  sla_prox = max(0.0, min(1.0, node_latency_norm / 0.20)) # 0.20 is SLA threshold
440
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
441
  node_obs.append(NodeObservation(
442
  node_id=n["node_id"],
443
  status=n["status"],
@@ -451,6 +469,10 @@ class AntiAtroposEnvironment(Environment):
451
  pending_capacity=float(n.get("pending_capacity_units", 0)) / 5.0,
452
  queue_delta=queue_delta,
453
  sla_proximity=sla_prox,
 
 
 
 
454
  node_reward=node_reward_val,
455
  done=False,
456
  reward=0.0,
 
9
 
10
  try:
11
  from ..models import SREAction, ClusterObservation, NodeObservation, NodeStatus, EnvironmentMode
12
+ from ..simulator import ClusterSimulator, COST_PER_CAPACITY_UNIT_PER_HOUR, CLUSTER_TOPOLOGY
13
  from ..stability import (
14
  compute_lyapunov,
15
+ compute_lyapunov_graph,
16
  compute_reward,
17
  compute_barrier,
18
  normalize_reward,
 
25
  from ..control import KubernetesExecutor, ActionValidator
26
  except ImportError:
27
  from models import SREAction, ClusterObservation, NodeObservation, NodeStatus, EnvironmentMode # type: ignore[no-redef]
28
+ from simulator import ClusterSimulator, COST_PER_CAPACITY_UNIT_PER_HOUR, CLUSTER_TOPOLOGY # type: ignore[no-redef]
29
  from stability import ( # type: ignore[no-redef]
30
  compute_lyapunov,
31
+ compute_lyapunov_graph,
32
  compute_reward,
33
  compute_barrier,
34
  normalize_reward,
 
163
 
164
  self._nodes_true = self._sim.state(for_agent=False)
165
  self._nodes_obs = self._sim.state(for_agent=True)
166
+ self._prev_lyapunov = compute_lyapunov_graph(self._nodes_true, CLUSTER_TOPOLOGY)
167
 
168
  return self._build_observation()
169
 
 
251
  self._sla_violations += 1
252
 
253
  # 6. Compute Lyapunov stability metrics from Ground Truth
254
+ current_lyapunov = compute_lyapunov_graph(self._nodes_true, CLUSTER_TOPOLOGY)
255
 
256
  # 7. Compute scalar reward (with barrier function)
257
  cost = self._compute_cost(self._nodes_true)
 
440
  node_latency_norm = min(1.0, max(0.0, float(n["latency_ms"]) / MAX_LATENCY_NORM))
441
  sla_prox = max(0.0, min(1.0, node_latency_norm / 0.20)) # 0.20 is SLA threshold
442
 
443
+ # Topology context for each node
444
+ node_upstreams = [
445
+ pid for pid, children in CLUSTER_TOPOLOGY.items()
446
+ if n["node_id"] in children
447
+ ]
448
+ node_downstreams = CLUSTER_TOPOLOGY.get(n["node_id"], [])
449
+
450
+ # Upstream pressure: mean normalised queue depth of parent nodes.
451
+ # Clamp to [0,1] because raw queues can transiently exceed MAX_QUEUE_NORM
452
+ # (e.g. during surge events), producing normalised values > 1.
453
+ parent_queues = [
454
+ min(1.0, max(0.0, float(prev_by_id.get(pid, {}).get("queue_depth", 0)) / MAX_QUEUE_NORM))
455
+ for pid in node_upstreams
456
+ ]
457
+ upstream_pressure = sum(parent_queues) / len(parent_queues) if parent_queues else 0.0
458
+
459
  node_obs.append(NodeObservation(
460
  node_id=n["node_id"],
461
  status=n["status"],
 
469
  pending_capacity=float(n.get("pending_capacity_units", 0)) / 5.0,
470
  queue_delta=queue_delta,
471
  sla_proximity=sla_prox,
472
+ outflow_rate=min(1.0, float(n.get("outflow_rate", 0.0)) / MAX_REQUEST_RATE_NORM),
473
+ upstream_nodes=node_upstreams,
474
+ downstream_nodes=node_downstreams,
475
+ upstream_pressure=upstream_pressure,
476
  node_reward=node_reward_val,
477
  done=False,
478
  reward=0.0,
simulator.py CHANGED
@@ -33,6 +33,8 @@ FATAL_FAIL_THRESHOLD: int = 200 # Hard cap on queue depth (catastrophic fa
33
  CASCADE_WINDOW_TICKS: int = 3 # Ticks after a failure to check for cascade effects
34
  CASCADE_QUEUE_MULTIPLIER: float = 1.2 # Queue must exceed FATAL_FAIL_THRESHOLD * this to cascade
35
  NODE_RECOVERY_TICKS: int = 20 # Ticks before a FAILED node auto-recovers
 
 
36
 
37
  SENSOR_DROPOUT_PROB: float = 0.05 # P(node.queue, latency reports 0 or -1.0)
38
  NODE_FAILURE_PROB: float = 0.00 # P(node fails naturally) — largely driven by task profile
@@ -41,13 +43,13 @@ NODE_FAILURE_PROB: float = 0.00 # P(node fails naturally) — largely driv
41
  COST_PER_CAPACITY_UNIT_PER_HOUR: float = 0.05
42
 
43
  # Task Profiles (Domain Randomization)
44
- # Task 1: Start very near capacity so reward/state react earlier.
45
- # Default μ_total = 10 nodes × 3 capacity × 15 = 450 req/tick.
46
- # λ_initial randomized close to saturation to avoid long flat early phases.
47
- T1_INITIAL_LAMBDA: float = 390.0
48
- T1_RAMP_SLOPE: float = 2.0 # +2 req per tick globally (doubled for 10 nodes)
49
- # Task 2: lambda 460 means 46/node (102% util) creates dynamic queue pressure for RL signal.
50
- T2_INITIAL_LAMBDA: float = 460.0
51
  T2_FAIL_TICK: int = 20
52
  T3_INITIAL_LAMBDA: float = 60.0
53
 
@@ -71,6 +73,41 @@ VIP_NODE_WEIGHTS: dict[str, float] = {
71
  "node-0": 2.0,
72
  }
73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
  class NodeStatus(str, Enum):
76
  HEALTHY = "HEALTHY"
@@ -98,6 +135,7 @@ class NodeState:
98
  pending_capacity_queue: list[int] = field(default_factory=list)
99
  recovery_timer: int = 0 # Countdown to auto-recovery from FAILED status
100
  is_scripted_failure: bool = False # True if failed due to task scripting (no auto-recovery)
 
101
 
102
  # Derived (recomputed whenever capacity or status changes)
103
  @property
@@ -124,6 +162,7 @@ class NodeState:
124
  "pending_capacity_units": int(len(self.pending_capacity_queue)),
125
  "recovery_timer": self.recovery_timer,
126
  "is_scripted_failure": self.is_scripted_failure,
 
127
  }
128
 
129
 
@@ -166,15 +205,22 @@ class ClusterSimulator:
166
  def _randomize_domain(self) -> None:
167
  """Apply domain randomization for RL robustness across tasks."""
168
  self._t1_ramp_slope = self._rng.uniform(0.8, 2.0)
169
- # Task 1: start between 92–99 % of default cluster capacity so
170
- # the system is responsive early (less flat reward plateaus).
171
- default_mu_total = self._n_nodes * DEFAULT_CAPACITY * 15.0 # 225
 
 
 
 
172
  self._t1_init_lambda = self._rng.uniform(
173
- default_mu_total * 0.92, default_mu_total * 0.99
174
  )
175
  self._t2_fail_tick = self._rng.randint(10, 40)
176
- # Task 2: guarantee immediate overload (46/node vs 45 capacity)
177
- self._t2_init_lambda = self._rng.uniform(455.0, 475.0)
 
 
 
178
  # Task 3: jitter the surge window so the LLM can't memorise it.
179
  jitter = self._rng.randint(-T3_SURGE_JITTER, T3_SURGE_JITTER)
180
  self._t3_surge_start = T3_SURGE_BASE_START + jitter
@@ -275,21 +321,27 @@ class ClusterSimulator:
275
  self._tick_count += 1
276
  self._update_capacity()
277
  self._inject_traffic()
 
 
 
 
 
278
  # Reset per-tick shed counters before physics update
279
  for node in self._nodes:
280
  node.dropped_requests = 0.0
281
  self._update_queues()
 
 
 
282
  self._update_derived_metrics()
283
  self._update_statuses()
284
  self._cascade_failures()
285
  self._process_recovery()
286
- # Decay shed fractions gradually (retain 80% per tick = slow decay)
287
- # The agent must still re-issue to maintain full effect, but the
288
- # effect doesn't vanish instantly. *= 0.8 means after 3 ticks
289
- # the shed is still at 51% (0.8^3), vs old 0.0 after 1 tick.
290
  for node in self._nodes:
291
- node.shed_fraction *= 0.8
292
- if node.shed_fraction < 0.01:
293
  node.shed_fraction = 0.0
294
 
295
  def _update_capacity(self) -> None:
@@ -304,62 +356,106 @@ class ClusterSimulator:
304
  node.pending_capacity_queue = [delay for delay in node.pending_capacity_queue if delay > 0]
305
 
306
  def _inject_traffic(self) -> None:
307
- """Determine λ_i per node based on task and routing state."""
308
- total_lambda = 0.0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
 
310
  if self._task_id == "task-1":
311
- # Task 1: Linear Ramp — starts near cluster capacity
312
  total_lambda = self._t1_init_lambda + (self._t1_ramp_slope * self._tick_count)
313
 
314
  elif self._task_id == "task-2":
315
- # Task 2: Fault Tolerance
316
  total_lambda = self._t2_init_lambda
 
317
  if self._tick_count >= self._t2_fail_tick and not self._failed_node_id:
318
  self._failed_node_id = self._rng.choice(
319
  [n.node_id for n in self._nodes if n.node_id != "node-0"]
320
  )
321
- # Mark the chosen node as a scripted (permanent) failure
322
  target = next((n for n in self._nodes if n.node_id == self._failed_node_id), None)
323
  if target:
324
  target.is_scripted_failure = True
325
-
326
- # Physics change: In Task 2, we do NOT redistribute dead node traffic
327
- # automatically. The infrastructure keeps sending λ/N to the failed node
328
- # (causing errors) until the agent chooses REROUTE_TRAFFIC or SCALE_UP.
329
- base_share = total_lambda / self._n_nodes
330
- for n in self._nodes:
331
- if n.node_id == self._failed_node_id:
332
- n.status = NodeStatus.FAILED
333
- # If the agent hasn't rerouted traffic away, it still hits the failed node
334
- n.incoming_request_rate = base_share
335
- else:
336
- n.incoming_request_rate = base_share
337
-
338
- # This is where the agent's actions (REROUTE_TRAFFIC) physically
339
- # move the share from the failed node to the survivors.
340
- self._apply_reroute_weights()
341
- return
342
 
343
  elif self._task_id == "task-3":
344
- # Task 3: Periodic surge — window is jittered per episode
345
  total_lambda = T3_INITIAL_LAMBDA
346
  phase = self._tick_count % T3_SURGE_CYCLE
347
  if self._t3_surge_start <= phase <= self._t3_surge_end:
348
- surge = T3_SURGE_MAGNITUDE
349
- for n in self._nodes:
350
- if n.node_id in ["node-1", "node-2"]:
351
- n.incoming_request_rate = (total_lambda / self._n_nodes) + surge
352
- else:
353
- n.incoming_request_rate = total_lambda / self._n_nodes
354
- return
355
-
356
- # --- Default: distribute traffic evenly, then apply rerouting ---
357
- base_share = total_lambda / self._n_nodes
358
- for n in self._nodes:
359
- n.incoming_request_rate = base_share
360
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
361
  self._apply_reroute_weights()
362
 
 
 
 
 
 
 
363
  def _apply_reroute_weights(self) -> None:
364
  """
365
  Apply REROUTE_TRAFFIC adjustments.
@@ -414,13 +510,31 @@ class ClusterSimulator:
414
  n.incoming_request_rate += share
415
 
416
  # Decay weights — agent must keep re-issuing to maintain effect
417
- # *= 0.8 retains 80% per tick (slow decay, persistent effect).
418
- # After 5 ticks without re-issue, effect is at 33% (0.8^5).
419
  for nid in list(self._reroute_weights.keys()):
420
- self._reroute_weights[nid] *= 0.8
421
- if self._reroute_weights[nid] < 0.01:
422
  del self._reroute_weights[nid]
423
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424
  def _update_queues(self) -> None:
425
  """
426
  Fluid-queue update for all nodes.
@@ -484,6 +598,7 @@ class ClusterSimulator:
484
  if n.status != NodeStatus.FAILED:
485
  n.status = NodeStatus.FAILED
486
  n.recovery_timer = NODE_RECOVERY_TICKS
 
487
  self._cascade_triggered = True # Signal cascade detection
488
  elif n.queue_depth > OVERLOAD_THRESHOLD:
489
  n.status = NodeStatus.DEGRADED
@@ -496,7 +611,7 @@ class ClusterSimulator:
496
 
497
  Guardrails:
498
  - Only triggers when a NEW failure occurred this tick (not any failed node).
499
- - Max one cascade step per failure event (no cascade chains).
500
  - Scripted failures (Task 2) do not trigger cascades.
501
  """
502
  if not self._cascade_triggered:
@@ -509,18 +624,27 @@ class ClusterSimulator:
509
  self._cascade_tick = 0
510
  return
511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
512
  cascade_threshold = FATAL_FAIL_THRESHOLD * CASCADE_QUEUE_MULTIPLIER
513
- cascaded_this_tick = 0
514
  for n in self._nodes:
515
- if cascaded_this_tick >= 1:
516
- break # Max one cascade per window to prevent chain reactions
517
- if n.status == NodeStatus.FAILED:
518
- continue
519
- if n.is_scripted_failure:
520
  continue
521
  if n.queue_depth > cascade_threshold:
522
  n.status = NodeStatus.DEGRADED
523
- cascaded_this_tick += 1
524
 
525
  def _process_recovery(self) -> None:
526
  """Count down recovery timers and bring FAILED nodes back online.
@@ -528,17 +652,26 @@ class ClusterSimulator:
528
  Only overload-failed nodes (recovery_timer > 0) can recover.
529
  Scripted failures (is_scripted_failure=True) are excluded.
530
  """
 
 
531
  for n in self._nodes:
532
  if n.is_scripted_failure:
533
  continue
534
- if n.status == NodeStatus.FAILED and n.recovery_timer > 0:
 
 
 
535
  n.recovery_timer -= 1
536
  if n.recovery_timer <= 0:
537
  n.status = NodeStatus.HEALTHY
538
- n.capacity = 1.0 # Recover at minimum capacity
539
  n.queue_depth = 0.0
540
  n.latency_ms = BASE_LATENCY_MS
541
  n.cpu_utilization = 0.0
 
 
 
 
 
542
 
543
  def reconcile_state(self, telemetry_map: dict) -> None:
544
  """
 
33
  CASCADE_WINDOW_TICKS: int = 3 # Ticks after a failure to check for cascade effects
34
  CASCADE_QUEUE_MULTIPLIER: float = 1.2 # Queue must exceed FATAL_FAIL_THRESHOLD * this to cascade
35
  NODE_RECOVERY_TICKS: int = 20 # Ticks before a FAILED node auto-recovers
36
+ BACKPRESSURE_THRESHOLD: float = 60.0 # Queue depth that triggers backpressure
37
+ BACKPRESSURE_MAX_FACTOR: float = 0.4 # Maximum service rate reduction (40%)
38
 
39
  SENSOR_DROPOUT_PROB: float = 0.05 # P(node.queue, latency reports 0 or -1.0)
40
  NODE_FAILURE_PROB: float = 0.00 # P(node fails naturally) — largely driven by task profile
 
43
  COST_PER_CAPACITY_UNIT_PER_HOUR: float = 0.05
44
 
45
  # Task Profiles (Domain Randomization)
46
+ # Task 1: Start at 92-99% of ingress capacity (randomised in _randomize_domain).
47
+ # DAG ingress capacity = 2 ingress nodes * DEFAULT_CAPACITY * 15 = 90 req/tick.
48
+ # lambda_init 83-89 so each ingress node sees ~41-44 req/tick (just under 45 capacity).
49
+ T1_INITIAL_LAMBDA: float = 86.0 # midpoint of [82.8, 89.1]; overridden by _randomize_domain
50
+ T1_RAMP_SLOPE: float = 0.5 # +0.5 req/tick globally per tick
51
+ # Task 2: lambda at 100-110% of ingress capacity guarantees immediate ingress overload.
52
+ T2_INITIAL_LAMBDA: float = 95.0 # midpoint of [90, 99]; overridden by _randomize_domain
53
  T2_FAIL_TICK: int = 20
54
  T3_INITIAL_LAMBDA: float = 60.0
55
 
 
73
  "node-0": 2.0,
74
  }
75
 
76
+ # ---------------------------------------------------------------------------
77
+ # Graph Topology (DAG — fixed 5-node cluster architecture)
78
+ # ---------------------------------------------------------------------------
79
+
80
+ # Directed edges: parent -> list of direct children.
81
+ # node-0 (payments/VIP) is the primary ingress; node-4 (auth) is independent.
82
+ CLUSTER_TOPOLOGY: dict[str, list[str]] = {
83
+ "node-0": ["node-1", "node-2"],
84
+ "node-1": [],
85
+ "node-2": ["node-3"],
86
+ "node-3": [],
87
+ "node-4": [],
88
+ }
89
+
90
+ # Nodes that receive raw external traffic directly.
91
+ EXTERNAL_TRAFFIC_NODES: set[str] = {"node-0", "node-4"}
92
+
93
+ # 50/50 external λ split between the two ingress nodes.
94
+ # node-0 (payments/VIP) and node-4 (auth) each receive half of total_lambda.
95
+ # total_lambda is the cluster-wide external arrival rate (req/tick).
96
+ # Each ingress node therefore sees total_lambda * 0.5 req/tick at its input.
97
+ EXTERNAL_LAMBDA_FRACTION: float = 0.5
98
+
99
+ # Default upstream-to-downstream routing weights (parent → child splits).
100
+ # These represent the baseline traffic split before agent rerouting.
101
+ DEFAULT_ROUTING_SPLIT: dict[str, dict[str, float]] = {
102
+ "node-0": {"node-1": 0.5, "node-2": 0.5},
103
+ "node-2": {"node-3": 1.0},
104
+ }
105
+
106
+ # Pre-computed topological order (Kahn's BFS on CLUSTER_TOPOLOGY).
107
+ # Ensures parents are always processed before their children in _inject_traffic().
108
+ # Order: node-0, node-4 (roots) → node-1, node-2 (node-0 children) → node-3 (node-2 child).
109
+ _TOPOLOGICAL_ORDER: tuple[str, ...] = ("node-0", "node-4", "node-1", "node-2", "node-3")
110
+
111
 
112
  class NodeStatus(str, Enum):
113
  HEALTHY = "HEALTHY"
 
135
  pending_capacity_queue: list[int] = field(default_factory=list)
136
  recovery_timer: int = 0 # Countdown to auto-recovery from FAILED status
137
  is_scripted_failure: bool = False # True if failed due to task scripting (no auto-recovery)
138
+ outflow_rate: float = 0.0 # Requests/tick actually dispatched downstream (DAG edge signal)
139
 
140
  # Derived (recomputed whenever capacity or status changes)
141
  @property
 
162
  "pending_capacity_units": int(len(self.pending_capacity_queue)),
163
  "recovery_timer": self.recovery_timer,
164
  "is_scripted_failure": self.is_scripted_failure,
165
+ "outflow_rate": round(self.outflow_rate, 2),
166
  }
167
 
168
 
 
205
  def _randomize_domain(self) -> None:
206
  """Apply domain randomization for RL robustness across tasks."""
207
  self._t1_ramp_slope = self._rng.uniform(0.8, 2.0)
208
+ # DAG calibration: total_lambda is split across 2 ingress nodes (node-0, node-4).
209
+ # Each ingress node's capacity is DEFAULT_CAPACITY * 15 req/tick.
210
+ # Ingress cluster capacity = len(EXTERNAL_TRAFFIC_NODES) * DEFAULT_CAPACITY * 15 = 90.
211
+ # Task 1: start between 92-99% of ingress capacity so the ingress nodes are
212
+ # near saturation immediately, producing rich early reward signal.
213
+ n_ingress = len(EXTERNAL_TRAFFIC_NODES) # 2
214
+ ingress_mu_total = n_ingress * DEFAULT_CAPACITY * 15.0 # 90 req/tick
215
  self._t1_init_lambda = self._rng.uniform(
216
+ ingress_mu_total * 0.92, ingress_mu_total * 0.99
217
  )
218
  self._t2_fail_tick = self._rng.randint(10, 40)
219
+ # Task 2: guarantee immediate ingress overload (slightly above ingress saturation).
220
+ # Each ingress node sees total_lambda/2, i.e. 100-110% of its individual capacity.
221
+ self._t2_init_lambda = self._rng.uniform(
222
+ ingress_mu_total * 1.00, ingress_mu_total * 1.10
223
+ )
224
  # Task 3: jitter the surge window so the LLM can't memorise it.
225
  jitter = self._rng.randint(-T3_SURGE_JITTER, T3_SURGE_JITTER)
226
  self._t3_surge_start = T3_SURGE_BASE_START + jitter
 
321
  self._tick_count += 1
322
  self._update_capacity()
323
  self._inject_traffic()
324
+ # Save original capacities; backpressure temporarily reduces service_rate
325
+ # for this tick only. Restore after _update_queues so the reduction does
326
+ # not compound across ticks and permanently cripple parent nodes.
327
+ saved_capacities = {n.node_id: n.capacity for n in self._nodes}
328
+ self._apply_backpressure()
329
  # Reset per-tick shed counters before physics update
330
  for node in self._nodes:
331
  node.dropped_requests = 0.0
332
  self._update_queues()
333
+ # Restore capacities so the next tick starts from the true provisioned level
334
+ for n in self._nodes:
335
+ n.capacity = saved_capacities.get(n.node_id, n.capacity)
336
  self._update_derived_metrics()
337
  self._update_statuses()
338
  self._cascade_failures()
339
  self._process_recovery()
340
+ # Decay shed fractions gradually
341
+ # *= 0.5 retains 50% per tick (fast decay).
 
 
342
  for node in self._nodes:
343
+ node.shed_fraction *= 0.5
344
+ if node.shed_fraction < 0.05:
345
  node.shed_fraction = 0.0
346
 
347
  def _update_capacity(self) -> None:
 
356
  node.pending_capacity_queue = [delay for delay in node.pending_capacity_queue if delay > 0]
357
 
358
  def _inject_traffic(self) -> None:
359
+ """
360
+ Distribute traffic through the cluster DAG in three phases.
361
+
362
+ Phase 1 — Task lambda + scripted events:
363
+ Compute total external λ for this tick and apply any task-specific
364
+ mutations (node failure scripting, surge flags). No early returns;
365
+ all branches fall through to the shared DAG in Phase 2.
366
+
367
+ Phase 2 — Topological DAG distribution:
368
+ Traverse _TOPOLOGICAL_ORDER (roots first). Each parent's
369
+ processed outflow (min(incoming, service_rate)) is split across
370
+ its children via DEFAULT_ROUTING_SPLIT. A FAILED node has
371
+ service_rate=0, so outflow=0 and its children are naturally
372
+ starved — this is the causal failure chain the RL agent must
373
+ learn to route around.
374
+
375
+ Phase 3 — Reroute weight correction:
376
+ Apply REROUTE_TRAFFIC weight adjustments post-DAG, then decay
377
+ weights. Keeps reroute semantics identical to pre-DAG behaviour.
378
+ """
379
+ # -------------------------------------------------------------------
380
+ # Phase 1: task-specific lambda + scripted events (no early returns)
381
+ # -------------------------------------------------------------------
382
+ total_lambda: float = 0.0
383
+ # direct_injections: extra traffic added directly to a node ON TOP OF
384
+ # the DAG distribution. Used for Task-3 surge bursts that model a
385
+ # side-channel load source (e.g. bulk import hitting checkout/catalog
386
+ # directly), while the base λ still travels through node-0 as ingress.
387
+ direct_injections: dict[str, float] = {}
388
 
389
  if self._task_id == "task-1":
390
+ # Linear ramp — starts near cluster capacity
391
  total_lambda = self._t1_init_lambda + (self._t1_ramp_slope * self._tick_count)
392
 
393
  elif self._task_id == "task-2":
 
394
  total_lambda = self._t2_init_lambda
395
+ # Scripted node failure fires at the configured tick
396
  if self._tick_count >= self._t2_fail_tick and not self._failed_node_id:
397
  self._failed_node_id = self._rng.choice(
398
  [n.node_id for n in self._nodes if n.node_id != "node-0"]
399
  )
 
400
  target = next((n for n in self._nodes if n.node_id == self._failed_node_id), None)
401
  if target:
402
  target.is_scripted_failure = True
403
+ # No early return: DAG distributes traffic to the failed node normally.
404
+ # The dead node's service_rate=0 means outflow=0, so its children are
405
+ # starved. _update_queues() converts all its incoming traffic to
406
+ # dropped_requests. The agent must issue REROUTE_TRAFFIC to shift
407
+ # the parent's split away from the dead child.
 
 
 
 
 
 
 
 
 
 
 
 
408
 
409
  elif self._task_id == "task-3":
 
410
  total_lambda = T3_INITIAL_LAMBDA
411
  phase = self._tick_count % T3_SURGE_CYCLE
412
  if self._t3_surge_start <= phase <= self._t3_surge_end:
413
+ # Surge is modelled as a direct external burst arriving at the
414
+ # checkout (node-1) and catalog (node-2) services from a side
415
+ # channel that bypasses the payment gateway ingress.
416
+ # Base λ still routes through the DAG; the surge is overlaid so
417
+ # CRITICAL_NODE protections (no SHED_LOAD on node-1/2) still apply.
418
+ for nid in ["node-1", "node-2"]:
419
+ direct_injections[nid] = T3_SURGE_MAGNITUDE
420
+
421
+ # -------------------------------------------------------------------
422
+ # Phase 2: DAG topological distribution
423
+ # -------------------------------------------------------------------
424
+ node_incoming: dict[str, float] = {n.node_id: 0.0 for n in self._nodes}
425
+ node_map: dict[str, "NodeState"] = {n.node_id: n for n in self._nodes}
426
+
427
+ # Seed ingress nodes with their share of external λ
428
+ node_incoming["node-0"] = total_lambda * EXTERNAL_LAMBDA_FRACTION
429
+ node_incoming["node-4"] = total_lambda * (1.0 - EXTERNAL_LAMBDA_FRACTION)
430
+
431
+ # Overlay task-specific direct injections (Task-3 surge)
432
+ for nid, extra in direct_injections.items():
433
+ node_incoming[nid] = node_incoming.get(nid, 0.0) + extra
434
+
435
+ # Propagate outflow through the graph in topological order
436
+ for parent_id in _TOPOLOGICAL_ORDER:
437
+ parent = node_map.get(parent_id)
438
+ if parent is None:
439
+ continue
440
+ parent.incoming_request_rate = node_incoming[parent_id]
441
+ # Outflow = requests the parent actually forwards downstream.
442
+ # FAILED nodes have service_rate=0 → outflow=0 → children starved.
443
+ outflow = min(parent.incoming_request_rate, parent.service_rate)
444
+ parent.outflow_rate = outflow
445
+ for child_id, split in DEFAULT_ROUTING_SPLIT.get(parent_id, {}).items():
446
+ node_incoming[child_id] = node_incoming.get(child_id, 0.0) + outflow * split
447
+
448
+ # -------------------------------------------------------------------
449
+ # Phase 3: REROUTE_TRAFFIC weight corrections (post-DAG)
450
+ # -------------------------------------------------------------------
451
  self._apply_reroute_weights()
452
 
453
+ # Recalculate outflow after reroute so the agent sees accurate
454
+ # per-node dispatch rates. Without this, a node whose incoming was
455
+ # halved by reroute would still report its pre-reroute outflow.
456
+ for n in self._nodes:
457
+ n.outflow_rate = min(n.incoming_request_rate, n.service_rate)
458
+
459
  def _apply_reroute_weights(self) -> None:
460
  """
461
  Apply REROUTE_TRAFFIC adjustments.
 
510
  n.incoming_request_rate += share
511
 
512
  # Decay weights — agent must keep re-issuing to maintain effect
513
+ # *= 0.5 retains 50% per tick (fast decay).
 
514
  for nid in list(self._reroute_weights.keys()):
515
+ self._reroute_weights[nid] *= 0.5
516
+ if self._reroute_weights[nid] < 0.05:
517
  del self._reroute_weights[nid]
518
 
519
+ def _apply_backpressure(self) -> None:
520
+ """Reduce parent service rate when children are overloaded."""
521
+ for parent_id, children in CLUSTER_TOPOLOGY.items():
522
+ parent = next((n for n in self._nodes if n.node_id == parent_id), None)
523
+ if not children or not parent or parent.status == NodeStatus.FAILED:
524
+ continue
525
+
526
+ # Compute pressure from overloaded children
527
+ total_pressure = 0.0
528
+ for child_id in children:
529
+ child = next((n for n in self._nodes if n.node_id == child_id), None)
530
+ if child:
531
+ excess = max(0.0, child.queue_depth - BACKPRESSURE_THRESHOLD)
532
+ total_pressure += excess / FATAL_FAIL_THRESHOLD # scale by fatal threshold; the summed value is capped below
533
+
534
+ # Reduce parent's effective capacity proportionally
535
+ pressure_factor = min(BACKPRESSURE_MAX_FACTOR, total_pressure * 0.6)
536
+ parent.capacity = max(1.0, parent.capacity * (1.0 - pressure_factor))
537
+
538
  def _update_queues(self) -> None:
539
  """
540
  Fluid-queue update for all nodes.
 
598
  if n.status != NodeStatus.FAILED:
599
  n.status = NodeStatus.FAILED
600
  n.recovery_timer = NODE_RECOVERY_TICKS
601
+ n.capacity = 0.5 # recovery starts from 0.5 capacity units, then ramps up per tick
602
  self._cascade_triggered = True # Signal cascade detection
603
  elif n.queue_depth > OVERLOAD_THRESHOLD:
604
  n.status = NodeStatus.DEGRADED
 
611
 
612
  Guardrails:
613
  - Only triggers when a NEW failure occurred this tick (not any failed node).
614
+ - Graph-bounded: cascade only propagates along edges (parents or children).
615
  - Scripted failures (Task 2) do not trigger cascades.
616
  """
617
  if not self._cascade_triggered:
 
624
  self._cascade_tick = 0
625
  return
626
 
627
+ # Find all currently failed nodes
628
+ failed_ids = {n.node_id for n in self._nodes if n.status == NodeStatus.FAILED}
629
+
630
+ # Build set of nodes adjacent to any failed node (upstream or downstream)
631
+ at_risk = set()
632
+ for failed_id in failed_ids:
633
+ # Downstream children of the failed node
634
+ at_risk.update(CLUSTER_TOPOLOGY.get(failed_id, []))
635
+ # Upstream parents of the failed node
636
+ for parent_id, children in CLUSTER_TOPOLOGY.items():
637
+ if failed_id in children:
638
+ at_risk.add(parent_id)
639
+
640
  cascade_threshold = FATAL_FAIL_THRESHOLD * CASCADE_QUEUE_MULTIPLIER
 
641
  for n in self._nodes:
642
+ if n.node_id not in at_risk:
643
+ continue # Not adjacent to a failed node, so it cannot cascade
644
+ if n.status == NodeStatus.FAILED or n.is_scripted_failure:
 
 
645
  continue
646
  if n.queue_depth > cascade_threshold:
647
  n.status = NodeStatus.DEGRADED
 
648
 
649
  def _process_recovery(self) -> None:
650
  """Count down recovery timers and bring FAILED nodes back online.
 
652
  Only overload-failed nodes (recovery_timer > 0) can recover.
653
  Scripted failures (is_scripted_failure=True) are excluded.
654
  """
655
+ RECOVERY_RAMP_PER_TICK: float = 0.5 # capacity added per tick during recovery
656
+
657
  for n in self._nodes:
658
  if n.is_scripted_failure:
659
  continue
660
+ # Check recovery_timer > 0, not status == FAILED: the first recovery
661
+ # tick transitions the node to DEGRADED, but the timer must keep
662
+ # counting until it reaches 0 and the node becomes HEALTHY.
663
+ if n.recovery_timer > 0:
664
  n.recovery_timer -= 1
665
  if n.recovery_timer <= 0:
666
  n.status = NodeStatus.HEALTHY
 
667
  n.queue_depth = 0.0
668
  n.latency_ms = BASE_LATENCY_MS
669
  n.cpu_utilization = 0.0
670
+ # capacity stays at whatever it ramped to
671
+ else:
672
+ # Still in recovery: ramp capacity up, stay DEGRADED
673
+ n.capacity = min(DEFAULT_CAPACITY, n.capacity + RECOVERY_RAMP_PER_TICK)
674
+ n.status = NodeStatus.DEGRADED # not HEALTHY until fully ramped
675
 
676
  def reconcile_state(self, telemetry_map: dict) -> None:
677
  """
smoke_test.py CHANGED
@@ -172,16 +172,16 @@ def test_simulator_node_count():
172
  sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
173
  nodes = sim.state(for_agent=False)
174
 
175
- record("10 nodes created",
176
- PASS if len(nodes) == 10 else FAIL,
177
  f"got {len(nodes)}")
178
 
179
  record("node-0 is VIP",
180
  PASS if nodes[0]["is_vip"] else FAIL,
181
  f"is_vip={nodes[0]['is_vip']}")
182
 
183
- record("node-0 weight=4.0",
184
- PASS if nodes[0]["importance_weight"] == 4.0 else FAIL,
185
  f"weight={nodes[0]['importance_weight']}")
186
 
187
  non_vip_weights = [n["importance_weight"] for n in nodes[1:]]
@@ -190,8 +190,8 @@ def test_simulator_node_count():
190
  f"unique weights={set(non_vip_weights)}")
191
 
192
  node_ids = [n["node_id"] for n in nodes]
193
- expected_ids = [f"node-{i}" for i in range(10)]
194
- record("Node IDs 0-9",
195
  PASS if node_ids == expected_ids else FAIL,
196
  f"ids={node_ids}")
197
 
@@ -542,8 +542,8 @@ def test_cascade_and_recovery():
542
  sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
543
  sim.reset(task_id="task-1", seed=1)
544
 
545
- # Artificially overload a node to trigger failure
546
- node = sim._nodes[5]
547
  node.queue_depth = 250.0 # > FATAL_FAIL_THRESHOLD=200
548
  sim._update_statuses()
549
  record("Node fails when queue > FATAL_FAIL_THRESHOLD",
 
172
  sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
173
  nodes = sim.state(for_agent=False)
174
 
175
+ record("5 nodes created",
176
+ PASS if len(nodes) == 5 else FAIL,
177
  f"got {len(nodes)}")
178
 
179
  record("node-0 is VIP",
180
  PASS if nodes[0]["is_vip"] else FAIL,
181
  f"is_vip={nodes[0]['is_vip']}")
182
 
183
+ record("node-0 weight=2.0",
184
+ PASS if nodes[0]["importance_weight"] == 2.0 else FAIL,
185
  f"weight={nodes[0]['importance_weight']}")
186
 
187
  non_vip_weights = [n["importance_weight"] for n in nodes[1:]]
 
190
  f"unique weights={set(non_vip_weights)}")
191
 
192
  node_ids = [n["node_id"] for n in nodes]
193
+ expected_ids = [f"node-{i}" for i in range(5)]
194
+ record("Node IDs 0-4",
195
  PASS if node_ids == expected_ids else FAIL,
196
  f"ids={node_ids}")
197
 
 
542
  sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
543
  sim.reset(task_id="task-1", seed=1)
544
 
545
+ # Artificially overload node-2 (has children, tests graph cascade)
546
+ node = sim._nodes[2]
547
  node.queue_depth = 250.0 # > FATAL_FAIL_THRESHOLD=200
548
  sim._update_statuses()
549
  record("Node fails when queue > FATAL_FAIL_THRESHOLD",
stability.py CHANGED
@@ -53,6 +53,9 @@ Q_BARRIER_MAX: float = 150.0
53
  Set higher than OVERLOAD_THRESHOLD (80) to allow the agent time to react
54
  before the barrier penalty kicks in."""
55
 
 
 
 
56
  BARRIER_NORM_SCALE: float = 10000.0
57
  """Normalization divisor for the barrier term.
58
  The raw barrier H(s) = sum(max(0, Q_i - Q_max)^2) can produce very large values
@@ -100,6 +103,44 @@ def compute_lyapunov(nodes: list[dict]) -> float:
100
  )
101
  )
102
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  def compute_drift(v_prev: float, v_curr: float) -> float:
105
  """
 
53
  Set higher than OVERLOAD_THRESHOLD (80) to allow the agent time to react
54
  before the barrier penalty kicks in."""
55
 
56
+ MAX_QUEUE_NORM: float = 200.0
57
+ """Normalization divisor shared with environment.py. Matches FATAL_FAIL_THRESHOLD."""
58
+
59
  BARRIER_NORM_SCALE: float = 10000.0
60
  """Normalization divisor for the barrier term.
61
  The raw barrier H(s) = sum(max(0, Q_i - Q_max)^2) can produce very large values
 
103
  )
104
  )
105
 
106
+ def compute_lyapunov_graph(
107
+ nodes: list[dict],
108
+ topology: dict[str, list[str]],
109
+ edge_weight: float = 5.0,
110
+ ) -> float:
111
+ """
112
+ V_graph(s) = Σ w_i·Q_i² + edge_weight · Σ_{(i,j)∈edges} |Q_i - Q_j|
113
+
114
+ The edge term penalises flow imbalance between connected nodes.
115
+ If node-0 is overloaded but node-1 (its child) is idle, the edge
116
+ term fires even if node-1's individual energy contribution is zero.
117
+ This gives the agent gradient signal to balance load across the graph,
118
+ not just reduce individual node queues.
119
+
120
+ With edge_weight=5.0 and max |Q_diff|=200 on 4 edges, the edge term
121
+ adds ~4000 to the Lyapunov energy, or ~2-5 % of the base energy at
122
+ full overload — a meaningful secondary gradient without dominating.
123
+ """
124
+ node_map = {n["node_id"]: n for n in nodes}
125
+
126
+ # Standard weighted Lyapunov
127
+ base_energy = compute_lyapunov(nodes)
128
+
129
+ # Edge imbalance penalty (raw queue-depth differences)
130
+ edge_penalty = 0.0
131
+ for parent_id, children in topology.items():
132
+ parent = node_map.get(parent_id)
133
+ if not parent:
134
+ continue
135
+ for child_id in children:
136
+ child = node_map.get(child_id)
137
+ if not child:
138
+ continue
139
+ imbalance = abs(float(parent["queue_depth"]) - float(child["queue_depth"]))
140
+ edge_penalty += imbalance
141
+
142
+ return base_energy + edge_weight * edge_penalty
143
+
144
 
145
  def compute_drift(v_prev: float, v_curr: float) -> float:
146
  """
validate_dag_physics.py ADDED
@@ -0,0 +1,512 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Comprehensive DAG Physics Validation for AntiAtropos training readiness.
4
+
5
+ Verifies:
6
+ A. DAG traffic routing (parent->child propagation)
7
+ B. Task-2 scripted failure flows through DAG (not bypassed)
8
+ C. Task-3 surge is overlaid correctly on the DAG
9
+ D. Backpressure is temporary (not permanent capacity drain)
10
+ E. Gradual recovery completes fully
11
+ F. Graph-bounded cascades
12
+ G. Reroute weights work with DAG
13
+ H. Graph Lyapunov edge penalty
14
+ I. Environment observation populates graph fields
15
+ J. Reward components are non-degenerate across tasks
16
+
17
+ Run: python validate_dag_physics.py
18
+ """
19
+
20
+ import sys, os, math
21
+ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
22
+
23
+ from simulator import (
24
+ ClusterSimulator, NodeStatus, DEFAULT_CAPACITY,
25
+ CLUSTER_TOPOLOGY, EXTERNAL_TRAFFIC_NODES, _TOPOLOGICAL_ORDER,
26
+ DEFAULT_ROUTING_SPLIT, T1_INITIAL_LAMBDA, T2_INITIAL_LAMBDA,
27
+ T3_INITIAL_LAMBDA, T3_SURGE_MAGNITUDE,
28
+ COST_PER_CAPACITY_UNIT_PER_HOUR, FATAL_FAIL_THRESHOLD,
29
+ NODE_RECOVERY_TICKS, BACKPRESSURE_THRESHOLD,
30
+ )
31
+ from stability import (
32
+ compute_lyapunov, compute_lyapunov_graph, compute_reward,
33
+ compute_barrier, normalize_reward, smooth_sla_penalty, compute_drift,
34
+ BARRIER_NORM_SCALE,
35
+ )
36
+
37
+ # --- Test harness ---
38
+ PASS = "PASS"
39
+ FAIL = "FAIL"
40
+ results = []
41
+
42
+ def record(name, status, detail=""):
43
+ results.append((name, status, detail))
44
+ icon = "+" if status == PASS else "X"
45
+ msg = f" [{icon}] {name}"
46
+ if detail:
47
+ msg += f" -- {detail}"
48
+ print(msg)
49
+
50
+ # ============================================================================
51
+ # A. DAG Traffic Routing
52
+ # ============================================================================
53
+ def test_A_dag_routing():
54
+ print("\n=== A. DAG Traffic Routing ===")
55
+ sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
56
+ sim.reset(task_id="task-1", seed=1)
57
+
58
+ # Tick once with NO_OP
59
+ class _A: pass
60
+ a = _A(); a.action_type = "NO_OP"; a.target_node_id = "node-0"; a.parameter = 0.0
61
+ sim.apply_action(a)
62
+ sim.tick()
63
+
64
+ nodes = {n.node_id: n for n in sim._nodes}
65
+
66
+ # node-0 and node-4 should receive external traffic
67
+ n0_in = nodes["node-0"].incoming_request_rate
68
+ n4_in = nodes["node-4"].incoming_request_rate
69
+ record("node-0 (ingress) receives traffic",
70
+ PASS if n0_in > 0 else FAIL,
71
+ f"incoming={n0_in:.1f}")
72
+ record("node-4 (ingress) receives traffic",
73
+ PASS if n4_in > 0 else FAIL,
74
+ f"incoming={n4_in:.1f}")
75
+
76
+ # node-1, node-2 should receive outflow from node-0
77
+ n1_in = nodes["node-1"].incoming_request_rate
78
+ n2_in = nodes["node-2"].incoming_request_rate
79
+ record("node-1 receives from node-0 (DAG child)",
80
+ PASS if n1_in > 0 else FAIL,
81
+ f"incoming={n1_in:.1f}")
82
+ record("node-2 receives from node-0 (DAG child)",
83
+ PASS if n2_in > 0 else FAIL,
84
+ f"incoming={n2_in:.1f}")
85
+
86
+ # node-3 should receive outflow from node-2
87
+ n3_in = nodes["node-3"].incoming_request_rate
88
+ record("node-3 receives from node-2 (DAG grandchild)",
89
+ PASS if n3_in > 0 else FAIL,
90
+ f"incoming={n3_in:.1f}")
91
+
92
+ # node-0 outflow should be ~incoming (since capacity >> lambda at start)
93
+ record("node-0 has positive outflow_rate",
94
+ PASS if nodes["node-0"].outflow_rate > 0 else FAIL,
95
+ f"outflow={nodes['node-0'].outflow_rate:.1f}")
96
+
97
+
98
+ # ============================================================================
99
+ # B. Task-2 Scripted Failure Flows Through DAG
100
+ # ============================================================================
101
+ def test_B_task2_dag():
102
+ print("\n=== B. Task-2 Failure Through DAG ===")
103
+ sim = ClusterSimulator(n_nodes=5, task_id="task-2", seed=42)
104
+ sim.reset(task_id="task-2", seed=42)
105
+
106
+ # Run enough ticks for failure to trigger
107
+ class _A: pass
108
+ a = _A(); a.action_type = "NO_OP"; a.target_node_id = "node-0"; a.parameter = 0.0
109
+
110
+ failed_id = None
111
+ for _ in range(60):
112
+ sim.apply_action(a)
113
+ sim.tick()
114
+ if sim._failed_node_id and failed_id is None:
115
+ failed_id = sim._failed_node_id
116
+
117
+ record("Scripted failure was assigned",
118
+ PASS if failed_id is not None else FAIL,
119
+ f"failed_id={failed_id}")
120
+
121
+ nodes = {n.node_id: n for n in sim._nodes}
122
+ failed_node = nodes.get(failed_id)
123
+ record("Failed node has FAILED status",
124
+ PASS if failed_node and failed_node.status == NodeStatus.FAILED else FAIL,
125
+ f"status={failed_node.status if failed_node else 'N/A'}")
126
+
127
+ # If failed node is a child of node-0 (e.g., node-1 or node-2),
128
+ # node-0 should still be sending traffic to it (flow not bypassed)
129
+ if failed_id in CLUSTER_TOPOLOGY.get("node-0", []):
130
+ # The failed node outflow should be 0 (service_rate=0),
131
+ # but it should still have incoming from DAG
132
+ record("Failed child still receives DAG traffic (as dropped requests)",
133
+ PASS if failed_node.incoming_request_rate >= 0 else FAIL,
134
+ f"incoming={failed_node.incoming_request_rate:.1f} dropped={failed_node.dropped_requests:.1f}")
135
+
136
+ # If failed node was node-2, node-3 should be starved (outflow=0 upstream)
137
+ if failed_id == "node-2":
138
+ n3 = nodes["node-3"]
139
+ record("node-3 starved when parent node-2 fails",
140
+ PASS if n3.incoming_request_rate == 0 else FAIL,
141
+ f"node-3 incoming={n3.incoming_request_rate:.1f}")
142
+
143
+
144
+ # ============================================================================
145
+ # C. Task-3 Surge Overlay on DAG
146
+ # ============================================================================
147
+ def test_C_task3_surge_dag():
148
+ print("\n=== C. Task-3 Surge Overlay ===")
149
+ sim = ClusterSimulator(n_nodes=5, task_id="task-3", seed=7)
150
+ sim.reset(task_id="task-3", seed=7)
151
+
152
+ # Force surge window to be active immediately
153
+ sim._t3_surge_start = 0
154
+ sim._t3_surge_end = 999
155
+
156
+ class _A: pass
157
+ a = _A(); a.action_type = "NO_OP"; a.target_node_id = "node-0"; a.parameter = 0.0
158
+ sim.apply_action(a)
159
+ sim.tick()
160
+
161
+ nodes = {n.node_id: n for n in sim._nodes}
162
+
163
+ # node-1 and node-2 should have surge + DAG traffic
164
+ n1_in = nodes["node-1"].incoming_request_rate
165
+ n2_in = nodes["node-2"].incoming_request_rate
166
+ record("node-1 receives surge + DAG traffic",
167
+ PASS if n1_in > T3_SURGE_MAGNITUDE else FAIL,
168
+ f"incoming={n1_in:.1f} (surge={T3_SURGE_MAGNITUDE})")
169
+ record("node-2 receives surge + DAG traffic",
170
+ PASS if n2_in > T3_SURGE_MAGNITUDE else FAIL,
171
+ f"incoming={n2_in:.1f} (surge={T3_SURGE_MAGNITUDE})")
172
+
173
+ # node-0 should still have base DAG traffic (not affected by surge directly)
174
+ n0_in = nodes["node-0"].incoming_request_rate
175
+ record("node-0 gets base DAG traffic (surge is side-channel)",
176
+ PASS if n0_in < T3_SURGE_MAGNITUDE else FAIL,
177
+ f"node-0 incoming={n0_in:.1f}")
178
+
179
+
180
+ # ============================================================================
181
+ # D. Backpressure Is Temporary
182
+ # ============================================================================
183
+ def test_D_backpressure_temporary():
184
+ print("\n=== D. Backpressure Temporary ===")
185
+ sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
186
+ sim.reset(task_id="task-1", seed=1)
187
+
188
+ node0 = next(n for n in sim._nodes if n.node_id == "node-0")
189
+ original_cap = node0.capacity
190
+
191
+ # Artificially overload node-0's children to trigger backpressure
192
+ for n in sim._nodes:
193
+ if n.node_id in CLUSTER_TOPOLOGY.get("node-0", []):
194
+ n.queue_depth = BACKPRESSURE_THRESHOLD + 100.0 # well above threshold
195
+
196
+ # Tick: backpressure should reduce node-0's capacity for THIS tick only
197
+ class _A: pass
198
+ a = _A(); a.action_type = "NO_OP"; a.target_node_id = "node-0"; a.parameter = 0.0
199
+ sim.apply_action(a)
200
+ sim.tick()
201
+
202
+ cap_after_tick = node0.capacity
203
+ record("node-0 capacity restored after backpressure tick",
204
+ PASS if abs(cap_after_tick - original_cap) < 0.01 else FAIL,
205
+ f"before={original_cap:.2f} after={cap_after_tick:.2f}")
206
+
207
+ # Tick again (children still overloaded) — capacity should still be original
208
+ sim.apply_action(a)
209
+ sim.tick()
210
+ cap_after_tick2 = node0.capacity
211
+ record("node-0 capacity intact after multiple backpressure ticks",
212
+ PASS if abs(cap_after_tick2 - original_cap) < 0.01 else FAIL,
213
+ f"after 2 ticks={cap_after_tick2:.2f} original={original_cap:.2f}")
214
+
215
+ # Clear children overload — capacity should remain original
216
+ for n in sim._nodes:
217
+ if n.node_id in CLUSTER_TOPOLOGY.get("node-0", []):
218
+ n.queue_depth = 0.0
219
+ sim.apply_action(a)
220
+ sim.tick()
221
+ cap_clear = node0.capacity
222
+ record("node-0 capacity unchanged after children clear",
223
+ PASS if abs(cap_clear - original_cap) < 0.01 else FAIL,
224
+ f"capacity={cap_clear:.2f}")
225
+
226
+
227
+ # ============================================================================
228
+ # E. Gradual Recovery Completes
229
+ # ============================================================================
230
+ def test_E_gradual_recovery():
231
+ print("\n=== E. Gradual Recovery ===")
232
+ sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
233
+ sim.reset(task_id="task-1", seed=1)
234
+
235
+ node = sim._nodes[2]
236
+ node.queue_depth = 250.0
237
+ sim._update_statuses()
238
+
239
+ record("Node becomes FAILED on overload",
240
+ PASS if node.status == NodeStatus.FAILED else FAIL,
241
+ f"status={node.status}")
242
+ record("Capacity drops to 0.5 at failure",
243
+ PASS if abs(node.capacity - 0.5) < 0.01 else FAIL,
244
+ f"capacity={node.capacity}")
245
+
246
+ # Tick through full recovery (NODE_RECOVERY_TICKS + some margin)
247
+ class _A: pass
248
+ a = _A(); a.action_type = "NO_OP"; a.target_node_id = "node-0"; a.parameter = 0.0
249
+ for _ in range(NODE_RECOVERY_TICKS + 5):
250
+ sim.apply_action(a)
251
+ sim.tick()
252
+
253
+ record("Node reaches HEALTHY after full recovery",
254
+ PASS if node.status == NodeStatus.HEALTHY else FAIL,
255
+ f"status={node.status}")
256
+
257
+ # Capacity should have ramped: start=0.5, each recovery tick adds 0.5
258
+ # After NODE_RECOVERY_TICKS=20 ticks: 0.5 + 20*0.5 = 10.5, capped at 3.0
259
+ record("Capacity recovered to DEFAULT_CAPACITY (capped)",
260
+ PASS if abs(node.capacity - DEFAULT_CAPACITY) < 0.01 else FAIL,
261
+ f"capacity={node.capacity:.2f} expected={DEFAULT_CAPACITY}")
262
+
263
+
264
+ # ============================================================================
265
+ # F. Graph-Bounded Cascades
266
+ # ============================================================================
267
+ def test_F_graph_cascade():
268
+ print("\n=== F. Graph-Bounded Cascades ===")
269
+ sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
270
+ sim.reset(task_id="task-1", seed=1)
271
+
272
+ # Fail node-2, overload its children/parents
273
+ node2 = sim._nodes[2]
274
+ node2.queue_depth = 250.0
275
+ sim._update_statuses() # node-2 becomes FAILED, triggers cascade
276
+
277
+ # node-3 is child of node-2 — should be at_risk
278
+ # node-0 is parent of node-2 — should be at_risk
279
+ node3 = sim._nodes[3]
280
+ node0 = sim._nodes[0]
281
+ # Overload node-3 to trigger cascade
282
+ node3.queue_depth = FATAL_FAIL_THRESHOLD * 1.5 # > cascade threshold
283
+ sim._cascade_failures()
284
+
285
+ record("node-3 (child of failed node-2) cascades to DEGRADED",
286
+ PASS if node3.status == NodeStatus.DEGRADED else FAIL,
287
+ f"node-3 status={node3.status}")
288
+
289
+ # node-4 is NOT adjacent to node-2 — should NOT cascade
290
+ node4 = sim._nodes[4]
291
+ node4.queue_depth = FATAL_FAIL_THRESHOLD * 1.5
292
+ sim._cascade_failures()
293
+ record("node-4 (not adjacent to failed) does NOT cascade",
294
+ PASS if node4.status != NodeStatus.DEGRADED else FAIL,
295
+ f"node-4 status={node4.status}")
296
+
297
+
298
+ # ============================================================================
299
+ # G. Reroute Weights with DAG
300
+ # ============================================================================
301
+ def test_G_reroute_with_dag():
302
+ print("\n=== G. Reroute Weights with DAG ===")
303
+ sim = ClusterSimulator(n_nodes=5, task_id="task-1", seed=1)
304
+ sim.reset(task_id="task-1", seed=1)
305
+
306
+ class _A: pass
307
+
308
+ # Tick once to establish baseline
309
+ a = _A(); a.action_type = "NO_OP"; a.target_node_id = "node-0"; a.parameter = 0.0
310
+ sim.apply_action(a)
311
+ sim.tick()
312
+
313
+ node0 = next(n for n in sim._nodes if n.node_id == "node-0")
314
+ node4 = next(n for n in sim._nodes if n.node_id == "node-4")
315
+ baseline_n0_in = node0.incoming_request_rate
316
+
317
+ # Reroute 100% of node-0 traffic away
318
+ a2 = _A(); a2.action_type = "REROUTE_TRAFFIC"; a2.target_node_id = "node-0"; a2.parameter = 1.0
319
+ sim.apply_action(a2)
320
+ sim.tick()
321
+
322
+ n0_in_after = node0.incoming_request_rate
323
+ record("Reroute reduces node-0 incoming traffic",
324
+ PASS if n0_in_after < baseline_n0_in else FAIL,
325
+ f"before={baseline_n0_in:.1f} after={n0_in_after:.1f}")
326
+
327
+ # Verify outflow_rate is also reduced (since incoming is lower)
328
+ record("node-0 outflow reduced after reroute",
329
+ PASS if node0.outflow_rate < baseline_n0_in else FAIL,
330
+ f"outflow={node0.outflow_rate:.1f}")
331
+
332
+ # Reroute weight should decay each tick (0.5 factor)
333
+ w = sim._reroute_weights.get("node-0", 0.0)
334
+ record("Reroute weight decays (0.5 * prev)",
335
+ PASS if w < 1.0 else FAIL,
336
+ f"weight after first decay={w:.3f}")
337
+
338
+
339
+ # ============================================================================
340
+ # H. Graph Lyapunov Edge Penalty
341
+ # ============================================================================
342
+ def test_H_graph_lyapunov():
343
+ print("\n=== H. Graph Lyapunov Edge Penalty ===")
344
+ nodes_balanced = [
345
+ {"node_id": "node-0", "queue_depth": 50.0, "importance_weight": 2.0},
346
+ {"node_id": "node-4", "queue_depth": 50.0, "importance_weight": 1.0},
347
+ {"node_id": "node-1", "queue_depth": 50.0, "importance_weight": 1.0},
348
+ {"node_id": "node-2", "queue_depth": 50.0, "importance_weight": 1.0},
349
+ {"node_id": "node-3", "queue_depth": 50.0, "importance_weight": 1.0},
350
+ ]
351
+ nodes_imbalanced = [
352
+ {"node_id": "node-0", "queue_depth": 200.0, "importance_weight": 2.0},
353
+ {"node_id": "node-4", "queue_depth": 10.0, "importance_weight": 1.0},
354
+ {"node_id": "node-1", "queue_depth": 10.0, "importance_weight": 1.0},
355
+ {"node_id": "node-2", "queue_depth": 10.0, "importance_weight": 1.0},
356
+ {"node_id": "node-3", "queue_depth": 10.0, "importance_weight": 1.0},
357
+ ]
358
+
359
+ v_bal = compute_lyapunov_graph(nodes_balanced, CLUSTER_TOPOLOGY)
360
+ v_imb = compute_lyapunov_graph(nodes_imbalanced, CLUSTER_TOPOLOGY)
361
+ record("Graph Lyapunov: imbalanced > balanced",
362
+ PASS if v_imb > v_bal else FAIL,
363
+ f"balanced={v_bal:.1f} imbalanced={v_imb:.1f}")
364
+
365
+ # Compare with flat Lyapunov: graph version should add edge penalty
366
+ v_flat_imb = compute_lyapunov(nodes_imbalanced)
367
+ record("Graph Lyapunov > flat Lyapunov for imbalanced cluster",
368
+ PASS if v_imb > v_flat_imb else FAIL,
369
+ f"graph={v_imb:.1f} flat={v_flat_imb:.1f}")
370
+
371
+
372
+ # ============================================================================
373
+ # I. Environment Observation Populates Graph Fields
374
+ # ============================================================================
375
+ def test_I_env_graph_fields():
376
+ print("\n=== I. Environment Graph Fields ===")
377
+ try:
378
+ from server.AntiAtropos_environment import AntiAtroposEnvironment
379
+ from models import SREAction, ActionType
380
+ except ImportError:
381
+ record("Environment import", FAIL, "Cannot import AntiAtroposEnvironment")
382
+ return
383
+
384
+ env = AntiAtroposEnvironment()
385
+ obs = env.reset(task_id="task-1", mode="simulated", seed=42)
386
+
387
+ # Check that NodeObservations have graph fields
388
+ n0 = next((n for n in obs.nodes if n.node_id == "node-0"), None)
389
+ record("Environment reset succeeds",
390
+ PASS if n0 is not None else FAIL, "")
391
+
392
+ if n0:
393
+ record("node-0 has downstream_nodes",
394
+ PASS if isinstance(n0.downstream_nodes, list) and len(n0.downstream_nodes) > 0 else FAIL,
395
+ f"downstream={n0.downstream_nodes}")
396
+ record("node-0 has upstream_nodes",
397
+ PASS if isinstance(n0.upstream_nodes, list) else FAIL,
398
+ f"upstream={n0.upstream_nodes}")
399
+ record("node-0 has upstream_pressure",
400
+ PASS if n0.upstream_pressure is not None else FAIL,
401
+ f"pressure={n0.upstream_pressure:.3f}")
402
+ record("node-0 has outflow_rate",
403
+ PASS if n0.outflow_rate is not None else FAIL,
404
+ f"outflow={n0.outflow_rate:.3f}")
405
+
406
+ # Step once with SCALE_UP to verify reward computation
407
+ action = SREAction(action_type=ActionType.SCALE_UP, target_node_id="node-0", parameter=0.5)
408
+ obs2 = env.step(action)
409
+
410
+ record("Step reward is non-zero",
411
+ PASS if obs2.reward != 0.0 else FAIL,
412
+ f"reward={obs2.reward:.4f}")
413
+ # Lyapunov can be 0 on first tick if all queues are below capacity.
414
+ # Just verify it's a number (not None/NaN).
415
+ lyap_ok = obs2.lyapunov_energy is not None and not math.isnan(obs2.lyapunov_energy)
416
+ record("lyapunov_energy is valid (>=0, not NaN)",
417
+ PASS if lyap_ok else FAIL,
418
+ f"energy={obs2.lyapunov_energy}")
419
+
420
+
421
+ # ============================================================================
422
+ # J. Reward Components Across Tasks
423
+ # ============================================================================
424
+ def test_J_reward_components():
425
+ print("\n=== J. Reward Components ===")
426
+ env_module = None
427
+ try:
428
+ from server.AntiAtropos_environment import AntiAtroposEnvironment
429
+ from models import SREAction, ActionType
430
+ except ImportError:
431
+ record("Reward components", FAIL, "Cannot import environment")
432
+ return
433
+
434
+ for task_id, warmup_ticks in [("task-1", 30), ("task-2", 5), ("task-3", 5)]:
435
+ env = AntiAtroposEnvironment()
436
+ env.reset(task_id=task_id, mode="simulated", seed=42)
437
+
438
+ # Task-3 surge may not be active in early ticks (window depends on
439
+ # jitter). Force the surge window to be wide-open for validation.
440
+ if task_id == "task-3":
441
+ env._sim._t3_surge_start = 1
442
+ env._sim._t3_surge_end = 999
443
+
444
+ # Run enough ticks for queues to accumulate and drift to become non-zero.
445
+ # Task-1 has a slow ramp (0.5/tick) and needs ~15+ ticks to exceed ingress
446
+ # capacity of 90 req/tick (starting from ~86).
447
+ action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
448
+ obs = None
449
+ for _ in range(warmup_ticks):
450
+ obs = env.step(action)
451
+
452
+ # All sub-components should be present
453
+ has_drift = obs.reward_drift != 0.0
454
+ has_cost = obs.reward_cost != 0.0
455
+ record(f"{task_id}: reward_drift non-zero (after {warmup_ticks} ticks)",
456
+ PASS if has_drift else FAIL,
457
+ f"drift={obs.reward_drift:.4f}")
458
+ record(f"{task_id}: reward_cost non-zero",
459
+ PASS if has_cost else FAIL,
460
+ f"cost={obs.reward_cost:.4f}")
461
+ record(f"{task_id}: no NaN in raw_reward",
462
+ PASS if not math.isnan(obs.raw_reward) else FAIL,
463
+ f"raw={obs.raw_reward:.4f}")
464
+
465
+ # Quick check: drift and cost should be negative (penalties)
466
+ env = AntiAtroposEnvironment()
467
+ env.reset(task_id="task-1", mode="simulated", seed=42)
468
+ action = SREAction(action_type=ActionType.NO_OP, target_node_id="node-0", parameter=0.0)
469
+ obs = env.step(action)
470
+ record("reward_drift <= 0 (penalty, not reward)",
471
+ PASS if obs.reward_drift <= 0 else FAIL,
472
+ f"drift={obs.reward_drift:.4f}")
473
+
474
+
475
+ # ============================================================================
476
+ # Main
477
+ # ============================================================================
478
+ def main():
479
+ print("=" * 65)
480
+ print("AntiAtropos DAG Physics Validation")
481
+ print("=" * 65)
482
+
483
+ test_A_dag_routing()
484
+ test_B_task2_dag()
485
+ test_C_task3_surge_dag()
486
+ test_D_backpressure_temporary()
487
+ test_E_gradual_recovery()
488
+ test_F_graph_cascade()
489
+ test_G_reroute_with_dag()
490
+ test_H_graph_lyapunov()
491
+ test_I_env_graph_fields()
492
+ test_J_reward_components()
493
+
494
+ passed = sum(1 for _, s, _ in results if s == PASS)
495
+ failed = sum(1 for _, s, _ in results if s == FAIL)
496
+ total = len(results)
497
+
498
+ print("\n" + "=" * 65)
499
+ print(f"RESULTS: {passed}/{total} passed, {failed} failed")
500
+ print("=" * 65)
501
+
502
+ if failed > 0:
503
+ print("\nFAILED TESTS:")
504
+ for name, status, detail in results:
505
+ if status == FAIL:
506
+ print(f" X {name}: {detail}")
507
+
508
+ return 0 if failed == 0 else 1
509
+
510
+
511
+ if __name__ == "__main__":
512
+ sys.exit(main())