div18 commited on
Commit
9db539d
·
1 Parent(s): e4e551f

model changes

Browse files
.env.example CHANGED
@@ -22,6 +22,9 @@ ANTIATROPOS_MIN_REPLICAS=1
22
  ANTIATROPOS_MAX_REPLICAS=
23
  ANTIATROPOS_SCALE_STEP=3
24
 
 
 
 
25
  # Node -> deployment map used by Kubernetes executor
26
  ANTIATROPOS_WORKLOAD_MAP={"node-0":{"deployment":"payments","namespace":"prod-sre"},"node-1":{"deployment":"checkout","namespace":"prod-sre"},"node-2":{"deployment":"catalog","namespace":"prod-sre"},"node-3":{"deployment":"cart","namespace":"prod-sre"},"node-4":{"deployment":"auth","namespace":"prod-sre"}}
27
 
 
22
  ANTIATROPOS_MAX_REPLICAS=
23
  ANTIATROPOS_SCALE_STEP=3
24
 
25
+ # Pod trim: auto-reset deployments to min_replicas and prune stale pods (every 30 min)
26
+ ANTIATROPOS_TRIM_INTERVAL_S=1800
27
+
28
  # Node -> deployment map used by Kubernetes executor
29
  ANTIATROPOS_WORKLOAD_MAP={"node-0":{"deployment":"payments","namespace":"prod-sre"},"node-1":{"deployment":"checkout","namespace":"prod-sre"},"node-2":{"deployment":"catalog","namespace":"prod-sre"},"node-3":{"deployment":"cart","namespace":"prod-sre"},"node-4":{"deployment":"auth","namespace":"prod-sre"}}
30
 
control/kubernetes_executor.py CHANGED
@@ -30,7 +30,7 @@ class KubernetesExecutor:
30
  self.scale_step = int(os.getenv("ANTIATROPOS_SCALE_STEP", "3"))
31
  self._apps_v1_api = None
32
  self._node_workload_map = self._load_node_workload_map()
33
- self._live_supported_actions = {"NO_OP", "SCALE_UP", "SCALE_DOWN"}
34
  self.k8s_retry_count = int(os.getenv("ANTIATROPOS_K8S_RETRY_COUNT", "2"))
35
  self.k8s_retry_backoff_s = float(os.getenv("ANTIATROPOS_K8S_RETRY_BACKOFF_S", "0.2"))
36
 
@@ -131,6 +131,12 @@ class KubernetesExecutor:
131
  if action in ("SCALE_UP", "SCALE_DOWN"):
132
  return self._scale_deployment(action, target, parameter)
133
 
 
 
 
 
 
 
134
  return f"Rejected: {action} is not enabled for live Kubernetes execution"
135
 
136
  def _mock_execution(self, action_type: str, target: str, parameter: float) -> str:
@@ -177,6 +183,135 @@ class KubernetesExecutor:
177
  f"in namespace {namespace} scaled {current}->{desired}"
178
  )
179
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
180
  def _patch_deployment_scale_with_retry(self, apps_v1, deployment_name: str, namespace: str, desired: int) -> None:
