Spaces:
Running on Zero
Running on Zero
| import os | |
# Patch gradio_client to handle boolean JSON schemas (fixes TypeError in
# get_api_info). JSON Schema allows a bare true/false where a schema object is
# expected; the stock converter is only called with dicts in mind.
try:
    import gradio_client.utils as _gcu

    _orig_jsch = _gcu._json_schema_to_python_type

    def _patched_jsch(schema, defs=None):
        """Return 'Any' for non-dict schemas, otherwise defer to the original converter."""
        if not isinstance(schema, dict):
            return 'Any'
        return _orig_jsch(schema, defs)

    _gcu._json_schema_to_python_type = _patched_jsch
except Exception as _patch_err:
    # Best-effort on purpose: the app must still start when gradio_client is
    # missing or its private helper has been renamed. Report it instead of
    # failing silently so a later get_api_info error is easy to trace.
    print(f"[startup] gradio_client schema patch not applied: {_patch_err!r}")
| import tempfile | |
| import gradio as gr | |
| import numpy as np | |
| from PIL import Image | |
| import spaces | |
| import torch | |
| from composer import compose_frames, crop_reserved_region | |
| from fastswap import fast_swap_video | |
| from pipeline import load_pipeline, run_inference | |
| from video_utils import ( | |
| compute_target_size, | |
| extract_audio, | |
| frames_for_duration, | |
| load_video_frames, | |
| resize_frames, | |
| save_video, | |
| ) | |
# Resolution argument handed to compute_target_size() when guide frames are resized.
DEFAULT_RESOLUTION = 768
# Pixel size of the reserved reference-face region (the `region_size_px` argument
# of compose_frames / crop_reserved_region).
REGION_SIZE = 256

# Lazily created singletons; each one stays None until its first use.
_face_analysis = _describe_model = _describe_proc = None
# ZeroGPU: the pipeline has to be built at import time. Functions wrapped in
# @spaces.GPU run in a forked process that is discarded afterwards, so a lazy
# load inside generate() would rebuild the full 13B model on every click (and
# overrun the GPU window). The pipe.to("cuda") issued during loading is
# virtualized by the spaces package until a GPU is actually attached.
def _log_startup(message):
    """Echo a pipeline-loading progress message with a startup prefix."""
    print(f"[startup] {message}")


print("Loading LTX pipeline at startup (this takes a few minutes on first boot)โฆ")
_pipeline_state = load_pipeline(progress_cb=_log_startup)
print("Pipeline ready.")
def make_temp_file(suffix: str) -> str:
    """Create an empty, closed temporary file and return its path.

    The file is left on disk for the caller to fill in and eventually delete.
    """
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        return tmp.name
# ── Face alignment (CPU) ──────────────────────────────────────────────────────
def _get_face_analysis():
    """Return the shared InsightFace ``FaceAnalysis`` object, building it on first use.

    The 'buffalo_l' model pack is loaded with the CPU execution provider and
    prepared with a 640x640 detection size. The instance is cached in the
    module-level ``_face_analysis`` so later calls reuse it.
    """
    global _face_analysis
    if _face_analysis is None:
        # Imported lazily so the dependency is only needed once a face is aligned.
        from insightface.app import FaceAnalysis

        analyser = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        analyser.prepare(ctx_id=-1, det_size=(640, 640))
        # Publish only after prepare() succeeded. Caching the instance earlier
        # would hand out a half-initialised object forever if prepare() raised.
        _face_analysis = analyser
    return _face_analysis
def align_face_image(pil_image: Image.Image):
    """Crop the reference image around the first detected face, with padding.

    Returns ``(image, status)``. When no face is found, or anything fails along
    the way, the input image is returned unchanged together with a message
    saying why.
    """
    try:
        analyser = _get_face_analysis()
        rgb = np.array(pil_image.convert('RGB'))
        # The detector is fed a channel-reversed (BGR) copy of the RGB pixels.
        detections = analyser.get(rgb[:, :, ::-1].copy())
        if not detections:
            return pil_image, "No face detected โ using full image."

        left, top, right, bottom = detections[0].bbox.astype(int)
        # Pad by 45% of the box width and 55% of its height on each side.
        margin_x = int((right - left) * 0.45)
        margin_y = int((bottom - top) * 0.55)
        img_h, img_w = rgb.shape[:2]
        # Clamp the padded box to the image bounds before slicing.
        crop = rgb[
            max(0, top - margin_y):min(img_h, bottom + margin_y),
            max(0, left - margin_x):min(img_w, right + margin_x),
        ]
        return Image.fromarray(crop), "Face aligned โ"
    except Exception as exc:
        return pil_image, f"Alignment skipped: {exc}"
