"""
Enterprise Monitoring Service for Medical AI Platform

Comprehensive monitoring, metrics tracking, and alerting system.

Features:
- Real-time performance monitoring
- Error rate tracking with automated alerts
- Latency analysis across pipeline stages
- Resource utilization monitoring
- Model performance tracking
- System health indicators

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
# Standard-library imports, grouped and alphabetized (PEP 8).
import asyncio
import hashlib
import json
import logging
import pickle
import time
from collections import defaultdict, deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

# Module-level logger following the stdlib convention.
logger = logging.getLogger(__name__)
class SystemStatus(Enum):
    """System operational status levels."""
    OPERATIONAL = "operational"
    DEGRADED = "degraded"
    CRITICAL = "critical"
    MAINTENANCE = "maintenance"
class AlertLevel(Enum):
    """Alert severity levels."""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class PerformanceMetric:
    """A single performance metric sample.

    Bug fix: the ``@dataclass`` decorator was missing, so the class had no
    generated ``__init__`` (it is constructed with keyword arguments) and
    ``asdict(self)`` would raise TypeError on a non-dataclass instance.
    """
    metric_name: str
    value: float
    unit: str
    timestamp: str  # ISO-8601 string (datetime.isoformat())
    tags: Dict[str, str]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the sample to a plain dict."""
        return asdict(self)
