div18 commited on
Commit
d062bfb
Β·
1 Parent(s): 7ccb648

fix(inference): refine scaling and rerouting rules and action behavior

Browse files

- Updated task-2 and task-3 descriptions for clearer scaling and rerouting guidelines
- Modified SCALE_DOWN action to first cancel pending boots before removing active capacity
- Changed parsing of target_node_id and parameter to handle None values correctly
- Allowed NO_OP action to always succeed without target node validation
- Improved SCALE_DOWN implementation to prioritize canceling pending capacity
- Clarified cost tiers to classify pending capacity with justified excess at moderate rate
- Adjusted cost calculation for active and pending capacity to reflect tiered charging accurately

control/validation.py CHANGED
@@ -38,6 +38,10 @@ class ActionValidator:
38
 
39
  cooldown_penalty = 0.0
40
 
 
 
 
 
41
  if valid_targets is not None and target not in valid_targets:
42
  return False, f"Unknown target node: {target}", 0.0
43
 
 
38
 
39
  cooldown_penalty = 0.0
40
 
41
+ # NO_OP always succeeds β€” target and parameter don't matter
42
+ if action == "NO_OP":
43
+ return True, "Success", 0.0
44
+
45
  if valid_targets is not None and target not in valid_targets:
46
  return False, f"Unknown target node: {target}", 0.0
47
 
inference.py CHANGED
@@ -57,8 +57,8 @@ TEMPERATURE_SWEEP = [0.6, 0.3, 0.7] # Fixed temperatures for multi-episode eval
57
 
58
  TASK_BRIEFS: Dict[str, str] = {
59
  "task-1": "Traffic ramps linearly every tick. Scale up proactively β€” new capacity takes 5 ticks to boot. Keep latency under SLA (200ms) while minimizing cost. Scale down when queues are safe.",
60
- "task-2": "One node will fail permanently (any of node-1 through node-4, never node-0). STEP 1: scan all nodes and find which has status=FAILED and outflow=0. STEP 2: if the failed node has children, scale those children up (they are starved). STEP 3: reroute traffic from THE FAILED NODE (not its parent!) to healthy peers. If node-4 failed (independent), scale up node-0 to compensate.",
61
- "task-3": "A surge (~75 req/tick) will hit node-1 and node-2 via a side channel bypassing node-0. Do NOT pre-scale β€” idle capacity costs money and there is no advance warning. Monitor queue_depth: ONLY scale node-1/node-2 when you SEE their queues climbing sharply. 3-4 SCALE_UPs on each is sufficient (5 replicas handles the burst at equilibrium). If queues don't drop after 4 SCALE_UPs, STOP scaling and try REROUTE instead. Scale down after queues return to safe levels to save cost.",
62
  }
63
 