# ── Face enhancement (GPU, optional) ──────────────────────────────────────────
def _get_face_enhancer():
    """Build a new GFPGAN restorer (no upscaling, 'clean' architecture).

    A fresh ``GFPGANer`` is constructed on every call; nothing is cached here.
    """
    from gfpgan import GFPGANer

    weights_url = 'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth'
    settings = dict(upscale=1, arch='clean', channel_multiplier=2)
    return GFPGANer(model_path=weights_url, **settings)
def enhance_video_frames(frames: np.ndarray) -> np.ndarray:
    """Apply GFPGAN to each frame. frames: (N, H, W, 3) RGB uint8.

    Returns a stacked array with one entry per input frame. A frame is passed
    through unchanged when restoration raises or yields a result that cannot be
    stacked with the others (``None`` or a different shape). Empty input is
    returned as-is without loading the model.
    """
    if len(frames) == 0:
        # np.stack([]) raises ValueError, and there is nothing to restore anyway.
        return np.asarray(frames)

    enhancer = _get_face_enhancer()
    enhanced = []
    for frame in frames:
        # The enhancer is given a channel-reversed (BGR) copy of the RGB frame.
        bgr = frame[:, :, ::-1].copy()
        try:
            _, _, restored = enhancer.enhance(
                bgr, has_aligned=False, only_center_face=False, paste_back=True
            )
        except Exception:
            # Best-effort: one bad frame must not abort the whole video.
            restored = None
        if restored is not None and getattr(restored, "shape", None) == frame.shape:
            # Reverse the channels again to get back to RGB.
            enhanced.append(restored[:, :, ::-1])
        else:
            enhanced.append(frame)
    return np.stack(enhanced)
# ── Auto face description (GPU) ───────────────────────────────────────────────
def auto_describe_face(face_image):
    """Caption the reference face with BLIP-2 and wrap it in the prompt template.

    Returns ``gr.update()`` (leave the prompt untouched) when no image is
    given; otherwise returns the prompt text with the ``FACE:`` line filled in
    and a placeholder ``ACTION:`` line for the user to complete.
    """
    if face_image is None:
        # Handled outside the GPU call so an empty click does not request a GPU.
        return gr.update()
    if not isinstance(face_image, Image.Image):
        face_image = Image.fromarray(face_image)
    caption = _caption_face_on_gpu(face_image)
    return (
        "head_swap:\n"
        f"FACE: {caption}\n\n"
        "ACTION: <describe the body and movement from your guide video>"
    )


# The captioning code moves the model and its inputs to CUDA, so on ZeroGPU it
# must run inside a @spaces.GPU call; otherwise no GPU is attached.
# NOTE(review): 120 s is a guess meant to cover loading the checkpoint plus one
# generation; tune it to the observed first-call time.
@spaces.GPU(duration=120)
def _caption_face_on_gpu(face_image):
    """Run BLIP-2 on a PIL image and return the generated description text."""
    global _describe_model, _describe_proc
    if _describe_model is None:
        # Loaded on first use and kept in module globals.
        # NOTE(review): whether this cache survives between ZeroGPU calls
        # depends on the worker process lifetime; confirm.
        from transformers import Blip2Processor, Blip2ForConditionalGeneration

        _describe_proc = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
        _describe_model = Blip2ForConditionalGeneration.from_pretrained(
            "Salesforce/blip2-opt-2.7b",
            torch_dtype=torch.float16,
        ).cuda().eval()

    question = (
        "Question: Describe the facial features of this person in detail. "
        "Include approximate age, gender, hair color and length, eye color, "
        "skin tone, any facial hair, and distinctive features. "
        "Be specific and concise. Answer:"
    )
    inputs = _describe_proc(face_image, question, return_tensors="pt").to("cuda", torch.float16)
    # Remember the prompt length so only tokens after it are decoded.
    input_len = inputs["input_ids"].shape[1]
    with torch.no_grad():
        ids = _describe_model.generate(**inputs, max_new_tokens=200)
    return _describe_proc.batch_decode(ids[:, input_len:], skip_special_tokens=True)[0].strip()
# ── Main generation ───────────────────────────────────────────────────────────
| def _extract_video_path(v) -> str | None: | |
| """Normalize whatever Gradio 5.x passes for a Video component to a plain path string.""" | |
| if v is None: | |
| return None | |
| if isinstance(v, str): | |
| return v | |
| if isinstance(v, dict): | |
| video = v.get("video") or v.get("path") | |
| if isinstance(video, dict): | |
| return video.get("path") | |
| return video | |
| if hasattr(v, "video"): | |
| vv = v.video | |
| return vv.path if hasattr(vv, "path") else str(vv) | |
| if hasattr(v, "path"): | |
| return str(v.path) | |
| return str(v) | |
def _generate_duration(guide_video_raw, face_image, prompt, duration, fps,
                       lora_strength, seed, condition_mode, condition_strength,
                       denoise_strength, enhance_faces, *args, **kwargs):
    """Return the GPU time window, in seconds, to request for one generate() call.

    Takes the same inputs as generate(); only ``condition_mode`` and
    ``enhance_faces`` affect the result, and extra arguments are ignored.
    """
    # Fast swap runs InsightFace on CPU (detection + recognition only, 320px; see
    # fastswap.py). Its generous window covers the first-call ~1GB model downloads
    # plus GFPGAN, which runs on the torch GPU and is therefore fast. ZeroGPU
    # allows this (DreamBoat uses 700).
    fast_swap = str(condition_mode).startswith("Fast")
    if not fast_swap:
        return 300
    if enhance_faces:
        return 500
    return 400
# On ZeroGPU a GPU is only attached while a @spaces.GPU-decorated function runs.
# The time window is computed per call by _generate_duration, which receives the
# same arguments as generate().
@spaces.GPU(duration=_generate_duration)
def generate(
    guide_video_raw,
    face_image,
    prompt,
    duration,
    fps,
    lora_strength,
    seed,
    condition_mode,
    condition_strength,
    denoise_strength,
    enhance_faces,
    progress=gr.Progress(),
):
    """Gradio click handler: run one generation and never raise to the UI.

    Delegates all work to ``_generate_inner``. Returns ``(output_path,
    status)``; if anything raises, ``output_path`` is ``None`` and ``status``
    carries the exception type, message and full traceback.
    """
    try:
        return _generate_inner(
            guide_video_raw, face_image, prompt, duration, fps,
            lora_strength, seed, condition_mode, condition_strength,
            denoise_strength, enhance_faces, progress,
        )
    except Exception as e:
        # Report the failure in the status box instead of a bare Gradio error.
        import traceback
        tb = traceback.format_exc()
        return None, f"ERROR โ {type(e).__name__}: {e}\n\n{tb}"
def _generate_inner(
    guide_video_raw,
    face_image,
    prompt,
    duration,
    fps,
    lora_strength,
    seed,
    condition_mode,
    condition_strength,
    denoise_strength,
    enhance_faces,
    progress,
):
    """Validate the inputs, run the selected swap mode and encode the result.

    Steps: resolve the guide video path, align the reference face, load and
    resize the guide frames, trim or pad them to the requested frame count,
    run either the fast InsightFace swap or the LTX diffusion path, optionally
    enhance faces with GFPGAN, then encode the video (with the guide audio
    when it could be extracted).

    Returns ``(output_path, status_message)``. ``output_path`` is ``None``
    when validation fails or no usable output file was written. Exceptions
    from the helpers are not caught here. The temporary audio file is removed
    before returning, whether the run succeeded or not.
    """
    guide_video_path = _extract_video_path(guide_video_raw)
    if not guide_video_path:
        return None, "Please upload a guide video."
    if face_image is None:
        return None, "Please upload a reference face image."
    is_fast = condition_mode.startswith("Fast")
    # Only the diffusion modes consume the prompt.
    if not is_fast and (not prompt or not str(prompt).strip()):
        return None, "Please enter a text prompt (diffusion modes only)."
    if not os.path.isfile(guide_video_path):
        return None, f"Guide video path is not a real file: {guide_video_path}"

    progress(0, desc="Aligning reference faceโฆ")
    if not isinstance(face_image, Image.Image):
        face_image = Image.fromarray(face_image)
    aligned_face, align_msg = align_face_image(face_image)

    progress(0.05, desc="Loading guide videoโฆ")
    frames, source_fps = load_video_frames(guide_video_path)
    if len(frames) == 0:
        return None, "Could not read frames from the guide video."
    # max(..., 1) guards the division when the reported source fps is 0.
    total_secs = len(frames) / max(source_fps, 1)
    trim_note = f" (trimmed from {total_secs:.1f}s)" if total_secs > duration + 0.5 else ""

    audio_tmp = make_temp_file(".wav")
    try:
        has_audio = extract_audio(guide_video_path, audio_tmp)

        progress(0.10, desc="Resizing framesโฆ")
        # frames.shape[1] / frames.shape[2] are read as height / width.
        orig_h, orig_w = frames.shape[1], frames.shape[2]
        target_w, target_h = compute_target_size(orig_w, orig_h, DEFAULT_RESOLUTION)
        frames = resize_frames(frames, target_w, target_h)

        # Fit the clip to the requested length: cut extra frames, or repeat the
        # last frame when the guide video is too short.
        n_frames = frames_for_duration(fps, duration)
        if len(frames) >= n_frames:
            frames = frames[:n_frames]
        else:
            pad = np.stack([frames[-1]] * (n_frames - len(frames)))
            frames = np.concatenate([frames, pad], axis=0)

        if is_fast:
            # InsightFace inswapper: deterministic per-frame swap, no prompt needed.
            # The full-quality face image (not the tight aligned crop) gives
            # inswapper more landmarks to work with.
            progress(0.15, desc="Fast swap (InsightFace)โฆ")
            cropped, swap_msg = fast_swap_video(
                frames,
                face_image,
                # Map the swap's own 0..1 progress onto the 0.15..0.85 band.
                progress_cb=lambda frac, msg: progress(0.15 + frac * 0.7, desc=msg),
            )
            align_msg = f"{align_msg} {swap_msg}"
        else:
            progress(0.15, desc="Compositing reference face stripโฆ")
            composed = compose_frames(
                frames,
                aligned_face,
                region_position="left",
                region_size_px=REGION_SIZE,
            )
            progress(0.20, desc="Running LTX diffusionโฆ")
            generated = run_inference(
                _pipeline_state,
                composed,
                prompt=prompt,
                fps=fps,
                lora_strength=lora_strength,
                seed=int(seed),
                condition_mode=condition_mode,
                condition_strength=condition_strength,
                denoise_strength=denoise_strength,
                progress_cb=lambda msg: progress(0.20, desc=msg),
            )
            progress(0.90, desc="Cropping reserved regionโฆ")
            cropped = crop_reserved_region(
                generated,
                region_position="left",
                region_size_px=REGION_SIZE,
                output_size=(target_w, target_h),
            )

        if enhance_faces:
            progress(0.92, desc="Enhancing faces (GFPGAN)โฆ")
            cropped = enhance_video_frames(cropped)

        progress(0.95, desc="Encoding output videoโฆ")
        out_path = make_temp_file(".mp4")
        save_video(
            cropped,
            fps=fps,
            output_path=out_path,
            audio_path=audio_tmp if has_audio else None,
            audio_duration=duration,
        )
        # make_temp_file() has already created out_path as an empty file, so
        # existence alone proves nothing; require that some bytes were written.
        if not os.path.isfile(out_path) or os.path.getsize(out_path) == 0:
            return None, f"Output file was not created: {out_path}"
        progress(1.0, desc="Done.")
        return out_path, f"Generation complete.{trim_note} {align_msg}"
    finally:
        # The extracted audio is only needed while encoding; do not leave one
        # WAV file behind per request.
        try:
            os.remove(audio_tmp)
        except OSError:
            pass
# ── UI ────────────────────────────────────────────────────────────────────────
# ============ Quick Start scenarios ============
# One click sets the swap mode plus all tuning values, and tells the user what
# the GUIDE VIDEO should look like. "Fast swap" is the reliable InsightFace path
# and ignores the prompt, so those scenarios keep "prompt" as None (the textbox
# is left untouched). The experimental V2V scenario supplies a template instead.

