Krishna1107's picture
fixed inference
2794920
"""Kubernetes manifest validator and simulator — deterministic, rule-based."""
import re
from typing import Any, Dict, List, Optional
import yaml
from server.models import FileContent
# Valid top-level K8s resource kinds we recognise
VALID_KINDS = {
"Deployment", "StatefulSet", "DaemonSet", "ReplicaSet",
"Pod", "Service", "Ingress", "ConfigMap", "Secret",
"PersistentVolumeClaim", "PersistentVolume",
"Job", "CronJob", "Namespace", "ServiceAccount",
"Role", "RoleBinding", "ClusterRole", "ClusterRoleBinding",
"HorizontalPodAutoscaler", "NetworkPolicy",
}
VALID_API_VERSIONS = {
"v1", "apps/v1", "batch/v1", "networking.k8s.io/v1",
"rbac.authorization.k8s.io/v1", "autoscaling/v2",
"autoscaling/v1", "policy/v1",
}
def _parse_memory(mem_str: str) -> int:
"""Parse K8s memory string to bytes."""
mem_str = str(mem_str).strip()
multipliers = {
"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,
"K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,
}
for suffix, mult in multipliers.items():
if mem_str.endswith(suffix):
return int(mem_str[:-len(suffix)]) * mult
if mem_str.isdigit():
return int(mem_str)
return 0
class KubernetesSimulator:
"""Simulates kubectl apply / kubectl get output.
Validates K8s manifests without a real cluster.
"""
def validate(self, manifests: Dict[str, FileContent]) -> Dict[str, Any]:
"""Validate all Kubernetes manifests in the file set.
Returns dict with keys:
valid: bool
errors: list of error strings
pod_status: simulated pod status
service_status: simulated service endpoint status
"""
k8s_files: Dict[str, Any] = {}
errors: List[str] = []
# Parse all K8s YAML files
for path, fc in manifests.items():
if fc.file_type.value != "kubernetes":
continue
try:
docs = list(yaml.safe_load_all(fc.content))
for doc in docs:
if doc and isinstance(doc, dict):
k8s_files[path] = doc
except yaml.YAMLError as exc:
errors.append(f"YAML parse error in {path}: {exc}")
if not k8s_files and not errors:
return {"valid": True, "errors": [], "pod_status": "N/A", "service_status": "N/A"}
if errors:
return {"valid": False, "errors": errors, "pod_status": "Error", "service_status": "Error"}
# Validate each manifest
all_resources: List[Dict[str, Any]] = []
for path, doc in k8s_files.items():
resource_errors = self._validate_resource(path, doc)
errors.extend(resource_errors)
all_resources.append({"path": path, "doc": doc})
# Cross-resource validation
cross_errors = self._validate_cross_resources(all_resources)
errors.extend(cross_errors)
# Simulate pod status
pod_status = self._simulate_pod_status(all_resources)
service_status = self._simulate_service_status(all_resources)
return {
"valid": len(errors) == 0,
"errors": errors,
"pod_status": pod_status,
"service_status": service_status,
}
def _validate_resource(self, path: str, doc: Dict[str, Any]) -> List[str]:
"""Validate a single K8s resource document."""
errors: List[str] = []
kind = doc.get("kind", "")
api_version = doc.get("apiVersion", "")
if not kind:
errors.append(f"{path}: missing 'kind' field")
elif kind not in VALID_KINDS:
errors.append(f"{path}: unknown kind '{kind}'")
if not api_version:
errors.append(f"{path}: missing 'apiVersion' field")
elif api_version not in VALID_API_VERSIONS:
errors.append(f"{path}: unknown apiVersion '{api_version}'")
metadata = doc.get("metadata", {})
if not isinstance(metadata, dict) or not metadata.get("name"):
errors.append(f"{path}: metadata.name is required")
# Kind-specific validation
if kind == "Deployment":
errors.extend(self._validate_deployment(path, doc))
elif kind == "Service":
errors.extend(self._validate_service(path, doc))
elif kind == "Ingress":
errors.extend(self._validate_ingress(path, doc))
return errors
def _validate_deployment(self, path: str, doc: Dict[str, Any]) -> List[str]:
errors: List[str] = []
spec = doc.get("spec", {})
if not isinstance(spec, dict):
errors.append(f"{path}: Deployment spec must be a mapping")
return errors
selector = spec.get("selector", {})
template = spec.get("template", {})
if not selector or not selector.get("matchLabels"):
errors.append(f"{path}: Deployment must have spec.selector.matchLabels")
return errors
tmpl_labels = template.get("metadata", {}).get("labels", {})
sel_labels = selector.get("matchLabels", {})
# selector must match template labels
for k, v in sel_labels.items():
if tmpl_labels.get(k) != v:
errors.append(
f"{path}: selector matchLabels ({k}={v}) does not match template labels"
)
# Validate containers
containers = template.get("spec", {}).get("containers", [])
if not containers:
errors.append(f"{path}: Deployment must have at least one container")
for c in containers:
if not c.get("image"):
errors.append(f"{path}: container '{c.get('name', '?')}' missing image")
return errors
def _validate_service(self, path: str, doc: Dict[str, Any]) -> List[str]:
errors: List[str] = []
spec = doc.get("spec", {})
if not isinstance(spec, dict):
errors.append(f"{path}: Service spec must be a mapping")
return errors
if not spec.get("selector"):
errors.append(f"{path}: Service must have spec.selector")
ports = spec.get("ports", [])
if not ports:
errors.append(f"{path}: Service must define at least one port")
for p in ports:
if not p.get("port"):
errors.append(f"{path}: Service port entry missing 'port' field")
return errors
def _validate_ingress(self, path: str, doc: Dict[str, Any]) -> List[str]:
errors: List[str] = []
spec = doc.get("spec", {})
rules = spec.get("rules", [])
if not rules:
errors.append(f"{path}: Ingress must define at least one rule")
return errors
def _validate_cross_resources(self, resources: List[Dict[str, Any]]) -> List[str]:
"""Validate cross-resource dependencies (e.g. Service selector matches Deployment labels)."""
errors: List[str] = []
# Collect all pod labels from Deployments/StatefulSets
pod_labels_by_name: Dict[str, Dict[str, str]] = {}
for r in resources:
doc = r["doc"]
kind = doc.get("kind", "")
if kind in ("Deployment", "StatefulSet", "DaemonSet"):
tmpl = doc.get("spec", {}).get("template", {})
labels = tmpl.get("metadata", {}).get("labels", {})
name = doc.get("metadata", {}).get("name", "?")
pod_labels_by_name[name] = labels
# Check Service selectors match some pod labels
for r in resources:
doc = r["doc"]
if doc.get("kind") != "Service":
continue
svc_name = doc.get("metadata", {}).get("name", "?")
selector = doc.get("spec", {}).get("selector", {})
if not selector:
continue
matched = False
for dep_name, labels in pod_labels_by_name.items():
if all(labels.get(k) == v for k, v in selector.items()):
matched = True
break
if not matched and pod_labels_by_name:
errors.append(
f"Service '{svc_name}' selector {selector} does not match any pod labels"
)
return errors
def _simulate_pod_status(self, resources: List[Dict[str, Any]]) -> str:
"""Simulate what pod status would be."""
for r in resources:
doc = r["doc"]
kind = doc.get("kind", "")
if kind not in ("Deployment", "StatefulSet", "DaemonSet", "Pod"):
continue
if kind == "Pod":
containers = doc.get("spec", {}).get("containers", [])
else:
containers = doc.get("spec", {}).get("template", {}).get("spec", {}).get("containers", [])
for c in containers:
image = c.get("image", "")
# Check for image typos (common: latset, lates, etc.)
if image and ":" in image:
tag = image.split(":")[-1]
if tag in ("latset", "lates", "latets"):
return "ImagePullBackOff"
# Check for hardcoded placeholder images
if "OWNER/REPO" in image or "TAG" in image:
return "ImagePullBackOff"
# Check memory limits
resources_spec = c.get("resources", {})
limits = resources_spec.get("limits", {})
mem_limit = limits.get("memory", "")
if mem_limit:
mem_bytes = _parse_memory(str(mem_limit))
# Simulate OOM if memory limit is very low
if 0 < mem_bytes < 128 * 1024 * 1024: # < 128Mi
return "CrashLoopBackOff (OOMKilled)"
# Check command
command = c.get("command", [])
if command and isinstance(command, list):
if any("wrong" in str(cmd).lower() or "typo" in str(cmd).lower() for cmd in command):
return "CrashLoopBackOff"
# Check env refs to missing configmaps
env_from = c.get("envFrom", [])
for ef in env_from:
cm_ref = ef.get("configMapRef", {})
if cm_ref and cm_ref.get("name"):
# Check if configmap exists in resources
cm_exists = any(
res["doc"].get("kind") == "ConfigMap"
and res["doc"].get("metadata", {}).get("name") == cm_ref["name"]
for res in resources
)
if not cm_exists:
return f"CreateContainerConfigError (ConfigMap '{cm_ref['name']}' not found)"
return "Running"
def _simulate_service_status(self, resources: List[Dict[str, Any]]) -> str:
"""Simulate service endpoint status."""
services = [r for r in resources if r["doc"].get("kind") == "Service"]
deployments = [r for r in resources if r["doc"].get("kind") in ("Deployment", "StatefulSet")]
if not services:
return "N/A"
for svc_r in services:
svc = svc_r["doc"]
selector = svc.get("spec", {}).get("selector", {})
if not selector:
continue
matched = False
for dep_r in deployments:
dep = dep_r["doc"]
tmpl_labels = dep.get("spec", {}).get("template", {}).get("metadata", {}).get("labels", {})
if all(tmpl_labels.get(k) == v for k, v in selector.items()):
matched = True
# Check port matching
svc_ports = svc.get("spec", {}).get("ports", [])
container_ports = []
for c in dep.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []):
for p in (c.get("ports") or []):
container_ports.append(p.get("containerPort"))
for sp in svc_ports:
tp = sp.get("targetPort")
if tp and tp not in container_ports and container_ports:
return f"Service port mismatch: targetPort {tp} not in container ports {container_ports}"
break
if not matched:
svc_name = svc.get("metadata", {}).get("name", "?")
return f"No endpoints (selector {selector} matches no pods)"
return "Endpoints active"