Spaces:
Sleeping
Sleeping
| """ | |
| Performance optimization API endpoints for monitoring and managing database performance. | |
| """ | |
| from fastapi import APIRouter, HTTPException, Query | |
| from typing import Dict, Any, List, Optional | |
| import logging | |
| from app.database.indexes import index_manager | |
| from app.database.query_optimizer import query_optimizer, memory_aggregator | |
| from app.repositories.cache_repository import cache_manager | |
| from app.utils.performance_monitor import get_performance_report | |
| from app.utils.simple_log_sanitizer import get_simple_sanitized_logger | |
| logger = get_simple_sanitized_logger(__name__) | |
| router = APIRouter() | |
| async def get_database_indexes() -> Dict[str, Any]: | |
| """Get database index information and usage statistics""" | |
| try: | |
| # Get index usage stats | |
| usage_stats = await index_manager.get_index_usage_stats() | |
| return { | |
| "status": "success", | |
| "index_usage_stats": usage_stats, | |
| "recommendations": [ | |
| "Monitor index usage regularly", | |
| "Remove unused indexes to improve write performance", | |
| "Add indexes for frequently queried fields" | |
| ] | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting database indexes: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to get database index information") | |
| async def create_database_indexes(force_recreate: bool = False) -> Dict[str, Any]: | |
| """Create or recreate database indexes""" | |
| try: | |
| result = await index_manager.create_indexes(force_recreate=force_recreate) | |
| return { | |
| "status": "success", | |
| "result": result, | |
| "message": f"Index creation completed. Created: {len(result['created'])}, Existing: {len(result['existing'])}, Errors: {len(result['errors'])}" | |
| } | |
| except Exception as e: | |
| logger.error(f"Error creating database indexes: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to create database indexes") | |
| async def get_collection_stats(collection_name: str = Query(..., description="Collection name to analyze")) -> Dict[str, Any]: | |
| """Get performance statistics for a specific collection""" | |
| try: | |
| optimization_result = await index_manager.optimize_collection(collection_name) | |
| return { | |
| "status": "success", | |
| "collection_stats": optimization_result | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting collection stats for {collection_name}: {e}") | |
| raise HTTPException(status_code=500, detail=f"Failed to get stats for collection {collection_name}") | |
| async def get_cache_stats() -> Dict[str, Any]: | |
| """Get cache performance statistics""" | |
| try: | |
| cache_stats = cache_manager.get_cache_stats() | |
| return { | |
| "status": "success", | |
| "cache_stats": cache_stats, | |
| "recommendations": [ | |
| f"Cache hit rate: {cache_stats['hit_rate_percent']}%", | |
| "Consider increasing cache TTL for frequently accessed data" if cache_stats['hit_rate_percent'] < 80 else "Cache performance is good", | |
| "Monitor cache warming operations for efficiency" | |
| ] | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting cache stats: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to get cache statistics") | |
| async def invalidate_cache( | |
| key: Optional[str] = Query(None, description="Specific cache key to invalidate"), | |
| pattern: Optional[str] = Query(None, description="Pattern to match for bulk invalidation") | |
| ) -> Dict[str, Any]: | |
| """Invalidate cache entries""" | |
| try: | |
| if key: | |
| await cache_manager.invalidate_cache(key) | |
| message = f"Cache invalidated for key: {key}" | |
| elif pattern: | |
| await cache_manager.invalidate_pattern(pattern) | |
| message = f"Cache invalidated for pattern: {pattern}" | |
| else: | |
| raise HTTPException(status_code=400, detail="Either key or pattern must be provided") | |
| return { | |
| "status": "success", | |
| "message": message | |
| } | |
| except Exception as e: | |
| logger.error(f"Error invalidating cache: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to invalidate cache") | |
| async def get_query_optimizer_stats() -> Dict[str, Any]: | |
| """Get query optimizer statistics""" | |
| try: | |
| optimizer_stats = query_optimizer.get_query_stats() | |
| return { | |
| "status": "success", | |
| "optimizer_stats": optimizer_stats | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting query optimizer stats: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to get query optimizer statistics") | |
| async def get_memory_usage() -> Dict[str, Any]: | |
| """Get current memory usage statistics""" | |
| try: | |
| import psutil | |
| import os | |
| process = psutil.Process(os.getpid()) | |
| memory_info = process.memory_info() | |
| memory_stats = { | |
| "rss_mb": round(memory_info.rss / 1024 / 1024, 2), | |
| "vms_mb": round(memory_info.vms / 1024 / 1024, 2), | |
| "percent": round(process.memory_percent(), 2), | |
| "available_mb": round(psutil.virtual_memory().available / 1024 / 1024, 2), | |
| "total_mb": round(psutil.virtual_memory().total / 1024 / 1024, 2) | |
| } | |
| recommendations = [] | |
| if memory_stats["percent"] > 80: | |
| recommendations.append("High memory usage detected - consider optimization") | |
| if memory_stats["rss_mb"] > 500: | |
| recommendations.append("Large memory footprint - monitor for memory leaks") | |
| return { | |
| "status": "success", | |
| "memory_stats": memory_stats, | |
| "recommendations": recommendations | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting memory usage: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to get memory usage statistics") | |
| async def get_comprehensive_performance_report() -> Dict[str, Any]: | |
| """Get comprehensive performance report""" | |
| try: | |
| # Gather all performance data | |
| performance_report = get_performance_report() | |
| cache_stats = cache_manager.get_cache_stats() | |
| optimizer_stats = query_optimizer.get_query_stats() | |
| # Memory usage | |
| import psutil | |
| import os | |
| process = psutil.Process(os.getpid()) | |
| memory_percent = round(process.memory_percent(), 2) | |
| # Generate overall recommendations | |
| recommendations = [] | |
| # Database performance | |
| avg_time = performance_report.get("metrics", {}).get("average_time", 0) | |
| if avg_time > 0.5: | |
| recommendations.append("Database queries are slow - consider adding indexes") | |
| # Cache performance | |
| hit_rate = cache_stats.get("hit_rate_percent", 0) | |
| if hit_rate < 70: | |
| recommendations.append("Low cache hit rate - optimize caching strategy") | |
| # Memory usage | |
| if memory_percent > 80: | |
| recommendations.append("High memory usage - investigate memory leaks") | |
| # Overall health score | |
| health_score = 100 | |
| if avg_time > 0.5: | |
| health_score -= 20 | |
| if hit_rate < 70: | |
| health_score -= 15 | |
| if memory_percent > 80: | |
| health_score -= 25 | |
| return { | |
| "status": "success", | |
| "performance_report": { | |
| "overall_health_score": max(0, health_score), | |
| "database_performance": performance_report, | |
| "cache_performance": cache_stats, | |
| "query_optimization": optimizer_stats, | |
| "memory_usage_percent": memory_percent, | |
| "recommendations": recommendations | |
| }, | |
| "timestamp": psutil.boot_time() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error generating comprehensive performance report: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to generate performance report") | |
| async def optimize_collection(collection_name: str = Query(..., description="Collection to optimize")) -> Dict[str, Any]: | |
| """Run optimization on a specific collection""" | |
| try: | |
| # Get collection stats | |
| stats = await index_manager.optimize_collection(collection_name) | |
| # Create indexes if needed | |
| index_result = await index_manager.create_indexes() | |
| return { | |
| "status": "success", | |
| "collection_stats": stats, | |
| "index_creation": index_result, | |
| "message": f"Optimization completed for collection: {collection_name}" | |
| } | |
| except Exception as e: | |
| logger.error(f"Error optimizing collection {collection_name}: {e}") | |
| raise HTTPException(status_code=500, detail=f"Failed to optimize collection {collection_name}") | |
| async def get_slow_queries(limit: int = Query(10, description="Number of slow queries to return")) -> Dict[str, Any]: | |
| """Get information about slow queries""" | |
| try: | |
| performance_report = get_performance_report() | |
| slow_queries = performance_report.get("metrics", {}).get("slow_queries", []) | |
| # Limit results | |
| limited_queries = slow_queries[-limit:] if slow_queries else [] | |
| return { | |
| "status": "success", | |
| "slow_queries": limited_queries, | |
| "total_slow_queries": len(slow_queries), | |
| "recommendations": [ | |
| "Add indexes for frequently queried fields", | |
| "Optimize aggregation pipeline stages", | |
| "Consider query result caching", | |
| "Use projection to limit returned fields" | |
| ] | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting slow queries: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to get slow query information") |