import os
import tempfile

# Patch gradio_client to handle boolean JSON schemas (fixes TypeError in get_api_info)
try:
    import gradio_client.utils as _gcu

    _orig_jsch = _gcu._json_schema_to_python_type

    def _patched_jsch(schema, defs=None):
        """Return 'Any' for non-dict schemas; otherwise defer to the original converter."""
        if not isinstance(schema, dict):
            return 'Any'
        return _orig_jsch(schema, defs)

    _gcu._json_schema_to_python_type = _patched_jsch
except Exception as _patch_err:
    # Best-effort: the app still starts without the patch, but say so instead
    # of hiding the reason.
    print(f"[startup] gradio_client schema patch skipped: {_patch_err}")

import gradio as gr
import numpy as np
from PIL import Image
import spaces
import torch

from composer import compose_frames, crop_reserved_region
from fastswap import fast_swap_video
from pipeline import load_pipeline, run_inference
from video_utils import (
    compute_target_size,
    extract_audio,
    frames_for_duration,
    load_video_frames,
    resize_frames,
    save_video,
)

DEFAULT_RESOLUTION = 768
REGION_SIZE = 256

# Lazily-populated caches (see _get_face_analysis / auto_describe_face).
_face_analysis = None
_describe_model = None
_describe_proc = None

# ZeroGPU: the pipeline MUST load at module level. @spaces.GPU calls run in a
# forked process that is discarded afterward, so lazy-loading inside generate()
# would reload the full 13B model on every single click (and blow the GPU
# window). pipe.to("cuda") here is virtualized by the spaces package until a
# GPU is actually attached.
print("Loading LTX pipeline at startup (this takes a few minutes on first boot)…")
_pipeline_state = load_pipeline(progress_cb=lambda m: print(f"[startup] {m}"))
print("Pipeline ready.")


def make_temp_file(suffix: str) -> str:
    """Create an empty temporary file with the given suffix and return its path.

    The file is created on disk by ``tempfile.mkstemp`` and its descriptor is
    closed immediately; the caller owns the path and is responsible for
    deleting it.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    return path


def _remove_temp_file(path: str) -> None:
    """Delete ``path`` if it exists; cleanup is best-effort and never raises OSError."""
    try:
        os.remove(path)
    except OSError:
        pass


# ── Face alignment (CPU) ──────────────────────────────────────────────────────

def _get_face_analysis():
    """Return the shared InsightFace ``FaceAnalysis`` instance, creating it on first use.

    The instance uses the 'buffalo_l' model pack with the CPU execution
    provider and a 640x640 detection size, and is cached in ``_face_analysis``.
    """
    global _face_analysis
    if _face_analysis is None:
        from insightface.app import FaceAnalysis
        _face_analysis = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        _face_analysis.prepare(ctx_id=-1, det_size=(640, 640))
    return _face_analysis


def align_face_image(pil_image: Image.Image):
    """Detect face, crop with padding. Returns (cropped_pil, status_str).

    The first face returned by the detector is used. Its bounding box is
    grown by 45% of its width on each side and 55% of its height on top and
    bottom, then clamped to the image. If no face is found, or anything in
    detection/cropping raises, the original image is returned together with a
    status message explaining why.
    """
    try:
        fa = _get_face_analysis()
        img_rgb = np.array(pil_image.convert('RGB'))
        # The detector is fed a channel-reversed (BGR) contiguous copy.
        img_bgr = img_rgb[:, :, ::-1].copy()
        faces = fa.get(img_bgr)
        if not faces:
            return pil_image, "No face detected — using full image."
        face = faces[0]
        x1, y1, x2, y2 = face.bbox.astype(int)
        fw, fh = x2 - x1, y2 - y1
        pad_x, pad_y = int(fw * 0.45), int(fh * 0.55)
        H, W = img_rgb.shape[:2]
        x1 = max(0, x1 - pad_x)
        y1 = max(0, y1 - pad_y)
        x2 = min(W, x2 + pad_x)
        y2 = min(H, y2 + pad_y)
        return Image.fromarray(img_rgb[y1:y2, x1:x2]), "Face aligned ✓"
    except Exception as e:
        # Alignment is optional: fall back to the untouched image.
        return pil_image, f"Alignment skipped: {e}"


# ── Face enhancement (GPU, optional) ─────────────────────────────────────────

def _get_face_enhancer():
    """Build a new ``GFPGANer`` (GFPGAN v1.4 weights, upscale=1, 'clean' arch).

    A fresh instance is constructed on every call; nothing is cached here.
    """
    from gfpgan import GFPGANer
    return GFPGANer(
        model_path='https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth',
        upscale=1,
        arch='clean',
        channel_multiplier=2,
    )


def enhance_video_frames(frames: np.ndarray) -> np.ndarray:
    """Apply GFPGAN to each frame. frames: (N, H, W, 3) RGB uint8.

    Returns a stacked array with one output frame per input frame. A frame
    whose enhancement raises is passed through unchanged. An empty input is
    returned as-is without constructing the enhancer.
    """
    if len(frames) == 0:
        # np.stack() rejects an empty sequence, and building the enhancer for
        # nothing would be wasted work.
        return frames

    enhancer = _get_face_enhancer()
    out = []
    failed = 0
    for frame in frames:
        # GFPGAN is given BGR input; the restored frame is flipped back to RGB.
        bgr = frame[:, :, ::-1].copy()
        try:
            _, _, restored = enhancer.enhance(
                bgr, has_aligned=False, only_center_face=False, paste_back=True
            )
            out.append(restored[:, :, ::-1])
        except Exception:
            # Best-effort per frame: keep the un-enhanced frame, but count it.
            failed += 1
            out.append(frame)
    if failed:
        print(f"[enhance] GFPGAN failed on {failed}/{len(out)} frame(s); kept the originals.")
    return np.stack(out)


# ── Auto face description (GPU) ───────────────────────────────────────────────

@spaces.GPU(duration=120)
def auto_describe_face(face_image):
    """Caption the reference face with BLIP-2 and return a prompt template.

    Returns an argument-less ``gr.update()`` when no image is supplied.
    Otherwise returns a string of the form
    ``"head_swap:\\nFACE: <caption>\\n\\nACTION: "`` for the prompt textbox.

    The BLIP-2 processor and model are loaded on first use into module
    globals. NOTE(review): per the ZeroGPU comment at the top of this file,
    @spaces.GPU calls run in a discarded forked process, so this cache may
    not survive between clicks — confirm.
    """
    global _describe_model, _describe_proc
    if face_image is None:
        return gr.update()

    if _describe_model is None:
        from transformers import Blip2Processor, Blip2ForConditionalGeneration
        _describe_proc = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
        _describe_model = Blip2ForConditionalGeneration.from_pretrained(
            "Salesforce/blip2-opt-2.7b",
            torch_dtype=torch.float16,
        ).cuda().eval()

    if not isinstance(face_image, Image.Image):
        face_image = Image.fromarray(face_image)

    question = (
        "Question: Describe the facial features of this person in detail. "
        "Include approximate age, gender, hair color and length, eye color, "
        "skin tone, any facial hair, and distinctive features. "
        "Be specific and concise. Answer:"
    )
    inputs = _describe_proc(face_image, question, return_tensors="pt").to("cuda", torch.float16)
    # Remember the prompt length so only newly generated tokens are decoded.
    input_len = inputs["input_ids"].shape[1]
    with torch.no_grad():
        ids = _describe_model.generate(**inputs, max_new_tokens=200)
    caption = _describe_proc.batch_decode(ids[:, input_len:], skip_special_tokens=True)[0].strip()

    return (
        "head_swap:\n"
        f"FACE: {caption}\n\n"
        "ACTION: "
    )


# ── Main generation ───────────────────────────────────────────────────────────

def _extract_video_path(v) -> str | None:
    """Normalize whatever Gradio 5.x passes for a Video component to a plain path string.

    Handles ``None``, a plain string, a dict with a ``"video"`` or ``"path"``
    entry (where ``"video"`` may itself be a dict holding ``"path"``), and
    objects exposing ``.video`` or ``.path``. Anything else is stringified.
    """
    if v is None:
        return None
    if isinstance(v, str):
        return v
    if isinstance(v, dict):
        video = v.get("video") or v.get("path")
        if isinstance(video, dict):
            return video.get("path")
        return video
    if hasattr(v, "video"):
        vv = v.video
        return vv.path if hasattr(vv, "path") else str(vv)
    if hasattr(v, "path"):
        return str(v.path)
    return str(v)


def _generate_duration(guide_video_raw, face_image, prompt, duration, fps,
                       lora_strength, seed, condition_mode, condition_strength,
                       denoise_strength, enhance_faces, *args, **kwargs):
    """Return the value passed as ``duration=`` to ``@spaces.GPU`` for ``generate``.

    Takes the same leading arguments as ``generate``. Only ``condition_mode``
    and ``enhance_faces`` are consulted: fast-swap modes get 400 (500 with
    face enhancement), every other mode gets 300.
    """
    # Fast swap runs InsightFace on CPU (detection+recognition only, 320px — see fastswap.py).
    # Generous window covers the first-call ~1GB model downloads + GFPGAN (GFPGAN runs on the
    # torch GPU, so it's fast). ZeroGPU allows this (DreamBoat uses 700).
    if str(condition_mode).startswith("Fast"):
        return 500 if enhance_faces else 400
    return 300


@spaces.GPU(duration=_generate_duration)
def generate(
    guide_video_raw,
    face_image,
    prompt,
    duration,
    fps,
    lora_strength,
    seed,
    condition_mode,
    condition_strength,
    denoise_strength,
    enhance_faces,
    progress=gr.Progress(),
):
    """Gradio click handler: run a generation and return ``(video_path, status)``.

    Delegates to ``_generate_inner``. Any exception is converted into
    ``(None, "ERROR — ...")`` with the full traceback in the status text, so
    the failure is shown in the UI instead of propagating.
    """
    try:
        return _generate_inner(
            guide_video_raw, face_image, prompt, duration, fps,
            lora_strength, seed, condition_mode, condition_strength,
            denoise_strength, enhance_faces, progress,
        )
    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        return None, f"ERROR — {type(e).__name__}: {e}\n\n{tb}"


def _generate_inner(
    guide_video_raw, face_image, prompt, duration, fps,
    lora_strength, seed, condition_mode, condition_strength,
    denoise_strength, enhance_faces, progress,
):
    """Validate inputs, swap the face across the guide video, and encode the result.

    Steps: validate inputs, align the reference face, load the guide video,
    extract its audio, resize, trim or pad to the requested length, run either
    the fast swap or the diffusion path, optionally enhance faces, and encode.

    Returns:
        ``(output_path, status_message)`` on success, or ``(None, message)``
        when validation fails or no usable output file was produced.

    Exceptions raised by the helpers propagate to the caller (``generate``
    turns them into a status message). The temporary audio file is always
    removed; the output placeholder is removed when encoding fails.
    """
    guide_video_path = _extract_video_path(guide_video_raw)
    if not guide_video_path:
        return None, "Please upload a guide video."
    if face_image is None:
        return None, "Please upload a reference face image."

    # str() keeps this test consistent with _generate_duration and tolerates
    # a non-string mode value.
    is_fast = str(condition_mode).startswith("Fast")
    if not is_fast and (not prompt or not str(prompt).strip()):
        return None, "Please enter a text prompt (diffusion modes only)."

    if not os.path.isfile(guide_video_path):
        return None, f"Guide video path is not a real file: {guide_video_path}"

    progress(0, desc="Aligning reference face…")
    if not isinstance(face_image, Image.Image):
        face_image = Image.fromarray(face_image)
    aligned_face, align_msg = align_face_image(face_image)

    progress(0.05, desc="Loading guide video…")
    frames, source_fps = load_video_frames(guide_video_path)
    if len(frames) == 0:
        return None, "Could not read frames from the guide video."

    # max(..., 1) guards against a zero/invalid source frame rate.
    total_secs = len(frames) / max(source_fps, 1)
    trim_note = f" (trimmed from {total_secs:.1f}s)" if total_secs > duration + 0.5 else ""

    audio_tmp = make_temp_file(".wav")
    try:
        has_audio = extract_audio(guide_video_path, audio_tmp)

        progress(0.10, desc="Resizing frames…")
        orig_h, orig_w = frames.shape[1], frames.shape[2]
        target_w, target_h = compute_target_size(orig_w, orig_h, DEFAULT_RESOLUTION)
        frames = resize_frames(frames, target_w, target_h)

        # Trim to the requested length, or pad by repeating the last frame.
        n_frames = frames_for_duration(fps, duration)
        if len(frames) >= n_frames:
            frames = frames[:n_frames]
        else:
            pad = np.stack([frames[-1]] * (n_frames - len(frames)))
            frames = np.concatenate([frames, pad], axis=0)

        if is_fast:
            # InsightFace inswapper — deterministic per-frame swap, no prompt needed.
            # Full-quality face image (not the tight aligned crop) gives inswapper
            # more landmarks to work with.
            progress(0.15, desc="Fast swap (InsightFace)…")
            cropped, swap_msg = fast_swap_video(
                frames, face_image,
                progress_cb=lambda frac, msg: progress(0.15 + frac * 0.7, desc=msg),
            )
            align_msg = f"{align_msg} {swap_msg}"
        else:
            progress(0.15, desc="Compositing reference face strip…")
            composed = compose_frames(
                frames,
                aligned_face,
                region_position="left",
                region_size_px=REGION_SIZE,
            )

            progress(0.20, desc="Running LTX diffusion…")
            generated = run_inference(
                _pipeline_state,
                composed,
                prompt=prompt,
                fps=fps,
                lora_strength=lora_strength,
                seed=int(seed),
                condition_mode=condition_mode,
                condition_strength=condition_strength,
                denoise_strength=denoise_strength,
                progress_cb=lambda msg: progress(0.20, desc=msg),
            )

            progress(0.90, desc="Cropping reserved region…")
            cropped = crop_reserved_region(
                generated,
                region_position="left",
                region_size_px=REGION_SIZE,
                output_size=(target_w, target_h),
            )

        if enhance_faces:
            progress(0.92, desc="Enhancing faces (GFPGAN)…")
            cropped = enhance_video_frames(cropped)

        progress(0.95, desc="Encoding output video…")
        out_path = make_temp_file(".mp4")
        try:
            save_video(
                cropped,
                fps=fps,
                output_path=out_path,
                audio_path=audio_tmp if has_audio else None,
                audio_duration=duration,
            )
        except Exception:
            # Don't leave the empty placeholder behind when encoding fails.
            _remove_temp_file(out_path)
            raise

        # make_temp_file() already created out_path, so existence alone proves
        # nothing; a zero-byte file means the encoder wrote no output.
        if not os.path.isfile(out_path) or os.path.getsize(out_path) == 0:
            _remove_temp_file(out_path)
            return None, f"Output file was not created: {out_path}"

        progress(1.0, desc="Done.")
        return out_path, f"Generation complete.{trim_note} {align_msg}"
    finally:
        # The extracted audio is only needed until save_video() returns.
        # NOTE(review): assumes save_video() has finished reading audio_path
        # by the time it returns — confirm against video_utils.
        _remove_temp_file(audio_tmp)


# ── UI ────────────────────────────────────────────────────────────────────────

# ============ Quick Start Scenarios (one-click: sets the swap mode + all tuning, and
# tells you what the GUIDE VIDEO should be) ============
# "Fast swap" is the reliable InsightFace path and ignores the prompt, so those
# scenarios leave the prompt untouched (None). The experimental V2V one sets a template.
FACEOFF_SCENARIOS = {
    "Talking head / vlog (best quality)": {
        "mode": "Fast swap (InsightFace) — recommended",
        "cond": 0.7, "denoise": 1.0, "lora": 1.0,
        "duration": 4, "fps": 24, "enhance": True, "prompt": None,
        "hint": "a guide video of someone talking to camera, head-and-shoulders framing, face clearly visible and well lit, minimal fast motion.",
    },
    "Full-body performance / dance": {
        "mode": "Fast swap (InsightFace) — recommended",
        "cond": 0.7, "denoise": 1.0, "lora": 1.0,
        "duration": 5, "fps": 24, "enhance": True, "prompt": None,
        "hint": "a guide video with the full body in frame and the face clearly visible. Even lighting; avoid heavy motion blur on the face.",
    },
    "Close-up (max face fidelity)": {
        "mode": "Fast swap (InsightFace) — recommended",
        "cond": 0.7, "denoise": 1.0, "lora": 1.0,
        "duration": 3, "fps": 24, "enhance": True, "prompt": None,
        "hint": "a tight close-up of the face, sharp and well lit, looking near the camera. This gives the best identity match.",
    },
    "Quick preview (fast + cheap)": {
        "mode": "Fast swap (InsightFace) — recommended",
        "cond": 0.7, "denoise": 1.0, "lora": 1.0,
        "duration": 2, "fps": 16, "enhance": False, "prompt": None,
        "hint": "any short clip. Low fps + 2s to test the swap quickly before spending GPU time on the full render.",
    },
    "Experimental restyle (V2V diffusion)": {
        "mode": "Guide video (V2V) — experimental",
        "cond": 0.6, "denoise": 1.0, "lora": 1.0,
        "duration": 3, "fps": 24, "enhance": False,
        "prompt": ("head_swap:\n"
                   "FACE: [click 'Auto-describe face' to fill this in from your reference]\n\n"
                   "ACTION: the person moves naturally, matching the guide video's motion."),
        "hint": "a short 2-4s clip. This mode re-renders through diffusion — identity transfer is weaker/experimental. Click 'Auto-describe face' first.",
    },
}


def apply_faceoff_scenario(name):
    """Return the nine component updates for the scenario called ``name``.

    The tuple order matches the ``outputs`` list wired to the "Apply" button:
    prompt, swap mode, guide adherence, denoise strength, LoRA strength,
    duration, fps, enhance-faces, and the hint markdown. An unknown or empty
    name yields nine argument-less ``gr.update()`` objects. A scenario whose
    ``"prompt"`` is ``None`` likewise gets an argument-less update for the
    prompt slot.
    """
    s = FACEOFF_SCENARIOS.get(name)
    if not s:
        return (gr.update(),) * 9
    prompt_update = gr.update() if s["prompt"] is None else gr.update(value=s["prompt"])
    return (
        prompt_update,
        gr.update(value=s["mode"]),
        gr.update(value=s["cond"]),
        gr.update(value=s["denoise"]),
        gr.update(value=s["lora"]),
        gr.update(value=s["duration"]),
        gr.update(value=s["fps"]),
        gr.update(value=s["enhance"]),
        gr.update(value=f"**Best guide video for this scenario:** {s['hint']}"),
    )


# NOTE(review): the container nesting below (which widgets sit inside which
# Row/Column/Group) is inferred from statement order — confirm the layout.
with gr.Blocks() as demo:
    gr.Markdown("# FaceOff-FaceSwapper")
    gr.Markdown(
        "Upload a guide video and a reference face. The face is composited into a "
        "chroma strip on every frame and the video is re-rendered with the swapped head. "
        "**Consent required** — only process people who have agreed to it."
    )

    with gr.Row():
        guide_video = gr.Video(label="Guide Video", sources=["upload"])
        with gr.Column():
            face_image = gr.Image(label="Reference Face Image", type="pil")
            describe_btn = gr.Button("Auto-describe face", size="sm")

    with gr.Group():
        gr.Markdown("**🚀 Quick Start — pick a scenario, everything gets set up for you**")
        with gr.Row():
            scenario_dd = gr.Dropdown(
                choices=list(FACEOFF_SCENARIOS.keys()),
                label="Scenario", value=None, scale=3,
            )
            scenario_btn = gr.Button("Apply", variant="secondary", scale=1)
        scenario_hint = gr.Markdown("")

    prompt = gr.Textbox(
        label="Prompt",
        lines=4,
        placeholder="head_swap:\nFACE: ...\n\nACTION: ...",
    )

    with gr.Row():
        duration = gr.Slider(1, 10, value=4, step=0.5, label="Duration (seconds)")
        fps = gr.Slider(8, 30, value=24, step=1, label="FPS")

    with gr.Row():
        condition_mode = gr.Radio(
            ["Fast swap (InsightFace) — recommended",
             "Guide video (V2V) — experimental",
             "First frame only (I2V) — experimental"],
            value="Fast swap (InsightFace) — recommended",
            label="Swap mode",
            info="Fast swap: deterministic face swap on every frame, no prompt needed — this is the mode that reliably works. "
                 "The diffusion modes re-render the video via the BFS LoRA and are experimental: identity transfer is currently weak.",
        )

    with gr.Row():
        condition_strength = gr.Slider(
            0.3, 1.0, value=0.7, step=0.05,
            label="Guide adherence (V2V)",
            info="How strongly every frame is pulled back to the guide video. "
                 "High values preserve the original face too — lower this if the head isn't swapping.",
        )
        denoise_strength = gr.Slider(
            0.5, 1.0, value=1.0, step=0.05,
            label="Denoise strength (V2V)",
            info="How much of the video is re-rendered. Keep at 1.0 for head swap; "
                 "lower only to stay very close to the guide.",
        )

    with gr.Row():
        lora_strength = gr.Slider(0.0, 2.0, value=1.0, step=0.05, label="LoRA Strength")
        seed = gr.Number(value=42, label="Seed")

    enhance_faces = gr.Checkbox(
        label="Enhance faces with GFPGAN (adds ~30s)",
        value=False,
    )

    run_btn = gr.Button("Generate", variant="primary")
    output_video = gr.Video(label="Output Video")
    status = gr.Textbox(label="Status", interactive=False)

    # "Auto-describe face" writes its result into the prompt box.
    describe_btn.click(
        fn=auto_describe_face,
        inputs=[face_image],
        outputs=[prompt],
    )

    # Output order must match the tuple returned by apply_faceoff_scenario.
    scenario_btn.click(
        fn=apply_faceoff_scenario,
        inputs=[scenario_dd],
        outputs=[prompt, condition_mode, condition_strength, denoise_strength,
                 lora_strength, duration, fps, enhance_faces, scenario_hint],
    )

    # Input order must match the positional parameters of generate().
    run_btn.click(
        fn=generate,
        inputs=[
            guide_video, face_image, prompt, duration, fps,
            lora_strength, seed, condition_mode, condition_strength,
            denoise_strength, enhance_faces,
        ],
        outputs=[output_video, status],
    )

    gr.Examples(
        examples=[
            [
                None,
                "examples/example_face.png",
                (
                    "head_swap:\n"
                    "FACE: Male, fair skin, approximately 25-30 years old, short light brown hair,\n"
                    "blue eyes, clean-shaven, athletic build, wearing a navy blue t-shirt.\n\n"
                    "ACTION: A person walks confidently toward the camera in an outdoor plaza,\n"
                    "arms crossed, smiling."
                ),
                4,
                24,
                1.0,
                42,
                "Fast swap (InsightFace) — recommended",
                0.7,
                1.0,
                False,
            ]
        ],
        inputs=[guide_video, face_image, prompt, duration, fps,
                lora_strength, seed, condition_mode, condition_strength,
                denoise_strength, enhance_faces],
        label="Example (upload your own guide video to generate)",
    )

demo.launch(show_error=True, ssr_mode=False)