Spaces:
Sleeping
Sleeping
"""
FastAPI Application - API Gateway with credit management and AI services.
"""
| import os | |
| import logging | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.exceptions import RequestValidationError | |
| from core.database import engine, DB_FILENAME | |
| from core.api_response import APIError, error_response, status_to_error_code, ErrorCode | |
| from routers import auth, blink, contact, credits, general, gemini, payments, schema | |
| from services.drive_service import DriveService | |
| from services.db_service import init_database, reset_database | |
| from services.db_service.register_config import register_db_service_config | |
# Root logging is configured once, at import time, so every module logger
# created afterwards shares the same level and line format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Module-level Drive client; the startup code uses it to download the database.
drive_service = DriveService()
def _split_env_list(name: str) -> list:
    """Return the comma-separated environment variable *name* as a list.

    Items are stripped of surrounding whitespace and empty items are dropped,
    so an unset or empty variable yields ``[]`` instead of raising.
    """
    raw_value = os.getenv(name, "")
    return [item.strip() for item in raw_value.split(",") if item.strip()]


async def _prepare_database() -> None:
    """Reset the database, or download/initialize it, depending on RESET_DB."""
    if os.getenv("RESET_DB", "").lower() == "true":
        logger.warning(f"β οΈ RESET_DB enabled - Clearing local database ({DB_FILENAME})")
        # EAFP: only report a deletion when a file was actually removed.
        try:
            os.remove(DB_FILENAME)
        except FileNotFoundError:
            pass
        else:
            logger.info(" Local database deleted")
        # Reset database (drop + create all tables)
        await reset_database(engine)
        logger.info("β Database reset complete")
        return

    # Download the DB from Drive ONLY if the local file doesn't exist.
    if os.path.exists(DB_FILENAME):
        logger.info("β Local database found")
    else:
        logger.info("β¬οΈ Downloading database from Google Drive...")
        drive_service.download_db()
    # Initialize database (create tables if not exist)
    await init_database(engine)
    logger.info("β Database initialized")