@dataclass
class Alert:
    """Alert data structure.

    Bug fix: the ``@dataclass`` decorator was missing, so the class had no
    generated ``__init__`` even though it is constructed with keyword
    arguments elsewhere in this module, and the ``resolved``/``resolved_at``
    defaults were class attributes rather than instance fields.
    """
    alert_id: str
    level: AlertLevel
    message: str
    category: str
    timestamp: str
    details: Dict[str, Any]
    resolved: bool = False           # Flipped in place when resolved
    resolved_at: Optional[str] = None  # ISO timestamp of resolution

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (enum flattened to its value)."""
        return {
            "alert_id": self.alert_id,
            "level": self.level.value,
            "message": self.message,
            "category": self.category,
            "timestamp": self.timestamp,
            "details": self.details,
            "resolved": self.resolved,
            "resolved_at": self.resolved_at
        }
| class MetricsCollector: | |
| """ | |
| Collects and aggregates performance metrics | |
| Provides time-series data for monitoring and analysis | |
| """ | |
| def __init__(self, retention_hours: int = 24): | |
| self.retention_hours = retention_hours | |
| self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=10000)) | |
| self.counters: Dict[str, int] = defaultdict(int) | |
| self.gauges: Dict[str, float] = defaultdict(float) | |
| logger.info(f"Metrics Collector initialized (retention: {retention_hours}h)") | |
| def record_metric( | |
| self, | |
| metric_name: str, | |
| value: float, | |
| unit: str = "count", | |
| tags: Optional[Dict[str, str]] = None | |
| ): | |
| """Record a performance metric""" | |
| metric = PerformanceMetric( | |
| metric_name=metric_name, | |
| value=value, | |
| unit=unit, | |
| timestamp=datetime.utcnow().isoformat(), | |
| tags=tags or {} | |
| ) | |
| self.metrics[metric_name].append(metric) | |
| self._cleanup_old_metrics() | |
| def increment_counter(self, counter_name: str, value: int = 1): | |
| """Increment a counter metric""" | |
| self.counters[counter_name] += value | |
| def set_gauge(self, gauge_name: str, value: float): | |
| """Set a gauge metric (current value)""" | |
| self.gauges[gauge_name] = value | |
| def get_metrics( | |
| self, | |
| metric_name: str, | |
| start_time: Optional[datetime] = None, | |
| end_time: Optional[datetime] = None | |
| ) -> List[PerformanceMetric]: | |
| """Retrieve metrics within time range""" | |
| metrics = list(self.metrics.get(metric_name, [])) | |
| if start_time or end_time: | |
| filtered = [] | |
| for metric in metrics: | |
| metric_time = datetime.fromisoformat(metric.timestamp) | |
| if start_time and metric_time < start_time: | |
| continue | |
| if end_time and metric_time > end_time: | |
| continue | |
| filtered.append(metric) | |
| return filtered | |
| return metrics | |
| def get_statistics( | |
| self, | |
| metric_name: str, | |
| window_minutes: int = 60 | |
| ) -> Dict[str, float]: | |
| """Calculate statistics for a metric over time window""" | |
| cutoff = datetime.utcnow() - timedelta(minutes=window_minutes) | |
| metrics = [ | |
| m for m in self.metrics.get(metric_name, []) | |
| if datetime.fromisoformat(m.timestamp) > cutoff | |
| ] | |
| if not metrics: | |
| return { | |
| "count": 0, | |
| "mean": 0.0, | |
| "min": 0.0, | |
| "max": 0.0, | |
| "p50": 0.0, | |
| "p95": 0.0, | |
| "p99": 0.0 | |
| } | |
| values = sorted([m.value for m in metrics]) | |
| count = len(values) | |
| return { | |
| "count": count, | |
| "mean": sum(values) / count, | |
| "min": values[0], | |
| "max": values[-1], | |
| "p50": values[int(count * 0.50)], | |
| "p95": values[int(count * 0.95)] if count > 1 else values[0], | |
| "p99": values[int(count * 0.99)] if count > 1 else values[0] | |
| } | |
| def _cleanup_old_metrics(self): | |
| """Remove metrics older than retention period""" | |
| cutoff = datetime.utcnow() - timedelta(hours=self.retention_hours) | |
| for metric_name in list(self.metrics.keys()): | |
| metrics = self.metrics[metric_name] | |
| # Remove old metrics from front of deque | |
| while metrics and datetime.fromisoformat(metrics[0].timestamp) < cutoff: | |
| metrics.popleft() | |
| def get_counter(self, counter_name: str, default: int = 0) -> int: | |
| """Get value of a specific counter""" | |
| return self.counters.get(counter_name, default) | |
| def get_all_counters(self) -> Dict[str, int]: | |
| """Get all counter values""" | |
| return dict(self.counters) | |
| def get_all_gauges(self) -> Dict[str, float]: | |
| """Get all gauge values""" | |
| return dict(self.gauges) | |
class ErrorMonitor:
    """Monitors error rates over a sliding time window.

    Tracks error and success events per pipeline stage; the error rate is
    errors / (errors + successes) within ``window_minutes``.
    """

    def __init__(
        self,
        error_threshold: float = 0.05,  # 5% error rate
        window_minutes: int = 15
    ):
        """
        Args:
            error_threshold: Fraction (0..1) above which the threshold is
                considered exceeded.
            window_minutes: Sliding window used for rate computations.
        """
        self.error_threshold = error_threshold
        self.window_minutes = window_minutes
        # Bounded histories of raw events (dicts with timestamps).
        self.errors: deque = deque(maxlen=10000)
        self.success_count: deque = deque(maxlen=10000)
        # Lifetime tallies keyed "stage:error_type".
        self.error_categories: Dict[str, int] = defaultdict(int)
        logger.info(f"Error Monitor initialized (threshold: {error_threshold*100}%, window: {window_minutes}m)")

    def record_error(
        self,
        error_type: str,
        error_message: str,
        stage: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Record an error occurrence for ``stage``."""
        error_record = {
            "error_type": error_type,
            "error_message": error_message,
            "stage": stage,
            "timestamp": datetime.utcnow().isoformat(),
            "details": details or {}
        }
        self.errors.append(error_record)
        self.error_categories[f"{stage}:{error_type}"] += 1
        logger.warning(f"Error recorded: {stage} - {error_type}: {error_message}")

    def record_success(self, stage: str):
        """Record a successful operation for ``stage``."""
        self.success_count.append({
            "stage": stage,
            "timestamp": datetime.utcnow().isoformat()
        })

    def get_error_rate(self, stage: Optional[str] = None) -> float:
        """Error rate within the window; 0.0 when there is no traffic."""
        cutoff = datetime.utcnow() - timedelta(minutes=self.window_minutes)
        # Keep only events inside the window.
        recent_errors = [
            e for e in self.errors
            if datetime.fromisoformat(e["timestamp"]) > cutoff
        ]
        recent_successes = [
            s for s in self.success_count
            if datetime.fromisoformat(s["timestamp"]) > cutoff
        ]
        # Optionally narrow to a single stage.
        if stage:
            recent_errors = [e for e in recent_errors if e["stage"] == stage]
            recent_successes = [s for s in recent_successes if s["stage"] == stage]
        total = len(recent_errors) + len(recent_successes)
        if total == 0:
            return 0.0
        return len(recent_errors) / total

    def check_threshold_exceeded(self, stage: Optional[str] = None) -> bool:
        """True if the windowed error rate is strictly above the threshold."""
        return self.get_error_rate(stage) > self.error_threshold

    def get_error_summary(self) -> Dict[str, Any]:
        """Aggregate error statistics for the current window."""
        cutoff = datetime.utcnow() - timedelta(minutes=self.window_minutes)
        recent_errors = [
            e for e in self.errors
            if datetime.fromisoformat(e["timestamp"]) > cutoff
        ]
        category_counts = defaultdict(int)
        stage_counts = defaultdict(int)
        for error in recent_errors:
            category_counts[error["error_type"]] += 1
            stage_counts[error["stage"]] += 1
        return {
            "total_errors": len(recent_errors),
            "error_rate": self.get_error_rate(),
            "threshold_exceeded": self.check_threshold_exceeded(),
            "by_category": dict(category_counts),
            "by_stage": dict(stage_counts),
            "window_minutes": self.window_minutes
        }
