Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI Application Entry Point | |
| """ | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pathlib import Path | |
| import logging | |
| # Import routers | |
| from app.api.v1 import pages | |
| from app.config import settings | |
| # Setup logging - import configured logging | |
| from app.core.logging import get_logger | |
| from app.core.rate_limit import limiter | |
| from slowapi import _rate_limit_exceeded_handler | |
| from slowapi.errors import RateLimitExceeded | |
# Module-level logger obtained through the project's logging helper.
logger = get_logger(__name__)

# Static assets live in the "static" folder next to this module.
static_dir = Path(__file__).parent.joinpath("static")

# The application object; interactive docs are served under /api/.
app = FastAPI(
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    title="SwiftOps API",
    description="Field Service Management Platform",
    version="1.0.0",
)

# Rate limiting: publish the shared limiter on the application state and
# route RateLimitExceeded errors to slowapi's stock handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
| # Startup event | |
async def startup_event():
    """Log a startup banner and probe the database, cache and external services.

    Each probe is best-effort: a failing probe is logged rather than raised,
    so an unavailable dependency does not abort startup from here.

    NOTE(review): no registration decorator is visible on this function in
    this view of the file - confirm it is wired to the application's startup
    hook.
    NOTE(review): the marker glyphs inside the log strings look mis-encoded;
    they are reproduced unchanged - confirm the intended symbols.
    """
    from app.core.database import SessionLocal
    from sqlalchemy import text

    banner_rule = "=" * 60
    logger.info(banner_rule)
    logger.info(f"π {settings.APP_NAME} v1.0.0 | {settings.ENVIRONMENT.upper()}")
    logger.info("π Dashboard: Enabled")
    logger.info(banner_rule)

    # --- Database health check -------------------------------------------
    logger.info("π¦ Database:")
    try:
        db = SessionLocal()
        try:
            # Cheapest possible round-trip to prove the connection works.
            db.execute(text("SELECT 1"))
            # Count tables
            table_count = db.execute(text("""
                SELECT COUNT(*) FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """)).scalar()
            # Count active users (rows not soft-deleted)
            user_count = db.execute(text(
                "SELECT COUNT(*) FROM users WHERE deleted_at IS NULL"
            )).scalar()
        finally:
            # Always release the session: previously a failing query skipped
            # close() and leaked the session/connection.
            db.close()
        logger.info(f" β Connected | {table_count} tables | {user_count} users")
    except Exception as e:
        # Truncated so a long driver message does not flood the banner.
        logger.error(f" β Connection failed | {str(e)[:50]}...")

    # --- Redis cache check (initialize first, before external services) ---
    logger.info("πΎ Cache & Sessions:")
    try:
        from app.services.otp_service import OTPService
        otp_service = OTPService()  # Initialize singleton during startup
        if otp_service.storage_type == 'redis':
            # Write/read/delete a short-lived key to prove operations work.
            test_key = "health_check_test"
            otp_service.redis_client.setex(test_key, 5, "test")
            test_result = otp_service.redis_client.get(test_key)
            otp_service.redis_client.delete(test_key)
            # Accept both str and bytes: the client may return raw bytes
            # depending on how it was configured.
            if test_result in ("test", b"test"):
                logger.info(" β Redis: Connected")
            else:
                logger.warning(" β Redis: Connected but operations failed")
        else:
            logger.warning(" β Redis: Using fallback storage")
    except Exception:
        logger.warning(" β Redis: Connection failed")
        # Keep the banner terse but do not lose the cause entirely.
        logger.debug("Redis startup check failed", exc_info=True)

    # --- External services health check -----------------------------------
    logger.info("π External Services:")
    try:
        from app.core.health_checks import run_all_health_checks
        health_results = run_all_health_checks()

        # (result key, display label, status string that counts as healthy)
        simple_services = (
            ("cloudinary", "Cloudinary", "Connected"),
            ("resend", "Resend", "Configured"),      # Email
            ("wasender", "WASender", "Connected"),   # WhatsApp
        )
        for key, label, healthy_status in simple_services:
            service_status = health_results[key]["status"]
            status = "β" if service_status == healthy_status else "β"
            logger.info(f" {status} {label}: {service_status}")

        # Supabase Storage additionally reports its bucket count when connected.
        storage = health_results["supabase_storage"]
        storage_connected = storage["status"] == "Connected"
        status = "β" if storage_connected else "β"
        bucket_info = f" | {storage.get('buckets', 0)} buckets" if storage_connected else ""
        logger.info(f" {status} Supabase: {storage['status']}{bucket_info}")
    except Exception:
        logger.warning(" β External services check failed")
        logger.debug("External services startup check failed", exc_info=True)

    logger.info(banner_rule)
    logger.info("β Startup complete | Ready to serve requests")
    logger.info(banner_rule)
async def shutdown_event():
    """Log the beginning and the end of application shutdown.

    No resources are released here; the function only emits two log lines.

    NOTE(review): no registration decorator is visible on this function in
    this view of the file - confirm it is wired to the shutdown hook.
    """
    for message in ("Shutting down application...", "Shutdown complete"):
        logger.info(message)
# Mount static files
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")

# CORS Configuration
import os

# ALLOWED_ORIGINS is a comma-separated list. Each entry is trimmed and blank
# entries are dropped, so values such as "https://a.example, https://b.example"
# or a trailing comma no longer produce origins that can never match.
allowed_origins = [
    origin.strip()
    for origin in os.getenv(
        "ALLOWED_ORIGINS",
        "http://localhost:3000,http://localhost:8080,https://swiftops.atomio.tech",
    ).split(",")
    if origin.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include page routes (HTML templates)
app.include_router(pages.router)

# Include API routes (versioned under /api/v1)
from app.api.v1.router import api_router
app.include_router(api_router, prefix="/api/v1")
| # Health check endpoint (API) | |
async def health_check():
    """Report the health of the service and its backing components.

    Returns:
        dict: ``status``, ``version``, ``service``, ``environment`` and a
        ``components`` mapping with ``database`` and ``redis`` entries.
        The top-level ``status`` is ``"healthy"`` unless the database probe
        fails, in which case it becomes ``"degraded"``. A Redis error is
        reported under ``components`` only and does not change the top-level
        status.

    NOTE(review): no route decorator is visible on this function in this view
    of the file - confirm how it is registered.
    """
    from app.core.database import SessionLocal
    from sqlalchemy import text

    health_status = {
        "status": "healthy",
        "version": "1.0.0",
        "service": "SwiftOps API",
        "environment": settings.ENVIRONMENT,
        "components": {}
    }

    # Check Database
    try:
        db = SessionLocal()
        try:
            db.execute(text("SELECT 1"))
            table_count = db.execute(text("""
                SELECT COUNT(*) FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """)).scalar()
        finally:
            # Always release the session: previously a failing query skipped
            # close(), leaking a session on every failed health probe.
            db.close()
        health_status["components"]["database"] = {
            "status": "connected",
            "tables": table_count
        }
    except Exception:
        # Error details are deliberately not included in the response.
        health_status["components"]["database"] = {
            "status": "error"
        }
        health_status["status"] = "degraded"

    # Check Redis (uses existing singleton, no re-initialization)
    try:
        from app.services.otp_service import OTPService
        # Presumably returns the shared instance, per the original author's
        # note - confirm against OTPService.
        otp_service = OTPService()
        health_status["components"]["redis"] = {
            "status": "connected" if otp_service.storage_type == 'redis' else "fallback",
            "storage_type": otp_service.storage_type
        }
    except Exception:
        health_status["components"]["redis"] = {
            "status": "error"
        }

    return health_status