div18 commited on
Commit
3550e4d
·
1 Parent(s): 630f735
Files changed (3) hide show
  1. inference.py +45 -12
  2. server/AntiAtropos_environment.py +4 -0
  3. simulator.py +1 -1
inference.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import asyncio
2
  import inspect
3
  import json
@@ -56,19 +57,26 @@ TEMPERATURE_SWEEP = [0.6, 0.3, 0.7] # Fixed temperatures for multi-episode eval
56
 
57
  TASK_BRIEFS: Dict[str, str] = {
58
  "task-1": "Traffic ramps linearly every tick. Scale up proactively — new capacity takes 5 ticks to boot. Keep latency under SLA (200ms) while minimizing cost. Scale down when queues are safe.",
59
- "task-2": "A node fails randomly. Detect quickly and recover with reroute/scale actions.",
60
- "task-3": "Protect VIP node-0 under surges. Keep VIP healthy without invalid actions.",
61
  }
62
 
63
  SYSTEM_PROMPT = textwrap.dedent(
64
  """
65
  You are an autonomous SRE controller managing a five-node microservice cluster.
66
 
 
 
 
 
 
 
 
67
  ACTIONS (new capacity takes 5 ticks to boot):
68
- SCALE_UP <node> <amount:0-1> — add capacity, clears DEGRADED status
69
- SCALE_DOWN <node> <amount:0-1> — remove capacity (min 1 unit)
70
- REROUTE_TRAFFIC <node> <fraction:0-1> — offload traffic to healthy peers
71
- SHED_LOAD <node> <fraction:0-1> — drop incoming traffic (NOT on critical nodes)
72
  NO_OP — do nothing
73
 
74
  REWARD PRIORITIES (in order):
@@ -213,11 +221,25 @@ def observation_for_model(obs) -> dict:
213
 
214
  The scalar reward for past steps is already in the history (correct).
215
  """
 
 
 
 
 
 
 
 
 
 
 
 
216
  return {
217
  "task_id": obs.task_id,
218
  "mode": getattr(obs.mode, "value", str(obs.mode)),
219
  "step": obs.step,
220
  "max_steps": obs.max_steps,
 
 
221
  "average_latency_ms": obs.average_latency_ms,
222
  "error_rate": obs.error_rate,
223
  "total_queue_backlog": obs.total_queue_backlog,
@@ -276,9 +298,9 @@ async def get_model_action(client: AsyncOpenAI, task_id: str, step: int, obs: di
276
  ],
277
  temperature=TEMPERATURE,
278
  max_tokens=MAX_TOKENS,
 
279
  response_format={"type": "json_object"},
280
  timeout=MODEL_TIMEOUT_S,
281
- seed=SEED,
282
  )
283
  content = completion.choices[0].message.content or ""
284
  if not content.strip():
@@ -372,12 +394,15 @@ async def run_single_task(env: AntiAtroposEnv, client: AsyncOpenAI, task_id: str
372
  }
373
 
374
 
375
- async def run_all_tasks() -> None:
376
  _seed_everything(SEED)
377
  all_tasks = ["task-1", "task-2", "task-3"]
378
- run_single = os.getenv("ANTIATROPOS_RUN_SINGLE_TASK", "false").lower() == "true"
379
- task_id = TASK_NAME if TASK_NAME in set(all_tasks) else "task-1"
380
- tasks_to_run = [task_id] if run_single else all_tasks
 
 
 
381
  if not API_KEY:
382
  raise RuntimeError("Missing API key (API_KEY/HF_TOKEN/OPENAI_API_KEY).")
383
 
@@ -439,7 +464,15 @@ async def run_all_tasks() -> None:
439
 
440
 
441
  def main() -> None:
442
- asyncio.run(run_all_tasks())
 
 
 
 
 
 
 
 
443
 
444
 
445
  if __name__ == "__main__":
 
1
+ import argparse
2
  import asyncio
3
  import inspect
4
  import json
 
57
 
58
  TASK_BRIEFS: Dict[str, str] = {
59
  "task-1": "Traffic ramps linearly every tick. Scale up proactively — new capacity takes 5 ticks to boot. Keep latency under SLA (200ms) while minimizing cost. Scale down when queues are safe.",
60
+ "task-2": "One node will fail permanently (any of node-1 through node-4, never node-0). STEP 1: scan all nodes and find which has status=FAILED and outflow=0. STEP 2: if the failed node has children, scale those children up (they are starved). STEP 3: reroute traffic from THE FAILED NODE (not its parent!) to healthy peers. If node-4 failed (independent), scale up node-0 to compensate.",
61
+ "task-3": "Periodic surge hits node-1 and node-2 as a side-channel. node-0 is the primary ingress — never starve it below 3 units. Scale nodes 1/2 to absorb the burst (2-4 times max), then watch: if queues drop, STOP scaling. Over-provisioning wastes cost and starves other nodes. Reroute from the surge nodes if scaling alone doesn't help.",
62
  }
63
 
64
  SYSTEM_PROMPT = textwrap.dedent(
65
  """
66
  You are an autonomous SRE controller managing a five-node microservice cluster.
67
 
68
+ CLUSTER TOPOLOGY (traffic flows parent → children):
69
+ node-0 → node-1, node-2
70
+ node-2 → node-3
71
+ node-4 (independent ingress)
72
+ FAILED nodes have outflow=0 — their children are starved.
73
+ Backpressure: overloaded children reduce parent capacity.
74
+
75
  ACTIONS (new capacity takes 5 ticks to boot):
76
+ SCALE_UP <node> <amount> — add capacity (0.3-0.5 normal, 0.6-0.8 heavy surge), clears DEGRADED
77
+ SCALE_DOWN <node> <amount> — remove capacity (0.2-0.4 safe, 0.5-0.7 aggressive)
78
+ REROUTE_TRAFFIC <node> <fraction> — reduce THIS node capacity, redistribute to peers (0.3-0.5)
79
+ SHED_LOAD <node> <fraction> — drop incoming traffic (0.3-0.5), NOT on critical nodes
80
  NO_OP — do nothing
81
 
82
  REWARD PRIORITIES (in order):
 
221
 
222
  The scalar reward for past steps is already in the history (correct).
223
  """
224
+ # Derive summary lists from per-node status fields (feature engineering,
225
+ # same principle as total_queue_backlog — the agent could compute these
226
+ # from raw per-node data but having them pre-computed speeds up reasoning).
227
+ failed_nodes = []
228
+ degraded_nodes = []
229
+ for node in obs.nodes:
230
+ s = str(getattr(node.status, "value", str(node.status)))
231
+ if s == "failed":
232
+ failed_nodes.append(node.node_id)
233
+ elif s == "degraded":
234
+ degraded_nodes.append(node.node_id)
235
+
236
  return {
237
  "task_id": obs.task_id,
238
  "mode": getattr(obs.mode, "value", str(obs.mode)),
239
  "step": obs.step,
240
  "max_steps": obs.max_steps,
241
+ "failed_nodes": failed_nodes,
242
+ "degraded_nodes": degraded_nodes,
243
  "average_latency_ms": obs.average_latency_ms,
244
  "error_rate": obs.error_rate,
245
  "total_queue_backlog": obs.total_queue_backlog,
 
298
  ],
299
  temperature=TEMPERATURE,
300
  max_tokens=MAX_TOKENS,
301
+ presence_penalty=0.3,
302
  response_format={"type": "json_object"},
303
  timeout=MODEL_TIMEOUT_S,
 
304
  )
305
  content = completion.choices[0].message.content or ""
306
  if not content.strip():
 
394
  }
