| """ |
| LITEHAT SELF-HEALING |
| Autonomous failure recovery — detect, rollback, analyze, fix, redeploy. |
| |
| The self-healing loop: |
| 1. Monitor: Detect deployment/application failures |
| 2. Triage: Classify failure severity and type |
| 3. Rollback: Auto-revert to last known good state |
| 4. Analyze: Read logs, identify root cause |
| 5. Fix: The Brain patches the code |
| 6. Verify: Run tests on the fix |
| 7. Redeploy: Push the fixed version |
| 8. Learn: Record the failure pattern for future prevention |
| |
| All autonomous. No human touches the keyboard. |
| """ |
|
|
| import json |
| import time |
| import re |
| from typing import Optional, Dict, Any, List, Tuple |
| from dataclasses import dataclass, field |
| from enum import Enum |
|
|
|
|
class FailureSeverity(str, Enum):
    """How badly an application is affected by a failure.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Assigned by the engine for health statuses >= 500; triggers a rollback.
    CRITICAL = "critical"
    # Assigned by the engine for health statuses in the 400 range.
    DEGRADED = "degraded"
    # Assigned by the engine for any other non-200 health status.
    WARNING = "warning"
|
|
|
|
class FailureCategory(str, Enum):
    """Kind of failure, as inferred from application logs.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Resource and process failures
    OOM = "out_of_memory"
    CRASH = "crash_loop"

    # Connectivity and environment
    NETWORK = "network_error"
    DEPENDENCY = "missing_dependency"
    CONFIG = "config_error"

    # Delivery pipeline
    BUILD = "build_error"
    DEPLOY = "deploy_error"

    # Application code
    SYNTAX = "syntax_error"
    LOGIC = "logic_error"
    TIMEOUT = "timeout"

    # Fallback when no log pattern matches
    UNKNOWN = "unknown"
|
|
|
|
@dataclass
class FailureEvent:
    """Record of one detected failure, enriched as healing progresses.

    The first five fields are required and describe the failure itself.
    The remaining fields default to "not yet known / not yet done".
    """

    # --- what happened -------------------------------------------------
    timestamp: float  # creation time; the engine fills this from time.time()
    app_name: str
    severity: FailureSeverity
    category: FailureCategory
    error_message: str

    # --- supporting evidence -------------------------------------------
    stack_trace: Optional[str] = None
    pod_logs: Optional[str] = None

    # --- healing outcome -----------------------------------------------
    root_cause: Optional[str] = None
    fix_applied: Optional[str] = None
    fix_successful: bool = False
    rollback_performed: bool = False
