Sandeep Suresh committed on
Commit
b578a5d
·
1 Parent(s): 7796d67

feat: Refactor random number generation to use NumPy and enhance simulation logic

Browse files
.gitignore CHANGED
@@ -10,6 +10,7 @@ opencode.json
10
  venv/
11
  ENV/
12
  .env/
 
13
  .env.*
14
  .env
15
 
 
10
  venv/
11
  ENV/
12
  .env/
13
+ .env
14
  .env.*
15
  .env
16
 
pyproject.toml CHANGED
@@ -42,4 +42,8 @@ server = "COEnv.server.app:main"
42
  [tool.setuptools]
43
  include-package-data = true
44
  packages = ["COEnv", "COEnv.server"]
45
- package-dir = { "COEnv" = ".", "COEnv.server" = "server" }
 
 
 
 
 
42
  [tool.setuptools]
43
  include-package-data = true
44
  packages = ["COEnv", "COEnv.server"]
45
+ package-dir = { "COEnv" = ".", "COEnv.server" = "server" }
46
+
47
+ [tool.pytest.ini_options]
48
+ pythonpath = ["."]
49
+ testpaths = ["tests"]
requirements.txt CHANGED
@@ -1,4 +1,5 @@
1
- fastapi>=0.100.0
2
- uvicorn>=0.23.0
3
- pydantic>=2.0.0
4
- requests>=2.31.0
 
 
1
+ fastapi[standard]
2
+ uvicorn
3
+ pydantic
4
+ numpy
5
+ openenv-core
server/COEnv_environment.py CHANGED
@@ -7,7 +7,7 @@ This is the brain of the whole project.
7
 
8
  from typing import Dict, List, Any, Optional, Literal
9
  from datetime import datetime
10
- import random
11
  import time
12
 
13
  from .models import (
@@ -20,12 +20,19 @@ from .models import (
20
  class World:
21
  """In-memory Kubernetes cluster simulator"""
22
 
23
- def __init__(self, config: Dict[str, Any]):
24
  self.config = config
 
 
25
  self.cluster_state = self._initialize_healthy_cluster()
26
  self.step_count = 0
27
  self.events = []
28
  self._event_counter = 0
 
 
 
 
 
29
 
30
  def _initialize_healthy_cluster(self) -> Dict[str, List[Dict]]:
31
  """Initialize a healthy cluster state based on config"""
@@ -67,7 +74,7 @@ class World:
67
 
68
  # Create pods for this deployment
69
  for j in range(dep["replicas"]):
70
- pod_name = f"{dep['name']}-{random.randint(1000, 9999)}-{''.join([chr(random.randint(97, 122)) for _ in range(5)])}"
71
  pods.append({
72
  "name": pod_name,
73
  "status": "Running",
@@ -140,9 +147,28 @@ class World:
140
 
141
  def get_pods(self, namespace: Optional[str] = None, selector: Optional[Dict[str, str]] = None) -> List[PodStatus]:
142
  """Returns filtered pod list (mimics kubectl get pods)"""
143
- pods = [PodStatus(**pod) for pod in self.cluster_state["pods"]]
144
- # Simple filtering by namespace (not fully implemented - just returns all for now)
145
- return pods
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
146
 
147
  def get_nodes(self) -> List[NodeStatus]:
148
  """Get all nodes as Pydantic models"""
@@ -230,7 +256,7 @@ class World:
230
  for i in range(desired_replicas - current_count):
231
  deployment = next((d for d in self.cluster_state["deployments"] if d["name"] == deployment_name), None)
232
  if deployment:
233
- pod_name = f"{deployment_name}-{random.randint(1000, 9999)}-{''.join([chr(random.randint(97, 122)) for _ in range(5)])}"
234
  node = nodes[i % len(nodes)] if nodes else None
235
  self.cluster_state["pods"].append({
236
  "name": pod_name,
@@ -266,7 +292,7 @@ class World:
266
 
267
  event_type: Literal["Normal"] = "Normal" # type: ignore
268
  event = ClusterEvent(
269
- event_id=f"event-delpod-{random.randint(1000, 9999)}",
270
  timestamp=datetime.now().isoformat(),
271
  type=event_type,
272
  reason="UserDeleted",
@@ -286,7 +312,7 @@ class World:
286
  for pod in pods_to_delete:
287
  event_type: Literal["Normal"] = "Normal" # type: ignore
288
  event = ClusterEvent(
289
- event_id=f"event-restart-{random.randint(1000, 9999)}",
290
  timestamp=datetime.now().isoformat(),
291
  type=event_type,
292
  reason="RolledOut",
@@ -299,6 +325,155 @@ class World:
299
  self.cluster_state["pods"] = [p for p in self.cluster_state["pods"] if p.get("deployment") != deployment]
300
 
301
  return True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
302
 
303
  def tick(self):
304
  """Advances simulated time by one step. Pods in CrashLoopBackOff increment their restart counter. Pending pods on ready nodes eventually transition to Running. Dead nodes stay dead unless drained."""
@@ -306,8 +481,8 @@ class World:
306
 
307
  # Simulate some natural changes in resource usage
308
  for node in self.cluster_state["nodes"]:
309
- node["cpu_usage"] = max(0, min(100, node["cpu_usage"] + random.uniform(-5, 5)))
310
- node["mem_usage"] = max(0, min(100, node["mem_usage"] + random.uniform(-5, 5)))
311
  node["last_updated"] = datetime.now().isoformat()
312
 
313
  # Update pod statuses based on node status
@@ -321,7 +496,7 @@ class World:
321
  elif pod["status"] == "Pending":
322
  pod["status"] = "Unknown"
323
  elif node and node["status"] == "Ready" and pod["status"] == "Pending":
324
- if random.random() > 0.7:
325
  pod["status"] = "Running"
326
  pod["last_updated"] = datetime.now().isoformat()
327
 
@@ -341,7 +516,7 @@ class World:
341
  if current_count < desired:
342
  nodes = self.cluster_state["nodes"]
343
  for i in range(desired - current_count):
344
- pod_name = f"{deployment['name']}-{random.randint(1000, 9999)}-{''.join([chr(random.randint(97, 122)) for _ in range(5)])}"
345
  node = nodes[i % len(nodes)] if nodes else None
346
  self.cluster_state["pods"].append({
347
  "name": pod_name,
@@ -357,7 +532,7 @@ class World:
357
  })
358
 
359
  # Generate occasional events
360
- if random.random() < 0.3:
361
  self._generate_event()
362
 
363
  def _generate_event(self):
@@ -373,7 +548,7 @@ class World:
373
  {"type": "Normal", "reason": "Killing", "message": "Stopping container"}
374
  ]
375
 
376
- event = random.choice(event_types)
377
  involved_objects = []
378
  involved_objects.extend([p["name"] for p in self.cluster_state["pods"][:3]])
379
  involved_objects.extend([d["name"] for d in self.cluster_state["deployments"][:3]])
@@ -389,7 +564,7 @@ class World:
389
  type=event_type,
390
  reason=event["reason"],
391
  message=event["message"],
392
- involved_object=random.choice(involved_objects)
393
  ))
394
  self._event_counter += 1
395
 
 
7
 
8
  from typing import Dict, List, Any, Optional, Literal
9
  from datetime import datetime
10
+ import numpy as np
11
  import time
12
 
13
  from .models import (
 
20
  class World:
21
  """In-memory Kubernetes cluster simulator"""
22
 
23
+ def __init__(self, config: Dict[str, Any], seed: Optional[int] = None):
24
  self.config = config
25
+ self.seed = seed
26
+ self.rng = np.random.default_rng(seed)
27
  self.cluster_state = self._initialize_healthy_cluster()
28
  self.step_count = 0
29
  self.events = []
30
  self._event_counter = 0
31
+
32
+ def _random_suffix(self, length: int = 5) -> str:
33
+ """Generate a random lowercase alphabetic suffix."""
34
+ letters = self.rng.integers(97, 123, size=length)
35
+ return "".join(chr(int(code)) for code in letters)
36
 
37
  def _initialize_healthy_cluster(self) -> Dict[str, List[Dict]]:
38
  """Initialize a healthy cluster state based on config"""
 
74
 
75
  # Create pods for this deployment
76
  for j in range(dep["replicas"]):
77
+ pod_name = f"{dep['name']}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}"
78
  pods.append({
79
  "name": pod_name,
80
  "status": "Running",
 
147
 
148
  def get_pods(self, namespace: Optional[str] = None, selector: Optional[Dict[str, str]] = None) -> List[PodStatus]:
149
  """Returns filtered pod list (mimics kubectl get pods)"""
150
+ filtered_pods = self.cluster_state["pods"]
151
+
152
+ if namespace is not None:
153
+ filtered_pods = [
154
+ pod for pod in filtered_pods
155
+ if pod.get("namespace", "default") == namespace
156
+ ]
157
+
158
+ if selector:
159
+ for key, value in selector.items():
160
+ if key in {"app", "deployment"}:
161
+ filtered_pods = [
162
+ pod for pod in filtered_pods
163
+ if pod.get("deployment") == value
164
+ ]
165
+ else:
166
+ filtered_pods = [
167
+ pod for pod in filtered_pods
168
+ if pod.get("labels", {}).get(key) == value
169
+ ]
170
+
171
+ return [PodStatus(**pod) for pod in filtered_pods]
172
 
173
  def get_nodes(self) -> List[NodeStatus]:
174
  """Get all nodes as Pydantic models"""
 
256
  for i in range(desired_replicas - current_count):
257
  deployment = next((d for d in self.cluster_state["deployments"] if d["name"] == deployment_name), None)
258
  if deployment:
259
+ pod_name = f"{deployment_name}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}"
260
  node = nodes[i % len(nodes)] if nodes else None
261
  self.cluster_state["pods"].append({
262
  "name": pod_name,
 
292
 
293
  event_type: Literal["Normal"] = "Normal" # type: ignore
294
  event = ClusterEvent(
295
+ event_id=f"event-delpod-{int(self.rng.integers(1000, 10000))}",
296
  timestamp=datetime.now().isoformat(),
297
  type=event_type,
298
  reason="UserDeleted",
 
312
  for pod in pods_to_delete:
313
  event_type: Literal["Normal"] = "Normal" # type: ignore
314
  event = ClusterEvent(
315
+ event_id=f"event-restart-{int(self.rng.integers(1000, 10000))}",
316
  timestamp=datetime.now().isoformat(),
317
  type=event_type,
318
  reason="RolledOut",
 
325
  self.cluster_state["pods"] = [p for p in self.cluster_state["pods"] if p.get("deployment") != deployment]
326
 
327
  return True
328
+
329
+ def set_hpa(self, deployment: str, min_replicas: int, max_replicas: int, cpu_target_percent: int) -> bool:
330
+ """Create or update an HPA configuration for a deployment."""
331
+ target_deployment = next(
332
+ (d for d in self.cluster_state["deployments"] if d["name"] == deployment),
333
+ None,
334
+ )
335
+ if target_deployment is None:
336
+ return False
337
+
338
+ hpa_name = f"{deployment}-hpa"
339
+ now = datetime.now().isoformat()
340
+
341
+ existing_hpa = next((h for h in self.cluster_state["hpas"] if h.get("name") == hpa_name), None)
342
+ if existing_hpa is None:
343
+ self.cluster_state["hpas"].append({
344
+ "name": hpa_name,
345
+ "min_replicas": min_replicas,
346
+ "max_replicas": max_replicas,
347
+ "current_replicas": max(min_replicas, min(target_deployment["desired_replicas"], max_replicas)),
348
+ "cpu_target_percent": cpu_target_percent,
349
+ "last_updated": now,
350
+ })
351
+ else:
352
+ existing_hpa.update({
353
+ "min_replicas": min_replicas,
354
+ "max_replicas": max_replicas,
355
+ "cpu_target_percent": cpu_target_percent,
356
+ "current_replicas": max(min_replicas, min(target_deployment["desired_replicas"], max_replicas)),
357
+ "last_updated": now,
358
+ })
359
+
360
+ # Keep the deployment desired replicas within configured HPA bounds.
361
+ bounded_replicas = max(min_replicas, min(target_deployment["desired_replicas"], max_replicas))
362
+ target_deployment["desired_replicas"] = bounded_replicas
363
+ target_deployment["last_updated"] = now
364
+
365
+ event_type: Literal["Normal"] = "Normal" # type: ignore
366
+ self.events.append(ClusterEvent(
367
+ event_id=f"event-hpa-{int(self.rng.integers(1000, 10000))}",
368
+ timestamp=now,
369
+ type=event_type,
370
+ reason="HorizontalPodAutoscalerUpdated",
371
+ message=(
372
+ f"HPA configured for deployment/{deployment}: "
373
+ f"min={min_replicas}, max={max_replicas}, cpu_target={cpu_target_percent}%"
374
+ ),
375
+ involved_object=deployment,
376
+ ))
377
+
378
+ return True
379
+
380
+ def drain_node(self, node_name: str) -> bool:
381
+ """Mark a node unschedulable and evict/reschedule pods currently on it."""
382
+ node = next((n for n in self.cluster_state["nodes"] if n["name"] == node_name), None)
383
+ if node is None:
384
+ return False
385
+
386
+ node["status"] = "SchedulingDisabled"
387
+ node["last_updated"] = datetime.now().isoformat()
388
+
389
+ candidate_nodes = [
390
+ n for n in self.cluster_state["nodes"]
391
+ if n["name"] != node_name and n.get("status") == "Ready"
392
+ ]
393
+
394
+ pods_on_node = [p for p in self.cluster_state["pods"] if p.get("node") == node_name]
395
+ for i, pod in enumerate(pods_on_node):
396
+ replacement = candidate_nodes[i % len(candidate_nodes)] if candidate_nodes else None
397
+ pod["node"] = replacement["name"] if replacement else None
398
+ pod["status"] = "Pending"
399
+ pod["last_updated"] = datetime.now().isoformat()
400
+
401
+ event_type: Literal["Normal"] = "Normal" # type: ignore
402
+ self.events.append(ClusterEvent(
403
+ event_id=f"event-evict-{int(self.rng.integers(1000, 10000))}",
404
+ timestamp=datetime.now().isoformat(),
405
+ type=event_type,
406
+ reason="Evicted",
407
+ message=f"pod/{pod['name']} evicted from drained node/{node_name}",
408
+ involved_object=pod["name"],
409
+ ))
410
+
411
+ event_type: Literal["Normal"] = "Normal" # type: ignore
412
+ self.events.append(ClusterEvent(
413
+ event_id=f"event-drain-{int(self.rng.integers(1000, 10000))}",
414
+ timestamp=datetime.now().isoformat(),
415
+ type=event_type,
416
+ reason="NodeDrained",
417
+ message=f"node/{node_name} cordoned and drained",
418
+ involved_object=node_name,
419
+ ))
420
+
421
+ return True
422
+
423
+ def describe(self, resource_type: str, name: str) -> Dict[str, Any]:
424
+ """Return kubectl-describe style details for a specific resource."""
425
+ collection_map = {
426
+ "deployment": "deployments",
427
+ "pod": "pods",
428
+ "node": "nodes",
429
+ "service": "services",
430
+ "configmap": "configmaps",
431
+ "hpa": "hpas",
432
+ }
433
+
434
+ collection_name = collection_map.get(resource_type)
435
+ if collection_name is None:
436
+ return {
437
+ "type": resource_type,
438
+ "name": name,
439
+ "found": False,
440
+ "error": f"Unsupported resource_type: {resource_type}",
441
+ }
442
+
443
+ resource = next(
444
+ (item for item in self.cluster_state.get(collection_name, []) if item.get("name") == name),
445
+ None,
446
+ )
447
+ if resource is None:
448
+ return {
449
+ "type": resource_type,
450
+ "name": name,
451
+ "found": False,
452
+ "error": f"{resource_type} '{name}' not found",
453
+ }
454
+
455
+ related_pods = []
456
+ if resource_type == "deployment":
457
+ related_pods = [p for p in self.cluster_state["pods"] if p.get("deployment") == name]
458
+ elif resource_type == "node":
459
+ related_pods = [p for p in self.cluster_state["pods"] if p.get("node") == name]
460
+ elif resource_type == "service":
461
+ selector_app = resource.get("selector", {}).get("app")
462
+ if selector_app:
463
+ related_pods = [p for p in self.cluster_state["pods"] if p.get("deployment") == selector_app]
464
+
465
+ related_events = [e.model_dump() for e in self.events if e.involved_object in {name, resource_type}]
466
+
467
+ return {
468
+ "type": resource_type,
469
+ "name": name,
470
+ "found": True,
471
+ "resource": dict(resource),
472
+ "related_pods": related_pods,
473
+ "recent_events": related_events[-10:],
474
+ "step": self.step_count,
475
+ "timestamp": datetime.now().isoformat(),
476
+ }
477
 
478
  def tick(self):
479
  """Advances simulated time by one step. Pods in CrashLoopBackOff increment their restart counter. Pending pods on ready nodes eventually transition to Running. Dead nodes stay dead unless drained."""
 
481
 
482
  # Simulate some natural changes in resource usage
483
  for node in self.cluster_state["nodes"]:
484
+ node["cpu_usage"] = max(0, min(100, node["cpu_usage"] + float(self.rng.uniform(-5, 5))))
485
+ node["mem_usage"] = max(0, min(100, node["mem_usage"] + float(self.rng.uniform(-5, 5))))
486
  node["last_updated"] = datetime.now().isoformat()
487
 
488
  # Update pod statuses based on node status
 
496
  elif pod["status"] == "Pending":
497
  pod["status"] = "Unknown"
498
  elif node and node["status"] == "Ready" and pod["status"] == "Pending":
499
+ if float(self.rng.random()) > 0.7:
500
  pod["status"] = "Running"
501
  pod["last_updated"] = datetime.now().isoformat()
502
 
 
516
  if current_count < desired:
517
  nodes = self.cluster_state["nodes"]
518
  for i in range(desired - current_count):
519
+ pod_name = f"{deployment['name']}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}"
520
  node = nodes[i % len(nodes)] if nodes else None
521
  self.cluster_state["pods"].append({
522
  "name": pod_name,
 
532
  })
533
 
534
  # Generate occasional events
535
+ if float(self.rng.random()) < 0.3:
536
  self._generate_event()
537
 
538
  def _generate_event(self):
 
548
  {"type": "Normal", "reason": "Killing", "message": "Stopping container"}
549
  ]
550
 
551
+ event = self.rng.choice(event_types)
552
  involved_objects = []
553
  involved_objects.extend([p["name"] for p in self.cluster_state["pods"][:3]])
554
  involved_objects.extend([d["name"] for d in self.cluster_state["deployments"][:3]])
 
564
  type=event_type,
565
  reason=event["reason"],
566
  message=event["message"],
567
+ involved_object=str(self.rng.choice(involved_objects))
568
  ))
569
  self._event_counter += 1
570
 
server/app.py CHANGED
@@ -3,6 +3,7 @@ COEnv FastAPI Application
3
  Exposes /reset /step /state endpoints
4
  """
5
 
 
6
  from fastapi import FastAPI, HTTPException
7
  from pydantic import BaseModel, Field
8
  from typing import Dict, Any, Optional, List, Literal
@@ -11,8 +12,13 @@ import json
11
  import os
12
  import sys
13
 
14
- from .COEnv_environment import World
15
- from .models import ClusterObservation, RewardSignal, KubeAction
 
 
 
 
 
16
 
17
  app = FastAPI(title="COEnv", description="Kubernetes Simulator for OpenEnv")
18
 
@@ -56,13 +62,22 @@ def load_config():
56
  def get_condition_for_task(task_id: str):
57
  """Get the condition injector for a task"""
58
  if task_id == "pod_recovery":
59
- from .conditions.crash_loop import CrashLoopCondition
 
 
 
60
  return CrashLoopCondition(world_instance, config)
61
  elif task_id == "autoscaling":
62
- from .conditions.oom_kill import OOMKillCondition
 
 
 
63
  return OOMKillCondition(world_instance, config)
64
  elif task_id == "incident":
65
- from .conditions.cascade_failure import CascadeFailureCondition
 
 
 
66
  return CascadeFailureCondition(world_instance, config)
67
  return None
68
 
@@ -82,7 +97,7 @@ async def startup_event():
82
  """Initialize the world on startup"""
83
  global world_instance, current_task, current_objective
84
  load_config()
85
- world_instance = World(config)
86
  print("COEnv initialized")
87
 
88
 
@@ -279,5 +294,16 @@ async def get_state():
279
  return world_instance.get_observation(current_objective).model_dump()
280
 
281
 
 
 
 
 
 
 
 
 
 
 
 
282
  if __name__ == "__main__":
283
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
3
  Exposes /reset /step /state endpoints
4
  """
5
 
6
+ from openenv.core.env_server import create_app
7
  from fastapi import FastAPI, HTTPException
8
  from pydantic import BaseModel, Field
9
  from typing import Dict, Any, Optional, List, Literal
 
12
  import os
13
  import sys
14
 
15
+ try:
16
+ from .COEnv_environment import World
17
+ from .models import ClusterObservation, RewardSignal, KubeAction
18
+ except ImportError:
19
+ # Support running as a top-level module inside container images.
20
+ from COEnv_environment import World
21
+ from models import ClusterObservation, RewardSignal, KubeAction
22
 
23
  app = FastAPI(title="COEnv", description="Kubernetes Simulator for OpenEnv")
24
 
 
62
  def get_condition_for_task(task_id: str):
63
  """Get the condition injector for a task"""
64
  if task_id == "pod_recovery":
65
+ try:
66
+ from .conditions.crash_loop import CrashLoopCondition
67
+ except ImportError:
68
+ from conditions.crash_loop import CrashLoopCondition
69
  return CrashLoopCondition(world_instance, config)
70
  elif task_id == "autoscaling":
71
+ try:
72
+ from .conditions.oom_kill import OOMKillCondition
73
+ except ImportError:
74
+ from conditions.oom_kill import OOMKillCondition
75
  return OOMKillCondition(world_instance, config)
76
  elif task_id == "incident":
77
+ try:
78
+ from .conditions.cascade_failure import CascadeFailureCondition
79
+ except ImportError:
80
+ from conditions.cascade_failure import CascadeFailureCondition
81
  return CascadeFailureCondition(world_instance, config)
82
  return None
83
 
 
97
  """Initialize the world on startup"""
98
  global world_instance, current_task, current_objective
99
  load_config()
100
+ world_instance = World(config, seed=config.get("seed"))
101
  print("COEnv initialized")
102
 
103
 
 
294
  return world_instance.get_observation(current_objective).model_dump()
295
 
296
 
297
+ @app.get("/health")
298
+ async def health():
299
+ """Container health endpoint used by Docker health checks."""
300
+ return {"status": "ok"}
301
+
302
+
303
+ def main() -> None:
304
+ """Application entrypoint for local execution."""
305
+ uvicorn.run(app, host="0.0.0.0", port=8000)
306
+
307
+
308
  if __name__ == "__main__":
309
+ main()
server/conditions/cascade_failure.py CHANGED
@@ -4,7 +4,6 @@ CascadeFailureCondition - Simulates multi-service dependency failure
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
7
- import random
8
 
9
 
10
  class CascadeFailureCondition:
@@ -32,14 +31,14 @@ class CascadeFailureCondition:
32
  deployments = self.world.get_deployments()
33
  critical_deployments = [d for d in deployments if d.name in critical_services]
34
  if critical_deployments:
35
- root_cause_service = random.choice(critical_deployments).name
36
  else:
37
  deployments = self.world.get_deployments()
38
- root_cause_service = random.choice(deployments).name if deployments else "frontend"
39
 
40
  root_deployment = next((d for d in self.world.get_deployments() if d.name == root_cause_service), None)
41
  if root_deployment:
42
- from ..oom_kill import OOMKillCondition
43
  oom_condition = OOMKillCondition(self.world, self.config)
44
  oom_condition.inject(target_deployment=root_cause_service, failure_rate=0.8)
45
 
@@ -47,15 +46,15 @@ class CascadeFailureCondition:
47
 
48
  deployments = self.world.get_deployments()
49
  for deployment in deployments:
50
- if deployment.name != root_cause_service and failure_probability is not None and random.random() < failure_probability:
51
- failure_type = random.choice(["crashloop", "oom", "slow"])
52
 
53
  if failure_type == "crashloop":
54
- from ..crash_loop import CrashLoopCondition
55
  condition = CrashLoopCondition(self.world, self.config)
56
  condition.inject(target_deployment=deployment.name, failure_rate=0.6)
57
  elif failure_type == "oom":
58
- from ..oom_kill import OOMKillCondition
59
  condition = OOMKillCondition(self.world, self.config)
60
  condition.inject(target_deployment=deployment.name, failure_rate=0.6)
61
  else:
@@ -75,7 +74,7 @@ class CascadeFailureCondition:
75
  from datetime import datetime
76
 
77
  event = ClusterEvent(
78
- event_id=f"event-cascade-{random.randint(1000, 9999)}",
79
  timestamp=datetime.now().isoformat(),
80
  type=event_type,
81
  reason="CascadeFailure",
 
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
 
7
 
8
 
9
  class CascadeFailureCondition:
 
31
  deployments = self.world.get_deployments()
32
  critical_deployments = [d for d in deployments if d.name in critical_services]
33
  if critical_deployments:
34
+ root_cause_service = self.world.rng.choice(critical_deployments).name
35
  else:
36
  deployments = self.world.get_deployments()
37
+ root_cause_service = self.world.rng.choice(deployments).name if deployments else "frontend"
38
 
39
  root_deployment = next((d for d in self.world.get_deployments() if d.name == root_cause_service), None)
40
  if root_deployment:
41
+ from .oom_kill import OOMKillCondition
42
  oom_condition = OOMKillCondition(self.world, self.config)
43
  oom_condition.inject(target_deployment=root_cause_service, failure_rate=0.8)
44
 
 
46
 
47
  deployments = self.world.get_deployments()
48
  for deployment in deployments:
49
+ if deployment.name != root_cause_service and failure_probability is not None and float(self.world.rng.random()) < failure_probability:
50
+ failure_type = str(self.world.rng.choice(["crashloop", "oom", "slow"]))
51
 
52
  if failure_type == "crashloop":
53
+ from .crash_loop import CrashLoopCondition
54
  condition = CrashLoopCondition(self.world, self.config)
55
  condition.inject(target_deployment=deployment.name, failure_rate=0.6)
56
  elif failure_type == "oom":
57
+ from .oom_kill import OOMKillCondition
58
  condition = OOMKillCondition(self.world, self.config)
59
  condition.inject(target_deployment=deployment.name, failure_rate=0.6)
60
  else:
 
74
  from datetime import datetime
75
 
76
  event = ClusterEvent(
77
+ event_id=f"event-cascade-{int(self.world.rng.integers(1000, 10000))}",
78
  timestamp=datetime.now().isoformat(),
79
  type=event_type,
80
  reason="CascadeFailure",
server/conditions/crash_loop.py CHANGED
@@ -4,7 +4,6 @@ CrashLoopCondition - Simulates pods stuck in CrashLoopBackOff
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
7
- import random
8
 
9
 
10
  class CrashLoopCondition:
@@ -32,27 +31,27 @@ class CrashLoopCondition:
32
  if target_deployment is not None:
33
  target_deps = [d for d in deployments if d.name == target_deployment]
34
  else:
35
- target_deps = [random.choice(deployments)] if deployments else []
36
 
37
  for deployment in target_deps:
38
  pods = [p for p in self.world.get_pods() if p.deployment == deployment.name]
39
 
40
  for pod in pods:
41
- if failure_rate is not None and random.random() < failure_rate:
42
  patch = {
43
  "status": "CrashLoopBackOff",
44
- "restarts": random.randint(5, 20)
45
  }
46
  self.world.apply_patch("pod", pod.name, patch)
47
  self._add_crashloop_event(pod.name)
48
 
49
  def _add_crashloop_event(self, pod_name: str):
50
  """Add a crashloop event"""
51
- from .models import ClusterEvent
52
  from datetime import datetime
53
 
54
  event = ClusterEvent(
55
- event_id=f"event-crashloop-{random.randint(1000, 9999)}",
56
  timestamp=datetime.now().isoformat(),
57
  type="Warning",
58
  reason="BackOff",
 
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
 
7
 
8
 
9
  class CrashLoopCondition:
 
31
  if target_deployment is not None:
32
  target_deps = [d for d in deployments if d.name == target_deployment]
33
  else:
34
+ target_deps = [self.world.rng.choice(deployments)] if deployments else []
35
 
36
  for deployment in target_deps:
37
  pods = [p for p in self.world.get_pods() if p.deployment == deployment.name]
38
 
39
  for pod in pods:
40
+ if failure_rate is not None and float(self.world.rng.random()) < failure_rate:
41
  patch = {
42
  "status": "CrashLoopBackOff",
43
+ "restarts": int(self.world.rng.integers(5, 21))
44
  }
45
  self.world.apply_patch("pod", pod.name, patch)
46
  self._add_crashloop_event(pod.name)
47
 
48
  def _add_crashloop_event(self, pod_name: str):
49
  """Add a crashloop event"""
50
+ from ..models import ClusterEvent
51
  from datetime import datetime
52
 
53
  event = ClusterEvent(
54
+ event_id=f"event-crashloop-{int(self.world.rng.integers(1000, 10000))}",
55
  timestamp=datetime.now().isoformat(),
56
  type="Warning",
57
  reason="BackOff",
server/conditions/node_failure.py CHANGED
@@ -4,7 +4,6 @@ NodeFailureCondition - Simulates node outages and scheduling disruption
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
7
- import random
8
 
9
 
10
  class NodeFailureCondition:
@@ -32,7 +31,7 @@ class NodeFailureCondition:
32
  if target_node:
33
  target_nodes = [n for n in nodes if n.name == target_node]
34
  else:
35
- target_nodes = [n for n in nodes if failure_rate is not None and random.random() < failure_rate]
36
 
37
  for node in target_nodes:
38
  patch = {
@@ -54,11 +53,11 @@ class NodeFailureCondition:
54
 
55
  def _add_node_failure_event(self, node_name: str):
56
  """Add a node failure event"""
57
- from models import ClusterEvent
58
  from datetime import datetime
59
 
60
  event = ClusterEvent(
61
- event_id=f"event-nodefail-{random.randint(1000, 9999)}",
62
  timestamp=datetime.now().isoformat(),
63
  type="Warning",
64
  reason="NodeNotReady",
 
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
 
7
 
8
 
9
  class NodeFailureCondition:
 
31
  if target_node:
32
  target_nodes = [n for n in nodes if n.name == target_node]
33
  else:
34
+ target_nodes = [n for n in nodes if failure_rate is not None and float(self.world.rng.random()) < failure_rate]
35
 
36
  for node in target_nodes:
37
  patch = {
 
53
 
54
  def _add_node_failure_event(self, node_name: str):
55
  """Add a node failure event"""
56
+ from ..models import ClusterEvent
57
  from datetime import datetime
58
 
59
  event = ClusterEvent(
60
+ event_id=f"event-nodefail-{int(self.world.rng.integers(1000, 10000))}",
61
  timestamp=datetime.now().isoformat(),
62
  type="Warning",
63
  reason="NodeNotReady",
server/conditions/oom_kill.py CHANGED
@@ -4,7 +4,6 @@ OOMKillCondition - Simulates memory-limit failures causing repeated restarts
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
7
- import random
8
 
9
 
10
  class OOMKillCondition:
@@ -34,18 +33,18 @@ class OOMKillCondition:
34
  target_deps = [d for d in deployments if d.name == target_deployment]
35
  else:
36
  # Target a random deployment
37
- target_deps = [random.choice(deployments)] if deployments else []
38
 
39
  for deployment in target_deps:
40
  # Get pods for this deployment
41
  pods = [p for p in self.world.get_pods() if p.deployment == deployment.name]
42
 
43
  for pod in pods:
44
- if failure_rate is not None and random.random() < failure_rate:
45
  # Simulate OOMKill by setting high memory usage and restart count
46
  patch = {
47
  "status": "Running", # OOMKill pods often show as Running but crash
48
- "restarts": random.randint(10, 30) # High restart count from OOM
49
  }
50
  self.world.apply_patch("pod", pod.name, patch)
51
 
@@ -60,11 +59,11 @@ class OOMKillCondition:
60
 
61
  def _add_oom_event(self, pod_name: str):
62
  """Add an OOMKill event"""
63
- from .models import ClusterEvent
64
  from datetime import datetime
65
 
66
  event = ClusterEvent(
67
- event_id=f"event-oom-{random.randint(1000, 9999)}",
68
  timestamp=datetime.now().isoformat(),
69
  type="Warning",
70
  reason="OOMKilling",
 
4
 
5
  from typing import Dict, List, Any, Optional
6
  from ..COEnv_environment import World
 
7
 
8
 
9
  class OOMKillCondition:
 
33
  target_deps = [d for d in deployments if d.name == target_deployment]
34
  else:
35
  # Target a random deployment
36
+ target_deps = [self.world.rng.choice(deployments)] if deployments else []
37
 
38
  for deployment in target_deps:
39
  # Get pods for this deployment
40
  pods = [p for p in self.world.get_pods() if p.deployment == deployment.name]
41
 
42
  for pod in pods:
43
+ if failure_rate is not None and float(self.world.rng.random()) < failure_rate:
44
  # Simulate OOMKill by setting high memory usage and restart count
45
  patch = {
46
  "status": "Running", # OOMKill pods often show as Running but crash
47
+ "restarts": int(self.world.rng.integers(10, 31)) # High restart count from OOM
48
  }
49
  self.world.apply_patch("pod", pod.name, patch)
50
 
 
59
 
60
  def _add_oom_event(self, pod_name: str):
61
  """Add an OOMKill event"""
62
+ from ..models import ClusterEvent
63
  from datetime import datetime
64
 
65
  event = ClusterEvent(
66
+ event_id=f"event-oom-{int(self.world.rng.integers(1000, 10000))}",
67
  timestamp=datetime.now().isoformat(),
68
  type="Warning",
69
  reason="OOMKilling",
server/models.py CHANGED
@@ -88,7 +88,7 @@ All typed models are mandatory for OpenEnv spec compliance.
88
  Every endpoint uses these.
89
  """
90
 
91
- from pydantic import BaseModel, Field
92
  from typing import List, Dict, Any, Optional, Literal
93
  from datetime import datetime
94
 
@@ -171,7 +171,10 @@ class ClusterObservation(BaseModel):
171
  deployments: List[DeploymentStatus]
172
  services: List[ServiceStatus]
173
  configmaps: List[ConfigMapStatus]
174
- hpas: List[HPAStatus]
 
 
 
175
  events: List[ClusterEvent]
176
  step: int
177
  objective: str
 
88
  Every endpoint uses these.
89
  """
90
 
91
+ from pydantic import BaseModel, Field, AliasChoices
92
  from typing import List, Dict, Any, Optional, Literal
93
  from datetime import datetime
94
 
 
171
  deployments: List[DeploymentStatus]
172
  services: List[ServiceStatus]
173
  configmaps: List[ConfigMapStatus]
174
+ hpas: List[HPAStatus] = Field(
175
+ default_factory=list,
176
+ validation_alias=AliasChoices("hpa", "hpas")
177
+ )
178
  events: List[ClusterEvent]
179
  step: int
180
  objective: str
server/utils.py CHANGED
@@ -4,12 +4,21 @@ Random failure rate generators, latency simulators, resource usage curves.
4
  Makes the simulation feel realistic and non-deterministic in the right ways.
5
  """
6
 
7
- import random
8
  import math
9
  from typing import Dict, List, Any, Optional
10
  from datetime import datetime, timedelta
11
 
12
 
 
 
 
 
 
 
 
 
 
13
  class ProbabilityHelpers:
14
  """Helpers for generating realistic probabilities and distributions"""
15
 
@@ -17,23 +26,15 @@ class ProbabilityHelpers:
17
  def weighted_random_choice(choices: List[Any], weights: List[float]) -> Any:
18
  """Make a weighted random choice"""
19
  if not choices or not weights or len(choices) != len(weights):
20
- return random.choice(choices) if choices else None
21
 
22
  # Normalize weights
23
  total_weight = sum(weights)
24
  if total_weight == 0:
25
- return random.choice(choices)
26
 
27
  normalized_weights = [w / total_weight for w in weights]
28
-
29
- # Make choice
30
- r = random.random()
31
- cumulative_weight = 0
32
- for choice, weight in zip(choices, normalized_weights):
33
- cumulative_weight += weight
34
- if r <= cumulative_weight:
35
- return choice
36
- return choices[-1] # Fallback
37
 
38
  @staticmethod
39
  def exponential_backoff(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
@@ -44,8 +45,7 @@ class ProbabilityHelpers:
44
  @staticmethod
45
  def poisson_arrival_rate(lambda_rate: float, time_window: float) -> int:
46
  """Generate number of events in time window using Poisson distribution"""
47
- # Simple approximation - in reality would use numpy.random.poisson
48
- return int(lambda_rate * time_window + random.gauss(0, math.sqrt(lambda_rate * time_window)))
49
 
50
  @staticmethod
51
  def failure_probability_over_time(base_rate: float, time_elapsed: float,
@@ -57,7 +57,7 @@ class ProbabilityHelpers:
57
  @staticmethod
58
  def random_failure_rate(min_rate: float = 0.1, max_rate: float = 0.9) -> float:
59
  """Generate a random failure rate within bounds"""
60
- return random.uniform(min_rate, max_rate)
61
 
62
 
63
  class LatencySimulator:
@@ -75,7 +75,7 @@ class LatencySimulator:
75
  """Get simulated latency in milliseconds"""
76
  # Base latency + load-dependent component + random jitter
77
  load_latency = self.base_latency_ms * (self.load_factor - 1.0) * 2
78
- jitter = random.gauss(0, self.base_latency_ms * 0.1)
79
  latency = self.base_latency_ms + max(0, load_latency) + jitter
80
  return max(1.0, latency) # Minimum 1ms latency
81
 
@@ -83,7 +83,7 @@ class LatencySimulator:
83
  spike_multiplier: float = 5.0) -> float:
84
  """Get latency with occasional spikes"""
85
  latency = self.get_latency()
86
- if random.random() < spike_probability:
87
  latency *= spike_multiplier
88
  return latency
89
 
@@ -92,7 +92,7 @@ class ResourceUsageSimulator:
92
  """Simulates realistic CPU and memory usage patterns"""
93
 
94
  def __init__(self):
95
- self.time_offset = random.uniform(0, 2 * math.pi)
96
 
97
  def get_cpu_usage(self, base_usage: float = 0.3,
98
  variation: float = 0.2) -> float:
@@ -102,7 +102,7 @@ class ResourceUsageSimulator:
102
  daily_pattern = 0.5 * math.sin(2 * math.pi * time_factor / 24) + 0.5
103
 
104
  usage = base_usage + variation * daily_pattern
105
- usage += random.gauss(0, 0.05) # Noise
106
  return max(0.0, min(1.0, usage)) * 100 # Clamp to 0-100%
107
 
108
  def get_memory_usage(self, base_usage: float = 0.4,
@@ -113,7 +113,7 @@ class ResourceUsageSimulator:
113
  leak_factor = 0.1 * time_factor # Slow leak over week
114
 
115
  usage = base_usage + leak_factor
116
- usage += random.gauss(0, 0.03) # Noise
117
  return max(0.0, min(1.0, usage)) * 100 # Clamp to 0-100%
118
 
119
  def get_resource_curve(self, resource_type: str,
@@ -121,11 +121,11 @@ class ResourceUsageSimulator:
121
  """Get resource usage following a specific curve"""
122
  if resource_type == "cpu":
123
  # CPU: periodic with bursts
124
- return 0.3 + 0.4 * math.sin(time_elapsed / 100) + 0.2 * random.random()
125
  elif resource_type == "memory":
126
  # Memory: gradual increase with occasional GC drops
127
  base = 0.2 + 0.6 * (1 - math.exp(-time_elapsed / 1000))
128
- gc_drop = 0.3 if random.random() < 0.01 else 0 # Occasional GC
129
  return max(0, base - gc_drop)
130
  elif resource_type == "disk":
131
  # Disk: steady growth
@@ -144,30 +144,30 @@ class NetworkSimulator:
144
 
145
  def simulate_partition(self) -> bool:
146
  """Return True if network partition is simulated"""
147
- return random.random() < self.partition_probability
148
 
149
  def get_latency(self) -> float:
150
  """Get network latency in milliseconds"""
151
  # Base latency with occasional spikes
152
- latency = self.latency_ms + random.gauss(0, self.latency_ms * 0.2)
153
- if random.random() < 0.05: # 5% chance of spike
154
- latency *= random.uniform(2, 10)
155
  return max(1.0, latency)
156
 
157
  def get_bandwidth(self) -> float:
158
  """Get available bandwidth in Mbps"""
159
  # Bandwidth varies with usage and conditions
160
- usage_factor = random.uniform(0.3, 0.9)
161
- condition_factor = random.uniform(0.8, 1.2)
162
  return self.bandwidth_mbps * usage_factor * condition_factor
163
 
164
 
165
  def generate_failure_scenario(config: Dict[str, Any]) -> Dict[str, Any]:
166
  """Generate a random failure scenario based on config"""
167
  scenario = {
168
- "type": random.choice(["crashloop", "oom", "node_failure", "cascade"]),
169
- "severity": random.uniform(0.3, 0.9),
170
- "duration": random.randint(30, 300), # seconds
171
  "affected_components": []
172
  }
173
 
@@ -186,5 +186,5 @@ def generate_failure_scenario(config: Dict[str, Any]) -> Dict[str, Any]:
186
 
187
  def apply_realistic_noise(value: float, noise_percent: float = 10.0) -> float:
188
  """Apply realistic noise to a value"""
189
- noise = random.gauss(0, value * (noise_percent / 100.0))
190
  return max(0, value + noise)
 
4
  Makes the simulation feel realistic and non-deterministic in the right ways.
5
  """
6
 
7
+ import numpy as np
8
  import math
9
  from typing import Dict, List, Any, Optional
10
  from datetime import datetime, timedelta
11
 
12
 
13
+ _RNG = np.random.default_rng()
14
+
15
+
16
+ def set_random_seed(seed: Optional[int]) -> None:
17
+ """Set module-level RNG seed for deterministic utility behavior."""
18
+ global _RNG
19
+ _RNG = np.random.default_rng(seed)
20
+
21
+
22
  class ProbabilityHelpers:
23
  """Helpers for generating realistic probabilities and distributions"""
24
 
 
26
  def weighted_random_choice(choices: List[Any], weights: List[float]) -> Any:
27
  """Make a weighted random choice"""
28
  if not choices or not weights or len(choices) != len(weights):
29
+ return _RNG.choice(choices) if choices else None
30
 
31
  # Normalize weights
32
  total_weight = sum(weights)
33
  if total_weight == 0:
34
+ return _RNG.choice(choices)
35
 
36
  normalized_weights = [w / total_weight for w in weights]
37
+ return _RNG.choice(choices, p=normalized_weights)
 
 
 
 
 
 
 
 
38
 
39
  @staticmethod
40
  def exponential_backoff(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
 
45
  @staticmethod
46
  def poisson_arrival_rate(lambda_rate: float, time_window: float) -> int:
47
  """Generate number of events in time window using Poisson distribution"""
48
+ return int(_RNG.poisson(max(lambda_rate * time_window, 0)))
 
49
 
50
  @staticmethod
51
  def failure_probability_over_time(base_rate: float, time_elapsed: float,
 
57
  @staticmethod
58
  def random_failure_rate(min_rate: float = 0.1, max_rate: float = 0.9) -> float:
59
  """Generate a random failure rate within bounds"""
60
+ return float(_RNG.uniform(min_rate, max_rate))
61
 
62
 
63
  class LatencySimulator:
 
75
  """Get simulated latency in milliseconds"""
76
  # Base latency + load-dependent component + random jitter
77
  load_latency = self.base_latency_ms * (self.load_factor - 1.0) * 2
78
+ jitter = float(_RNG.normal(0, self.base_latency_ms * 0.1))
79
  latency = self.base_latency_ms + max(0, load_latency) + jitter
80
  return max(1.0, latency) # Minimum 1ms latency
81
 
 
83
  spike_multiplier: float = 5.0) -> float:
84
  """Get latency with occasional spikes"""
85
  latency = self.get_latency()
86
+ if float(_RNG.random()) < spike_probability:
87
  latency *= spike_multiplier
88
  return latency
89
 
 
92
  """Simulates realistic CPU and memory usage patterns"""
93
 
94
  def __init__(self):
95
+ self.time_offset = float(_RNG.uniform(0, 2 * math.pi))
96
 
97
  def get_cpu_usage(self, base_usage: float = 0.3,
98
  variation: float = 0.2) -> float:
 
102
  daily_pattern = 0.5 * math.sin(2 * math.pi * time_factor / 24) + 0.5
103
 
104
  usage = base_usage + variation * daily_pattern
105
+ usage += float(_RNG.normal(0, 0.05)) # Noise
106
  return max(0.0, min(1.0, usage)) * 100 # Clamp to 0-100%
107
 
108
  def get_memory_usage(self, base_usage: float = 0.4,
 
113
  leak_factor = 0.1 * time_factor # Slow leak over week
114
 
115
  usage = base_usage + leak_factor
116
+ usage += float(_RNG.normal(0, 0.03)) # Noise
117
  return max(0.0, min(1.0, usage)) * 100 # Clamp to 0-100%
118
 
119
  def get_resource_curve(self, resource_type: str,
 
121
  """Get resource usage following a specific curve"""
122
  if resource_type == "cpu":
123
  # CPU: periodic with bursts
124
+ return 0.3 + 0.4 * math.sin(time_elapsed / 100) + 0.2 * float(_RNG.random())
125
  elif resource_type == "memory":
126
  # Memory: gradual increase with occasional GC drops
127
  base = 0.2 + 0.6 * (1 - math.exp(-time_elapsed / 1000))
128
+ gc_drop = 0.3 if float(_RNG.random()) < 0.01 else 0 # Occasional GC
129
  return max(0, base - gc_drop)
130
  elif resource_type == "disk":
131
  # Disk: steady growth
 
144
 
145
  def simulate_partition(self) -> bool:
146
  """Return True if network partition is simulated"""
147
+ return float(_RNG.random()) < self.partition_probability
148
 
149
  def get_latency(self) -> float:
150
  """Get network latency in milliseconds"""
151
  # Base latency with occasional spikes
152
+ latency = self.latency_ms + float(_RNG.normal(0, self.latency_ms * 0.2))
153
+ if float(_RNG.random()) < 0.05: # 5% chance of spike
154
+ latency *= float(_RNG.uniform(2, 10))
155
  return max(1.0, latency)
156
 
157
  def get_bandwidth(self) -> float:
158
  """Get available bandwidth in Mbps"""
159
  # Bandwidth varies with usage and conditions
160
+ usage_factor = float(_RNG.uniform(0.3, 0.9))
161
+ condition_factor = float(_RNG.uniform(0.8, 1.2))
162
  return self.bandwidth_mbps * usage_factor * condition_factor
163
 
164
 
165
  def generate_failure_scenario(config: Dict[str, Any]) -> Dict[str, Any]:
166
  """Generate a random failure scenario based on config"""
167
  scenario = {
168
+ "type": str(_RNG.choice(["crashloop", "oom", "node_failure", "cascade"])),
169
+ "severity": float(_RNG.uniform(0.3, 0.9)),
170
+ "duration": int(_RNG.integers(30, 301)), # seconds
171
  "affected_components": []
172
  }
173
 
 
186
 
187
  def apply_realistic_noise(value: float, noise_percent: float = 10.0) -> float:
188
  """Apply realistic noise to a value"""
189
+ noise = float(_RNG.normal(0, value * (noise_percent / 100.0)))
190
  return max(0, value + noise)