64
  SYSTEM_PROMPT = textwrap.dedent(
@@ -74,7 +74,7 @@ SYSTEM_PROMPT = textwrap.dedent(
74
 
75
  ACTIONS (new capacity takes 5 ticks to boot):
76
  SCALE_UP <node> <amount> β€” add capacity (0.3-0.5 normal, 0.6-0.8 heavy surge), clears DEGRADED
77
- SCALE_DOWN <node> <amount> β€” remove capacity (0.2-0.4 safe, 0.5-0.7 aggressive)
78
  REROUTE_TRAFFIC <node> <fraction> β€” reduce THIS node capacity, redistribute to peers (0.3-0.5)
79
  SHED_LOAD <node> <fraction> β€” drop incoming traffic (0.3-0.5), NEVER on node-0 (payment gateway)
80
  NO_OP β€” do nothing
@@ -293,8 +293,8 @@ def _extract_json_object(text: str) -> dict:
293
 
294
  def _parse_action(payload: dict) -> SREAction:
295
  action_type = str(payload.get("action_type", "NO_OP")).upper()
296
- target_node_id = str(payload.get("target_node_id", "node-0"))
297
- parameter = float(payload.get("parameter", 0.0))
298
  return SREAction(
299
  action_type=ActionType(action_type),
300
  target_node_id=target_node_id,
 
57
 
58
  TASK_BRIEFS: Dict[str, str] = {
59
  "task-1": "Traffic ramps linearly every tick. Scale up proactively β€” new capacity takes 5 ticks to boot. Keep latency under SLA (200ms) while minimizing cost. Scale down when queues are safe.",
60
+ "task-2": "One node (node-1 through node-4) will fail permanently. Wait until you SEE a FAILED node β€” do NOT pre-scale. Once a node shows status=FAILED: reroute traffic FROM the failed node to healthy peers, and scale up any starved children. Do NOT scale node-0 unless node-4 failed independently. SCALE_DOWN cancels pending boots and reduces cost. If reward is falling, stop scaling.",
61
+ "task-3": "A surge (~75 req/tick) will hit node-1 and node-2 via a side channel bypassing node-0. Do NOT scale node-0 β€” it is NOT affected. ONLY scale node-1 or node-2 when their queue_depth rises. Do NOT pre-scale. 3-4 SCALE_UPs on each is sufficient. SCALE_DOWN cancels pending boots and reduces cost β€” use it when queues are safe. If reward is falling, STOP scaling and SCALE_DOWN to recover.",
62
  }
63
 
64
  SYSTEM_PROMPT = textwrap.dedent(
 
74
 
75
  ACTIONS (new capacity takes 5 ticks to boot):
76
  SCALE_UP <node> <amount> β€” add capacity (0.3-0.5 normal, 0.6-0.8 heavy surge), clears DEGRADED
77
+ SCALE_DOWN <node> <amount> β€” cancel pending boots first, then remove active capacity (0.2-0.4 safe, 0.5-0.7 aggressive)
78
  REROUTE_TRAFFIC <node> <fraction> β€” reduce THIS node capacity, redistribute to peers (0.3-0.5)
79
  SHED_LOAD <node> <fraction> β€” drop incoming traffic (0.3-0.5), NEVER on node-0 (payment gateway)
80
  NO_OP β€” do nothing
 
293
 
294
  def _parse_action(payload: dict) -> SREAction:
295
  action_type = str(payload.get("action_type", "NO_OP")).upper()
296
+ target_node_id = str(payload.get("target_node_id") or "node-0")
297
+ parameter = float(payload.get("parameter") or 0.0)
298
  return SREAction(
299
  action_type=ActionType(action_type),
300
  target_node_id=target_node_id,
server/AntiAtropos_environment.py CHANGED
@@ -389,41 +389,61 @@ class AntiAtroposEnvironment(Environment):
389
 
390
  Tier 1 β€” Baseline capacity (up to DEFAULT_CAPACITY): cheap base rate.
391
  Infrastructure already provisioned and paid for β€” no penalty.
392
- Tier 2 β€” Needed excess (above DEFAULT_CAPACITY, up to 'needed'): moderate
393
- rate (4Γ— base). Agent added capacity that's actually serving traffic β€”
394
- costs more but is justified by demand.
395
- Tier 3 β€” Idle excess (above 'needed'): expensive penalty rate (20Γ— base).
396
- Capacity sitting idle beyond what traffic requires β€” pure waste.
397
-
398
- 'needed' = ceil(incoming_rate / 15) β€” minimum units to serve traffic.
399
- With DEFAULT_CAPACITY=3, a node at baseline costs 3 Γ— $0.05 = $0.15/hr
400
- regardless of traffic. Only scaling ABOVE baseline triggers higher rates,
401
- giving the agent a clear gradient: scale just enough, not too much.
 
 
 
 
 
 
402
  """
403
  total_cost = 0.0
404
- baseline_cap = int(DEFAULT_CAPACITY) # Tier 1 ceiling (imported from simulator)
405
  for node in nodes_true:
406
  if node["status"] == NodeStatus.FAILED:
407
  continue
408
- capacity = int(node.get("capacity_units", 0)) + int(node.get("pending_capacity_units", 0))
 
 
409
  if capacity <= 0:
410
  continue
411
  incoming = float(node.get("incoming_request_rate", 0.0))
412
  needed = max(1, int(math.ceil(incoming / 15.0)))
413
 
414
- if capacity <= baseline_cap:
 
415
  # Tier 1: baseline provisioned capacity β€” cheap base rate
416
- total_cost += capacity * COST_PER_CAPACITY_UNIT_PER_HOUR
417
  else:
418
  # Tier 1: baseline portion at cheap rate
419
  total_cost += baseline_cap * COST_PER_CAPACITY_UNIT_PER_HOUR
420
- above_baseline = capacity - baseline_cap
421
  justified = max(0, needed - baseline_cap) # excess that serves traffic
422
- idle = max(0, above_baseline - justified) # excess sitting idle (never negative)
423
  # Tier 2: needed excess at moderate rate (4Γ— base)
424
  total_cost += justified * (COST_PER_CAPACITY_UNIT_PER_HOUR * 4.0)
425
  # Tier 3: idle excess at penalty rate (20Γ— base)
426
  total_cost += idle * OVERPROVISION_COST_PER_UNIT
 
 
 
 
 
 
 
 
 
 
 
427
  return total_cost
428
 
429
  def _avg_latency(self, nodes: list[dict]) -> float:
 
389
 
390
  Tier 1 β€” Baseline capacity (up to DEFAULT_CAPACITY): cheap base rate.
391
  Infrastructure already provisioned and paid for β€” no penalty.
392
+ Tier 2 β€” Justified excess (above DEFAULT_CAPACITY, up to 'needed', or
393
+ pending/booting capacity): moderate rate (4Γ— base). Agent-added
394
+ capacity that's serving traffic OR in the boot queue β€” costs more
395
+ but is defensible.
396
+ Tier 3 β€” Idle excess (above 'needed', active only): expensive penalty
397
+ rate (20Γ— base). ACTIVE capacity sitting idle beyond what traffic
398
+ requires β€” pure waste.
399
+
400
+ Key: PENDING capacity is always charged at Tier 2 (justified), not Tier 3
401
+ (idle waste). Pending units haven't booted yet so they CAN'T serve traffic;
402
+ classifying them as "idle waste" penalises the agent for the boot delay
403
+ which it cannot control. Once they boot, they become active and are
404
+ reclassified as justified or idle based on actual traffic.
405
+
406
+ 'needed' = ceil(incoming_rate / 15) β€” minimum ACTIVE units to serve traffic.
407
+ With DEFAULT_CAPACITY=3, a node at baseline costs 3 Γ— $0.05 = $0.15/hr.
408
  """
409
  total_cost = 0.0
410
+ baseline_cap = int(DEFAULT_CAPACITY) # Tier 1 ceiling
411
  for node in nodes_true:
412
  if node["status"] == NodeStatus.FAILED:
413
  continue
414
+ active = int(node.get("capacity_units", 0))
415
+ pending = int(node.get("pending_capacity_units", 0))
416
+ capacity = active + pending
417
  if capacity <= 0:
418
  continue
419
  incoming = float(node.get("incoming_request_rate", 0.0))
420
  needed = max(1, int(math.ceil(incoming / 15.0)))
421
 
422
+ # --- Active capacity ---
423
+ if active <= baseline_cap:
424
  # Tier 1: baseline provisioned capacity β€” cheap base rate
425
+ total_cost += active * COST_PER_CAPACITY_UNIT_PER_HOUR
426
  else:
427
  # Tier 1: baseline portion at cheap rate
428
  total_cost += baseline_cap * COST_PER_CAPACITY_UNIT_PER_HOUR
429
+ above_baseline = active - baseline_cap
430
  justified = max(0, needed - baseline_cap) # excess that serves traffic
431
+ idle = max(0, above_baseline - justified) # excess sitting idle
432
  # Tier 2: needed excess at moderate rate (4Γ— base)
433
  total_cost += justified * (COST_PER_CAPACITY_UNIT_PER_HOUR * 4.0)
434
  # Tier 3: idle excess at penalty rate (20Γ— base)
435
  total_cost += idle * OVERPROVISION_COST_PER_UNIT
436
+
437
+ # --- Pending capacity (always Tier 2 β€” booting, not yet serving) ---
438
+ if pending > 0:
439
+ # How much of the baseline budget is unused by active capacity?
440
+ baseline_remaining = max(0, baseline_cap - active)
441
+ # Pending fills remaining baseline slots first (Tier 1 rate)
442
+ pending_at_baseline = min(pending, baseline_remaining)
443
+ pending_above = pending - pending_at_baseline
444
+ total_cost += pending_at_baseline * COST_PER_CAPACITY_UNIT_PER_HOUR
445
+ total_cost += pending_above * (COST_PER_CAPACITY_UNIT_PER_HOUR * 4.0) # Tier 2
446
+
447
  return total_cost
448
 
449
  def _avg_latency(self, nodes: list[dict]) -> float:
simulator.py CHANGED
@@ -282,12 +282,16 @@ class ClusterSimulator:
282
  node_id = action_model.target_node_id if hasattr(action_model, "target_node_id") else action_model["target_node_id"]
283
  param = action_model.parameter if hasattr(action_model, "parameter") else action_model["parameter"]
284
 
285
- # 1. Target node lookup
 
 
 
 
286
  target = next((n for n in self._nodes if n.node_id == node_id), None)
287
  if not target:
288
  return False
289
 
290
- # 2. Command implementation
291
  if at == "SCALE_UP":
292
  delta = max(1, int(param * MAX_SCALING_STEP))
293
  for _ in range(delta):
@@ -302,9 +306,19 @@ class ClusterSimulator:
302
 
303
  elif at == "SCALE_DOWN":
304
  delta = max(1, int(param * MAX_SCALING_STEP))
 
 
 
 
 
 
 
 
305
  old_capacity = target.capacity
306
- target.capacity = max(1, target.capacity - delta)
307
- return target.capacity != old_capacity # Had effect if capacity actually changed
 
 
308
 
309
  elif at == "REROUTE_TRAFFIC":
310
  # Physically offload traffic FROM the target node by proportion `param`.
 
282
  node_id = action_model.target_node_id if hasattr(action_model, "target_node_id") else action_model["target_node_id"]
283
  param = action_model.parameter if hasattr(action_model, "parameter") else action_model["parameter"]
284
 
285
+ # 1. NO_OP always succeeds regardless of target node
286
+ if at == "NO_OP":
287
+ return True
288
+
289
+ # 2. Target node lookup (required for all other actions)
290
  target = next((n for n in self._nodes if n.node_id == node_id), None)
291
  if not target:
292
  return False
293
 
294
+ # 3. Command implementation
295
  if at == "SCALE_UP":
296
  delta = max(1, int(param * MAX_SCALING_STEP))
297
  for _ in range(delta):
 
306
 
307
  elif at == "SCALE_DOWN":
308
  delta = max(1, int(param * MAX_SCALING_STEP))
309
+ # First cancel any pending capacity (cancel boot queue entries)
310
+ # This is like canceling a VM launch β€” it hasn't served traffic yet.
311
+ cancelled = 0
312
+ while cancelled < delta and target.pending_capacity_queue:
313
+ target.pending_capacity_queue.pop() # Remove newest pending first
314
+ cancelled += 1
315
+ # If still need to remove more, reduce active capacity
316
+ remaining = delta - cancelled
317
  old_capacity = target.capacity
318
+ if remaining > 0:
319
+ target.capacity = max(1, target.capacity - remaining)
320
+ # Had effect if we cancelled pending or reduced active capacity
321
+ return cancelled > 0 or target.capacity != old_capacity
322
 
323
  elif at == "REROUTE_TRAFFIC":
324
  # Physically offload traffic FROM the target node by proportion `param`.