Spaces:
Runtime error
Runtime error
File size: 4,799 Bytes
456bcc4 0ed24f7 411c7f2 456bcc4 0ed24f7 456bcc4 0ed24f7 279af2f 0ed24f7 a73b3bc 279af2f c7d6b3a b8c2f1b e9cafa4 b8c2f1b 279af2f e9cafa4 456bcc4 e9cafa4 279af2f e9cafa4 279af2f e9cafa4 249a867 e9cafa4 249a867 b8c2f1b e9cafa4 a73b3bc 456bcc4 e9cafa4 a73b3bc e9cafa4 411c7f2 c7d6b3a a73b3bc e9cafa4 a73b3bc 411c7f2 456bcc4 e9cafa4 a73b3bc c7d6b3a e9cafa4 a73b3bc 0ed24f7 a73b3bc e9cafa4 0ed24f7 a73b3bc 0ed24f7 a73b3bc c7d6b3a a73b3bc e9cafa4 0ed24f7 e9cafa4 0ed24f7 e9cafa4 a73b3bc 279af2f c7d6b3a 279af2f e9cafa4 0ed24f7 e9cafa4 0ed24f7 279af2f 456bcc4 a73b3bc c7d6b3a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# -*- coding:UTF-8 -*-
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import cv2
import numpy as np
from PIL import Image
import os
import logging
import requests
from pathlib import Path
import uvicorn
# Initialize FastAPI
# Application object that the middleware and every route below attach to.
# Interactive Swagger docs are served at /docs and ReDoc at /redoc.
app = FastAPI(
    title="Face Swap API",
    description="API for swapping faces in images.",
    docs_url="/docs",
    redoc_url="/redoc",
)
# Logging setup
# Configure the root logger at INFO level and create the module-level logger
# used by the functions in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# CORS setup
# Every origin, HTTP method and header is currently accepted.
# NOTE(review): the FastAPI CORS docs advise against combining wildcard
# origins with allow_credentials=True -- confirm whether credentialed
# requests are actually needed and restrict allow_origins if so.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Update with your domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Landing route
@app.get("/")
async def root():
    """Return a short banner telling the caller the API is up."""
    banner = "Face Swap API is running. Use /docs to test the API."
    return {"message": banner}
@app.get("/health")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    payload = dict(status="healthy")
    return payload
# Prevent multiple downloads
# Local path of the swapper model; download_model() skips the download when
# a file already exists at this location.
MODEL_PATH = Path("models/inswapper_128.onnx")
# Remote URL the model file is fetched from when it is missing locally.
MODEL_URL = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
def download_model():
    """Download the swapper model to MODEL_PATH unless it is already there.

    The payload is streamed into a ``.part`` file next to MODEL_PATH and only
    renamed to MODEL_PATH after the transfer completed. An interrupted
    download therefore never leaves a truncated file that the "already
    exists" check would accept on the next start.

    Raises:
        RuntimeError: If the model could not be downloaded or written.
    """
    if MODEL_PATH.exists():
        logger.info("Model already exists, skipping download.")
        return

    logger.info("Downloading model...")
    MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
    partial_path = MODEL_PATH.with_name(MODEL_PATH.name + ".part")
    try:
        # The context manager releases the streamed connection even on errors.
        with requests.get(MODEL_URL, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        # Publish the finished file in a single rename step.
        partial_path.replace(MODEL_PATH)
        logger.info("Model downloaded successfully.")
    except Exception as e:
        logger.error(f"Failed to download model: {e}")
        # Do not leave a half-written file behind.
        if partial_path.exists():
            partial_path.unlink()
        raise RuntimeError("Could not download inswapper_128.onnx.") from e
# FastAPI startup/shutdown handling
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before the app serves requests; log on shutdown.

    On startup the model file is downloaded if it is missing. A failure is
    logged and re-raised so the server does not come up without the model.

    Args:
        app: The application instance the lifespan is running for.
    """
    logger.info("Starting application...")
    try:
        download_model()
        logger.info("Startup completed successfully.")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise
    yield
    logger.info("Shutting down application...")


# Setting ``app.lifespan`` only creates an unused attribute, so the handler
# above was never executed. Because the application object is constructed
# before this function exists, install the handler on the router, which is
# where the lifespan context is looked up when the server starts.
app.router.lifespan_context = lifespan
# Face detection and swap functions
def get_faces(image):
    """Detect faces in ``image`` using the "buffalo_l" analysis pack.

    Returns whatever the analyser reports, or an empty list when that result
    is falsy. Any failure is logged and re-raised unchanged.
    """
    try:
        from insightface.app import FaceAnalysis

        detector = FaceAnalysis(name="buffalo_l")
        detector.prepare(ctx_id=0, det_size=(640, 640))
        detections = detector.get(image)
        if not detections:
            return []
        return detections
    except Exception as err:
        logger.error(f"Face detection failed: {err}")
        raise
def swap_faces(source_img, target_img):
    """Put the face found in ``source_img`` onto the face in ``target_img``.

    Args:
        source_img: Image array providing the face to transfer.
        target_img: Image array whose face is replaced.

    Returns:
        The target image with the swapped face pasted back in, as returned
        by the swapper model.

    Raises:
        ValueError: If either image contains no face, or more than one face.
        Exception: Any model or inference error is logged and re-raised.
    """
    try:
        # FaceAnalysis was previously used here without being imported, which
        # raised NameError on every call. The swapper model is loaded through
        # the model zoo's get_model() loader.
        from insightface.app import FaceAnalysis
        from insightface.model_zoo import get_model

        face_analyzer = FaceAnalysis(name="buffalo_l")
        face_analyzer.prepare(ctx_id=0, det_size=(640, 640))
        source_faces = face_analyzer.get(source_img)
        target_faces = face_analyzer.get(target_img)

        if not source_faces or not target_faces:
            raise ValueError("No faces detected.")
        if len(source_faces) > 1 or len(target_faces) > 1:
            raise ValueError("Multiple faces detected. Only one face per image is supported.")

        # get_model() expects a string path ending in ".onnx".
        swapper = get_model(str(MODEL_PATH))
        # The former BGR -> RGB -> BGR round trip through PIL was a no-op on
        # the pixel data, so the swapper output is returned directly.
        return swapper.get(target_img, target_faces[0], source_faces[0], paste_back=True)
    except Exception as e:
        logger.error(f"Face swap failed: {e}")
        raise
def _decode_image(raw):
    """Decode uploaded bytes into a colour image array, or None if invalid."""
    if not raw:
        # An empty upload cannot be a valid image.
        return None
    return cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), cv2.IMREAD_COLOR)


@app.post("/swap-face/")
async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)):
    """Swap the face from ``source_file`` onto ``target_file``; return a JPEG.

    Images are decoded and encoded in memory. The earlier version wrote to
    fixed file names in the working directory, so concurrent requests could
    overwrite each other's files, and the files were left behind whenever a
    request failed.

    Args:
        source_file: Uploaded image providing the face.
        target_file: Uploaded image whose face is replaced.

    Returns:
        A response with the resulting image encoded as JPEG.

    Raises:
        HTTPException: 400 for invalid input (undecodable images, no face or
            several faces detected); 500 for any other failure.
    """
    try:
        source_img = _decode_image(await source_file.read())
        target_img = _decode_image(await target_file.read())
        if source_img is None or target_img is None:
            raise ValueError("Invalid images provided.")

        result_img = swap_faces(source_img, target_img)

        encoded_ok, encoded = cv2.imencode(".jpg", result_img)
        if not encoded_ok:
            raise RuntimeError("Could not encode result image.")
        return Response(content=encoded.tobytes(), media_type="image/jpeg")
    except ValueError as e:
        # Problems with the supplied images are client errors.
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=500, detail=str(e))
# Script entry point: when executed directly, serve the app with uvicorn on
# all network interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
|