from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Depends, Security, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from pydantic import BaseModel from typing import Optional, List import os import uuid import asyncio from datetime import datetime import motor.motor_asyncio from bson import ObjectId import json import shutil from pathlib import Path from fastapi.responses import FileResponse, StreamingResponse, JSONResponse import logging from logging.handlers import BufferingHandler from firebase_app_check import verify_app_check_token, verify_firebase_id_token from huggingface_hub import login as hf_login # Import face swap functionality import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import DeepFakeAI.globals as DF_G from DeepFakeAI import utilities as DF_U from DeepFakeAI.processors.frame.modules import face_swapper as DF_FS app = FastAPI(title="Face Swap Video API", version="1.0.0") # Authentication API_PASSWORD = os.getenv("API_PASSWORD", "logicgo_videoswap@153") security = HTTPBearer() # Authenticate Hugging Face Hub for model downloads (private/rate-limited) _hf_token = os.getenv("HUGGINGFACE_HUB_TOKEN") or os.getenv("HF_TOKEN") if _hf_token: try: hf_login(token=_hf_token) # nosec - token provided via env except Exception: pass # Also expose legacy env names some utils expect os.environ.setdefault("HF_TOKEN", _hf_token) os.environ.setdefault("TOKEN", _hf_token) # Ensure sane threads to avoid libgomp warnings os.environ.setdefault("OMP_NUM_THREADS", "1") def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)): """Verify API key from Bearer token. Supports two authentication methods: 1. Static password (API_PASSWORD) 2. Firebase ID token (from Firebase Auth) """ token = credentials.credentials # Try static password first if token == API_PASSWORD: return {"auth_mode": "static", "token": token} # Try Firebase ID token firebase_claims = verify_firebase_id_token(token) if firebase_claims: return {"auth_mode": "firebase", "claims": firebase_claims} # If neither works, raise error raise HTTPException( status_code=401, detail="Invalid authentication credentials. Use static password or Firebase ID token.", headers={"WWW-Authenticate": "Bearer"}, ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Firebase App Check configuration APP_CHECK_ENABLED = os.getenv("APP_CHECK_ENABLED", "false").lower() == "true" def verify_app_check(request: Request): """Dependency to enforce Firebase App Check when enabled. Looks for token in headers: 'X-Firebase-AppCheck' or 'X-Firebase-AppCheck-Token'. """ if not APP_CHECK_ENABLED: return True token = request.headers.get("X-Firebase-AppCheck") or request.headers.get("X-Firebase-AppCheck-Token") if not token or not verify_app_check_token(token): raise HTTPException(status_code=401, detail="Invalid or missing Firebase App Check token") return True # MongoDB connection MONGODB_URL = os.getenv("MONGODB_URL", "mongodb+srv://itishalogicgo_db_user:HR837xi0B9yh2vZK@cluster0.jeeytpz.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0") DATABASE_NAME = "face_swap_video" try: client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL) db = client[DATABASE_NAME] # Collections source_images_collection = db["source_images"] target_videos_collection = db["target_videos"] result_videos_collection = db["result_videos"] jobs_collection = db["processing_jobs"] api_logs_collection = db["api_logs"] # For logging API requests except Exception as e: print(f"Warning: MongoDB connection failed at startup: {e}") print("App will continue but MongoDB operations will fail") client = None db = None source_images_collection = None target_videos_collection = None result_videos_collection = None jobs_collection = None api_logs_collection = None # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def log_to_mongodb(level: str, message: str, endpoint: str = None, method: str = None, status_code: int = None, user_ip: str = None, error: str = None): """Log events to MongoDB""" if api_logs_collection is None: return try: log_entry = { "timestamp": datetime.utcnow(), "level": level, "message": message, "endpoint": endpoint, "method": method, "status_code": status_code, "user_ip": user_ip, "error": error } await api_logs_collection.insert_one(log_entry) except Exception as e: logger.error(f"Failed to log to MongoDB: {e}") # Middleware to log API requests @app.middleware("http") async def log_requests(request: Request, call_next): """Middleware to log all API requests to MongoDB""" start_time = datetime.utcnow() user_ip = request.client.host if request.client else None try: response = await call_next(request) process_time = (datetime.utcnow() - start_time).total_seconds() # Log successful request await log_to_mongodb( level="INFO", message=f"{request.method} {request.url.path} - {response.status_code}", endpoint=str(request.url.path), method=request.method, status_code=response.status_code, user_ip=user_ip ) return response except Exception as e: # Log error await log_to_mongodb( level="ERROR", message=f"Error processing {request.method} {request.url.path}", endpoint=str(request.url.path), method=request.method, user_ip=user_ip, error=str(e) ) raise # Upload directories UPLOAD_DIR = Path("uploads") SOURCE_IMAGES_DIR = UPLOAD_DIR / "source_images" TARGET_VIDEOS_DIR = UPLOAD_DIR / "target_videos" RESULT_VIDEOS_DIR = UPLOAD_DIR / "result_videos" # Create directories for dir_path in [UPLOAD_DIR, SOURCE_IMAGES_DIR, TARGET_VIDEOS_DIR, RESULT_VIDEOS_DIR]: dir_path.mkdir(parents=True, exist_ok=True) def _run_local_faceswap(source_image_path: str, target_video_path: str) -> Optional[str]: # Configure defaults for local pipeline DF_G.source_path = source_image_path DF_G.target_path = target_video_path DF_G.output_video_encoder = 'libx264' DF_G.output_video_quality = 20 DF_G.temp_frame_format = 'png' DF_G.temp_frame_quality = 95 DF_G.keep_temp = False DF_G.skip_audio = False # Face processing options DF_G.face_recognition = ['many'] DF_G.reference_frame_number = 0 DF_G.execution_thread_count = 2 DF_G.execution_queue_count = 2 # Prefer CUDA (GPU) if available; fallback to CPU try: DF_G.execution_providers = DF_U.decode_execution_providers(['cuda', 'cpu']) except: DF_G.execution_providers = DF_U.decode_execution_providers(['cpu']) # Fix invalid OMP thread settings try: import os as _os _os.environ["OMP_NUM_THREADS"] = "1" except: pass # Ensure model exists model_dir = DF_U.resolve_relative_path('../.assets/models') os.makedirs(model_dir, exist_ok=True) model_path = os.path.join(model_dir, 'inswapper_128.onnx') if not os.path.exists(model_path): from huggingface_hub import hf_hub_download token = os.environ.get('TOKEN') or os.environ.get('HF_TOKEN') for repo_id in ['zihaomu/inswapper_128.onnx', 'linyi/inswapper_128.onnx', 'banodoco/inswapper_128.onnx']: try: model_path = hf_hub_download(repo_id=repo_id, filename='inswapper_128.onnx', token=token) break except: continue if os.path.exists(model_path): os.environ['INSWAPPER_PATH'] = model_path DF_FS.pre_check() # Extract frames fps = DF_U.detect_fps(target_video_path) or 12.0 DF_U.create_temp(target_video_path) ok = DF_U.extract_frames(target_video_path, fps) if not ok: return None temp_frames = DF_U.get_temp_frame_paths(target_video_path) if not temp_frames: return None # Process frames DF_FS.process_video(source_image_path, temp_frames) # Rebuild video and restore audio if not DF_U.create_video(target_video_path, fps): return None out_path = DF_U.normalize_output_path(source_image_path, target_video_path, str(RESULT_VIDEOS_DIR / f"out_{uuid.uuid4().hex}.mp4")) DF_U.restore_audio(target_video_path, out_path) DF_U.clear_temp(target_video_path) return out_path # Pydantic models class SourceImageResponse(BaseModel): id: str filename: str file_path: str uploaded_at: datetime status: str class TargetVideoResponse(BaseModel): id: str filename: str file_path: str uploaded_at: datetime status: str class ResultVideoResponse(BaseModel): id: str source_image_id: str target_video_id: str result_file_path: str created_at: datetime status: str processing_time: Optional[float] = None class FaceSwapRequest(BaseModel): source_image_id: str target_video_id: str class JobStatus(BaseModel): job_id: str status: str progress: Optional[float] = None result_video_id: Optional[str] = None result_video_url: Optional[str] = None # HTTPS download URL error: Optional[str] = None # Base URL for generating download links BASE_URL = os.getenv("BASE_URL", "https://logicgoinfotechspaces-face-swap-video.hf.space") def get_result_video_url(result_video_id: str) -> str: """Generate HTTPS download URL for result video""" return f"{BASE_URL}/api/result-video/{result_video_id}" # Helper functions def save_file_to_disk(file: UploadFile, directory: Path) -> str: """Save uploaded file to disk and return the file path""" file_extension = Path(file.filename).suffix unique_filename = f"{uuid.uuid4().hex}{file_extension}" file_path = directory / unique_filename with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) return str(file_path) async def process_face_swap(job_id: str, source_image_path: str, target_video_path: str): """Background task to process face swap""" try: # Update job status to processing await jobs_collection.update_one( {"job_id": job_id}, {"$set": {"status": "processing", "progress": 0.0}} ) # Run face swap result_path = _run_local_faceswap(source_image_path, target_video_path) if result_path and os.path.exists(result_path): # Save result to MongoDB result_doc = { "source_image_path": source_image_path, "target_video_path": target_video_path, "result_file_path": result_path, "created_at": datetime.utcnow(), "status": "completed", "job_id": job_id } result = await result_videos_collection.insert_one(result_doc) result_video_id = str(result.inserted_id) # Update job status to completed await jobs_collection.update_one( {"job_id": job_id}, {"$set": { "status": "completed", "progress": 100.0, "result_video_id": result_video_id, "result_video_url": get_result_video_url(result_video_id) }} ) else: # Update job status to failed await jobs_collection.update_one( {"job_id": job_id}, {"$set": { "status": "failed", "error": "Face swap processing failed" }} ) except Exception as e: # Update job status to failed await jobs_collection.update_one( {"job_id": job_id}, {"$set": { "status": "failed", "error": str(e) }} ) # API Endpoints @app.post("/api/source-image", response_model=SourceImageResponse) async def upload_source_image( file: UploadFile = File(...), api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check) ): """Upload and store source image in MongoDB""" if not file.content_type.startswith('image/'): raise HTTPException(status_code=400, detail="File must be an image") try: # Save file to disk file_path = save_file_to_disk(file, SOURCE_IMAGES_DIR) # Store metadata in MongoDB doc = { "filename": file.filename, "file_path": file_path, "uploaded_at": datetime.utcnow(), "status": "uploaded", "content_type": file.content_type, "file_size": os.path.getsize(file_path) } result = await source_images_collection.insert_one(doc) return SourceImageResponse( id=str(result.inserted_id), filename=file.filename, file_path=file_path, uploaded_at=doc["uploaded_at"], status=doc["status"] ) except Exception as e: raise HTTPException(status_code=500, detail=f"Error uploading source image: {str(e)}") @app.post("/api/target-video", response_model=TargetVideoResponse) async def upload_target_video( file: UploadFile = File(...), api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check) ): """Upload and store target video in MongoDB""" if not file.content_type.startswith('video/'): raise HTTPException(status_code=400, detail="File must be a video") try: # Save file to disk file_path = save_file_to_disk(file, TARGET_VIDEOS_DIR) # Store metadata in MongoDB doc = { "filename": file.filename, "file_path": file_path, "uploaded_at": datetime.utcnow(), "status": "uploaded", "content_type": file.content_type, "file_size": os.path.getsize(file_path) } result = await target_videos_collection.insert_one(doc) return TargetVideoResponse( id=str(result.inserted_id), filename=file.filename, file_path=file_path, uploaded_at=doc["uploaded_at"], status=doc["status"] ) except Exception as e: raise HTTPException(status_code=500, detail=f"Error uploading target video: {str(e)}") @app.post("/api/face-swap", response_model=JobStatus) async def start_face_swap( request: FaceSwapRequest, background_tasks: BackgroundTasks, api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check) ): """Start face swap processing""" try: # Get source image and target video from MongoDB source_image = await source_images_collection.find_one({"_id": ObjectId(request.source_image_id)}) target_video = await target_videos_collection.find_one({"_id": ObjectId(request.target_video_id)}) if not source_image: raise HTTPException(status_code=404, detail="Source image not found") if not target_video: raise HTTPException(status_code=404, detail="Target video not found") # Create job record job_id = str(uuid.uuid4()) job_doc = { "job_id": job_id, "source_image_id": request.source_image_id, "target_video_id": request.target_video_id, "status": "queued", "created_at": datetime.utcnow(), "progress": 0.0 } await jobs_collection.insert_one(job_doc) # Start background processing background_tasks.add_task( process_face_swap, job_id, source_image["file_path"], target_video["file_path"] ) return JobStatus( job_id=job_id, status="queued", progress=0.0 ) except Exception as e: raise HTTPException(status_code=500, detail=f"Error starting face swap: {str(e)}") @app.get("/api/job/{job_id}", response_model=JobStatus) async def get_job_status(job_id: str, api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)): """Get job status""" job = await jobs_collection.find_one({"job_id": job_id}) if not job: raise HTTPException(status_code=404, detail="Job not found") result_video_url = None if job.get("result_video_id"): result_video_url = get_result_video_url(job["result_video_id"]) return JobStatus( job_id=job["job_id"], status=job["status"], progress=job.get("progress"), result_video_id=job.get("result_video_id"), result_video_url=result_video_url, error=job.get("error") ) @app.get("/api/result-video/{result_video_id}") async def get_result_video(result_video_id: str, api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)): """Get result video file""" result = await result_videos_collection.find_one({"_id": ObjectId(result_video_id)}) if not result: raise HTTPException(status_code=404, detail="Result video not found") if not os.path.exists(result["result_file_path"]): raise HTTPException(status_code=404, detail="Result video file not found") return FileResponse( path=result["result_file_path"], media_type="video/mp4", filename=f"face_swap_result_{result_video_id}.mp4" ) @app.get("/api/source-images", response_model=List[SourceImageResponse]) async def list_source_images(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)): """List all source images""" cursor = source_images_collection.find().sort("uploaded_at", -1) images = [] async for doc in cursor: images.append(SourceImageResponse( id=str(doc["_id"]), filename=doc["filename"], file_path=doc["file_path"], uploaded_at=doc["uploaded_at"], status=doc["status"] )) return images @app.get("/api/target-videos", response_model=List[TargetVideoResponse]) async def list_target_videos(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)): """List all target videos""" cursor = target_videos_collection.find().sort("uploaded_at", -1) videos = [] async for doc in cursor: videos.append(TargetVideoResponse( id=str(doc["_id"]), filename=doc["filename"], file_path=doc["file_path"], uploaded_at=doc["uploaded_at"], status=doc["status"] )) return videos @app.get("/api/result-videos", response_model=List[ResultVideoResponse]) async def list_result_videos(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)): """List all result videos""" cursor = result_videos_collection.find().sort("created_at", -1) results = [] async for doc in cursor: results.append(ResultVideoResponse( id=str(doc["_id"]), source_image_id=doc.get("source_image_path", ""), target_video_id=doc.get("target_video_path", ""), result_file_path=doc["result_file_path"], created_at=doc["created_at"], status=doc["status"], processing_time=doc.get("processing_time") )) return results @app.get("/api/health") async def api_health(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)): """Health check endpoint with GPU status (requires authentication)""" import onnxruntime available_providers = onnxruntime.get_available_providers() gpu_available = 'CUDAExecutionProvider' in available_providers return { "status": "ok", "time": datetime.utcnow().isoformat(), "gpu_available": gpu_available, "execution_providers": available_providers } @app.get("/") async def root(): """Root endpoint - shows API is running""" return { "message": "Face Swap Video API is running", "version": "1.0.0", "docs": "/docs", "health": "/api/health" } # Test endpoint to verify API is accessible (no auth required for testing) @app.get("/api/test") async def test_endpoint(): """Test endpoint to verify API is running (public endpoint)""" return { "status": "ok", "message": "API is accessible", "authentication": "Bearer token required for all endpoints except /api/test", "endpoints": { "upload_source": "POST /api/source-image", "upload_target": "POST /api/target-video", "face_swap": "POST /api/face-swap", "health": "GET /api/health" } } # API Logs endpoint @app.get("/api/logs") async def get_api_logs( limit: int = 100, level: Optional[str] = None, endpoint: Optional[str] = None, api_key: dict = Depends(verify_api_key) ): """Get API logs from MongoDB""" if api_logs_collection is None: raise HTTPException(status_code=503, detail="MongoDB not available") try: query = {} if level: query["level"] = level.upper() if endpoint: query["endpoint"] = endpoint cursor = api_logs_collection.find(query).sort("timestamp", -1).limit(limit) logs = [] async for doc in cursor: logs.append({ "timestamp": doc["timestamp"].isoformat() if doc.get("timestamp") else None, "level": doc.get("level"), "message": doc.get("message"), "endpoint": doc.get("endpoint"), "method": doc.get("method"), "status_code": doc.get("status_code"), "user_ip": doc.get("user_ip"), "error": doc.get("error") }) return { "total": len(logs), "logs": logs } except Exception as e: raise HTTPException(status_code=500, detail=f"Error fetching logs: {str(e)}") # User info endpoint @app.get("/api/me") async def get_current_user(api_key: dict = Depends(verify_api_key)): """Get current authenticated user info""" auth_mode = api_key.get("auth_mode") if auth_mode == "firebase": claims = api_key.get("claims", {}) return { "auth_mode": "firebase", "user_id": claims.get("uid"), "email": claims.get("email"), "email_verified": claims.get("email_verified", False), "name": claims.get("name"), "picture": claims.get("picture"), "firebase": { "sign_in_provider": claims.get("firebase", {}).get("sign_in_provider"), "iss": claims.get("iss"), "aud": claims.get("aud"), "auth_time": claims.get("auth_time"), "exp": claims.get("exp") } } else: return { "auth_mode": "static", "message": "Using static API password authentication" } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)