# -*- coding: UTF-8 -*-
"""Face Swap API.

A small FastAPI service exposing:

* ``GET /health``      -- liveness probe.
* ``POST /swap-face/`` -- takes a source and a target image (multipart
  uploads) and returns the target image with the source face swapped in,
  encoded as JPEG.

The ``inswapper_128.onnx`` model is downloaded into ``models/`` at
application startup if it is not already present.
"""
from contextlib import asynccontextmanager
from functools import lru_cache
import logging
import os
from pathlib import Path

import cv2
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
import numpy as np
from PIL import Image
import requests
import uvicorn

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Location of the swapper model on disk and where to fetch it from.
MODEL_DIR = Path("models")
MODEL_PATH = MODEL_DIR / "inswapper_128.onnx"
MODEL_URL = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"

# Global flag to prevent multiple downloads
MODEL_DOWNLOADED = False


def download_model():
    """Ensure ``models/inswapper_128.onnx`` exists, downloading it if needed.

    The download is streamed to a ``.part`` file and moved into place only
    after it completes, so an interrupted transfer never leaves a truncated
    file at the final path (which a later startup would mistake for a valid
    model).

    Raises:
        RuntimeError: if the model could not be downloaded.
    """
    global MODEL_DOWNLOADED
    if MODEL_DOWNLOADED:
        logger.info("Model already downloaded, skipping.")
        return

    if MODEL_PATH.exists():
        logger.info("Model already exists at: %s", MODEL_PATH)
        MODEL_DOWNLOADED = True
        return

    logger.info("Model not found. Downloading inswapper_128.onnx...")
    MODEL_DIR.mkdir(exist_ok=True)
    partial_path = MODEL_PATH.with_name(MODEL_PATH.name + ".part")
    try:
        with requests.get(MODEL_URL, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Atomic on the same filesystem: the final path only ever holds a
        # complete file.
        os.replace(partial_path, MODEL_PATH)
    except Exception as e:
        logger.error(f"Failed to download model: {e}")
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise RuntimeError(
            "Could not download inswapper_128.onnx. Please check logs."
        ) from e

    logger.info("Model downloaded successfully.")
    MODEL_DOWNLOADED = True


# Use lifespan event handler
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work (model download) before serving; log on shutdown."""
    logger.info("Starting up application...")
    try:
        download_model()
        logger.info("Startup completed successfully.")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise
    yield
    logger.info("Shutting down application...")


# Initialize FastAPI with explicit docs settings.
# The lifespan handler must be passed to the constructor; assigning
# ``app.lifespan = ...`` afterwards does not register it.
app = FastAPI(
    title="Face Swap API",
    description="API for swapping faces in images.",
    docs_url="/docs",  # Explicitly set docs URL
    redoc_url="/redoc",  # Explicitly set redoc URL
    lifespan=lifespan,
)

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Update with your Framer domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Health Check Endpoint
@app.get("/health")
async def health_check():
    """Return a static payload indicating the service is up."""
    return {"status": "healthy"}


@lru_cache(maxsize=1)
def _get_face_analyzer():
    """Build the face detector once and reuse it across requests."""
    # Imported lazily so the module can be imported without insightface.
    from insightface.app import FaceAnalysis

    analyzer = FaceAnalysis(name="buffalo_l")
    analyzer.prepare(ctx_id=0, det_size=(640, 640))
    return analyzer


@lru_cache(maxsize=1)
def _get_swapper():
    """Load the inswapper ONNX model once and reuse it across requests.

    Raises:
        FileNotFoundError: if the model file is missing. lru_cache does not
            cache exceptions, so a later call retries the load.
    """
    from insightface.model_zoo import get_model

    if not MODEL_PATH.exists():
        raise FileNotFoundError("Model file inswapper_128.onnx not found.")
    # get_model expects a string path ending in ".onnx".
    return get_model(str(MODEL_PATH))


def swap_faces(source_img, target_img):
    """Perform face swapping using insightface and inswapper model.

    Args:
        source_img: BGR image array containing the face to copy.
        target_img: BGR image array whose face is replaced.

    Returns:
        A BGR image array with the same width/height as ``target_img``.

    Raises:
        ValueError: if either image contains no face or more than one face.
        FileNotFoundError: if the swapper model file is missing.
    """
    try:
        analyzer = _get_face_analyzer()
        source_faces = analyzer.get(source_img)
        target_faces = analyzer.get(target_img)

        if not source_faces or not target_faces:
            raise ValueError("No faces detected in one or both images.")
        if len(source_faces) > 1 or len(target_faces) > 1:
            raise ValueError("Multiple faces detected; only one face per image is supported.")

        swapper = _get_swapper()
        result = swapper.get(target_img, target_faces[0], source_faces[0], paste_back=True)

        # Only resample when the output size differs from the target size.
        if result.shape[:2] != target_img.shape[:2]:
            height, width = target_img.shape[:2]
            result_pil = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
            result_pil = result_pil.resize((width, height), Image.Resampling.LANCZOS)
            result = cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR)
        return result
    except Exception as e:
        logger.error(f"Face swap failed: {e}")
        raise


def _decode_image(data, label):
    """Decode uploaded bytes into a BGR image array.

    Raises:
        ValueError: if ``data`` is empty or is not a decodable image.
    """
    image = None
    if data:  # imdecode rejects an empty buffer, so guard it explicitly
        image = cv2.imdecode(np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise ValueError(f"Failed to load {label} image.")
    return image


@app.post("/swap-face/")
async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)):
    """Swap the face from ``source_file`` onto ``target_file``.

    Images are decoded and encoded in memory, so concurrent requests do not
    share temp files and nothing is left on disk when a request fails.

    Returns:
        A JPEG ``Response`` containing the swapped image.

    Raises:
        HTTPException: status 500 with the error message on any failure.
    """
    # NOTE(review): swap_faces is CPU-bound and runs on the event loop here;
    # consider offloading to a worker thread if request latency matters.
    try:
        source_img = _decode_image(await source_file.read(), "source")
        target_img = _decode_image(await target_file.read(), "target")

        result_img = swap_faces(source_img, target_img)

        encoded_ok, encoded = cv2.imencode(".jpg", result_img)
        if not encoded_ok:
            raise RuntimeError("Failed to encode result image.")
        return Response(content=encoded.tobytes(), media_type="image/jpeg")
    except Exception as e:
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)