import psutil import asyncio import time import logging from collections import deque from threading import Lock from typing import Dict, List, Optional from datetime import datetime logger = logging.getLogger(__name__) try: import numpy as np except Exception: np = None class HealthMonitor: """Real-time system diagnostics with quantum-aware anomaly detection""" def __init__(self, history_size: int = 100): self.metrics = deque(maxlen=history_size) self.anomaly_history = deque(maxlen=50) self.lock = Lock() self.baseline = None self.last_check = None self.quantum_influence = 0.5 self.initialized = False async def initialize(self): """Initialize the health monitor system""" try: # Get initial status to establish baseline initial_status = await self.check_status_async() if np is not None: self.baseline = np.array([ initial_status["memory"], initial_status["cpu"], initial_status["response_time"] ]) else: self.baseline = [ initial_status["memory"], initial_status["cpu"], initial_status["response_time"] ] self.initialized = True logger.info("Health monitor initialized successfully") return True except Exception as e: logger.error(f"Health monitor initialization failed: {e}") return False def check_status(self, consciousness_state: Optional[Dict] = None) -> Dict: """Check system status with quantum consciousness integration - synchronous version""" try: # Get base metrics synchronously status = { "timestamp": datetime.now(), "memory": psutil.virtual_memory().percent, "cpu": psutil.cpu_percent(), "response_time": self._measure_latency_sync(), "quantum_coherence": consciousness_state.get("quantum_state", [0.5])[0] if consciousness_state else 0.5 } # Calculate load score with quantum influence quantum_factor = status["quantum_coherence"] load_score = ( 0.4 * status["memory"] + 0.4 * status["cpu"] + 0.2 * (status["response_time"] * 1000) # Convert to ms ) * (1 + (quantum_factor - 0.5)) # Quantum modification status["load_score"] = min(100, max(0, load_score)) # Thread-safe metrics update with self.lock: self.metrics.append(status) anomaly_score = self._detect_anomalies() status["anomaly_score"] = anomaly_score # Track anomaly if significant if anomaly_score > 0.7: self.anomaly_history.append({ "timestamp": status["timestamp"], "score": anomaly_score, "metrics": status.copy() }) self.last_check = status["timestamp"] return status except Exception as e: logger.error(f"Health check failed: {e}") return { "timestamp": datetime.now(), "status": "error", "error": str(e) } async def check_status_async(self, consciousness_state: Optional[Dict] = None) -> Dict: """Check system status with quantum consciousness integration - async version""" try: # Get base metrics asynchronously status = { "timestamp": datetime.now(), "memory": psutil.virtual_memory().percent, "cpu": psutil.cpu_percent(), "response_time": await self._measure_latency(), "quantum_coherence": consciousness_state.get("quantum_state", [0.5])[0] if consciousness_state else 0.5 } # Calculate load score with quantum influence quantum_factor = status["quantum_coherence"] load_score = ( 0.4 * status["memory"] + 0.4 * status["cpu"] + 0.2 * (status["response_time"] * 1000) # Convert to ms ) * (1 + (quantum_factor - 0.5)) # Quantum modification status["load_score"] = min(100, max(0, load_score)) # Thread-safe metrics update with self.lock: self.metrics.append(status) anomaly_score = self._detect_anomalies() status["anomaly_score"] = anomaly_score # Track anomaly if significant if anomaly_score > 0.7: self.anomaly_history.append({ "timestamp": status["timestamp"], "score": anomaly_score, "metrics": status.copy() }) self.last_check = status["timestamp"] return status except Exception as e: logger.error(f"Health check failed: {e}") return { "timestamp": datetime.now(), "status": "error", "error": str(e) } def _measure_latency_sync(self) -> float: """Measure system response latency - synchronous version""" try: start = time.monotonic() time.sleep(0.1) # Simulated work return time.monotonic() - start except Exception as e: logger.warning(f"Latency measurement failed: {e}") return 0.1 async def _measure_latency(self) -> float: """Measure system response latency - async version""" try: start = time.monotonic() await asyncio.sleep(0.1) # Simulated work return time.monotonic() - start except Exception as e: logger.warning(f"Latency measurement failed: {e}") return 0.1 def _detect_anomalies(self) -> float: """Detect system anomalies using statistical analysis""" try: if len(self.metrics) < 10: return 0.0 # Extract recent metrics if np is not None: recent_data = np.array([ [m["memory"], m["cpu"], m["response_time"]] for m in list(self.metrics)[-10:] ]) else: recent_data = [ [m["memory"], m["cpu"], m["response_time"]] for m in list(self.metrics)[-10:] ] if self.baseline is None: if np is not None: self.baseline = np.mean(recent_data, axis=0) else: # Compute simple mean per column cols = list(zip(*recent_data)) self.baseline = [sum(c)/len(c) for c in cols] return 0.0 if np is not None: deviations = np.abs(recent_data - self.baseline) max_deviation = float(np.max(deviations)) # Update baseline with moving average self.baseline = 0.9 * self.baseline + 0.1 * np.mean(recent_data, axis=0) else: deviations = [[abs(a - b) for a,b in zip(row, self.baseline)] for row in recent_data] max_deviation = float(max(max(row) for row in deviations)) # Update baseline (python moving average) cols = list(zip(*recent_data)) means = [sum(c)/len(c) for c in cols] self.baseline = [0.9*b + 0.1*m for b,m in zip(self.baseline, means)] # Normalize anomaly score to [0,1] return min(1.0, max_deviation / 100.0) except Exception as e: logger.error(f"Anomaly detection failed: {e}") return 0.0 def get_health_summary(self) -> Dict: """Get system health summary""" try: if not self.metrics: return {"status": "initializing"} recent_metrics = list(self.metrics)[-10:] if np is not None: avg_memory = float(np.mean([m["memory"] for m in recent_metrics])) avg_cpu = float(np.mean([m["cpu"] for m in recent_metrics])) avg_latency = float(np.mean([m["response_time"] for m in recent_metrics])) else: avg_memory = float(sum(m["memory"] for m in recent_metrics)/len(recent_metrics)) avg_cpu = float(sum(m["cpu"] for m in recent_metrics)/len(recent_metrics)) avg_latency = float(sum(m["response_time"] for m in recent_metrics)/len(recent_metrics)) return { "status": "healthy" if avg_memory < 80 and avg_cpu < 80 else "stressed", "avg_memory": avg_memory, "avg_cpu": avg_cpu, "avg_latency": avg_latency, "recent_anomalies": len([a for a in self.anomaly_history if (datetime.now() - a["timestamp"]).seconds < 300]), "last_check": self.last_check } except Exception as e: logger.error(f"Health summary generation failed: {e}") return {"status": "error", "error": str(e)}