"""Face Swap Video API.

FastAPI service that stores uploaded source images and target videos on disk,
records their metadata and processing jobs in MongoDB, and runs the DeepFakeAI
face-swapper pipeline as a background job to produce result videos.
"""

from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Tuple
import os
import uuid
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import motor.motor_asyncio
from bson import ObjectId
import json
import shutil
from pathlib import Path
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse

# Import face swap functionality. The parent directory must be on sys.path
# before the DeepFakeAI package can be imported.
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import DeepFakeAI.globals as DF_G
from DeepFakeAI import utilities as DF_U
from DeepFakeAI.processors.frame.modules import face_swapper as DF_FS

logger = logging.getLogger(__name__)

app = FastAPI(title="Face Swap Video API", version="1.0.0")

# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins to the known front-end hosts if possible.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# MongoDB connection
# SECURITY: the connection string (including credentials) must come from the
# environment. Never commit a real username/password as the default value.
MONGODB_URL = os.getenv("MONGODB_URL")
if not MONGODB_URL:
    MONGODB_URL = "mongodb://localhost:27017"
    logger.warning(
        "MONGODB_URL is not set; falling back to %s. "
        "Set MONGODB_URL to point at the real database.",
        MONGODB_URL,
    )
DATABASE_NAME = "face_swap_video"

client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)
db = client[DATABASE_NAME]

# Collections
source_images_collection = db["source_images"]
target_videos_collection = db["target_videos"]
result_videos_collection = db["result_videos"]
jobs_collection = db["processing_jobs"]

# Upload directories
UPLOAD_DIR = Path("uploads")
SOURCE_IMAGES_DIR = UPLOAD_DIR / "source_images"
TARGET_VIDEOS_DIR = UPLOAD_DIR / "target_videos"
RESULT_VIDEOS_DIR = UPLOAD_DIR / "result_videos"

# Create directories
for dir_path in [UPLOAD_DIR, SOURCE_IMAGES_DIR, TARGET_VIDEOS_DIR, RESULT_VIDEOS_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

# The pipeline configures itself through module-level attributes on
# DeepFakeAI.globals, so two jobs must never run at the same time. A
# single-worker executor keeps the work off the event loop while still
# running jobs strictly one after another.
_FACESWAP_EXECUTOR = ThreadPoolExecutor(max_workers=1, thread_name_prefix="faceswap")


def _run_local_faceswap(source_image_path: str, target_video_path: str) -> Optional[str]:
    """Run the face-swap pipeline synchronously and return the output path.

    Configures ``DeepFakeAI.globals``, makes sure the inswapper model is
    available, extracts frames from the target video, swaps faces on them,
    rebuilds the video and restores the audio track.

    Args:
        source_image_path: Path of the image that provides the face.
        target_video_path: Path of the video whose faces are replaced.

    Returns:
        The path of the generated video, or ``None`` when frame extraction
        yields nothing or the video cannot be rebuilt.

    This function blocks for the whole duration of the processing and mutates
    process-wide state; call it from a worker thread, one job at a time.
    """
    # Configure defaults for local pipeline
    DF_G.source_path = source_image_path
    DF_G.target_path = target_video_path
    DF_G.output_video_encoder = 'libx264'
    DF_G.output_video_quality = 20
    DF_G.temp_frame_format = 'png'
    DF_G.temp_frame_quality = 95
    DF_G.keep_temp = False
    DF_G.skip_audio = False

    # Face processing options
    DF_G.face_recognition = ['many']
    DF_G.reference_frame_number = 0
    DF_G.execution_thread_count = 2
    DF_G.execution_queue_count = 2

    # Prefer CUDA (GPU) if available; fallback to CPU
    try:
        DF_G.execution_providers = DF_U.decode_execution_providers(['cuda', 'cpu'])
    except Exception:
        DF_G.execution_providers = DF_U.decode_execution_providers(['cpu'])

    # Fix invalid OMP thread settings
    os.environ["OMP_NUM_THREADS"] = "1"

    # Ensure model exists
    model_dir = DF_U.resolve_relative_path('../.assets/models')
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, 'inswapper_128.onnx')
    if not os.path.exists(model_path):
        from huggingface_hub import hf_hub_download
        token = os.environ.get('TOKEN') or os.environ.get('HF_TOKEN')
        # Try each known mirror in turn; the first successful download wins.
        for repo_id in ['zihaomu/inswapper_128.onnx', 'linyi/inswapper_128.onnx', 'banodoco/inswapper_128.onnx']:
            try:
                model_path = hf_hub_download(repo_id=repo_id, filename='inswapper_128.onnx', token=token)
                break
            except Exception:
                logger.warning("Could not download inswapper model from %s", repo_id)
                continue
    if os.path.exists(model_path):
        os.environ['INSWAPPER_PATH'] = model_path

    DF_FS.pre_check()

    # Extract frames
    fps = DF_U.detect_fps(target_video_path) or 12.0
    DF_U.create_temp(target_video_path)
    try:
        if not DF_U.extract_frames(target_video_path, fps):
            return None
        temp_frames = DF_U.get_temp_frame_paths(target_video_path)
        if not temp_frames:
            return None

        # Process frames
        DF_FS.process_video(source_image_path, temp_frames)

        # Rebuild video and restore audio
        if not DF_U.create_video(target_video_path, fps):
            return None
        out_path = DF_U.normalize_output_path(
            source_image_path,
            target_video_path,
            str(RESULT_VIDEOS_DIR / f"out_{uuid.uuid4().hex}.mp4"),
        )
        DF_U.restore_audio(target_video_path, out_path)
        return out_path
    finally:
        # Clear the temp data on every exit path, including early failures.
        DF_U.clear_temp(target_video_path)


def _timed_faceswap(source_image_path: str, target_video_path: str) -> Tuple[Optional[str], float]:
    """Run the pipeline and return ``(result_path, elapsed_seconds)``.

    Timing is taken inside the worker so it excludes time spent queued
    behind other jobs.
    """
    started = time.monotonic()
    result_path = _run_local_faceswap(source_image_path, target_video_path)
    return result_path, time.monotonic() - started


# Pydantic models
class SourceImageResponse(BaseModel):
    """Metadata returned for a stored source image."""

    id: str
    filename: str
    file_path: str
    uploaded_at: datetime
    status: str


class TargetVideoResponse(BaseModel):
    """Metadata returned for a stored target video."""

    id: str
    filename: str
    file_path: str
    uploaded_at: datetime
    status: str


class ResultVideoResponse(BaseModel):
    """Metadata returned for a generated result video."""

    id: str
    source_image_id: str
    target_video_id: str
    result_file_path: str
    created_at: datetime
    status: str
    processing_time: Optional[float] = None


class FaceSwapRequest(BaseModel):
    """Request body for starting a face-swap job."""

    source_image_id: str
    target_video_id: str


class JobStatus(BaseModel):
    """State of a face-swap job as reported to clients."""

    job_id: str
    status: str
    progress: Optional[float] = None
    result_video_id: Optional[str] = None
    result_video_url: Optional[str] = None  # HTTPS download URL
    error: Optional[str] = None


# Base URL for generating download links
BASE_URL = os.getenv("BASE_URL", "https://logicgoinfotechspaces-face-swap-video.hf.space")


def get_result_video_url(result_video_id: str) -> str:
    """Generate HTTPS download URL for result video"""
    return f"{BASE_URL}/api/result-video/{result_video_id}"


# Helper functions
def _parse_object_id(value: str, label: str) -> ObjectId:
    """Convert *value* to an ``ObjectId`` or raise HTTP 400 if it is malformed."""
    if not ObjectId.is_valid(value):
        raise HTTPException(status_code=400, detail=f"Invalid {label} id")
    return ObjectId(value)


def save_file_to_disk(file: UploadFile, directory: Path) -> str:
    """Save uploaded file to disk and return the file path.

    The file is stored under a random hex name that keeps the extension of
    the client-supplied filename (no extension when the filename is missing).
    """
    file_extension = Path(file.filename or "").suffix
    unique_filename = f"{uuid.uuid4().hex}{file_extension}"
    file_path = directory / unique_filename

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    return str(file_path)


