"""Kubernetes manifest validator and simulator — deterministic, rule-based.""" import re from typing import Any, Dict, List, Optional import yaml from server.models import FileContent # Valid top-level K8s resource kinds we recognise VALID_KINDS = { "Deployment", "StatefulSet", "DaemonSet", "ReplicaSet", "Pod", "Service", "Ingress", "ConfigMap", "Secret", "PersistentVolumeClaim", "PersistentVolume", "Job", "CronJob", "Namespace", "ServiceAccount", "Role", "RoleBinding", "ClusterRole", "ClusterRoleBinding", "HorizontalPodAutoscaler", "NetworkPolicy", } VALID_API_VERSIONS = { "v1", "apps/v1", "batch/v1", "networking.k8s.io/v1", "rbac.authorization.k8s.io/v1", "autoscaling/v2", "autoscaling/v1", "policy/v1", } def _parse_memory(mem_str: str) -> int: """Parse K8s memory string to bytes.""" mem_str = str(mem_str).strip() multipliers = { "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4, "K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4, } for suffix, mult in multipliers.items(): if mem_str.endswith(suffix): return int(mem_str[:-len(suffix)]) * mult if mem_str.isdigit(): return int(mem_str) return 0 class KubernetesSimulator: """Simulates kubectl apply / kubectl get output. Validates K8s manifests without a real cluster. """ def validate(self, manifests: Dict[str, FileContent]) -> Dict[str, Any]: """Validate all Kubernetes manifests in the file set. Returns dict with keys: valid: bool errors: list of error strings pod_status: simulated pod status service_status: simulated service endpoint status """ k8s_files: Dict[str, Any] = {} errors: List[str] = [] # Parse all K8s YAML files for path, fc in manifests.items(): if fc.file_type.value != "kubernetes": continue try: docs = list(yaml.safe_load_all(fc.content)) for doc in docs: if doc and isinstance(doc, dict): k8s_files[path] = doc except yaml.YAMLError as exc: errors.append(f"YAML parse error in {path}: {exc}") if not k8s_files and not errors: return {"valid": True, "errors": [], "pod_status": "N/A", "service_status": "N/A"} if errors: return {"valid": False, "errors": errors, "pod_status": "Error", "service_status": "Error"} # Validate each manifest all_resources: List[Dict[str, Any]] = [] for path, doc in k8s_files.items(): resource_errors = self._validate_resource(path, doc) errors.extend(resource_errors) all_resources.append({"path": path, "doc": doc}) # Cross-resource validation cross_errors = self._validate_cross_resources(all_resources) errors.extend(cross_errors) # Simulate pod status pod_status = self._simulate_pod_status(all_resources) service_status = self._simulate_service_status(all_resources) return { "valid": len(errors) == 0, "errors": errors, "pod_status": pod_status, "service_status": service_status, } def _validate_resource(self, path: str, doc: Dict[str, Any]) -> List[str]: """Validate a single K8s resource document.""" errors: List[str] = [] kind = doc.get("kind", "") api_version = doc.get("apiVersion", "") if not kind: errors.append(f"{path}: missing 'kind' field") elif kind not in VALID_KINDS: errors.append(f"{path}: unknown kind '{kind}'") if not api_version: errors.append(f"{path}: missing 'apiVersion' field") elif api_version not in VALID_API_VERSIONS: errors.append(f"{path}: unknown apiVersion '{api_version}'") metadata = doc.get("metadata", {}) if not isinstance(metadata, dict) or not metadata.get("name"): errors.append(f"{path}: metadata.name is required") # Kind-specific validation if kind == "Deployment": errors.extend(self._validate_deployment(path, doc)) elif kind == "Service": errors.extend(self._validate_service(path, doc)) elif kind == "Ingress": errors.extend(self._validate_ingress(path, doc)) return errors def _validate_deployment(self, path: str, doc: Dict[str, Any]) -> List[str]: errors: List[str] = [] spec = doc.get("spec", {}) if not isinstance(spec, dict): errors.append(f"{path}: Deployment spec must be a mapping") return errors selector = spec.get("selector", {}) template = spec.get("template", {}) if not selector or not selector.get("matchLabels"): errors.append(f"{path}: Deployment must have spec.selector.matchLabels") return errors tmpl_labels = template.get("metadata", {}).get("labels", {}) sel_labels = selector.get("matchLabels", {}) # selector must match template labels for k, v in sel_labels.items(): if tmpl_labels.get(k) != v: errors.append( f"{path}: selector matchLabels ({k}={v}) does not match template labels" ) # Validate containers containers = template.get("spec", {}).get("containers", []) if not containers: errors.append(f"{path}: Deployment must have at least one container") for c in containers: if not c.get("image"): errors.append(f"{path}: container '{c.get('name', '?')}' missing image") return errors def _validate_service(self, path: str, doc: Dict[str, Any]) -> List[str]: errors: List[str] = [] spec = doc.get("spec", {}) if not isinstance(spec, dict): errors.append(f"{path}: Service spec must be a mapping") return errors if not spec.get("selector"): errors.append(f"{path}: Service must have spec.selector") ports = spec.get("ports", []) if not ports: errors.append(f"{path}: Service must define at least one port") for p in ports: if not p.get("port"): errors.append(f"{path}: Service port entry missing 'port' field") return errors def _validate_ingress(self, path: str, doc: Dict[str, Any]) -> List[str]: errors: List[str] = [] spec = doc.get("spec", {}) rules = spec.get("rules", []) if not rules: errors.append(f"{path}: Ingress must define at least one rule") return errors def _validate_cross_resources(self, resources: List[Dict[str, Any]]) -> List[str]: """Validate cross-resource dependencies (e.g. Service selector matches Deployment labels).""" errors: List[str] = [] # Collect all pod labels from Deployments/StatefulSets pod_labels_by_name: Dict[str, Dict[str, str]] = {} for r in resources: doc = r["doc"] kind = doc.get("kind", "") if kind in ("Deployment", "StatefulSet", "DaemonSet"): tmpl = doc.get("spec", {}).get("template", {}) labels = tmpl.get("metadata", {}).get("labels", {}) name = doc.get("metadata", {}).get("name", "?") pod_labels_by_name[name] = labels # Check Service selectors match some pod labels for r in resources: doc = r["doc"] if doc.get("kind") != "Service": continue svc_name = doc.get("metadata", {}).get("name", "?") selector = doc.get("spec", {}).get("selector", {}) if not selector: continue matched = False for dep_name, labels in pod_labels_by_name.items(): if all(labels.get(k) == v for k, v in selector.items()): matched = True break if not matched and pod_labels_by_name: errors.append( f"Service '{svc_name}' selector {selector} does not match any pod labels" ) return errors def _simulate_pod_status(self, resources: List[Dict[str, Any]]) -> str: """Simulate what pod status would be.""" for r in resources: doc = r["doc"] kind = doc.get("kind", "") if kind not in ("Deployment", "StatefulSet", "DaemonSet", "Pod"): continue if kind == "Pod": containers = doc.get("spec", {}).get("containers", []) else: containers = doc.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []) for c in containers: image = c.get("image", "") # Check for image typos (common: latset, lates, etc.) if image and ":" in image: tag = image.split(":")[-1] if tag in ("latset", "lates", "latets"): return "ImagePullBackOff" # Check for hardcoded placeholder images if "OWNER/REPO" in image or "TAG" in image: return "ImagePullBackOff" # Check memory limits resources_spec = c.get("resources", {}) limits = resources_spec.get("limits", {}) mem_limit = limits.get("memory", "") if mem_limit: mem_bytes = _parse_memory(str(mem_limit)) # Simulate OOM if memory limit is very low if 0 < mem_bytes < 128 * 1024 * 1024: # < 128Mi return "CrashLoopBackOff (OOMKilled)" # Check command command = c.get("command", []) if command and isinstance(command, list): if any("wrong" in str(cmd).lower() or "typo" in str(cmd).lower() for cmd in command): return "CrashLoopBackOff" # Check env refs to missing configmaps env_from = c.get("envFrom", []) for ef in env_from: cm_ref = ef.get("configMapRef", {}) if cm_ref and cm_ref.get("name"): # Check if configmap exists in resources cm_exists = any( res["doc"].get("kind") == "ConfigMap" and res["doc"].get("metadata", {}).get("name") == cm_ref["name"] for res in resources ) if not cm_exists: return f"CreateContainerConfigError (ConfigMap '{cm_ref['name']}' not found)" return "Running" def _simulate_service_status(self, resources: List[Dict[str, Any]]) -> str: """Simulate service endpoint status.""" services = [r for r in resources if r["doc"].get("kind") == "Service"] deployments = [r for r in resources if r["doc"].get("kind") in ("Deployment", "StatefulSet")] if not services: return "N/A" for svc_r in services: svc = svc_r["doc"] selector = svc.get("spec", {}).get("selector", {}) if not selector: continue matched = False for dep_r in deployments: dep = dep_r["doc"] tmpl_labels = dep.get("spec", {}).get("template", {}).get("metadata", {}).get("labels", {}) if all(tmpl_labels.get(k) == v for k, v in selector.items()): matched = True # Check port matching svc_ports = svc.get("spec", {}).get("ports", []) container_ports = [] for c in dep.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []): for p in (c.get("ports") or []): container_ports.append(p.get("containerPort")) for sp in svc_ports: tp = sp.get("targetPort") if tp and tp not in container_ports and container_ports: return f"Service port mismatch: targetPort {tp} not in container ports {container_ports}" break if not matched: svc_name = svc.get("metadata", {}).get("name", "?") return f"No endpoints (selector {selector} matches no pods)" return "Endpoints active"