Spaces:
Sleeping
Sleeping
| """ | |
| coenv Environment - Cluster Simulator | |
| In-memory dict that holds cluster state: nodes, pods, deployments, services. | |
| Has methods like get_pods(), apply_patch(), tick() to advance time. | |
| This is the brain of the whole project. | |
| """ | |
| from typing import Dict, List, Any, Optional, Literal | |
| from datetime import datetime | |
| import numpy as np | |
| from .models import ( | |
| NodeStatus, PodStatus, DeploymentStatus, ServiceStatus, | |
| ClusterEvent, ClusterObservation, | |
| ConfigMapStatus, HPAStatus | |
| ) | |
| from .utils import set_random_seed | |
| class World: | |
| """In-memory Kubernetes cluster simulator""" | |
| def __init__(self, config: Dict[str, Any], seed: Optional[int] = None): | |
| self.config = config | |
| self.seed = seed | |
| self.rng = np.random.default_rng(seed) | |
| set_random_seed(seed) | |
| self.cluster_state = self._initialize_healthy_cluster() | |
| self.step_count = 0 | |
| self.events = [] | |
| self._event_counter = 0 | |
| def _random_suffix(self, length: int = 5) -> str: | |
| """Generate a random lowercase alphabetic suffix.""" | |
| letters = self.rng.integers(97, 123, size=length) | |
| return "".join(chr(int(code)) for code in letters) | |
    def _initialize_healthy_cluster(self) -> Dict[str, List[Dict]]:
        """Initialize a healthy cluster state based on config

        Builds nodes from config, then a fixed set of default deployments
        (one Running pod per replica, round-robined across nodes), services,
        configmaps and HPAs. The returned dict becomes self.cluster_state.
        """
        nodes = []
        for i in range(self.config.get("num_nodes", 3)):
            nodes.append({
                "name": f"node-{i+1}",
                "status": "Ready",
                # Capacities come from config; defaults: 4 CPUs, 8192 MB.
                "cpu_capacity": self.config.get("node_cpu_capacity", 4),
                "mem_capacity": self.config.get("node_mem_capacity", 8192),
                # A healthy cluster starts with zero utilisation; tick() drifts these.
                "cpu_usage": 0.0,
                "mem_usage": 0.0,
                "last_updated": datetime.now().isoformat()
            })
        pods = []
        deployments = []
        services = []
        configmaps = []
        hpas = []
        # Create some default deployments and their pods
        default_deployments = [
            {"name": "frontend", "image": "nginx:1.21", "replicas": 3},
            {"name": "backend", "image": "python:3.9", "replicas": 2},
            {"name": "database", "image": "postgres:13", "replicas": 1},
            {"name": "auth-service", "image": "auth:latest", "replicas": 2},
            {"name": "api-gateway", "image": "nginx:alpine", "replicas": 2}
        ]
        for dep in default_deployments:
            # Healthy start: available == desired for every deployment.
            deployments.append({
                "name": dep["name"],
                "desired_replicas": dep["replicas"],
                "available_replicas": dep["replicas"],
                "image": dep["image"],
                "last_updated": datetime.now().isoformat()
            })
            # Create pods for this deployment
            for j in range(dep["replicas"]):
                # Kubernetes-style pod name: <deployment>-<random digits>-<random suffix>.
                pod_name = f"{dep['name']}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}"
                pods.append({
                    "name": pod_name,
                    "status": "Running",
                    # Round-robin placement across nodes (None if no nodes configured).
                    "node": nodes[j % len(nodes)]["name"] if nodes else None,
                    "restarts": 0,
                    # Requests/limits in millicores and MB, overridable via config.
                    "cpu_request": self.config.get("pod_cpu_request", 500),
                    "mem_request": self.config.get("pod_mem_request", 256),
                    "cpu_limit": self.config.get("pod_cpu_limit", 1000),
                    "mem_limit": self.config.get("pod_mem_limit", 512),
                    "deployment": dep["name"],
                    "last_updated": datetime.now().isoformat()
                })
        # Create some default services
        default_services = [
            {"name": "frontend-service", "type": "ClusterIP", "ports": [{"port": 80, "targetPort": 80}]},
            {"name": "backend-service", "type": "ClusterIP", "ports": [{"port": 8080, "targetPort": 8080}]},
            {"name": "database-service", "type": "ClusterIP", "ports": [{"port": 5432, "targetPort": 5432}]},
            {"name": "auth-service-service", "type": "ClusterIP", "ports": [{"port": 8000, "targetPort": 8000}]}
        ]
        for svc in default_services:
            services.append({
                "name": svc["name"],
                "type": svc["type"],
                "ports": svc["ports"],
                # Selector targets the deployment named "<service>" minus the suffix.
                "selector": {"app": svc["name"].replace("-service", "")},
                # Synthetic but unique ClusterIP derived from creation order.
                "cluster_ip": f"10.96.{len(services)+1}.{len(services)+1}",
                "last_updated": datetime.now().isoformat()
            })
        # Create some default configmaps
        default_configmaps = [
            {"name": "frontend-config", "data": {"DB_HOST": "db.prod.internal", "DB_PORT": "5432"}},
            {"name": "backend-config", "data": {"LOG_LEVEL": "info", "CACHE_SIZE": "100"}},
            {"name": "database-config", "data": {"MAX_CONNECTIONS": "100", "TIMEOUT": "30"}}
        ]
        for cm in default_configmaps:
            configmaps.append({
                "name": cm["name"],
                "data": cm["data"],
                "last_updated": datetime.now().isoformat()
            })
        # Create some default HPAs
        default_hpas = [
            {"name": "frontend-hpa", "min_replicas": 2, "max_replicas": 10, "cpu_target_percent": 70},
            {"name": "backend-hpa", "min_replicas": 1, "max_replicas": 5, "cpu_target_percent": 80}
        ]
        for hpa in default_hpas:
            hpas.append({
                "name": hpa["name"],
                "min_replicas": hpa["min_replicas"],
                "max_replicas": hpa["max_replicas"],
                # HPAs start at their floor replica count.
                "current_replicas": hpa["min_replicas"],
                "cpu_target_percent": hpa["cpu_target_percent"],
                "last_updated": datetime.now().isoformat()
            })
        return {
            "nodes": nodes,
            "pods": pods,
            "deployments": deployments,
            "services": services,
            "configmaps": configmaps,
            "hpas": hpas
        }