async def process_face_swap(job_id: str, source_image_path: str, target_video_path: str):
    """Background task to process face swap.

    Marks the job as ``processing``, runs the pipeline on the dedicated
    worker thread so the event loop stays responsive, then stores the result
    document and marks the job ``completed``. Any failure marks the job
    ``failed`` with an error message.
    """
    try:
        # Update job status to processing
        await jobs_collection.update_one(
            {"job_id": job_id},
            {"$set": {"status": "processing", "progress": 0.0}}
        )

        # Run face swap off the event loop; the call blocks for a long time.
        loop = asyncio.get_running_loop()
        result_path, processing_time = await loop.run_in_executor(
            _FACESWAP_EXECUTOR, _timed_faceswap, source_image_path, target_video_path
        )

        if result_path and os.path.exists(result_path):
            # The job record holds the ids the request was made with.
            job = await jobs_collection.find_one({"job_id": job_id}) or {}

            # Save result to MongoDB
            result_doc = {
                "source_image_id": job.get("source_image_id", ""),
                "target_video_id": job.get("target_video_id", ""),
                "source_image_path": source_image_path,
                "target_video_path": target_video_path,
                "result_file_path": result_path,
                "created_at": datetime.utcnow(),
                "status": "completed",
                "job_id": job_id,
                "processing_time": processing_time,
            }
            result = await result_videos_collection.insert_one(result_doc)
            result_video_id = str(result.inserted_id)

            # Update job status to completed
            await jobs_collection.update_one(
                {"job_id": job_id},
                {"$set": {
                    "status": "completed",
                    "progress": 100.0,
                    "result_video_id": result_video_id,
                    "result_video_url": get_result_video_url(result_video_id)
                }}
            )
        else:
            # Update job status to failed
            await jobs_collection.update_one(
                {"job_id": job_id},
                {"$set": {
                    "status": "failed",
                    "error": "Face swap processing failed"
                }}
            )
    except Exception as e:
        logger.exception("Face swap job %s failed", job_id)
        # Update job status to failed
        await jobs_collection.update_one(
            {"job_id": job_id},
            {"$set": {
                "status": "failed",
                "error": str(e)
            }}
        )


