File size: 12,762 Bytes
c8f3b98 2794920 c8f3b98 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 | """Kubernetes manifest validator and simulator — deterministic, rule-based."""
import re
from typing import Any, Dict, List, Optional
import yaml
from server.models import FileContent
# Valid top-level K8s resource kinds we recognise
VALID_KINDS = {
"Deployment", "StatefulSet", "DaemonSet", "ReplicaSet",
"Pod", "Service", "Ingress", "ConfigMap", "Secret",
"PersistentVolumeClaim", "PersistentVolume",
"Job", "CronJob", "Namespace", "ServiceAccount",
"Role", "RoleBinding", "ClusterRole", "ClusterRoleBinding",
"HorizontalPodAutoscaler", "NetworkPolicy",
}
VALID_API_VERSIONS = {
"v1", "apps/v1", "batch/v1", "networking.k8s.io/v1",
"rbac.authorization.k8s.io/v1", "autoscaling/v2",
"autoscaling/v1", "policy/v1",
}
def _parse_memory(mem_str: str) -> int:
"""Parse K8s memory string to bytes."""
mem_str = str(mem_str).strip()
multipliers = {
"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,
"K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,
}
for suffix, mult in multipliers.items():
if mem_str.endswith(suffix):
return int(mem_str[:-len(suffix)]) * mult
if mem_str.isdigit():
return int(mem_str)
return 0
class KubernetesSimulator:
"""Simulates kubectl apply / kubectl get output.
Validates K8s manifests without a real cluster.
"""
def validate(self, manifests: Dict[str, FileContent]) -> Dict[str, Any]:
"""Validate all Kubernetes manifests in the file set.
Returns dict with keys:
valid: bool
errors: list of error strings
pod_status: simulated pod status
service_status: simulated service endpoint status
"""
k8s_files: Dict[str, Any] = {}
errors: List[str] = []
# Parse all K8s YAML files
for path, fc in manifests.items():
if fc.file_type.value != "kubernetes":
continue
try:
docs = list(yaml.safe_load_all(fc.content))
for doc in docs:
if doc and isinstance(doc, dict):
k8s_files[path] = doc
except yaml.YAMLError as exc:
errors.append(f"YAML parse error in {path}: {exc}")
if not k8s_files and not errors:
return {"valid": True, "errors": [], "pod_status": "N/A", "service_status": "N/A"}
if errors:
return {"valid": False, "errors": errors, "pod_status": "Error", "service_status": "Error"}
# Validate each manifest
all_resources: List[Dict[str, Any]] = []
for path, doc in k8s_files.items():
resource_errors = self._validate_resource(path, doc)
errors.extend(resource_errors)
all_resources.append({"path": path, "doc": doc})
# Cross-resource validation
cross_errors = self._validate_cross_resources(all_resources)
errors.extend(cross_errors)
# Simulate pod status
pod_status = self._simulate_pod_status(all_resources)
service_status = self._simulate_service_status(all_resources)
return {
"valid": len(errors) == 0,
"errors": errors,
"pod_status": pod_status,
"service_status": service_status,
}
def _validate_resource(self, path: str, doc: Dict[str, Any]) -> List[str]:
"""Validate a single K8s resource document."""
errors: List[str] = []
kind = doc.get("kind", "")
api_version = doc.get("apiVersion", "")
if not kind:
errors.append(f"{path}: missing 'kind' field")
elif kind not in VALID_KINDS:
errors.append(f"{path}: unknown kind '{kind}'")
if not api_version:
errors.append(f"{path}: missing 'apiVersion' field")
elif api_version not in VALID_API_VERSIONS:
errors.append(f"{path}: unknown apiVersion '{api_version}'")
metadata = doc.get("metadata", {})
if not isinstance(metadata, dict) or not metadata.get("name"):
errors.append(f"{path}: metadata.name is required")
# Kind-specific validation
if kind == "Deployment":
errors.extend(self._validate_deployment(path, doc))
elif kind == "Service":
errors.extend(self._validate_service(path, doc))
elif kind == "Ingress":
errors.extend(self._validate_ingress(path, doc))
return errors
def _validate_deployment(self, path: str, doc: Dict[str, Any]) -> List[str]:
errors: List[str] = []
spec = doc.get("spec", {})
if not isinstance(spec, dict):
errors.append(f"{path}: Deployment spec must be a mapping")
return errors
selector = spec.get("selector", {})
template = spec.get("template", {})
if not selector or not selector.get("matchLabels"):
errors.append(f"{path}: Deployment must have spec.selector.matchLabels")
return errors
tmpl_labels = template.get("metadata", {}).get("labels", {})
sel_labels = selector.get("matchLabels", {})
# selector must match template labels
for k, v in sel_labels.items():
if tmpl_labels.get(k) != v:
errors.append(
f"{path}: selector matchLabels ({k}={v}) does not match template labels"
)
# Validate containers
containers = template.get("spec", {}).get("containers", [])
if not containers:
errors.append(f"{path}: Deployment must have at least one container")
for c in containers:
if not c.get("image"):
errors.append(f"{path}: container '{c.get('name', '?')}' missing image")
return errors
def _validate_service(self, path: str, doc: Dict[str, Any]) -> List[str]:
errors: List[str] = []
spec = doc.get("spec", {})
if not isinstance(spec, dict):
errors.append(f"{path}: Service spec must be a mapping")
return errors
if not spec.get("selector"):
errors.append(f"{path}: Service must have spec.selector")
ports = spec.get("ports", [])
if not ports:
errors.append(f"{path}: Service must define at least one port")
for p in ports:
if not p.get("port"):
errors.append(f"{path}: Service port entry missing 'port' field")
return errors
def _validate_ingress(self, path: str, doc: Dict[str, Any]) -> List[str]:
errors: List[str] = []
spec = doc.get("spec", {})
rules = spec.get("rules", [])
if not rules:
errors.append(f"{path}: Ingress must define at least one rule")
return errors
def _validate_cross_resources(self, resources: List[Dict[str, Any]]) -> List[str]:
"""Validate cross-resource dependencies (e.g. Service selector matches Deployment labels)."""
errors: List[str] = []
# Collect all pod labels from Deployments/StatefulSets
pod_labels_by_name: Dict[str, Dict[str, str]] = {}
for r in resources:
doc = r["doc"]
kind = doc.get("kind", "")
if kind in ("Deployment", "StatefulSet", "DaemonSet"):
tmpl = doc.get("spec", {}).get("template", {})
labels = tmpl.get("metadata", {}).get("labels", {})
name = doc.get("metadata", {}).get("name", "?")
pod_labels_by_name[name] = labels
# Check Service selectors match some pod labels
for r in resources:
doc = r["doc"]
if doc.get("kind") != "Service":
continue
svc_name = doc.get("metadata", {}).get("name", "?")
selector = doc.get("spec", {}).get("selector", {})
if not selector:
continue
matched = False
for dep_name, labels in pod_labels_by_name.items():
if all(labels.get(k) == v for k, v in selector.items()):
matched = True
break
if not matched and pod_labels_by_name:
errors.append(
f"Service '{svc_name}' selector {selector} does not match any pod labels"
)
return errors
def _simulate_pod_status(self, resources: List[Dict[str, Any]]) -> str:
"""Simulate what pod status would be."""
for r in resources:
doc = r["doc"]
kind = doc.get("kind", "")
if kind not in ("Deployment", "StatefulSet", "DaemonSet", "Pod"):
continue
if kind == "Pod":
containers = doc.get("spec", {}).get("containers", [])
else:
containers = doc.get("spec", {}).get("template", {}).get("spec", {}).get("containers", [])
for c in containers:
image = c.get("image", "")
# Check for image typos (common: latset, lates, etc.)
if image and ":" in image:
tag = image.split(":")[-1]
if tag in ("latset", "lates", "latets"):
return "ImagePullBackOff"
# Check for hardcoded placeholder images
if "OWNER/REPO" in image or "TAG" in image:
return "ImagePullBackOff"
# Check memory limits
resources_spec = c.get("resources", {})
limits = resources_spec.get("limits", {})
mem_limit = limits.get("memory", "")
if mem_limit:
mem_bytes = _parse_memory(str(mem_limit))
# Simulate OOM if memory limit is very low
if 0 < mem_bytes < 128 * 1024 * 1024: # < 128Mi
return "CrashLoopBackOff (OOMKilled)"
# Check command
command = c.get("command", [])
if command and isinstance(command, list):
if any("wrong" in str(cmd).lower() or "typo" in str(cmd).lower() for cmd in command):
return "CrashLoopBackOff"
# Check env refs to missing configmaps
env_from = c.get("envFrom", [])
for ef in env_from:
cm_ref = ef.get("configMapRef", {})
if cm_ref and cm_ref.get("name"):
# Check if configmap exists in resources
cm_exists = any(
res["doc"].get("kind") == "ConfigMap"
and res["doc"].get("metadata", {}).get("name") == cm_ref["name"]
for res in resources
)
if not cm_exists:
return f"CreateContainerConfigError (ConfigMap '{cm_ref['name']}' not found)"
return "Running"
def _simulate_service_status(self, resources: List[Dict[str, Any]]) -> str:
"""Simulate service endpoint status."""
services = [r for r in resources if r["doc"].get("kind") == "Service"]
deployments = [r for r in resources if r["doc"].get("kind") in ("Deployment", "StatefulSet")]
if not services:
return "N/A"
for svc_r in services:
svc = svc_r["doc"]
selector = svc.get("spec", {}).get("selector", {})
if not selector:
continue
matched = False
for dep_r in deployments:
dep = dep_r["doc"]
tmpl_labels = dep.get("spec", {}).get("template", {}).get("metadata", {}).get("labels", {})
if all(tmpl_labels.get(k) == v for k, v in selector.items()):
matched = True
# Check port matching
svc_ports = svc.get("spec", {}).get("ports", [])
container_ports = []
for c in dep.get("spec", {}).get("template", {}).get("spec", {}).get("containers", []):
for p in (c.get("ports") or []):
container_ports.append(p.get("containerPort"))
for sp in svc_ports:
tp = sp.get("targetPort")
if tp and tp not in container_ports and container_ports:
return f"Service port mismatch: targetPort {tp} not in container ports {container_ports}"
break
if not matched:
svc_name = svc.get("metadata", {}).get("name", "?")
return f"No endpoints (selector {selector} matches no pods)"
return "Endpoints active"
|