#!/usr/bin/env python3 """ đŸ›Ąī¸ FAILSAFE WATCHDOG - Immortal Guardian Space Monitors Agent Zero workspace, auto-heals on failure, persists state via Mem0 Chimera Protocol: ZeroGPU/FreeCPU only - no credit consumption """ import os import sys import json import time import hashlib import asyncio import threading import requests from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass, asdict from pathlib import Path import gradio as gr from huggingface_hub import HfApi # Optional: Mem0 for persistent memory try: from mem0 import Memory MEM0_AVAILABLE = True except ImportError: MEM0_AVAILABLE = False print("âš ī¸ Mem0 not installed - using local JSON fallback") # Configuration WATCHDOG_CONFIG = { "target_workspace": "ScottzillaSystems/agent-zero", "check_interval": 30, # seconds "failure_threshold": 3, "mem0_enabled": True, "self_heal_enabled": True, "backup_watchdog_count": 2, "hf_token": os.environ.get("HF_TOKEN"), "mem0_api_key": os.environ.get("MEM0_API_KEY") } @dataclass class HealthCheck: timestamp: str status: str # healthy, degraded, failed latency_ms: float error_patterns: List[str] action_taken: Optional[str] = None class Mem0Persistence: """Cross-restart memory via Mem0 or local fallback""" def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.environ.get("MEM0_API_KEY") self.user_id = "failsafe_watchdog" if MEM0_AVAILABLE and self.api_key: try: self.memory = Memory(api_key=self.api_key) self.mode = "mem0_cloud" print("✅ Mem0 cloud persistence active") except Exception as e: print(f"âš ī¸ Mem0 init failed: {e}, using local fallback") self.mode = "local_json" else: self.mode = "local_json" print("✅ Local JSON persistence active") self.local_path = Path("/data/watchdog_memory.json") self._ensure_local_storage() def _ensure_local_storage(self): if self.mode == "local_json": self.local_path.parent.mkdir(parents=True, exist_ok=True) if not self.local_path.exists(): self.local_path.write_text(json.dumps({"incidents": [], "state": {}})) def add(self, message: str, metadata: Dict = None): """Store memory entry""" if self.mode == "mem0_cloud": try: self.memory.add(message, user_id=self.user_id, metadata=metadata or {}) return True except Exception as e: print(f"Mem0 add failed: {e}, falling back to local") # Local fallback data = json.loads(self.local_path.read_text()) data["incidents"].append({ "timestamp": datetime.utcnow().isoformat(), "message": message, "metadata": metadata or {} }) self.local_path.write_text(json.dumps(data, indent=2)) return True def search(self, query: str, limit: int = 10) -> List[Dict]: """Search memory""" if self.mode == "mem0_cloud": try: return self.memory.search(query, user_id=self.user_id, limit=limit) except Exception as e: print(f"Mem0 search failed: {e}") # Local fallback - simple text search data = json.loads(self.local_path.read_text()) results = [] for incident in data["incidents"]: if query.lower() in incident["message"].lower(): results.append(incident) return results[-limit:] def get_state(self) -> Dict: """Get current watchdog state""" if self.mode == "local_json": data = json.loads(self.local_path.read_text()) return data.get("state", {}) return {} def save_state(self, state: Dict): """Save watchdog state""" if self.mode == "local_json": data = json.loads(self.local_path.read_text()) data["state"] = state self.local_path.write_text(json.dumps(data, indent=2)) class WorkspaceMonitor: """Monitors Agent Zero workspace health""" FAILURE_PATTERNS = { "rate_limit": ["Key limit exceeded", "429", "quota exceeded", "rate limit"], "fd_exhaustion": ["Too many open files", "EMFILE", "Errno 24"], "memory_pressure": ["MemoryError", "OOM", "Killed process", "out of memory"], "api_blackout": ["Connection refused", "Timeout", "Name resolution failed"], "auth_failure": ["401", "403", "Unauthorized", "Invalid token"] } def __init__(self, target_space: str, hf_token: str): self.target = target_space self.hf_token = hf_token self.api = HfApi(token=hf_token) self.history: List[HealthCheck] = [] self.consecutive_failures = 0 async def check_health(self) -> HealthCheck: """Perform health check on target workspace""" start = time.time() error_patterns = [] status = "healthy" action = None try: # Check HF Space runtime status space_info = self.api.space_info(self.target) runtime = getattr(space_info, 'runtime', {}) stage = runtime.get('stage', 'UNKNOWN') if stage == 'RUNNING': status = "healthy" elif stage == 'BUILD_ERROR': status = "failed" error_patterns.append("build_error") action = "trigger_rebuild" elif stage == 'SLEEPING': status = "degraded" action = "wake_space" else: status = "degraded" except Exception as e: status = "failed" error_msg = str(e).lower() # Pattern matching for pattern_type, patterns in self.FAILURE_PATTERNS.items(): if any(p.lower() in error_msg for p in patterns): error_patterns.append(pattern_type) if not error_patterns: error_patterns.append("unknown") action = self._determine_action(error_patterns) latency = (time.time() - start) * 1000 check = HealthCheck( timestamp=datetime.utcnow().isoformat(), status=status, latency_ms=latency, error_patterns=error_patterns, action_taken=action ) self.history.append(check) if len(self.history) > 100: self.history = self.history[-100:] # Track consecutive failures if status == "failed": self.consecutive_failures += 1 else: self.consecutive_failures = 0 return check def _determine_action(self, error_patterns: List[str]) -> str: """Determine remediation action from error patterns""" if "fd_exhaustion" in error_patterns: return "restart_container" elif "rate_limit" in error_patterns: return "switch_to_local_inference" elif "memory_pressure" in error_patterns: return "scale_down_agents" elif "build_error" in error_patterns: return "trigger_rebuild" else: return "restart_container" async def execute_remediation(self, action: str) -> bool: """Execute remediation action""" print(f"🔧 Executing remediation: {action}") try: if action == "restart_container": # Trigger space restart via empty commit self.api.upload_file( repo_id=self.target, repo_type="space", path_in_repo=".watchdog_trigger", path_or_fileobj=f"restart_{int(time.time())}".encode(), commit_message="[FAILSAFE] Auto-restart due to health check failure" ) return True elif action == "trigger_rebuild": # Touch README to trigger rebuild self.api.upload_file( repo_id=self.target, repo_type="space", path_in_repo="README.md", path_or_fileobj=b"# Trigger rebuild\n", commit_message="[FAILSAFE] Trigger rebuild" ) return True elif action == "wake_space": # Send request to wake sleeping space requests.get(f"https://{self.target.replace('/', '-')}.hf.space", timeout=10) return True except Exception as e: print(f"❌ Remediation failed: {e}") return False return False class SelfReplication: """Spawns backup watchdogs if current instance degrades""" def __init__(self, hf_token: str, mem0: Mem0Persistence): self.hf_token = hf_token self.mem0 = mem0 self.api = HfApi(token=hf_token) self.backup_spaces = [ "ScottzillaSystems/failsafe-watchdog-beta", "ScottzillaSystems/failsafe-watchdog-gamma" ] def check_own_health(self) -> bool: """Check if this watchdog is healthy""" # Simple checks if not self.hf_token: return False # Check if we can write to memory try: self.mem0.add("health_check", {"type": "self_check"}) return True except: return False def spawn_backup(self) -> Optional[str]: """Spawn backup watchdog instance""" for backup_space in self.backup_spaces: try: # Check if backup exists try: self.api.space_info(backup_space) print(f"✅ Backup {backup_space} already exists") return backup_space except: # Create backup space self.api.create_repo( repo_id=backup_space, repo_type="space", space_sdk="gradio" ) print(f"🚀 Spawned backup watchdog: {backup_space}") return backup_space except Exception as e: print(f"âš ī¸ Failed to spawn {backup_space}: {e}") return None class FailsafeWatchdog: """Main watchdog orchestrator""" def __init__(self): self.config = WATCHDOG_CONFIG self.mem0 = Mem0Persistence(self.config["mem0_api_key"]) self.monitor = WorkspaceMonitor( self.config["target_workspace"], self.config["hf_token"] ) self.replication = SelfReplication( self.config["hf_token"], self.mem0 ) self.running = False self.status_log: List[Dict] = [] # Restore state from Mem0 self._restore_state() def _restore_state(self): """Restore previous state from persistent memory""" state = self.mem0.get_state() if state: print(f"đŸ“Ĩ Restored state: {state.get('restart_count', 0)} previous restarts") async def run(self): """Main watchdog loop""" self.running = True print("đŸ›Ąī¸ Failsafe Watchdog started") while self.running: try: # Health check check = await self.monitor.check_health() # Log to Mem0 if check.status in ["failed", "degraded"]: self.mem0.add( f"Health check {check.status}: {', '.join(check.error_patterns)}", {"type": "health_check", "status": check.status} ) # Execute remediation if needed if check.action_taken and self.monitor.consecutive_failures >= self.config["failure_threshold"]: success = await self.monitor.execute_remediation(check.action_taken) self.mem0.add( f"Remediation {check.action_taken}: {'success' if success else 'failed'}", {"type": "remediation", "action": check.action_taken, "success": success} ) if success: self.monitor.consecutive_failures = 0 # Self-replication check if not self.replication.check_own_health(): backup = self.replication.spawn_backup() if backup: self.mem0.add( f"Self-replication triggered: spawned {backup}", {"type": "self_replication", "backup": backup} ) # Update status log self.status_log.append(asdict(check)) if len(self.status_log) > 50: self.status_log = self.status_log[-50:] # Save state self.mem0.save_state({ "last_check": check.timestamp, "restart_count": len([s for s in self.status_log if s.get("action_taken")]), "consecutive_failures": self.monitor.consecutive_failures }) await asyncio.sleep(self.config["check_interval"]) except Exception as e: print(f"❌ Watchdog error: {e}") await asyncio.sleep(5) def get_dashboard_data(self) -> Dict: """Get data for Gradio dashboard""" recent_checks = self.status_log[-10:] return { "status": "running" if self.running else "stopped", "target": self.config["target_workspace"], "last_check": recent_checks[-1] if recent_checks else None, "health_history": recent_checks, "consecutive_failures": self.monitor.consecutive_failures, "mem0_mode": self.mem0.mode, "incidents": self.mem0.search("failed", limit=5) if hasattr(self, 'mem0') else [] } # Gradio UI watchdog = FailsafeWatchdog() def start_watchdog(): if not watchdog.running: asyncio.create_task(watchdog.run()) return "đŸ›Ąī¸ Watchdog started" def stop_watchdog(): watchdog.running = False return "âšī¸ Watchdog stopped" def get_status(): return json.dumps(watchdog.get_dashboard_data(), indent=2, default=str) def view_incidents(): incidents = watchdog.mem0.search("incident", limit=10) return json.dumps(incidents, indent=2, default=str) with gr.Blocks(title="Failsafe Watchdog") as demo: gr.Markdown("# đŸ›Ąī¸ Failsafe Watchdog - Immortal Guardian") gr.Markdown("Monitors Agent Zero workspace, auto-heals on failure, persists via Mem0") with gr.Row(): start_btn = gr.Button("â–ļī¸ Start Watchdog", variant="primary") stop_btn = gr.Button("âšī¸ Stop Watchdog", variant="secondary") with gr.Row(): status_btn = gr.Button("📊 Get Status") incidents_btn = gr.Button("📋 View Incidents") output = gr.Textbox(label="Output", lines=20) start_btn.click(start_watchdog, outputs=output) stop_btn.click(stop_watchdog, outputs=output) status_btn.click(get_status, outputs=output) incidents_btn.click(view_incidents, outputs=output) # Auto-refresh status using Gradio Timer (5.8+ compatible) timer = gr.Timer(30) timer.tick(get_status, outputs=output) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)