|
|
|
|
class SelfHealingEngine:
    """
    Autonomous self-healing engine.

    Detects failures from a health status and log text, classifies them,
    and runs the healing pipeline. Fixes that pass verification are
    remembered and reused when the same error message shows up again.

    Pattern: detect → rollback → analyze → fix → verify → redeploy

    NOTE: ``_apply_fix``, ``_verify_fix``, ``_rollback`` and ``_redeploy``
    are placeholders in this class; they do not touch a real deployment.

    Attributes:
        failure_history: Every event created by ``detect_failure``, oldest first.
        known_fixes: Learned fixes, keyed by the lower-cased error message
            truncated to 100 characters.
        healing_in_progress: Per-app flag that stops ``heal`` from
            re-entering for an app that is already being healed.
    """

    # Error-line patterns, tried in priority order by ``_extract_error``.
    # ``\w*`` keeps the exception type ("TypeError: ...") instead of cutting
    # the match down to the bare "Error: ..." suffix.
    _ERROR_PATTERNS = (
        r"\w*Error: .+",
        r"ERROR: .+",
        r"FATAL: .+",
        r"panic: .+",
        r"\w*Exception: .+",
    )

    def __init__(self):
        self.failure_history: List["FailureEvent"] = []
        self.known_fixes: Dict[str, str] = {}
        self.healing_in_progress: Dict[str, bool] = {}

    def detect_failure(
        self,
        app_name: str,
        logs: str,
        health_status: int = 200,
    ) -> Optional["FailureEvent"]:
        """
        Detect if a failure has occurred.

        Args:
            app_name: Name of the application being checked.
            logs: Log text used to classify the failure and extract the error.
            health_status: HTTP-style status code; exactly 200 means healthy.

        Returns:
            A new FailureEvent (also appended to ``failure_history``) when
            ``health_status`` is not 200, otherwise None.
        """
        if health_status == 200:
            return None

        event = FailureEvent(
            timestamp=time.time(),
            app_name=app_name,
            severity=self._classify_severity(logs, health_status),
            category=self._classify_category(logs),
            error_message=self._extract_error(logs),
            pod_logs=logs,
        )
        self.failure_history.append(event)
        return event

    def heal(self, event: "FailureEvent") -> bool:
        """
        Heal a failure autonomously.

        Runs rollback (critical failures only), root-cause analysis, fix
        generation, fix application, verification and, when verification
        passes, redeploy and learning. ``event`` is updated in place with
        the outcome of each step.

        Returns:
            True if the fix was verified; False if verification failed or
            a healing run for the same app is already in progress.
        """
        app = event.app_name
        if self.healing_in_progress.get(app):
            return False

        self.healing_in_progress[app] = True
        try:
            print(f"\n🚑 HEALING {app} — {event.category.value}")

            # 1. Rollback first when the app is critically broken.
            if event.severity == FailureSeverity.CRITICAL:
                print(f"🔄 Rolling back {app}...")
                self._rollback(app)
                event.rollback_performed = True

            # 2. Analyze
            root_cause = self._analyze_root_cause(event)
            event.root_cause = root_cause
            print(f"🔍 Root cause: {root_cause}")

            # 3. Generate and apply the fix
            fix = self._generate_fix(event)
            event.fix_applied = fix
            print(f"🔧 Fix: {fix}")
            self._apply_fix(event, fix)

            # 4. Verify
            verified = self._verify_fix(event)
            mark = "✅" if verified else "❌"
            outcome = "passed" if verified else "failed"
            print(f"{mark} Verification: {outcome}")

            # 5. Redeploy and learn, only for fixes that actually verified.
            #    Remembering an unverified fix would replay a known-bad fix
            #    on the next occurrence of the same error.
            if verified:
                self._redeploy(app)
                event.fix_successful = True
                print(f"🚀 Redeployed: {app}")
                if self._learn_from_failure(event):
                    print("📚 Learned new healing pattern")

            return verified
        finally:
            # Always release the guard, even if a step raised.
            self.healing_in_progress[app] = False

    def _classify_severity(self, logs: str, health_status: int) -> "FailureSeverity":
        """Map a health status to a severity: >=500 critical, >=400 degraded, else warning.

        ``logs`` is accepted for interface symmetry but is not inspected.
        """
        if health_status >= 500:
            return FailureSeverity.CRITICAL
        if health_status >= 400:
            return FailureSeverity.DEGRADED
        return FailureSeverity.WARNING

    def _classify_category(self, logs: str) -> "FailureCategory":
        """Classify the type of failure from logs.

        Categories are tested in the order listed and the first one with a
        case-insensitive match wins; UNKNOWN is returned when none match.
        """
        # Order matters: TIMEOUT must come before NETWORK, because NETWORK's
        # generic "timeout" pattern would otherwise swallow "TimeoutError".
        ordered_patterns = (
            (FailureCategory.OOM, (r"OOMKilled", r"out of memory", r"memory limit")),
            (FailureCategory.CRASH, (r"CrashLoopBackOff", r"segfault", r"SIGSEGV")),
            (FailureCategory.TIMEOUT, (r"timed out", r"ETIMEDOUT", r"TimeoutError")),
            (FailureCategory.NETWORK, (r"connection refused", r"ECONNREFUSED", r"timeout")),
            (FailureCategory.DEPENDENCY, (r"module not found", r"cannot find module", r"ModuleNotFoundError")),
            (FailureCategory.CONFIG, (r"invalid configuration", r"config error")),
            (FailureCategory.BUILD, (r"build failed", r"compilation error")),
            (FailureCategory.DEPLOY, (r"ImagePullBackOff", r"ErrImagePull")),
            (FailureCategory.SYNTAX, (r"SyntaxError", r"syntax error", r"unexpected token")),
            (FailureCategory.LOGIC, (r"TypeError", r"ReferenceError", r"undefined is not")),
        )

        for category, regexes in ordered_patterns:
            if any(re.search(regex, logs, re.IGNORECASE) for regex in regexes):
                return category

        return FailureCategory.UNKNOWN

    def _extract_error(self, logs: str) -> str:
        """Extract the error message from logs.

        Returns the first line fragment matching one of ``_ERROR_PATTERNS``
        (patterns are tried in priority order), including the exception type
        when present. Falls back to the last non-blank log line, or
        "Unknown error" when the logs contain no non-blank line.
        """
        for pattern in self._ERROR_PATTERNS:
            match = re.search(pattern, logs)
            if match:
                return match.group(0).strip()

        non_blank = [line for line in logs.split('\n') if line.strip()]
        return non_blank[-1] if non_blank else "Unknown error"

    def _analyze_root_cause(self, event: "FailureEvent") -> str:
        """Return a human-readable root-cause description for the event's category.

        Categories without a canned description produce a generic message
        that embeds the event's error message.
        """
        app = event.app_name
        analysis_map = {
            FailureCategory.OOM: (
                f"Memory exhaustion in {app}. "
                f"Container hit memory limit. Increase memory request or optimize memory usage."
            ),
            FailureCategory.CRASH: (
                f"Application crash in {app}. "
                f"Check for segfaults in native modules or unhandled exceptions."
            ),
            FailureCategory.NETWORK: (
                f"Network error in {app}. "
                f"Dependency service unreachable or port mismatch."
            ),
            FailureCategory.DEPENDENCY: (
                f"Missing dependency in {app}. "
                f"Check package.json/requirements.txt for missing packages."
            ),
            FailureCategory.CONFIG: (
                f"Configuration error in {app}. "
                f"Environment variables or config files are invalid."
            ),
            FailureCategory.SYNTAX: (
                f"Syntax error in {app}. "
                f"Code has invalid syntax that prevents execution."
            ),
            FailureCategory.LOGIC: (
                f"Runtime logic error in {app}. "
                f"Type error, null reference, or undefined value at runtime."
            ),
            FailureCategory.BUILD: (
                f"Build failure in {app}. "
                f"Compilation or bundling step failed."
            ),
            FailureCategory.DEPLOY: (
                f"Deployment failure in {app}. "
                f"Container image could not be pulled."
            ),
            FailureCategory.TIMEOUT: (
                f"Timeout in {app}. "
                f"An operation exceeded its time limit."
            ),
        }

        return analysis_map.get(
            event.category,
            f"Unknown failure in {app}: {event.error_message}"
        )

    def _generate_fix(self, event: "FailureEvent") -> str:
        """Generate a fix for the failure.

        A previously learned fix wins when its key occurs in the lower-cased
        error message; otherwise a canned fix for the event's category is
        returned, or "Manual investigation required" when there is none.
        """
        message = event.error_message.lower()
        for pattern, fix in self.known_fixes.items():
            # An empty key is a substring of every message; never let it match.
            if pattern and pattern in message:
                return fix

        fix_map = {
            FailureCategory.OOM: "Increase memory limit in deployment config and optimize allocations",
            FailureCategory.DEPENDENCY: "Add missing dependency to package manifest and rebuild",
            FailureCategory.CONFIG: "Fix environment variable configuration and redeploy",
            FailureCategory.SYNTAX: "Fix syntax error in source code",
            FailureCategory.LOGIC: "Add null checks and type guards",
            FailureCategory.NETWORK: "Verify service connectivity and port configuration",
            FailureCategory.CRASH: "Add error boundary and graceful shutdown handler",
            FailureCategory.BUILD: "Fix build script and dependency resolution",
            FailureCategory.DEPLOY: "Verify container registry access and image tags",
            FailureCategory.TIMEOUT: "Increase timeout thresholds or optimize the slow operation",
        }

        return fix_map.get(event.category, "Manual investigation required")

    def _apply_fix(self, event: "FailureEvent", fix: str):
        """Apply the fix to the codebase/deployment (placeholder: does nothing)."""
        pass

    def _verify_fix(self, event: "FailureEvent") -> bool:
        """Verify the fix by running tests (placeholder: always reports success)."""
        return True

    def _rollback(self, app_name: str):
        """Rollback to the last known good deployment (placeholder: only logs)."""
        print(f" ⏪ Rolling back {app_name} to previous version")

    def _redeploy(self, app_name: str):
        """Redeploy the fixed application (placeholder: only logs)."""
        print(f" ⏪ Redeploying {app_name}")

    def _learn_from_failure(self, event: "FailureEvent") -> bool:
        """Remember the event's fix so the same error can reuse it later.

        The key is the lower-cased error message truncated to 100 characters.
        Nothing is stored when the root cause, the fix, or the key is empty.

        Returns:
            True if a fix was recorded, False otherwise.
        """
        if not (event.root_cause and event.fix_applied):
            return False
        key = event.error_message.lower()[:100]
        if not key:
            # An empty key would match every future error in _generate_fix.
            return False
        self.known_fixes[key] = event.fix_applied
        return True

    def get_health_report(self) -> Dict[str, Any]:
        """Generate a health report for all applications.

        Returns:
            A dict with the total and healed failure counts, the heal rate
            (1.0 when there have been no failures), the number of learned
            fixes, and a summary of the five most recent failures.
        """
        total_failures = len(self.failure_history)
        healed = sum(1 for f in self.failure_history if f.fix_successful)
        now = time.time()  # one reference time so the ages are mutually consistent

        return {
            "total_failures": total_failures,
            "healed": healed,
            "heal_rate": healed / total_failures if total_failures > 0 else 1.0,
            "known_patterns": len(self.known_fixes),
            "recent_failures": [
                {
                    "app": f.app_name,
                    "category": f.category.value,
                    "severity": f.severity.value,
                    "healed": f.fix_successful,
                    "time_ago_s": now - f.timestamp,
                }
                for f in self.failure_history[-5:]
            ],
        }
