Spaces:
Runtime error
Runtime error
| # -*- coding:UTF-8 -*- | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from fastapi.responses import Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from contextlib import asynccontextmanager | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import os | |
| import logging | |
| import requests | |
| from pathlib import Path | |
| import uvicorn | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI; interactive documentation is exposed at /docs and /redoc.
app = FastAPI(
    title="Face Swap API",
    description="API for swapping faces in images.",
    docs_url="/docs",
    redoc_url="/redoc",
)

# Add CORS middleware: any origin, method and header is accepted.
# NOTE(review): a wildcard origin together with allow_credentials=True is
# discouraged by the FastAPI CORS documentation — confirm that credentialed
# cross-origin requests are actually required.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Root endpoint
@app.get("/")
async def root():
    """Return a welcome message that points clients at /swap-face/ and /docs."""
    return {"message": "Welcome to the Face Swap API! Use /swap-face/ to swap faces or /docs to test the API."}
# Health check
# NOTE(review): no route registration for this handler is visible in the file;
# "/health" is chosen by convention — confirm the path any monitoring expects.
@app.get("/health")
async def health_check():
    """Return a static payload indicating the service is responding."""
    return {"status": "healthy"}
# Global flag for model download
MODEL_DOWNLOADED = False


def download_model():
    """Ensure ``models/inswapper_128.onnx`` exists, downloading it if missing.

    The response body is streamed to a temporary ``.part`` file and renamed to
    the final path only after it has been written completely, so an
    interrupted download is never mistaken for a usable model on a later run.

    Sets the module-level ``MODEL_DOWNLOADED`` flag to ``True`` on success.

    Raises:
        RuntimeError: if the HTTP request or the write to disk fails.
    """
    global MODEL_DOWNLOADED
    model_dir = Path("models")
    model_path = model_dir / "inswapper_128.onnx"
    model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"

    if model_path.exists():
        logger.info("Model already exists.")
        MODEL_DOWNLOADED = True
        return

    logger.info("Downloading inswapper_128.onnx...")
    model_dir.mkdir(exist_ok=True)
    partial_path = model_path.with_name(model_path.name + ".part")
    try:
        # The context manager releases the streamed connection even on failure.
        with requests.get(model_url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty chunks yielded while streaming
                        f.write(chunk)
        # Publish the file only once it is complete.
        partial_path.replace(model_path)
    except Exception as e:
        logger.error(f"Failed to download model: {e}")
        # Best-effort cleanup so a truncated file does not linger on disk.
        try:
            if partial_path.exists():
                partial_path.unlink()
        except OSError:
            logger.warning("Could not remove partial download %s", partial_path)
        raise RuntimeError("Could not download inswapper_128.onnx.") from e

    logger.info("Model downloaded successfully.")
    MODEL_DOWNLOADED = True
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's lifetime.

    On startup the face-swap model is fetched via ``download_model()``; any
    failure is logged and re-raised so the application does not start without
    it. After the ``yield`` a shutdown message is logged.
    """
    logger.info("Starting application...")
    try:
        download_model()
        logger.info("Application started successfully.")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise
    yield
    logger.info("Shutting down application...")


# FastAPI takes its lifespan handler through the constructor's ``lifespan=``
# argument and stores it on the router; a plain ``app.lifespan`` attribute is
# not consulted. Because ``app`` already exists here, install the handler on
# the router directly.
app.router.lifespan_context = lifespan
# Lazily created model instances, shared across requests because loading the
# detection and swapping networks is expensive.
_FACE_ANALYZER = None
_FACE_SWAPPER = None


def _get_face_analyzer():
    """Return the shared ``FaceAnalysis`` instance, creating it on first use."""
    global _FACE_ANALYZER
    if _FACE_ANALYZER is None:
        from insightface.app import FaceAnalysis

        analyzer = FaceAnalysis(name="buffalo_l")
        analyzer.prepare(ctx_id=-1, det_size=(640, 640))
        _FACE_ANALYZER = analyzer
    return _FACE_ANALYZER


def _get_face_swapper():
    """Return the shared inswapper model, loading it from disk on first use.

    Raises:
        FileNotFoundError: if ``models/inswapper_128.onnx`` is missing.
    """
    global _FACE_SWAPPER
    if _FACE_SWAPPER is None:
        from insightface.model_zoo import get_model

        model_path = Path("models/inswapper_128.onnx")
        if not model_path.exists():
            raise FileNotFoundError("Model file inswapper_128.onnx not found.")
        # get_model() inspects the ONNX graph and returns the matching wrapper.
        _FACE_SWAPPER = get_model(str(model_path))
    return _FACE_SWAPPER


def swap_faces(source_img, target_img):
    """Put the face found in ``source_img`` onto the face in ``target_img``.

    Args:
        source_img: image array providing the face to transplant.
        target_img: image array whose face is replaced.
            (Both are passed through cv2 colour routines by the caller;
            presumably BGR uint8 arrays — confirm against callers.)

    Returns:
        The swapped image with the same height and width as ``target_img``.

    Raises:
        ValueError: if either image has no detected face or more than one.
        FileNotFoundError: if the swapper model file is missing.
    """
    try:
        face_analyzer = _get_face_analyzer()

        # Detect faces
        source_faces = face_analyzer.get(source_img)
        target_faces = face_analyzer.get(target_img)
        if not source_faces or not target_faces:
            raise ValueError("No faces detected.")
        if len(source_faces) > 1 or len(target_faces) > 1:
            raise ValueError("Multiple faces detected; only one per image supported.")
        source_face = source_faces[0]
        target_face = target_faces[0]

        # Load the face swapper model (cached after the first request)
        swapper = _get_face_swapper()

        # Perform face swap
        result = swapper.get(target_img, target_face, source_face, paste_back=True)

        # Resize only if the result does not already match the target size.
        target_height, target_width = target_img.shape[:2]
        if result.shape[:2] != (target_height, target_width):
            result = cv2.resize(
                result,
                (target_width, target_height),  # cv2 expects (width, height)
                interpolation=cv2.INTER_LANCZOS4,
            )
        return result
    except Exception as e:
        logger.error(f"Face swap failed: {e}")
        raise
def _decode_upload(raw, label):
    """Decode uploaded bytes into a colour image array.

    Args:
        raw: the raw bytes of the uploaded file.
        label: "source" or "target"; used only in the error message.

    Raises:
        ValueError: if the upload is empty or cannot be decoded as an image.
    """
    # An empty buffer would make cv2.imdecode raise instead of returning None.
    if not raw:
        raise ValueError(f"Invalid {label} image.")
    image = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise ValueError(f"Invalid {label} image.")
    return image


@app.post("/swap-face/")
async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)):
    """Swap the face from ``source_file`` onto ``target_file``; return a JPEG.

    Args:
        source_file: uploaded image providing the face.
        target_file: uploaded image whose face is replaced.

    Returns:
        A ``Response`` carrying the JPEG-encoded result (``image/jpeg``).

    Raises:
        HTTPException: 400 when the request is unusable (undecodable image,
            no face, several faces); 500 for any other failure.
    """
    try:
        source_img = _decode_upload(await source_file.read(), "source")
        target_img = _decode_upload(await target_file.read(), "target")

        # Perform face swap
        result_img = swap_faces(source_img, target_img)

        # Convert result to bytes; imencode reports failure through its flag.
        encoded_ok, img_encoded = cv2.imencode(".jpg", result_img)
        if not encoded_ok:
            raise RuntimeError("Failed to encode result image.")
    except ValueError as e:
        # Problems with the submitted images are the client's to fix.
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=500, detail=str(e)) from e
    return Response(content=img_encoded.tobytes(), media_type="image/jpeg")
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )