Swapper / processors /face_swap.py
devkunalnaik's picture
Fix: expand mask to 3 channels before Laplacian pyramid to prevent shape broadcast error
abd882e
Raw
History Blame Contribute Delete
23.4 kB
"""
Face swap processor using InsightFace (inswapper_128) with optional
CodeFormer (ONNX) face enhancement and ESPCN super-resolution.
Model weights are downloaded automatically on first use and cached
in the local `models/` directory.
"""
import os
import shutil
import cv2
import numpy as np
import requests
from pathlib import Path
# ── Model paths ───────────────────────────────────────────────────────────────
# Weights live in a `models/` directory two levels above this file.
MODELS_DIR = Path(__file__).parent.parent / "models"
# Created at import time so the download helpers can write into it directly.
MODELS_DIR.mkdir(exist_ok=True)
# Destination paths for the three model files this module manages.
INSWAPPER_PATH = MODELS_DIR / "inswapper_128.onnx"
CODEFORMER_PATH = MODELS_DIR / "codeformer.onnx"
ESPCN_PATH = MODELS_DIR / "ESPCN_x2.pb"
# Public mirrors — tried in order until one succeeds
_INSWAPPER_URLS = [
    # Public HF mirror (no auth required)
    "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx",
    # Fallback mirror
    "https://huggingface.co/theNeofr/inswapper/resolve/main/inswapper_128.onnx",
]
# ── Helpers ───────────────────────────────────────────────────────────────────
def _download_inswapper() -> None:
    """Ensure ``inswapper_128.onnx`` is present at ``INSWAPPER_PATH``.

    Returns immediately when a file larger than 100 KB already exists there.
    Otherwise two strategies are tried in order:

    1. ``huggingface_hub.hf_hub_download`` from ``deepinsight/inswapper``,
       passing the ``HF_TOKEN`` environment variable (may be unset) as token.
    2. Plain HTTP download from each URL in ``_INSWAPPER_URLS``; a mirror is
       accepted only if the downloaded file is larger than 500 MB.

    Every download is staged in a sibling ``.part`` file and renamed onto
    ``INSWAPPER_PATH`` only once complete, so an interrupted download can
    never leave a truncated file that the size check above would accept.

    Raises:
        RuntimeError: if neither strategy produced a usable file.
    """
    if INSWAPPER_PATH.exists() and INSWAPPER_PATH.stat().st_size > 100_000:
        return

    staging = INSWAPPER_PATH.with_name(INSWAPPER_PATH.name + ".part")

    # ── Strategy 1: huggingface_hub ──────────────────────────────────────────
    try:
        # Imported lazily: the package is optional and any failure here
        # (including ImportError) simply falls through to the mirrors.
        from huggingface_hub import hf_hub_download

        print("[FaceSwapper] Downloading inswapper_128.onnx via HF Hub …")
        cached = hf_hub_download(
            repo_id="deepinsight/inswapper",
            filename="inswapper_128.onnx",
            token=os.environ.get("HF_TOKEN"),
        )
        shutil.copy(cached, staging)
        staging.replace(INSWAPPER_PATH)
        print(f"[FaceSwapper] Saved to {INSWAPPER_PATH}")
        return
    except Exception as e:
        staging.unlink(missing_ok=True)
        print(f"[FaceSwapper] HF Hub download failed ({e}), trying mirrors …")

    # ── Strategy 2: public mirrors ───────────────────────────────────────────
    for url in _INSWAPPER_URLS:
        try:
            print(f"[FaceSwapper] Trying {url} …")
            # The context manager releases the streamed connection even when
            # the transfer is aborted half-way.
            with requests.get(url, stream=True, timeout=180) as resp:
                resp.raise_for_status()
                with open(staging, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=65536):
                        f.write(chunk)
            if staging.stat().st_size > 500_000_000:  # ~554 MB expected
                staging.replace(INSWAPPER_PATH)
                print(f"[FaceSwapper] Saved to {INSWAPPER_PATH}")
                return
            print("[FaceSwapper] Mirror file too small, trying next …")
        except Exception as e:
            print(f"[FaceSwapper] Mirror failed ({e})")
        finally:
            # No-op after a successful rename; otherwise drops the leftovers.
            staging.unlink(missing_ok=True)

    raise RuntimeError(
        "Could not download inswapper_128.onnx. "
        "Accept the model terms at https://huggingface.co/deepinsight/inswapper "
        "then add your HF token as a Space secret named HF_TOKEN."
    )
