""" LITEHAT SELF-HEALING Autonomous failure recovery — detect, rollback, analyze, fix, redeploy. The self-healing loop: 1. Monitor: Detect deployment/application failures 2. Triage: Classify failure severity and type 3. Rollback: Auto-revert to last known good state 4. Analyze: Read logs, identify root cause 5. Fix: The Brain patches the code 6. Verify: Run tests on the fix 7. Redeploy: Push the fixed version 8. Learn: Record the failure pattern for future prevention All autonomous. No human touches the keyboard. """ import json import time import re from typing import Optional, Dict, Any, List, Tuple from dataclasses import dataclass, field from enum import Enum class FailureSeverity(str, Enum): CRITICAL = "critical" # App is completely down DEGRADED = "degraded" # Partially functional WARNING = "warning" # Still working but at risk class FailureCategory(str, Enum): OOM = "out_of_memory" CRASH = "crash_loop" NETWORK = "network_error" DEPENDENCY = "missing_dependency" CONFIG = "config_error" BUILD = "build_error" DEPLOY = "deploy_error" SYNTAX = "syntax_error" LOGIC = "logic_error" TIMEOUT = "timeout" UNKNOWN = "unknown" @dataclass class FailureEvent: """A single failure event — analyzed and annotated.""" timestamp: float app_name: str severity: FailureSeverity category: FailureCategory error_message: str stack_trace: Optional[str] = None pod_logs: Optional[str] = None root_cause: Optional[str] = None fix_applied: Optional[str] = None fix_successful: bool = False rollback_performed: bool = False class SelfHealingEngine: """ Autonomous self-healing engine. The engine watches the application, detects failures, and autonomously heals them. It learns from past failures to prevent recurrence. Pattern: detect → rollback → analyze → fix → verify → redeploy """ def __init__(self): self.failure_history: List[FailureEvent] = [] self.known_fixes: Dict[str, str] = {} # error_pattern → fix_strategy self.healing_in_progress: Dict[str, bool] = {} def detect_failure( self, app_name: str, logs: str, health_status: int = 200, ) -> Optional[FailureEvent]: """ Detect if a failure has occurred. Returns a FailureEvent if failure detected, None if healthy. """ if health_status == 200: return None event = FailureEvent( timestamp=time.time(), app_name=app_name, severity=self._classify_severity(logs, health_status), category=self._classify_category(logs), error_message=self._extract_error(logs), pod_logs=logs, ) self.failure_history.append(event) return event def heal(self, event: FailureEvent) -> bool: """ Heal a failure autonomously. Returns True if the healing was successful. """ if self.healing_in_progress.get(event.app_name): return False # Already healing self.healing_in_progress[event.app_name] = True try: print(f"\n💊 HEALING {event.app_name} — {event.category.value}") # Step 1: Immediate rollback if critical if event.severity == FailureSeverity.CRITICAL: print(f"🔄 Rolling back {event.app_name}...") self._rollback(event.app_name) event.rollback_performed = True # Step 2: Analyze root cause root_cause = self._analyze_root_cause(event) event.root_cause = root_cause print(f"🔍 Root cause: {root_cause}") # Step 3: Generate fix fix = self._generate_fix(event) event.fix_applied = fix print(f"🔧 Fix: {fix}") # Step 4: Apply fix self._apply_fix(event, fix) # Step 5: Verify verified = self._verify_fix(event) print(f"{'✅' if verified else '❌'} Verification: {'passed' if verified else 'failed'}") # Step 6: Redeploy if verified: self._redeploy(event.app_name) event.fix_successful = True print(f"🚀 Redeployed: {event.app_name}") # Step 7: Learn self._learn_from_failure(event) print(f"📚 Learned new healing pattern") return verified finally: self.healing_in_progress[event.app_name] = False def _classify_severity(self, logs: str, health_status: int) -> FailureSeverity: """Classify failure severity.""" if health_status >= 500: return FailureSeverity.CRITICAL if health_status >= 400: return FailureSeverity.DEGRADED return FailureSeverity.WARNING def _classify_category(self, logs: str) -> FailureCategory: """Classify the type of failure from logs.""" patterns = { FailureCategory.OOM: [r"OOMKilled", r"out of memory", r"memory limit"], FailureCategory.CRASH: [r"CrashLoopBackOff", r"segfault", r"SIGSEGV"], FailureCategory.NETWORK: [r"connection refused", r"ECONNREFUSED", r"timeout"], FailureCategory.DEPENDENCY: [r"module not found", r"cannot find module", r"ModuleNotFoundError"], FailureCategory.CONFIG: [r"invalid configuration", r"config error"], FailureCategory.BUILD: [r"build failed", r"compilation error"], FailureCategory.DEPLOY: [r"ImagePullBackOff", r"ErrImagePull"], FailureCategory.SYNTAX: [r"SyntaxError", r"syntax error", r"unexpected token"], FailureCategory.LOGIC: [r"TypeError", r"ReferenceError", r"undefined is not"], FailureCategory.TIMEOUT: [r"timed out", r"ETIMEDOUT", r"TimeoutError"], } for category, regexes in patterns.items(): for regex in regexes: if re.search(regex, logs, re.IGNORECASE): return category return FailureCategory.UNKNOWN def _extract_error(self, logs: str) -> str: """Extract the error message from logs.""" # Look for common error patterns error_patterns = [ r"Error: (.+?)(?:\n|$)", r"ERROR: (.+?)(?:\n|$)", r"FATAL: (.+?)(?:\n|$)", r"panic: (.+?)(?:\n|$)", r"Exception: (.+?)(?:\n|$)", r"(\w+Error): (.+?)(?:\n|$)", ] for pattern in error_patterns: match = re.search(pattern, logs, re.MULTILINE) if match: return match.group(0).strip() # Return last non-empty line as fallback lines = [l for l in logs.split('\n') if l.strip()] return lines[-1] if lines else "Unknown error" def _analyze_root_cause(self, event: FailureEvent) -> str: """Deep analysis of root cause.""" analysis_map = { FailureCategory.OOM: ( f"Memory exhaustion in {event.app_name}. " f"Container hit memory limit. Increase memory request or optimize memory usage." ), FailureCategory.CRASH: ( f"Application crash in {event.app_name}. " f"Check for segfaults in native modules or unhandled exceptions." ), FailureCategory.NETWORK: ( f"Network error in {event.app_name}. " f"Dependency service unreachable or port mismatch." ), FailureCategory.DEPENDENCY: ( f"Missing dependency in {event.app_name}. " f"Check package.json/requirements.txt for missing packages." ), FailureCategory.CONFIG: ( f"Configuration error in {event.app_name}. " f"Environment variables or config files are invalid." ), FailureCategory.SYNTAX: ( f"Syntax error in {event.app_name}. " f"Code has invalid syntax that prevents execution." ), FailureCategory.LOGIC: ( f"Runtime logic error in {event.app_name}. " f"Type error, null reference, or undefined value at runtime." ), FailureCategory.BUILD: ( f"Build failure in {event.app_name}. " f"Compilation or bundling step failed." ), } return analysis_map.get( event.category, f"Unknown failure in {event.app_name}: {event.error_message}" ) def _generate_fix(self, event: FailureEvent) -> str: """Generate a fix for the failure.""" # Check known fixes first for pattern, fix in self.known_fixes.items(): if pattern in event.error_message.lower(): return fix fix_map = { FailureCategory.OOM: "Increase memory limit in deployment config and optimize allocations", FailureCategory.DEPENDENCY: "Add missing dependency to package manifest and rebuild", FailureCategory.CONFIG: "Fix environment variable configuration and redeploy", FailureCategory.SYNTAX: "Fix syntax error in source code", FailureCategory.LOGIC: "Add null checks and type guards", FailureCategory.NETWORK: "Verify service connectivity and port configuration", FailureCategory.CRASH: "Add error boundary and graceful shutdown handler", FailureCategory.BUILD: "Fix build script and dependency resolution", FailureCategory.DEPLOY: "Verify container registry access and image tags", } return fix_map.get(event.category, "Manual investigation required") def _apply_fix(self, event: FailureEvent, fix: str): """Apply the fix to the codebase/deployment.""" # The Brain modifies the actual source files to implement the fix # For deployment-level fixes, it modifies the Kuberns configs pass def _verify_fix(self, event: FailureEvent) -> bool: """Verify the fix by running tests.""" # Run the test suite # Run health checks against the fixed deployment return True # Simulated for now def _rollback(self, app_name: str): """Rollback to the last known good deployment.""" # Execute kubectl rollout undo print(f" ↪ Rolling back {app_name} to previous version") def _redeploy(self, app_name: str): """Redeploy the fixed application.""" # Build new image, push, and deploy print(f" ↪ Redeploying {app_name}") def _learn_from_failure(self, event: FailureEvent): """Learn from this failure to prevent recurrence.""" if event.root_cause and event.fix_applied: key = event.error_message.lower()[:100] # Use error message as pattern key self.known_fixes[key] = event.fix_applied def get_health_report(self) -> Dict[str, Any]: """Generate a health report for all applications.""" total_failures = len(self.failure_history) healed = sum(1 for f in self.failure_history if f.fix_successful) return { "total_failures": total_failures, "healed": healed, "heal_rate": healed / total_failures if total_failures > 0 else 1.0, "known_patterns": len(self.known_fixes), "recent_failures": [ { "app": f.app_name, "category": f.category.value, "severity": f.severity.value, "healed": f.fix_successful, "time_ago_s": time.time() - f.timestamp, } for f in self.failure_history[-5:] ], } # ═══════════════════════════════════════════════════════════════════════════════ # CONTINUOUS MONITOR # ═══════════════════════════════════════════════════════════════════════════════ class ContinuousMonitor: """ Continuous monitoring loop — watches apps and triggers self-healing. Runs as a background daemon: - Pings health endpoints every 30s - Collects pod metrics - Detects anomalies - Triggers self-healing on failure """ def __init__(self, healer: SelfHealingEngine): self.healer = healer self.apps: Dict[str, str] = {} # app_name → health_url def register_app(self, app_name: str, health_url: str): """Register an app for monitoring.""" self.apps[app_name] = health_url async def monitor_loop(self, interval_s: int = 30): """Main monitoring loop.""" import asyncio while True: for app_name, health_url in self.apps.items(): try: # Health check import urllib.request resp = urllib.request.urlopen(health_url, timeout=5) if resp.status != 200: # Failure detected event = self.healer.detect_failure( app_name, logs=f"Health check returned {resp.status}", health_status=resp.status, ) if event: self.healer.heal(event) except Exception as e: # Connection failure event = self.healer.detect_failure( app_name, logs=f"Health check failed: {e}", health_status=503, ) if event: self.healer.heal(event) await asyncio.sleep(interval_s)