# Settings shared by every fast-swap scenario.
_FAST_SWAP_SCENARIO = {
    "mode": "Fast swap (InsightFace) โ recommended",
    "cond": 0.7,
    "denoise": 1.0,
    "lora": 1.0,
    "prompt": None,
}

FACEOFF_SCENARIOS = {
    "Talking head / vlog (best quality)": {
        **_FAST_SWAP_SCENARIO,
        "duration": 4,
        "fps": 24,
        "enhance": True,
        "hint": "a guide video of someone talking to camera, head-and-shoulders framing, face clearly visible and well lit, minimal fast motion.",
    },
    "Full-body performance / dance": {
        **_FAST_SWAP_SCENARIO,
        "duration": 5,
        "fps": 24,
        "enhance": True,
        "hint": "a guide video with the full body in frame and the face clearly visible. Even lighting; avoid heavy motion blur on the face.",
    },
    "Close-up (max face fidelity)": {
        **_FAST_SWAP_SCENARIO,
        "duration": 3,
        "fps": 24,
        "enhance": True,
        "hint": "a tight close-up of the face, sharp and well lit, looking near the camera. This gives the best identity match.",
    },
    "Quick preview (fast + cheap)": {
        **_FAST_SWAP_SCENARIO,
        "duration": 2,
        "fps": 16,
        "enhance": False,
        "hint": "any short clip. Low fps + 2s to test the swap quickly before spending GPU time on the full render.",
    },
    "Experimental restyle (V2V diffusion)": {
        "mode": "Guide video (V2V) โ experimental",
        "cond": 0.6,
        "denoise": 1.0,
        "lora": 1.0,
        "duration": 3,
        "fps": 24,
        "enhance": False,
        "prompt": (
            "head_swap:\n"
            "FACE: [click 'Auto-describe face' to fill this in from your reference]\n\n"
            "ACTION: the person moves naturally, matching the guide video's motion."
        ),
        "hint": "a short 2-4s clip. This mode re-renders through diffusion โ identity transfer is weaker/experimental. Click 'Auto-describe face' first.",
    },
}
def apply_faceoff_scenario(name):
    """Translate a Quick Start scenario name into nine Gradio component updates.

    The updates are ordered: prompt, swap mode, guide adherence, denoise
    strength, LoRA strength, duration, fps, enhance-faces flag, hint text.
    An unknown or empty name yields nine no-op updates. A scenario whose
    prompt is ``None`` leaves the prompt box untouched.
    """
    scenario = FACEOFF_SCENARIOS.get(name)
    if not scenario:
        return (gr.update(),) * 9

    if scenario["prompt"] is None:
        prompt_update = gr.update()
    else:
        prompt_update = gr.update(value=scenario["prompt"])

    # Scenario keys in the same order as the output components they feed.
    value_keys = ("mode", "cond", "denoise", "lora", "duration", "fps", "enhance")
    hint_text = f"**Best guide video for this scenario:** {scenario['hint']}"
    return (
        prompt_update,
        *(gr.update(value=scenario[key]) for key in value_keys),
        gr.update(value=hint_text),
    )