181
  """
182
  Patch deployment replicas with retries for transient API server errors.
 
30
  self.scale_step = int(os.getenv("ANTIATROPOS_SCALE_STEP", "3"))
31
  self._apps_v1_api = None
32
  self._node_workload_map = self._load_node_workload_map()
33
+ self._live_supported_actions = {"NO_OP", "SCALE_UP", "SCALE_DOWN", "REROUTE_TRAFFIC", "SHED_LOAD"}
34
  self.k8s_retry_count = int(os.getenv("ANTIATROPOS_K8S_RETRY_COUNT", "2"))
35
  self.k8s_retry_backoff_s = float(os.getenv("ANTIATROPOS_K8S_RETRY_BACKOFF_S", "0.2"))
36
 
 
131
  if action in ("SCALE_UP", "SCALE_DOWN"):
132
  return self._scale_deployment(action, target, parameter)
133
 
134
+ if action == "REROUTE_TRAFFIC":
135
+ return self._reroute_traffic(target, parameter)
136
+
137
+ if action == "SHED_LOAD":
138
+ return self._shed_load(target, parameter)
139
+
140
  return f"Rejected: {action} is not enabled for live Kubernetes execution"
141
 
142
  def _mock_execution(self, action_type: str, target: str, parameter: float) -> str:
 
183
  f"in namespace {namespace} scaled {current}->{desired}"
184
  )
185
 
186
+ def _reroute_traffic(self, target: str, parameter: float) -> str:
187
+ """
188
+ Live implementation of REROUTE_TRAFFIC.
189
+
190
+ Shifts capacity away from the target node onto healthy peers by:
191
+ 1. Scaling DOWN the target deployment by parameter * current_replicas
192
+ (min: min_replicas, so at least 1 replica remains).
193
+ 2. Distributing the shed replicas equally across all other healthy
194
+ deployments as a SCALE_UP (best-effort, capped at max_replicas).
195
+
196
+ This reuses the same patch_namespaced_deployment_scale mechanism as
197
+ SCALE_UP/SCALE_DOWN, ensuring observable cluster mutations.
198
+ """
199
+ namespace, deployment_name = self._resolve_workload_target(target)
200
+ apps_v1 = self._get_apps_v1_api()
201
+
202
+ scale_obj = apps_v1.read_namespaced_deployment_scale(
203
+ name=deployment_name,
204
+ namespace=namespace,
205
+ )
206
+ current_target = int(scale_obj.spec.replicas or self.min_replicas)
207
+
208
+ frac = min(1.0, max(0.0, float(parameter)))
209
+ delta = max(1, int(current_target * frac))
210
+ new_target = max(self.min_replicas, current_target - delta)
211
+
212
+ messages: list[str] = []
213
+
214
+ if new_target != current_target:
215
+ self._patch_deployment_scale_with_retry(
216
+ apps_v1=apps_v1,
217
+ deployment_name=deployment_name,
218
+ namespace=namespace,
219
+ desired=new_target,
220
+ )
221
+ messages.append(
222
+ f"target {deployment_name} scaled {current_target}->{new_target}"
223
+ )
224
+ else:
225
+ messages.append(
226
+ f"target {deployment_name} unchanged at {current_target} (already at min)"
227
+ )
228
+
229
+ # Redistribute shed replicas across healthy peers (best-effort)
230
+ healthy_peers = [
231
+ (peer_id, peer_info)
232
+ for peer_id, peer_info in self._node_workload_map.items()
233
+ if peer_id != target
234
+ ]
235
+
236
+ if healthy_peers and delta > 0:
237
+ peer_delta = max(1, delta // len(healthy_peers))
238
+ scaled_peers = 0
239
+ for peer_id, peer_info in healthy_peers:
240
+ peer_deployment = peer_info["deployment"]
241
+ peer_ns = peer_info.get("namespace", self.namespace)
242
+ try:
243
+ peer_scale = apps_v1.read_namespaced_deployment_scale(
244
+ name=peer_deployment, namespace=peer_ns,
245
+ )
246
+ peer_current = int(peer_scale.spec.replicas or self.min_replicas)
247
+ peer_desired = peer_current + peer_delta
248
+ if self.max_replicas is not None:
249
+ peer_desired = min(self.max_replicas, peer_desired)
250
+ if peer_desired != peer_current:
251
+ self._patch_deployment_scale_with_retry(
252
+ apps_v1=apps_v1,
253
+ deployment_name=peer_deployment,
254
+ namespace=peer_ns,
255
+ desired=peer_desired,
256
+ )
257
+ scaled_peers += 1
258
+ except Exception:
259
+ pass # best-effort for peers
260
+
261
+ if scaled_peers:
262
+ messages.append(
263
+ f"redistributed +{peer_delta} replicas to {scaled_peers} peer(s)"
264
+ )
265
+
266
+ return (
267
+ f"Ack: REROUTE_TRAFFIC for {target} (frac={frac:.2f}) - "
268
+ + "; ".join(messages)
269
+ )
270
+
271
+ def _shed_load(self, target: str, parameter: float) -> str:
272
+ """
273
+ Live implementation of SHED_LOAD.
274
+
275
+ Drops a fraction of capacity from the target node by scaling DOWN
276
+ its deployment. The shed fraction decays over time in the simulator,
277
+ but in live mode the replica reduction is permanent until the agent
278
+ explicitly scales back up.
279
+
280
+ Critical nodes (node-0, node-1, node-2) are guarded by validation
281
+ before this method is ever called.
282
+ """
283
+ namespace, deployment_name = self._resolve_workload_target(target)
284
+ apps_v1 = self._get_apps_v1_api()
285
+
286
+ scale_obj = apps_v1.read_namespaced_deployment_scale(
287
+ name=deployment_name,
288
+ namespace=namespace,
289
+ )
290
+ current = int(scale_obj.spec.replicas or self.min_replicas)
291
+
292
+ frac = min(1.0, max(0.0, float(parameter)))
293
+ delta = max(1, int(current * frac))
294
+ desired = max(self.min_replicas, current - delta)
295
+
296
+ if desired == current:
297
+ return (
298
+ f"Ack: SHED_LOAD for {target} - replicas unchanged at {current} "
299
+ f"(already at min_replicas={self.min_replicas})"
300
+ )
301
+
302
+ self._patch_deployment_scale_with_retry(
303
+ apps_v1=apps_v1,
304
+ deployment_name=deployment_name,
305
+ namespace=namespace,
306
+ desired=desired,
307
+ )
308
+
309
+ return (
310
+ f"Ack: SHED_LOAD for {target} - deployment {deployment_name} "
311
+ f"in namespace {namespace} scaled {current}->{desired} "
312
+ f"(shed {delta} replicas, frac={frac:.2f})"
313
+ )
314
+
315
  def _patch_deployment_scale_with_retry(self, apps_v1, deployment_name: str, namespace: str, desired: int) -> None:
316
  """
