# ai-engineering-project/scripts/hf_health_monitor.py
# GitHub Action
# Clean deployment without binary files
# f884e6e
"""
HuggingFace Space Health Monitor
Continuous monitoring and alerting for HF Spaces
"""
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict
import psutil
import requests
# Configure logging.
# logging.FileHandler opens its target file as soon as it is constructed, so
# the "logs" directory has to exist before basicConfig() is evaluated at
# import time; otherwise importing this module raises FileNotFoundError on a
# fresh checkout.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("logs/health_monitor.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
class HFSpaceHealthMonitor:
    """Health monitoring for HuggingFace Spaces.

    Each monitoring cycle samples host resources through ``psutil``, probes
    the Space's ``/health`` and ``/chat`` endpoints and the HuggingFace
    inference router through ``requests``, logs a one-line summary, sends an
    alert when the report is unhealthy and writes the report to
    ``logs/latest_health.json``.

    Configuration is read from environment variables:

    * ``HEALTH_CHECK_INTERVAL`` - seconds slept between cycles (default 60)
    * ``SLACK_WEBHOOK_URL`` - optional URL that alert payloads are POSTed to
    * ``SPACE_URL`` - base URL of the Space (default ``http://localhost:7860``)
    * ``MEMORY_THRESHOLD`` / ``DISK_THRESHOLD`` - usage percentage at or above
      which the report is considered unhealthy (default 85.0 each)
    * ``HF_TOKEN`` - read on every HuggingFace connectivity check
    """

    def __init__(self):
        """Load settings from the environment and log the effective values."""
        self.check_interval = int(os.getenv("HEALTH_CHECK_INTERVAL", 60))
        self.webhook_url = os.getenv("SLACK_WEBHOOK_URL")
        self.space_url = os.getenv("SPACE_URL", "http://localhost:7860")
        self.memory_threshold = float(os.getenv("MEMORY_THRESHOLD", 85.0))
        self.disk_threshold = float(os.getenv("DISK_THRESHOLD", 85.0))

        # Ensure logs directory exists (latest_health.json is written there)
        os.makedirs("logs", exist_ok=True)

        logger.info("🚀 HF Space Health Monitor initialized")
        logger.info(f" Check interval: {self.check_interval}s")
        logger.info(f" Memory threshold: {self.memory_threshold}%")
        logger.info(f" Disk threshold: {self.disk_threshold}%")

    def check_system_health(self) -> Dict[str, Any]:
        """Check system resource health.

        Returns:
            A dict with ``memory_percent``, ``memory_available_gb``,
            ``disk_percent``, ``disk_free_gb``, ``cpu_percent`` and
            ``timestamp``; or ``{"error": <message>}`` if sampling raised.
        """
        try:
            # Memory usage
            memory = psutil.virtual_memory()
            memory_percent = memory.percent

            # Disk usage of the root filesystem
            disk = psutil.disk_usage("/")
            disk_percent = (disk.used / disk.total) * 100

            # CPU usage, measured over a blocking one-second window
            cpu_percent = psutil.cpu_percent(interval=1)

            return {
                "memory_percent": memory_percent,
                "memory_available_gb": memory.available / (1024**3),
                "disk_percent": disk_percent,
                "disk_free_gb": disk.free / (1024**3),
                "cpu_percent": cpu_percent,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error checking system health: {e}")
            return {"error": str(e)}

    def check_application_health(self) -> Dict[str, Any]:
        """Check application health endpoints.

        Returns:
            On success, a dict with ``health_endpoint`` (True when ``/health``
            answered 200), ``status_code``, ``response_time_ms``,
            ``citation_fix_working`` and ``timestamp``. If the request raised,
            a dict with ``health_endpoint`` False, ``error`` and ``timestamp``.
        """
        try:
            # Check main health endpoint
            response = requests.get(f"{self.space_url}/health", timeout=10)
            health_status = response.status_code == 200

            # Check if citation fix is working
            citation_test = self.test_citation_fix()

            return {
                "health_endpoint": health_status,
                "status_code": response.status_code,
                "response_time_ms": response.elapsed.total_seconds() * 1000,
                "citation_fix_working": citation_test,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error checking application health: {e}")
            return {
                "health_endpoint": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            }

    def test_citation_fix(self) -> bool:
        """Test that citation fix is working.

        POSTs a fixed question to ``/chat`` and inspects the raw response text.

        Returns:
            True only when the endpoint answered 200 and the text contains
            ``[Source:`` but not ``document_1.md``. False for any other status
            code or when the request raised.
        """
        try:
            # Quick test of citation formatting
            test_payload = {
                "message": "What is the remote work policy?",
                "test_mode": True,
            }
            response = requests.post(f"{self.space_url}/chat", json=test_payload, timeout=30)
            if response.status_code != 200:
                # Always hand back a real bool, never an implicit None.
                return False

            # Check if response contains proper citation format
            response_text = response.text
            return "[Source:" in response_text and "document_1.md" not in response_text
        except Exception as e:
            logger.warning(f"Citation test failed: {e}")
            return False

    def check_hf_services(self) -> Dict[str, Any]:
        """Check HuggingFace service connectivity.

        Returns:
            ``{"hf_token_configured": False}`` when ``HF_TOKEN`` is unset;
            otherwise a dict with ``hf_token_configured``,
            ``hf_api_accessible``, ``hf_api_status`` and ``timestamp``; or
            ``{"error": <message>}`` if the request raised.
        """
        try:
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                return {"hf_token_configured": False}

            # Test HF Inference API
            headers = {"Authorization": f"Bearer {hf_token}"}
            response = requests.get(
                "https://router.huggingface.co/hf-inference/models/intfloat/multilingual-e5-large",
                headers=headers,
                timeout=10,
            )
            return {
                "hf_token_configured": True,
                "hf_api_accessible": response.status_code in [200, 503],  # 503 is "loading"
                "hf_api_status": response.status_code,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error checking HF services: {e}")
            return {"error": str(e)}

    def generate_health_report(self) -> Dict[str, Any]:
        """Generate comprehensive health report.

        Returns:
            A dict with ``overall_healthy``, ``system``, ``application``,
            ``huggingface`` and ``timestamp``. The HuggingFace result is
            included for information only; it does not affect
            ``overall_healthy``.
        """
        system_health = self.check_system_health()
        app_health = self.check_application_health()
        hf_health = self.check_hf_services()

        # Determine overall health status. Missing metrics default to 100 so
        # that a failed system check is treated as unhealthy.
        is_healthy = (
            system_health.get("memory_percent", 100) < self.memory_threshold
            and system_health.get("disk_percent", 100) < self.disk_threshold
            and app_health.get("health_endpoint", False)
            and app_health.get("citation_fix_working", False)
        )

        return {
            "overall_healthy": is_healthy,
            "system": system_health,
            "application": app_health,
            "huggingface": hf_health,
            "timestamp": datetime.now().isoformat(),
        }

    def send_alert(self, message: str, health_report: Dict[str, Any]):
        """Send alert notification.

        The alert and the full report are always written to the log. When a
        webhook URL is configured the payload is also POSTed there; delivery
        failures (including HTTP error statuses) are logged, never raised.

        Args:
            message: Human-readable summary of what is wrong.
            health_report: The report that triggered the alert.
        """
        alert_payload = {
            "text": f"🚨 HF Space Alert: {message}",
            "timestamp": datetime.now().isoformat(),
            "details": health_report,
        }

        # Log the alert
        logger.error(f"ALERT: {message}")
        logger.error(f"Health Report: {json.dumps(health_report, indent=2)}")

        # Send to webhook if configured
        if self.webhook_url:
            try:
                response = requests.post(self.webhook_url, json=alert_payload, timeout=10)
                # A 4xx/5xx answer means the alert was not accepted; surface it
                # through the except branch instead of logging success.
                response.raise_for_status()
                logger.info("Alert sent to webhook")
            except Exception as e:
                logger.error(f"Failed to send webhook alert: {e}")

    @staticmethod
    def _format_percent(value: Any) -> str:
        """Render a percentage with one decimal place, or ``N/A`` if not numeric."""
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:.1f}%"
        return "N/A"

    def _format_health_status(self, health_report: Dict[str, Any]) -> str:
        """Build the one-line status summary written by ``log_health_status``."""
        system = health_report.get("system", {})
        app = health_report.get("application", {})
        return (
            "Health Status: "
            f"Memory={self._format_percent(system.get('memory_percent'))}, "
            f"Disk={self._format_percent(system.get('disk_percent'))}, "
            f"CPU={self._format_percent(system.get('cpu_percent'))}, "
            f"App={app.get('health_endpoint', False)}, "
            f"Citations={app.get('citation_fix_working', False)}"
        )

    def log_health_status(self, health_report: Dict[str, Any]):
        """Log current health status.

        Metrics that are absent (for example after a failed system check) are
        shown as ``N/A`` rather than being pushed through a float format spec,
        which would raise ``ValueError``.
        """
        logger.info(self._format_health_status(health_report))

    def _collect_alert_reasons(self, health_report: Dict[str, Any]) -> list:
        """Return the human-readable reasons why a report is unhealthy."""
        system = health_report.get("system", {})
        app = health_report.get("application", {})

        alert_reasons = []
        if "error" in system:
            # Without this, a failed system check yields an unhealthy report
            # with no explanation at all.
            alert_reasons.append(f"System health check failed: {system['error']}")
        if system.get("memory_percent", 0) >= self.memory_threshold:
            alert_reasons.append(f"High memory usage: {system['memory_percent']:.1f}%")
        if system.get("disk_percent", 0) >= self.disk_threshold:
            alert_reasons.append(f"High disk usage: {system['disk_percent']:.1f}%")
        if not app.get("health_endpoint", True):
            alert_reasons.append("Health endpoint failing")
        if not app.get("citation_fix_working", True):
            alert_reasons.append("Citation fix not working")
        return alert_reasons

    def _run_check_cycle(self) -> Dict[str, Any]:
        """Run one cycle: build the report, log it, alert if needed, save it."""
        # Generate health report
        health_report = self.generate_health_report()

        # Log status
        self.log_health_status(health_report)

        # Check for alerts
        if not health_report["overall_healthy"]:
            alert_reasons = self._collect_alert_reasons(health_report)
            alert_message = "; ".join(alert_reasons) or "Health check failed (no specific reason reported)"
            self.send_alert(alert_message, health_report)

        # Save health report to file
        with open("logs/latest_health.json", "w", encoding="utf-8") as f:
            json.dump(health_report, f, indent=2)

        return health_report

    def run_monitoring_loop(self):
        """Main monitoring loop.

        Runs check cycles forever, sleeping ``check_interval`` seconds between
        them. An exception inside a cycle is logged and the loop continues.
        """
        logger.info("🔍 Starting health monitoring loop...")
        while True:
            try:
                self._run_check_cycle()
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}")

            # Wait for next check
            time.sleep(self.check_interval)
def main():
    """Entry point: build the monitor and run its loop.

    A KeyboardInterrupt is treated as a normal shutdown and only logged. Any
    other exception escaping the loop is logged and then re-raised.
    """
    # The monitor is built outside the try block, so a failure while
    # constructing it propagates without the "crashed" log line.
    run_loop = HFSpaceHealthMonitor().run_monitoring_loop
    try:
        run_loop()
    except KeyboardInterrupt:
        logger.info("Health monitoring stopped by user")
    except Exception as exc:
        logger.error(f"Health monitoring crashed: {exc}")
        raise


if __name__ == "__main__":
    main()