|
|
|
|
| |
| |
| |
|
|
class ContinuousMonitor:
    """
    Continuous monitoring loop — watches apps and triggers self-healing.

    Each registered app has a health URL. ``monitor_loop`` probes every URL
    once per interval (30s by default); any result other than HTTP 200 is
    handed to the healer's ``detect_failure`` and, if that yields an event,
    to ``heal``.
    """

    def __init__(self, healer: "SelfHealingEngine"):
        """Create a monitor that reports failures to ``healer``."""
        self.healer = healer
        self.apps: Dict[str, str] = {}  # app name -> health URL

    def register_app(self, app_name: str, health_url: str):
        """Register an app for monitoring (re-registering replaces its URL)."""
        self.apps[app_name] = health_url

    async def monitor_loop(self, interval_s: int = 30):
        """Main monitoring loop: check every app, sleep ``interval_s``, repeat forever.

        Exceptions raised by the healer are not caught and will end the loop.
        """
        import asyncio

        while True:
            await self.check_once()
            await asyncio.sleep(interval_s)

    async def check_once(self, timeout_s: float = 5) -> None:
        """Probe every registered app once and trigger healing for unhealthy ones.

        Args:
            timeout_s: Per-request timeout passed to the HTTP probe.
        """
        import asyncio

        loop = asyncio.get_running_loop()
        # Iterate over a snapshot: the awaits below yield control, and another
        # task may call register_app while a probe is in flight.
        for app_name, health_url in list(self.apps.items()):
            # urlopen blocks, so run it in the default executor to keep the
            # event loop responsive.
            status, logs = await loop.run_in_executor(
                None, self._probe, health_url, timeout_s
            )
            if status == 200:
                continue

            event = self.healer.detect_failure(
                app_name,
                logs=logs,
                health_status=status,
            )
            if event:
                self.healer.heal(event)

    @staticmethod
    def _probe(health_url: str, timeout_s: float = 5) -> Tuple[int, str]:
        """Fetch ``health_url`` and return ``(status, log_message)``.

        HTTP error responses report their real status code. Any other
        failure to get a response (connection error, timeout, malformed URL)
        is reported as 503.
        """
        import urllib.error
        import urllib.request

        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen(health_url, timeout=timeout_s) as resp:
                status = resp.status
        except urllib.error.HTTPError as exc:
            # Keep the real code so 4xx and 5xx are classified differently.
            code = exc.code
            exc.close()
            return code, f"Health check returned {code}"
        except Exception as exc:
            # Best effort: whatever stopped the probe, the app is unreachable.
            return 503, f"Health check failed: {exc}"

        if not isinstance(status, int):
            # Non-HTTP URL handlers may not provide a status code.
            return 503, f"Health check failed: no HTTP status from {health_url}"
        return status, f"Health check returned {status}"
|
|