""" FastAPI Application Entry Point """ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pathlib import Path import logging # Import routers from app.api.v1 import pages from app.config import settings # Setup logging - import configured logging from app.core.logging import get_logger from app.core.rate_limit import limiter from slowapi import _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded logger = get_logger(__name__) # Get static files directory static_dir = Path(__file__).parent / "static" app = FastAPI( title="SwiftOps API", description="Field Service Management Platform", version="1.0.0", docs_url="/api/docs", redoc_url="/api/redoc" ) # Add rate limiter app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # Startup event @app.on_event("startup") async def startup_event(): """Initialize the application on startup""" from app.core.database import engine, SessionLocal from sqlalchemy import text logger.info("=" * 60) logger.info(f"🚀 {settings.APP_NAME} v1.0.0 | {settings.ENVIRONMENT.upper()}") logger.info("📊 Dashboard: Enabled") logger.info("=" * 60) # Database Health Check logger.info("📦 Database:") try: db = SessionLocal() db.execute(text("SELECT 1")) # Count tables table_count = db.execute(text(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' """)).scalar() # Count active users user_count = db.execute(text( "SELECT COUNT(*) FROM users WHERE deleted_at IS NULL" )).scalar() db.close() logger.info(f" ✓ Connected | {table_count} tables | {user_count} users") except Exception as e: logger.error(f" ✗ Connection failed | {str(e)[:50]}...") # Redis Cache Check (initialize first, before external services) logger.info("💾 Cache & Sessions:") try: from app.services.otp_service import OTPService otp_service = OTPService() # Initialize singleton during startup if otp_service.storage_type == 'redis': # Test operations silently test_key = "health_check_test" otp_service.redis_client.setex(test_key, 5, "test") test_result = otp_service.redis_client.get(test_key) otp_service.redis_client.delete(test_key) if test_result == "test": logger.info(f" ✓ Redis: Connected") else: logger.warning(f" ⚠ Redis: Connected but operations failed") else: logger.warning(f" ○ Redis: Using fallback storage") except Exception as e: logger.warning(f" ⚠ Redis: Connection failed") # External Services Health Check logger.info("🔌 External Services:") try: from app.core.health_checks import run_all_health_checks health_results = run_all_health_checks() # Cloudinary status = "✓" if health_results["cloudinary"]["status"] == "Connected" else "○" logger.info(f" {status} Cloudinary: {health_results['cloudinary']['status']}") # Resend (Email) status = "✓" if health_results["resend"]["status"] == "Configured" else "○" logger.info(f" {status} Resend: {health_results['resend']['status']}") # WASender (WhatsApp) status = "✓" if health_results["wasender"]["status"] == "Connected" else "○" logger.info(f" {status} WASender: {health_results['wasender']['status']}") # Supabase Storage storage = health_results["supabase_storage"] status = "✓" if storage["status"] == "Connected" else "○" bucket_info = f" | {storage.get('buckets', 0)} buckets" if storage["status"] == "Connected" else "" logger.info(f" {status} Supabase: {storage['status']}{bucket_info}") except Exception as e: logger.warning(f" ⚠ External services check failed") logger.info("=" * 60) logger.info("✅ Startup complete | Ready to serve requests") logger.info("=" * 60) @app.on_event("shutdown") async def shutdown_event(): """Cleanup on application shutdown""" logger.info("Shutting down application...") logger.info("Shutdown complete") # Mount static files app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") # CORS Configuration import os allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080,https://swiftops.atomio.tech").split(",") app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Include page routes (HTML templates) app.include_router(pages.router) # Include API routes from app.api.v1.router import api_router app.include_router(api_router, prefix="/api/v1") # Health check endpoint (API) @app.get("/health", tags=["Health"]) async def health_check(): """ Health check endpoint - Returns system health status """ from app.core.database import SessionLocal from sqlalchemy import text health_status = { "status": "healthy", "version": "1.0.0", "service": "SwiftOps API", "environment": settings.ENVIRONMENT, "components": {} } # Check Database try: db = SessionLocal() db.execute(text("SELECT 1")) table_count = db.execute(text(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' """)).scalar() db.close() health_status["components"]["database"] = { "status": "connected", "tables": table_count } except Exception: health_status["components"]["database"] = { "status": "error" } health_status["status"] = "degraded" # Check Redis (uses existing singleton, no re-initialization) try: from app.services.otp_service import OTPService otp_service = OTPService() # Gets existing singleton instance health_status["components"]["redis"] = { "status": "connected" if otp_service.storage_type == 'redis' else "fallback", "storage_type": otp_service.storage_type } except Exception: health_status["components"]["redis"] = { "status": "error" } return health_status