317
  Patch deployment replicas with retries for transient API server errors.
deploy/LOCAL_LAPTOP_FASTAPI_GUIDE.md CHANGED
@@ -50,7 +50,7 @@ Check:
50
  ## 5) Let your agent execute actions
51
 
52
  The server accepts `POST /step` with:
53
- - `action_type`: `NO_OP` | `SCALE_UP` | `SCALE_DOWN`
54
  - `target_node_id`: `node-*`
55
  - `parameter`: float
56
 
 
50
  ## 5) Let your agent execute actions
51
 
52
  The server accepts `POST /step` with:
53
+ - `action_type`: `NO_OP` | `SCALE_UP` | `SCALE_DOWN` | `REROUTE_TRAFFIC` | `SHED_LOAD`
54
  - `target_node_id`: `node-*`
55
  - `parameter`: float
56
 
deploy/aws/ARCHITECTURE.md CHANGED
@@ -106,8 +106,8 @@ Every "tick" (one step of the simulation), the agent goes through this cycle:
106
  |---|---|---|
107
  | `SCALE_UP` | "node-0 needs more capacity" | `KubernetesExecutor` patches `payments` Deployment: `replicas: 2 -> 5` |
108
  | `SCALE_DOWN` | "node-3 is over-provisioned" | `KubernetesExecutor` patches `cart` Deployment: `replicas: 4 -> 1` |
109
- | `REROUTE_TRAFFIC` | "Move traffic away from node-2" | Currently simulation-only (no live K8s ingress patching) |
110
- | `SHED_LOAD` | "Drop 50% of traffic to node-3" | Currently simulation-only (no live K8s traffic shaping) |
111
  | `NO_OP` | "Do nothing this tick" | Nothing changes on EKS |
112
 
113
  ### The SCALE_UP Flow in Detail
 
106
  |---|---|---|
107
  | `SCALE_UP` | "node-0 needs more capacity" | `KubernetesExecutor` patches `payments` Deployment: `replicas: 2 -> 5` |
108
  | `SCALE_DOWN` | "node-3 is over-provisioned" | `KubernetesExecutor` patches `cart` Deployment: `replicas: 4 -> 1` |