395
 
396
 
397
+ async def run_all_tasks(overrides: Optional[argparse.Namespace] = None) -> None:
398
  _seed_everything(SEED)
399
  all_tasks = ["task-1", "task-2", "task-3"]
400
+ if overrides and overrides.task != "all":
401
+ tasks_to_run = [overrides.task]
402
+ else:
403
+ run_single = os.getenv("ANTIATROPOS_RUN_SINGLE_TASK", "false").lower() == "true"
404
+ task_id = TASK_NAME if TASK_NAME in set(all_tasks) else "task-1"
405
+ tasks_to_run = [task_id] if run_single else all_tasks
406
  if not API_KEY:
407
  raise RuntimeError("Missing API key (API_KEY/HF_TOKEN/OPENAI_API_KEY).")
408
 
 
464
 
465
 
466
  def main() -> None:
467
+ parser = argparse.ArgumentParser(description="AntiAtropos SRE inference")
468
+ parser.add_argument(
469
+ "--task", "-t",
470
+ choices=["task-1", "task-2", "task-3", "all"],
471
+ default="all",
472
+ help="Run a specific task or all tasks (default: all)",
473
+ )
474
+ args = parser.parse_args()
475
+ asyncio.run(run_all_tasks(overrides=args))
476
 
477
 
478
  if __name__ == "__main__":
