from __future__ import annotations import imghdr import tempfile import threading from pathlib import Path import cv2 import onnxruntime import modules.globals from modules.face_analyser import get_face_analyser from modules.processors.frame import face_swapper _RUNTIME_LOCK = threading.Lock() _INFERENCE_LOCK = threading.Lock() _RUNTIME_READY = False def _select_execution_providers() -> list[str]: available = onnxruntime.get_available_providers() preferred = [ "CUDAExecutionProvider", "CoreMLExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider", ] selected = [provider for provider in preferred if provider in available] return selected or ["CPUExecutionProvider"] def _configure_globals() -> None: modules.globals.frame_processors = ["face_swapper"] modules.globals.execution_providers = _select_execution_providers() modules.globals.execution_threads = 1 modules.globals.keep_fps = True modules.globals.keep_audio = False modules.globals.keep_frames = False modules.globals.many_faces = False modules.globals.map_faces = False modules.globals.mouth_mask = False modules.globals.nsfw_filter = False modules.globals.headless = True modules.globals.opacity = 1.0 modules.globals.sharpness = 0.0 modules.globals.enable_interpolation = False modules.globals.log_level = "error" def initialize_runtime() -> None: global _RUNTIME_READY if _RUNTIME_READY: return with _RUNTIME_LOCK: if _RUNTIME_READY: return _configure_globals() if not face_swapper.pre_check(): raise RuntimeError("Face swapper model setup failed.") if not face_swapper.pre_start(): raise RuntimeError("Face swapper model failed to initialize.") get_face_analyser() _RUNTIME_READY = True def _infer_extension(file_path: Path) -> str: detected = imghdr.what(file_path) if detected == "jpeg": return ".jpg" if detected == "png": return ".png" if detected == "webp": return ".webp" raise ValueError("Downloaded file is not a supported image.") def _write_uploaded_image(content: bytes, directory: Path, stem: str) -> Path: provisional_path = directory / f"{stem}.bin" provisional_path.write_bytes(content) final_path = provisional_path.with_suffix(_infer_extension(provisional_path)) provisional_path.replace(final_path) return final_path def swap_face_from_uploads(source_image_bytes: bytes, target_image_bytes: bytes) -> tuple[bytes, str]: initialize_runtime() with _INFERENCE_LOCK: with tempfile.TemporaryDirectory(prefix="face-swap-") as temp_dir: temp_path = Path(temp_dir) source_path = _write_uploaded_image(source_image_bytes, temp_path, "source") target_path = _write_uploaded_image(target_image_bytes, temp_path, "target") output_path = temp_path / "result.png" face_swapper.process_image(str(source_path), str(target_path), str(output_path)) if not output_path.exists(): raise RuntimeError("Face swap failed and no output image was produced.") result = cv2.imread(str(output_path)) if result is None: raise RuntimeError("Generated output image could not be read.") ok, encoded = cv2.imencode(".png", result) if not ok: raise RuntimeError("Generated output image could not be encoded.") return encoded.tobytes(), "image/png"