def _download_codeformer() -> None:
    """Best-effort download of the CodeFormer ONNX model to ``CODEFORMER_PATH``.

    Returns immediately when a file larger than 50 MB already exists there.
    A download is accepted only if it is larger than 50 MB as well. The data
    is staged in a sibling ``.part`` file and renamed into place once it is
    complete, so an interrupted transfer never leaves a truncated model
    behind that a later call would mistake for a valid one.

    Never raises: on failure a message is printed and ``CODEFORMER_PATH`` is
    left absent so callers can fall back to the OpenCV enhancement.
    """
    if CODEFORMER_PATH.exists() and CODEFORMER_PATH.stat().st_size > 50_000_000:
        return

    staging = CODEFORMER_PATH.with_name(CODEFORMER_PATH.name + ".part")
    urls = [
        "https://github.com/facefusion/facefusion-assets/releases/download/models/codeformer.onnx",
    ]
    for url in urls:
        try:
            print(f"[FaceSwapper] Downloading CodeFormer from {url} …")
            # Context manager guarantees the streamed connection is released.
            with requests.get(url, stream=True, timeout=300) as resp:
                resp.raise_for_status()
                with open(staging, "wb") as f:
                    for chunk in resp.iter_content(65536):
                        f.write(chunk)
            if staging.stat().st_size > 50_000_000:
                staging.replace(CODEFORMER_PATH)
                print("[FaceSwapper] CodeFormer ready.")
                return
        except Exception as e:
            print(f"[FaceSwapper] CodeFormer download failed: {e}")
        finally:
            # No-op after a successful rename; otherwise drops the leftovers.
            staging.unlink(missing_ok=True)
    print("[FaceSwapper] CodeFormer unavailable — falling back to OpenCV enhancement.")
def _download_espcn() -> None:
    """Best-effort download of the ESPCN x2 super-resolution model.

    Returns immediately when ``ESPCN_PATH`` already holds a file larger than
    50 KB. The response body is validated in memory (it must exceed 50 KB)
    before anything is written, and the file is installed via a staged
    rename, so an error page or truncated payload never lands at
    ``ESPCN_PATH``.

    Never raises: on failure a message is printed and the super-resolution
    step is simply unavailable.
    """
    if ESPCN_PATH.exists() and ESPCN_PATH.stat().st_size > 50_000:
        return

    staging = ESPCN_PATH.with_name(ESPCN_PATH.name + ".part")
    urls = [
        "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x2.pb",
    ]
    for url in urls:
        try:
            print(f"[FaceSwapper] Downloading ESPCN SR model from {url} …")
            with requests.get(url, timeout=60) as resp:
                resp.raise_for_status()
                payload = resp.content
            if len(payload) > 50_000:
                staging.write_bytes(payload)
                staging.replace(ESPCN_PATH)
                print("[FaceSwapper] ESPCN SR model ready.")
                return
        except Exception as e:
            print(f"[FaceSwapper] ESPCN download failed: {e}")
        finally:
            # No-op after a successful rename; otherwise drops the leftovers.
            staging.unlink(missing_ok=True)
    print("[FaceSwapper] ESPCN unavailable — skipping super-resolution step.")
