"""
Enterprise Monitoring Service for Medical AI Platform

Comprehensive monitoring, metrics tracking, and alerting system.

Features:
- Real-time performance monitoring
- Error rate tracking with automated alerts
- Latency analysis across pipeline stages
- Resource utilization monitoring
- Model performance tracking
- System health indicators

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import asyncio
import hashlib
import inspect
import json
import logging
import pickle
import time
from collections import defaultdict, deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple

logger = logging.getLogger(__name__)

# Pipeline stages shown on the performance dashboard when none are specified.
DEFAULT_DASHBOARD_STAGES: Tuple[str, ...] = (
    "pdf_processing",
    "classification",
    "model_routing",
    "synthesis",
)

# Private marker distinguishing "no cache entry" from a cached ``None`` value.
_CACHE_MISS = object()

_DURATION_SUFFIX = "_duration"


def _utc_now() -> datetime:
    """Return the current UTC time as a *naive* datetime.

    Produces the same value ``datetime.utcnow()`` did (naive, UTC wall clock)
    without calling the deprecated function, so stored ISO timestamps and
    comparisons against caller-supplied naive datetimes keep working.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _utc_now_iso() -> str:
    """Return the current naive-UTC time as an ISO-8601 string."""
    return _utc_now().isoformat()


def _summarize(values: Iterable[float]) -> Dict[str, float]:
    """Compute count/mean/min/max and p50/p95/p99 for a collection of numbers.

    Percentiles are taken at index ``int(count * fraction)`` of the sorted
    values. An empty input yields a dictionary of zeros.
    """
    ordered = sorted(values)
    count = len(ordered)
    if count == 0:
        return {
            "count": 0,
            "mean": 0.0,
            "min": 0.0,
            "max": 0.0,
            "p50": 0.0,
            "p95": 0.0,
            "p99": 0.0,
        }

    def percentile(fraction: float) -> float:
        # Clamp defensively so the index can never run past the last element.
        return ordered[min(count - 1, int(count * fraction))]

    return {
        "count": count,
        "mean": sum(ordered) / count,
        "min": ordered[0],
        "max": ordered[-1],
        "p50": percentile(0.50),
        "p95": percentile(0.95),
        "p99": percentile(0.99),
    }


class SystemStatus(Enum):
    """System operational status levels"""

    OPERATIONAL = "operational"
    DEGRADED = "degraded"
    CRITICAL = "critical"
    MAINTENANCE = "maintenance"


class AlertLevel(Enum):
    """Alert severity levels"""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class PerformanceMetric:
    """Performance metric data structure"""

    metric_name: str
    value: float
    unit: str
    timestamp: str  # ISO-8601 string, naive UTC
    tags: Dict[str, str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the metric as a plain dictionary."""
        return asdict(self)


@dataclass
class Alert:
    """Alert data structure"""

    alert_id: str
    level: AlertLevel
    message: str
    category: str
    timestamp: str  # ISO-8601 string, naive UTC
    details: Dict[str, Any]
    resolved: bool = False
    resolved_at: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the alert as a dictionary with ``level`` as its string value."""
        return {
            "alert_id": self.alert_id,
            "level": self.level.value,
            "message": self.message,
            "category": self.category,
            "timestamp": self.timestamp,
            "details": self.details,
            "resolved": self.resolved,
            "resolved_at": self.resolved_at,
        }


class MetricsCollector:
    """
    Collects and aggregates performance metrics

    Provides time-series data for monitoring and analysis
    """

    def __init__(self, retention_hours: int = 24):
        self.retention_hours = retention_hours
        # Each series is capped at 10000 points in addition to time retention.
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=10000))
        self.counters: Dict[str, int] = defaultdict(int)
        self.gauges: Dict[str, float] = defaultdict(float)

        logger.info(f"Metrics Collector initialized (retention: {retention_hours}h)")

    def record_metric(
        self,
        metric_name: str,
        value: float,
        unit: str = "count",
        tags: Optional[Dict[str, str]] = None,
    ):
        """Record a performance metric and prune points past retention."""
        metric = PerformanceMetric(
            metric_name=metric_name,
            value=value,
            unit=unit,
            timestamp=_utc_now_iso(),
            tags=tags or {},
        )
        self.metrics[metric_name].append(metric)
        self._cleanup_old_metrics()

    def increment_counter(self, counter_name: str, value: int = 1):
        """Increment a counter metric"""
        self.counters[counter_name] += value

    def set_gauge(self, gauge_name: str, value: float):
        """Set a gauge metric (current value)"""
        self.gauges[gauge_name] = value

    def get_metrics(
        self,
        metric_name: str,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
    ) -> List[PerformanceMetric]:
        """Retrieve metrics within time range

        Args:
            metric_name: Name of the series to read
            start_time: Inclusive lower bound (naive UTC), or None for no bound
            end_time: Inclusive upper bound (naive UTC), or None for no bound

        Returns:
            Matching metrics in recording order (empty for an unknown series)
        """
        # ``.get`` avoids creating an empty series for unknown names.
        metrics = list(self.metrics.get(metric_name, []))
        if start_time is None and end_time is None:
            return metrics

        selected = []
        for metric in metrics:
            recorded_at = datetime.fromisoformat(metric.timestamp)
            if start_time is not None and recorded_at < start_time:
                continue
            if end_time is not None and recorded_at > end_time:
                continue
            selected.append(metric)
        return selected

    def get_statistics(
        self,
        metric_name: str,
        window_minutes: int = 60,
    ) -> Dict[str, float]:
        """Calculate statistics for a metric over time window

        Returns:
            Dict with count, mean, min, max, p50, p95 and p99 (all zero when
            no points fall inside the window)
        """
        cutoff = _utc_now() - timedelta(minutes=window_minutes)
        return _summarize(
            m.value
            for m in self.metrics.get(metric_name, [])
            if datetime.fromisoformat(m.timestamp) > cutoff
        )

    def _cleanup_old_metrics(self):
        """Remove metrics older than retention period"""
        cutoff = _utc_now() - timedelta(hours=self.retention_hours)
        for series in list(self.metrics.values()):
            # Points are appended chronologically, so old ones sit at the front.
            while series and datetime.fromisoformat(series[0].timestamp) < cutoff:
                series.popleft()

    def get_counter(self, counter_name: str, default: int = 0) -> int:
        """Get value of a specific counter"""
        return self.counters.get(counter_name, default)

    def get_all_counters(self) -> Dict[str, int]:
        """Get all counter values"""
        return dict(self.counters)

    def get_all_gauges(self) -> Dict[str, float]:
        """Get all gauge values"""
        return dict(self.gauges)


class ErrorMonitor:
    """
    Monitors error rates and triggers alerts

    Tracks errors across different categories and stages
    """

    def __init__(
        self,
        error_threshold: float = 0.05,  # 5% error rate
        window_minutes: int = 15,
    ):
        self.error_threshold = error_threshold
        self.window_minutes = window_minutes
        self.errors: deque = deque(maxlen=10000)
        self.success_count: deque = deque(maxlen=10000)
        self.error_categories: Dict[str, int] = defaultdict(int)

        logger.info(f"Error Monitor initialized (threshold: {error_threshold*100}%, window: {window_minutes}m)")

    def record_error(
        self,
        error_type: str,
        error_message: str,
        stage: str,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Record an error occurrence"""
        self.errors.append({
            "error_type": error_type,
            "error_message": error_message,
            "stage": stage,
            "timestamp": _utc_now_iso(),
            "details": details or {},
        })
        self.error_categories[f"{stage}:{error_type}"] += 1

        logger.warning(f"Error recorded: {stage} - {error_type}: {error_message}")

    def record_success(self, stage: str):
        """Record a successful operation"""
        self.success_count.append({
            "stage": stage,
            "timestamp": _utc_now_iso(),
        })

    def _recent(self, records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the records whose timestamp lies inside the monitoring window."""
        cutoff = _utc_now() - timedelta(minutes=self.window_minutes)
        return [
            record for record in records
            if datetime.fromisoformat(record["timestamp"]) > cutoff
        ]

    def get_error_rate(self, stage: Optional[str] = None) -> float:
        """Calculate error rate within time window

        Args:
            stage: Restrict the calculation to one stage; falsy means all stages

        Returns:
            errors / (errors + successes) in the window, or 0.0 with no data
        """
        recent_errors = self._recent(self.errors)
        recent_successes = self._recent(self.success_count)

        if stage:
            recent_errors = [e for e in recent_errors if e["stage"] == stage]
            recent_successes = [s for s in recent_successes if s["stage"] == stage]

        total = len(recent_errors) + len(recent_successes)
        if total == 0:
            return 0.0
        return len(recent_errors) / total

    def check_threshold_exceeded(self, stage: Optional[str] = None) -> bool:
        """Check if error rate exceeds threshold"""
        return self.get_error_rate(stage) > self.error_threshold

    def get_error_summary(self) -> Dict[str, Any]:
        """Get error summary statistics"""
        recent_errors = self._recent(self.errors)

        category_counts: Dict[str, int] = defaultdict(int)
        stage_counts: Dict[str, int] = defaultdict(int)
        for error in recent_errors:
            category_counts[error["error_type"]] += 1
            stage_counts[error["stage"]] += 1

        # Compute the rate once so "error_rate" and "threshold_exceeded"
        # always describe the same instant.
        error_rate = self.get_error_rate()

        return {
            "total_errors": len(recent_errors),
            "error_rate": error_rate,
            "threshold_exceeded": error_rate > self.error_threshold,
            "by_category": dict(category_counts),
            "by_stage": dict(stage_counts),
            "window_minutes": self.window_minutes,
        }


class LatencyTracker:
    """
    Tracks latency across pipeline stages

    Provides detailed timing analysis
    """

    def __init__(self):
        self.active_traces: Dict[str, Dict[str, float]] = {}
        self.completed_traces: deque = deque(maxlen=1000)

        logger.info("Latency Tracker initialized")

    def start_trace(self, trace_id: str, stage: str):
        """Start timing a pipeline stage"""
        trace = self.active_traces.setdefault(trace_id, {})
        trace[f"{stage}_start"] = time.time()

    def end_trace(self, trace_id: str, stage: str) -> float:
        """End timing a pipeline stage and return duration

        Returns:
            Elapsed seconds, or 0.0 when the trace or the stage start is unknown
        """
        trace = self.active_traces.get(trace_id)
        if trace is None:
            logger.warning(f"Trace {trace_id} not found")
            return 0.0

        started_at = trace.get(f"{stage}_start")
        if started_at is None:
            logger.warning(f"Start time for {stage} not found in trace {trace_id}")
            return 0.0

        duration = time.time() - started_at
        trace[f"{stage}{_DURATION_SUFFIX}"] = duration
        return duration

    def complete_trace(self, trace_id: str) -> Dict[str, float]:
        """Mark trace as complete and get timing summary

        Returns:
            Mapping of stage name to duration in seconds (empty for an
            unknown trace)
        """
        trace_data = self.active_traces.pop(trace_id, None)
        if trace_data is None:
            return {}

        # Strip only the trailing suffix so stage names that themselves
        # contain "_duration" are preserved intact.
        durations = {
            key[:-len(_DURATION_SUFFIX)]: value
            for key, value in trace_data.items()
            if key.endswith(_DURATION_SUFFIX)
        }

        self.completed_traces.append({
            "trace_id": trace_id,
            "timestamp": _utc_now_iso(),
            "total_duration": sum(durations.values()),
            "stages": durations,
        })
        return durations

    def get_stage_statistics(
        self,
        stage: str,
        window_minutes: int = 60,
    ) -> Dict[str, float]:
        """Get latency statistics for a specific stage

        Returns:
            Dict with count, mean, min, max, p50, p95 and p99 (all zero when
            no completed trace in the window contains the stage)
        """
        cutoff = _utc_now() - timedelta(minutes=window_minutes)
        return _summarize(
            trace["stages"][stage]
            for trace in self.completed_traces
            if datetime.fromisoformat(trace["timestamp"]) >= cutoff
            and stage in trace["stages"]
        )


@dataclass
class CacheEntry:
    """Cache entry with metadata"""

    key: str
    value: Any
    created_at: float  # epoch seconds
    accessed_at: float  # epoch seconds
    access_count: int
    size_bytes: int
    ttl: Optional[int] = None  # Time to live in seconds

    def is_expired(self) -> bool:
        """Check if entry has expired"""
        if self.ttl is None:
            return False
        return (time.time() - self.created_at) > self.ttl

    def to_dict(self) -> Dict[str, Any]:
        """Return entry metadata (the cached value itself is not included)."""
        return {
            "key": self.key,
            "created_at": datetime.fromtimestamp(self.created_at).isoformat(),
            "accessed_at": datetime.fromtimestamp(self.accessed_at).isoformat(),
            "access_count": self.access_count,
            "size_bytes": self.size_bytes,
            "ttl": self.ttl,
            "expired": self.is_expired(),
        }


class CacheService:
    """
    SHA256-based caching service for deduplication and performance optimization

    Features:
    - SHA256 fingerprinting for input deduplication
    - LRU eviction policy
    - TTL support for automatic expiration
    - Cache hit/miss tracking
    - Memory usage monitoring
    - Performance metrics

    Invariant: every key in ``cache`` appears exactly once in ``access_order``
    (least recently used first) and ``access_order`` holds no other keys.
    """

    def __init__(
        self,
        max_entries: int = 10000,
        max_memory_mb: int = 512,
        default_ttl: Optional[int] = 3600,  # 1 hour default
    ):
        self.max_entries = max_entries
        self.max_memory_mb = max_memory_mb
        self.default_ttl = default_ttl

        self.cache: Dict[str, CacheEntry] = {}
        self.access_order: deque = deque()  # For LRU tracking

        # Metrics
        self.hits = 0
        self.misses = 0
        self.evictions = 0
        self.total_retrieval_time = 0.0
        self.retrieval_count = 0

        logger.info(f"Cache Service initialized (max_entries: {max_entries}, max_memory: {max_memory_mb}MB)")

    def _compute_fingerprint(self, data: Any) -> str:
        """
        Compute SHA256 fingerprint for any data

        Args:
            data: Any serializable data (dict, str, bytes, etc.)

        Returns:
            SHA256 hash as hex string
        """
        if isinstance(data, bytes):
            data_bytes = data
        elif isinstance(data, str):
            data_bytes = data.encode('utf-8')
        elif isinstance(data, (dict, list)):
            try:
                # Serialize to JSON for consistent hashing
                data_bytes = json.dumps(data, sort_keys=True).encode('utf-8')
            except (TypeError, ValueError):
                # Containers holding non-JSON values (bytes, datetimes, mixed
                # key types, ...) are fingerprinted via pickle instead.
                data_bytes = pickle.dumps(data)
        else:
            # Use pickle for other types
            data_bytes = pickle.dumps(data)

        return hashlib.sha256(data_bytes).hexdigest()

    def _estimate_size(self, obj: Any) -> int:
        """Estimate size of object in bytes"""
        try:
            return len(pickle.dumps(obj))
        except Exception:
            # Pickling can fail with many exception types; sizing is best
            # effort, so fall back to a rough estimate instead of failing.
            if isinstance(obj, (str, bytes)):
                return len(obj)
            if isinstance(obj, dict):
                return sum(len(str(k)) + len(str(v)) for k, v in obj.items())
            if isinstance(obj, list):
                return sum(len(str(item)) for item in obj)
            return 1024  # Default 1KB estimate

    def _get_memory_usage_mb(self) -> float:
        """Calculate current memory usage in MB"""
        total_bytes = sum(entry.size_bytes for entry in self.cache.values())
        return total_bytes / (1024 * 1024)

    def _remove_entry(self, key: str) -> bool:
        """Drop ``key`` from both the cache and the LRU order.

        Returns:
            True if an entry was removed, False if the key was not cached
        """
        if key not in self.cache:
            return False
        del self.cache[key]
        try:
            self.access_order.remove(key)
        except ValueError:
            pass  # Order was already missing the key; nothing to repair.
        return True

    def _touch(self, key: str):
        """Mark ``key`` as the most recently used entry."""
        try:
            self.access_order.remove(key)
        except ValueError:
            pass
        self.access_order.append(key)

    def _evict_lru(self) -> bool:
        """Evict least recently used entry

        Returns:
            True if an entry was evicted, False if nothing could be evicted
        """
        while self.access_order:
            lru_key = self.access_order.popleft()
            if lru_key in self.cache:
                del self.cache[lru_key]
                self.evictions += 1
                logger.debug(f"Evicted LRU cache entry: {lru_key[:16]}...")
                return True
        return False

    def _cleanup_expired(self):
        """Remove expired entries"""
        expired_keys = [key for key, entry in self.cache.items() if entry.is_expired()]
        for key in expired_keys:
            self._remove_entry(key)
            logger.debug(f"Removed expired cache entry: {key[:16]}...")

    def _ensure_capacity(self, new_entry_size: int):
        """Ensure cache has capacity for new entry

        Evicts LRU entries until both the entry-count and memory limits can
        accommodate the new entry. Stops as soon as nothing is left to evict,
        so a limit that cannot be honoured never causes an endless loop.
        """
        # Check entry count limit
        while len(self.cache) >= self.max_entries:
            if not self._evict_lru():
                break

        # Check memory limit
        new_entry_mb = new_entry_size / (1024 * 1024)
        while self.cache and self._get_memory_usage_mb() + new_entry_mb > self.max_memory_mb:
            if not self._evict_lru():
                break

    def _lookup(self, key: str) -> Any:
        """Fetch a live entry's value, updating hit/miss and timing metrics.

        Returns:
            The cached value, or the ``_CACHE_MISS`` marker when the key is
            absent or expired
        """
        started = time.time()

        # Periodic cleanup
        if self.retrieval_count % 100 == 0:
            self._cleanup_expired()

        entry = self.cache.get(key)
        if entry is not None and entry.is_expired():
            self._remove_entry(key)
            entry = None

        if entry is None:
            self.misses += 1
            value = _CACHE_MISS
        else:
            entry.accessed_at = time.time()
            entry.access_count += 1
            self._touch(key)
            self.hits += 1
            value = entry.value
            logger.debug(f"Cache hit: {key[:16]}... (access_count: {entry.access_count})")

        self.total_retrieval_time += time.time() - started
        self.retrieval_count += 1
        return value

    def get(self, key: str) -> Optional[Any]:
        """
        Retrieve value from cache by key

        Args:
            key: Cache key (typically SHA256 fingerprint)

        Returns:
            Cached value if found and not expired, None otherwise
        """
        value = self._lookup(key)
        return None if value is _CACHE_MISS else value

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """
        Store value in cache with key

        Args:
            key: Cache key (typically SHA256 fingerprint)
            value: Value to cache
            ttl: Time to live in seconds (None for default, 0 for no expiration)
        """
        size_bytes = self._estimate_size(value)

        # Use default TTL if not specified
        if ttl is None:
            ttl = self.default_ttl
        elif ttl == 0:
            ttl = None  # No expiration

        # Overwriting: drop the previous entry first so it neither counts
        # against capacity nor leaves a stale duplicate in the LRU order.
        self._remove_entry(key)

        # Ensure capacity
        self._ensure_capacity(size_bytes)

        # Create entry
        current_time = time.time()
        self.cache[key] = CacheEntry(
            key=key,
            value=value,
            created_at=current_time,
            accessed_at=current_time,
            access_count=0,
            size_bytes=size_bytes,
            ttl=ttl,
        )
        self.access_order.append(key)

        logger.debug(f"Cached entry: {key[:16]}... (size: {size_bytes} bytes, ttl: {ttl}s)")

    def get_or_compute(
        self,
        data: Any,
        compute_fn: Callable[[], Any],
        ttl: Optional[int] = None,
    ) -> Tuple[Any, bool]:
        """
        Get cached value or compute and cache it

        Args:
            data: Input data to fingerprint
            compute_fn: Function to compute value if not cached
            ttl: Time to live for cached result

        Returns:
            Tuple of (result, was_cached)
        """
        fingerprint = self._compute_fingerprint(data)

        # The sentinel lets a legitimately cached ``None`` count as a hit
        # instead of being recomputed on every call.
        cached_value = self._lookup(fingerprint)
        if cached_value is not _CACHE_MISS:
            return cached_value, True

        result = compute_fn()
        self.set(fingerprint, result, ttl)
        return result, False

    def invalidate(self, key: str) -> bool:
        """
        Invalidate (remove) a cache entry

        Args:
            key: Cache key to invalidate

        Returns:
            True if entry was removed, False if not found
        """
        if self._remove_entry(key):
            logger.debug(f"Invalidated cache entry: {key[:16]}...")
            return True
        return False

    def invalidate_by_fingerprint(self, data: Any) -> bool:
        """
        Invalidate cache entry by computing fingerprint of data

        Args:
            data: Data to fingerprint and invalidate

        Returns:
            True if entry was removed, False if not found
        """
        return self.invalidate(self._compute_fingerprint(data))

    def clear(self):
        """Clear all cache entries"""
        self.cache.clear()
        self.access_order.clear()
        logger.info("Cache cleared")

    def get_statistics(self) -> Dict[str, Any]:
        """Get cache performance statistics"""
        total_requests = self.hits + self.misses
        hit_rate = self.hits / total_requests if total_requests > 0 else 0.0
        avg_retrieval_time = (
            self.total_retrieval_time / self.retrieval_count
            if self.retrieval_count > 0
            else 0.0
        )

        return {
            "total_entries": len(self.cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
            "evictions": self.evictions,
            "memory_usage_mb": self._get_memory_usage_mb(),
            "max_memory_mb": self.max_memory_mb,
            "avg_retrieval_time_ms": avg_retrieval_time * 1000,
            "cache_efficiency": hit_rate * 100,  # Percentage
        }

    def get_entry_info(self, key: str) -> Optional[Dict[str, Any]]:
        """Get information about a specific cache entry"""
        entry = self.cache.get(key)
        return None if entry is None else entry.to_dict()

    def list_entries(self, limit: int = 100) -> List[Dict[str, Any]]:
        """List cache entries with metadata, most recently accessed first"""
        entries = sorted(
            self.cache.values(),
            key=lambda e: e.accessed_at,
            reverse=True,
        )[:limit]
        return [entry.to_dict() for entry in entries]


class AlertManager:
    """
    Manages alerts and notifications

    Handles alert lifecycle and delivery
    """

    def __init__(self):
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: deque = deque(maxlen=1000)
        self.alert_handlers: List[Callable[[Alert], Any]] = []
        # Strong references keep scheduled handler tasks alive until they
        # finish; the event loop itself only holds weak references to tasks.
        self._handler_tasks: set = set()

        logger.info("Alert Manager initialized")

    def create_alert(
        self,
        level: AlertLevel,
        message: str,
        category: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> Alert:
        """Create a new alert

        Registers the alert as active, appends it to the history and notifies
        the registered handlers. Inside a running event loop the handlers are
        scheduled as a task; from synchronous code they run to completion
        before this method returns.

        Returns:
            The newly created alert
        """
        # One timestamp feeds both the id and the alert so they always agree.
        created_at = _utc_now_iso()
        alert_id = hashlib.sha256(
            f"{category}:{message}:{created_at}".encode()
        ).hexdigest()[:16]

        alert = Alert(
            alert_id=alert_id,
            level=level,
            message=message,
            category=category,
            timestamp=created_at,
            details=details or {},
        )

        self.active_alerts[alert_id] = alert
        self.alert_history.append(alert)

        # Trigger alert handlers
        self._dispatch_handlers(alert)

        logger.warning(f"Alert created: [{level.value}] {category} - {message}")

        return alert

    def _dispatch_handlers(self, alert: Alert):
        """Run the handlers on the running loop, or synchronously without one."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is not None:
            task = loop.create_task(self._trigger_handlers(alert))
            self._handler_tasks.add(task)
            task.add_done_callback(self._handler_tasks.discard)
            return

        if not self.alert_handlers:
            return

        # No loop is running (plain synchronous caller): drive the handlers
        # on a private loop so the thread's current-loop state is untouched.
        private_loop = asyncio.new_event_loop()
        try:
            private_loop.run_until_complete(self._trigger_handlers(alert))
        finally:
            private_loop.close()

    def resolve_alert(self, alert_id: str):
        """Resolve an active alert (unknown ids are ignored)"""
        alert = self.active_alerts.pop(alert_id, None)
        if alert is None:
            return
        alert.resolved = True
        alert.resolved_at = _utc_now_iso()
        logger.info(f"Alert resolved: {alert_id}")

    def add_handler(self, handler: Callable[[Alert], Any]):
        """Add an alert handler function (plain or async callable)"""
        self.alert_handlers.append(handler)

    async def _trigger_handlers(self, alert: Alert):
        """Trigger all registered alert handlers

        A failing handler is logged and does not stop the remaining handlers.
        """
        # Iterate a snapshot so handlers may register further handlers.
        for handler in list(self.alert_handlers):
            try:
                outcome = handler(alert)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"Alert handler failed: {str(e)}", exc_info=True)

    def get_active_alerts(
        self,
        level: Optional[AlertLevel] = None,
        category: Optional[str] = None,
    ) -> List[Alert]:
        """Get active alerts with optional filtering"""
        alerts = list(self.active_alerts.values())

        if level:
            alerts = [a for a in alerts if a.level == level]
        if category:
            alerts = [a for a in alerts if a.category == category]

        return alerts

    def get_alert_summary(self) -> Dict[str, Any]:
        """Get summary of alert status"""
        active = list(self.active_alerts.values())

        by_level: Dict[str, int] = defaultdict(int)
        by_category: Dict[str, int] = defaultdict(int)
        for alert in active:
            by_level[alert.level.value] += 1
            by_category[alert.category] += 1

        return {
            "total_active": len(active),
            "by_level": dict(by_level),
            "by_category": dict(by_category),
            "critical_count": by_level.get(AlertLevel.CRITICAL.value, 0),
            "error_count": by_level.get(AlertLevel.ERROR.value, 0),
        }


class MonitoringService:
    """
    Central monitoring service coordinating all monitoring components

    Provides unified interface for system monitoring and health checks
    """

    def __init__(
        self,
        error_threshold: float = 0.05,
        window_minutes: int = 15,
    ):
        self.metrics_collector = MetricsCollector()
        self.error_monitor = ErrorMonitor(error_threshold, window_minutes)
        self.latency_tracker = LatencyTracker()
        self.alert_manager = AlertManager()
        self.cache_service = CacheService(
            max_entries=10000,
            max_memory_mb=512,
            default_ttl=3600,  # 1 hour default
        )

        self.system_status = SystemStatus.OPERATIONAL
        self.start_time = _utc_now()
        self._error_rate_task: Optional["asyncio.Task"] = None

        # Setup automatic monitoring (skip background tasks for now)
        # self._setup_automatic_checks()

        logger.info("Monitoring Service initialized")

    def _setup_automatic_checks(self):
        """Setup automatic health checks and alerts

        Must be called from within a running event loop; otherwise the
        background check cannot be scheduled and a warning is logged.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("No running event loop; automatic error rate checks not started")
            return

        async def check_error_rate():
            """Periodically check error rate and create alerts"""
            while True:
                try:
                    error_summary = self.error_monitor.get_error_summary()

                    if error_summary["threshold_exceeded"]:
                        self.alert_manager.create_alert(
                            level=AlertLevel.ERROR,
                            message=f"Error rate ({error_summary['error_rate']*100:.1f}%) exceeds threshold",
                            category="error_rate",
                            details=error_summary,
                        )
                except Exception as e:
                    logger.error(f"Error rate check failed: {str(e)}")

                await asyncio.sleep(60)  # Check every minute

        # Keep a reference so the background task is not garbage-collected.
        self._error_rate_task = loop.create_task(check_error_rate())

    def record_processing_stage(
        self,
        trace_id: str,
        stage: str,
        success: bool,
        duration: Optional[float] = None,
        error_details: Optional[Dict[str, Any]] = None,
    ):
        """Record completion of a processing stage

        Args:
            trace_id: Identifier of the request trace (currently not recorded)
            stage: Pipeline stage name
            success: Whether the stage succeeded
            duration: Stage duration in seconds, if measured
            error_details: Optional dict; "error_type" and "message" are read
        """
        # Record success/error
        if success:
            self.error_monitor.record_success(stage)
        else:
            info = error_details or {}
            self.error_monitor.record_error(
                info.get("error_type", "unknown"),
                info.get("message", "No details"),
                stage,
                error_details,
            )

        # Record latency
        if duration is not None:
            self.metrics_collector.record_metric(
                f"latency_{stage}",
                duration,
                unit="seconds",
                tags={"stage": stage, "success": str(success)},
            )

        # Increment counters
        outcome = "success" if success else "error"
        self.metrics_collector.increment_counter(f"stage_{stage}_total")
        self.metrics_collector.increment_counter(f"stage_{stage}_{outcome}")

    def get_system_health(self) -> Dict[str, Any]:
        """Get comprehensive system health status

        Also updates ``self.system_status`` with the derived status.
        """
        error_summary = self.error_monitor.get_error_summary()
        alert_summary = self.alert_manager.get_alert_summary()

        # Determine system status
        if alert_summary["critical_count"] > 0:
            status = SystemStatus.CRITICAL
        elif error_summary["threshold_exceeded"] or alert_summary["error_count"] > 5:
            status = SystemStatus.DEGRADED
        else:
            status = SystemStatus.OPERATIONAL

        self.system_status = status

        now = _utc_now()

        return {
            "status": status.value,
            "uptime_seconds": (now - self.start_time).total_seconds(),
            "timestamp": now.isoformat(),
            "error_rate": error_summary["error_rate"],
            "error_threshold": self.error_monitor.error_threshold,
            "active_alerts": alert_summary["total_active"],
            "critical_alerts": alert_summary["critical_count"],
            "total_requests": self.metrics_collector.get_counter("total_requests", 0),
            "counters": self.metrics_collector.get_all_counters(),
            "gauges": self.metrics_collector.get_all_gauges(),
        }

    def get_performance_dashboard(
        self,
        stages: Optional[Sequence[str]] = None,
    ) -> Dict[str, Any]:
        """Get performance metrics for dashboard display

        Args:
            stages: Stage names to report latency for; defaults to
                ``DEFAULT_DASHBOARD_STAGES``
        """
        if stages is None:
            stages = DEFAULT_DASHBOARD_STAGES

        stage_stats = {
            stage: self.latency_tracker.get_stage_statistics(stage)
            for stage in stages
        }

        return {
            "system_health": self.get_system_health(),
            "error_summary": self.error_monitor.get_error_summary(),
            "latency_by_stage": stage_stats,
            "active_alerts": [a.to_dict() for a in self.alert_manager.get_active_alerts()],
            "timestamp": _utc_now_iso(),
        }

    def start_monitoring(self):
        """Start monitoring services (placeholder for initialization)"""
        logger.info("Monitoring services started")
        self.system_status = SystemStatus.OPERATIONAL

    def track_request(self, endpoint: str, latency_ms: float, status_code: int):
        """Track incoming request for monitoring"""
        # Record latency metric
        self.metrics_collector.record_metric(
            f"request_latency_{endpoint}",
            latency_ms,
            unit="milliseconds",
            tags={"endpoint": endpoint, "status_code": str(status_code)},
        )

        # Increment request counter
        self.metrics_collector.increment_counter("total_requests")
        self.metrics_collector.increment_counter(f"requests_{endpoint}")

        # Track status code
        if status_code >= 500:
            self.metrics_collector.increment_counter("server_errors")
        elif status_code >= 400:
            self.metrics_collector.increment_counter("client_errors")
        else:
            self.metrics_collector.increment_counter("successful_requests")

    def track_error(self, endpoint: str, error_type: str, error_message: str):
        """Track error occurrence

        The endpoint is recorded as the error's stage.
        """
        # Keyword names must match ErrorMonitor.record_error's signature.
        self.error_monitor.record_error(
            error_type=error_type,
            error_message=error_message,
            stage=endpoint,
            details={"endpoint": endpoint},
        )

        # Increment error counter
        self.metrics_collector.increment_counter("total_errors")
        self.metrics_collector.increment_counter(f"errors_{error_type}")

    def get_cache_statistics(self) -> Dict[str, Any]:
        """Get cache performance statistics from real cache service"""
        return self.cache_service.get_statistics()

    def cache_result(self, data: Any, result: Any, ttl: Optional[int] = None):
        """
        Cache a computation result with SHA256 fingerprint

        Args:
            data: Input data to fingerprint
            result: Result to cache
            ttl: Time to live in seconds
        """
        fingerprint = self.cache_service._compute_fingerprint(data)
        self.cache_service.set(fingerprint, result, ttl)
        logger.debug(f"Cached result for fingerprint: {fingerprint[:16]}...")

    def get_cached_result(self, data: Any) -> Optional[Any]:
        """
        Retrieve cached result by computing fingerprint

        Args:
            data: Input data to fingerprint

        Returns:
            Cached result if found, None otherwise
        """
        fingerprint = self.cache_service._compute_fingerprint(data)
        return self.cache_service.get(fingerprint)

    def get_or_compute_cached(
        self,
        data: Any,
        compute_fn: Callable[[], Any],
        ttl: Optional[int] = None,
    ) -> Tuple[Any, bool]:
        """
        Get cached result or compute and cache it

        Args:
            data: Input data to fingerprint
            compute_fn: Function to compute result if not cached
            ttl: Time to live for cached result

        Returns:
            Tuple of (result, was_cached)
        """
        return self.cache_service.get_or_compute(data, compute_fn, ttl)

    def get_recent_alerts(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent active alerts, newest first"""
        alerts = self.alert_manager.get_active_alerts()
        recent = sorted(alerts, key=lambda a: a.timestamp, reverse=True)[:limit]
        return [a.to_dict() for a in recent]


# Global monitoring service instance
_monitoring_service = None


def get_monitoring_service() -> MonitoringService:
    """Get singleton monitoring service instance"""
    global _monitoring_service
    if _monitoring_service is None:
        _monitoring_service = MonitoringService()
    return _monitoring_service