# coenv/server/coenv_environment.py
# Uploaded by SandyTheAdventurer via huggingface_hub (commit 05a686e, verified).
"""
coenv Environment - Cluster Simulator
In-memory dict that holds cluster state: nodes, pods, deployments, services.
Has methods like get_pods(), apply_patch(), tick() to advance time.
This is the brain of the whole project.
"""
from typing import Dict, List, Any, Optional, Literal
from datetime import datetime
import numpy as np
from .models import (
NodeStatus, PodStatus, DeploymentStatus, ServiceStatus,
ClusterEvent, ClusterObservation,
ConfigMapStatus, HPAStatus
)
from .utils import set_random_seed
class World:
    """In-memory Kubernetes cluster simulator.

    All cluster state lives in ``self.cluster_state``: a dict mapping a
    collection name ("nodes", "pods", "deployments", "services",
    "configmaps", "hpas") to a list of plain dicts. The public getters wrap
    those dicts in the Pydantic models from ``.models``; the mutators edit the
    dicts in place. Random choices (pod names, event ids, usage drift,
    Pending->Running transitions, generated events) are drawn from
    ``self.rng``, which is seeded from ``seed``.
    """

    # Maps the singular resource_type accepted by apply_patch()/describe()
    # to the key of its collection in cluster_state.
    _COLLECTIONS: Dict[str, str] = {
        "deployment": "deployments",
        "pod": "pods",
        "node": "nodes",
        "service": "services",
        "configmap": "configmaps",
        "hpa": "hpas",
    }
    # Once the event log grows past _MAX_EVENTS it is cut back to the newest
    # _EVENTS_KEPT_AFTER_TRIM entries.
    _MAX_EVENTS = 100
    _EVENTS_KEPT_AFTER_TRIM = 50

    def __init__(self, config: Dict[str, Any], seed: Optional[int] = None):
        """Create a simulator populated with a healthy default cluster.

        Args:
            config: Sizing knobs read via ``.get`` with defaults: ``num_nodes``,
                ``node_cpu_capacity``, ``node_mem_capacity``,
                ``pod_cpu_request``, ``pod_mem_request``, ``pod_cpu_limit``,
                ``pod_mem_limit``.
            seed: Seed for ``self.rng`` (also passed to ``set_random_seed``).
        """
        self.config = config
        self.seed = seed
        self.rng = np.random.default_rng(seed)
        set_random_seed(seed)
        self.cluster_state = self._initialize_healthy_cluster()
        self.step_count = 0
        self.events = []  # list of ClusterEvent, oldest first
        self._event_counter = 0  # sequence number for ids from _generate_event

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _random_suffix(self, length: int = 5) -> str:
        """Generate a random lowercase alphabetic suffix."""
        letters = self.rng.integers(97, 123, size=length)
        return "".join(chr(int(code)) for code in letters)

    def _new_pod(self, deployment_name: str, node_name: Optional[str], status: str) -> Dict[str, Any]:
        """Build (but do not store) a pod dict for ``deployment_name``.

        The name is ``<deployment>-<4 digits>-<5 letters>``, drawn from
        ``self.rng``. Resource requests/limits come from the config.
        """
        pod_name = f"{deployment_name}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}"
        return {
            "name": pod_name,
            "status": status,
            "node": node_name,
            "restarts": 0,
            "cpu_request": self.config.get("pod_cpu_request", 500),
            "mem_request": self.config.get("pod_mem_request", 256),
            "cpu_limit": self.config.get("pod_cpu_limit", 1000),
            "mem_limit": self.config.get("pod_mem_limit", 512),
            "deployment": deployment_name,
            "last_updated": datetime.now().isoformat(),
        }

    def _schedulable_nodes(self) -> List[Dict[str, Any]]:
        """Return the nodes whose status is "Ready", in cluster order."""
        return [n for n in self.cluster_state["nodes"] if n.get("status") == "Ready"]

    def _trim_events(self) -> None:
        """Cap the event log so it cannot grow without bound."""
        if len(self.events) > self._MAX_EVENTS:
            self.events = self.events[-self._EVENTS_KEPT_AFTER_TRIM:]

    def _record_event(
        self,
        id_prefix: str,
        reason: str,
        message: str,
        involved_object: str,
        event_type: Literal["Normal", "Warning"] = "Normal",
        timestamp: Optional[str] = None,
    ) -> None:
        """Append a ClusterEvent whose id is ``event-<id_prefix>-<4 random digits>``.

        NOTE(review): the random 4-digit ids are not guaranteed unique.
        """
        event_id = f"event-{id_prefix}-{int(self.rng.integers(1000, 10000))}"
        self.events.append(ClusterEvent(
            event_id=event_id,
            timestamp=timestamp if timestamp is not None else datetime.now().isoformat(),
            type=event_type,
            reason=reason,
            message=message,
            involved_object=involved_object,
        ))
        self._trim_events()

    def _initialize_healthy_cluster(self) -> Dict[str, List[Dict]]:
        """Build a healthy cluster state from the config.

        Creates ``num_nodes`` Ready nodes, five deployments with Running pods
        spread round-robin over the nodes, four ClusterIP services, three
        configmaps and two HPAs.
        """
        nodes = []
        for i in range(self.config.get("num_nodes", 3)):
            nodes.append({
                "name": f"node-{i+1}",
                "status": "Ready",
                "cpu_capacity": self.config.get("node_cpu_capacity", 4),
                "mem_capacity": self.config.get("node_mem_capacity", 8192),
                "cpu_usage": 0.0,
                "mem_usage": 0.0,
                "last_updated": datetime.now().isoformat()
            })
        pods = []
        deployments = []
        services = []
        configmaps = []
        hpas = []

        # Default deployments and their pods
        default_deployments = [
            {"name": "frontend", "image": "nginx:1.21", "replicas": 3},
            {"name": "backend", "image": "python:3.9", "replicas": 2},
            {"name": "database", "image": "postgres:13", "replicas": 1},
            {"name": "auth-service", "image": "auth:latest", "replicas": 2},
            {"name": "api-gateway", "image": "nginx:alpine", "replicas": 2}
        ]
        for dep in default_deployments:
            deployments.append({
                "name": dep["name"],
                "desired_replicas": dep["replicas"],
                "available_replicas": dep["replicas"],
                "image": dep["image"],
                "last_updated": datetime.now().isoformat()
            })
            for j in range(dep["replicas"]):
                node_name = nodes[j % len(nodes)]["name"] if nodes else None
                pods.append(self._new_pod(dep["name"], node_name, "Running"))

        # Default services. Each service "<app>-service" selects app "<app>";
        # only the trailing suffix is stripped so that "auth-service-service"
        # selects "auth-service" (str.replace would have produced "auth").
        service_suffix = "-service"
        default_services = [
            {"name": "frontend-service", "type": "ClusterIP", "ports": [{"port": 80, "targetPort": 80}]},
            {"name": "backend-service", "type": "ClusterIP", "ports": [{"port": 8080, "targetPort": 8080}]},
            {"name": "database-service", "type": "ClusterIP", "ports": [{"port": 5432, "targetPort": 5432}]},
            {"name": "auth-service-service", "type": "ClusterIP", "ports": [{"port": 8000, "targetPort": 8000}]}
        ]
        for index, svc in enumerate(default_services, start=1):
            svc_name = svc["name"]
            app = svc_name[:-len(service_suffix)] if svc_name.endswith(service_suffix) else svc_name
            services.append({
                "name": svc_name,
                "type": svc["type"],
                "ports": svc["ports"],
                "selector": {"app": app},
                "cluster_ip": f"10.96.{index}.{index}",
                "last_updated": datetime.now().isoformat()
            })

        # Default configmaps
        default_configmaps = [
            {"name": "frontend-config", "data": {"DB_HOST": "db.prod.internal", "DB_PORT": "5432"}},
            {"name": "backend-config", "data": {"LOG_LEVEL": "info", "CACHE_SIZE": "100"}},
            {"name": "database-config", "data": {"MAX_CONNECTIONS": "100", "TIMEOUT": "30"}}
        ]
        for cm in default_configmaps:
            configmaps.append({
                "name": cm["name"],
                "data": cm["data"],
                "last_updated": datetime.now().isoformat()
            })

        # Default HPAs (current_replicas starts at min_replicas)
        default_hpas = [
            {"name": "frontend-hpa", "min_replicas": 2, "max_replicas": 10, "cpu_target_percent": 70},
            {"name": "backend-hpa", "min_replicas": 1, "max_replicas": 5, "cpu_target_percent": 80}
        ]
        for hpa in default_hpas:
            hpas.append({
                "name": hpa["name"],
                "min_replicas": hpa["min_replicas"],
                "max_replicas": hpa["max_replicas"],
                "current_replicas": hpa["min_replicas"],
                "cpu_target_percent": hpa["cpu_target_percent"],
                "last_updated": datetime.now().isoformat()
            })

        return {
            "nodes": nodes,
            "pods": pods,
            "deployments": deployments,
            "services": services,
            "configmaps": configmaps,
            "hpas": hpas
        }

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_pods(self, namespace: Optional[str] = None, selector: Optional[Dict[str, str]] = None) -> List[PodStatus]:
        """Return pods as PodStatus models, optionally filtered (like ``kubectl get pods``).

        Args:
            namespace: Keep only pods whose "namespace" equals this; pods
                without one count as "default".
            selector: Label filter. The keys "app" and "deployment" match the
                pod's owning deployment; any other key matches
                ``pod["labels"][key]``. All entries must match.
        """
        filtered_pods = self.cluster_state["pods"]
        if namespace is not None:
            filtered_pods = [
                pod for pod in filtered_pods
                if pod.get("namespace", "default") == namespace
            ]
        if selector:
            for key, value in selector.items():
                if key in {"app", "deployment"}:
                    filtered_pods = [
                        pod for pod in filtered_pods
                        if pod.get("deployment") == value
                    ]
                else:
                    filtered_pods = [
                        pod for pod in filtered_pods
                        if pod.get("labels", {}).get(key) == value
                    ]
        return [PodStatus(**pod) for pod in filtered_pods]

    def get_nodes(self) -> List[NodeStatus]:
        """Get all nodes as Pydantic models"""
        return [NodeStatus(**node) for node in self.cluster_state["nodes"]]

    def get_deployments(self) -> List[DeploymentStatus]:
        """Get all deployments as Pydantic models"""
        return [DeploymentStatus(**dep) for dep in self.cluster_state["deployments"]]

    def get_services(self) -> List[ServiceStatus]:
        """Get all services as Pydantic models"""
        return [ServiceStatus(**svc) for svc in self.cluster_state["services"]]

    def get_configmaps(self) -> List[ConfigMapStatus]:
        """Get all configmaps as Pydantic models"""
        return [ConfigMapStatus(**cm) for cm in self.cluster_state["configmaps"]]

    def get_hpas(self) -> List[HPAStatus]:
        """Get all HPAs as Pydantic models"""
        return [HPAStatus(**hpa) for hpa in self.cluster_state["hpas"]]

    def get_events(self) -> List[ClusterEvent]:
        """Return a shallow copy of the event log (oldest first)."""
        return self.events.copy()

    # ------------------------------------------------------------------
    # Mutators
    # ------------------------------------------------------------------

    def apply_patch(self, resource_type: str, name: str, patch: Dict[str, Any]) -> bool:
        """Merge ``patch`` into the first resource of ``resource_type`` named ``name``.

        Patching a deployment's "desired_replicas" or "available_replicas"
        immediately adds or removes pods to match "desired_replicas".

        Returns:
            True if the resource was found and patched. False for an unknown
            resource_type, a missing resource, or any error while patching
            (the error is printed; fields updated before the error stay
            updated).
        """
        collection_name = self._COLLECTIONS.get(resource_type)
        if collection_name is None:
            return False
        try:
            resource = next(
                (item for item in self.cluster_state[collection_name] if item["name"] == name),
                None,
            )
            if resource is None:
                return False
            resource.update(patch)
            resource["last_updated"] = datetime.now().isoformat()
            if resource_type == "deployment" and ("desired_replicas" in patch or "available_replicas" in patch):
                self._update_pods_for_deployment(name, resource["desired_replicas"])
            return True
        except Exception as e:
            # Best-effort API: report and signal failure instead of raising.
            print(f"Error applying patch: {e}")
            return False

    def _update_pods_for_deployment(self, deployment_name: str, desired_replicas: int):
        """Add or remove pods so ``deployment_name`` owns ``desired_replicas`` pods.

        Negative counts are treated as 0. New pods start "Pending" and are
        placed round-robin on Ready nodes (on all nodes if none is Ready).
        New pods are only created if the deployment exists. Surplus pods are
        removed from the end of the deployment's pod list.
        """
        desired_replicas = max(0, desired_replicas)
        pods = self.cluster_state["pods"]
        current_pods = [p for p in pods if p.get("deployment") == deployment_name]
        missing = desired_replicas - len(current_pods)
        if missing > 0:
            if not any(d["name"] == deployment_name for d in self.cluster_state["deployments"]):
                return
            targets = self._schedulable_nodes() or self.cluster_state["nodes"]
            for i in range(missing):
                node = targets[i % len(targets)] if targets else None
                pods.append(self._new_pod(deployment_name, node["name"] if node else None, "Pending"))
        elif missing < 0:
            # Remove by identity: dicts compare by value, so list.remove could
            # drop the wrong entry.
            doomed = {id(p) for p in current_pods[desired_replicas:]}
            pods[:] = [p for p in pods if id(p) not in doomed]

    def scale(self, deployment_name: str, replicas: int) -> bool:
        """Set a deployment's desired replica count (pods are adjusted immediately)."""
        return self.apply_patch("deployment", deployment_name, {"desired_replicas": replicas})

    def delete_pod(self, pod_name: str) -> bool:
        """Remove a pod; tick() recreates it if its deployment is then under-replicated.

        Returns False if no pod has that name.
        """
        pods = self.cluster_state["pods"]
        pod_index = next((i for i, pod in enumerate(pods) if pod["name"] == pod_name), None)
        if pod_index is None:
            return False
        del pods[pod_index]
        self._record_event("delpod", "UserDeleted", f"pod/{pod_name} deleted by user", pod_name)
        return True

    def rollout_restart(self, deployment: str) -> bool:
        """Restart a deployment by deleting all of its pods.

        tick() recreates the pods. One "RolledOut" event is recorded.
        Returns False if the deployment does not exist.
        """
        if not any(d["name"] == deployment for d in self.cluster_state["deployments"]):
            return False
        self.cluster_state["pods"] = [
            p for p in self.cluster_state["pods"] if p.get("deployment") != deployment
        ]
        self._record_event(
            "restart",
            "RolledOut",
            f"Deployment {deployment} rollout restart triggered",
            deployment,
        )
        return True

    def set_hpa(self, deployment: str, min_replicas: int, max_replicas: int, cpu_target_percent: int) -> bool:
        """Create or update the HPA ``<deployment>-hpa`` and clamp the deployment to its bounds.

        The deployment's desired_replicas is clamped into
        [min_replicas, max_replicas]. Surplus pods are removed immediately,
        because tick() only ever adds missing pods. Missing pods are left
        for tick() to create.

        Returns:
            False if the deployment does not exist or the bounds are invalid
            (negative min, or max < min); True otherwise.
        """
        target_deployment = next(
            (d for d in self.cluster_state["deployments"] if d["name"] == deployment),
            None,
        )
        if target_deployment is None:
            return False
        if min_replicas < 0 or max_replicas < min_replicas:
            return False

        hpa_name = f"{deployment}-hpa"
        now = datetime.now().isoformat()
        bounded_replicas = max(min_replicas, min(target_deployment["desired_replicas"], max_replicas))
        hpa_fields = {
            "min_replicas": min_replicas,
            "max_replicas": max_replicas,
            "current_replicas": bounded_replicas,
            "cpu_target_percent": cpu_target_percent,
            "last_updated": now,
        }
        existing_hpa = next((h for h in self.cluster_state["hpas"] if h.get("name") == hpa_name), None)
        if existing_hpa is None:
            self.cluster_state["hpas"].append({"name": hpa_name, **hpa_fields})
        else:
            existing_hpa.update(hpa_fields)

        target_deployment["desired_replicas"] = bounded_replicas
        target_deployment["last_updated"] = now
        current_count = sum(1 for p in self.cluster_state["pods"] if p.get("deployment") == deployment)
        if bounded_replicas < current_count:
            self._update_pods_for_deployment(deployment, bounded_replicas)

        self._record_event(
            "hpa",
            "HorizontalPodAutoscalerUpdated",
            (
                f"HPA configured for deployment/{deployment}: "
                f"min={min_replicas}, max={max_replicas}, cpu_target={cpu_target_percent}%"
            ),
            deployment,
            timestamp=now,
        )
        return True

    def drain_node(self, node_name: str) -> bool:
        """Cordon ``node_name`` and move its pods to other Ready nodes.

        The node becomes "SchedulingDisabled". Each pod on it becomes
        "Pending" and is reassigned round-robin to the other Ready nodes, or
        to no node when none is Ready. One "Evicted" event is recorded per pod
        and one "NodeDrained" event at the end.
        Returns False if the node does not exist.
        """
        node = next((n for n in self.cluster_state["nodes"] if n["name"] == node_name), None)
        if node is None:
            return False
        node["status"] = "SchedulingDisabled"
        node["last_updated"] = datetime.now().isoformat()

        candidate_nodes = [n for n in self._schedulable_nodes() if n["name"] != node_name]
        pods_on_node = [p for p in self.cluster_state["pods"] if p.get("node") == node_name]
        for i, pod in enumerate(pods_on_node):
            replacement = candidate_nodes[i % len(candidate_nodes)] if candidate_nodes else None
            pod["node"] = replacement["name"] if replacement else None
            pod["status"] = "Pending"
            pod["last_updated"] = datetime.now().isoformat()
            self._record_event(
                "evict",
                "Evicted",
                f"pod/{pod['name']} evicted from drained node/{node_name}",
                pod["name"],
            )
        self._record_event("drain", "NodeDrained", f"node/{node_name} cordoned and drained", node_name)
        return True

    def describe(self, resource_type: str, name: str) -> Dict[str, Any]:
        """Return kubectl-describe style details for one resource.

        The result always has "type", "name" and "found". When found it also
        carries a deep copy of the resource, deep copies of related pods
        (owned pods for a deployment, scheduled pods for a node, pods of the
        selected app for a service), up to the last 10 matching events, and
        the current step. Copies prevent callers from mutating cluster state.
        """
        import copy

        collection_name = self._COLLECTIONS.get(resource_type)
        if collection_name is None:
            return {
                "type": resource_type,
                "name": name,
                "found": False,
                "error": f"Unsupported resource_type: {resource_type}",
            }
        resource = next(
            (item for item in self.cluster_state.get(collection_name, []) if item.get("name") == name),
            None,
        )
        if resource is None:
            return {
                "type": resource_type,
                "name": name,
                "found": False,
                "error": f"{resource_type} '{name}' not found",
            }

        pods = self.cluster_state["pods"]
        related_pods = []
        if resource_type == "deployment":
            related_pods = [p for p in pods if p.get("deployment") == name]
        elif resource_type == "node":
            related_pods = [p for p in pods if p.get("node") == name]
        elif resource_type == "service":
            selector_app = resource.get("selector", {}).get("app")
            if selector_app:
                related_pods = [p for p in pods if p.get("deployment") == selector_app]
        related_events = [e.model_dump() for e in self.events if e.involved_object in {name, resource_type}]
        return {
            "type": resource_type,
            "name": name,
            "found": True,
            "resource": copy.deepcopy(resource),
            "related_pods": copy.deepcopy(related_pods),
            "recent_events": related_events[-10:],
            "step": self.step_count,
            "timestamp": datetime.now().isoformat(),
        }

    # ------------------------------------------------------------------
    # Simulation
    # ------------------------------------------------------------------

    def tick(self):
        """Advance simulated time by one step.

        1. Each node's cpu_usage/mem_usage moves by a uniform step in
           [-5, 5], clamped to [0, 100].
        2. Pods on a node whose status is not "Ready" go from Running/Pending
           to "Unknown". Dead nodes stay dead unless drained.
           CrashLoopBackOff pods elsewhere increment ``restarts``. Pending
           pods on a Ready node become Running with probability 0.3.
        3. Each deployment's available_replicas is recounted from Running pods.
        4. Deployments with fewer pods than desired_replicas get new Running
           pods, placed round-robin on Ready nodes (on all nodes if none is
           Ready). Surplus pods are not removed here.
        5. With probability 0.3 a random event is generated.
        """
        self.step_count += 1

        for node in self.cluster_state["nodes"]:
            node["cpu_usage"] = max(0, min(100, node["cpu_usage"] + float(self.rng.uniform(-5, 5))))
            node["mem_usage"] = max(0, min(100, node["mem_usage"] + float(self.rng.uniform(-5, 5))))
            node["last_updated"] = datetime.now().isoformat()

        # First node with a given name wins, as with a linear search.
        nodes_by_name: Dict[str, Dict[str, Any]] = {}
        for node in self.cluster_state["nodes"]:
            nodes_by_name.setdefault(node["name"], node)

        for pod in self.cluster_state["pods"]:
            node_name = pod.get("node")
            node = nodes_by_name.get(node_name) if node_name else None
            if node is not None and node["status"] != "Ready":
                if pod["status"] in ("Running", "Pending"):
                    pod["status"] = "Unknown"
            elif pod["status"] == "CrashLoopBackOff":
                pod["restarts"] = pod.get("restarts", 0) + 1
            elif node is not None and pod["status"] == "Pending":
                if float(self.rng.random()) > 0.7:
                    pod["status"] = "Running"
            pod["last_updated"] = datetime.now().isoformat()

        for deployment in self.cluster_state["deployments"]:
            deployment["available_replicas"] = sum(
                1 for p in self.cluster_state["pods"]
                if p.get("deployment") == deployment["name"] and p["status"] == "Running"
            )
            deployment["last_updated"] = datetime.now().isoformat()

        # Deployment controller: replace missing pods, avoiding nodes that
        # are NotReady or cordoned whenever a Ready node exists.
        targets = self._schedulable_nodes() or self.cluster_state["nodes"]
        for deployment in self.cluster_state["deployments"]:
            desired = deployment.get("desired_replicas", 0)
            current_count = sum(
                1 for p in self.cluster_state["pods"] if p.get("deployment") == deployment["name"]
            )
            for i in range(desired - current_count):
                node = targets[i % len(targets)] if targets else None
                self.cluster_state["pods"].append(
                    self._new_pod(deployment["name"], node["name"] if node else None, "Running")
                )

        if float(self.rng.random()) < 0.3:
            self._generate_event()

    def _generate_event(self):
        """Append a random canned event about one of the first few pods/deployments/nodes."""
        event_types = [
            {"type": "Normal", "reason": "Scheduled", "message": "Successfully assigned node"},
            {"type": "Warning", "reason": "FailedScheduling", "message": "0/3 nodes are available: 3 Insufficient cpu."},
            {"type": "Normal", "reason": "Pulling", "message": "Pulling image \"nginx:1.21\""},
            {"type": "Normal", "reason": "Pulled", "message": "Successfully pulled image \"nginx:1.21\""},
            {"type": "Normal", "reason": "Created", "message": "Created container"},
            {"type": "Normal", "reason": "Started", "message": "Started container"},
            {"type": "Warning", "reason": "BackOff", "message": "Back-off restarting failed container"},
            {"type": "Normal", "reason": "Killing", "message": "Stopping container"}
        ]
        event = self.rng.choice(event_types)
        involved_objects = []
        involved_objects.extend([p["name"] for p in self.cluster_state["pods"][:3]])
        involved_objects.extend([d["name"] for d in self.cluster_state["deployments"][:3]])
        involved_objects.extend([n["name"] for n in self.cluster_state["nodes"][:3]])
        if not involved_objects:
            involved_objects = ["cluster"]
        event_type: Literal["Normal", "Warning"] = event["type"]  # type: ignore
        self.events.append(ClusterEvent(
            event_id=f"event-{self._event_counter:04d}",
            timestamp=datetime.now().isoformat(),
            type=event_type,
            reason=event["reason"],
            message=event["message"],
            involved_object=str(self.rng.choice(involved_objects))
        ))
        self._event_counter += 1
        self._trim_events()

    # ------------------------------------------------------------------
    # Snapshots and resets
    # ------------------------------------------------------------------

    def get_full_state(self) -> Dict[str, Any]:
        """Return every collection as Pydantic models plus events and step count."""
        return {
            "nodes": self.get_nodes(),
            "pods": self.get_pods(),
            "deployments": self.get_deployments(),
            "services": self.get_services(),
            "configmaps": self.get_configmaps(),
            "hpas": self.get_hpas(),
            "events": self.get_events(),
            "step": self.step_count
        }

    def reset_to_healthy(self):
        """Rebuild the default healthy cluster and clear step count and events.

        The RNG is not reseeded, so new pod names differ from the first build.
        """
        self.cluster_state = self._initialize_healthy_cluster()
        self.step_count = 0
        self.events = []
        self._event_counter = 0

    def reset(self, condition=None):
        """Reset to healthy, optionally call ``condition.inject()``, and return the observation."""
        self.reset_to_healthy()
        if condition:
            condition.inject()
        return self.get_observation()

    def get_observation(self, objective: str = "Maintain cluster health"):
        """Serialise the current state plus ``objective`` into a ClusterObservation."""
        observation_dict = self.get_full_state()
        observation_dict["objective"] = objective
        return ClusterObservation(**observation_dict)