server/AntiAtropos_environment.py CHANGED
@@ -49,6 +49,10 @@ ALPHA: float = 0.002 # Weight on Lyapunov energy drift DeltaV(s)
49
  BETA: float = 0.3 # Weight on infrastructure cost (increased so cost signal is visible)
50
  GAMMA: float = 6.0 # Weight on per-step SLA violation indicator (dominant but not overwhelming)
51
  DELTA: float = 0.1 # Weight on control-barrier function penalty (queue safety zone)
 
 
 
 
52
 
53
  MAX_QUEUE_NORM = 200.0
54
  MAX_LATENCY_NORM = 1000.0
 
49
  BETA: float = 0.3 # Weight on infrastructure cost (increased so cost signal is visible)
50
  GAMMA: float = 6.0 # Weight on per-step SLA violation indicator (dominant but not overwhelming)
51
  DELTA: float = 0.1 # Weight on control-barrier function penalty (queue safety zone)
52
+ ZETA: float = 0.01 # Weight on action-repetition penalty (discourages degenerate fixations)
53
+
54
+ # How many repeated identical actions before the quadratic penalty kicks in hard
55
+ REPETITION_SOFT_LIMIT = 3
56
 
57
  MAX_QUEUE_NORM = 200.0
58
  MAX_LATENCY_NORM = 1000.0
simulator.py CHANGED
@@ -58,7 +58,7 @@ T3_SURGE_CYCLE: int = 60 # Cycle length (ticks)
58
  T3_SURGE_BASE_START: int = 30 # Nominal start of surge within cycle
59
  T3_SURGE_BASE_END: int = 40 # Nominal end of surge within cycle
60
  T3_SURGE_JITTER: int = 10 # ±jitter applied to start/end each episode
61
- T3_SURGE_MAGNITUDE: float = 140.0 # Extra req/tick added to node-1 and node-2
62
 
63
  # Hardening: Critical infrastructure that CANNOT be shed
64
  # In Task 3, these receive the surge. Forcing the agent to SCALE.
 
58
  T3_SURGE_BASE_START: int = 30 # Nominal start of surge within cycle
59
  T3_SURGE_BASE_END: int = 40 # Nominal end of surge within cycle
60
  T3_SURGE_JITTER: int = 10 # ±jitter applied to start/end each episode
61
+ T3_SURGE_MAGNITUDE: float = 60.0 # Extra req/tick added to node-1 and node-2 (4 SCALE_UPs at delta=1 covers it: 5x15=75 = 15 DAG + 60 surge)
62
 
63
  # Hardening: Critical infrastructure that CANNOT be shed
64
  # In Task 3, these receive the surge. Forcing the agent to SCALE.