Spaces:
Sleeping
Sleeping
| """ | |
| HuggingFace Space Health Monitor | |
| Continuous monitoring and alerting for HF Spaces | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from datetime import datetime | |
| from typing import Any, Dict | |
| import psutil | |
| import requests | |
# Configure logging
# FileHandler opens its target file as soon as it is constructed, so the
# directory has to exist before basicConfig() runs; otherwise importing this
# module fails on a fresh environment.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # Log messages in this module contain emoji; pin the encoding so the
        # file handler does not depend on the platform default.
        logging.FileHandler("logs/health_monitor.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class HFSpaceHealthMonitor:
    """Health monitoring for HuggingFace Spaces.

    Each monitoring cycle samples host resources through ``psutil``, probes
    the Space's ``/health`` and ``/chat`` endpoints, checks connectivity to the
    HuggingFace inference router, logs a one-line status summary, raises an
    alert (log entry plus optional webhook POST) when the overall status is
    unhealthy, and writes the report to ``logs/latest_health.json``.

    Configuration is read from environment variables in ``__init__``:
    ``HEALTH_CHECK_INTERVAL`` (seconds, default 60), ``SLACK_WEBHOOK_URL``
    (optional), ``SPACE_URL`` (default ``http://localhost:7860``),
    ``MEMORY_THRESHOLD`` and ``DISK_THRESHOLD`` (percent, default 85.0).
    """

    def __init__(self):
        """Load configuration from the environment and ensure ``logs/`` exists."""
        self.check_interval = int(os.getenv("HEALTH_CHECK_INTERVAL", 60))
        self.webhook_url = os.getenv("SLACK_WEBHOOK_URL")
        self.space_url = os.getenv("SPACE_URL", "http://localhost:7860")
        self.memory_threshold = float(os.getenv("MEMORY_THRESHOLD", 85.0))
        self.disk_threshold = float(os.getenv("DISK_THRESHOLD", 85.0))

        # Ensure logs directory exists (latest_health.json is written there)
        os.makedirs("logs", exist_ok=True)

        logger.info("🚀 HF Space Health Monitor initialized")
        logger.info(f" Check interval: {self.check_interval}s")
        logger.info(f" Memory threshold: {self.memory_threshold}%")
        logger.info(f" Disk threshold: {self.disk_threshold}%")

    def check_system_health(self) -> Dict[str, Any]:
        """Check system resource health.

        Returns:
            A dict with ``memory_percent``, ``memory_available_gb``,
            ``disk_percent``, ``disk_free_gb``, ``cpu_percent`` and
            ``timestamp``. If any probe raises, the error is logged and
            ``{"error": <message>}`` is returned instead.
        """
        try:
            # Memory usage
            memory = psutil.virtual_memory()
            memory_percent = memory.percent

            # Disk usage of the root filesystem
            disk = psutil.disk_usage("/")
            disk_percent = (disk.used / disk.total) * 100

            # CPU usage, sampled with a 1-second interval
            cpu_percent = psutil.cpu_percent(interval=1)

            return {
                "memory_percent": memory_percent,
                "memory_available_gb": memory.available / (1024**3),
                "disk_percent": disk_percent,
                "disk_free_gb": disk.free / (1024**3),
                "cpu_percent": cpu_percent,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error checking system health: {e}")
            return {"error": str(e)}

    def check_application_health(self) -> Dict[str, Any]:
        """Check application health endpoints.

        Returns:
            A dict with ``health_endpoint`` (True when ``/health`` answered
            200), ``status_code``, ``response_time_ms``,
            ``citation_fix_working`` and ``timestamp``. If the request raises,
            the dict holds ``health_endpoint=False``, ``error`` and
            ``timestamp`` only.
        """
        try:
            # Check main health endpoint
            response = requests.get(f"{self.space_url}/health", timeout=10)
            health_status = response.status_code == 200

            # Check if citation fix is working
            citation_test = self.test_citation_fix()

            return {
                "health_endpoint": health_status,
                "status_code": response.status_code,
                "response_time_ms": response.elapsed.total_seconds() * 1000,
                "citation_fix_working": citation_test,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error checking application health: {e}")
            return {
                "health_endpoint": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            }

    def test_citation_fix(self) -> bool:
        """Test that citation fix is working.

        Posts a fixed test question to ``/chat`` and inspects the raw response
        text.

        Returns:
            True only when the endpoint answers 200 and the body contains
            ``"[Source:"`` but not ``"document_1.md"`` (presumably a
            placeholder filename the fix is meant to eliminate; confirm
            against the chat service). False on any other status or on a
            request error, which is logged as a warning.
        """
        try:
            # Quick test of citation formatting
            test_payload = {
                "message": "What is the remote work policy?",
                "test_mode": True,
            }
            response = requests.post(f"{self.space_url}/chat", json=test_payload, timeout=30)
            if response.status_code == 200:
                # Check if response contains proper citation format
                response_text = response.text
                return "[Source:" in response_text and "document_1.md" not in response_text
        except Exception as e:
            logger.warning(f"Citation test failed: {e}")
        return False

    def check_hf_services(self) -> Dict[str, Any]:
        """Check HuggingFace service connectivity.

        Returns:
            ``{"hf_token_configured": False}`` when ``HF_TOKEN`` is unset;
            otherwise a dict with ``hf_token_configured``,
            ``hf_api_accessible``, ``hf_api_status`` and ``timestamp``. On a
            request error, the error is logged and ``{"error": <message>}`` is
            returned.
        """
        try:
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                return {"hf_token_configured": False}

            # Test HF Inference API
            headers = {"Authorization": f"Bearer {hf_token}"}
            response = requests.get(
                "https://router.huggingface.co/hf-inference/models/intfloat/multilingual-e5-large",
                headers=headers,
                timeout=10,
            )
            return {
                "hf_token_configured": True,
                "hf_api_accessible": response.status_code in [200, 503],  # 503 is "loading"
                "hf_api_status": response.status_code,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error checking HF services: {e}")
            return {"error": str(e)}

    def generate_health_report(self) -> Dict[str, Any]:
        """Generate comprehensive health report.

        Returns:
            A dict with ``overall_healthy``, ``system``, ``application``,
            ``huggingface`` and ``timestamp``. The HuggingFace check is
            reported but does not influence ``overall_healthy``.
        """
        system_health = self.check_system_health()
        app_health = self.check_application_health()
        hf_health = self.check_hf_services()

        # Determine overall health status. Missing metrics default to 100 so
        # that a failed probe counts as unhealthy rather than as healthy.
        is_healthy = (
            system_health.get("memory_percent", 100) < self.memory_threshold
            and system_health.get("disk_percent", 100) < self.disk_threshold
            and app_health.get("health_endpoint", False)
            and app_health.get("citation_fix_working", False)
        )
        return {
            "overall_healthy": is_healthy,
            "system": system_health,
            "application": app_health,
            "huggingface": hf_health,
            "timestamp": datetime.now().isoformat(),
        }

    def send_alert(self, message: str, health_report: Dict[str, Any]):
        """Send alert notification.

        The alert is always written to the log. When a webhook URL is
        configured it is additionally POSTed there; a transport error or a
        non-success HTTP status is logged and never propagated.

        Args:
            message: Human-readable summary of what is wrong.
            health_report: Full report attached to the alert as ``details``.
        """
        alert_payload = {
            "text": f"🚨 HF Space Alert: {message}",
            "timestamp": datetime.now().isoformat(),
            "details": health_report,
        }

        # Log the alert
        logger.error(f"ALERT: {message}")
        logger.error(f"Health Report: {json.dumps(health_report, indent=2)}")

        # Send to webhook if configured
        if self.webhook_url:
            try:
                response = requests.post(self.webhook_url, json=alert_payload, timeout=10)
                # Without this, a 4xx/5xx answer would still be reported as
                # a delivered alert.
                response.raise_for_status()
                logger.info("Alert sent to webhook")
            except Exception as e:
                logger.error(f"Failed to send webhook alert: {e}")

    @staticmethod
    def _format_percent(value: Any) -> str:
        """Render a numeric metric as ``"12.3%"``, or ``"N/A"`` when it is missing."""
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:.1f}%"
        return "N/A"

    @staticmethod
    def _build_status_line(health_report: Dict[str, Any]) -> str:
        """Build the one-line status summary for a health report."""
        system = health_report.get("system", {})
        app = health_report.get("application", {})
        fmt = HFSpaceHealthMonitor._format_percent
        return (
            "Health Status: "
            f"Memory={fmt(system.get('memory_percent'))}, "
            f"Disk={fmt(system.get('disk_percent'))}, "
            f"CPU={fmt(system.get('cpu_percent'))}, "
            f"App={app.get('health_endpoint', False)}, "
            f"Citations={app.get('citation_fix_working', False)}"
        )

    def _collect_alert_reasons(self, health_report: Dict[str, Any]) -> list:
        """List the human-readable reasons why a report is unhealthy."""
        system = health_report.get("system", {})
        app = health_report.get("application", {})
        memory = system.get("memory_percent")
        disk = system.get("disk_percent")

        reasons = []
        # A failed system probe returns only {"error": ...}; report that
        # explicitly instead of producing an alert with no reason at all.
        if memory is None or disk is None:
            reasons.append("System metrics unavailable")
        if memory is not None and memory >= self.memory_threshold:
            reasons.append(f"High memory usage: {memory:.1f}%")
        if disk is not None and disk >= self.disk_threshold:
            reasons.append(f"High disk usage: {disk:.1f}%")
        if not app.get("health_endpoint", False):
            reasons.append("Health endpoint failing")
        # The citation key is absent only when the application check itself
        # errored, i.e. the citation test never ran; do not blame it then.
        if not app.get("citation_fix_working", True):
            reasons.append("Citation fix not working")
        return reasons

    def log_health_status(self, health_report: Dict[str, Any]):
        """Log current health status.

        Missing metrics are rendered as ``N/A`` so that a failed probe cannot
        make the status line itself raise.
        """
        logger.info(self._build_status_line(health_report))

    def run_monitoring_loop(self):
        """Main monitoring loop.

        Runs forever: generate a report, log it, alert if unhealthy, persist
        the report to ``logs/latest_health.json``, then sleep for
        ``check_interval`` seconds. Errors inside a cycle are logged and the
        loop continues with the next cycle.
        """
        logger.info("🔍 Starting health monitoring loop...")
        while True:
            try:
                # Generate health report
                health_report = self.generate_health_report()

                # Log status
                self.log_health_status(health_report)

                # Check for alerts
                if not health_report["overall_healthy"]:
                    alert_reasons = self._collect_alert_reasons(health_report)
                    alert_message = "; ".join(alert_reasons) or "Health check failed"
                    self.send_alert(alert_message, health_report)

                # Save health report to file
                with open("logs/latest_health.json", "w") as f:
                    json.dump(health_report, f, indent=2)
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}")

            # Wait for next check
            time.sleep(self.check_interval)
def main():
    """Main entry point: build the monitor and run it until it stops.

    A KeyboardInterrupt is treated as a normal, operator-requested shutdown.
    Any other exception escaping the monitoring loop is logged and re-raised
    so the process exits with a failure.
    """
    health_monitor = HFSpaceHealthMonitor()
    try:
        health_monitor.run_monitoring_loop()
    except KeyboardInterrupt:
        # Operator-requested stop, not a failure.
        logger.info("Health monitoring stopped by user")
    except Exception as crash:
        logger.error(f"Health monitoring crashed: {crash}")
        raise


if __name__ == "__main__":
    main()