Litehat-Universal-Engine / litehat /self_healing.py
dryymatt's picture
Upload litehat/self_healing.py
529318c verified
"""
LITEHAT SELF-HEALING
Autonomous failure recovery — detect, rollback, analyze, fix, redeploy.
The self-healing loop:
1. Monitor: Detect deployment/application failures
2. Triage: Classify failure severity and type
3. Rollback: Auto-revert to last known good state
4. Analyze: Read logs, identify root cause
5. Fix: The Brain patches the code
6. Verify: Run tests on the fix
7. Redeploy: Push the fixed version
8. Learn: Record the failure pattern for future prevention
All autonomous. No human touches the keyboard.
"""
import json
import time
import re
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass, field
from enum import Enum
class FailureSeverity(str, Enum):
    """How badly a failure affects the application.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # App is completely down
    CRITICAL = "critical"
    # Partially functional
    DEGRADED = "degraded"
    # Still working but at risk
    WARNING = "warning"
class FailureCategory(str, Enum):
    """Kind of failure assigned to an event.

    Members are also ``str`` instances. ``UNKNOWN`` is the catch-all for
    failures that fit none of the other categories.
    """

    # Resource and process failures
    OOM = "out_of_memory"
    CRASH = "crash_loop"
    NETWORK = "network_error"

    # Packaging, configuration and delivery failures
    DEPENDENCY = "missing_dependency"
    CONFIG = "config_error"
    BUILD = "build_error"
    DEPLOY = "deploy_error"

    # Code-level failures
    SYNTAX = "syntax_error"
    LOGIC = "logic_error"
    TIMEOUT = "timeout"

    # Nothing matched
    UNKNOWN = "unknown"
@dataclass
class FailureEvent:
    """Record of one failure, annotated as it moves through healing.

    The first five fields are required at construction time; the remaining
    fields default to "not set" / ``False`` and are meant to be filled in
    later.
    """

    # --- required at construction ---
    timestamp: float  # numeric time of the failure
    app_name: str
    severity: FailureSeverity
    category: FailureCategory
    error_message: str

    # --- optional diagnostic context ---
    stack_trace: Optional[str] = None
    pod_logs: Optional[str] = None

    # --- outcome of the healing attempt ---
    root_cause: Optional[str] = None
    fix_applied: Optional[str] = None
    fix_successful: bool = False
    rollback_performed: bool = False
class SelfHealingEngine:
    """
    Autonomous self-healing engine.

    Turns an unhealthy status plus raw logs into a ``FailureEvent`` and then
    drives it through a fixed pipeline:
    detect → rollback → analyze → fix → verify → redeploy → learn.

    Instance state:
        failure_history: every event produced by ``detect_failure``.
        known_fixes: lower-cased error-message prefix → fix that was applied
            for it; consulted before the per-category default fixes.
        healing_in_progress: app name → True while ``heal`` runs for it.

    NOTE: ``_apply_fix``, ``_verify_fix``, ``_rollback`` and ``_redeploy``
    are placeholders in this file; verification currently always passes.
    """

    # Maximum number of characters of an error message used as a learned key.
    _PATTERN_KEY_LENGTH = 100

    # Tried in order; the first pattern that matches anywhere in the logs
    # wins. The ``\w*`` prefix keeps the exception class name
    # ("ModuleNotFoundError: ...") instead of truncating it to "Error: ...".
    _ERROR_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r"\w*Error: .+",
            r"ERROR: .+",
            r"FATAL: .+",
            r"panic: .+",
            r"\w*Exception: .+",
        )
    )

    def __init__(self):
        self.failure_history: List[FailureEvent] = []
        self.known_fixes: Dict[str, str] = {}  # error_pattern → fix_strategy
        self.healing_in_progress: Dict[str, bool] = {}

    def detect_failure(
        self,
        app_name: str,
        logs: str,
        health_status: int = 200,
    ) -> Optional[FailureEvent]:
        """
        Detect if a failure has occurred.

        Any ``health_status`` other than 200 counts as a failure. The event
        is classified from ``logs``, appended to ``failure_history`` and
        returned.

        Returns:
            A FailureEvent if a failure was detected, None if healthy.
        """
        if health_status == 200:
            return None

        event = FailureEvent(
            timestamp=time.time(),
            app_name=app_name,
            severity=self._classify_severity(logs, health_status),
            category=self._classify_category(logs),
            error_message=self._extract_error(logs),
            pod_logs=logs,
        )
        self.failure_history.append(event)
        return event

    def heal(self, event: FailureEvent) -> bool:
        """
        Heal a failure autonomously.

        Runs rollback (critical events only), root-cause analysis, fix
        generation, fix application and verification; on successful
        verification it redeploys and records the fix for reuse. The event
        is annotated in place along the way.

        Returns:
            True if the fix was verified; False if verification failed or a
            heal for the same app is already running.
        """
        app_name = event.app_name
        if self.healing_in_progress.get(app_name):
            return False  # Already healing

        self.healing_in_progress[app_name] = True
        try:
            print(f"\n💊 HEALING {app_name} — {event.category.value}")

            # Step 1: Immediate rollback if critical
            if event.severity == FailureSeverity.CRITICAL:
                print(f"🔄 Rolling back {app_name}...")
                self._rollback(app_name)
                event.rollback_performed = True

            # Step 2: Analyze root cause
            root_cause = self._analyze_root_cause(event)
            event.root_cause = root_cause
            print(f"🔍 Root cause: {root_cause}")

            # Step 3: Generate fix
            fix = self._generate_fix(event)
            event.fix_applied = fix
            print(f"🔧 Fix: {fix}")

            # Step 4: Apply fix
            self._apply_fix(event, fix)

            # Step 5: Verify
            verified = self._verify_fix(event)
            outcome = "passed" if verified else "failed"
            print(f"{'✅' if verified else '❌'} Verification: {outcome}")

            if verified:
                # Step 6: Redeploy
                self._redeploy(app_name)
                event.fix_successful = True
                print(f"🚀 Redeployed: {app_name}")

                # Step 7: Learn -- only fixes that passed verification are
                # remembered, so a failed fix is never replayed later.
                self._learn_from_failure(event)
                print("📚 Learned new healing pattern")

            return verified
        finally:
            # Always release the guard, even if a step raised.
            self.healing_in_progress[app_name] = False

    def _classify_severity(self, logs: str, health_status: int) -> FailureSeverity:
        """Classify failure severity from the status code (``logs`` is unused).

        >= 500 is CRITICAL, >= 400 is DEGRADED, anything else is WARNING.
        """
        if health_status >= 500:
            return FailureSeverity.CRITICAL
        if health_status >= 400:
            return FailureSeverity.DEGRADED
        return FailureSeverity.WARNING

    def _classify_category(self, logs: str) -> FailureCategory:
        """Classify the type of failure from logs.

        Categories are tried in the order listed; the first one with a
        case-insensitive pattern hit wins. Returns UNKNOWN if none match.
        """
        patterns = {
            FailureCategory.OOM: [r"OOMKilled", r"out of memory", r"memory limit"],
            FailureCategory.CRASH: [r"CrashLoopBackOff", r"segfault", r"SIGSEGV"],
            # "timeout" lives under TIMEOUT: listed here it also matched
            # "TimeoutError" and made that TIMEOUT pattern unreachable.
            FailureCategory.NETWORK: [r"connection refused", r"ECONNREFUSED"],
            FailureCategory.DEPENDENCY: [
                r"module not found",
                r"cannot find module",
                r"ModuleNotFoundError",
            ],
            FailureCategory.CONFIG: [r"invalid configuration", r"config error"],
            FailureCategory.BUILD: [r"build failed", r"compilation error"],
            FailureCategory.DEPLOY: [r"ImagePullBackOff", r"ErrImagePull"],
            FailureCategory.SYNTAX: [r"SyntaxError", r"syntax error", r"unexpected token"],
            FailureCategory.LOGIC: [r"TypeError", r"ReferenceError", r"undefined is not"],
            FailureCategory.TIMEOUT: [
                r"timed out",
                r"ETIMEDOUT",
                r"timeout",  # also covers "TimeoutError" (case-insensitive)
            ],
        }
        for category, regexes in patterns.items():
            if any(re.search(regex, logs, re.IGNORECASE) for regex in regexes):
                return category
        return FailureCategory.UNKNOWN

    def _extract_error(self, logs: str) -> str:
        """Extract the error message from logs.

        Returns the first line fragment matching one of ``_ERROR_PATTERNS``
        (in pattern priority order), otherwise the last non-blank line,
        otherwise ``"Unknown error"``. The result is always stripped.
        """
        for pattern in self._ERROR_PATTERNS:
            match = pattern.search(logs)
            if match:
                return match.group(0).strip()

        # Fallback: last non-empty line
        non_blank = [line for line in logs.split("\n") if line.strip()]
        return non_blank[-1].strip() if non_blank else "Unknown error"

    def _analyze_root_cause(self, event: FailureEvent) -> str:
        """Return a canned root-cause description for the event's category.

        Categories without an entry get a generic message that includes the
        event's error message.
        """
        app = event.app_name
        analysis_map = {
            FailureCategory.OOM: (
                f"Memory exhaustion in {app}. "
                f"Container hit memory limit. Increase memory request or optimize memory usage."
            ),
            FailureCategory.CRASH: (
                f"Application crash in {app}. "
                f"Check for segfaults in native modules or unhandled exceptions."
            ),
            FailureCategory.NETWORK: (
                f"Network error in {app}. "
                f"Dependency service unreachable or port mismatch."
            ),
            FailureCategory.DEPENDENCY: (
                f"Missing dependency in {app}. "
                f"Check package.json/requirements.txt for missing packages."
            ),
            FailureCategory.CONFIG: (
                f"Configuration error in {app}. "
                f"Environment variables or config files are invalid."
            ),
            FailureCategory.SYNTAX: (
                f"Syntax error in {app}. "
                f"Code has invalid syntax that prevents execution."
            ),
            FailureCategory.LOGIC: (
                f"Runtime logic error in {app}. "
                f"Type error, null reference, or undefined value at runtime."
            ),
            FailureCategory.BUILD: (
                f"Build failure in {app}. "
                f"Compilation or bundling step failed."
            ),
            # DEPLOY and TIMEOUT are classified categories, so they get a
            # real description instead of the "Unknown failure" fallback.
            FailureCategory.DEPLOY: (
                f"Deployment failure in {app}. "
                f"Container image could not be pulled; check registry access and image tags."
            ),
            FailureCategory.TIMEOUT: (
                f"Timeout in {app}. "
                f"An operation or downstream call did not complete in time."
            ),
        }
        return analysis_map.get(
            event.category,
            f"Unknown failure in {app}: {event.error_message}",
        )

    def _generate_fix(self, event: FailureEvent) -> str:
        """Generate a fix for the failure.

        A learned fix whose key is a substring of the lower-cased error
        message takes precedence; otherwise the per-category default is
        used, falling back to ``"Manual investigation required"``.
        """
        # Check known fixes first
        message = event.error_message.lower()
        for pattern, fix in self.known_fixes.items():
            # An empty pattern is a substring of everything; never let it match.
            if pattern and pattern in message:
                return fix

        fix_map = {
            FailureCategory.OOM: "Increase memory limit in deployment config and optimize allocations",
            FailureCategory.DEPENDENCY: "Add missing dependency to package manifest and rebuild",
            FailureCategory.CONFIG: "Fix environment variable configuration and redeploy",
            FailureCategory.SYNTAX: "Fix syntax error in source code",
            FailureCategory.LOGIC: "Add null checks and type guards",
            FailureCategory.NETWORK: "Verify service connectivity and port configuration",
            FailureCategory.CRASH: "Add error boundary and graceful shutdown handler",
            FailureCategory.BUILD: "Fix build script and dependency resolution",
            FailureCategory.DEPLOY: "Verify container registry access and image tags",
            FailureCategory.TIMEOUT: "Verify dependency latency and adjust timeout configuration",
        }
        return fix_map.get(event.category, "Manual investigation required")

    def _apply_fix(self, event: FailureEvent, fix: str):
        """Apply the fix to the codebase/deployment.

        Placeholder: currently does nothing.
        """
        # The Brain modifies the actual source files to implement the fix
        # For deployment-level fixes, it modifies the Kuberns configs
        pass

    def _verify_fix(self, event: FailureEvent) -> bool:
        """Verify the fix by running tests.

        Placeholder: no tests or health checks are run; always returns True.
        """
        # Run the test suite
        # Run health checks against the fixed deployment
        return True  # Simulated for now

    def _rollback(self, app_name: str):
        """Rollback to the last known good deployment.

        Placeholder: only prints a message.
        """
        # Execute kubectl rollout undo
        print(f" ↪ Rolling back {app_name} to previous version")

    def _redeploy(self, app_name: str):
        """Redeploy the fixed application.

        Placeholder: only prints a message.
        """
        # Build new image, push, and deploy
        print(f" ↪ Redeploying {app_name}")

    def _learn_from_failure(self, event: FailureEvent):
        """Remember the fix applied to this failure for future reuse.

        The key is the stripped, lower-cased error message truncated to
        ``_PATTERN_KEY_LENGTH`` characters. Nothing is stored when the event
        lacks a root cause or fix, or when the key would be empty (an empty
        key would match every later error message).
        """
        if not (event.root_cause and event.fix_applied):
            return
        key = event.error_message.strip().lower()[: self._PATTERN_KEY_LENGTH]
        if not key:
            return
        self.known_fixes[key] = event.fix_applied

    def get_health_report(self) -> Dict[str, Any]:
        """Generate a health report for all applications.

        Returns a dict with the total failure count, the number healed, the
        heal rate (1.0 when there were no failures), the number of learned
        patterns and a summary of the five most recent failures.
        """
        total_failures = len(self.failure_history)
        healed = sum(1 for failure in self.failure_history if failure.fix_successful)
        now = time.time()  # one clock read so all ages share a reference point
        return {
            "total_failures": total_failures,
            "healed": healed,
            "heal_rate": healed / total_failures if total_failures > 0 else 1.0,
            "known_patterns": len(self.known_fixes),
            "recent_failures": [
                {
                    "app": failure.app_name,
                    "category": failure.category.value,
                    "severity": failure.severity.value,
                    "healed": failure.fix_successful,
                    "time_ago_s": now - failure.timestamp,
                }
                for failure in self.failure_history[-5:]
            ],
        }
# ═══════════════════════════════════════════════════════════════════════════════
# CONTINUOUS MONITOR
# ═══════════════════════════════════════════════════════════════════════════════
class ContinuousMonitor:
    """
    Continuous monitoring loop — watches apps and triggers self-healing.

    For every registered app the loop requests its health URL; any result
    other than HTTP 200 is handed to the healer's ``detect_failure`` and, if
    that yields an event, to ``heal``. The loop then sleeps and repeats.
    """

    def __init__(self, healer: SelfHealingEngine):
        self.healer = healer
        self.apps: Dict[str, str] = {}  # app_name → health_url

    def register_app(self, app_name: str, health_url: str):
        """Register an app for monitoring (re-registering replaces the URL)."""
        self.apps[app_name] = health_url

    @staticmethod
    def _probe(health_url: str, timeout_s: float = 5) -> Tuple[int, str]:
        """Perform one blocking health check.

        Returns:
            ``(status, log_line)``. HTTP error responses report their real
            status code; any other failure to get a response is reported as
            503. Never raises for request failures.
        """
        import urllib.error
        import urllib.request

        try:
            # ``with`` closes the response so connections are not leaked.
            with urllib.request.urlopen(health_url, timeout=timeout_s) as resp:
                status = resp.status
        except urllib.error.HTTPError as err:
            # urlopen raises for 4xx/5xx; keep the real code so the healer
            # can tell a client error from a server error.
            status_code = err.code
            log_line = f"Health check failed: {err}"
            err.close()
            return status_code, log_line
        except Exception as err:
            # Deliberate best-effort boundary: DNS errors, refused
            # connections, timeouts and malformed URLs all mean "unreachable".
            return 503, f"Health check failed: {err}"

        if status is None:
            # Non-HTTP handlers (e.g. file:, data:) provide no status code.
            return 503, f"Health check failed: no HTTP status returned for {health_url}"
        return status, f"Health check returned {status}"

    async def monitor_loop(self, interval_s: int = 30):
        """Main monitoring loop; runs until cancelled.

        Each health check runs in the default executor so the blocking HTTP
        request does not stall the event loop. Exceptions raised by the
        healer are not caught here and propagate to the caller.

        Args:
            interval_s: seconds to sleep between full passes over the apps.
        """
        import asyncio

        loop = asyncio.get_running_loop()
        while True:
            # Iterate a snapshot: register_app() may run while this
            # coroutine is suspended in an await below.
            for app_name, health_url in list(self.apps.items()):
                status, logs = await loop.run_in_executor(
                    None, self._probe, health_url
                )
                if status == 200:
                    continue

                # Failure detected. Healing sits outside any try block so an
                # error inside heal() is not mistaken for a failed check.
                event = self.healer.detect_failure(
                    app_name,
                    logs=logs,
                    health_status=status,
                )
                if event:
                    self.healer.heal(event)

            await asyncio.sleep(interval_s)