fix: update Dockerfile for Hugging Face Spaces - use port 7860 and follow HF Spaces best practices
7658264
| from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Optional, List | |
| import os | |
| import uuid | |
| import asyncio | |
| from datetime import datetime | |
| import motor.motor_asyncio | |
| from bson import ObjectId | |
| import json | |
| import shutil | |
| from pathlib import Path | |
| from fastapi.responses import FileResponse, StreamingResponse, JSONResponse | |
| # Import face swap functionality | |
| import sys | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| import DeepFakeAI.globals as DF_G | |
| from DeepFakeAI import utilities as DF_U | |
| from DeepFakeAI.processors.frame.modules import face_swapper as DF_FS | |
| app = FastAPI(title="Face Swap Video API", version="1.0.0") | |
| # CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # MongoDB connection | |
| MONGODB_URL = os.getenv("MONGODB_URL", "mongodb+srv://itishalogicgo_db_user:HR837xi0B9yh2vZK@cluster0.jeeytpz.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0") | |
| DATABASE_NAME = "face_swap_video" | |
| client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL) | |
| db = client[DATABASE_NAME] | |
| # Collections | |
| source_images_collection = db["source_images"] | |
| target_videos_collection = db["target_videos"] | |
| result_videos_collection = db["result_videos"] | |
| jobs_collection = db["processing_jobs"] | |
| # Upload directories | |
| UPLOAD_DIR = Path("uploads") | |
| SOURCE_IMAGES_DIR = UPLOAD_DIR / "source_images" | |
| TARGET_VIDEOS_DIR = UPLOAD_DIR / "target_videos" | |
| RESULT_VIDEOS_DIR = UPLOAD_DIR / "result_videos" | |
| # Create directories | |
| for dir_path in [UPLOAD_DIR, SOURCE_IMAGES_DIR, TARGET_VIDEOS_DIR, RESULT_VIDEOS_DIR]: | |
| dir_path.mkdir(parents=True, exist_ok=True) | |
| def _run_local_faceswap(source_image_path: str, target_video_path: str) -> Optional[str]: | |
| # Configure defaults for local pipeline | |
| DF_G.source_path = source_image_path | |
| DF_G.target_path = target_video_path | |
| DF_G.output_video_encoder = 'libx264' | |
| DF_G.output_video_quality = 20 | |
| DF_G.temp_frame_format = 'png' | |
| DF_G.temp_frame_quality = 95 | |
| DF_G.keep_temp = False | |
| DF_G.skip_audio = False | |
| # Face processing options | |
| DF_G.face_recognition = ['many'] | |
| DF_G.reference_frame_number = 0 | |
| DF_G.execution_thread_count = 2 | |
| DF_G.execution_queue_count = 2 | |
| # Prefer CUDA (GPU) if available; fallback to CPU | |
| try: | |
| DF_G.execution_providers = DF_U.decode_execution_providers(['cuda', 'cpu']) | |
| except: | |
| DF_G.execution_providers = DF_U.decode_execution_providers(['cpu']) | |
| # Fix invalid OMP thread settings | |
| try: | |
| import os as _os | |
| _os.environ["OMP_NUM_THREADS"] = "1" | |
| except: | |
| pass | |
| # Ensure model exists | |
| model_dir = DF_U.resolve_relative_path('../.assets/models') | |
| os.makedirs(model_dir, exist_ok=True) | |
| model_path = os.path.join(model_dir, 'inswapper_128.onnx') | |
| if not os.path.exists(model_path): | |
| from huggingface_hub import hf_hub_download | |
| token = os.environ.get('TOKEN') or os.environ.get('HF_TOKEN') | |
| for repo_id in ['zihaomu/inswapper_128.onnx', 'linyi/inswapper_128.onnx', 'banodoco/inswapper_128.onnx']: | |
| try: | |
| model_path = hf_hub_download(repo_id=repo_id, filename='inswapper_128.onnx', token=token) | |
| break | |
| except: | |
| continue | |
| if os.path.exists(model_path): | |
| os.environ['INSWAPPER_PATH'] = model_path | |
| DF_FS.pre_check() | |
| # Extract frames | |
| fps = DF_U.detect_fps(target_video_path) or 12.0 | |
| DF_U.create_temp(target_video_path) | |
| ok = DF_U.extract_frames(target_video_path, fps) | |
| if not ok: | |
| return None | |
| temp_frames = DF_U.get_temp_frame_paths(target_video_path) | |
| if not temp_frames: | |
| return None | |
| # Process frames | |
| DF_FS.process_video(source_image_path, temp_frames) | |
| # Rebuild video and restore audio | |
| if not DF_U.create_video(target_video_path, fps): | |
| return None | |
| out_path = DF_U.normalize_output_path(source_image_path, target_video_path, str(RESULT_VIDEOS_DIR / f"out_{uuid.uuid4().hex}.mp4")) | |
| DF_U.restore_audio(target_video_path, out_path) | |
| DF_U.clear_temp(target_video_path) | |
| return out_path | |
| # Pydantic models | |
| class SourceImageResponse(BaseModel): | |
| id: str | |
| filename: str | |
| file_path: str | |
| uploaded_at: datetime | |
| status: str | |
| class TargetVideoResponse(BaseModel): | |
| id: str | |
| filename: str | |
| file_path: str | |
| uploaded_at: datetime | |
| status: str | |
| class ResultVideoResponse(BaseModel): | |
| id: str | |
| source_image_id: str | |
| target_video_id: str | |
| result_file_path: str | |
| created_at: datetime | |
| status: str | |
| processing_time: Optional[float] = None | |
| class FaceSwapRequest(BaseModel): | |
| source_image_id: str | |
| target_video_id: str | |
| class JobStatus(BaseModel): | |
| job_id: str | |
| status: str | |
| progress: Optional[float] = None | |
| result_video_id: Optional[str] = None | |
| result_video_url: Optional[str] = None # HTTPS download URL | |
| error: Optional[str] = None | |
| # Base URL for generating download links | |
| BASE_URL = os.getenv("BASE_URL", "https://logicgoinfotechspaces-face-swap-video.hf.space") | |
| def get_result_video_url(result_video_id: str) -> str: | |
| """Generate HTTPS download URL for result video""" | |
| return f"{BASE_URL}/api/result-video/{result_video_id}" | |
| # Helper functions | |
| def save_file_to_disk(file: UploadFile, directory: Path) -> str: | |
| """Save uploaded file to disk and return the file path""" | |
| file_extension = Path(file.filename).suffix | |
| unique_filename = f"{uuid.uuid4().hex}{file_extension}" | |
| file_path = directory / unique_filename | |
| with open(file_path, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| return str(file_path) | |
| async def process_face_swap(job_id: str, source_image_path: str, target_video_path: str): | |
| """Background task to process face swap""" | |
| try: | |
| # Update job status to processing | |
| await jobs_collection.update_one( | |
| {"job_id": job_id}, | |
| {"$set": {"status": "processing", "progress": 0.0}} | |
| ) | |
| # Run face swap | |
| result_path = _run_local_faceswap(source_image_path, target_video_path) | |
| if result_path and os.path.exists(result_path): | |
| # Save result to MongoDB | |
| result_doc = { | |
| "source_image_path": source_image_path, | |
| "target_video_path": target_video_path, | |
| "result_file_path": result_path, | |
| "created_at": datetime.utcnow(), | |
| "status": "completed", | |
| "job_id": job_id | |
| } | |
| result = await result_videos_collection.insert_one(result_doc) | |
| result_video_id = str(result.inserted_id) | |
| # Update job status to completed | |
| await jobs_collection.update_one( | |
| {"job_id": job_id}, | |
| {"$set": { | |
| "status": "completed", | |
| "progress": 100.0, | |
| "result_video_id": result_video_id, | |
| "result_video_url": get_result_video_url(result_video_id) | |
| }} | |
| ) | |
| else: | |
| # Update job status to failed | |
| await jobs_collection.update_one( | |
| {"job_id": job_id}, | |
| {"$set": { | |
| "status": "failed", | |
| "error": "Face swap processing failed" | |
| }} | |
| ) | |
| except Exception as e: | |
| # Update job status to failed | |
| await jobs_collection.update_one( | |
| {"job_id": job_id}, | |
| {"$set": { | |
| "status": "failed", | |
| "error": str(e) | |
| }} | |
| ) | |
| # API Endpoints | |
| async def upload_source_image(file: UploadFile = File(...)): | |
| """Upload and store source image in MongoDB""" | |
| if not file.content_type.startswith('image/'): | |
| raise HTTPException(status_code=400, detail="File must be an image") | |
| try: | |
| # Save file to disk | |
| file_path = save_file_to_disk(file, SOURCE_IMAGES_DIR) | |
| # Store metadata in MongoDB | |
| doc = { | |
| "filename": file.filename, | |
| "file_path": file_path, | |
| "uploaded_at": datetime.utcnow(), | |
| "status": "uploaded", | |
| "content_type": file.content_type, | |
| "file_size": os.path.getsize(file_path) | |
| } | |
| result = await source_images_collection.insert_one(doc) | |
| return SourceImageResponse( | |
| id=str(result.inserted_id), | |
| filename=file.filename, | |
| file_path=file_path, | |
| uploaded_at=doc["uploaded_at"], | |
| status=doc["status"] | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error uploading source image: {str(e)}") | |
| async def upload_target_video(file: UploadFile = File(...)): | |
| """Upload and store target video in MongoDB""" | |
| if not file.content_type.startswith('video/'): | |
| raise HTTPException(status_code=400, detail="File must be a video") | |
| try: | |
| # Save file to disk | |
| file_path = save_file_to_disk(file, TARGET_VIDEOS_DIR) | |
| # Store metadata in MongoDB | |
| doc = { | |
| "filename": file.filename, | |
| "file_path": file_path, | |
| "uploaded_at": datetime.utcnow(), | |
| "status": "uploaded", | |
| "content_type": file.content_type, | |
| "file_size": os.path.getsize(file_path) | |
| } | |
| result = await target_videos_collection.insert_one(doc) | |
| return TargetVideoResponse( | |
| id=str(result.inserted_id), | |
| filename=file.filename, | |
| file_path=file_path, | |
| uploaded_at=doc["uploaded_at"], | |
| status=doc["status"] | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error uploading target video: {str(e)}") | |
| async def start_face_swap(request: FaceSwapRequest, background_tasks: BackgroundTasks): | |
| """Start face swap processing""" | |
| try: | |
| # Get source image and target video from MongoDB | |
| source_image = await source_images_collection.find_one({"_id": ObjectId(request.source_image_id)}) | |
| target_video = await target_videos_collection.find_one({"_id": ObjectId(request.target_video_id)}) | |
| if not source_image: | |
| raise HTTPException(status_code=404, detail="Source image not found") | |
| if not target_video: | |
| raise HTTPException(status_code=404, detail="Target video not found") | |
| # Create job record | |
| job_id = str(uuid.uuid4()) | |
| job_doc = { | |
| "job_id": job_id, | |
| "source_image_id": request.source_image_id, | |
| "target_video_id": request.target_video_id, | |
| "status": "queued", | |
| "created_at": datetime.utcnow(), | |
| "progress": 0.0 | |
| } | |
| await jobs_collection.insert_one(job_doc) | |
| # Start background processing | |
| background_tasks.add_task( | |
| process_face_swap, | |
| job_id, | |
| source_image["file_path"], | |
| target_video["file_path"] | |
| ) | |
| return JobStatus( | |
| job_id=job_id, | |
| status="queued", | |
| progress=0.0 | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error starting face swap: {str(e)}") | |
| async def get_job_status(job_id: str): | |
| """Get job status""" | |
| job = await jobs_collection.find_one({"job_id": job_id}) | |
| if not job: | |
| raise HTTPException(status_code=404, detail="Job not found") | |
| result_video_url = None | |
| if job.get("result_video_id"): | |
| result_video_url = get_result_video_url(job["result_video_id"]) | |
| return JobStatus( | |
| job_id=job["job_id"], | |
| status=job["status"], | |
| progress=job.get("progress"), | |
| result_video_id=job.get("result_video_id"), | |
| result_video_url=result_video_url, | |
| error=job.get("error") | |
| ) | |
| async def get_result_video(result_video_id: str): | |
| """Get result video file""" | |
| result = await result_videos_collection.find_one({"_id": ObjectId(result_video_id)}) | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Result video not found") | |
| if not os.path.exists(result["result_file_path"]): | |
| raise HTTPException(status_code=404, detail="Result video file not found") | |
| return FileResponse( | |
| path=result["result_file_path"], | |
| media_type="video/mp4", | |
| filename=f"face_swap_result_{result_video_id}.mp4" | |
| ) | |
| async def list_source_images(): | |
| """List all source images""" | |
| cursor = source_images_collection.find().sort("uploaded_at", -1) | |
| images = [] | |
| async for doc in cursor: | |
| images.append(SourceImageResponse( | |
| id=str(doc["_id"]), | |
| filename=doc["filename"], | |
| file_path=doc["file_path"], | |
| uploaded_at=doc["uploaded_at"], | |
| status=doc["status"] | |
| )) | |
| return images | |
| async def list_target_videos(): | |
| """List all target videos""" | |
| cursor = target_videos_collection.find().sort("uploaded_at", -1) | |
| videos = [] | |
| async for doc in cursor: | |
| videos.append(TargetVideoResponse( | |
| id=str(doc["_id"]), | |
| filename=doc["filename"], | |
| file_path=doc["file_path"], | |
| uploaded_at=doc["uploaded_at"], | |
| status=doc["status"] | |
| )) | |
| return videos | |
| async def list_result_videos(): | |
| """List all result videos""" | |
| cursor = result_videos_collection.find().sort("created_at", -1) | |
| results = [] | |
| async for doc in cursor: | |
| results.append(ResultVideoResponse( | |
| id=str(doc["_id"]), | |
| source_image_id=doc.get("source_image_path", ""), | |
| target_video_id=doc.get("target_video_path", ""), | |
| result_file_path=doc["result_file_path"], | |
| created_at=doc["created_at"], | |
| status=doc["status"], | |
| processing_time=doc.get("processing_time") | |
| )) | |
| return results | |
| async def api_health(): | |
| return {"status": "ok", "time": datetime.utcnow().isoformat()} | |
| async def root(): | |
| """Health check endpoint""" | |
| return {"message": "Face Swap Video API is running", "version": "1.0.0"} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |