# -*- coding:UTF-8 -*- #!/usr/bin/env python import numpy as np import roop.globals from roop.core import start, decode_execution_providers from roop.processors.frame.core import get_frame_processors_modules from roop.utilities import normalize_output_path from roop.face_analyser import get_many_faces import os import requests import cv2 from pathlib import Path from PIL import Image from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware import shutil import logging import time app = FastAPI() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Update with your Framer domain in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) article_text = """
""" def download_model(): model_dir = Path("models") model_path = model_dir / "inswapper_128.onnx" model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx" if not model_path.exists(): logger.info("Model not found. Downloading inswapper_128.onnx...") model_dir.mkdir(exist_ok=True) try: response = requests.get(model_url, stream=True) response.raise_for_status() with open(model_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logger.info("Model downloaded successfully.") except Exception as e: logger.error(f"Failed to download model: {e}") raise RuntimeError("Could not download inswapper_128.onnx. Please download it manually from 'https://huggingface.co/ezioruan/inswapper_128.onnx' and place it in the 'models' folder.") else: logger.info("Model already exists at: %s", model_path) download_model() def cleanup_files(paths): time.sleep(2) # Wait 2 seconds to ensure response is sent for path in paths: if os.path.exists(path): os.remove(path) logger.info("Cleaned up: %s", path) @app.post("/swap-face/") async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...), doFaceEnhancer: bool = True, background_tasks: BackgroundTasks = BackgroundTasks()): try: # Save uploaded files temporarily source_path = "input.jpg" target_path = "target.jpg" output_path = "output.jpg" # Read and save source image source_content = await source_file.read() with open(source_path, "wb") as f: f.write(source_content) source_image = Image.open(source_path) source_image.save(source_path, quality=100, subsampling=0) # Read and save target image, get its size target_content = await target_file.read() with open(target_path, "wb") as f: f.write(target_content) target_image = Image.open(target_path) target_size = target_image.size target_image.save(target_path, quality=100, subsampling=0) # Load images as NumPy arrays for face detection source_img = cv2.imread(source_path) target_img = cv2.imread(target_path) if source_img is None or target_img is None: raise HTTPException(status_code=400, detail="Failed to load one or both images.") # Debug: Check face detection logger.info("Source faces detected: %s", get_many_faces(source_img)) logger.info("Target faces detected: %s", get_many_faces(target_img)) # Set up roop globals roop.globals.source_path = source_path roop.globals.target_path = target_path roop.globals.output_path = normalize_output_path( roop.globals.source_path, roop.globals.target_path, output_path ) roop.globals.frame_processors = ["face_swapper", "face_enhancer"] roop.globals.headless = True roop.globals.keep_fps = True roop.globals.keep_audio = True roop.globals.keep_frames = False roop.globals.many_faces = False roop.globals.reference_face_position = 0 roop.globals.similar_face_distance = 0.6 roop.globals.video_encoder = "libx264" roop.globals.video_quality = 18 roop.globals.max_memory = 16 roop.globals.execution_providers = decode_execution_providers(["cpu"]) roop.globals.execution_threads = 4 logger.info("Using frame processors: %s", roop.globals.frame_processors) # Validate frame processors for frame_processor in get_frame_processors_modules( roop.globals.frame_processors ): if not frame_processor.pre_check(): raise HTTPException(status_code=500, detail=f"Pre-check failed for {frame_processor.__name__}") # Perform face swap start() if os.path.exists(output_path): output_image = Image.open(output_path) output_image = output_image.resize(target_size, Image.Resampling.LANCZOS) output_image.save(output_path, quality=100, subsampling=0) logger.info("Returning FileResponse for %s", output_path) background_tasks.add_task(cleanup_files, [source_path, target_path, output_path]) return FileResponse(output_path, media_type="image/jpeg", filename="output.jpg") else: raise HTTPException(status_code=500, detail="Face swap failed, output not generated") except Exception as e: logger.error("Error in swap_face: %s", str(e)) raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=8000)