# API Endpoints
@app.post("/api/source-image", response_model=SourceImageResponse)
async def upload_source_image(file: UploadFile = File(...)):
    """Upload and store source image in MongoDB"""
    # content_type may be missing on the upload; treat that as "not an image".
    if not (file.content_type or "").startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")

    try:
        # Save file to disk
        file_path = save_file_to_disk(file, SOURCE_IMAGES_DIR)
        filename = file.filename or Path(file_path).name

        # Store metadata in MongoDB
        doc = {
            "filename": filename,
            "file_path": file_path,
            "uploaded_at": datetime.utcnow(),
            "status": "uploaded",
            "content_type": file.content_type,
            "file_size": os.path.getsize(file_path)
        }

        result = await source_images_collection.insert_one(doc)

        return SourceImageResponse(
            id=str(result.inserted_id),
            filename=filename,
            file_path=file_path,
            uploaded_at=doc["uploaded_at"],
            status=doc["status"]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error uploading source image: {str(e)}")


@app.post("/api/target-video", response_model=TargetVideoResponse)
async def upload_target_video(file: UploadFile = File(...)):
    """Upload and store target video in MongoDB"""
    # content_type may be missing on the upload; treat that as "not a video".
    if not (file.content_type or "").startswith('video/'):
        raise HTTPException(status_code=400, detail="File must be a video")

    try:
        # Save file to disk
        file_path = save_file_to_disk(file, TARGET_VIDEOS_DIR)
        filename = file.filename or Path(file_path).name

        # Store metadata in MongoDB
        doc = {
            "filename": filename,
            "file_path": file_path,
            "uploaded_at": datetime.utcnow(),
            "status": "uploaded",
            "content_type": file.content_type,
            "file_size": os.path.getsize(file_path)
        }

        result = await target_videos_collection.insert_one(doc)

        return TargetVideoResponse(
            id=str(result.inserted_id),
            filename=filename,
            file_path=file_path,
            uploaded_at=doc["uploaded_at"],
            status=doc["status"]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error uploading target video: {str(e)}")


@app.post("/api/face-swap", response_model=JobStatus)
async def start_face_swap(request: FaceSwapRequest, background_tasks: BackgroundTasks):
    """Start face swap processing.

    Responds with 400 for malformed ids, 404 when the image or video is
    unknown, and 500 for unexpected errors. On success a job is queued and
    its initial status is returned.
    """
    try:
        source_oid = _parse_object_id(request.source_image_id, "source image")
        target_oid = _parse_object_id(request.target_video_id, "target video")

        # Get source image and target video from MongoDB
        source_image = await source_images_collection.find_one({"_id": source_oid})
        target_video = await target_videos_collection.find_one({"_id": target_oid})

        if not source_image:
            raise HTTPException(status_code=404, detail="Source image not found")
        if not target_video:
            raise HTTPException(status_code=404, detail="Target video not found")

        # Create job record
        job_id = str(uuid.uuid4())
        job_doc = {
            "job_id": job_id,
            "source_image_id": request.source_image_id,
            "target_video_id": request.target_video_id,
            "status": "queued",
            "created_at": datetime.utcnow(),
            "progress": 0.0
        }

        await jobs_collection.insert_one(job_doc)

        # Start background processing
        background_tasks.add_task(
            process_face_swap,
            job_id,
            source_image["file_path"],
            target_video["file_path"]
        )

        return JobStatus(
            job_id=job_id,
            status="queued",
            progress=0.0
        )
    except HTTPException:
        # Keep the intended 4xx status instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error starting face swap: {str(e)}")


@app.get("/api/job/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """Get job status"""
    job = await jobs_collection.find_one({"job_id": job_id})
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Rebuild the URL from the id so it always reflects the current BASE_URL.
    result_video_url = None
    if job.get("result_video_id"):
        result_video_url = get_result_video_url(job["result_video_id"])

    return JobStatus(
        job_id=job["job_id"],
        status=job["status"],
        progress=job.get("progress"),
        result_video_id=job.get("result_video_id"),
        result_video_url=result_video_url,
        error=job.get("error")
    )


@app.get("/api/result-video/{result_video_id}")
async def get_result_video(result_video_id: str):
    """Get result video file.

    Responds with 400 for a malformed id and 404 when either the database
    record or the file on disk is missing.
    """
    result = await result_videos_collection.find_one(
        {"_id": _parse_object_id(result_video_id, "result video")}
    )
    if not result:
        raise HTTPException(status_code=404, detail="Result video not found")

    if not os.path.exists(result["result_file_path"]):
        raise HTTPException(status_code=404, detail="Result video file not found")

    return FileResponse(
        path=result["result_file_path"],
        media_type="video/mp4",
        filename=f"face_swap_result_{result_video_id}.mp4"
    )


@app.get("/api/source-images", response_model=List[SourceImageResponse])
async def list_source_images():
    """List all source images, newest first"""
    cursor = source_images_collection.find().sort("uploaded_at", -1)
    return [
        SourceImageResponse(
            id=str(doc["_id"]),
            filename=doc["filename"],
            file_path=doc["file_path"],
            uploaded_at=doc["uploaded_at"],
            status=doc["status"]
        )
        async for doc in cursor
    ]


@app.get("/api/target-videos", response_model=List[TargetVideoResponse])
async def list_target_videos():
    """List all target videos, newest first"""
    cursor = target_videos_collection.find().sort("uploaded_at", -1)
    return [
        TargetVideoResponse(
            id=str(doc["_id"]),
            filename=doc["filename"],
            file_path=doc["file_path"],
            uploaded_at=doc["uploaded_at"],
            status=doc["status"]
        )
        async for doc in cursor
    ]


@app.get("/api/result-videos", response_model=List[ResultVideoResponse])
async def list_result_videos():
    """List all result videos, newest first"""
    cursor = result_videos_collection.find().sort("created_at", -1)
    return [
        ResultVideoResponse(
            id=str(doc["_id"]),
            # Documents written before ids were stored only carry the paths;
            # fall back to them so old records still list.
            source_image_id=doc.get("source_image_id") or doc.get("source_image_path", ""),
            target_video_id=doc.get("target_video_id") or doc.get("target_video_path", ""),
            result_file_path=doc["result_file_path"],
            created_at=doc["created_at"],
            status=doc["status"],
            processing_time=doc.get("processing_time")
        )
        async for doc in cursor
    ]


@app.get("/api/health")
async def api_health():
    """Liveness probe returning the current UTC time"""
    return {"status": "ok", "time": datetime.utcnow().isoformat()}


@app.get("/")
async def root():
    """Health check endpoint"""
    return {"message": "Face Swap Video API is running", "version": "1.0.0"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)