Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| 🛡️ FAILSAFE WATCHDOG - Immortal Guardian Space | |
| Monitors Agent Zero workspace, auto-heals on failure, persists state via Mem0 | |
| Chimera Protocol: ZeroGPU/FreeCPU only - no credit consumption | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import hashlib | |
| import asyncio | |
| import threading | |
| import requests | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| import gradio as gr | |
| from huggingface_hub import HfApi | |
| # Optional: Mem0 for persistent memory | |
| try: | |
| from mem0 import Memory | |
| MEM0_AVAILABLE = True | |
| except ImportError: | |
| MEM0_AVAILABLE = False | |
| print("⚠️ Mem0 not installed - using local JSON fallback") | |
# Configuration: every tunable the watchdog reads lives in this one dict.
# Secrets are pulled from the environment so they never land in the repo.
# NOTE(review): mem0_enabled and backup_watchdog_count are not read by any
# code visible in this file — confirm whether they are still needed.
WATCHDOG_CONFIG = dict(
    target_workspace="ScottzillaSystems/agent-zero",  # Space repo id to guard
    check_interval=30,  # seconds between health checks
    failure_threshold=3,  # consecutive "failed" checks before remediation
    mem0_enabled=True,
    self_heal_enabled=True,
    backup_watchdog_count=2,
    hf_token=os.environ.get("HF_TOKEN"),
    mem0_api_key=os.environ.get("MEM0_API_KEY"),
)
@dataclass
class HealthCheck:
    """Result of one health probe against the target workspace.

    Instances are built with keyword arguments and serialised with
    ``dataclasses.asdict``; both require the ``@dataclass`` decorator
    (without it the keyword constructor raises ``TypeError``).
    """

    timestamp: str  # ISO-8601 timestamp string of the check
    status: str  # healthy, degraded, failed
    latency_ms: float  # duration of the probe in milliseconds
    error_patterns: List[str]  # names of matched failure categories (may be empty)
    action_taken: Optional[str] = None  # remediation suggested for this check; not necessarily executed
