""" HuggingFace Space Health Monitor Continuous monitoring and alerting for HF Spaces """ import json import logging import os import time from datetime import datetime from typing import Any, Dict import psutil import requests # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("logs/health_monitor.log"), logging.StreamHandler()], ) logger = logging.getLogger(__name__) class HFSpaceHealthMonitor: """Health monitoring for HuggingFace Spaces""" def __init__(self): self.check_interval = int(os.getenv("HEALTH_CHECK_INTERVAL", 60)) self.webhook_url = os.getenv("SLACK_WEBHOOK_URL") self.space_url = os.getenv("SPACE_URL", "http://localhost:7860") self.memory_threshold = float(os.getenv("MEMORY_THRESHOLD", 85.0)) self.disk_threshold = float(os.getenv("DISK_THRESHOLD", 85.0)) # Ensure logs directory exists os.makedirs("logs", exist_ok=True) logger.info("🚀 HF Space Health Monitor initialized") logger.info(f" Check interval: {self.check_interval}s") logger.info(f" Memory threshold: {self.memory_threshold}%") logger.info(f" Disk threshold: {self.disk_threshold}%") def check_system_health(self) -> Dict[str, Any]: """Check system resource health""" try: # Memory usage memory = psutil.virtual_memory() memory_percent = memory.percent # Disk usage disk = psutil.disk_usage("/") disk_percent = (disk.used / disk.total) * 100 # CPU usage cpu_percent = psutil.cpu_percent(interval=1) return { "memory_percent": memory_percent, "memory_available_gb": memory.available / (1024**3), "disk_percent": disk_percent, "disk_free_gb": disk.free / (1024**3), "cpu_percent": cpu_percent, "timestamp": datetime.now().isoformat(), } except Exception as e: logger.error(f"Error checking system health: {e}") return {"error": str(e)} def check_application_health(self) -> Dict[str, Any]: """Check application health endpoints""" try: # Check main health endpoint response = requests.get(f"{self.space_url}/health", timeout=10) health_status = response.status_code == 200 # Check if citation fix is working citation_test = self.test_citation_fix() return { "health_endpoint": health_status, "status_code": response.status_code, "response_time_ms": response.elapsed.total_seconds() * 1000, "citation_fix_working": citation_test, "timestamp": datetime.now().isoformat(), } except Exception as e: logger.error(f"Error checking application health: {e}") return { "health_endpoint": False, "error": str(e), "timestamp": datetime.now().isoformat(), } def test_citation_fix(self) -> bool: """Test that citation fix is working""" try: # Quick test of citation formatting test_payload = { "message": "What is the remote work policy?", "test_mode": True, } response = requests.post(f"{self.space_url}/chat", json=test_payload, timeout=30) if response.status_code == 200: # Check if response contains proper citation format response_text = response.text return "[Source:" in response_text and "document_1.md" not in response_text except Exception as e: logger.warning(f"Citation test failed: {e}") return False def check_hf_services(self) -> Dict[str, Any]: """Check HuggingFace service connectivity""" try: hf_token = os.getenv("HF_TOKEN") if not hf_token: return {"hf_token_configured": False} # Test HF Inference API headers = {"Authorization": f"Bearer {hf_token}"} response = requests.get( "https://router.huggingface.co/hf-inference/models/intfloat/multilingual-e5-large", headers=headers, timeout=10, ) return { "hf_token_configured": True, "hf_api_accessible": response.status_code in [200, 503], # 503 is "loading" "hf_api_status": response.status_code, "timestamp": datetime.now().isoformat(), } except Exception as e: logger.error(f"Error checking HF services: {e}") return {"error": str(e)} def generate_health_report(self) -> Dict[str, Any]: """Generate comprehensive health report""" system_health = self.check_system_health() app_health = self.check_application_health() hf_health = self.check_hf_services() # Determine overall health status is_healthy = ( system_health.get("memory_percent", 100) < self.memory_threshold and system_health.get("disk_percent", 100) < self.disk_threshold and app_health.get("health_endpoint", False) and app_health.get("citation_fix_working", False) ) return { "overall_healthy": is_healthy, "system": system_health, "application": app_health, "huggingface": hf_health, "timestamp": datetime.now().isoformat(), } def send_alert(self, message: str, health_report: Dict[str, Any]): """Send alert notification""" alert_payload = { "text": f"🚨 HF Space Alert: {message}", "timestamp": datetime.now().isoformat(), "details": health_report, } # Log the alert logger.error(f"ALERT: {message}") logger.error(f"Health Report: {json.dumps(health_report, indent=2)}") # Send to webhook if configured if self.webhook_url: try: requests.post(self.webhook_url, json=alert_payload, timeout=10) logger.info("Alert sent to webhook") except Exception as e: logger.error(f"Failed to send webhook alert: {e}") def log_health_status(self, health_report: Dict[str, Any]): """Log current health status""" system = health_report.get("system", {}) app = health_report.get("application", {}) logger.info( "Health Status: " f"Memory={system.get('memory_percent', 'N/A'):.1f}%, " f"Disk={system.get('disk_percent', 'N/A'):.1f}%, " f"CPU={system.get('cpu_percent', 'N/A'):.1f}%, " f"App={app.get('health_endpoint', False)}, " f"Citations={app.get('citation_fix_working', False)}", ) def run_monitoring_loop(self): """Main monitoring loop""" logger.info("🔍 Starting health monitoring loop...") while True: try: # Generate health report health_report = self.generate_health_report() # Log status self.log_health_status(health_report) # Check for alerts if not health_report["overall_healthy"]: system = health_report.get("system", {}) app = health_report.get("application", {}) alert_reasons = [] if system.get("memory_percent", 0) >= self.memory_threshold: alert_reasons.append(f"High memory usage: {system['memory_percent']:.1f}%") if system.get("disk_percent", 0) >= self.disk_threshold: alert_reasons.append(f"High disk usage: {system['disk_percent']:.1f}%") if not app.get("health_endpoint", True): alert_reasons.append("Health endpoint failing") if not app.get("citation_fix_working", True): alert_reasons.append("Citation fix not working") alert_message = "; ".join(alert_reasons) self.send_alert(alert_message, health_report) # Save health report to file with open("logs/latest_health.json", "w") as f: json.dump(health_report, f, indent=2) except Exception as e: logger.error(f"Error in monitoring loop: {e}") # Wait for next check time.sleep(self.check_interval) def main(): """Main entry point""" monitor = HFSpaceHealthMonitor() try: monitor.run_monitoring_loop() except KeyboardInterrupt: logger.info("Health monitoring stopped by user") except Exception as e: logger.error(f"Health monitoring crashed: {e}") raise if __name__ == "__main__": main()