# Gradio UI definition and app launch.
# NOTE(review): the nesting of the `with` blocks below was reconstructed from a
# flattened listing; confirm the layout (in particular whether `scenario_hint`
# and `enhance_faces` belong inside the preceding Group/Row) against the app.
with gr.Blocks() as demo:
    gr.Markdown("# FaceOff-FaceSwapper")
    gr.Markdown(
        "Upload a guide video and a reference face. The face is composited into a "
        "chroma strip on every frame and the video is re-rendered with the swapped head. "
        "**Consent required** โ only process people who have agreed to it."
    )
    # Inputs: guide video on one side, reference face plus caption button on the other.
    with gr.Row():
        guide_video = gr.Video(label="Guide Video", sources=["upload"])
        with gr.Column():
            face_image = gr.Image(label="Reference Face Image", type="pil")
            describe_btn = gr.Button("Auto-describe face", size="sm")
    # Quick Start: choose a preset from FACEOFF_SCENARIOS and apply it.
    with gr.Group():
        gr.Markdown("**๐ Quick Start โ pick a scenario, everything gets set up for you**")
        with gr.Row():
            scenario_dd = gr.Dropdown(
                choices=list(FACEOFF_SCENARIOS.keys()),
                label="Scenario", value=None, scale=3,
            )
            scenario_btn = gr.Button("Apply", variant="secondary", scale=1)
        scenario_hint = gr.Markdown("")
    prompt = gr.Textbox(
        label="Prompt",
        lines=4,
        placeholder="head_swap:\nFACE: ...\n\nACTION: ...",
    )
    with gr.Row():
        duration = gr.Slider(1, 10, value=4, step=0.5, label="Duration (seconds)")
        fps = gr.Slider(8, 30, value=24, step=1, label="FPS")
    with gr.Row():
        # The generation code only tests whether the label starts with "Fast";
        # the scenario presets and the example row repeat these labels verbatim.
        condition_mode = gr.Radio(
            ["Fast swap (InsightFace) โ recommended", "Guide video (V2V) โ experimental", "First frame only (I2V) โ experimental"],
            value="Fast swap (InsightFace) โ recommended",
            label="Swap mode",
            info="Fast swap: deterministic face swap on every frame, no prompt needed โ this is the mode that reliably works. "
                 "The diffusion modes re-render the video via the BFS LoRA and are experimental: identity transfer is currently weak.",
        )
    with gr.Row():
        condition_strength = gr.Slider(
            0.3, 1.0, value=0.7, step=0.05,
            label="Guide adherence (V2V)",
            info="How strongly every frame is pulled back to the guide video. "
                 "High values preserve the original face too โ lower this if the head isn't swapping.",
        )
        denoise_strength = gr.Slider(
            0.5, 1.0, value=1.0, step=0.05,
            label="Denoise strength (V2V)",
            info="How much of the video is re-rendered. Keep at 1.0 for head swap; "
                 "lower only to stay very close to the guide.",
        )
    with gr.Row():
        lora_strength = gr.Slider(0.0, 2.0, value=1.0, step=0.05, label="LoRA Strength")
        # The Number value is converted with int() before it reaches the pipeline.
        seed = gr.Number(value=42, label="Seed")
    enhance_faces = gr.Checkbox(
        label="Enhance faces with GFPGAN (adds ~30s)",
        value=False,
    )
    run_btn = gr.Button("Generate", variant="primary")
    output_video = gr.Video(label="Output Video")
    status = gr.Textbox(label="Status", interactive=False)
    # Event wiring. The caption button writes its result into the prompt box.
    describe_btn.click(
        fn=auto_describe_face,
        inputs=[face_image],
        outputs=[prompt],
    )
    # The nine outputs must stay in the order apply_faceoff_scenario returns them.
    scenario_btn.click(
        fn=apply_faceoff_scenario,
        inputs=[scenario_dd],
        outputs=[prompt, condition_mode, condition_strength, denoise_strength,
                 lora_strength, duration, fps, enhance_faces, scenario_hint],
    )
    # Inputs are listed in the positional order of generate()'s parameters.
    run_btn.click(
        fn=generate,
        inputs=[
            guide_video,
            face_image,
            prompt,
            duration,
            fps,
            lora_strength,
            seed,
            condition_mode,
            condition_strength,
            denoise_strength,
            enhance_faces,
        ],
        outputs=[output_video, status],
    )
    # Single example row; the guide video slot is None, so the user supplies one.
    gr.Examples(
        examples=[
            [
                None,
                "examples/example_face.png",
                (
                    "head_swap:\n"
                    "FACE: Male, fair skin, approximately 25-30 years old, short light brown hair,\n"
                    "blue eyes, clean-shaven, athletic build, wearing a navy blue t-shirt.\n\n"
                    "ACTION: A person walks confidently toward the camera in an outdoor plaza,\n"
                    "arms crossed, smiling."
                ),
                4,
                24,
                1.0,
                42,
                "Fast swap (InsightFace) โ recommended",
                0.7,
                1.0,
                False,
            ]
        ],
        inputs=[guide_video, face_image, prompt, duration, fps, lora_strength, seed, condition_mode, condition_strength, denoise_strength, enhance_faces],
        label="Example (upload your own guide video to generate)",
    )
# Start the app with error display enabled and server-side rendering turned off.
demo.launch(show_error=True, ssr_mode=False)