class Mem0Persistence:
    """Cross-restart memory via Mem0 or a local JSON fallback.

    Incidents are sent to Mem0 when a client could be created. Otherwise, or
    whenever a Mem0 call fails, they are appended to a JSON file shaped
    ``{"incidents": [...], "state": {...}}``. Watchdog state
    (``get_state``/``save_state``) is only persisted in local mode.

    Args:
        api_key: Mem0 API key; falls back to the ``MEM0_API_KEY`` env var.
        local_path: JSON file used by the local store
            (default ``/data/watchdog_memory.json``).
    """

    # Maximum number of incidents kept in the local file. The file is
    # rewritten on every add and callers may add an entry every check cycle,
    # so without a cap it would grow (and slow down) forever.
    MAX_LOCAL_INCIDENTS = 1000

    def __init__(self, api_key: Optional[str] = None, local_path: Optional[Path] = None):
        self.api_key = api_key or os.environ.get("MEM0_API_KEY")
        self.user_id = "failsafe_watchdog"
        # Assigned before the mode decision: cloud mode still falls back to it.
        self.local_path = (
            Path(local_path) if local_path is not None else Path("/data/watchdog_memory.json")
        )
        if MEM0_AVAILABLE and self.api_key:
            try:
                # NOTE(review): mem0's `Memory` is the self-hosted class; the hosted
                # platform client is `MemoryClient(api_key=...)` — confirm intent.
                self.memory = Memory(api_key=self.api_key)
                self.mode = "mem0_cloud"
                print("✅ Mem0 cloud persistence active")
            except Exception as e:
                print(f"⚠️ Mem0 init failed: {e}, using local fallback")
                self.mode = "local_json"
        else:
            self.mode = "local_json"
            print("✅ Local JSON persistence active")
        self._ensure_local_storage()

    def _ensure_local_storage(self):
        """Create the JSON store (and its directory) up front in local mode."""
        if self.mode == "local_json" and not self.local_path.exists():
            self._write_local({"incidents": [], "state": {}})

    def _read_local(self) -> Dict:
        """Load the local store, treating a missing or corrupt file as empty."""
        try:
            data = json.loads(self.local_path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            data = {}
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"⚠️ Local memory file unreadable ({e}), starting fresh")
            data = {}
        if not isinstance(data, dict):
            data = {}
        data.setdefault("incidents", [])
        data.setdefault("state", {})
        return data

    def _write_local(self, data: Dict):
        """Replace the local store with ``data`` via a temp file + rename."""
        self.local_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.local_path.with_name(self.local_path.name + ".tmp")
        tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        # os.replace swaps the file in one step, so a crash mid-write
        # cannot leave a truncated JSON file behind.
        os.replace(tmp_path, self.local_path)

    def add(self, message: str, metadata: Optional[Dict] = None) -> bool:
        """Store a memory entry.

        Tries Mem0 first in cloud mode; on failure (or in local mode) the
        entry is appended to the local JSON store. Returns True once stored.
        """
        if self.mode == "mem0_cloud":
            try:
                self.memory.add(message, user_id=self.user_id, metadata=metadata or {})
                return True
            except Exception as e:
                print(f"Mem0 add failed: {e}, falling back to local")
        data = self._read_local()
        incidents = data["incidents"]
        incidents.append({
            "timestamp": datetime.utcnow().isoformat(),
            "message": message,
            "metadata": metadata or {},
        })
        # Drop the oldest entries beyond the cap.
        if len(incidents) > self.MAX_LOCAL_INCIDENTS:
            del incidents[:-self.MAX_LOCAL_INCIDENTS]
        self._write_local(data)
        return True

    def search(self, query: str, limit: int = 10) -> List[Dict]:
        """Search memory.

        In cloud mode this returns whatever Mem0's ``search`` returns. The
        local fallback is a case-insensitive substring match on the message,
        returning at most ``limit`` of the newest matches (oldest first).
        """
        if self.mode == "mem0_cloud":
            try:
                return self.memory.search(query, user_id=self.user_id, limit=limit)
            except Exception as e:
                print(f"Mem0 search failed: {e}")
        if limit <= 0:
            return []
        needle = query.lower()
        matches = [
            incident
            for incident in self._read_local()["incidents"]
            if isinstance(incident, dict) and needle in str(incident.get("message", "")).lower()
        ]
        return matches[-limit:]

    def get_state(self) -> Dict:
        """Return the saved watchdog state (always ``{}`` outside local mode)."""
        if self.mode == "local_json":
            return self._read_local().get("state", {})
        return {}

    def save_state(self, state: Dict):
        """Persist the watchdog state (no-op outside local mode)."""
        if self.mode == "local_json":
            data = self._read_local()
            data["state"] = state
            self._write_local(data)