109
+ | `REROUTE_TRAFFIC` | "Move traffic away from node-2" | `KubernetesExecutor` scales DOWN target deployment and redistributes replicas to healthy peer deployments |
110
+ | `SHED_LOAD` | "Drop 50% of traffic to node-3" | `KubernetesExecutor` scales DOWN target deployment by `parameter * current_replicas` |
111
  | `NO_OP` | "Do nothing this tick" | Nothing changes on EKS |
112
 
113
  ### The SCALE_UP Flow in Detail
deploy/do/antiatropos-pod-trim.sh ADDED
@@ -0,0 +1,44 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env bash
2
+ # antiatropos-pod-trim.sh
3
+ # Resets all prod-sre deployments to their minimum replica count
4
+ # AND deletes completed/failed/evicted pods to prevent accumulation.
5
+ # Installed as a cron job to prevent pod stacking across episodes.
6
+ set -euo pipefail
7
+
8
+ KUBECONFIG="${KUBECONFIG:-/etc/rancher/k3s/k3s.yaml}"
9
+ export KUBECONFIG
10
+ NAMESPACE="${1:-prod-sre}"
11
+ MIN_REPLICAS="${2:-1}"
12
+
13
+ trimmed=0
14
+ while IFS= read -r deploy; do
15
+ current=$(kubectl get deploy "$deploy" -n "$NAMESPACE" -o jsonpath='{.spec.replicas}' 2>/dev/null || echo "0")
16
+ if [[ "$current" -gt "$MIN_REPLICAS" ]]; then
17
+ kubectl scale deploy "$deploy" -n "$NAMESPACE" --replicas="$MIN_REPLICAS" >/dev/null 2>&1
18
+ trimmed=$((trimmed + 1))
19
+ fi
20
+ done < <(kubectl get deploy -n "$NAMESPACE" -o jsonpath='{.items[*].metadata.name}' 2>/dev/null)
21
+
22
+ # Delete completed (Succeeded), failed, and evicted pods across the namespace.
23
+ # These accumulate across episodes and can exhaust node resources
24
+ # even after deployments are scaled back down.
25
+ deleted=0
26
+ for phase in Succeeded Failed; do
27
+ while IFS= read -r pod; do
28
+ [[ -z "$pod" ]] && continue
29
+ kubectl delete pod "$pod" -n "$NAMESPACE" --force --grace-period=0 >/dev/null 2>&1 && deleted=$((deleted + 1))
30
+ done < <(kubectl get pods -n "$NAMESPACE" --field-selector=status.phase=$phase -o jsonpath='{.items[*].metadata.name}' 2>/dev/null)
31
+ done
32
+
33
+ # Also nuke evicted pods (reason=Evicted, phase=Failed is often covered
34
+ # above, but some k3s versions keep evicted pods in a weird state).
35
+ while IFS= read -r pod; do
36
+ [[ -z "$pod" ]] && continue
37
+ kubectl delete pod "$pod" -n "$NAMESPACE" --force --grace-period=0 >/dev/null 2>&1 && deleted=$((deleted + 1))
38
+ done < <(kubectl get pods -n "$NAMESPACE" -o json | \
39
+ grep -l '"reason": "Evicted"' >/dev/null 2>&1 && \
40
+ kubectl get pods -n "$NAMESPACE" -o jsonpath='{range .items[?(@.status.reason=="Evicted")]}{.metadata.name}{"\n"}{end}' 2>/dev/null || true)
41
+
42
+ if [[ "$trimmed" -gt 0 || "$deleted" -gt 0 ]]; then
43
+ echo "$(date -Iseconds) Trimmed $trimmed deployments to $MIN_REPLICAS replicas, deleted $deleted stale pods in $NAMESPACE"
44
+ fi
deploy/do/deploy-droplet-one-shot.sh CHANGED
@@ -103,6 +103,7 @@ ANTIATROPOS_K8S_NAMESPACE=prod-sre
103
  ANTIATROPOS_MIN_REPLICAS=${MIN_REPLICAS}
104
  ANTIATROPOS_MAX_REPLICAS=${MAX_REPLICAS}
105
  ANTIATROPOS_SCALE_STEP=${SCALE_STEP}
 
106
  ANTIATROPOS_WORKLOAD_MAP=${WORKLOAD_MAP}
107
  EOF
108
  echo "Created ${ENV_FILE}"
@@ -149,6 +150,18 @@ EOF
149
  systemctl daemon-reload
150
  systemctl enable --now antiatropos-control
151
 
 
 
 
 
 
 
 
 
 
 
 
 
152
  echo ""
153
  echo "Waiting for control API readiness..."
154
  for _ in {1..30}; do
 
103
  ANTIATROPOS_MIN_REPLICAS=${MIN_REPLICAS}
104
  ANTIATROPOS_MAX_REPLICAS=${MAX_REPLICAS}
105
  ANTIATROPOS_SCALE_STEP=${SCALE_STEP}
106
+ ANTIATROPOS_TRIM_INTERVAL_S=1800
107
  ANTIATROPOS_WORKLOAD_MAP=${WORKLOAD_MAP}
108
  EOF
109
  echo "Created ${ENV_FILE}"
 
150
  systemctl daemon-reload
151
  systemctl enable --now antiatropos-control
152
 
153
+ # --- Pod trim cron: resets prod-sre deployments to min replicas every 30 min ---
154
+ TRIM_SCRIPT="/usr/local/bin/antiatropos-pod-trim.sh"
155
+ if [[ -f "${REPO_DIR}/deploy/do/antiatropos-pod-trim.sh" ]]; then
156
+ cp "${REPO_DIR}/deploy/do/antiatropos-pod-trim.sh" "${TRIM_SCRIPT}"
157
+ chmod +x "${TRIM_SCRIPT}"
158
+ (crontab -l 2>/dev/null | grep -v 'antiatropos-pod-trim'; echo "*/30 * * * * KUBECONFIG=${KUBECONFIG_PATH} ${TRIM_SCRIPT} ${K8S_NAMESPACE} ${MIN_REPLICAS} >> /var/log/antiatropos-trim.log 2>&1") | crontab -
159
+ echo "Pod trim cron installed: every 30 min, resets ${K8S_NAMESPACE} deployments to ${MIN_REPLICAS} replicas + prunes stale pods"
160
+ echo " Log: /var/log/antiatropos-trim.log"
161
+ else
162
+ echo "WARNING: antiatropos-pod-trim.sh not found; skipping cron setup"
163
+ fi
164
+
165
  echo ""
166
  echo "Waiting for control API readiness..."
167
  for _ in {1..30}; do
inference.py CHANGED
@@ -52,7 +52,7 @@ MAX_TOKENS = int(os.getenv("ANTIATROPOS_MAX_TOKENS", "180"))
52
  SEED = int(os.getenv("ANTIATROPOS_SEED", "42"))
53
  SUCCESS_SCORE_THRESHOLD = float(os.getenv("ANTIATROPOS_SUCCESS_THRESHOLD", "0.55"))
54
  EVAL_RUNS = int(os.getenv("ANTIATROPOS_EVAL_RUNS", "3")) # Num eval runs per task
55
- TEMPERATURE_SWEEP = [0.0, 0.3, 0.7] # Fixed temperatures for multi-episode eval
56
 
