Spaces:
Running
Running
| """ | |
| Face swap processor using InsightFace (inswapper_128) with optional | |
| CodeFormer face restoration and ESPCN super-resolution. | |
| Model weights are downloaded automatically on first use and cached | |
| in the local `models/` directory. | |
| """ | |
| import os | |
| import shutil | |
| import cv2 | |
| import numpy as np | |
| import requests | |
| from pathlib import Path | |
# ── Model paths ──────────────────────────────────────────────────────────────
# Model weights are stored in a `models/` directory that sits next to the
# directory containing this file. It is created at import time when missing.
MODELS_DIR = Path(__file__).parent.parent.joinpath("models")
MODELS_DIR.mkdir(exist_ok=True)

INSWAPPER_PATH = MODELS_DIR.joinpath("inswapper_128.onnx")
CODEFORMER_PATH = MODELS_DIR.joinpath("codeformer.onnx")
ESPCN_PATH = MODELS_DIR.joinpath("ESPCN_x2.pb")

# Public mirrors of inswapper_128.onnx, tried in order until one succeeds.
_INSWAPPER_URLS = [
    # Public HF mirror (no auth required)
    "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx",
    # Fallback mirror
    "https://huggingface.co/theNeofr/inswapper/resolve/main/inswapper_128.onnx",
]
# ── Helpers ──────────────────────────────────────────────────────────────────
def _download_inswapper() -> None:
    """Ensure ``inswapper_128.onnx`` is present at ``INSWAPPER_PATH``.

    Returns immediately when a file larger than 100 KB already exists there.
    Otherwise the following strategies are tried in order:

    1. ``huggingface_hub.hf_hub_download`` from ``deepinsight/inswapper``,
       passing the ``HF_TOKEN`` environment variable (possibly unset) as the
       token.
    2. Plain HTTP download from each URL in ``_INSWAPPER_URLS``; a mirror is
       accepted only when the downloaded file exceeds 500 MB.

    Every download is written to a sibling ``.part`` file and renamed onto
    ``INSWAPPER_PATH`` only once it is complete, so an interrupted run never
    leaves a truncated model that the cache check would later accept.

    Raises:
        RuntimeError: if no strategy produced a usable file.
    """
    if INSWAPPER_PATH.exists() and INSWAPPER_PATH.stat().st_size > 100_000:
        return

    tmp_path = INSWAPPER_PATH.with_name(INSWAPPER_PATH.name + ".part")

    # ── Strategy 1: huggingface_hub ──────────────────────────────────────────
    try:
        from huggingface_hub import hf_hub_download

        print("[FaceSwapper] Downloading inswapper_128.onnx via HF Hub …")
        cached = hf_hub_download(
            repo_id="deepinsight/inswapper",
            filename="inswapper_128.onnx",
            token=os.environ.get("HF_TOKEN"),
        )
        shutil.copy(cached, tmp_path)
        tmp_path.replace(INSWAPPER_PATH)
        print(f"[FaceSwapper] Saved to {INSWAPPER_PATH}")
        return
    except Exception as e:
        # Deliberately broad: a missing package, gated repo, or network error
        # all mean the same thing here, namely fall through to the mirrors.
        tmp_path.unlink(missing_ok=True)
        print(f"[FaceSwapper] HF Hub download failed ({e}), trying mirrors …")

    # ── Strategy 2: public mirrors ───────────────────────────────────────────
    for url in _INSWAPPER_URLS:
        try:
            print(f"[FaceSwapper] Trying {url} …")
            # The context manager releases the connection even when the
            # transfer is aborted part-way through.
            with requests.get(url, stream=True, timeout=180) as resp:
                resp.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=65536):
                        f.write(chunk)
            if tmp_path.stat().st_size > 500_000_000:  # ~554 MB expected
                tmp_path.replace(INSWAPPER_PATH)
                print(f"[FaceSwapper] Saved to {INSWAPPER_PATH}")
                return
            tmp_path.unlink(missing_ok=True)
            print("[FaceSwapper] Mirror file too small, trying next …")
        except Exception as e:
            # Best effort per mirror: report and move on to the next URL.
            print(f"[FaceSwapper] Mirror failed ({e})")
            tmp_path.unlink(missing_ok=True)

    raise RuntimeError(
        "Could not download inswapper_128.onnx. "
        "Accept the model terms at https://huggingface.co/deepinsight/inswapper "
        "then add your HF token as a Space secret named HF_TOKEN."
    )
def _download_codeformer() -> None:
    """Download the CodeFormer ONNX model to ``CODEFORMER_PATH`` if missing.

    A file larger than 50 MB at ``CODEFORMER_PATH`` is treated as already
    downloaded. Each URL is streamed into a sibling ``.part`` file which is
    renamed into place only after it passes the same size check, so a
    partially written model is never visible at the final path.

    Never raises on download failure: every error is reported on stdout and
    the function returns with ``CODEFORMER_PATH`` absent, which callers use
    as the signal to fall back to OpenCV enhancement.
    """
    if CODEFORMER_PATH.exists() and CODEFORMER_PATH.stat().st_size > 50_000_000:
        return

    urls = [
        "https://github.com/facefusion/facefusion-assets/releases/download/models/codeformer.onnx",
    ]
    tmp_path = CODEFORMER_PATH.with_name(CODEFORMER_PATH.name + ".part")

    for url in urls:
        try:
            print(f"[FaceSwapper] Downloading CodeFormer from {url} …")
            # The context manager releases the connection even when the
            # transfer is aborted part-way through.
            with requests.get(url, stream=True, timeout=300) as resp:
                resp.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in resp.iter_content(65536):
                        f.write(chunk)
            if tmp_path.stat().st_size > 50_000_000:
                tmp_path.replace(CODEFORMER_PATH)
                print("[FaceSwapper] CodeFormer ready.")
                return
            tmp_path.unlink(missing_ok=True)
        except Exception as e:
            # Best effort: the enhancement step is optional, so report and
            # continue with the next URL instead of propagating.
            print(f"[FaceSwapper] CodeFormer download failed: {e}")
            tmp_path.unlink(missing_ok=True)

    print("[FaceSwapper] CodeFormer unavailable — falling back to OpenCV enhancement.")
def _download_espcn() -> None:
    """Download the ESPCN x2 super-resolution model to ``ESPCN_PATH`` if missing.

    A file larger than 50 KB at ``ESPCN_PATH`` is treated as already
    downloaded. The response body is size-checked in memory before anything
    is written, and is then written to a sibling ``.part`` file and renamed
    into place, so an undersized or partial payload never appears at
    ``ESPCN_PATH``.

    Never raises on download failure: errors are reported on stdout and the
    function returns with ``ESPCN_PATH`` absent, which disables the
    super-resolution step.
    """
    if ESPCN_PATH.exists() and ESPCN_PATH.stat().st_size > 50_000:
        return

    urls = [
        "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x2.pb",
    ]
    tmp_path = ESPCN_PATH.with_name(ESPCN_PATH.name + ".part")

    for url in urls:
        try:
            print(f"[FaceSwapper] Downloading ESPCN SR model from {url} …")
            resp = requests.get(url, timeout=60)
            resp.raise_for_status()
            payload = resp.content
            if len(payload) > 50_000:
                tmp_path.write_bytes(payload)
                tmp_path.replace(ESPCN_PATH)
                print("[FaceSwapper] ESPCN SR model ready.")
                return
        except Exception as e:
            # Best effort: super-resolution is optional, so report and move on.
            print(f"[FaceSwapper] ESPCN download failed: {e}")
            tmp_path.unlink(missing_ok=True)

    print("[FaceSwapper] ESPCN unavailable — skipping super-resolution step.")
# ── Main class ───────────────────────────────────────────────────────────────
class FaceSwapper:
    """
    Swaps the first detected face from a source image onto every detected
    face in the target image. Optionally runs CodeFormer (ONNX) restoration,
    ESPCN super-resolution and CLAHE on the swapped faces.

    All models are loaded lazily on first use; constructing an instance does
    no I/O.
    """

    # Longest side (in pixels) that still images / video frames are processed at.
    _MAX_IMAGE_DIM = 2048
    _MAX_VIDEO_DIM = 720

    def __init__(self):
        self._app = None         # InsightFace FaceAnalysis
        self._swapper = None     # inswapper ONNX model
        self._codeformer = None  # CodeFormer ONNX session
        self._sr = None          # ESPCN DNN super-res (opencv-contrib)
        self._ready = False      # True once _init() has completed

    # ── Lazy initialisation ──────────────────────────────────────────────────
    def _init(self):
        """Load the face detector and the inswapper model on first call.

        Later calls return immediately. Any exception raised by the imports,
        by model preparation or by ``_download_inswapper`` propagates, and
        ``_ready`` stays False so the next call retries.
        """
        if self._ready:
            return

        import insightface
        from insightface.app import FaceAnalysis

        # NOTE: an earlier revision built an onnxruntime SessionOptions object
        # here but never passed it to any session, so it had no effect and was
        # removed. Thread counts are left at the ONNX Runtime defaults.

        # Detector is prepared at 640x640 for still images; swap_frame()
        # temporarily lowers it to 320x320 for video.
        self._app = FaceAnalysis(
            name="buffalo_l",
            providers=["CPUExecutionProvider"],
        )
        self._app.prepare(ctx_id=-1, det_size=(640, 640))

        _download_inswapper()
        self._swapper = insightface.model_zoo.get_model(
            str(INSWAPPER_PATH),
            providers=["CPUExecutionProvider"],
        )
        self._ready = True

    # ── Small geometry helpers ───────────────────────────────────────────────
    @staticmethod
    def _fit_size(width: int, height: int, max_dim: int):
        """Return ``(scale, new_w, new_h)`` so the longer side is <= *max_dim*.

        ``scale`` is 1.0 (and the size unchanged) when the image already
        fits. Both returned dimensions are at least 1 so a very elongated
        image never produces a zero-sized resize target.
        """
        longest = max(width, height)
        if longest <= max_dim:
            return 1.0, width, height
        scale = max_dim / longest
        return scale, max(1, int(width * scale)), max(1, int(height * scale))

    @staticmethod
    def _clamp_box(bbox, width: int, height: int):
        """Truncate *bbox* to ints and clip it to a ``width`` x ``height`` image.

        Returns ``(x1, y1, x2, y2)`` as Python ints. The box may be empty
        (``x2 <= x1`` or ``y2 <= y1``); callers are expected to check.
        """
        box = bbox.astype(int)
        return (
            max(int(box[0]), 0),
            max(int(box[1]), 0),
            min(int(box[2]), width),
            min(int(box[3]), height),
        )

    @staticmethod
    def _feather_mask(height: int, width: int) -> np.ndarray:
        """Return a float32 ``(height, width, 1)`` mask: 1 inside, soft edges.

        The border width is a tenth of the shorter side (minimum 4 px). If
        the region is smaller than twice that border the mask stays all
        zeros, i.e. the region is left untouched by the caller's blend.
        """
        pad = max(4, min(height, width) // 10)
        mask = np.zeros((height, width), dtype=np.float32)
        mask[pad:-pad, pad:-pad] = 1.0
        mask = cv2.GaussianBlur(mask, (0, 0), pad // 2)  # pad >= 4, so sigma >= 2
        return mask[:, :, np.newaxis]

    # ── Enhancement (pure OpenCV, no extra models) ───────────────────────────
    @staticmethod
    def _enhance_opencv(image: np.ndarray, faces) -> np.ndarray:
        """
        Sharpen and locally re-contrast each face region of *image*.

        For each face bounding box:
          1. Unsharp masking, with the blur radius scaled to the face size
          2. CLAHE on the L channel of the LAB representation
          3. Feather-blend the enhanced region back so edges stay smooth

        Declared static because it reads no instance state and is invoked as
        ``self._enhance_opencv(image, faces)``.

        Args:
            image: BGR uint8 image.
            faces: iterable of objects exposing a ``bbox`` array (x1, y1, x2, y2).

        Returns:
            A new image; *image* itself is not modified.
        """
        result = image.copy()
        img_h, img_w = image.shape[:2]
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        for face in faces:
            x1, y1, x2, y2 = FaceSwapper._clamp_box(face.bbox, img_w, img_h)
            if x2 <= x1 or y2 <= y1:
                continue
            roi = result[y1:y2, x1:x2].copy()

            # 1. Unsharp mask: larger face -> larger radius
            face_short = min(x2 - x1, y2 - y1)
            sigma = max(1.5, face_short / 80)
            blurred = cv2.GaussianBlur(roi, (0, 0), sigma)
            sharp = cv2.addWeighted(roi, 1.8, blurred, -0.8, 0)

            # 2. CLAHE on L channel
            lab = cv2.cvtColor(sharp, cv2.COLOR_BGR2LAB)
            lab[:, :, 0] = clahe.apply(lab[:, :, 0])
            enhanced_roi = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

            # 3. Feather-blend back (border derived from the shorter side,
            #    matching _enhance_codeformer)
            mask = FaceSwapper._feather_mask(y2 - y1, x2 - x1)
            result[y1:y2, x1:x2] = (
                enhanced_roi * mask + roi * (1 - mask)
            ).astype(np.uint8)
        return result

    # ── CodeFormer ONNX enhancement ──────────────────────────────────────────
    def _load_codeformer(self):
        """Lazy-load the CodeFormer ONNX session. Returns None if unavailable.

        Download or load failures are reported on stdout, never raised. While
        the session is unavailable every call retries the download.
        """
        if self._codeformer is not None:
            return self._codeformer
        try:
            _download_codeformer()
            if not CODEFORMER_PATH.exists():
                return None
            import onnxruntime as ort

            self._codeformer = ort.InferenceSession(
                str(CODEFORMER_PATH),
                providers=["CPUExecutionProvider"],
            )
            print("[FaceSwapper] CodeFormer ONNX loaded.")
        except Exception as e:
            # Optional component: degrade to "unavailable" instead of failing.
            print(f"[FaceSwapper] CodeFormer load failed: {e}")
            self._codeformer = None
        return self._codeformer

    def _load_sr(self):
        """Lazy-load ESPCN x2 DNN super-res (needs opencv-contrib). Returns None if unavailable.

        Download or load failures (including a cv2 build without
        ``dnn_superres``) are reported on stdout, never raised.
        """
        if self._sr is not None:
            return self._sr
        try:
            _download_espcn()
            if not ESPCN_PATH.exists():
                return None
            sr = cv2.dnn_superres.DnnSuperResImpl_create()
            sr.readModel(str(ESPCN_PATH))
            sr.setModel("espcn", 2)
            self._sr = sr
            print("[FaceSwapper] ESPCN 2× super-res loaded.")
        except Exception as e:
            # Optional component: degrade to "unavailable" instead of failing.
            print(f"[FaceSwapper] ESPCN load failed ({e}) — super-res disabled.")
            self._sr = None
        return self._sr

    @staticmethod
    def _codeformer_feed(sess, face_tensor: np.ndarray, fidelity: float = 1.0) -> dict:
        """Build the ONNX input dict for one CodeFormer inference.

        The image tensor goes to the first input not named ``weight``. If the
        session also declares a ``weight`` input, it is fed a one-element
        array holding *fidelity*, using float64 when the declared type is
        ``tensor(double)`` and float32 otherwise.

        NOTE(review): the ``weight`` input name and the default of 1.0 assume
        the facefusion CodeFormer export; confirm against the model in use.
        """
        feed = {}
        image_fed = False
        for node in sess.get_inputs():
            if node.name == "weight":
                dtype = np.float64 if "double" in node.type else np.float32
                feed[node.name] = np.array([fidelity], dtype=dtype)
            elif not image_fed:
                feed[node.name] = face_tensor
                image_fed = True
        return feed

    def _enhance_codeformer(self, image: np.ndarray, faces) -> np.ndarray:
        """
        Restore each face region of *image* with CodeFormer.

        For each detected face:
          1. CodeFormer ONNX on the padded face crop, resized to 512×512
          2. ESPCN 2× super-res pass when available and the crop is <= 128 px
          3. CLAHE on the L channel for local contrast
          4. Feather-blend the result back onto the image

        Falls back to ``_enhance_opencv`` for the whole image if CodeFormer
        is unavailable. A face whose inference fails is left unenhanced.

        Args:
            image: BGR uint8 image.
            faces: iterable of objects exposing a ``bbox`` array (x1, y1, x2, y2).

        Returns:
            A new image; *image* itself is not modified.
        """
        sess = self._load_codeformer()
        if sess is None:
            return self._enhance_opencv(image, faces)
        sr = self._load_sr()  # may be None; applied only when available

        result = image.copy()
        img_h, img_w = image.shape[:2]
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        for face in faces:
            bx1, by1, bx2, by2 = self._clamp_box(face.bbox, img_w, img_h)
            # Grow the box by 15% of its shorter side on every edge for context
            pad = int(min(bx2 - bx1, by2 - by1) * 0.15)
            x1, y1 = max(0, bx1 - pad), max(0, by1 - pad)
            x2, y2 = min(img_w, bx2 + pad), min(img_h, by2 + pad)
            if x2 <= x1 or y2 <= y1:
                continue
            orig = result[y1:y2, x1:x2].copy()
            h, w = orig.shape[:2]

            # ── 1. CodeFormer: BGR→RGB, resize to 512, normalize to [-1, 1] ──
            face_rgb = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB)
            face_512 = cv2.resize(face_rgb, (512, 512), interpolation=cv2.INTER_LANCZOS4)
            inp = (face_512.astype(np.float32) / 127.5) - 1.0
            inp = np.transpose(inp, (2, 0, 1))[np.newaxis]  # [1,3,512,512]
            try:
                out = sess.run(None, self._codeformer_feed(sess, inp))[0]
            except Exception as e:
                print(f"[FaceSwapper] CodeFormer inference failed: {e}")
                continue

            # Postprocess: [-1, 1] → [0, 255] → BGR
            out_rgb = np.transpose(np.squeeze(out), (1, 2, 0))  # [512,512,3]
            out_rgb = ((out_rgb + 1.0) * 127.5).clip(0, 255).astype(np.uint8)
            out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)

            # ── 2. ESPCN 2× super-res on small faces (<= 128 px) ─────────────
            if sr is not None and min(w, h) <= 128:
                try:
                    out_bgr = sr.upsample(out_bgr)
                except Exception as e:
                    # Keep the CodeFormer output if the upsampler fails.
                    print(f"[FaceSwapper] ESPCN upsample failed: {e}")
            # Bring the restored face back to the size of the crop it replaces
            out_bgr = cv2.resize(out_bgr, (w, h), interpolation=cv2.INTER_LANCZOS4)

            # ── 3. CLAHE on L channel for final contrast refinement ──────────
            lab = cv2.cvtColor(out_bgr, cv2.COLOR_BGR2LAB)
            lab[:, :, 0] = clahe.apply(lab[:, :, 0])
            out_bgr = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

            # ── 4. Feather-blend onto result ─────────────────────────────────
            msk = self._feather_mask(h, w)
            result[y1:y2, x1:x2] = (
                out_bgr.astype(np.float32) * msk + orig.astype(np.float32) * (1 - msk)
            ).astype(np.uint8)
        return result

    # ── Laplacian pyramid blending ───────────────────────────────────────────
    @staticmethod
    def _face_ellipse_mask(shape: tuple, faces, expand: float = 0.35) -> np.ndarray:
        """
        Soft elliptical mask covering all detected face regions.
        255 = use swapped face, 0 = use original background.

        Declared static because it reads no instance state and is invoked as
        ``self._face_ellipse_mask(shape, faces)``.

        Args:
            shape: image shape; only ``shape[0]`` (height) and ``shape[1]``
                (width) are used.
            faces: iterable of objects exposing a ``bbox`` array (x1, y1, x2, y2).
            expand: fraction by which each ellipse's half-axes exceed the
                half-size of the face box.

        Returns:
            uint8 single-channel mask, Gaussian-blurred for a wide transition.
        """
        mask = np.zeros(shape[:2], dtype=np.uint8)
        for face in faces:
            x1, y1, x2, y2 = FaceSwapper._clamp_box(face.bbox, shape[1], shape[0])
            w, h = x2 - x1, y2 - y1
            if w <= 0 or h <= 0:
                continue
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
            ax = int(w // 2 * (1 + expand))
            ay = int(h // 2 * (1 + expand))
            cv2.ellipse(mask, (cx, cy), (ax, ay), 0, 0, 360, 255, -1)

        # Heavy Gaussian feather: kernel is a tenth of the shorter image side
        # (at least 31) and forced odd, as cv2.GaussianBlur requires.
        blur = max(31, min(mask.shape[:2]) // 10)
        if blur % 2 == 0:
            blur += 1
        return cv2.GaussianBlur(mask, (blur, blur), 0)

    @staticmethod
    def _laplacian_blend(swapped: np.ndarray, original: np.ndarray,
                         mask: np.ndarray, levels: int = 6) -> np.ndarray:
        """
        Laplacian pyramid blending of *swapped* onto *original*.

        The two images are blended at ``levels`` spatial scales, weighted by
        a Gaussian pyramid of the mask, then reconstructed coarse → fine.

        Declared static because it reads no instance state and is invoked as
        ``self._laplacian_blend(swapped, original, mask)``.

        Args:
            swapped: 3-channel image to take where the mask is 255.
            original: 3-channel image of the same size to take where it is 0.
            mask: uint8 mask, either single-channel or already 3-channel.
            levels: number of pyramid levels.

        Returns:
            Blended uint8 image.
        """
        A = swapped.astype(np.float32)
        B = original.astype(np.float32)
        M = mask.astype(np.float32) / 255.0
        if M.ndim == 2:
            M = M[:, :, np.newaxis]
        if M.shape[2] == 1:
            # Expand to 3 channels so pyrDown/pyrUp never collapse the channel dim
            M = np.repeat(M, 3, axis=2)

        # Build Gaussian pyramids
        gA, gB, gM = [A], [B], [M]
        for _ in range(levels):
            gA.append(cv2.pyrDown(gA[-1]))
            gB.append(cv2.pyrDown(gB[-1]))
            gM.append(cv2.pyrDown(gM[-1]))

        # Build Laplacian pyramids (the coarsest level is the Gaussian itself)
        lA, lB = [], []
        for i in range(levels):
            sz = (gA[i].shape[1], gA[i].shape[0])
            lA.append(gA[i] - cv2.pyrUp(gA[i + 1], dstsize=sz))
            lB.append(gB[i] - cv2.pyrUp(gB[i + 1], dstsize=sz))
        lA.append(gA[levels])
        lB.append(gB[levels])

        # Blend each level, reconstruct coarse → fine
        result = lA[levels] * gM[levels] + lB[levels] * (1.0 - gM[levels])
        for i in range(levels - 1, -1, -1):
            sz = (lA[i].shape[1], lA[i].shape[0])
            result = cv2.pyrUp(result, dstsize=sz) + lA[i] * gM[i] + lB[i] * (1.0 - gM[i])
        return np.clip(result, 0, 255).astype(np.uint8)

    # ── Public API ───────────────────────────────────────────────────────────
    def swap(
        self,
        source_bgr: np.ndarray,
        target_bgr: np.ndarray,
        enhance: bool = True,
        progress_cb=None,
    ):
        """
        Swap the first detected face in *source_bgr* onto every face in
        *target_bgr*, then apply Laplacian pyramid blending for seamless edges.

        Targets whose longer side exceeds 2048 px are processed at reduced
        size and scaled back to the original resolution at the end.

        Args:
            source_bgr: BGR image providing the face.
            target_bgr: BGR image whose faces are replaced.
            enhance: run CodeFormer enhancement on the swapped faces.
            progress_cb: optional callable(fraction: float, label: str)

        Returns:
            (result_bgr, status_message); ``result_bgr`` is None when no face
            was found or when processing raised.

        Raises:
            Whatever model initialisation raises. Initialisation runs before
            the error-capturing section.
        """
        def _p(v, msg):
            if progress_cb:
                progress_cb(v, msg)

        self._init()
        _p(0.1, "Models ready — detecting faces…")
        try:
            orig_h, orig_w = target_bgr.shape[:2]
            scale_down, new_w, new_h = self._fit_size(
                orig_w, orig_h, self._MAX_IMAGE_DIM
            )
            if scale_down < 1.0:
                target_bgr = cv2.resize(
                    target_bgr, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4
                )

            # Check the source first: without a source face there is no point
            # paying for detection on the target.
            source_faces = self._app.get(source_bgr)
            if not source_faces:
                return None, "No face detected in source image."
            _p(0.3, "Source face detected — scanning target…")

            target_faces = self._app.get(target_bgr)
            if not target_faces:
                return None, "No face detected in target image."
            _p(0.45, f"Swapping {len(target_faces)} face(s)…")

            source_face = source_faces[0]
            original_bgr = target_bgr.copy()  # kept for Laplacian blend
            result = target_bgr.copy()
            for tgt_face in target_faces:
                result = self._swapper.get(
                    result, tgt_face, source_face, paste_back=True
                )

            # ── Laplacian pyramid blending: removes hard boundary ────────────
            _p(0.65, "Blending edges (Laplacian pyramid)…")
            blend_mask = self._face_ellipse_mask(original_bgr.shape, target_faces)
            result = self._laplacian_blend(result, original_bgr, blend_mask)

            # ── CodeFormer enhancement (images only) ─────────────────────────
            if enhance:
                _p(0.80, "Enhancing quality (CodeFormer)…")
                result = self._enhance_codeformer(result, target_faces)

            # ── Upscale back to original resolution ──────────────────────────
            if scale_down < 1.0:
                _p(0.95, "Upscaling to original resolution…")
                result = cv2.resize(
                    result, (orig_w, orig_h), interpolation=cv2.INTER_LANCZOS4
                )

            _p(1.0, f"Done — {len(target_faces)} face(s) swapped.")
            return result, f"Swapped {len(target_faces)} face(s) successfully."
        except Exception as exc:
            # API boundary: report the failure through the status message.
            return None, f"Face swap error: {exc}"

    def get_source_face(self, source_bgr: np.ndarray):
        """
        Detect and return the first face in *source_bgr*.
        Call once before a video loop and reuse the result in swap_frame().

        Returns:
            face object or None
        """
        self._init()
        faces = self._app.get(source_bgr)
        return faces[0] if faces else None

    def swap_frame(
        self,
        target_bgr: np.ndarray,
        source_face,
        cached_target_faces=None,
        enhance: bool = False,
    ):
        """
        Fast path for video: reuses a pre-computed source_face and optionally
        cached target faces (re-detection is skipped when they are supplied).

        Frames whose longer side exceeds 720 px are processed at reduced size
        and scaled back afterwards.

        Args:
            target_bgr: BGR video frame.
            source_face: face object, e.g. from ``get_source_face``.
            cached_target_faces: faces to reuse instead of running detection.
                NOTE(review): these are applied to the downscaled frame, so
                they presumably must come from a previous ``swap_frame``
                call; confirm with callers.
            enhance: accepted for interface compatibility but ignored; no
                per-frame enhancement is performed.

        Returns:
            (result_bgr, target_faces_used), or ``(None, [])`` when there are
            no target faces.
        """
        self._init()

        # Cap video frames at 720 px for speed
        orig_h, orig_w = target_bgr.shape[:2]
        scale_down, new_w, new_h = self._fit_size(
            orig_w, orig_h, self._MAX_VIDEO_DIM
        )
        if scale_down < 1.0:
            target_bgr = cv2.resize(
                target_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR
            )

        if cached_target_faces is None:
            # Use a smaller detector input for video to speed up detection,
            # and always put the previous size back, even if detection raises,
            # so later still-image swaps are unaffected.
            det_model = self._app.det_model
            previous_size = det_model.input_size
            det_model.input_size = (320, 320)
            try:
                target_faces = self._app.get(target_bgr)
            finally:
                det_model.input_size = previous_size
        else:
            target_faces = cached_target_faces

        if not target_faces:
            return None, []

        result = target_bgr.copy()
        for tgt_face in target_faces:
            result = self._swapper.get(result, tgt_face, source_face, paste_back=True)

        # No per-frame enhancement for video: temporally unstable (causes flicker).
        # Scale back up to original frame size
        if scale_down < 1.0:
            result = cv2.resize(result, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
        return result, target_faces