class WorkspaceMonitor:
    """Monitors Agent Zero workspace health via the Hugging Face Hub API.

    Each check reads the target Space's runtime stage. If the Hub call
    itself raises, the exception message is classified against
    FAILURE_PATTERNS to pick a remediation.
    """

    # Substrings (matched case-insensitively) used to classify an exception
    # raised while querying the Hub.
    FAILURE_PATTERNS = {
        "rate_limit": ["Key limit exceeded", "429", "quota exceeded", "rate limit"],
        "fd_exhaustion": ["Too many open files", "EMFILE", "Errno 24"],
        "memory_pressure": ["MemoryError", "OOM", "Killed process", "out of memory"],
        "api_blackout": ["Connection refused", "Timeout", "Name resolution failed"],
        "auth_failure": ["401", "403", "Unauthorized", "Invalid token"]
    }

    def __init__(self, target_space: str, hf_token: str):
        self.target = target_space  # Space repo id, e.g. "owner/name"
        self.hf_token = hf_token
        self.api = HfApi(token=hf_token)
        self.history: List["HealthCheck"] = []  # newest last, capped at 100 entries
        self.consecutive_failures = 0  # number of "failed" checks in a row

    @staticmethod
    def _extract_stage(runtime: Any) -> str:
        """Return the runtime stage name, or "UNKNOWN" if it is unavailable.

        ``SpaceInfo.runtime`` is an object with a ``stage`` attribute (a
        str-valued enum) rather than a dict; dicts are accepted too.
        """
        if runtime is None:
            return "UNKNOWN"
        if isinstance(runtime, dict):
            stage = runtime.get("stage")
        else:
            stage = getattr(runtime, "stage", None)
        # Unwrap enum members to their plain string value.
        stage = getattr(stage, "value", stage)
        return stage if isinstance(stage, str) and stage else "UNKNOWN"

    async def check_health(self) -> "HealthCheck":
        """Perform one health check on the target workspace.

        Records the result in ``history`` and updates
        ``consecutive_failures`` (reset by any non-"failed" result).
        """
        start = time.monotonic()
        error_patterns: List[str] = []
        action: Optional[str] = None
        try:
            # The Hub client is blocking; keep it off the event loop.
            space_info = await asyncio.to_thread(self.api.space_info, self.target)
            stage = self._extract_stage(getattr(space_info, "runtime", None))
            if stage == "RUNNING":
                status = "healthy"
            elif stage == "BUILD_ERROR":
                status = "failed"
                error_patterns.append("build_error")
                action = "trigger_rebuild"
            elif stage == "RUNTIME_ERROR":
                # The app crashed after building; a restart is the first remedy.
                status = "failed"
                error_patterns.append("runtime_error")
                action = "restart_container"
            elif stage == "SLEEPING":
                status = "degraded"
                action = "wake_space"
            else:
                status = "degraded"
        except Exception as e:
            status = "failed"
            error_msg = str(e).lower()
            error_patterns = [
                pattern_type
                for pattern_type, patterns in self.FAILURE_PATTERNS.items()
                if any(p.lower() in error_msg for p in patterns)
            ] or ["unknown"]
            action = self._determine_action(error_patterns)
        latency = (time.monotonic() - start) * 1000
        check = HealthCheck(
            timestamp=datetime.utcnow().isoformat(),
            status=status,
            latency_ms=latency,
            error_patterns=error_patterns,
            action_taken=action
        )
        self.history.append(check)
        del self.history[:-100]
        if status == "failed":
            self.consecutive_failures += 1
        else:
            self.consecutive_failures = 0
        return check

    def _determine_action(self, error_patterns: List[str]) -> str:
        """Map matched failure categories to a remediation action (first match wins)."""
        if "fd_exhaustion" in error_patterns:
            return "restart_container"
        elif "rate_limit" in error_patterns:
            return "switch_to_local_inference"
        elif "memory_pressure" in error_patterns:
            return "scale_down_agents"
        elif "build_error" in error_patterns:
            return "trigger_rebuild"
        else:
            return "restart_container"

    async def execute_remediation(self, action: str) -> bool:
        """Execute a remediation action; return True if it was carried out.

        Only "restart_container", "trigger_rebuild" and "wake_space" are
        implemented. Any other action returns False.
        """
        print(f"🔧 Executing remediation: {action}")
        try:
            if action == "restart_container":
                await asyncio.to_thread(self.api.restart_space, self.target)
                return True
            if action == "trigger_rebuild":
                # A factory reboot rebuilds from scratch without committing to
                # the repo. Overwriting README.md would erase its YAML header,
                # which holds the Space configuration.
                await asyncio.to_thread(self.api.restart_space, self.target, factory_reboot=True)
                return True
            if action == "wake_space":
                # Hitting the Space URL wakes a sleeping Space.
                url = f"https://{self.target.replace('/', '-')}.hf.space"
                await asyncio.to_thread(requests.get, url, timeout=10)
                return True
        except Exception as e:
            print(f"❌ Remediation failed: {e}")
            return False
        print(f"⚠️ No handler for remediation action: {action}")
        return False