class LatencyTracker:
    """Tracks per-stage latency for pipeline traces.

    An active trace accumulates ``<stage>_start`` / ``<stage>_duration``
    entries; completing a trace moves it to a bounded history used for
    statistics.
    """

    def __init__(self):
        self.active_traces: Dict[str, Dict[str, float]] = {}
        self.completed_traces: deque = deque(maxlen=1000)
        logger.info("Latency Tracker initialized")

    def start_trace(self, trace_id: str, stage: str):
        """Start timing ``stage`` within the given trace."""
        if trace_id not in self.active_traces:
            self.active_traces[trace_id] = {}
        self.active_traces[trace_id][f"{stage}_start"] = time.time()

    def end_trace(self, trace_id: str, stage: str) -> float:
        """Stop timing ``stage`` and return its duration in seconds.

        Returns 0.0 (with a warning) if the trace or stage was never
        started.
        """
        if trace_id not in self.active_traces:
            logger.warning(f"Trace {trace_id} not found")
            return 0.0
        start_key = f"{stage}_start"
        if start_key not in self.active_traces[trace_id]:
            logger.warning(f"Start time for {stage} not found in trace {trace_id}")
            return 0.0
        duration = time.time() - self.active_traces[trace_id][start_key]
        self.active_traces[trace_id][f"{stage}_duration"] = duration
        return duration

    def complete_trace(self, trace_id: str) -> Dict[str, float]:
        """Finish a trace, archive it, and return ``{stage: duration}``.

        Unknown trace ids yield an empty dict.
        """
        if trace_id not in self.active_traces:
            return {}
        trace_data = self.active_traces.pop(trace_id)
        # Keep only the finished stages ("<stage>_duration" keys).
        durations = {
            key.replace("_duration", ""): value
            for key, value in trace_data.items()
            if key.endswith("_duration")
        }
        total_duration = sum(durations.values())
        completed_trace = {
            "trace_id": trace_id,
            "timestamp": datetime.utcnow().isoformat(),
            "total_duration": total_duration,
            "stages": durations
        }
        self.completed_traces.append(completed_trace)
        return durations

    def get_stage_statistics(
        self,
        stage: str,
        window_minutes: int = 60
    ) -> Dict[str, float]:
        """Latency stats (count/mean/min/max/percentiles) for one stage."""
        cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
        durations = []
        for trace in self.completed_traces:
            if datetime.fromisoformat(trace["timestamp"]) < cutoff:
                continue
            if stage in trace["stages"]:
                durations.append(trace["stages"][stage])
        if not durations:
            return {
                "count": 0,
                "mean": 0.0,
                "min": 0.0,
                "max": 0.0,
                "p50": 0.0,
                "p95": 0.0,
                "p99": 0.0
            }
        durations_sorted = sorted(durations)
        count = len(durations_sorted)
        return {
            "count": count,
            "mean": sum(durations_sorted) / count,
            "min": durations_sorted[0],
            "max": durations_sorted[-1],
            # Nearest-rank percentiles; index always < count.
            "p50": durations_sorted[int(count * 0.50)],
            "p95": durations_sorted[int(count * 0.95)] if count > 1 else durations_sorted[0],
            "p99": durations_sorted[int(count * 0.99)] if count > 1 else durations_sorted[0]
        }
