| import os |
| import uuid |
| import time |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import gradio as gr |
| import numpy as np |
| from PIL import Image, ImageDraw, ImageFont |
|
|
|
|
| APP_NAME = "FaceSwap AI" |
| DEFAULT_REMOTE_SPACE_ID = os.getenv("REMOTE_SPACE_ID", "felixrosberg/face-swap") |
| HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN") |
|
|
| OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "outputs") |
| EXAMPLES_DIR = os.path.join(os.path.dirname(__file__), "assets", "examples") |
| MODELS_DIR = os.path.join(os.path.dirname(__file__), "models") |
|
|
|
|
| @dataclass |
| class SwapResult: |
| output_path: str |
| share_url: str |
| error: Optional[str] = None |
|
|
|
|
| def _ensure_dirs() -> None: |
| os.makedirs(OUTPUT_DIR, exist_ok=True) |
| os.makedirs(EXAMPLES_DIR, exist_ok=True) |
| os.makedirs(MODELS_DIR, exist_ok=True) |
|
|
|
|
| def _pil_from_any(img: Any) -> Image.Image: |
| if img is None: |
| raise ValueError("No image provided.") |
| if isinstance(img, Image.Image): |
| return img.convert("RGB") |
| if isinstance(img, np.ndarray): |
| if img.ndim == 2: |
| return Image.fromarray(img).convert("RGB") |
| if img.ndim == 3: |
| return Image.fromarray(img[:, :, :3]).convert("RGB") |
| if isinstance(img, str) and os.path.exists(img): |
| return Image.open(img).convert("RGB") |
| raise ValueError("Unsupported image format.") |
|
|
|
|
| def _save_temp_upload(img: Image.Image, prefix: str) -> str: |
| _ensure_dirs() |
| fp = os.path.join(OUTPUT_DIR, f"{prefix}_{uuid.uuid4().hex}.png") |
| img.save(fp, format="PNG") |
| return fp |
|
|
|
|
| def _detect_faces_haar(pil_img: Image.Image) -> int: |
| """ |
| Lightweight face detection for user-friendly errors. |
| This is not used for swapping; only for "No face detected" messaging. |
| """ |
| try: |
| import cv2 |
|
|
| cv_img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR) |
| gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY) |
| cascade = cv2.CascadeClassifier( |
| os.path.join(cv2.data.haarcascades, "haarcascade_frontalface_default.xml") |
| ) |
| faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60)) |
| return int(len(faces)) |
| except Exception: |
| |
| return 1 |
|
|
|
|
| def _onnx_providers() -> List[str]: |
| """ |
| Best-effort provider selection for ONNXRuntime / InsightFace. |
| Set `FORCE_CPU=1` to disable CUDA even if available. |
| """ |
| force_cpu = os.getenv("FORCE_CPU", "").strip().lower() in {"1", "true", "yes", "y"} |
| if force_cpu: |
| return ["CPUExecutionProvider"] |
|
|
| try: |
| import onnxruntime as ort |
|
|
| available = set(ort.get_available_providers()) |
| if "CUDAExecutionProvider" in available: |
| return ["CUDAExecutionProvider", "CPUExecutionProvider"] |
| except Exception: |
| pass |
|
|
| return ["CPUExecutionProvider"] |
|
|
|
|
| def _ensure_inswapper_onnx() -> str: |
| """ |
| Ensures `inswapper_128.onnx` exists locally and returns its path. |
| |
| You can override with: |
| - `INSWAPPER_ONNX_PATH` (absolute/relative path) |
| - `INSWAPPER_REPO_ID` and `INSWAPPER_FILENAME` for HF download |
| """ |
| override = os.getenv("INSWAPPER_ONNX_PATH", "").strip() |
| if override: |
| p = override |
| if not os.path.isabs(p): |
| p = os.path.join(os.path.dirname(__file__), p) |
| if not os.path.exists(p): |
| raise FileNotFoundError(f"INSWAPPER_ONNX_PATH not found: {p}") |
| return p |
|
|
| _ensure_dirs() |
| local_path = os.path.join(MODELS_DIR, "inswapper_128.onnx") |
| if os.path.exists(local_path): |
| return local_path |
|
|
| |
| repo_id = os.getenv("INSWAPPER_REPO_ID", "ezioruan/inswapper_128.onnx").strip() |
| filename = os.getenv("INSWAPPER_FILENAME", "inswapper_128.onnx").strip() |
|
|
| try: |
| from huggingface_hub import hf_hub_download |
|
|
| downloaded = hf_hub_download( |
| repo_id=repo_id, |
| filename=filename, |
| token=HF_TOKEN, |
| ) |
| |
| |
| import shutil |
|
|
| shutil.copyfile(downloaded, local_path) |
| return local_path |
| except Exception as e: |
| raise RuntimeError( |
| "Could not download inswapper ONNX model.\n" |
| f"- Tried repo `{repo_id}` file `{filename}`\n" |
| f"- You can also set `INSWAPPER_ONNX_PATH` to a local file.\n" |
| f"Error: {e}" |
| ) |
|
|
|
|
| _IFACE_ANALYZER = None |
| _IFACE_SWAPPER = None |
|
|
|
|
| def _load_local_faceswap_models(): |
| """ |
| Lazy-load InsightFace analyzer + inswapper ONNX swapper. |
| Returns (analyzer, swapper). |
| """ |
| global _IFACE_ANALYZER, _IFACE_SWAPPER |
| if _IFACE_ANALYZER is not None and _IFACE_SWAPPER is not None: |
| return _IFACE_ANALYZER, _IFACE_SWAPPER |
|
|
| try: |
| import insightface |
| from insightface.app import FaceAnalysis |
| except Exception as e: |
| raise RuntimeError( |
| "Missing dependency for local live swap. Install `insightface`.\n" |
| f"Error: {e}" |
| ) |
|
|
| providers = _onnx_providers() |
| |
| analyzer = FaceAnalysis(name="buffalo_l", providers=providers) |
| analyzer.prepare(ctx_id=0 if providers[0] != "CPUExecutionProvider" else -1, det_size=(640, 640)) |
|
|
| onnx_path = _ensure_inswapper_onnx() |
| swapper = insightface.model_zoo.get_model(onnx_path, providers=providers) |
|
|
| _IFACE_ANALYZER, _IFACE_SWAPPER = analyzer, swapper |
| return analyzer, swapper |
|
|
|
|
| def _largest_face(faces: List[Any]) -> Optional[Any]: |
| if not faces: |
| return None |
| best = None |
| best_area = -1 |
| for f in faces: |
| try: |
| x1, y1, x2, y2 = f.bbox.astype(int).tolist() |
| area = max(0, x2 - x1) * max(0, y2 - y1) |
| except Exception: |
| area = -1 |
| if area > best_area: |
| best_area = area |
| best = f |
| return best |
|
|
|
|
| def _np_rgb_to_bgr(img: np.ndarray) -> np.ndarray: |
| |
| if img is None: |
| raise ValueError("No image provided.") |
| if img.ndim != 3 or img.shape[2] < 3: |
| raise ValueError("Expected a 3-channel color image.") |
|
|
| rgb = img[:, :, :3] |
| if rgb.dtype != np.uint8: |
| |
| mx = float(np.max(rgb)) if rgb.size else 255.0 |
| if mx <= 1.5: |
| rgb = np.clip(rgb, 0.0, 1.0) * 255.0 |
| else: |
| rgb = np.clip(rgb, 0.0, 255.0) |
| rgb = rgb.astype(np.uint8) |
|
|
| return rgb[:, :, ::-1].copy() |
|
|
|
|
| def _np_bgr_to_rgb(img: np.ndarray) -> np.ndarray: |
| if img is None: |
| raise ValueError("No image provided.") |
| if img.ndim != 3 or img.shape[2] < 3: |
| return img |
| return img[:, :, :3][:, :, ::-1].copy() |
|
|
|
|
| def _watermark(pil_img: Image.Image, text: str = "FaceSwap AI • demo") -> Image.Image: |
| img = pil_img.copy().convert("RGBA") |
| w, h = img.size |
|
|
| overlay = Image.new("RGBA", img.size, (0, 0, 0, 0)) |
| draw = ImageDraw.Draw(overlay) |
|
|
| |
| font_size = max(14, int(min(w, h) * 0.03)) |
| try: |
| font = ImageFont.truetype("DejaVuSans.ttf", font_size) |
| except Exception: |
| font = ImageFont.load_default() |
|
|
| padding = max(10, int(font_size * 0.6)) |
| tw, th = draw.textbbox((0, 0), text, font=font)[2:] |
| x = w - tw - padding |
| y = h - th - padding |
|
|
| |
| bg_pad = max(6, int(font_size * 0.5)) |
| draw.rounded_rectangle( |
| (x - bg_pad, y - bg_pad, x + tw + bg_pad, y + th + bg_pad), |
| radius=max(6, int(font_size * 0.6)), |
| fill=(0, 0, 0, 110), |
| ) |
| draw.text((x, y), text, font=font, fill=(255, 255, 255, 220)) |
|
|
| return Image.alpha_composite(img, overlay).convert("RGB") |
|
|
|
|
| def _host_base_url() -> str: |
| |
| for k in ("SPACE_HOST", "HOST", "GRADIO_SERVER_NAME"): |
| v = os.getenv(k) |
| if v and v.startswith("http"): |
| return v.rstrip("/") |
|
|
| space_id = os.getenv("SPACE_ID") |
| if space_id: |
| return f"https://{space_id.replace('/', '-')}.hf.space" |
| return "" |
|
|
|
|
| def _make_share_url(local_file_path: str) -> str: |
| |
| |
| base = _host_base_url() |
| if not base: |
| return "" |
| |
| |
| rel = os.path.relpath(local_file_path, os.path.dirname(__file__)).replace("\\", "/") |
| return f"{base}/file={rel}" |
|
|
|
|
| def _call_remote_space( |
| source_pil: Image.Image, |
| target_pil: Image.Image, |
| *, |
| defense_ratio: int, |
| blend_ratio: int, |
| options: List[str], |
| remote_space_id: str, |
| ) -> Image.Image: |
| """ |
| Calls a remote Gradio Space as the "cloud inference" backend. |
| Default backend: felixrosberg/face-swap (FaceDancer). |
| """ |
| from gradio_client import Client, handle_file |
|
|
| client = Client(remote_space_id, token=HF_TOKEN) |
|
|
| |
| src_path = _save_temp_upload(source_pil, "source") |
| trg_path = _save_temp_upload(target_pil, "target") |
|
|
| |
| |
| out = client.predict( |
| handle_file(trg_path), |
| handle_file(src_path), |
| int(defense_ratio), |
| int(blend_ratio), |
| options, |
| api_name="/run_inference", |
| ) |
|
|
| return _pil_from_any(out) |
|
|
|
|
| def _call_custom_endpoint( |
| source_pil: Image.Image, |
| target_pil: Image.Image, |
| *, |
| strength: float, |
| steps: int, |
| guidance: float, |
| ) -> Image.Image: |
| """ |
| Optional BYO endpoint mode. |
| Contract: POST $HF_INFERENCE_ENDPOINT_URL with multipart form: |
| - source: image file |
| - target: image file |
| - strength: float |
| - steps: int |
| - guidance: float |
| Returns: image bytes (PNG/JPEG) in response body. |
| """ |
| import requests |
|
|
| url = os.getenv("HF_INFERENCE_ENDPOINT_URL", "").strip() |
| if not url: |
| raise ValueError("Custom endpoint URL is not set.") |
|
|
| src_bytes = _pil_to_png_bytes(source_pil) |
| trg_bytes = _pil_to_png_bytes(target_pil) |
| files = { |
| "source": ("source.png", src_bytes, "image/png"), |
| "target": ("target.png", trg_bytes, "image/png"), |
| } |
| data = {"strength": str(strength), "steps": str(int(steps)), "guidance": str(guidance)} |
| headers = {} |
| token = os.getenv("HF_ENDPOINT_TOKEN") or HF_TOKEN |
| if token: |
| headers["Authorization"] = f"Bearer {token}" |
|
|
| resp = requests.post(url, files=files, data=data, headers=headers, timeout=180) |
| if resp.status_code >= 400: |
| raise RuntimeError(f"Endpoint error {resp.status_code}: {resp.text[:300]}") |
| return Image.open(_bytes_io(resp.content)).convert("RGB") |
|
|
|
|
| def _bytes_io(b: bytes): |
| import io |
|
|
| return io.BytesIO(b) |
|
|
|
|
| def _pil_to_png_bytes(img: Image.Image) -> bytes: |
| import io |
|
|
| buf = io.BytesIO() |
| img.save(buf, format="PNG") |
| return buf.getvalue() |
|
|
|
|
| def _download_example_images() -> List[Tuple[str, str]]: |
| """ |
| Downloads a couple of lightweight example images on first run. |
| Returned list is (source_path, target_path) pairs. |
| """ |
| import requests |
|
|
| _ensure_dirs() |
| examples: List[Tuple[str, str]] = [] |
|
|
| |
| |
| pairs = [ |
| ( |
| "https://upload.wikimedia.org/wikipedia/commons/thumb/3/37/Face_of_a_young_woman.jpg/512px-Face_of_a_young_woman.jpg", |
| "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0b/Barack_Obama.jpg/512px-Barack_Obama.jpg", |
| ), |
| ( |
| "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Vd-Orig.png/512px-Vd-Orig.png", |
| "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8d/Portrait_Placeholder.png/512px-Portrait_Placeholder.png", |
| ), |
| ] |
|
|
| def fetch(url: str, out_path: str) -> None: |
| if os.path.exists(out_path): |
| return |
| r = requests.get(url, timeout=60) |
| r.raise_for_status() |
| with open(out_path, "wb") as f: |
| f.write(r.content) |
|
|
| for i, (src_url, trg_url) in enumerate(pairs, start=1): |
| src_path = os.path.join(EXAMPLES_DIR, f"source_{i}.jpg") |
| trg_path = os.path.join(EXAMPLES_DIR, f"target_{i}.jpg") |
| try: |
| fetch(src_url, src_path) |
| fetch(trg_url, trg_path) |
| examples.append((src_path, trg_path)) |
| except Exception: |
| |
| continue |
|
|
| return examples |
|
|
|
|
| def swap_faces( |
| source_img: Any, |
| target_img: Any, |
| consent_ok: bool, |
| strength: float, |
| steps: int, |
| guidance: float, |
| backend: str, |
| history: List[Dict[str, str]], |
| ) -> Tuple[Any, Any, Any, List[Dict[str, str]], str]: |
| if not consent_ok: |
| return None, None, None, history, "Please confirm you have consent to swap faces." |
|
|
| try: |
| src = _pil_from_any(source_img) |
| trg = _pil_from_any(target_img) |
| except Exception as e: |
| return None, None, None, history, str(e) |
|
|
| |
| if _detect_faces_haar(src) < 1: |
| return None, None, None, history, "No face detected in Source Face." |
| if _detect_faces_haar(trg) < 1: |
| return None, None, None, history, "No face detected in Target Photo." |
|
|
| try: |
| t0 = time.time() |
| if backend == "Cloud (FaceDancer Space)": |
| |
| |
| |
| out = _call_remote_space( |
| src, |
| trg, |
| defense_ratio=100, |
| blend_ratio=int(np.clip(strength * 100, 0, 100)), |
| options=[], |
| remote_space_id=DEFAULT_REMOTE_SPACE_ID, |
| ) |
| else: |
| out = _call_custom_endpoint(src, trg, strength=strength, steps=steps, guidance=guidance) |
|
|
| out = _watermark(out) |
|
|
| _ensure_dirs() |
| out_path = os.path.join(OUTPUT_DIR, f"faceswap_{uuid.uuid4().hex}.png") |
| out.save(out_path, format="PNG") |
|
|
| share = _make_share_url(out_path) |
| elapsed = time.time() - t0 |
|
|
| history = [{"result": out_path, "source": _save_temp_upload(src, "src"), "target": _save_temp_upload(trg, "trg")}][ |
| :1 |
| ] + history |
| history = history[:12] |
|
|
| status = f"Done in {elapsed:.1f}s." |
| if share: |
| status += f" Share link: {share}" |
| return trg, out, out_path, history, status |
| except Exception as e: |
| msg = str(e) |
| if "Could not find Space" in msg or "404" in msg: |
| msg = ( |
| "Cloud backend unavailable. Try again, or configure a custom endpoint. " |
| "See README for deployment options." |
| ) |
| return None, None, None, history, msg |
|
|
|
|
| CSS = """ |
| .fsai-wrap { max-width: 1200px; margin: 0 auto; } |
| .fsai-hero { font-size: 28px; font-weight: 700; margin: 8px 0 4px; } |
| .fsai-sub { opacity: 0.8; margin-top: 0; } |
| .fsai-warn { border: 1px solid rgba(255,255,255,0.12); border-radius: 12px; padding: 12px 14px; } |
| @media (prefers-color-scheme: dark) { |
| .fsai-warn { background: rgba(255,255,255,0.04); } |
| } |
| @media (prefers-color-scheme: light) { |
| .fsai-warn { background: rgba(0,0,0,0.03); } |
| } |
| """ |
|
|
|
|
| def build_demo() -> gr.Blocks: |
| _ensure_dirs() |
| examples = _download_example_images() |
|
|
| theme = gr.themes.Soft(primary_hue="violet", neutral_hue="slate") |
|
|
| with gr.Blocks(theme=theme, css=CSS, title=APP_NAME) as demo: |
| gr.HTML( |
| f""" |
| <div class="fsai-wrap"> |
| <div class="fsai-hero">{APP_NAME}</div> |
| <p class="fsai-sub">Swap faces in photos (cloud) or live webcam (local ONNX). Use only with consent.</p> |
| </div> |
| """ |
| ) |
|
|
| with gr.Tabs(): |
| with gr.Tab("Photo Swap (Cloud)"): |
| with gr.Accordion("Consent & Safety (required)", open=True): |
| gr.Markdown( |
| """ |
| **Important:** Only upload photos you own or have explicit permission to edit. |
| |
| - **Consent**: You confirm you have consent from any person depicted. |
| - **No misuse**: Do not use for harassment, impersonation, fraud, or sexual content. |
| - **Watermark**: Outputs are watermarked to discourage misuse. |
| """ |
| ) |
| consent = gr.Checkbox(label="I confirm I have consent and will use this responsibly.") |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| source = gr.Image(label="Source Face", type="pil", height=320) |
| with gr.Column(scale=1): |
| target = gr.Image(label="Target Photo", type="pil", height=320) |
|
|
| with gr.Row(): |
| backend = gr.Radio( |
| choices=["Cloud (FaceDancer Space)", "Custom Endpoint (HF Inference Endpoint / your API)"], |
| value="Cloud (FaceDancer Space)", |
| label="Inference backend", |
| ) |
|
|
| with gr.Accordion("Advanced options", open=False): |
| strength = gr.Slider( |
| 0.0, |
| 1.0, |
| value=0.8, |
| step=0.05, |
| label="Swap strength", |
| info="Higher = stronger identity transfer. (Cloud backend maps this to blend ratio.)", |
| ) |
| steps = gr.Slider( |
| 10, 60, value=30, step=1, label="Steps", info="Used by Custom Endpoint backends." |
| ) |
| guidance = gr.Slider( |
| 1.0, |
| 10.0, |
| value=4.5, |
| step=0.5, |
| label="Guidance scale", |
| info="Used by Custom Endpoint backends.", |
| ) |
|
|
| swap_btn = gr.Button("Swap Faces", variant="primary", size="lg") |
| status = gr.Markdown(value="", elem_classes=["fsai-wrap"]) |
|
|
| with gr.Row(): |
| before = gr.Image(label="Before (Target)", type="pil", height=360) |
| after = gr.Image(label="After (Result)", type="pil", height=360) |
|
|
| with gr.Row(): |
| download = gr.File(label="Download result", file_types=[".png"]) |
|
|
| history_state = gr.State([]) |
| gallery = gr.Gallery(label="History (this session)", columns=4, height=260, preview=True) |
|
|
| def _history_to_gallery(items: List[Dict[str, str]]) -> List[str]: |
| return [it["result"] for it in items if "result" in it and os.path.exists(it["result"])] |
|
|
| def _swap_and_gallery(*args): |
| b, a, f, hist, msg = swap_faces(*args) |
| return b, a, f, hist, _history_to_gallery(hist), msg |
|
|
| swap_btn.click( |
| _swap_and_gallery, |
| inputs=[source, target, consent, strength, steps, guidance, backend, history_state], |
| outputs=[before, after, download, history_state, gallery, status], |
| ) |
|
|
| if examples: |
| gr.Examples( |
| examples=examples, |
| inputs=[source, target], |
| label="Examples", |
| examples_per_page=4, |
| ) |
|
|
| with gr.Accordion("Setup notes", open=False): |
| gr.Markdown( |
| f""" |
| **Default cloud backend:** `{DEFAULT_REMOTE_SPACE_ID}` via Gradio Spaces API. |
| |
| To use a custom backend, set: |
| - `HF_INFERENCE_ENDPOINT_URL` (your endpoint URL) |
| - optional `HF_ENDPOINT_TOKEN` (Bearer token) |
| |
| See `README.md` for a 5-minute deploy guide. |
| """ |
| ) |
|
|
| with gr.Tab("Live Swap (Local ONNX)"): |
| gr.Markdown( |
| """ |
| Upload a **source face** — it **locks automatically** (with consent checked) so the **webcam** |
| shows the swap **in real time**. You can use **Re-lock** if you change the photo. |
| This runs locally using **InsightFace + ONNXRuntime** with `inswapper_128.onnx`. |
| |
| Tip: For best results, use a clear, front-facing source photo and good lighting. |
| """ |
| ) |
|
|
| live_consent = gr.Checkbox( |
| label="I confirm I have consent and will use this responsibly.", |
| value=False, |
| ) |
|
|
| with gr.Row(): |
| live_source = gr.Image(label="Source Face (identity to use)", type="numpy", height=260) |
| live_source_status = gr.Markdown(value="") |
|
|
| source_face_state = gr.State(None) |
|
|
| def _set_live_source(source_np: Any, consent_ok: bool): |
| if not consent_ok: |
| return None, "Please confirm consent to enable live swap." |
| if source_np is None: |
| return None, "Upload a source face image." |
|
|
| analyzer, _ = _load_local_faceswap_models() |
| src_bgr = _np_rgb_to_bgr(np.array(source_np)) |
| faces = analyzer.get(src_bgr) |
| src_face = _largest_face(faces) |
| if src_face is None: |
| return None, "No face detected in source image." |
| return src_face, "Source face locked — webcam shows live swap." |
|
|
| live_set_btn = gr.Button("Re-lock source face", variant="secondary") |
| _live_source_inputs = [live_source, live_consent] |
| _live_source_outputs = [source_face_state, live_source_status] |
| live_source.change(_set_live_source, inputs=_live_source_inputs, outputs=_live_source_outputs) |
| live_consent.change(_set_live_source, inputs=_live_source_inputs, outputs=_live_source_outputs) |
| live_set_btn.click(_set_live_source, inputs=_live_source_inputs, outputs=_live_source_outputs) |
|
|
| with gr.Row(): |
| webcam = gr.Image( |
| label="Webcam", |
| sources=["webcam"], |
| streaming=True, |
| type="numpy", |
| height=420, |
| ) |
| live_out = gr.Image(label="Live swapped output", type="numpy", height=420) |
|
|
| live_status = gr.Markdown(value="") |
|
|
| def _live_swap(frame_np: Any, src_face: Any, consent_ok: bool): |
| if not consent_ok: |
| return frame_np, "Consent not confirmed." |
| if frame_np is None: |
| return None, "" |
| if src_face is None: |
| return frame_np, "Lock a source face first." |
|
|
| analyzer, swapper = _load_local_faceswap_models() |
| frame_bgr = _np_rgb_to_bgr(np.array(frame_np)) |
| faces = analyzer.get(frame_bgr) |
| tgt_face = _largest_face(faces) |
| if tgt_face is None: |
| return _np_bgr_to_rgb(frame_bgr), "No face detected in webcam frame." |
|
|
| try: |
| swapped_bgr = swapper.get(frame_bgr, tgt_face, src_face, paste_back=True) |
| except Exception as e: |
| return _np_bgr_to_rgb(frame_bgr), f"Swap error: {e}" |
|
|
| return _np_bgr_to_rgb(swapped_bgr), "" |
|
|
| webcam.stream( |
| _live_swap, |
| inputs=[webcam, source_face_state, live_consent], |
| outputs=[live_out, live_status], |
| ) |
|
|
| return demo |
|
|
|
|
| if __name__ == "__main__": |
| build_demo().launch() |
|
|
|
|