class SelfReplication:
    """Spawns backup watchdog Spaces if the current instance degrades."""

    def __init__(self, hf_token: str, mem0: "Mem0Persistence"):
        self.hf_token = hf_token
        self.mem0 = mem0
        self.api = HfApi(token=hf_token)
        self.backup_spaces = [
            "ScottzillaSystems/failsafe-watchdog-beta",
            "ScottzillaSystems/failsafe-watchdog-gamma"
        ]

    def check_own_health(self) -> bool:
        """Return True when this watchdog has a token and can write to memory.

        Side effect: each successful check stores a "health_check" entry.
        """
        if not self.hf_token:
            return False
        try:
            self.mem0.add("health_check", {"type": "self_check"})
        except Exception as e:
            print(f"⚠️ Self-check memory write failed: {e}")
            return False
        return True

    def spawn_backup(self) -> Optional[str]:
        """Ensure a backup watchdog Space exists and return its repo id.

        Backups are tried in order. The first one that already exists, or
        that can be created, is returned. Returns None if every attempt fails.
        NOTE: a newly created Space is empty. No watchdog code is uploaded
        to it here.
        """
        for backup_space in self.backup_spaces:
            try:
                self.api.space_info(backup_space)
            except Exception:
                # Treated as missing. The create below uses exist_ok, so a
                # transient lookup error cannot cause a conflict.
                pass
            else:
                print(f"✅ Backup {backup_space} already exists")
                return backup_space
            try:
                self.api.create_repo(
                    repo_id=backup_space,
                    repo_type="space",
                    space_sdk="gradio",
                    exist_ok=True
                )
            except Exception as e:
                print(f"⚠️ Failed to spawn {backup_space}: {e}")
                continue
            print(f"🚀 Spawned backup watchdog: {backup_space}")
            return backup_space
        return None
class FailsafeWatchdog:
    """Main watchdog orchestrator.

    Runs a periodic cycle: health check, incident logging, remediation,
    self-check/replication, then state persistence.
    """

    def __init__(self):
        self.config = WATCHDOG_CONFIG
        self.mem0 = Mem0Persistence(self.config["mem0_api_key"])
        self.monitor = WorkspaceMonitor(
            self.config["target_workspace"],
            self.config["hf_token"]
        )
        self.replication = SelfReplication(
            self.config["hf_token"],
            self.mem0
        )
        self.running = False
        self.status_log: List[Dict] = []  # asdict() of recent checks, newest last, max 50
        # Successful remediations other than waking, carried across restarts.
        self.restart_count = 0
        self._restore_state()

    def _restore_state(self):
        """Restore the persisted restart counter, if any."""
        state = self.mem0.get_state()
        if state:
            try:
                self.restart_count = int(state.get("restart_count", 0))
            except (TypeError, ValueError):
                self.restart_count = 0
            print(f"📥 Restored state: {self.restart_count} previous restarts")

    async def run(self):
        """Main watchdog loop; exits after the current sleep once ``running`` is False."""
        self.running = True
        print("🛡️ Failsafe Watchdog started")
        while self.running:
            try:
                await self._run_cycle()
                await asyncio.sleep(self.config["check_interval"])
            except Exception as e:
                print(f"❌ Watchdog error: {e}")
                await asyncio.sleep(5)

    async def _run_cycle(self):
        """Run one check → log → remediate → self-check → persist cycle."""
        check = await self.monitor.check_health()
        if check.status in ("failed", "degraded"):
            self.mem0.add(
                f"Health check {check.status}: {', '.join(check.error_patterns)}",
                {"type": "health_check", "status": check.status}
            )
        if self._should_remediate(check):
            await self._remediate(check.action_taken)
        if not self.replication.check_own_health():
            backup = self.replication.spawn_backup()
            if backup:
                self.mem0.add(
                    f"Self-replication triggered: spawned {backup}",
                    {"type": "self_replication", "backup": backup}
                )
        self.status_log.append(asdict(check))
        del self.status_log[:-50]
        self.mem0.save_state({
            "last_check": check.timestamp,
            "restart_count": self.restart_count,
            "consecutive_failures": self.monitor.consecutive_failures
        })

    def _should_remediate(self, check) -> bool:
        """Decide whether the check's suggested action should run now."""
        if not check.action_taken or not self.config.get("self_heal_enabled", True):
            return False
        # A sleeping Space is "degraded", which resets consecutive_failures.
        # Waking must therefore not wait for the failure threshold.
        if check.action_taken == "wake_space":
            return True
        return self.monitor.consecutive_failures >= self.config["failure_threshold"]

    async def _remediate(self, action: str):
        """Execute ``action``, log the outcome, and update counters."""
        success = await self.monitor.execute_remediation(action)
        self.mem0.add(
            f"Remediation {action}: {'success' if success else 'failed'}",
            {"type": "remediation", "action": action, "success": success}
        )
        if success:
            self.monitor.consecutive_failures = 0
            if action != "wake_space":
                self.restart_count += 1

    def get_dashboard_data(self) -> Dict:
        """Get data for the Gradio dashboard."""
        recent_checks = self.status_log[-10:]
        return {
            "status": "running" if self.running else "stopped",
            "target": self.config["target_workspace"],
            "last_check": recent_checks[-1] if recent_checks else None,
            "health_history": recent_checks,
            "consecutive_failures": self.monitor.consecutive_failures,
            "restart_count": self.restart_count,
            "mem0_mode": self.mem0.mode,
            "incidents": self.mem0.search("failed", limit=5)
        }