| def get_pods(self, namespace: Optional[str] = None, selector: Optional[Dict[str, str]] = None) -> List[PodStatus]: | |
| """Returns filtered pod list (mimics kubectl get pods)""" | |
| filtered_pods = self.cluster_state["pods"] | |
| if namespace is not None: | |
| filtered_pods = [ | |
| pod for pod in filtered_pods | |
| if pod.get("namespace", "default") == namespace | |
| ] | |
| if selector: | |
| for key, value in selector.items(): | |
| if key in {"app", "deployment"}: | |
| filtered_pods = [ | |
| pod for pod in filtered_pods | |
| if pod.get("deployment") == value | |
| ] | |
| else: | |
| filtered_pods = [ | |
| pod for pod in filtered_pods | |
| if pod.get("labels", {}).get(key) == value | |
| ] | |
| return [PodStatus(**pod) for pod in filtered_pods] | |
| def get_nodes(self) -> List[NodeStatus]: | |
| """Get all nodes as Pydantic models""" | |
| return [NodeStatus(**node) for node in self.cluster_state["nodes"]] | |
| def get_deployments(self) -> List[DeploymentStatus]: | |
| """Get all deployments as Pydantic models""" | |
| return [DeploymentStatus(**dep) for dep in self.cluster_state["deployments"]] | |
| def get_services(self) -> List[ServiceStatus]: | |
| """Get all services as Pydantic models""" | |
| return [ServiceStatus(**svc) for svc in self.cluster_state["services"]] | |
| def get_configmaps(self) -> List[ConfigMapStatus]: | |
| """Get all configmaps as Pydantic models""" | |
| return [ConfigMapStatus(**cm) for cm in self.cluster_state["configmaps"]] | |
| def get_hpas(self) -> List[HPAStatus]: | |
| """Get all HPAs as Pydantic models""" | |
| return [HPAStatus(**hpa) for hpa in self.cluster_state["hpas"]] | |
| def get_events(self) -> List[ClusterEvent]: | |
| """Get all events""" | |
| return self.events.copy() | |
| def apply_patch(self, resource_type: str, name: str, patch: Dict[str, Any]) -> bool: | |
| """Apply a patch to a resource""" | |
| try: | |
| if resource_type == "deployment": | |
| for dep in self.cluster_state["deployments"]: | |
| if dep["name"] == name: | |
| dep.update(patch) | |
| dep["last_updated"] = datetime.now().isoformat() | |
| if "desired_replicas" in patch or "available_replicas" in patch: | |
| self._update_pods_for_deployment(name, dep.get("desired_replicas", dep["desired_replicas"])) | |
| return True | |
| elif resource_type == "pod": | |
| for pod in self.cluster_state["pods"]: | |
| if pod["name"] == name: | |
| pod.update(patch) | |
| pod["last_updated"] = datetime.now().isoformat() | |
| return True | |
| elif resource_type == "node": | |
| for node in self.cluster_state["nodes"]: | |
| if node["name"] == name: | |
| node.update(patch) | |
| node["last_updated"] = datetime.now().isoformat() | |
| return True | |
| elif resource_type == "service": | |
| for svc in self.cluster_state["services"]: | |
| if svc["name"] == name: | |
| svc.update(patch) | |
| svc["last_updated"] = datetime.now().isoformat() | |
| return True | |
| elif resource_type == "configmap": | |
| for cm in self.cluster_state["configmaps"]: | |
| if cm["name"] == name: | |
| cm.update(patch) | |
| cm["last_updated"] = datetime.now().isoformat() | |
| return True | |
| elif resource_type == "hpa": | |
| for hpa in self.cluster_state["hpas"]: | |
| if hpa["name"] == name: | |
| hpa.update(patch) | |
| hpa["last_updated"] = datetime.now().isoformat() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error applying patch: {e}") | |
| return False | |
| def _update_pods_for_deployment(self, deployment_name: str, desired_replicas: int): | |
| """Update pods count for a deployment""" | |
| current_pods = [p for p in self.cluster_state["pods"] if p.get("deployment") == deployment_name] | |
| current_count = len(current_pods) | |
| if desired_replicas > current_count: | |
| nodes = self.cluster_state["nodes"] | |
| for i in range(desired_replicas - current_count): | |
| deployment = next((d for d in self.cluster_state["deployments"] if d["name"] == deployment_name), None) | |
| if deployment: | |
| pod_name = f"{deployment_name}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}" | |
| node = nodes[i % len(nodes)] if nodes else None | |
| self.cluster_state["pods"].append({ | |
| "name": pod_name, | |
| "status": "Pending", | |
| "node": node["name"] if node else None, | |
| "restarts": 0, | |
| "cpu_request": self.config.get("pod_cpu_request", 500), | |
| "mem_request": self.config.get("pod_mem_request", 256), | |
| "cpu_limit": self.config.get("pod_cpu_limit", 1000), | |
| "mem_limit": self.config.get("pod_mem_limit", 512), | |
| "deployment": deployment_name, | |
| "last_updated": datetime.now().isoformat() | |
| }) | |
| elif desired_replicas < current_count: | |
| pods_to_remove = current_pods[desired_replicas:] | |
| for pod in pods_to_remove: | |
| self.cluster_state["pods"].remove(pod) | |
| def scale(self, deployment_name: str, replicas: int) -> bool: | |
| """Changes replica count""" | |
| return self.apply_patch("deployment", deployment_name, {"desired_replicas": replicas}) | |
| def delete_pod(self, pod_name: str) -> bool: | |
| """Removes a pod (it gets recreated by the deployment controller on next tick)""" | |
| pod_index = None | |
| for i, pod in enumerate(self.cluster_state["pods"]): | |
| if pod["name"] == pod_name: | |
| pod_index = i | |
| break | |
| if pod_index is not None: | |
| del self.cluster_state["pods"][pod_index] | |
| event_type: Literal["Normal"] = "Normal" # type: ignore | |
| event = ClusterEvent( | |
| event_id=f"event-delpod-{int(self.rng.integers(1000, 10000))}", | |
| timestamp=datetime.now().isoformat(), | |
| type=event_type, | |
| reason="UserDeleted", | |
| message=f"pod/{pod_name} deleted by user", | |
| involved_object=pod_name | |
| ) | |
| self.events.append(event) | |
| return True | |
| return False | |
| def rollout_restart(self, deployment: str) -> bool: | |
| """Restart a deployment rollout""" | |
| # Delete all pods for this deployment - they'll get recreated with new config | |
| pods_to_delete = [p for p in self.cluster_state["pods"] if p.get("deployment") == deployment] | |
| for pod in pods_to_delete: | |
| event_type: Literal["Normal"] = "Normal" # type: ignore | |
| event = ClusterEvent( | |
| event_id=f"event-restart-{int(self.rng.integers(1000, 10000))}", | |
| timestamp=datetime.now().isoformat(), | |
| type=event_type, | |
| reason="RolledOut", | |
| message=f"Deployment {deployment} rollout restart triggered", | |
| involved_object=deployment | |
| ) | |
| self.events.append(event) | |
| # Delete pods - they'll be recreated on next tick | |
| self.cluster_state["pods"] = [p for p in self.cluster_state["pods"] if p.get("deployment") != deployment] | |
| return True | |
| def set_hpa(self, deployment: str, min_replicas: int, max_replicas: int, cpu_target_percent: int) -> bool: | |
| """Create or update an HPA configuration for a deployment.""" | |
| target_deployment = next( | |
| (d for d in self.cluster_state["deployments"] if d["name"] == deployment), | |
| None, | |
| ) | |
| if target_deployment is None: | |
| return False | |
| hpa_name = f"{deployment}-hpa" | |
| now = datetime.now().isoformat() | |
| existing_hpa = next((h for h in self.cluster_state["hpas"] if h.get("name") == hpa_name), None) | |
| if existing_hpa is None: | |
| self.cluster_state["hpas"].append({ | |
| "name": hpa_name, | |
| "min_replicas": min_replicas, | |
| "max_replicas": max_replicas, | |
| "current_replicas": max(min_replicas, min(target_deployment["desired_replicas"], max_replicas)), | |
| "cpu_target_percent": cpu_target_percent, | |
| "last_updated": now, | |
| }) | |
| else: | |
| existing_hpa.update({ | |
| "min_replicas": min_replicas, | |
| "max_replicas": max_replicas, | |
| "cpu_target_percent": cpu_target_percent, | |
| "current_replicas": max(min_replicas, min(target_deployment["desired_replicas"], max_replicas)), | |
| "last_updated": now, | |
| }) | |
| # Keep the deployment desired replicas within configured HPA bounds. | |
| bounded_replicas = max(min_replicas, min(target_deployment["desired_replicas"], max_replicas)) | |
| target_deployment["desired_replicas"] = bounded_replicas | |
| target_deployment["last_updated"] = now | |
| event_type: Literal["Normal"] = "Normal" # type: ignore | |
| self.events.append(ClusterEvent( | |
| event_id=f"event-hpa-{int(self.rng.integers(1000, 10000))}", | |
| timestamp=now, | |
| type=event_type, | |
| reason="HorizontalPodAutoscalerUpdated", | |
| message=( | |
| f"HPA configured for deployment/{deployment}: " | |
| f"min={min_replicas}, max={max_replicas}, cpu_target={cpu_target_percent}%" | |
| ), | |
| involved_object=deployment, | |
| )) | |
| return True | |
| def drain_node(self, node_name: str) -> bool: | |
| """Mark a node unschedulable and evict/reschedule pods currently on it.""" | |
| node = next((n for n in self.cluster_state["nodes"] if n["name"] == node_name), None) | |
| if node is None: | |
| return False | |
| node["status"] = "SchedulingDisabled" | |
| node["last_updated"] = datetime.now().isoformat() | |
| candidate_nodes = [ | |
| n for n in self.cluster_state["nodes"] | |
| if n["name"] != node_name and n.get("status") == "Ready" | |
| ] | |
| pods_on_node = [p for p in self.cluster_state["pods"] if p.get("node") == node_name] | |
| for i, pod in enumerate(pods_on_node): | |
| replacement = candidate_nodes[i % len(candidate_nodes)] if candidate_nodes else None | |
| pod["node"] = replacement["name"] if replacement else None | |
| pod["status"] = "Pending" | |
| pod["last_updated"] = datetime.now().isoformat() | |
| event_type: Literal["Normal"] = "Normal" # type: ignore | |
| self.events.append(ClusterEvent( | |
| event_id=f"event-evict-{int(self.rng.integers(1000, 10000))}", | |
| timestamp=datetime.now().isoformat(), | |
| type=event_type, | |
| reason="Evicted", | |
| message=f"pod/{pod['name']} evicted from drained node/{node_name}", | |
| involved_object=pod["name"], | |
| )) | |
| event_type: Literal["Normal"] = "Normal" # type: ignore | |
| self.events.append(ClusterEvent( | |
| event_id=f"event-drain-{int(self.rng.integers(1000, 10000))}", | |
| timestamp=datetime.now().isoformat(), | |
| type=event_type, | |
| reason="NodeDrained", | |
| message=f"node/{node_name} cordoned and drained", | |
| involved_object=node_name, | |
| )) | |
| return True | |
| def describe(self, resource_type: str, name: str) -> Dict[str, Any]: | |
| """Return kubectl-describe style details for a specific resource.""" | |
| collection_map = { | |
| "deployment": "deployments", | |
| "pod": "pods", | |
| "node": "nodes", | |
| "service": "services", | |
| "configmap": "configmaps", | |
| "hpa": "hpas", | |
| } | |
| collection_name = collection_map.get(resource_type) | |
| if collection_name is None: | |
| return { | |
| "type": resource_type, | |
| "name": name, | |
| "found": False, | |
| "error": f"Unsupported resource_type: {resource_type}", | |
| } | |
| resource = next( | |
| (item for item in self.cluster_state.get(collection_name, []) if item.get("name") == name), | |
| None, | |
| ) | |
| if resource is None: | |
| return { | |
| "type": resource_type, | |
| "name": name, | |
| "found": False, | |
| "error": f"{resource_type} '{name}' not found", | |
| } | |
| related_pods = [] | |
| if resource_type == "deployment": | |
| related_pods = [p for p in self.cluster_state["pods"] if p.get("deployment") == name] | |
| elif resource_type == "node": | |
| related_pods = [p for p in self.cluster_state["pods"] if p.get("node") == name] | |
| elif resource_type == "service": | |
| selector_app = resource.get("selector", {}).get("app") | |
| if selector_app: | |
| related_pods = [p for p in self.cluster_state["pods"] if p.get("deployment") == selector_app] | |
| related_events = [e.model_dump() for e in self.events if e.involved_object in {name, resource_type}] | |
| return { | |
| "type": resource_type, | |
| "name": name, | |
| "found": True, | |
| "resource": dict(resource), | |
| "related_pods": related_pods, | |
| "recent_events": related_events[-10:], | |
| "step": self.step_count, | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| def tick(self): | |
| """Advances simulated time by one step. Pods in CrashLoopBackOff increment their restart counter. Pending pods on ready nodes eventually transition to Running. Dead nodes stay dead unless drained.""" | |
| self.step_count += 1 | |
| # Simulate some natural changes in resource usage | |
| for node in self.cluster_state["nodes"]: | |
| node["cpu_usage"] = max(0, min(100, node["cpu_usage"] + float(self.rng.uniform(-5, 5)))) | |
| node["mem_usage"] = max(0, min(100, node["mem_usage"] + float(self.rng.uniform(-5, 5)))) | |
| node["last_updated"] = datetime.now().isoformat() | |
| # Update pod statuses based on node status | |
| for pod in self.cluster_state["pods"]: | |
| node_name = pod.get("node") | |
| if node_name: | |
| node = next((n for n in self.cluster_state["nodes"] if n["name"] == node_name), None) | |
| if node and node["status"] != "Ready": | |
| if pod["status"] == "Running": | |
| pod["status"] = "Unknown" | |
| elif pod["status"] == "Pending": | |
| pod["status"] = "Unknown" | |
| elif node and node["status"] == "Ready" and pod["status"] == "Pending": | |
| if float(self.rng.random()) > 0.7: | |
| pod["status"] = "Running" | |
| pod["last_updated"] = datetime.now().isoformat() | |
| # Update deployment available replicas based on running pods | |
| for deployment in self.cluster_state["deployments"]: | |
| running_pods = [p for p in self.cluster_state["pods"] | |
| if p.get("deployment") == deployment["name"] and p["status"] == "Running"] | |
| deployment["available_replicas"] = len(running_pods) | |
| deployment["last_updated"] = datetime.now().isoformat() | |
| # Re-create pods for deployments that need them | |
| for deployment in self.cluster_state["deployments"]: | |
| desired = deployment.get("desired_replicas", 0) | |
| current_pods = [p for p in self.cluster_state["pods"] if p.get("deployment") == deployment["name"]] | |
| current_count = len(current_pods) | |
| if current_count < desired: | |
| nodes = self.cluster_state["nodes"] | |
| for i in range(desired - current_count): | |
| pod_name = f"{deployment['name']}-{int(self.rng.integers(1000, 10000))}-{self._random_suffix()}" | |
| node = nodes[i % len(nodes)] if nodes else None | |
| self.cluster_state["pods"].append({ | |
| "name": pod_name, | |
| "status": "Running", | |
| "node": node["name"] if node else None, | |
| "restarts": 0, | |
| "cpu_request": self.config.get("pod_cpu_request", 500), | |
| "mem_request": self.config.get("pod_mem_request", 256), | |
| "cpu_limit": self.config.get("pod_cpu_limit", 1000), | |
| "mem_limit": self.config.get("pod_mem_limit", 512), | |
| "deployment": deployment["name"], | |
| "last_updated": datetime.now().isoformat() | |
| }) | |
| # Generate occasional events | |
| if float(self.rng.random()) < 0.3: | |
| self._generate_event() | |
| def _generate_event(self): | |
| """Generate a realistic cluster event""" | |
| event_types = [ | |
| {"type": "Normal", "reason": "Scheduled", "message": "Successfully assigned node"}, | |
| {"type": "Warning", "reason": "FailedScheduling", "message": "0/3 nodes are available: 3 Insufficient cpu."}, | |
| {"type": "Normal", "reason": "Pulling", "message": "Pulling image \"nginx:1.21\""}, | |
| {"type": "Normal", "reason": "Pulled", "message": "Successfully pulled image \"nginx:1.21\""}, | |
| {"type": "Normal", "reason": "Created", "message": "Created container"}, | |
| {"type": "Normal", "reason": "Started", "message": "Started container"}, | |
| {"type": "Warning", "reason": "BackOff", "message": "Back-off restarting failed container"}, | |
| {"type": "Normal", "reason": "Killing", "message": "Stopping container"} | |
| ] | |
| event = self.rng.choice(event_types) | |
| involved_objects = [] | |
| involved_objects.extend([p["name"] for p in self.cluster_state["pods"][:3]]) | |
| involved_objects.extend([d["name"] for d in self.cluster_state["deployments"][:3]]) | |
| involved_objects.extend([n["name"] for n in self.cluster_state["nodes"][:3]]) | |
| if not involved_objects: | |
| involved_objects = ["cluster"] | |
| event_type: Literal["Normal", "Warning"] = event["type"] # type: ignore | |
| self.events.append(ClusterEvent( | |
| event_id=f"event-{self._event_counter:04d}", | |
| timestamp=datetime.now().isoformat(), | |
| type=event_type, | |
| reason=event["reason"], | |
| message=event["message"], | |
| involved_object=str(self.rng.choice(involved_objects)) | |
| )) | |
| self._event_counter += 1 | |
| if len(self.events) > 100: | |
| self.events = self.events[-50:] | |
| def get_full_state(self) -> Dict[str, Any]: | |
| """Get the full cluster state for debugging""" | |
| return { | |
| "nodes": self.get_nodes(), | |
| "pods": self.get_pods(), | |
| "deployments": self.get_deployments(), | |
| "services": self.get_services(), | |
| "configmaps": self.get_configmaps(), | |
| "hpas": self.get_hpas(), | |
| "events": self.get_events(), | |
| "step": self.step_count | |
| } | |
| def reset_to_healthy(self): | |
| """Reset cluster to healthy state""" | |
| self.cluster_state = self._initialize_healthy_cluster() | |
| self.step_count = 0 | |
| self.events = [] | |
| self._event_counter = 0 | |
| def reset(self, condition=None): | |
| """Reset the world state and optionally inject a failure condition""" | |
| self.reset_to_healthy() | |
| if condition: | |
| condition.inject() | |
| return self.get_observation() | |
| def get_observation(self, objective: str = "Maintain cluster health"): | |
| """Serialises the current state into a ClusterObservation Pydantic model""" | |
| observation_dict = self.get_full_state() | |
| observation_dict["objective"] = objective | |
| return ClusterObservation(**observation_dict) |