57
  TASK_BRIEFS: Dict[str, str] = {
58
  "task-1": "Traffic increases linearly. Scale proactively to keep latency low and cost efficient.",
@@ -62,7 +62,9 @@ TASK_BRIEFS: Dict[str, str] = {
62
 
63
  SYSTEM_PROMPT = textwrap.dedent(
64
  """
65
- You are an autonomous SRE controller managing a ten-node microservice cluster.
 
 
66
 
67
  Return exactly one JSON object:
68
  {
@@ -178,6 +180,14 @@ def build_user_prompt(task_id: str, step: int, obs: dict, history: List[str], de
178
 
179
 
180
  def observation_for_model(obs) -> dict:
 
 
 
 
 
 
 
 
181
  return {
182
  "task_id": obs.task_id,
183
  "mode": getattr(obs.mode, "value", str(obs.mode)),
@@ -197,8 +207,6 @@ def observation_for_model(obs) -> dict:
197
  {
198
  "node_id": node.node_id,
199
  "status": getattr(node.status, "value", str(node.status)),
200
- "is_vip": node.is_vip,
201
- "importance_weight": node.importance_weight,
202
  "queue_depth": node.queue_depth,
203
  "latency_ms": node.latency_ms,
204
  "incoming_request_rate": node.incoming_request_rate,
 
52
  SEED = int(os.getenv("ANTIATROPOS_SEED", "42"))
53
  SUCCESS_SCORE_THRESHOLD = float(os.getenv("ANTIATROPOS_SUCCESS_THRESHOLD", "0.55"))
54
  EVAL_RUNS = int(os.getenv("ANTIATROPOS_EVAL_RUNS", "3")) # Num eval runs per task
55
+ TEMPERATURE_SWEEP = [0.7, 0.3, 0.7] # Fixed temperatures for multi-episode eval
56
 
57
  TASK_BRIEFS: Dict[str, str] = {
58
  "task-1": "Traffic increases linearly. Scale proactively to keep latency low and cost efficient.",
 
62
 
63
  SYSTEM_PROMPT = textwrap.dedent(
64
  """
65
+ You are an autonomous SRE controller managing a five-node microservice cluster.
66
+ node-0 is the payment gateway (higher business priority, receives 2x reward weight).
67
+ Balance protection of node-0 with the health of all other nodes — do not ignore nodes 1-4.
68
 
69
  Return exactly one JSON object:
70
  {
 
180
 
181
 
182
  def observation_for_model(obs) -> dict:
183
+ """
184
+ Build a compact observation dict for the LLM.
185
+
186
+ IMPORTANT: is_vip and importance_weight are deliberately EXCLUDED.
187
+ The LLM must learn which nodes matter from rewards alone, not from
188
+ explicit bias signals in the observation. Including these fields
189
+ caused the model to fixate on node-0 and ignore nodes 1-4.
190
+ """
191
  return {
192
  "task_id": obs.task_id,
193
  "mode": getattr(obs.mode, "value", str(obs.mode)),
 
207
  {
208
  "node_id": node.node_id,
209
  "status": getattr(node.status, "value", str(node.status)),
 
 
210
  "queue_depth": node.queue_depth,
211
  "latency_ms": node.latency_ms,
212
  "incoming_request_rate": node.incoming_request_rate,
server/local_laptop_control.py CHANGED
@@ -3,7 +3,7 @@ Lightweight FastAPI control plane for local laptop Kubernetes testing.
3
 
4
  Purpose:
5
  - Accept simple SRE actions over HTTP
6
- - Execute SCALE_UP / SCALE_DOWN / NO_OP against local deployments
7
  - Keep a minimal in-memory action history for debugging
8
 
9
  Run:
@@ -12,12 +12,16 @@ Run:
12
 
13
  from __future__ import annotations
14
 
 
 
15
  from datetime import datetime, timezone
16
  from typing import Any
17
 
18
  from fastapi import FastAPI, HTTPException
19
  from pydantic import BaseModel, Field
20
 
 
 
21
  try:
22
  from ..control import KubernetesExecutor
23
  except (ImportError, ModuleNotFoundError):
@@ -25,7 +29,7 @@ except (ImportError, ModuleNotFoundError):
25
 
26
 
27
  class ActionRequest(BaseModel):
28
- action_type: str = Field(description="NO_OP | SCALE_UP | SCALE_DOWN")
29
  target_node_id: str = Field(description="node-0 .. node-9")
30
  parameter: float = Field(default=0.0, ge=0.0, le=10.0)
31
 
@@ -50,9 +54,111 @@ STATE: dict[str, Any] = {
50
  "step_count": 0,
51
  "last_action": None,
52
  "history": [],
 
53
  }
54
 
55
- _ALLOWED_ACTIONS = {"NO_OP", "SCALE_UP", "SCALE_DOWN"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
 
58
  def _now_utc_iso() -> str:
@@ -68,6 +174,7 @@ def health() -> dict[str, Any]:
68
  "kubeconfig": executor.kubeconfig,
69
  "mapped_targets": sorted(list(executor._node_workload_map.keys())),
70
  "allowed_actions": sorted(list(_ALLOWED_ACTIONS)),
 
71
  }
72
 
73
 
@@ -85,10 +192,34 @@ def state() -> dict[str, Any]:
85
  "step_count": STATE["step_count"],
86
  "last_action": STATE["last_action"],
87
  "history_size": len(STATE["history"]),
 
88
  "is_mock": executor.is_mock,
89
  }
90
 
91
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  @app.post("/step", response_model=ActionResponse)
93
  def step(action: ActionRequest) -> ActionResponse:
94
  if executor.is_mock:
 
3
 
4
  Purpose:
5
  - Accept simple SRE actions over HTTP
6
+ - Execute SCALE_UP / SCALE_DOWN / REROUTE_TRAFFIC / SHED_LOAD / NO_OP against local deployments
7
  - Keep a minimal in-memory action history for debugging
8
 
9
  Run:
 
12
 
13
  from __future__ import annotations
14
 
15
+ import subprocess
16
+ import threading
17
  from datetime import datetime, timezone
18
  from typing import Any
19
 
20
  from fastapi import FastAPI, HTTPException
21
  from pydantic import BaseModel, Field
22
 
23
+ import os
24
+
25
  try:
26
  from ..control import KubernetesExecutor
27
  except (ImportError, ModuleNotFoundError):
 
29
 
30
 
31
  class ActionRequest(BaseModel):
32
+ action_type: str = Field(description="NO_OP | SCALE_UP | SCALE_DOWN | REROUTE_TRAFFIC | SHED_LOAD")
33
  target_node_id: str = Field(description="node-0 .. node-9")
34
  parameter: float = Field(default=0.0, ge=0.0, le=10.0)
35
 
 
54
  "step_count": 0,
55
  "last_action": None,
56
  "history": [],
57
+ "last_trim": None,
58
  }
59
 
60
+ _ALLOWED_ACTIONS = {"NO_OP", "SCALE_UP", "SCALE_DOWN", "REROUTE_TRAFFIC", "SHED_LOAD"}
61
+
62
+ # Background trim interval (seconds). Default 30 minutes.
63
+ TRIM_INTERVAL_S = int(os.getenv("ANTIATROPOS_TRIM_INTERVAL_S", "1800"))
64
+
65
+
66
+ def _run_kubectl_trim() -> dict[str, Any]:
67
+ """
68
+ Run the pod-trim logic inline via kubectl subprocess calls.
69
+
70
+ Scales every deployment in the namespace back to min_replicas
71
+ and force-deletes completed / failed / evicted pods.
72
+ Returns a summary dict.
73
+ """
74
+ ns = executor.namespace
75
+ min_r = executor.min_replicas
76
+ kubeconfig = executor.kubeconfig
77
+ result: dict[str, Any] = {
78
+ "namespace": ns,
79
+ "min_replicas": min_r,
80
+ "deployments_scaled": 0,
81
+ "pods_deleted": 0,
82
+ "errors": [],
83
+ }
84
+
85
+ def _kubectl(args: list[str]) -> str:
86
+ env = None
87
+ if kubeconfig and kubeconfig.lower() not in ("mock", ""):
88
+ import os as _os
89
+ env = {**_os.environ, "KUBECONFIG": kubeconfig}
90
+ try:
91
+ proc = subprocess.run(
92
+ ["kubectl"] + args,
93
+ capture_output=True,
94
+ text=True,
95
+ timeout=30,
96
+ env=env,
97
+ )
98
+ return proc.stdout.strip()
99
+ except Exception as exc:
100
+ result["errors"].append(str(exc))
101
+ return ""
102
+
103
+ # Scale deployments back to min_replicas
104
+ deploys = _kubectl(["get", "deploy", "-n", ns, "-o", "jsonpath={.items[*].metadata.name}"])
105
+ for name in deploys.split():
106
+ if not name:
107
+ continue
108
+ cur = _kubectl(["get", "deploy", name, "-n", ns, "-o", "jsonpath={.spec.replicas}"])
109
+ try:
110
+ cur_r = int(cur)
111
+ except ValueError:
112
+ continue
113
+ if cur_r > min_r:
114
+ _kubectl(["scale", "deploy", name, "-n", ns, "--replicas", str(min_r)])
115
+ result["deployments_scaled"] += 1
116
+
117
+ # Delete completed and failed pods
118
+ for phase in ("Succeeded", "Failed"):
119
+ pods = _kubectl([
120
+ "get", "pods", "-n", ns,
121
+ "--field-selector", f"status.phase={phase}",
122
+ "-o", "jsonpath={.items[*].metadata.name}",
123
+ ])
124
+ for pod in pods.split():
125
+ if not pod:
126
+ continue
127
+ _kubectl(["delete", "pod", pod, "-n", ns, "--force", "--grace-period=0"])
128
+ result["pods_deleted"] += 1
129
+
130
+ # Delete evicted pods (some k3s versions don't surface these as Failed)
131
+ evicted = _kubectl([
132
+ "get", "pods", "-n", ns, "-o",
133
+ 'jsonpath={range .items[?(@.status.reason=="Evicted")]}{.metadata.name}{" "}{end}',
134
+ ])
135
+ for pod in evicted.split():
136
+ if not pod:
137
+ continue
138
+ _kubectl(["delete", "pod", pod, "-n", ns, "--force", "--grace-period=0"])
139
+ result["pods_deleted"] += 1
140
+
141
+ return result
142
+
143
+
144
+ def _periodic_trim() -> None:
145
+ """Background thread: trim pods every TRIM_INTERVAL_S seconds."""
146
+ import time as _time
147
+ while True:
148
+ _time.sleep(TRIM_INTERVAL_S)
149
+ try:
150
+ if not executor.is_mock:
151
+ _run_kubectl_trim()
152
+ except Exception:
153
+ pass # best-effort; next cycle will retry
154
+
155
+
156
+ @app.on_event("startup")
157
+ def _start_trim_thread() -> None:
158
+ """Start the background pod-trim thread on FastAPI startup."""
159
+ if not executor.is_mock:
160
+ t = threading.Thread(target=_periodic_trim, daemon=True, name="pod-trim")
161
+ t.start()
162
 
163
 
164
  def _now_utc_iso() -> str:
 
174
  "kubeconfig": executor.kubeconfig,
175
  "mapped_targets": sorted(list(executor._node_workload_map.keys())),
176
  "allowed_actions": sorted(list(_ALLOWED_ACTIONS)),
177
+ "trim_interval_s": TRIM_INTERVAL_S if not executor.is_mock else None,
178
  }
179
 
180
 
 
192
  "step_count": STATE["step_count"],
193
  "last_action": STATE["last_action"],
194
  "history_size": len(STATE["history"]),
195
+ "last_trim": STATE["last_trim"],
196
  "is_mock": executor.is_mock,
197
  }
198
 
199
 
200
+ @app.post("/trim")
201
+ def trim() -> dict[str, Any]:
202
+ """
203
+ On-demand pod trim: scale all deployments to min_replicas
204
+ and delete completed / failed / evicted pods.
205
+ """
206
+ if executor.is_mock:
207
+ raise HTTPException(
208
+ status_code=400,
209
+ detail="KubernetesExecutor is in mock mode. Set KUBECONFIG to enable trimming.",
210
+ )
211
+ try:
212
+ result = _run_kubectl_trim()
213
+ except Exception as exc:
214
+ raise HTTPException(status_code=500, detail=f"Trim failed: {exc}") from exc
215
+
216
+ STATE["last_trim"] = {
217
+ **result,
218
+ "timestamp_utc": _now_utc_iso(),
219
+ }
220
+ return STATE["last_trim"]
221
+
222
+
223
  @app.post("/step", response_model=ActionResponse)
224
  def step(action: ActionRequest) -> ActionResponse:
225
  if executor.is_mock: