| """ | |
| Performance optimization module for DigiPal application. | |
| This module provides: | |
| - Lazy model loading with optimization | |
| - Background task system for attribute decay and evolution checks | |
| - Resource cleanup and garbage collection | |
| - Database query optimization | |
| - Memory usage monitoring | |
| """ | |
| import logging | |
| import time | |
| import threading | |
| import gc | |
| import psutil | |
| import weakref | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Callable, Tuple | |
| from dataclasses import dataclass | |
| from collections import defaultdict | |
| import asyncio | |
| import sqlite3 | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from .models import DigiPal | |
| from .enums import LifeStage | |
| from ..storage.storage_manager import StorageManager | |
| logger = logging.getLogger(__name__) | |
@dataclass
class ModelLoadingConfig:
    """Configuration for model loading optimization."""
    lazy_loading: bool = True
    quantization: bool = True
    model_cache_size: int = 2  # Maximum models to keep in memory
    unload_after_idle_minutes: int = 30
    preload_on_startup: bool = False
    use_cpu_offload: bool = True


@dataclass
class BackgroundTaskConfig:
    """Configuration for background tasks."""
    attribute_decay_interval: int = 300  # 5 minutes
    evolution_check_interval: int = 600  # 10 minutes
    memory_cleanup_interval: int = 1800  # 30 minutes
    database_optimization_interval: int = 3600  # 1 hour
    performance_monitoring_interval: int = 60  # 1 minute


@dataclass
class PerformanceMetrics:
    """Performance metrics tracking."""
    timestamp: datetime
    cpu_usage: float
    memory_usage: float
    gpu_memory_usage: float
    active_pets: int
    cached_models: int
    database_connections: int
    response_time_avg: float


class LazyModelLoader:
    """Lazy loading system for AI models with optimization."""

    def __init__(self, config: ModelLoadingConfig):
        """Initialize lazy model loader."""
        self.config = config
        self.loaded_models: Dict[str, Any] = {}
        self.model_last_used: Dict[str, datetime] = {}
        self.model_loading_locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._cleanup_thread = None
        self._stop_cleanup = False

        logger.info("Lazy model loader initialized")

    def get_language_model(self, model_name: str = "Qwen/Qwen3-0.6B") -> Tuple[Any, Any]:
        """
        Get language model with lazy loading.

        Args:
            model_name: Model identifier

        Returns:
            Tuple of (model, tokenizer)
        """
        model_key = f"language_{model_name}"

        with self.model_loading_locks[model_key]:
            # Check if model is already loaded
            if model_key in self.loaded_models:
                self.model_last_used[model_key] = datetime.now()
                model_data = self.loaded_models[model_key]
                return model_data['model'], model_data['tokenizer']

            # Load model if not in cache
            if self.config.lazy_loading:
                logger.info(f"Lazy loading language model: {model_name}")
                model, tokenizer = self._load_language_model(model_name)

                # Cache the model
                self.loaded_models[model_key] = {
                    'model': model,
                    'tokenizer': tokenizer,
                    'type': 'language',
                    'size_mb': self._estimate_model_size(model)
                }
                self.model_last_used[model_key] = datetime.now()

                # Manage cache size
                self._manage_model_cache()

                return model, tokenizer
            else:
                # Direct loading without caching
                return self._load_language_model(model_name)

    def get_speech_model(self, model_name: str = "kyutai/stt-2.6b-en-trfs") -> Tuple[Any, Any]:
        """
        Get speech model with lazy loading.

        Args:
            model_name: Model identifier

        Returns:
            Tuple of (model, processor)
        """
        model_key = f"speech_{model_name}"

        with self.model_loading_locks[model_key]:
            # Check if model is already loaded
            if model_key in self.loaded_models:
                self.model_last_used[model_key] = datetime.now()
                model_data = self.loaded_models[model_key]
                return model_data['model'], model_data['processor']

            # Load model if not in cache
            if self.config.lazy_loading:
                logger.info(f"Lazy loading speech model: {model_name}")
                model, processor = self._load_speech_model(model_name)

                # Cache the model
                self.loaded_models[model_key] = {
                    'model': model,
                    'processor': processor,
                    'type': 'speech',
                    'size_mb': self._estimate_model_size(model)
                }
                self.model_last_used[model_key] = datetime.now()

                # Manage cache size
                self._manage_model_cache()

                return model, processor
            else:
                # Direct loading without caching
                return self._load_speech_model(model_name)

    def _load_language_model(self, model_name: str) -> Tuple[Any, Any]:
        """Load language model with optimization."""
        try:
            # Load tokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)

            # Configure model loading
            model_kwargs = {
                "torch_dtype": "auto",
                "device_map": "auto" if torch.cuda.is_available() else None
            }

            # Add quantization if enabled
            if self.config.quantization and torch.cuda.is_available():
                from transformers import BitsAndBytesConfig
                quantization_config = BitsAndBytesConfig(
                    load_in_4bit=True,
                    bnb_4bit_compute_dtype=torch.float16,
                    bnb_4bit_use_double_quant=True,
                    bnb_4bit_quant_type="nf4"
                )
                model_kwargs["quantization_config"] = quantization_config

            # Load model
            model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)

            # Enable CPU offload if configured
            if self.config.use_cpu_offload and hasattr(model, 'enable_model_cpu_offload'):
                model.enable_model_cpu_offload()

            logger.info(f"Successfully loaded language model: {model_name}")
            return model, tokenizer

        except Exception as e:
            logger.error(f"Failed to load language model {model_name}: {e}")
            raise

    def _load_speech_model(self, model_name: str) -> Tuple[Any, Any]:
        """Load speech model with optimization."""
        try:
            from transformers import KyutaiSpeechToTextProcessor, KyutaiSpeechToTextForConditionalGeneration

            device = "cuda" if torch.cuda.is_available() else "cpu"

            processor = KyutaiSpeechToTextProcessor.from_pretrained(model_name)
            model = KyutaiSpeechToTextForConditionalGeneration.from_pretrained(
                model_name,
                device_map=device,
                torch_dtype="auto"
            )

            logger.info(f"Successfully loaded speech model: {model_name}")
            return model, processor

        except Exception as e:
            logger.error(f"Failed to load speech model {model_name}: {e}")
            raise

    def _estimate_model_size(self, model) -> float:
        """Estimate model size in MB."""
        try:
            if hasattr(model, 'get_memory_footprint'):
                return model.get_memory_footprint() / (1024 * 1024)
            else:
                # Rough estimation based on parameter count: assume roughly
                # 2 bytes per parameter for quantized/half-precision weights,
                # otherwise 4 bytes (float32).
                total_params = sum(p.numel() for p in model.parameters())
                bytes_per_param = 2 if self.config.quantization else 4
                return (total_params * bytes_per_param) / (1024 * 1024)
        except Exception:
            return 1000.0  # Default estimate

    def _manage_model_cache(self) -> None:
        """Manage model cache size by unloading least recently used models."""
        if len(self.loaded_models) <= self.config.model_cache_size:
            return

        # Sort models by last used time
        models_by_usage = sorted(
            self.loaded_models.items(),
            key=lambda x: self.model_last_used.get(x[0], datetime.min)
        )

        # Unload oldest models
        models_to_unload = models_by_usage[:-self.config.model_cache_size]
        for model_key, model_data in models_to_unload:
            self._unload_model(model_key)
            logger.info(f"Unloaded model {model_key} due to cache size limit")

    def _unload_model(self, model_key: str) -> None:
        """Unload a specific model from memory."""
        if model_key in self.loaded_models:
            model_data = self.loaded_models[model_key]

            # Clear model references
            if 'model' in model_data:
                del model_data['model']
            if 'tokenizer' in model_data:
                del model_data['tokenizer']
            if 'processor' in model_data:
                del model_data['processor']

            # Remove from cache
            del self.loaded_models[model_key]
            if model_key in self.model_last_used:
                del self.model_last_used[model_key]

            # Force garbage collection
            gc.collect()

            # Clear CUDA cache if available
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    def start_cleanup_thread(self) -> None:
        """Start background cleanup thread for idle models."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            return

        self._stop_cleanup = False
        self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self._cleanup_thread.start()
        logger.info("Started model cleanup thread")

    def stop_cleanup_thread(self) -> None:
        """Stop background cleanup thread."""
        self._stop_cleanup = True
        if self._cleanup_thread:
            self._cleanup_thread.join(timeout=5)

    def _cleanup_loop(self) -> None:
        """Background cleanup loop for idle models."""
        while not self._stop_cleanup:
            try:
                current_time = datetime.now()
                idle_threshold = timedelta(minutes=self.config.unload_after_idle_minutes)

                models_to_unload = []
                for model_key, last_used in self.model_last_used.items():
                    if current_time - last_used > idle_threshold:
                        models_to_unload.append(model_key)

                for model_key in models_to_unload:
                    self._unload_model(model_key)
                    logger.info(f"Unloaded idle model: {model_key}")

                # Sleep for cleanup interval
                time.sleep(300)  # 5 minutes

            except Exception as e:
                logger.error(f"Error in model cleanup loop: {e}")
                time.sleep(60)

    def get_cache_info(self) -> Dict[str, Any]:
        """Get information about model cache."""
        total_size_mb = sum(
            model_data.get('size_mb', 0)
            for model_data in self.loaded_models.values()
        )

        return {
            'loaded_models': len(self.loaded_models),
            'cache_limit': self.config.model_cache_size,
            'total_size_mb': total_size_mb,
            'models': {
                key: {
                    'type': data.get('type', 'unknown'),
                    'size_mb': data.get('size_mb', 0),
                    'last_used': self.model_last_used.get(key, datetime.min).isoformat()
                }
                for key, data in self.loaded_models.items()
            }
        }

    def shutdown(self) -> None:
        """Shutdown the model loader and cleanup resources."""
        logger.info("Shutting down lazy model loader")

        # Stop cleanup thread
        self.stop_cleanup_thread()

        # Unload all models
        for model_key in list(self.loaded_models.keys()):
            self._unload_model(model_key)

        logger.info("Lazy model loader shutdown complete")

class BackgroundTaskManager:
    """Manages background tasks for attribute decay, evolution checks, etc."""

    def __init__(self, config: BackgroundTaskConfig, storage_manager: StorageManager):
        """Initialize background task manager."""
        self.config = config
        self.storage_manager = storage_manager
        self.active_tasks: Dict[str, threading.Thread] = {}
        self.task_stop_events: Dict[str, threading.Event] = {}
        self.task_callbacks: Dict[str, Callable] = {}

        # Performance tracking
        self.task_performance: Dict[str, List[float]] = defaultdict(list)

        logger.info("Background task manager initialized")

    def register_task(self, task_name: str, callback: Callable, interval_seconds: int) -> None:
        """
        Register a background task.

        Args:
            task_name: Unique task name
            callback: Function to call periodically
            interval_seconds: Interval between calls in seconds
        """
        self.task_callbacks[task_name] = callback
        self.task_stop_events[task_name] = threading.Event()

        # Create and start task thread
        task_thread = threading.Thread(
            target=self._task_loop,
            args=(task_name, callback, interval_seconds),
            daemon=True,
            name=f"BackgroundTask-{task_name}"
        )
        self.active_tasks[task_name] = task_thread
        task_thread.start()

        logger.info(f"Registered background task: {task_name} (interval: {interval_seconds}s)")

    def _task_loop(self, task_name: str, callback: Callable, interval_seconds: int) -> None:
        """Background task execution loop."""
        stop_event = self.task_stop_events[task_name]

        while not stop_event.is_set():
            try:
                start_time = time.time()

                # Execute task callback
                callback()

                # Track performance
                execution_time = time.time() - start_time
                self.task_performance[task_name].append(execution_time)

                # Keep only recent performance data
                if len(self.task_performance[task_name]) > 100:
                    self.task_performance[task_name] = self.task_performance[task_name][-50:]

                # Wait for next execution
                stop_event.wait(timeout=interval_seconds)

            except Exception as e:
                logger.error(f"Error in background task {task_name}: {e}")
                # Wait before retrying
                stop_event.wait(timeout=min(60, interval_seconds))

    def stop_task(self, task_name: str) -> None:
        """Stop a specific background task."""
        if task_name in self.task_stop_events:
            self.task_stop_events[task_name].set()

        if task_name in self.active_tasks:
            thread = self.active_tasks[task_name]
            thread.join(timeout=5)
            del self.active_tasks[task_name]

        logger.info(f"Stopped background task: {task_name}")

    def stop_all_tasks(self) -> None:
        """Stop all background tasks."""
        logger.info("Stopping all background tasks")

        # Signal all tasks to stop
        for stop_event in self.task_stop_events.values():
            stop_event.set()

        # Wait for all threads to finish
        for task_name, thread in self.active_tasks.items():
            thread.join(timeout=5)
            logger.debug(f"Stopped task: {task_name}")

        # Clear task data
        self.active_tasks.clear()
        self.task_stop_events.clear()

        logger.info("All background tasks stopped")

    def get_task_performance(self) -> Dict[str, Dict[str, float]]:
        """Get performance statistics for all tasks."""
        performance_stats = {}

        for task_name, execution_times in self.task_performance.items():
            if execution_times:
                performance_stats[task_name] = {
                    'avg_execution_time': sum(execution_times) / len(execution_times),
                    'max_execution_time': max(execution_times),
                    'min_execution_time': min(execution_times),
                    'total_executions': len(execution_times)
                }
            else:
                performance_stats[task_name] = {
                    'avg_execution_time': 0.0,
                    'max_execution_time': 0.0,
                    'min_execution_time': 0.0,
                    'total_executions': 0
                }

        return performance_stats

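
# A minimal wiring sketch for BackgroundTaskManager (illustrative only: the
# decay callback below is a hypothetical placeholder, not part of this module,
# and a constructed StorageManager is assumed).
def _example_background_tasks(storage_manager: StorageManager) -> None:  # pragma: no cover
    config = BackgroundTaskConfig()
    manager = BackgroundTaskManager(config, storage_manager)

    def attribute_decay_tick() -> None:
        # Hypothetical: load active pets via storage_manager and apply decay.
        logger.debug("attribute decay tick")

    manager.register_task("attribute_decay", attribute_decay_tick,
                          config.attribute_decay_interval)
    # ... run the application, then shut down cleanly ...
    manager.stop_all_tasks()

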
class DatabaseOptimizer:
    """Database query optimization and maintenance."""

    def __init__(self, storage_manager: StorageManager):
        """Initialize database optimizer."""
        self.storage_manager = storage_manager
        self.query_cache: Dict[str, Tuple[Any, datetime]] = {}
        self.cache_ttl_seconds = 300  # 5 minutes

        logger.info("Database optimizer initialized")

    def optimize_database(self) -> Dict[str, Any]:
        """Perform database optimization tasks."""
        results = {}

        try:
            # Analyze database
            results['analyze'] = self._analyze_database()

            # Vacuum database
            results['vacuum'] = self._vacuum_database()

            # Update statistics
            results['statistics'] = self._update_statistics()

            # Check indexes
            results['indexes'] = self._check_indexes()

            logger.info("Database optimization completed")

        except Exception as e:
            logger.error(f"Database optimization failed: {e}")
            results['error'] = str(e)

        return results

    def _analyze_database(self) -> Dict[str, Any]:
        """Analyze database for optimization opportunities."""
        try:
            db = self.storage_manager.db

            # List user tables
            table_info = db.execute_query("""
                SELECT name FROM sqlite_master WHERE type='table'
            """)

            # Get index information
            index_info = db.execute_query("""
                SELECT name, tbl_name, sql
                FROM sqlite_master
                WHERE type='index' AND sql IS NOT NULL
            """)

            return {
                'tables': table_info,
                'indexes': index_info,
                'database_size': self._get_database_size()
            }

        except Exception as e:
            logger.error(f"Database analysis failed: {e}")
            return {'error': str(e)}

    def _vacuum_database(self) -> Dict[str, Any]:
        """Vacuum database to reclaim space."""
        try:
            db = self.storage_manager.db

            # Get size before vacuum
            size_before = self._get_database_size()

            # Perform vacuum
            db.execute_update("VACUUM")

            # Get size after vacuum
            size_after = self._get_database_size()
            space_reclaimed = size_before - size_after

            return {
                'size_before_mb': size_before / (1024 * 1024),
                'size_after_mb': size_after / (1024 * 1024),
                'space_reclaimed_mb': space_reclaimed / (1024 * 1024)
            }

        except Exception as e:
            logger.error(f"Database vacuum failed: {e}")
            return {'error': str(e)}

    def _update_statistics(self) -> Dict[str, Any]:
        """Update database statistics."""
        try:
            db = self.storage_manager.db

            # Analyze all tables
            db.execute_update("ANALYZE")

            return {'status': 'completed'}

        except Exception as e:
            logger.error(f"Statistics update failed: {e}")
            return {'error': str(e)}

    def _check_indexes(self) -> Dict[str, Any]:
        """Check and suggest index optimizations."""
        try:
            db = self.storage_manager.db

            # Check for missing indexes on frequently queried columns
            suggestions = []

            # Check digipals table
            digipals_count = db.execute_query("SELECT COUNT(*) as count FROM digipals")[0]['count']
            if digipals_count > 1000:
                # Suggest index on user_id if it does not exist
                existing_indexes = db.execute_query("""
                    SELECT name FROM sqlite_master
                    WHERE type='index' AND tbl_name='digipals' AND sql LIKE '%user_id%'
                """)
                if not existing_indexes:
                    suggestions.append("CREATE INDEX idx_digipals_user_id ON digipals(user_id)")

            # Check interactions table
            interactions_count = db.execute_query("SELECT COUNT(*) as count FROM interactions")[0]['count']
            if interactions_count > 5000:
                # Suggest composite index on digipal_id and timestamp
                existing_indexes = db.execute_query("""
                    SELECT name FROM sqlite_master
                    WHERE type='index' AND tbl_name='interactions'
                    AND sql LIKE '%digipal_id%' AND sql LIKE '%timestamp%'
                """)
                if not existing_indexes:
                    suggestions.append("CREATE INDEX idx_interactions_digipal_timestamp ON interactions(digipal_id, timestamp)")

            return {
                'suggestions': suggestions,
                'digipals_count': digipals_count,
                'interactions_count': interactions_count
            }

        except Exception as e:
            logger.error(f"Index check failed: {e}")
            return {'error': str(e)}

    def _get_database_size(self) -> int:
        """Get database file size in bytes."""
        try:
            import os
            return os.path.getsize(self.storage_manager.db_path)
        except Exception:
            return 0

    def create_suggested_indexes(self) -> Dict[str, Any]:
        """Create suggested indexes for better performance."""
        try:
            db = self.storage_manager.db
            results = []

            # Essential indexes for DigiPal application
            indexes_to_create = [
                "CREATE INDEX IF NOT EXISTS idx_digipals_user_id ON digipals(user_id)",
                "CREATE INDEX IF NOT EXISTS idx_digipals_active ON digipals(is_active)",
                "CREATE INDEX IF NOT EXISTS idx_interactions_digipal ON interactions(digipal_id)",
                "CREATE INDEX IF NOT EXISTS idx_interactions_timestamp ON interactions(timestamp)",
                "CREATE INDEX IF NOT EXISTS idx_care_actions_digipal ON care_actions(digipal_id)",
                "CREATE INDEX IF NOT EXISTS idx_users_id ON users(id)"
            ]

            for index_sql in indexes_to_create:
                try:
                    db.execute_update(index_sql)
                    results.append(f"Created: {index_sql}")
                except Exception as e:
                    results.append(f"Failed: {index_sql} - {e}")

            return {'results': results}

        except Exception as e:
            logger.error(f"Index creation failed: {e}")
            return {'error': str(e)}

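
# A minimal maintenance sketch for DatabaseOptimizer (illustrative only;
# assumes a constructed StorageManager).
def _example_database_maintenance(storage_manager: StorageManager) -> None:  # pragma: no cover
    optimizer = DatabaseOptimizer(storage_manager)
    report = optimizer.optimize_database()
    # Apply any index suggestions produced by the analysis pass.
    if report.get('indexes', {}).get('suggestions'):
        logger.info("Index results: %s", optimizer.create_suggested_indexes())

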
class PerformanceMonitor:
    """System performance monitoring and optimization."""

    def __init__(self):
        """Initialize performance monitor."""
        self.metrics_history: List[PerformanceMetrics] = []
        self.max_history_size = 1440  # 24 hours of minute-by-minute data

        logger.info("Performance monitor initialized")

    def collect_metrics(self, active_pets: int = 0, cached_models: int = 0,
                        response_time_avg: float = 0.0) -> PerformanceMetrics:
        """Collect current performance metrics."""
        try:
            # System metrics
            cpu_usage = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            memory_usage = memory.percent

            # GPU metrics (guard against division by zero before any allocation)
            gpu_memory_usage = 0.0
            if torch.cuda.is_available():
                max_allocated = torch.cuda.max_memory_allocated()
                if max_allocated > 0:
                    gpu_memory_usage = (torch.cuda.memory_allocated() / max_allocated) * 100

            # Database connections (approximate)
            database_connections = 1  # Simplified for SQLite

            metrics = PerformanceMetrics(
                timestamp=datetime.now(),
                cpu_usage=cpu_usage,
                memory_usage=memory_usage,
                gpu_memory_usage=gpu_memory_usage,
                active_pets=active_pets,
                cached_models=cached_models,
                database_connections=database_connections,
                response_time_avg=response_time_avg
            )

            # Add to history
            self.metrics_history.append(metrics)

            # Manage history size
            if len(self.metrics_history) > self.max_history_size:
                self.metrics_history = self.metrics_history[-self.max_history_size:]

            return metrics

        except Exception as e:
            logger.error(f"Failed to collect performance metrics: {e}")
            return PerformanceMetrics(
                timestamp=datetime.now(),
                cpu_usage=0.0,
                memory_usage=0.0,
                gpu_memory_usage=0.0,
                active_pets=active_pets,
                cached_models=cached_models,
                database_connections=0,
                response_time_avg=response_time_avg
            )

    def get_performance_summary(self, hours: int = 1) -> Dict[str, Any]:
        """Get performance summary for the last N hours."""
        if not self.metrics_history:
            return {}

        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.metrics_history if m.timestamp > cutoff_time]

        if not recent_metrics:
            recent_metrics = self.metrics_history[-60:]  # Last 60 data points

        # Calculate averages
        avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_gpu_memory = sum(m.gpu_memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_response_time = sum(m.response_time_avg for m in recent_metrics) / len(recent_metrics)

        # Calculate peaks
        max_cpu = max(m.cpu_usage for m in recent_metrics)
        max_memory = max(m.memory_usage for m in recent_metrics)
        max_gpu_memory = max(m.gpu_memory_usage for m in recent_metrics)

        return {
            'time_period_hours': hours,
            'data_points': len(recent_metrics),
            'averages': {
                'cpu_usage': avg_cpu,
                'memory_usage': avg_memory,
                'gpu_memory_usage': avg_gpu_memory,
                'response_time': avg_response_time
            },
            'peaks': {
                'cpu_usage': max_cpu,
                'memory_usage': max_memory,
                'gpu_memory_usage': max_gpu_memory
            },
            'current': {
                'active_pets': recent_metrics[-1].active_pets if recent_metrics else 0,
                'cached_models': recent_metrics[-1].cached_models if recent_metrics else 0
            }
        }

    def check_performance_alerts(self) -> List[Dict[str, Any]]:
        """Check for performance issues and return alerts."""
        alerts = []

        if not self.metrics_history:
            return alerts

        latest = self.metrics_history[-1]

        # CPU usage alert
        if latest.cpu_usage > 80:
            alerts.append({
                'type': 'high_cpu',
                'severity': 'warning' if latest.cpu_usage < 90 else 'critical',
                'message': f"High CPU usage: {latest.cpu_usage:.1f}%",
                'value': latest.cpu_usage
            })

        # Memory usage alert
        if latest.memory_usage > 85:
            alerts.append({
                'type': 'high_memory',
                'severity': 'warning' if latest.memory_usage < 95 else 'critical',
                'message': f"High memory usage: {latest.memory_usage:.1f}%",
                'value': latest.memory_usage
            })

        # GPU memory alert
        if latest.gpu_memory_usage > 90:
            alerts.append({
                'type': 'high_gpu_memory',
                'severity': 'warning',
                'message': f"High GPU memory usage: {latest.gpu_memory_usage:.1f}%",
                'value': latest.gpu_memory_usage
            })

        # Response time alert
        if latest.response_time_avg > 5.0:
            alerts.append({
                'type': 'slow_response',
                'severity': 'warning',
                'message': f"Slow response time: {latest.response_time_avg:.2f}s",
                'value': latest.response_time_avg
            })

        return alerts

    def suggest_optimizations(self) -> List[str]:
        """Suggest performance optimizations based on metrics."""
        suggestions = []

        if not self.metrics_history:
            return suggestions

        # Analyze recent performance
        recent_metrics = self.metrics_history[-60:]  # Last hour of data
        avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_gpu_memory = sum(m.gpu_memory_usage for m in recent_metrics) / len(recent_metrics)

        # CPU optimization suggestions
        if avg_cpu > 70:
            suggestions.append("Consider reducing background task frequency")
            suggestions.append("Enable model CPU offloading to reduce CPU load")

        # Memory optimization suggestions
        if avg_memory > 80:
            suggestions.append("Reduce model cache size to free memory")
            suggestions.append("Enable more aggressive memory cleanup")
            suggestions.append("Consider using smaller quantized models")

        # GPU memory optimization suggestions
        if avg_gpu_memory > 80:
            suggestions.append("Enable 4-bit quantization for models")
            suggestions.append("Reduce maximum concurrent model loading")
            suggestions.append("Use CPU offloading for less frequently used models")

        # General suggestions
        if len(recent_metrics) > 0:
            max_active_pets = max(m.active_pets for m in recent_metrics)
            if max_active_pets > 100:
                suggestions.append("Consider implementing pet data pagination")
                suggestions.append("Optimize database queries with better indexing")

        return suggestions

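
# A minimal monitoring sketch for PerformanceMonitor (illustrative only; the
# pet/model counts are placeholder values).
def _example_monitoring() -> None:  # pragma: no cover
    monitor = PerformanceMonitor()
    monitor.collect_metrics(active_pets=3, cached_models=1, response_time_avg=0.4)
    for alert in monitor.check_performance_alerts():
        logger.warning(alert['message'])
    logger.info("Summary: %s", monitor.get_performance_summary(hours=1))

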
class ResourceCleanupManager:
    """Manages resource cleanup and garbage collection."""

    def __init__(self):
        """Initialize resource cleanup manager."""
        self.cleanup_callbacks: List[Callable] = []
        self.last_cleanup = datetime.now()

        logger.info("Resource cleanup manager initialized")

    def register_cleanup_callback(self, callback: Callable) -> None:
        """Register a cleanup callback function."""
        self.cleanup_callbacks.append(callback)

    def perform_cleanup(self, force_gc: bool = True) -> Dict[str, Any]:
        """Perform comprehensive resource cleanup."""
        cleanup_results = {}

        try:
            # Execute registered cleanup callbacks
            for i, callback in enumerate(self.cleanup_callbacks):
                try:
                    callback()
                    cleanup_results[f'callback_{i}'] = 'success'
                except Exception as e:
                    cleanup_results[f'callback_{i}'] = f'error: {e}'

            # Python garbage collection
            if force_gc:
                collected = gc.collect()
                cleanup_results['gc_collected'] = collected

            # PyTorch cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                cleanup_results['cuda_cache_cleared'] = True

            # Update last cleanup time
            self.last_cleanup = datetime.now()
            cleanup_results['cleanup_time'] = self.last_cleanup.isoformat()

            logger.info("Resource cleanup completed")

        except Exception as e:
            logger.error(f"Resource cleanup failed: {e}")
            cleanup_results['error'] = str(e)

        return cleanup_results

    def get_memory_info(self) -> Dict[str, Any]:
        """Get current memory usage information."""
        try:
            # System memory
            memory = psutil.virtual_memory()

            # Python object count (a rough proxy for interpreter memory pressure)
            python_objects = len(gc.get_objects())

            # PyTorch memory
            torch_memory = {}
            if torch.cuda.is_available():
                torch_memory = {
                    'allocated_mb': torch.cuda.memory_allocated() / (1024 * 1024),
                    'cached_mb': torch.cuda.memory_reserved() / (1024 * 1024),
                    'max_allocated_mb': torch.cuda.max_memory_allocated() / (1024 * 1024)
                }

            return {
                'system_memory': {
                    'total_mb': memory.total / (1024 * 1024),
                    'available_mb': memory.available / (1024 * 1024),
                    'used_mb': memory.used / (1024 * 1024),
                    'percent': memory.percent
                },
                'python_objects': python_objects,
                'torch_memory': torch_memory,
                'last_cleanup': self.last_cleanup.isoformat()
            }

        except Exception as e:
            logger.error(f"Failed to get memory info: {e}")
            return {'error': str(e)}

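
# A minimal smoke test for the storage-free pieces of this module (a sketch,
# not a full integration test; requires psutil and torch to be installed).
if __name__ == "__main__":  # pragma: no cover
    logging.basicConfig(level=logging.INFO)

    monitor = PerformanceMonitor()
    print(monitor.collect_metrics(active_pets=0, cached_models=0))

    cleanup = ResourceCleanupManager()
    cleanup.register_cleanup_callback(lambda: logger.info("custom cleanup ran"))
    print(cleanup.perform_cleanup())
    print(cleanup.get_memory_info())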