# Gradio UI
watchdog = FailsafeWatchdog()

# The watchdog loop runs on its own event loop in a daemon thread. Gradio
# calls sync handlers from worker threads that have no running event loop,
# so asyncio.create_task() would raise RuntimeError there. A separate loop
# also keeps the watchdog's blocking I/O away from the UI.
_watchdog_thread: Optional[threading.Thread] = None
_watchdog_lock = threading.Lock()


def start_watchdog():
    """Start the monitoring loop in a background thread (idempotent)."""
    global _watchdog_thread
    with _watchdog_lock:
        if _watchdog_thread is not None and _watchdog_thread.is_alive():
            # A stopped loop may still be in its sleep. Re-arming the flag
            # keeps it going instead of starting a second loop.
            watchdog.running = True
        else:
            _watchdog_thread = threading.Thread(
                target=lambda: asyncio.run(watchdog.run()),
                name="failsafe-watchdog",
                daemon=True,
            )
            _watchdog_thread.start()
    return "🛡️ Watchdog started"


def stop_watchdog():
    """Ask the loop to stop; it exits after its current sleep."""
    watchdog.running = False
    return "⏹️ Watchdog stopped"


def get_status():
    """Return the dashboard data as pretty-printed JSON."""
    return json.dumps(watchdog.get_dashboard_data(), indent=2, default=str)


# The watchdog logs messages starting with "Health check", "Remediation"
# and "Self-replication". None contains the word "incident", so the search
# uses these prefixes. The local search is a case-insensitive substring
# match, so "health check" does not match the "health_check" self-check
# entries.
_INCIDENT_QUERIES = ("health check", "remediation", "self-replication")


def view_incidents():
    """Return recent incident records for each incident type as JSON."""
    incidents = []
    for query in _INCIDENT_QUERIES:
        hits = watchdog.mem0.search(query, limit=10)
        # Some Mem0 client versions presumably wrap hits as {"results": [...]} — verify.
        if isinstance(hits, dict):
            hits = hits.get("results", [])
        incidents.extend(hits)
    return json.dumps(incidents, indent=2, default=str)
with gr.Blocks(title="Failsafe Watchdog") as demo:
    gr.Markdown("# 🛡️ Failsafe Watchdog - Immortal Guardian")
    gr.Markdown("Monitors Agent Zero workspace, auto-heals on failure, persists via Mem0")

    # Components render in creation order: two rows of buttons, then output.
    with gr.Row():
        start_btn = gr.Button("▶️ Start Watchdog", variant="primary")
        stop_btn = gr.Button("⏹️ Stop Watchdog", variant="secondary")
    with gr.Row():
        status_btn = gr.Button("📊 Get Status")
        incidents_btn = gr.Button("📋 View Incidents")

    output = gr.Textbox(label="Output", lines=20)

    # Every button writes its handler's return value into the shared output box.
    for button, handler in (
        (start_btn, start_watchdog),
        (stop_btn, stop_watchdog),
        (status_btn, get_status),
        (incidents_btn, view_incidents),
    ):
        button.click(handler, outputs=output)

    # Refresh the status view every 30 seconds.
    timer = gr.Timer(30)
    timer.tick(get_status, outputs=output)

if __name__ == "__main__":
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)