# ── Main class ────────────────────────────────────────────────────────────────
class FaceSwapper:
    """
    Swaps the dominant face from a source image onto every detected face in
    the target image. Optionally runs CodeFormer (ONNX) + ESPCN super-res
    on the swapped faces, falling back to a pure-OpenCV enhancement when
    CodeFormer cannot be loaded.

    All heavy models are loaded lazily on first use.
    """

    # Value fed to a CodeFormer input named "weight" when the model has one.
    # NOTE(review): 1.0 follows what FaceFusion feeds its codeformer.onnx
    # export; confirm this is the desired quality/fidelity trade-off.
    _CODEFORMER_FIDELITY = 1.0

    def __init__(self):
        """Create an empty swapper; no model is loaded until first use."""
        self._app = None         # InsightFace FaceAnalysis
        self._swapper = None     # inswapper ONNX model
        self._codeformer = None  # CodeFormer ONNX session
        self._sr = None          # ESPCN DNN super-res (opencv-contrib)
        self._ready = False

    # ── Lazy initialisation ───────────────────────────────────────────────────
    def _init(self):
        """Load the face analyser and the inswapper model once.

        Raises whatever ``_download_inswapper`` or InsightFace raise; in that
        case ``_ready`` stays False and the next call retries.
        """
        if self._ready:
            return
        import insightface
        from insightface.app import FaceAnalysis

        # The original code built onnxruntime SessionOptions here but never
        # passed them to any session, so that dead code has been removed.

        # Face analysis: 640 for images, 320 for video (set via swap_frame)
        self._app = FaceAnalysis(
            name="buffalo_l",
            providers=["CPUExecutionProvider"],
        )
        self._app.prepare(ctx_id=-1, det_size=(640, 640))

        _download_inswapper()
        self._swapper = insightface.model_zoo.get_model(
            str(INSWAPPER_PATH),
            providers=["CPUExecutionProvider"],
        )
        self._ready = True

    # ── Small shared helpers ──────────────────────────────────────────────────
    @staticmethod
    def _clipped_bbox(face, shape):
        """Return ``face.bbox`` as ints clipped to ``shape``, or None if empty.

        ``shape`` is an image shape whose first two entries are (height,
        width). The result is ``(x1, y1, x2, y2)``.
        """
        box = face.bbox.astype(int)
        x1, y1 = max(int(box[0]), 0), max(int(box[1]), 0)
        x2, y2 = min(int(box[2]), shape[1]), min(int(box[3]), shape[0])
        if x2 <= x1 or y2 <= y1:
            return None
        return x1, y1, x2, y2

    @staticmethod
    def _feather_mask(h: int, w: int) -> np.ndarray:
        """Return an (h, w, 1) float32 mask: 1 inside, fading to 0 at the rim.

        The border width scales with the shorter side so narrow regions still
        keep a non-empty interior. Regions too small to hold any interior
        yield an all-zero mask, i.e. the blend keeps the original pixels.
        """
        pad = max(4, min(h, w) // 10)
        mask = np.zeros((h, w), dtype=np.float32)
        mask[pad:h - pad, pad:w - pad] = 1.0
        mask = cv2.GaussianBlur(mask, (0, 0), max(1, pad // 2))
        return mask[:, :, np.newaxis]

    @staticmethod
    def _clahe_on_luma(bgr: np.ndarray, clahe) -> np.ndarray:
        """Apply ``clahe`` to the L channel of a BGR image and return BGR."""
        lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)
        lab[:, :, 0] = clahe.apply(lab[:, :, 0])
        return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

    @staticmethod
    def _fit_within(image: np.ndarray, max_dim: int, high_quality: bool = False):
        """Downscale ``image`` so its longer side is at most ``max_dim``.

        Returns ``(image, scale)``; the input object itself and ``1.0`` are
        returned when no resize is needed. ``high_quality`` selects Lanczos
        instead of bilinear interpolation.
        """
        h, w = image.shape[:2]
        longest = max(h, w)
        if longest <= max_dim:
            return image, 1.0
        scale = max_dim / longest
        # Clamp to 1 px so extreme aspect ratios never request a 0-sized image.
        size = (max(1, int(w * scale)), max(1, int(h * scale)))
        interpolation = cv2.INTER_LANCZOS4 if high_quality else cv2.INTER_LINEAR
        return cv2.resize(image, size, interpolation=interpolation), scale

    @staticmethod
    def _codeformer_feed(inputs, tensor: np.ndarray, fidelity: float = 1.0) -> dict:
        """Build the ONNX feed dict for a CodeFormer session.

        ``inputs`` is the session's input metadata (objects with ``name`` and
        ``type``). An input named ``"weight"`` receives ``fidelity`` as a
        one-element array whose dtype follows the declared tensor type; the
        first other input receives the image ``tensor``. Feeding only the
        first input makes inference fail on exports that also declare
        ``weight``.
        """
        feed = {}
        image_assigned = False
        for meta in inputs:
            if meta.name == "weight":
                is_double = "double" in str(getattr(meta, "type", ""))
                dtype = np.float64 if is_double else np.float32
                feed[meta.name] = np.array([fidelity], dtype=dtype)
            elif not image_assigned:
                feed[meta.name] = tensor
                image_assigned = True
        return feed

    # ── Enhancement (pure OpenCV, no extra models) ────────────────────────────
    @staticmethod
    def _enhance_opencv(image: np.ndarray, faces) -> np.ndarray:
        """
        For each detected face bounding box:
        1. Unsharp masking: recovers detail lost by inswapper's 128-px output
        2. CLAHE on the L channel: local contrast without blowing highlights
        The enhanced region is feather-blended back; ``image`` is not modified.
        """
        result = image.copy()
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        for face in faces:
            box = FaceSwapper._clipped_bbox(face, image.shape)
            if box is None:
                continue
            x1, y1, x2, y2 = box
            roi = result[y1:y2, x1:x2].copy()

            # 1. Unsharp mask: scale radius with face size for consistent sharpness
            sigma = max(1.5, min(x2 - x1, y2 - y1) / 80)
            blurred = cv2.GaussianBlur(roi, (0, 0), sigma)
            sharp = cv2.addWeighted(roi, 1.8, blurred, -0.8, 0)

            # 2. CLAHE on L channel
            enhanced_roi = FaceSwapper._clahe_on_luma(sharp, clahe)

            # Feather-blend back so edges stay smooth
            mask = FaceSwapper._feather_mask(y2 - y1, x2 - x1)
            result[y1:y2, x1:x2] = (
                enhanced_roi.astype(np.float32) * mask
                + roi.astype(np.float32) * (1 - mask)
            ).astype(np.uint8)
        return result

    # ── CodeFormer ONNX enhancement ───────────────────────────────────────────
    def _load_codeformer(self):
        """Lazy-load CodeFormer ONNX session. Returns None if unavailable."""
        if self._codeformer is not None:
            return self._codeformer
        try:
            _download_codeformer()
            if not CODEFORMER_PATH.exists():
                return None
            import onnxruntime as ort

            self._codeformer = ort.InferenceSession(
                str(CODEFORMER_PATH),
                providers=["CPUExecutionProvider"],
            )
            print("[FaceSwapper] CodeFormer ONNX loaded.")
        except Exception as e:
            print(f"[FaceSwapper] CodeFormer load failed: {e}")
            self._codeformer = None
        return self._codeformer

    def _load_sr(self):
        """Lazy-load ESPCN x2 DNN super-res (needs opencv-contrib). Returns None if unavailable."""
        if self._sr is not None:
            return self._sr
        try:
            _download_espcn()
            if not ESPCN_PATH.exists():
                return None
            sr = cv2.dnn_superres.DnnSuperResImpl_create()
            sr.readModel(str(ESPCN_PATH))
            sr.setModel("espcn", 2)
            self._sr = sr
            print("[FaceSwapper] ESPCN 2× super-res loaded.")
        except Exception as e:
            print(f"[FaceSwapper] ESPCN load failed ({e}) — super-res disabled.")
            self._sr = None
        return self._sr

    def _enhance_codeformer(self, image: np.ndarray, faces) -> np.ndarray:
        """
        For each detected face:
        1. CodeFormer ONNX: neural face restoration at 512x512
        2. ESPCN 2x super-res: applied to small faces when the model loaded
        3. CLAHE: local contrast refinement
        Falls back to OpenCV enhancement if CodeFormer is unavailable.
        A face whose inference fails is left as it was; ``image`` itself is
        never modified.
        """
        sess = self._load_codeformer()
        if sess is None:
            return self._enhance_opencv(image, faces)
        sr = self._load_sr()  # may be None; applied only when available

        result = image.copy()
        model_inputs = sess.get_inputs()
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        img_h, img_w = image.shape[:2]

        for face in faces:
            box = self._clipped_bbox(face, image.shape)
            if box is None:
                continue
            bx1, by1, bx2, by2 = box
            # Expand the bbox by 15% of its shorter side for context padding
            pad = int(min(bx2 - bx1, by2 - by1) * 0.15)
            x1, y1 = max(0, bx1 - pad), max(0, by1 - pad)
            x2, y2 = min(img_w, bx2 + pad), min(img_h, by2 + pad)

            orig = result[y1:y2, x1:x2].copy()
            h, w = orig.shape[:2]

            # ── 1. CodeFormer: BGR→RGB, resize to 512, normalize [-1, 1] ─────
            face_rgb = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB)
            face_512 = cv2.resize(face_rgb, (512, 512), interpolation=cv2.INTER_LANCZOS4)
            inp = (face_512.astype(np.float32) / 127.5) - 1.0   # [-1, 1]
            inp = np.transpose(inp, (2, 0, 1))[np.newaxis]      # [1,3,512,512]
            feed = self._codeformer_feed(model_inputs, inp, self._CODEFORMER_FIDELITY)
            try:
                out = sess.run(None, feed)[0]                   # [1,3,512,512]
            except Exception as e:
                print(f"[FaceSwapper] CodeFormer inference failed: {e}")
                continue

            # Postprocess: [-1,1] → [0,255] → BGR
            out_rgb = np.transpose(np.squeeze(out), (1, 2, 0))  # [512,512,3]
            out_rgb = ((out_rgb + 1.0) * 127.5).clip(0, 255).astype(np.uint8)
            out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)

            # ── 2. ESPCN 2x super-res on small faces (<= 128 px) ─────────────
            if sr is not None and min(w, h) <= 128:
                try:
                    out_bgr = sr.upsample(out_bgr)
                except Exception as e:
                    # Keep the CodeFormer output; only the upsample is skipped.
                    print(f"[FaceSwapper] ESPCN upsample failed: {e}")
            # Whatever happened above, bring the patch back to the ROI size.
            out_bgr = cv2.resize(out_bgr, (w, h), interpolation=cv2.INTER_LANCZOS4)

            # ── 3. CLAHE on L channel for final contrast refinement ───────────
            out_bgr = self._clahe_on_luma(out_bgr, clahe)

            # ── 4. Feather-blend onto result ──────────────────────────────────
            msk = self._feather_mask(h, w)
            result[y1:y2, x1:x2] = (
                out_bgr.astype(np.float32) * msk + orig.astype(np.float32) * (1 - msk)
            ).astype(np.uint8)
        return result

    # ── Laplacian pyramid blending ────────────────────────────────────────────
    @staticmethod
    def _face_ellipse_mask(shape: tuple, faces, expand: float = 0.35) -> np.ndarray:
        """
        Soft elliptical mask covering all detected face regions.
        255 = use swapped face, 0 = use original background.
        ``expand`` enlarges each ellipse relative to the clipped bounding box.
        """
        mask = np.zeros(shape[:2], dtype=np.uint8)
        for face in faces:
            box = FaceSwapper._clipped_bbox(face, shape)
            if box is None:
                continue
            x1, y1, x2, y2 = box
            w, h = x2 - x1, y2 - y1
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
            ax = int(w // 2 * (1 + expand))
            ay = int(h // 2 * (1 + expand))
            cv2.ellipse(mask, (cx, cy), (ax, ay), 0, 0, 360, 255, -1)
        # Heavy Gaussian feather: wide transition = no visible seam.
        # GaussianBlur needs an odd kernel size.
        blur = max(31, min(mask.shape[:2]) // 10)
        if blur % 2 == 0:
            blur += 1
        return cv2.GaussianBlur(mask, (blur, blur), 0)

    @staticmethod
    def _laplacian_blend(swapped: np.ndarray, original: np.ndarray,
                         mask: np.ndarray, levels: int = 6) -> np.ndarray:
        """
        Laplacian pyramid blending.
        Blends swapped face region onto original at multiple spatial scales
        so no hard edge is visible regardless of skin tone or lighting.
        mask: uint8, 255 = take from swapped, 0 = take from original. May be
        (H, W), (H, W, 1) or (H, W, 3).
        Returns a uint8 image.
        """
        A = swapped.astype(np.float32)
        B = original.astype(np.float32)
        M = mask.astype(np.float32) / 255.0
        if M.ndim == 2:
            M = M[:, :, np.newaxis]
        # pyrDown/pyrUp drop a trailing singleton channel, which would break
        # broadcasting against the 3-channel images, so expand it up front.
        if M.shape[2] == 1:
            M = np.repeat(M, 3, axis=2)

        # Build Gaussian pyramids
        gA, gB, gM = [A], [B], [M]
        for _ in range(levels):
            gA.append(cv2.pyrDown(gA[-1]))
            gB.append(cv2.pyrDown(gB[-1]))
            gM.append(cv2.pyrDown(gM[-1]))

        # Build Laplacian pyramids (top level is the coarsest Gaussian level)
        lA, lB = [], []
        for i in range(levels):
            sz = (gA[i].shape[1], gA[i].shape[0])
            lA.append(gA[i] - cv2.pyrUp(gA[i + 1], dstsize=sz))
            lB.append(gB[i] - cv2.pyrUp(gB[i + 1], dstsize=sz))
        lA.append(gA[levels])
        lB.append(gB[levels])

        # Blend each level, reconstruct coarse→fine
        result = lA[levels] * gM[levels] + lB[levels] * (1.0 - gM[levels])
        for i in range(levels - 1, -1, -1):
            sz = (lA[i].shape[1], lA[i].shape[0])
            result = cv2.pyrUp(result, dstsize=sz) + lA[i] * gM[i] + lB[i] * (1.0 - gM[i])
        return np.clip(result, 0, 255).astype(np.uint8)

    # ── Public API ────────────────────────────────────────────────────────────
    def swap(
        self,
        source_bgr: np.ndarray,
        target_bgr: np.ndarray,
        enhance: bool = True,
        progress_cb=None,
    ):
        """
        Swap the first detected face in *source_bgr* onto every face in
        *target_bgr*. Applies Laplacian pyramid blending for seamless edges.

        Targets whose longer side exceeds 2048 px are processed at reduced
        size and scaled back to the original resolution at the end.

        progress_cb: optional callable(fraction: float, label: str)
        Returns:
            (result_bgr, status_message); result_bgr is None when no face was
            found or processing failed. Model initialisation errors are
            raised rather than reported through the status message.
        """
        def _p(v, msg):
            if progress_cb:
                progress_cb(v, msg)

        self._init()
        _p(0.1, "Models ready — detecting faces…")
        try:
            orig_h, orig_w = target_bgr.shape[:2]
            target_bgr, scale_down = self._fit_within(target_bgr, 2048, high_quality=True)

            # Check the source first: without a source face there is no
            # point paying for detection on the target.
            source_faces = self._app.get(source_bgr)
            if not source_faces:
                return None, "No face detected in source image."
            _p(0.3, "Source face detected — scanning target…")

            target_faces = self._app.get(target_bgr)
            if not target_faces:
                return None, "No face detected in target image."
            _p(0.45, f"Swapping {len(target_faces)} face(s)…")

            source_face = source_faces[0]
            original_bgr = target_bgr.copy()  # kept for Laplacian blend
            result = target_bgr.copy()
            for tgt_face in target_faces:
                result = self._swapper.get(
                    result, tgt_face, source_face, paste_back=True
                )

            # ── Laplacian pyramid blending: removes hard boundary ──────────
            _p(0.65, "Blending edges (Laplacian pyramid)…")
            blend_mask = self._face_ellipse_mask(original_bgr.shape, target_faces)
            result = self._laplacian_blend(result, original_bgr, blend_mask)

            # ── CodeFormer enhancement (images only) ──────────────────────
            if enhance:
                _p(0.80, "Enhancing quality (CodeFormer)…")
                result = self._enhance_codeformer(result, target_faces)

            # ── Upscale back to original resolution ───────────────────────
            if scale_down < 1.0:
                _p(0.95, "Upscaling to original resolution…")
                result = cv2.resize(
                    result,
                    (orig_w, orig_h),
                    interpolation=cv2.INTER_LANCZOS4,
                )

            _p(1.0, f"Done — {len(target_faces)} face(s) swapped.")
            return result, f"Swapped {len(target_faces)} face(s) successfully."
        except Exception as exc:
            return None, f"Face swap error: {exc}"

    def get_source_face(self, source_bgr: np.ndarray):
        """
        Detect and return the first face in *source_bgr*.
        Call once before a video loop and reuse the result in swap_frame().
        Returns:
            face object or None
        """
        self._init()
        faces = self._app.get(source_bgr)
        return faces[0] if faces else None

    def swap_frame(
        self,
        target_bgr: np.ndarray,
        source_face,
        cached_target_faces=None,
        enhance: bool = False,
    ):
        """
        Fast path for video: reuses a pre-computed source_face and optionally
        cached target faces (re-detection skipped when supplied).

        Frames whose longer side exceeds 720 px are processed at reduced size
        and scaled back afterwards. Cached faces are used as-is, so they are
        presumably expected to come from an earlier call on a frame of the
        same size (verify against the caller).

        *enhance* is accepted for interface compatibility but is not used:
        no per-frame enhancement is applied to video.

        Returns:
            (result_bgr, target_faces_used), or (None, []) when there is no
            target face.
        """
        self._init()

        # Cap video frames at 720p for speed; quality still good for motion
        orig_h, orig_w = target_bgr.shape[:2]
        target_bgr, scale_down = self._fit_within(target_bgr, 720)

        if cached_target_faces is None:
            # Use smaller det_size for video to speed up detection. The
            # previous size is restored in `finally` so a detection error
            # cannot leave the detector in low-resolution mode for later
            # still-image swaps.
            detector = self._app.det_model
            previous_size = getattr(detector, "input_size", (640, 640))
            detector.input_size = (320, 320)
            try:
                target_faces = self._app.get(target_bgr)
            finally:
                detector.input_size = previous_size
        else:
            target_faces = cached_target_faces

        if not target_faces:
            return None, []

        result = target_bgr.copy()
        for tgt_face in target_faces:
            result = self._swapper.get(result, tgt_face, source_face, paste_back=True)

        # No per-frame enhancement for video: temporally unstable (causes flicker).
        # FFmpeg unsharp filter handles sharpening globally at encode time.

        # Scale back up to original frame size
        if scale_down < 1.0:
            result = cv2.resize(result, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
        return result, target_faces