Spaces:
Sleeping
Sleeping
| """ | |
| Application startup procedures including database optimization and resource initialization. | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, Any | |
| from app.database.indexes import ensure_indexes | |
| from app.repositories.cache_repository import cache_manager | |
| from app.services.advanced_nlp import advanced_nlp_pipeline | |
| from app.utils.simple_log_sanitizer import get_simple_sanitized_logger | |
| logger = get_simple_sanitized_logger(__name__) | |
class StartupManager:
    """Coordinates application startup and graceful shutdown.

    Runs database index creation, NLP model warm-up, and dependency health
    checks concurrently, then warms the cache once the database is known to
    be usable. Each step returns a ``{"status": ...}`` dict so the caller can
    report partial failures without aborting startup.
    """

    def __init__(self):
        # Reserved for externally registered startup hooks (currently unused,
        # kept for interface compatibility).
        self.startup_tasks = []
        # Flipped to True after run_startup_sequence() finishes (even partially).
        self.startup_completed = False

    async def initialize_database_indexes(self) -> Dict[str, Any]:
        """Create database indexes, tolerating "already exists" conflicts.

        Returns:
            ``{"status": "success" | "partial" | "error", ...}`` — "partial"
            means some indexes failed for reasons other than pre-existence.
        """
        logger.info("π§ Initializing database indexes...")
        try:
            result = await ensure_indexes()
            # Split errors: pre-existing-index conflicts are benign; anything
            # else is a real failure worth surfacing.
            index_conflict_errors = []
            other_errors = []
            for error in result.get("errors", []):
                if "Index already exists" in error or "equivalent index already exists" in error.lower():
                    index_conflict_errors.append(error)
                else:
                    other_errors.append(error)
            if other_errors:
                logger.warning(f"Some indexes failed to create with serious errors: {other_errors}")
                return {"status": "partial", "result": result, "serious_errors": other_errors}
            elif index_conflict_errors:
                logger.info(f"β Database indexes initialized (some already existed): {len(index_conflict_errors)} conflicts")
                return {"status": "success", "result": result, "index_conflicts": len(index_conflict_errors)}
            else:
                logger.info("β Database indexes initialized successfully")
                return {"status": "success", "result": result}
        except Exception as e:
            logger.error(f"β Failed to initialize database indexes: {e}")
            return {"status": "error", "error": str(e)}

    async def warm_cache(self) -> Dict[str, Any]:
        """Preload the cache with commonly requested entries.

        Returns:
            ``{"status": "success", "preloaded": N}`` on success, otherwise
            ``{"status": "error", "error": ...}``.
        """
        logger.info("π₯ Warming up cache...")
        try:
            # Entries to preload; fetch_func is awaited by the cache manager
            # when the entry is populated.
            common_queries = [
                {
                    "key": "business_categories",
                    "fetch_func": self._fetch_business_categories,
                    "expiry": 7200,  # 2 hours
                },
                {
                    "key": "live_locations",
                    "fetch_func": self._fetch_live_locations,
                    "expiry": 3600,  # 1 hour
                },
            ]
            await cache_manager.preload_cache(common_queries)
            logger.info("β Cache warming completed")
            return {"status": "success", "preloaded": len(common_queries)}
        except Exception as e:
            logger.error(f"β Cache warming failed: {e}")
            return {"status": "error", "error": str(e)}

    async def _fetch_business_categories(self):
        """Return business categories for cache warming (static placeholder;
        a real implementation would fetch these from the database)."""
        return {"categories": ["salon", "spa", "fitness", "dental"]}

    async def _fetch_live_locations(self):
        """Return live locations for cache warming (static placeholder;
        a real implementation would fetch these from the database)."""
        return {"locations": ["IN-SOUTH", "IN-NORTH", "IN-WEST"]}

    async def initialize_nlp_models(self) -> Dict[str, Any]:
        """Pre-load the NLP model so the first request is not penalized.

        Returns:
            ``{"status": "success"}`` or ``{"status": "error", "error": ...}``.
        """
        logger.info("π§ Initializing NLP models...")
        try:
            # Pre-load spaCy model
            await advanced_nlp_pipeline.async_processor.get_nlp_model()
            logger.info("β NLP models initialized successfully")
            return {"status": "success"}
        except Exception as e:
            logger.error(f"β NLP model initialization failed: {e}")
            return {"status": "error", "error": str(e)}

    async def health_check_dependencies(self) -> Dict[str, Any]:
        """Probe MongoDB, Redis, and the NLP model for availability.

        Returns:
            ``{"status": "healthy" | "degraded" | "error",
               "dependencies": {...}}`` with per-dependency booleans.
        """
        logger.info("π₯ Checking dependency health...")
        health_status = {
            "mongodb": False,
            "redis": False,
            "nlp": False,
        }
        try:
            # Imports are local to avoid a hard module-level dependency cycle
            # with app.nosql — TODO confirm that is the original intent.
            from app.nosql import check_mongodb_health
            health_status["mongodb"] = await check_mongodb_health()
            from app.nosql import check_redis_health
            health_status["redis"] = await check_redis_health()
            # NLP failure is non-fatal: record it and continue.
            try:
                await advanced_nlp_pipeline.async_processor.get_nlp_model()
                health_status["nlp"] = True
            except Exception:
                health_status["nlp"] = False
            all_healthy = all(health_status.values())
            if all_healthy:
                logger.info("β All dependencies are healthy")
            else:
                logger.warning(f"β οΈ Some dependencies are unhealthy: {health_status}")
            return {
                "status": "healthy" if all_healthy else "degraded",
                "dependencies": health_status,
            }
        except Exception as e:
            logger.error(f"β Health check failed: {e}")
            return {"status": "error", "error": str(e)}

    async def run_startup_sequence(self) -> Dict[str, Any]:
        """Run the full startup sequence and summarize per-task outcomes.

        Independent tasks (indexes, NLP, health check) run concurrently;
        cache warming runs afterwards because it requires a usable database.

        Returns:
            ``{"overall_status": "success" | "partial" | "failed",
               "results": {...}, "timestamp": loop-monotonic-time}``.
        """
        logger.info("π Starting application initialization...")
        startup_results = {
            "database_indexes": {"status": "pending"},
            "cache_warming": {"status": "pending"},
            "nlp_models": {"status": "pending"},
            "health_check": {"status": "pending"},
        }
        try:
            # These tasks are independent, so run them truly in parallel.
            # (Previously they were awaited one-by-one despite the comment
            # claiming parallelism.)
            task_map = {
                "database_indexes": self.initialize_database_indexes(),
                "nlp_models": self.initialize_nlp_models(),
                "health_check": self.health_check_dependencies(),
            }
            outcomes = await asyncio.gather(*task_map.values(), return_exceptions=True)
            for task_name, outcome in zip(task_map, outcomes):
                if isinstance(outcome, BaseException):
                    startup_results[task_name] = {"status": "error", "error": str(outcome)}
                else:
                    startup_results[task_name] = outcome
            # Cache warming depends on database being ready
            if startup_results["database_indexes"]["status"] in ["success", "partial"]:
                try:
                    cache_result = await self.warm_cache()
                    startup_results["cache_warming"] = cache_result
                except Exception as e:
                    startup_results["cache_warming"] = {"status": "error", "error": str(e)}
            # Determine overall status
            error_count = sum(1 for result in startup_results.values() if result["status"] == "error")
            if error_count == 0:
                overall_status = "success"
                logger.info("π Application initialization completed successfully!")
            elif error_count < len(startup_results):
                overall_status = "partial"
                logger.warning("β οΈ Application initialization completed with some issues")
            else:
                overall_status = "failed"
                logger.error("β Application initialization failed")
            self.startup_completed = True
            return {
                "overall_status": overall_status,
                "results": startup_results,
                # get_running_loop() replaces the deprecated get_event_loop()
                # inside a coroutine; same monotonic loop clock.
                "timestamp": asyncio.get_running_loop().time(),
            }
        except Exception as e:
            logger.error(f"β Startup sequence failed: {e}")
            return {
                "overall_status": "failed",
                "error": str(e),
                "results": startup_results,
            }

    async def shutdown_sequence(self):
        """Release NLP resources and close database connections; errors are
        logged but never raised so shutdown always completes."""
        logger.info("π Starting graceful shutdown...")
        try:
            # Cleanup NLP resources
            await advanced_nlp_pipeline.cleanup()
            # Close database connections
            from app.nosql import close_database_connections
            await close_database_connections()
            logger.info("β Graceful shutdown completed")
        except Exception as e:
            logger.error(f"β Error during shutdown: {e}")
# Module-level singleton StartupManager shared by the convenience wrappers.
startup_manager = StartupManager()
async def initialize_application() -> Dict[str, Any]:
    """Run the shared StartupManager's full startup sequence.

    Returns:
        The startup summary dict produced by ``run_startup_sequence``.
    """
    # Thin module-level entry point over the global singleton.
    summary = await startup_manager.run_startup_sequence()
    return summary
async def shutdown_application():
    """Release application resources via the shared StartupManager.

    Delegates to ``shutdown_sequence``, which logs (rather than raises)
    any cleanup errors.
    """
    await startup_manager.shutdown_sequence()