File size: 4,799 Bytes
456bcc4
0ed24f7
 
 
411c7f2
456bcc4
0ed24f7
456bcc4
0ed24f7
279af2f
0ed24f7
 
a73b3bc
279af2f
c7d6b3a
b8c2f1b
 
 
e9cafa4
 
b8c2f1b
279af2f
e9cafa4
456bcc4
 
 
e9cafa4
279af2f
 
e9cafa4
279af2f
 
 
 
 
e9cafa4
249a867
 
e9cafa4
249a867
b8c2f1b
 
 
 
e9cafa4
 
 
a73b3bc
456bcc4
e9cafa4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a73b3bc
e9cafa4
411c7f2
 
c7d6b3a
a73b3bc
 
e9cafa4
a73b3bc
 
 
411c7f2
 
 
 
456bcc4
e9cafa4
 
a73b3bc
c7d6b3a
e9cafa4
 
 
 
 
 
 
 
 
a73b3bc
 
0ed24f7
a73b3bc
e9cafa4
0ed24f7
a73b3bc
 
0ed24f7
a73b3bc
c7d6b3a
a73b3bc
e9cafa4
0ed24f7
e9cafa4
 
0ed24f7
e9cafa4
a73b3bc
 
 
279af2f
 
c7d6b3a
279af2f
e9cafa4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ed24f7
 
e9cafa4
 
 
 
 
 
 
 
 
0ed24f7
279af2f
456bcc4
a73b3bc
 
 
c7d6b3a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# -*- coding:UTF-8 -*-
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import cv2
import numpy as np
from PIL import Image
import os
import logging
import requests
from pathlib import Path
import uvicorn

# Initialize FastAPI
app = FastAPI(
    title="Face Swap API",
    description="API for swapping faces in images.",
    docs_url="/docs",
    redoc_url="/redoc",
)

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# CORS setup
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Update with your domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Health check route
@app.get("/")
async def root():
    return {"message": "Face Swap API is running. Use /docs to test the API."}

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# Prevent multiple downloads
MODEL_PATH = Path("models/inswapper_128.onnx")
MODEL_URL = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"

def download_model():
    if MODEL_PATH.exists():
        logger.info("Model already exists, skipping download.")
        return
    logger.info("Downloading model...")
    MODEL_PATH.parent.mkdir(exist_ok=True)
    try:
        response = requests.get(MODEL_URL, stream=True, timeout=30)
        response.raise_for_status()
        with open(MODEL_PATH, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        logger.info("Model downloaded successfully.")
    except Exception as e:
        logger.error(f"Failed to download model: {e}")
        raise RuntimeError("Could not download inswapper_128.onnx.")

# FastAPI startup event
@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting application...")
    try:
        download_model()
        logger.info("Startup completed successfully.")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise
    yield
    logger.info("Shutting down application...")

app.lifespan = lifespan

# Face detection and swap functions
def get_faces(image):
    try:
        from insightface.app import FaceAnalysis
        app = FaceAnalysis(name="buffalo_l")
        app.prepare(ctx_id=0, det_size=(640, 640))
        return app.get(image) or []
    except Exception as e:
        logger.error(f"Face detection failed: {e}")
        raise

def swap_faces(source_img, target_img):
    try:
        from insightface.utils import face_align
        from insightface.model_zoo import face_swapper

        face_analyzer = FaceAnalysis(name="buffalo_l")
        face_analyzer.prepare(ctx_id=0, det_size=(640, 640))

        source_faces = face_analyzer.get(source_img)
        target_faces = face_analyzer.get(target_img)

        if not source_faces or not target_faces:
            raise ValueError("No faces detected.")
        if len(source_faces) > 1 or len(target_faces) > 1:
            raise ValueError("Multiple faces detected. Only one face per image is supported.")

        swapper = face_swapper.FaceSwapper(MODEL_PATH)
        result = swapper.get(target_img, target_faces[0], source_faces[0], paste_back=True)

        return cv2.cvtColor(np.array(Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))), cv2.COLOR_RGB2BGR)
    except Exception as e:
        logger.error(f"Face swap failed: {e}")
        raise

@app.post("/swap-face/")
async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)):
    try:
        source_path = "temp_source.jpg"
        target_path = "temp_target.jpg"
        output_path = "output.jpg"

        with open(source_path, "wb") as f:
            f.write(await source_file.read())
        with open(target_path, "wb") as f:
            f.write(await target_file.read())

        source_img = cv2.imread(source_path)
        target_img = cv2.imread(target_path)

        if source_img is None or target_img is None:
            raise ValueError("Invalid images provided.")

        result_img = swap_faces(source_img, target_img)

        cv2.imwrite(output_path, result_img)
        with open(output_path, "rb") as f:
            image_data = f.read()

        for path in [source_path, target_path, output_path]:
            if os.path.exists(path):
                os.remove(path)

        return Response(content=image_data, media_type="image/jpeg")

    except Exception as e:
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)