File size: 6,494 Bytes
456bcc4
 
279af2f
456bcc4
 
 
 
 
279af2f
456bcc4
 
 
 
 
 
 
279af2f
 
456bcc4
279af2f
 
 
456bcc4
 
 
 
 
279af2f
 
456bcc4
279af2f
 
 
 
 
456bcc4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
279af2f
 
456bcc4
279af2f
 
456bcc4
 
279af2f
 
456bcc4
 
279af2f
456bcc4
 
 
 
 
 
279af2f
456bcc4
 
 
 
279af2f
456bcc4
279af2f
 
 
456bcc4
279af2f
456bcc4
 
 
279af2f
456bcc4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
279af2f
456bcc4
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# -*- coding:UTF-8 -*-
#!/usr/bin/env python
import numpy as np
import roop.globals
from roop.core import start, decode_execution_providers
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path
from roop.face_analyser import get_many_faces
import os
import requests
import cv2
from pathlib import Path
from PIL import Image
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
import shutil
import logging
import time

app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Update with your Framer domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

article_text = """
<div style="text-align: center;">
    <p>Enjoying the tool? Buy me a coffee and get exclusive prompt guides!</p>
    <p><i>Instantly unlock helpful tips for creating better prompts!</i></p>
    <div style="display: flex; justify-content: center;">
        <a href=\"https://piczify.lemonsqueezy.com/buy/0f5206fa-68e8-42f6-9ca8-4f80c587c83e\">
            <img src=\"https://www.buymeacoffee.com/assets/img/custom_images/yellow_img.png\" 
                 alt=\"Buy Me a Coffee\" 
                 style=\"height: 40px; width: auto; box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2); border-radius: 10px;\">
        </a>
    </div>
</div>
"""

def download_model():
    model_dir = Path("models")
    model_path = model_dir / "inswapper_128.onnx"
    model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"

    if not model_path.exists():
        logger.info("Model not found. Downloading inswapper_128.onnx...")
        model_dir.mkdir(exist_ok=True)
        try:
            response = requests.get(model_url, stream=True)
            response.raise_for_status()
            with open(model_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            logger.info("Model downloaded successfully.")
        except Exception as e:
            logger.error(f"Failed to download model: {e}")
            raise RuntimeError("Could not download inswapper_128.onnx. Please download it manually from 'https://huggingface.co/ezioruan/inswapper_128.onnx' and place it in the 'models' folder.")
    else:
        logger.info("Model already exists at: %s", model_path)

download_model()

def cleanup_files(paths):
    time.sleep(2)  # Wait 2 seconds to ensure response is sent
    for path in paths:
        if os.path.exists(path):
            os.remove(path)
            logger.info("Cleaned up: %s", path)

@app.post("/swap-face/")
async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...), doFaceEnhancer: bool = True, background_tasks: BackgroundTasks = BackgroundTasks()):
    try:
        # Save uploaded files temporarily
        source_path = "input.jpg"
        target_path = "target.jpg"
        output_path = "output.jpg"

        # Read and save source image
        source_content = await source_file.read()
        with open(source_path, "wb") as f:
            f.write(source_content)
        source_image = Image.open(source_path)
        source_image.save(source_path, quality=100, subsampling=0)

        # Read and save target image, get its size
        target_content = await target_file.read()
        with open(target_path, "wb") as f:
            f.write(target_content)
        target_image = Image.open(target_path)
        target_size = target_image.size
        target_image.save(target_path, quality=100, subsampling=0)

        # Load images as NumPy arrays for face detection
        source_img = cv2.imread(source_path)
        target_img = cv2.imread(target_path)
        if source_img is None or target_img is None:
            raise HTTPException(status_code=400, detail="Failed to load one or both images.")

        # Debug: Check face detection
        logger.info("Source faces detected: %s", get_many_faces(source_img))
        logger.info("Target faces detected: %s", get_many_faces(target_img))

        # Set up roop globals
        roop.globals.source_path = source_path
        roop.globals.target_path = target_path
        roop.globals.output_path = normalize_output_path(
            roop.globals.source_path, roop.globals.target_path, output_path
        )
        roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
        roop.globals.headless = True
        roop.globals.keep_fps = True
        roop.globals.keep_audio = True
        roop.globals.keep_frames = False
        roop.globals.many_faces = False
        roop.globals.reference_face_position = 0
        roop.globals.similar_face_distance = 0.6
        roop.globals.video_encoder = "libx264"
        roop.globals.video_quality = 18
        roop.globals.max_memory = 16
        roop.globals.execution_providers = decode_execution_providers(["cpu"])
        roop.globals.execution_threads = 4

        logger.info("Using frame processors: %s", roop.globals.frame_processors)

        # Validate frame processors
        for frame_processor in get_frame_processors_modules(
            roop.globals.frame_processors
        ):
            if not frame_processor.pre_check():
                raise HTTPException(status_code=500, detail=f"Pre-check failed for {frame_processor.__name__}")

        # Perform face swap
        start()
        if os.path.exists(output_path):
            output_image = Image.open(output_path)
            output_image = output_image.resize(target_size, Image.Resampling.LANCZOS)
            output_image.save(output_path, quality=100, subsampling=0)
            logger.info("Returning FileResponse for %s", output_path)
            background_tasks.add_task(cleanup_files, [source_path, target_path, output_path])
            return FileResponse(output_path, media_type="image/jpeg", filename="output.jpg")
        else:
            raise HTTPException(status_code=500, detail="Face swap failed, output not generated")
    except Exception as e:
        logger.error("Error in swap_face: %s", str(e))
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)