def _register_services() -> None:
    """Register Auth, Credit, Audit and API Key service configuration, in that order."""
    # Imports stay function-local and interleaved with the registrations so the
    # import/registration order is exactly the one the original startup used.
    from services.auth_service import register_auth_service
    register_auth_service(
        required_urls=[
            "/blink",
            "/api/*",  # All admin blink API endpoints
            "/contact",
            "/gemini/*",
            "/credits/balance",
            "/credits/history",
            "/payments/create-order",
            "/payments/verify/*",
        ],
        optional_urls=[
            "/",  # Home page works with or without auth
        ],
        public_urls=[
            "/health",
            "/auth/*",
            "/payments/packages",  # Public pricing info
            "/payments/webhook/*",  # Webhooks from payment gateway
            "/docs",
            "/openapi.json",
            "/redoc",
        ],
        jwt_secret=os.getenv("JWT_SECRET"),
        jwt_algorithm="HS256",
        jwt_expiry_hours=24,
        google_client_id=os.getenv("AUTH_SIGN_IN_GOOGLE_CLIENT_ID"),
        # Stripped so "a@x.com, b@x.com" does not produce " b@x.com".
        admin_emails=_split_env_list("ADMIN_EMAILS"),
    )
    logger.info("β Auth Service configured")

    from services.credit_service import CreditServiceConfig
    CreditServiceConfig.register(
        route_configs={
            # Synchronous operations - credits confirmed/refunded immediately
            "/gemini/generate-animation-prompt": {"cost": 1, "type": "sync"},
            "/gemini/edit-image": {"cost": 1, "type": "sync"},
            "/gemini/generate-text": {"cost": 1, "type": "sync"},
            "/gemini/analyze-image": {"cost": 1, "type": "sync"},
            # Asynchronous operations - credits reserved until job completes
            "/gemini/generate-video": {"cost": 10, "type": "async"},
            # No additional cost for status checks
            "/gemini/job/{job_id}": {"cost": 0, "type": "async"},
        }
    )
    logger.info("β Credit Service configured")

    from services.audit_service import AuditServiceConfig
    AuditServiceConfig.register(
        excluded_paths=[
            "/health",
            "/docs",
            "/openapi.json",
            "/redoc",
        ],
        log_all_requests=True,
        log_response_bodies=False,  # Privacy: don't log response bodies
    )
    logger.info("β Audit Service configured")

    from services.gemini_service import APIKeyServiceConfig
    APIKeyServiceConfig.register(
        rotation_strategy="least_used",  # or "round_robin"
        cooldown_seconds=60,  # Wait 1 min after quota error
        max_requests_per_minute=60,
        retry_on_quota_error=True,  # Auto-retry with different key
    )
    logger.info("β API Key Service configured")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup (before ``yield``): registers the DB service config, initializes
    the backup service, prepares the database, registers the remaining
    services and starts the background worker. Shutdown (after ``yield``):
    stops the worker and forces a database backup.

    Args:
        app: The FastAPI application this lifespan is attached to (unused here).
    """
    # NOTE(review): the non-ASCII glyphs in the log strings below look like
    # mojibake (probably box-drawing characters / emoji damaged in an encoding
    # round-trip). They are kept byte-for-byte; confirm against the real file.

    # Startup Banner
    logger.info("β" * 60)
    logger.info(" API Gateway v1.0.0 - Starting Up")
    logger.info("β" * 60)

    # Database Initialization Section
    logger.info("")
    logger.info("π¦ [DATABASE INITIALIZATION]")
    register_db_service_config()
    logger.info("β DB Service configured")

    # The backup service gets its own DriveService instance, as before.
    from services.backup_service import initialize_backup_service
    initialize_backup_service(
        DriveService(),
        min_interval_seconds=30,  # Minimum 30s between backups
    )
    logger.info("β Backup Service initialized")

    await _prepare_database()

    # Service Registration Section
    logger.info("")
    logger.info("βοΈ [SERVICE REGISTRATION]")
    _register_services()

    # Worker Pool Section
    logger.info("")
    logger.info("π· [WORKER POOL]")
    from services.gemini_service import start_worker, stop_worker
    await start_worker()
    logger.info("β Worker pool started")

    # Log CORS configuration (an unset CORS_ORIGINS logs an empty list
    # instead of crashing on None.split).
    allowed_origins = _split_env_list("CORS_ORIGINS")
    logger.info("")
    logger.info("π [NETWORK CONFIGURATION]")
    logger.info(f"β CORS origins: {', '.join(allowed_origins)}")

    # Startup Complete Summary (service/worker counts and endpoint are
    # hard-coded text, not measured values).
    logger.info("")
    logger.info("β" * 60)
    logger.info(" π API Gateway Ready")
    logger.info(" β’ Database: β Ready")
    logger.info(" β’ Services: 5 initialized (DB, Auth, Credit, Audit, API Key)")
    logger.info(" β’ Workers: 15 active")
    logger.info(" β’ Endpoint: http://0.0.0.0:8000")
    logger.info("β" * 60)
    logger.info("")

    try:
        yield
    finally:
        try:
            # Stop background job worker
            await stop_worker()
            logger.info("Background job worker stopped")
        finally:
            # Always attempt the final upload, even if stopping the worker failed.
            logger.info("Shutdown: Uploading database to Google Drive...")
            from services.backup_service import get_backup_service
            await get_backup_service().backup_async(force=True)  # Force backup on shutdown
            logger.info("Shutting down...")
# Create FastAPI application
app = FastAPI(
    title="APIGateway",
    description="API for receiving and processing encrypted user data",
    version="1.0.0",
    lifespan=lifespan,
)

# Middleware order matters! The last middleware added is the first to see a
# request, so they are added in REVERSE of the intended request flow:
#   CORS -> Auth -> APIKey -> Audit -> Credit -> Router
from services.credit_service import CreditMiddleware
app.add_middleware(CreditMiddleware)

from services.audit_service import AuditMiddleware
app.add_middleware(AuditMiddleware)

from services.gemini_service import APIKeyMiddleware
app.add_middleware(APIKeyMiddleware)

from services.auth_service import AuthMiddleware
app.add_middleware(AuthMiddleware)

# CORS middleware MUST be added last to ensure error responses also have CORS headers.
# CORS_ORIGINS is a comma-separated list; an unset variable now yields an empty
# list instead of failing at import time with AttributeError on None.split.
allowed_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ORIGINS", "").split(",")
    if origin.strip()
]
if not allowed_origins:
    logger.warning("CORS_ORIGINS is not set; no cross-origin requests will be allowed")

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers are mounted in the same order as before.
app.include_router(general.router)
app.include_router(auth.router)
app.include_router(blink.router)
app.include_router(gemini.router)
app.include_router(credits.router)
app.include_router(payments.router)
app.include_router(contact.router)
app.include_router(schema.router)
@app.exception_handler(APIError)
async def api_error_handler(request: Request, exc: APIError):
    """Handle custom APIError exceptions with standardized format.

    Registered on ``app`` for ``APIError`` so the handler is actually invoked.

    Args:
        request: The request being processed when the error was raised (unused).
        exc: The raised APIError; its ``status_code``, ``code``, ``message``
            and ``details`` are copied into the response.

    Returns:
        A JSONResponse whose body is built by ``error_response``.
    """
    # f-string kept so the logged text stays identical to the original message.
    logger.warning(f"API Error: {exc.code} - {exc.message}")
    payload = error_response(exc.code, exc.message, exc.details)
    return JSONResponse(status_code=exc.status_code, content=payload)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Convert HTTPException to standardized error format.

    Registered on ``app`` for ``HTTPException`` so the handler is actually
    invoked.

    Args:
        request: The request being processed when the error was raised (unused).
        exc: The raised HTTPException.

    Returns:
        A JSONResponse with the exception's status code, a body built by
        ``error_response``, and any headers attached to the exception.
    """
    code = status_to_error_code(exc.status_code)
    return JSONResponse(
        status_code=exc.status_code,
        content=error_response(code, str(exc.detail)),
        # Forward headers such as WWW-Authenticate instead of dropping them.
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Handle Pydantic validation errors with detailed field info.

    Registered on ``app`` for ``RequestValidationError`` so the handler is
    actually invoked.

    Args:
        request: The request that failed validation (unused).
        exc: The validation error; each entry of ``exc.errors()`` is reduced to
            its dotted location, message and type.

    Returns:
        A 422 JSONResponse whose details carry the list under ``"errors"``.
    """
    field_errors = [
        {
            # "loc" is a sequence of path parts; join them into one dotted name.
            "field": ".".join(str(part) for part in err["loc"]),
            "message": err["msg"],
            "type": err["type"],
        }
        for err in exc.errors()
    ]
    return JSONResponse(
        status_code=422,
        content=error_response(
            ErrorCode.VALIDATION_ERROR,
            "Request validation failed",
            {"errors": field_errors},
        ),
    )
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Global exception handler for unexpected errors.

    Registered on ``app`` for ``Exception`` so the handler is actually
    invoked. The exception is logged with its traceback; the client only
    receives a generic message.

    Args:
        request: The request being processed when the error was raised (unused).
        exc: The unhandled exception.

    Returns:
        A 500 JSONResponse built by ``error_response``.
    """
    # Pass the exception object itself so the traceback is logged even if no
    # exception is "currently being handled" when this coroutine runs.
    logger.error(f"Unhandled exception: {exc}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content=error_response(
            ErrorCode.SERVER_ERROR,
            "An unexpected error occurred. Please try again later.",
        ),
    )
if __name__ == "__main__":
    # Direct-execution entry point: serve the application with uvicorn.
    import uvicorn

    # The app is passed as the import string "app:app" rather than as an object.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("app:app", **server_options)