# -*- coding:UTF-8 -*- from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.responses import Response from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import cv2 import numpy as np from PIL import Image import os import logging import requests from pathlib import Path import uvicorn # Initialize FastAPI app = FastAPI( title="Face Swap API", description="API for swapping faces in images.", docs_url="/docs", redoc_url="/redoc" ) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Root endpoint @app.get("/") async def root(): return {"message": "Welcome to the Face Swap API! Use /swap-face/ to swap faces or /docs to test the API."} # Health check @app.get("/health") async def health_check(): return {"status": "healthy"} # Global flag for model download MODEL_DOWNLOADED = False def download_model(): global MODEL_DOWNLOADED model_dir = Path("models") model_path = model_dir / "inswapper_128.onnx" model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx" if not model_path.exists(): logger.info("Downloading inswapper_128.onnx...") model_dir.mkdir(exist_ok=True) try: response = requests.get(model_url, stream=True, timeout=30) response.raise_for_status() with open(model_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logger.info("Model downloaded successfully.") MODEL_DOWNLOADED = True except Exception as e: logger.error(f"Failed to download model: {e}") raise RuntimeError("Could not download inswapper_128.onnx.") else: logger.info("Model already exists.") MODEL_DOWNLOADED = True @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Starting application...") try: download_model() logger.info("Application started successfully.") except Exception as e: logger.error(f"Startup failed: {e}") raise yield logger.info("Shutting down application...") app.lifespan = lifespan def swap_faces(source_img, target_img): try: from insightface.app import FaceAnalysis from insightface.utils import face_align from insightface.model_zoo import face_swapper # Initialize face analysis face_analyzer = FaceAnalysis(name="buffalo_l") face_analyzer.prepare(ctx_id=-1, det_size=(640, 640)) # Detect faces source_faces = face_analyzer.get(source_img) target_faces = face_analyzer.get(target_img) if not source_faces or not target_faces: raise ValueError("No faces detected.") if len(source_faces) > 1 or len(target_faces) > 1: raise ValueError("Multiple faces detected; only one per image supported.") source_face = source_faces[0] target_face = target_faces[0] # Load the face swapper model model_path = Path("models/inswapper_128.onnx") if not model_path.exists(): raise FileNotFoundError("Model file inswapper_128.onnx not found.") swapper = face_swapper.FaceSwapper(str(model_path)) # Perform face swap result = swapper.get(target_img, target_face, source_face, paste_back=True) # Resize result to match target image size result_pil = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) target_pil = Image.fromarray(cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB)) result_pil = result_pil.resize(target_pil.size, Image.Resampling.LANCZOS) return cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR) except Exception as e: logger.error(f"Face swap failed: {e}") raise @app.post("/swap-face/") async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)): try: # Read source image source_content = await source_file.read() source_np = np.frombuffer(source_content, np.uint8) source_img = cv2.imdecode(source_np, cv2.IMREAD_COLOR) if source_img is None: raise ValueError("Invalid source image.") # Read target image target_content = await target_file.read() target_np = np.frombuffer(target_content, np.uint8) target_img = cv2.imdecode(target_np, cv2.IMREAD_COLOR) if target_img is None: raise ValueError("Invalid target image.") # Perform face swap result_img = swap_faces(source_img, target_img) # Convert result to bytes _, img_encoded = cv2.imencode(".jpg", result_img) return Response(content=img_encoded.tobytes(), media_type="image/jpeg") except Exception as e: logger.error("Error in swap_face: %s", str(e)) raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)