# bookmyservice-mhs / app/startup.py
# MukeshKapoor25's picture
# perf(optimization): Implement comprehensive performance optimization strategy
# 79ca9ba
"""
Application startup procedures including database optimization and resource initialization.
"""
import asyncio
import logging
from typing import Dict, Any
from app.database.indexes import ensure_indexes
from app.repositories.cache_repository import cache_manager
from app.services.advanced_nlp import advanced_nlp_pipeline
from app.utils.simple_log_sanitizer import get_simple_sanitized_logger
# Module-level logger, created through the project's get_simple_sanitized_logger helper.
logger = get_simple_sanitized_logger(__name__)
class StartupManager:
    """Manages application startup and shutdown procedures.

    Attributes:
        startup_tasks: List initialised empty; not otherwise used by this class.
        startup_completed: Set to True once run_startup_sequence() has run to
            its end, whatever the overall outcome was.
    """

    # Step statuses that count as fully OK when computing the overall outcome.
    _OK_STATUSES = frozenset({"success", "healthy"})
    # Step statuses that mean the step did not do its job at all.
    _FAILED_STATUSES = frozenset({"error", "skipped"})

    def __init__(self) -> None:
        self.startup_tasks = []
        self.startup_completed = False

    @staticmethod
    def _is_index_conflict(error: Any) -> bool:
        """Return True if *error* only reports that an index already exists.

        The match is case-insensitive and works on ``str(error)`` so that
        non-string error entries (e.g. exception objects) do not raise.
        """
        return "index already exists" in str(error).lower()

    @classmethod
    def _overall_status(cls, results: Dict[str, Dict[str, Any]]) -> str:
        """Fold per-step results into ``"success"``, ``"partial"`` or ``"failed"``.

        * ``"success"``: every step reported ``"success"`` or ``"healthy"``.
        * ``"failed"``: every step either errored or was skipped.
        * ``"partial"``: anything in between (including ``"partial"`` index
          creation and ``"degraded"`` dependency health).
        """
        statuses = [result.get("status") for result in results.values()]
        if all(status in cls._OK_STATUSES for status in statuses):
            return "success"
        if all(status in cls._FAILED_STATUSES for status in statuses):
            return "failed"
        return "partial"

    async def initialize_database_indexes(self) -> Dict[str, Any]:
        """Initialize database indexes for optimal performance.

        Returns:
            A dict with ``"status"`` set to ``"success"`` (optionally with an
            ``"index_conflicts"`` count), ``"partial"`` (with
            ``"serious_errors"``) or ``"error"`` (with the error text).
        """
        logger.info("🔧 Initializing database indexes...")
        try:
            result = await ensure_indexes()

            # "Already exists" conflicts are acceptable; anything else is serious.
            index_conflict_errors = []
            other_errors = []
            for error in result.get("errors") or []:
                if self._is_index_conflict(error):
                    index_conflict_errors.append(error)
                else:
                    other_errors.append(error)

            if other_errors:
                logger.warning(f"Some indexes failed to create with serious errors: {other_errors}")
                return {"status": "partial", "result": result, "serious_errors": other_errors}
            if index_conflict_errors:
                logger.info(f"✅ Database indexes initialized (some already existed): {len(index_conflict_errors)} conflicts")
                return {"status": "success", "result": result, "index_conflicts": len(index_conflict_errors)}
            logger.info("✅ Database indexes initialized successfully")
            return {"status": "success", "result": result}
        except Exception as e:
            logger.error(f"❌ Failed to initialize database indexes: {e}")
            return {"status": "error", "error": str(e)}

    async def warm_cache(self) -> Dict[str, Any]:
        """Warm up cache with common queries.

        Returns:
            ``{"status": "success", "preloaded": <entry count>}`` or
            ``{"status": "error", "error": <message>}``.
        """
        logger.info("🔥 Warming up cache...")
        try:
            # Entries handed to cache_manager.preload_cache().
            common_queries = [
                {
                    "key": "business_categories",
                    "fetch_func": self._fetch_business_categories,
                    "expiry": 7200,  # 2 hours
                },
                {
                    "key": "live_locations",
                    "fetch_func": self._fetch_live_locations,
                    "expiry": 3600,  # 1 hour
                },
            ]
            await cache_manager.preload_cache(common_queries)
            logger.info("✅ Cache warming completed")
            return {"status": "success", "preloaded": len(common_queries)}
        except Exception as e:
            logger.error(f"❌ Cache warming failed: {e}")
            return {"status": "error", "error": str(e)}

    async def _fetch_business_categories(self):
        """Fetch business categories for cache warming."""
        # Hard-coded placeholder; this would typically fetch from database.
        return {"categories": ["salon", "spa", "fitness", "dental"]}

    async def _fetch_live_locations(self):
        """Fetch live locations for cache warming."""
        # Hard-coded placeholder; this would typically fetch from database.
        return {"locations": ["IN-SOUTH", "IN-NORTH", "IN-WEST"]}

    async def initialize_nlp_models(self) -> Dict[str, Any]:
        """Initialize NLP models and processors.

        Returns:
            ``{"status": "success"}`` or ``{"status": "error", "error": <message>}``.
        """
        logger.info("🧠 Initializing NLP models...")
        try:
            # Pre-load the NLP model so the first request does not pay for it.
            await advanced_nlp_pipeline.async_processor.get_nlp_model()
            logger.info("✅ NLP models initialized successfully")
            return {"status": "success"}
        except Exception as e:
            logger.error(f"❌ NLP model initialization failed: {e}")
            return {"status": "error", "error": str(e)}

    async def _probe_dependency(self, name: str, check) -> Any:
        """Await the zero-argument coroutine function *check* for dependency *name*.

        Returns whatever *check* returns; a raising probe is logged and
        reported as ``False`` so that it cannot hide the other probes' results.
        """
        try:
            return await check()
        except Exception as e:
            logger.warning(f"⚠️ Health probe for {name} raised: {e}")
            return False

    async def _nlp_model_available(self) -> bool:
        """Return True if the NLP model can be obtained (raises otherwise)."""
        await advanced_nlp_pipeline.async_processor.get_nlp_model()
        return True

    async def health_check_dependencies(self) -> Dict[str, Any]:
        """Check health of all dependencies (MongoDB, Redis, NLP).

        Each probe is isolated: one raising probe marks only that dependency
        unhealthy instead of aborting the whole check.

        Returns:
            ``{"status": "healthy" | "degraded", "dependencies": {...}}``, or
            ``{"status": "error", "error": <message>}`` if the check itself
            could not run (e.g. the probe functions could not be imported).
        """
        logger.info("🏥 Checking dependency health...")
        health_status = {
            "mongodb": False,
            "redis": False,
            "nlp": False,
        }
        try:
            # Imported inside the method, as in the original code.
            from app.nosql import check_mongodb_health
            from app.nosql import check_redis_health

            health_status["mongodb"] = await self._probe_dependency("mongodb", check_mongodb_health)
            health_status["redis"] = await self._probe_dependency("redis", check_redis_health)
            health_status["nlp"] = await self._probe_dependency("nlp", self._nlp_model_available)

            all_healthy = all(health_status.values())
            if all_healthy:
                logger.info("✅ All dependencies are healthy")
            else:
                logger.warning(f"⚠️ Some dependencies are unhealthy: {health_status}")
            return {
                "status": "healthy" if all_healthy else "degraded",
                "dependencies": health_status,
            }
        except Exception as e:
            logger.error(f"❌ Health check failed: {e}")
            return {"status": "error", "error": str(e)}

    async def run_startup_sequence(self) -> Dict[str, Any]:
        """Run complete startup sequence.

        Steps run one after another: database indexes, NLP models, dependency
        health check, then cache warming (only if index initialization did not
        error; otherwise it is recorded as ``"skipped"``).

        Returns:
            A dict with ``"overall_status"`` (``"success"``, ``"partial"`` or
            ``"failed"``), the per-step ``"results"`` and a ``"timestamp"``
            taken from the running event loop's clock. On an unexpected
            failure the dict additionally carries ``"error"``.
        """
        logger.info("🚀 Starting application initialization...")
        startup_results: Dict[str, Dict[str, Any]] = {
            "database_indexes": {"status": "pending"},
            "cache_warming": {"status": "pending"},
            "nlp_models": {"status": "pending"},
            "health_check": {"status": "pending"},
        }
        try:
            # NOTE(review): earlier comments described these steps as parallel,
            # but they have always been awaited sequentially. Switching to
            # asyncio.gather would need confirmation that get_nlp_model() is
            # safe to call concurrently (two of the steps call it).
            steps = [
                ("database_indexes", self.initialize_database_indexes),
                ("nlp_models", self.initialize_nlp_models),
                ("health_check", self.health_check_dependencies),
            ]
            for step_name, step in steps:
                try:
                    startup_results[step_name] = await step()
                except Exception as e:
                    startup_results[step_name] = {"status": "error", "error": str(e)}

            # Cache warming depends on database being ready.
            if startup_results["database_indexes"].get("status") in ("success", "partial"):
                try:
                    startup_results["cache_warming"] = await self.warm_cache()
                except Exception as e:
                    startup_results["cache_warming"] = {"status": "error", "error": str(e)}
            else:
                # Record the skip explicitly instead of leaving "pending" behind,
                # so the overall status can account for it.
                startup_results["cache_warming"] = {
                    "status": "skipped",
                    "reason": "database index initialization failed",
                }

            overall_status = self._overall_status(startup_results)
            if overall_status == "success":
                logger.info("🎉 Application initialization completed successfully!")
            elif overall_status == "partial":
                logger.warning("⚠️ Application initialization completed with some issues")
            else:
                logger.error("❌ Application initialization failed")

            self.startup_completed = True
            return {
                "overall_status": overall_status,
                "results": startup_results,
                "timestamp": asyncio.get_running_loop().time(),
            }
        except Exception as e:
            logger.error(f"❌ Startup sequence failed: {e}")
            return {
                "overall_status": "failed",
                "error": str(e),
                "results": startup_results,
                "timestamp": asyncio.get_running_loop().time(),
            }

    async def shutdown_sequence(self) -> None:
        """Run graceful shutdown sequence.

        NLP cleanup and database disconnection are attempted independently, so
        a failure in the first does not leave database connections open.
        Errors are logged, never raised.
        """
        logger.info("🛑 Starting graceful shutdown...")
        clean = True

        # Cleanup NLP resources.
        try:
            await advanced_nlp_pipeline.cleanup()
        except Exception as e:
            clean = False
            logger.error(f"❌ Error during shutdown: {e}")

        # Close database connections, even if the NLP cleanup failed.
        try:
            from app.nosql import close_database_connections
            await close_database_connections()
        except Exception as e:
            clean = False
            logger.error(f"❌ Error during shutdown: {e}")

        if clean:
            logger.info("✅ Graceful shutdown completed")
# Global startup manager: the single module-level StartupManager instance that
# initialize_application() and shutdown_application() delegate to.
startup_manager = StartupManager()
async def initialize_application() -> Dict[str, Any]:
    """Run the global startup manager's startup sequence and return its report."""
    startup_report = await startup_manager.run_startup_sequence()
    return startup_report
async def shutdown_application():
    """Run the global startup manager's graceful shutdown sequence."""
    run_shutdown = startup_manager.shutdown_sequence
    await run_shutdown()