@dataclass
class CacheEntry:
    """Cache entry with bookkeeping metadata.

    Bug fix: the ``@dataclass`` decorator was missing, so the class had no
    generated ``__init__`` even though CacheService constructs it with
    keyword arguments.
    """
    key: str
    value: Any
    created_at: float    # epoch seconds at creation
    accessed_at: float   # epoch seconds of last read
    access_count: int
    size_bytes: int
    ttl: Optional[int] = None  # Time to live in seconds; None = no expiry

    def is_expired(self) -> bool:
        """True once the entry's age exceeds its TTL (never, if TTL is None)."""
        if self.ttl is None:
            return False
        return (time.time() - self.created_at) > self.ttl

    def to_dict(self) -> Dict[str, Any]:
        """Serialize metadata (not the cached value) for inspection."""
        return {
            "key": self.key,
            "created_at": datetime.fromtimestamp(self.created_at).isoformat(),
            "accessed_at": datetime.fromtimestamp(self.accessed_at).isoformat(),
            "access_count": self.access_count,
            "size_bytes": self.size_bytes,
            "ttl": self.ttl,
            "expired": self.is_expired()
        }
class CacheService:
    """SHA256-based caching service for deduplication and performance.

    Features:
    - SHA256 fingerprinting for input deduplication
    - LRU eviction policy
    - TTL support for automatic expiration
    - Cache hit/miss tracking
    - Memory usage monitoring
    - Performance metrics
    """

    def __init__(
        self,
        max_entries: int = 10000,
        max_memory_mb: int = 512,
        default_ttl: Optional[int] = 3600  # 1 hour default
    ):
        """
        Args:
            max_entries: Maximum number of cached entries before LRU eviction.
            max_memory_mb: Approximate memory budget (estimated sizes).
            default_ttl: TTL applied when ``set`` is called without one.
        """
        self.max_entries = max_entries
        self.max_memory_mb = max_memory_mb
        self.default_ttl = default_ttl
        self.cache: Dict[str, CacheEntry] = {}
        self.access_order: deque = deque()  # For LRU tracking (oldest at left)
        # Metrics
        self.hits = 0
        self.misses = 0
        self.evictions = 0
        self.total_retrieval_time = 0.0
        self.retrieval_count = 0
        logger.info(f"Cache Service initialized (max_entries: {max_entries}, max_memory: {max_memory_mb}MB)")

    def _compute_fingerprint(self, data: Any) -> str:
        """Compute a SHA256 fingerprint for any serializable data.

        dicts/lists are hashed via sorted-key JSON so logically-equal
        inputs map to the same fingerprint.
        """
        if isinstance(data, bytes):
            data_bytes = data
        elif isinstance(data, str):
            data_bytes = data.encode('utf-8')
        elif isinstance(data, (dict, list)):
            json_str = json.dumps(data, sort_keys=True)
            data_bytes = json_str.encode('utf-8')
        else:
            # Fallback for other types; pickle is fine here because we only
            # serialize (never load untrusted bytes).
            data_bytes = pickle.dumps(data)
        return hashlib.sha256(data_bytes).hexdigest()

    def _estimate_size(self, obj: Any) -> int:
        """Estimate the size of an object in bytes (pickle-based, best effort)."""
        try:
            return len(pickle.dumps(obj))
        except Exception:
            # Fallback estimation for unpicklable objects.
            if isinstance(obj, (str, bytes)):
                return len(obj)
            elif isinstance(obj, dict):
                return sum(len(str(k)) + len(str(v)) for k, v in obj.items())
            elif isinstance(obj, list):
                return sum(len(str(item)) for item in obj)
            else:
                return 1024  # Default 1KB estimate

    def _get_memory_usage_mb(self) -> float:
        """Approximate current memory usage of cached values in MB."""
        total_bytes = sum(entry.size_bytes for entry in self.cache.values())
        return total_bytes / (1024 * 1024)

    def _evict_lru(self):
        """Evict the least recently used entry still present in the cache."""
        # Skip stale bookkeeping keys until one that is still cached is found.
        while self.access_order:
            lru_key = self.access_order.popleft()
            if lru_key in self.cache:
                del self.cache[lru_key]
                self.evictions += 1
                logger.debug(f"Evicted LRU cache entry: {lru_key[:16]}...")
                break

    def _cleanup_expired(self):
        """Remove all expired entries."""
        expired_keys = [
            key for key, entry in self.cache.items()
            if entry.is_expired()
        ]
        for key in expired_keys:
            del self.cache[key]
            logger.debug(f"Removed expired cache entry: {key[:16]}...")

    def _ensure_capacity(self, new_entry_size: int):
        """Evict entries until both the count and memory budgets allow a new entry."""
        while len(self.cache) >= self.max_entries:
            self._evict_lru()
        while self._get_memory_usage_mb() + (new_entry_size / 1024 / 1024) > self.max_memory_mb:
            if len(self.cache) == 0:
                break  # Entry is larger than the whole budget; store anyway.
            self._evict_lru()

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the cache by key.

        Args:
            key: Cache key (typically a SHA256 fingerprint).

        Returns:
            Cached value if found and not expired, None otherwise.
        """
        start_time = time.time()
        # Periodic cleanup of expired entries (every 100th lookup).
        if self.retrieval_count % 100 == 0:
            self._cleanup_expired()
        if key not in self.cache:
            self.misses += 1
            self.total_retrieval_time += time.time() - start_time
            self.retrieval_count += 1
            return None
        entry = self.cache[key]
        if entry.is_expired():
            del self.cache[key]
            # Bug fix: also drop the stale LRU bookkeeping entry, which
            # previously lingered in access_order after expiry.
            if key in self.access_order:
                self.access_order.remove(key)
            self.misses += 1
            self.total_retrieval_time += time.time() - start_time
            self.retrieval_count += 1
            return None
        # Update access metadata and LRU order (move key to the MRU end).
        entry.accessed_at = time.time()
        entry.access_count += 1
        if key in self.access_order:
            self.access_order.remove(key)
        self.access_order.append(key)
        self.hits += 1
        self.total_retrieval_time += time.time() - start_time
        self.retrieval_count += 1
        logger.debug(f"Cache hit: {key[:16]}... (access_count: {entry.access_count})")
        return entry.value

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store a value in the cache.

        Args:
            key: Cache key (typically a SHA256 fingerprint).
            value: Value to cache.
            ttl: Time to live in seconds (None for default, 0 for no
                expiration).
        """
        size_bytes = self._estimate_size(value)
        if ttl is None:
            ttl = self.default_ttl
        elif ttl == 0:
            ttl = None  # No expiration
        self._ensure_capacity(size_bytes)
        current_time = time.time()
        entry = CacheEntry(
            key=key,
            value=value,
            created_at=current_time,
            accessed_at=current_time,
            access_count=0,
            size_bytes=size_bytes,
            ttl=ttl
        )
        self.cache[key] = entry
        # Bug fix: when overwriting an existing key, remove its old LRU
        # position first; duplicate access_order entries could make
        # _evict_lru evict a recently-used key prematurely.
        if key in self.access_order:
            self.access_order.remove(key)
        self.access_order.append(key)
        logger.debug(f"Cached entry: {key[:16]}... (size: {size_bytes} bytes, ttl: {ttl}s)")

    def get_or_compute(
        self,
        data: Any,
        compute_fn: callable,
        ttl: Optional[int] = None
    ) -> Tuple[Any, bool]:
        """Get a cached value or compute and cache it.

        Args:
            data: Input data to fingerprint.
            compute_fn: Zero-argument function computing the value on a miss.
            ttl: Time to live for the cached result.

        Returns:
            Tuple of (result, was_cached).

        Note: a cached value of None is indistinguishable from a miss and
        will be recomputed.
        """
        fingerprint = self._compute_fingerprint(data)
        cached_value = self.get(fingerprint)
        if cached_value is not None:
            return cached_value, True
        result = compute_fn()
        self.set(fingerprint, result, ttl)
        return result, False

    def invalidate(self, key: str) -> bool:
        """Invalidate (remove) a cache entry.

        Returns:
            True if the entry was removed, False if not found.
        """
        if key in self.cache:
            del self.cache[key]
            if key in self.access_order:
                self.access_order.remove(key)
            logger.debug(f"Invalidated cache entry: {key[:16]}...")
            return True
        return False

    def invalidate_by_fingerprint(self, data: Any) -> bool:
        """Invalidate the entry keyed by the fingerprint of ``data``.

        Returns:
            True if an entry was removed, False if not found.
        """
        fingerprint = self._compute_fingerprint(data)
        return self.invalidate(fingerprint)

    def clear(self):
        """Clear all cache entries (metrics are preserved)."""
        self.cache.clear()
        self.access_order.clear()
        logger.info("Cache cleared")

    def get_statistics(self) -> Dict[str, Any]:
        """Get cache performance statistics (hit rate, memory, timings)."""
        total_requests = self.hits + self.misses
        hit_rate = self.hits / total_requests if total_requests > 0 else 0.0
        avg_retrieval_time = (
            self.total_retrieval_time / self.retrieval_count
            if self.retrieval_count > 0 else 0.0
        )
        return {
            "total_entries": len(self.cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
            "evictions": self.evictions,
            "memory_usage_mb": self._get_memory_usage_mb(),
            "max_memory_mb": self.max_memory_mb,
            "avg_retrieval_time_ms": avg_retrieval_time * 1000,
            "cache_efficiency": hit_rate * 100  # Percentage
        }

    def get_entry_info(self, key: str) -> Optional[Dict[str, Any]]:
        """Get metadata about a specific cache entry, or None if absent."""
        if key not in self.cache:
            return None
        return self.cache[key].to_dict()

    def list_entries(self, limit: int = 100) -> List[Dict[str, Any]]:
        """List entry metadata, most recently accessed first."""
        entries = sorted(
            self.cache.values(),
            key=lambda e: e.accessed_at,
            reverse=True
        )[:limit]
        return [entry.to_dict() for entry in entries]
class AlertManager:
    """Manages alerts and notifications.

    Handles the alert lifecycle (create/resolve) and dispatches each new
    alert to registered handler callbacks.
    """

    def __init__(self):
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: deque = deque(maxlen=1000)
        self.alert_handlers: List[callable] = []
        logger.info("Alert Manager initialized")

    def create_alert(
        self,
        level: AlertLevel,
        message: str,
        category: str,
        details: Optional[Dict[str, Any]] = None
    ) -> Alert:
        """Create, register, and dispatch a new alert.

        Returns:
            The created Alert (also stored in active_alerts/history).
        """
        # Short, time-salted id so repeated alerts get distinct ids.
        alert_id = hashlib.sha256(
            f"{category}:{message}:{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:16]
        alert = Alert(
            alert_id=alert_id,
            level=level,
            message=message,
            category=category,
            timestamp=datetime.utcnow().isoformat(),
            details=details or {}
        )
        self.active_alerts[alert_id] = alert
        self.alert_history.append(alert)
        # Bug fix: asyncio.create_task() raises RuntimeError when no event
        # loop is running, so alerts created from synchronous code crashed.
        # Schedule on the running loop if there is one, otherwise run the
        # handlers synchronously.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self._trigger_handlers(alert))
        else:
            asyncio.create_task(self._trigger_handlers(alert))
        logger.warning(f"Alert created: [{level.value}] {category} - {message}")
        return alert

    def resolve_alert(self, alert_id: str):
        """Resolve an active alert (no-op if the id is unknown)."""
        if alert_id in self.active_alerts:
            alert = self.active_alerts.pop(alert_id)
            alert.resolved = True
            alert.resolved_at = datetime.utcnow().isoformat()
            logger.info(f"Alert resolved: {alert_id}")

    def add_handler(self, handler: callable):
        """Register an alert handler (sync or async callable taking an Alert)."""
        self.alert_handlers.append(handler)

    async def _trigger_handlers(self, alert: Alert):
        """Invoke all registered handlers; failures are logged, not raised."""
        for handler in self.alert_handlers:
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(alert)
                else:
                    handler(alert)
            except Exception as e:
                logger.error(f"Alert handler failed: {str(e)}")

    def get_active_alerts(
        self,
        level: Optional[AlertLevel] = None,
        category: Optional[str] = None
    ) -> List[Alert]:
        """Get active alerts, optionally filtered by level and/or category."""
        alerts = list(self.active_alerts.values())
        if level:
            alerts = [a for a in alerts if a.level == level]
        if category:
            alerts = [a for a in alerts if a.category == category]
        return alerts

    def get_alert_summary(self) -> Dict[str, Any]:
        """Summarize active alerts by level and category."""
        active = list(self.active_alerts.values())
        by_level = defaultdict(int)
        by_category = defaultdict(int)
        for alert in active:
            by_level[alert.level.value] += 1
            by_category[alert.category] += 1
        return {
            "total_active": len(active),
            "by_level": dict(by_level),
            "by_category": dict(by_category),
            "critical_count": by_level[AlertLevel.CRITICAL.value],
            "error_count": by_level[AlertLevel.ERROR.value]
        }
class MonitoringService:
    """Central monitoring service coordinating all monitoring components.

    Provides a unified interface over metrics, error tracking, latency
    analysis, alerting, and result caching.
    """

    def __init__(
        self,
        error_threshold: float = 0.05,
        window_minutes: int = 15
    ):
        """
        Args:
            error_threshold: Error-rate fraction forwarded to ErrorMonitor.
            window_minutes: Sliding window forwarded to ErrorMonitor.
        """
        self.metrics_collector = MetricsCollector()
        self.error_monitor = ErrorMonitor(error_threshold, window_minutes)
        self.latency_tracker = LatencyTracker()
        self.alert_manager = AlertManager()
        self.cache_service = CacheService(
            max_entries=10000,
            max_memory_mb=512,
            default_ttl=3600  # 1 hour default
        )
        self.system_status = SystemStatus.OPERATIONAL
        self.start_time = datetime.utcnow()
        # Background checks intentionally not started here: they require a
        # running event loop.  Call _setup_automatic_checks() from async code.
        # self._setup_automatic_checks()
        logger.info("Monitoring Service initialized")

    def _setup_automatic_checks(self):
        """Schedule a periodic error-rate check (requires a running loop)."""
        async def check_error_rate():
            # Every minute, raise an alert if the error rate is over threshold.
            while True:
                try:
                    error_summary = self.error_monitor.get_error_summary()
                    if error_summary["threshold_exceeded"]:
                        self.alert_manager.create_alert(
                            level=AlertLevel.ERROR,
                            message=f"Error rate ({error_summary['error_rate']*100:.1f}%) exceeds threshold",
                            category="error_rate",
                            details=error_summary
                        )
                    await asyncio.sleep(60)  # Check every minute
                except Exception as e:
                    logger.error(f"Error rate check failed: {str(e)}")
                    await asyncio.sleep(60)
        asyncio.create_task(check_error_rate())

    def record_processing_stage(
        self,
        trace_id: str,
        stage: str,
        success: bool,
        duration: Optional[float] = None,
        error_details: Optional[Dict[str, Any]] = None
    ):
        """Record the completion (success or failure) of a processing stage."""
        if success:
            self.error_monitor.record_success(stage)
        else:
            error_type = error_details.get("error_type", "unknown") if error_details else "unknown"
            error_message = error_details.get("message", "No details") if error_details else "No details"
            self.error_monitor.record_error(error_type, error_message, stage, error_details)
        if duration is not None:
            self.metrics_collector.record_metric(
                f"latency_{stage}",
                duration,
                unit="seconds",
                tags={"stage": stage, "success": str(success)}
            )
        self.metrics_collector.increment_counter(f"stage_{stage}_total")
        if success:
            self.metrics_collector.increment_counter(f"stage_{stage}_success")
        else:
            self.metrics_collector.increment_counter(f"stage_{stage}_error")

    def get_system_health(self) -> Dict[str, Any]:
        """Compute and return the comprehensive system health status.

        Also updates ``self.system_status`` as a side effect.
        """
        error_summary = self.error_monitor.get_error_summary()
        alert_summary = self.alert_manager.get_alert_summary()
        # Derive status: any critical alert wins; then error pressure.
        if alert_summary["critical_count"] > 0:
            status = SystemStatus.CRITICAL
        elif error_summary["threshold_exceeded"] or alert_summary["error_count"] > 5:
            status = SystemStatus.DEGRADED
        else:
            status = SystemStatus.OPERATIONAL
        self.system_status = status
        uptime = (datetime.utcnow() - self.start_time).total_seconds()
        return {
            "status": status.value,
            "uptime_seconds": uptime,
            "timestamp": datetime.utcnow().isoformat(),
            "error_rate": error_summary["error_rate"],
            "error_threshold": self.error_monitor.error_threshold,
            "active_alerts": alert_summary["total_active"],
            "critical_alerts": alert_summary["critical_count"],
            "total_requests": self.metrics_collector.get_counter("total_requests", 0),
            "counters": self.metrics_collector.get_all_counters(),
            "gauges": self.metrics_collector.get_all_gauges()
        }

    def get_performance_dashboard(self) -> Dict[str, Any]:
        """Get performance metrics for dashboard display."""
        # Key pipeline stages shown on the dashboard.
        stages = ["pdf_processing", "classification", "model_routing", "synthesis"]
        stage_stats = {}
        for stage in stages:
            stage_stats[stage] = self.latency_tracker.get_stage_statistics(stage)
        return {
            "system_health": self.get_system_health(),
            "error_summary": self.error_monitor.get_error_summary(),
            "latency_by_stage": stage_stats,
            "active_alerts": [a.to_dict() for a in self.alert_manager.get_active_alerts()],
            "timestamp": datetime.utcnow().isoformat()
        }

    def start_monitoring(self):
        """Start monitoring services (placeholder for initialization)."""
        logger.info("Monitoring services started")
        self.system_status = SystemStatus.OPERATIONAL

    def track_request(self, endpoint: str, latency_ms: float, status_code: int):
        """Track an incoming request: latency metric, counters, status class."""
        self.metrics_collector.record_metric(
            f"request_latency_{endpoint}",
            latency_ms,
            unit="milliseconds",
            tags={"endpoint": endpoint, "status_code": str(status_code)}
        )
        self.metrics_collector.increment_counter("total_requests")
        self.metrics_collector.increment_counter(f"requests_{endpoint}")
        # Bucket by status class.
        if status_code >= 500:
            self.metrics_collector.increment_counter("server_errors")
        elif status_code >= 400:
            self.metrics_collector.increment_counter("client_errors")
        else:
            self.metrics_collector.increment_counter("successful_requests")

    def track_error(self, endpoint: str, error_type: str, error_message: str):
        """Track an error occurrence for an endpoint.

        Bug fix: the original passed keyword arguments ``message=`` and
        ``component=`` which do not exist on ErrorMonitor.record_error
        (its parameters are ``error_message`` and ``stage``), so every
        call raised TypeError.
        """
        self.error_monitor.record_error(
            error_type=error_type,
            error_message=error_message,
            stage=endpoint,
            details={"endpoint": endpoint}
        )
        self.metrics_collector.increment_counter("total_errors")
        self.metrics_collector.increment_counter(f"errors_{error_type}")

    def get_cache_statistics(self) -> Dict[str, Any]:
        """Get cache performance statistics from the real cache service."""
        return self.cache_service.get_statistics()

    def cache_result(self, data: Any, result: Any, ttl: Optional[int] = None):
        """Cache a computation result keyed by the SHA256 fingerprint of ``data``."""
        fingerprint = self.cache_service._compute_fingerprint(data)
        self.cache_service.set(fingerprint, result, ttl)
        logger.debug(f"Cached result for fingerprint: {fingerprint[:16]}...")

    def get_cached_result(self, data: Any) -> Optional[Any]:
        """Retrieve a cached result by fingerprinting ``data`` (None on miss)."""
        fingerprint = self.cache_service._compute_fingerprint(data)
        return self.cache_service.get(fingerprint)

    def get_or_compute_cached(
        self,
        data: Any,
        compute_fn: callable,
        ttl: Optional[int] = None
    ) -> Tuple[Any, bool]:
        """Get a cached result or compute and cache it.

        Returns:
            Tuple of (result, was_cached).
        """
        return self.cache_service.get_or_compute(data, compute_fn, ttl)

    def get_recent_alerts(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent active alerts as dicts (newest first)."""
        alerts = self.alert_manager.get_active_alerts()
        recent = sorted(alerts, key=lambda a: a.timestamp, reverse=True)[:limit]
        return [a.to_dict() for a in recent]
# Global monitoring service instance (lazily created singleton).
_monitoring_service = None


def get_monitoring_service() -> MonitoringService:
    """Get the process-wide singleton MonitoringService instance.

    The service is created on first access; subsequent calls return the
    same object.
    """
    global _monitoring_service
    if _monitoring_service is None:
        _monitoring_service = MonitoringService()
    return _monitoring_service