Jds20001's picture
Bigger fast-swap time window (400/500s)
b72e013 verified
Raw
History Blame Contribute Delete
19.4 kB
import os
# Patch gradio_client to handle boolean JSON schemas (fixes TypeError in get_api_info)
try:
import gradio_client.utils as _gcu
_orig_jsch = _gcu._json_schema_to_python_type
def _patched_jsch(schema, defs=None):
if not isinstance(schema, dict):
return 'Any'
return _orig_jsch(schema, defs)
_gcu._json_schema_to_python_type = _patched_jsch
except Exception:
pass
import tempfile
import gradio as gr
import numpy as np
from PIL import Image
import spaces
import torch
from composer import compose_frames, crop_reserved_region
from fastswap import fast_swap_video
from pipeline import load_pipeline, run_inference
from video_utils import (
compute_target_size,
extract_audio,
frames_for_duration,
load_video_frames,
resize_frames,
save_video,
)
# Resolution argument handed to compute_target_size() when sizing the working frames.
DEFAULT_RESOLUTION = 768
# Passed as region_size_px to compose_frames() and crop_reserved_region().
REGION_SIZE = 256
# Lazily filled caches: the InsightFace analyzer (_get_face_analysis) and the
# BLIP-2 model / processor pair (auto_describe_face).
_face_analysis = None
_describe_model = None
_describe_proc = None
# ZeroGPU: the pipeline MUST load at module level. @spaces.GPU calls run in a
# forked process that is discarded afterward, so lazy-loading inside generate()
# would reload the full 13B model on every single click (and blow the GPU
# window). pipe.to("cuda") here is virtualized by the spaces package until a
# GPU is actually attached.
print("Loading LTX pipeline at startup (this takes a few minutes on first boot)โ€ฆ")
# Loader progress messages are echoed to stdout with a "[startup]" prefix.
_pipeline_state = load_pipeline(progress_cb=lambda m: print(f"[startup] {m}"))
print("Pipeline ready.")
def make_temp_file(suffix: str) -> str:
    """Create an empty temporary file ending in *suffix* and return its path.

    The file is closed before returning and is not deleted automatically;
    the caller owns it.
    """
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as handle:
        return handle.name
# ── Face alignment (CPU) ──────────────────────────────────────────────────────
def _get_face_analysis():
    """Return the shared InsightFace ``FaceAnalysis`` instance, creating it on first use.

    The analyzer uses the 'buffalo_l' model pack with the CPU execution
    provider. It is published to the module-level cache only after
    ``prepare()`` succeeds, so a failed preparation is retried on the next
    call instead of leaving an unprepared analyzer cached.
    """
    global _face_analysis
    if _face_analysis is None:
        # Imported lazily so the dependency is only loaded when alignment runs.
        from insightface.app import FaceAnalysis
        analyzer = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        analyzer.prepare(ctx_id=-1, det_size=(640, 640))
        _face_analysis = analyzer
    return _face_analysis
def align_face_image(pil_image: Image.Image):
    """Crop *pil_image* to the first detected face plus a margin.

    The margin is 45% of the box width on each side and 55% of the box height
    above and below, clamped to the image bounds.

    Returns:
        ``(image, status)``. ``image`` is the padded crop on success, or the
        untouched input when no face is found or any step raises. ``status``
        is a short human-readable message describing which case occurred.
    """
    try:
        analyzer = _get_face_analysis()
        rgb = np.array(pil_image.convert('RGB'))
        # The detector is fed a BGR copy of the pixels.
        detections = analyzer.get(rgb[:, :, ::-1].copy())
        if not detections:
            return pil_image, "No face detected โ€” using full image."

        left, top, right, bottom = detections[0].bbox.astype(int)
        margin_x = int((right - left) * 0.45)
        margin_y = int((bottom - top) * 0.55)
        height, width = rgb.shape[:2]

        crop = rgb[
            max(0, top - margin_y):min(height, bottom + margin_y),
            max(0, left - margin_x):min(width, right + margin_x),
        ]
        return Image.fromarray(crop), "Face aligned โœ“"
    except Exception as e:
        # Alignment is best-effort: fall back to the original image.
        return pil_image, f"Alignment skipped: {e}"
# ── Face enhancement (GPU, optional) ──────────────────────────────────────────
def _get_face_enhancer():
    """Build and return a new ``GFPGANer`` for the GFPGAN v1.4 weights.

    A fresh instance is constructed on every call; nothing is cached here.
    """
    from gfpgan import GFPGANer

    settings = {
        "model_path": 'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth',
        "upscale": 1,  # keep the frame size unchanged
        "arch": 'clean',
        "channel_multiplier": 2,
    }
    return GFPGANer(**settings)
def enhance_video_frames(frames: np.ndarray) -> np.ndarray:
    """Apply GFPGAN to each frame. frames: (N, H, W, 3) RGB uint8.

    Each frame is converted to BGR for the enhancer and the restored image is
    converted back to RGB. A frame whose enhancement raises is passed through
    unchanged, so one bad frame never aborts the whole clip.

    Returns:
        The stacked processed frames. An empty input is returned as-is
        without building the enhancer (``np.stack`` cannot stack zero arrays).
    """
    if len(frames) == 0:
        return frames

    enhancer = _get_face_enhancer()
    out = []
    for frame in frames:
        bgr = frame[:, :, ::-1].copy()
        try:
            _, _, restored = enhancer.enhance(
                bgr, has_aligned=False, only_center_face=False, paste_back=True
            )
            out.append(restored[:, :, ::-1])
        except Exception:
            # Best-effort per frame: keep the un-enhanced original.
            out.append(frame)
    return np.stack(out)
# ── Auto face description (GPU) ───────────────────────────────────────────────
@spaces.GPU(duration=120)
def auto_describe_face(face_image):
    """Caption the reference face with BLIP-2 and return a prompt template.

    Args:
        face_image: A PIL image, or array data accepted by ``Image.fromarray``.
            ``None`` leaves the prompt box untouched.

    Returns:
        ``gr.update()`` when no image is given, otherwise a ``head_swap:``
        prompt whose FACE line holds the generated description.

    NOTE(review): the model is cached in module globals on first use. If
    @spaces.GPU runs this call in a process that is thrown away afterwards,
    that cache will not survive between clicks - confirm.
    """
    global _describe_model, _describe_proc

    if face_image is None:
        return gr.update()

    if _describe_model is None:
        from transformers import Blip2Processor, Blip2ForConditionalGeneration
        _describe_proc = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
        model = Blip2ForConditionalGeneration.from_pretrained(
            "Salesforce/blip2-opt-2.7b",
            torch_dtype=torch.float16,
        )
        _describe_model = model.cuda().eval()

    if not isinstance(face_image, Image.Image):
        face_image = Image.fromarray(face_image)

    question = (
        "Question: Describe the facial features of this person in detail. "
        "Include approximate age, gender, hair color and length, eye color, "
        "skin tone, any facial hair, and distinctive features. "
        "Be specific and concise. Answer:"
    )
    inputs = _describe_proc(face_image, question, return_tensors="pt")
    inputs = inputs.to("cuda", torch.float16)
    prompt_tokens = inputs["input_ids"].shape[1]

    with torch.no_grad():
        generated = _describe_model.generate(**inputs, max_new_tokens=200)

    # Drop the first `prompt_tokens` positions before decoding.
    # NOTE(review): assumes generate() echoes the prompt tokens in its output -
    # confirm for the installed transformers version.
    answer_ids = generated[:, prompt_tokens:]
    caption = _describe_proc.batch_decode(answer_ids, skip_special_tokens=True)[0].strip()

    return (
        "head_swap:\n"
        f"FACE: {caption}\n\n"
        "ACTION: <describe the body and movement from your guide video>"
    )
# ── Main generation ───────────────────────────────────────────────────────────
def _extract_video_path(v) -> str | None:
"""Normalize whatever Gradio 5.x passes for a Video component to a plain path string."""
if v is None:
return None
if isinstance(v, str):
return v
if isinstance(v, dict):
video = v.get("video") or v.get("path")
if isinstance(video, dict):
return video.get("path")
return video
if hasattr(v, "video"):
vv = v.video
return vv.path if hasattr(vv, "path") else str(vv)
if hasattr(v, "path"):
return str(v.path)
return str(v)
def _generate_duration(guide_video_raw, face_image, prompt, duration, fps,
lora_strength, seed, condition_mode, condition_strength,
denoise_strength, enhance_faces, *args, **kwargs):
# Fast swap runs InsightFace on CPU (detection+recognition only, 320px โ€” see fastswap.py).
# Generous window covers the first-call ~1GB model downloads + GFPGAN (GFPGAN runs on the
# torch GPU, so it's fast). ZeroGPU allows this (DreamBoat uses 700).
if str(condition_mode).startswith("Fast"):
return 500 if enhance_faces else 400
return 300
@spaces.GPU(duration=_generate_duration)
def generate(
    guide_video_raw,
    face_image,
    prompt,
    duration,
    fps,
    lora_strength,
    seed,
    condition_mode,
    condition_strength,
    denoise_strength,
    enhance_faces,
    progress=gr.Progress(),
):
    """Gradio click handler: run the swap and never raise to the UI.

    All work is delegated to ``_generate_inner``. Any exception is turned
    into a ``(None, message)`` result whose message carries the exception
    type, its text and the full traceback.

    Returns:
        ``(output_video_path_or_None, status_text)``.
    """
    call_args = (
        guide_video_raw, face_image, prompt, duration, fps,
        lora_strength, seed, condition_mode, condition_strength,
        denoise_strength, enhance_faces, progress,
    )
    try:
        outcome = _generate_inner(*call_args)
    except Exception as exc:
        import traceback
        return None, f"ERROR โ€” {type(exc).__name__}: {exc}\n\n{traceback.format_exc()}"
    return outcome
def _discard_temp_file(path):
    """Delete *path*; a file that is already gone or cannot be removed is ignored."""
    try:
        os.remove(path)
    except OSError:
        # Cleanup is best-effort and must never mask the real result or error.
        pass


def _generate_inner(
    guide_video_raw,
    face_image,
    prompt,
    duration,
    fps,
    lora_strength,
    seed,
    condition_mode,
    condition_strength,
    denoise_strength,
    enhance_faces,
    progress,
):
    """Validate the inputs, run the selected swap mode and encode the result.

    Steps: resolve and check the guide video, align the reference face, load
    and resize the frames, cut or pad them to the requested length, run either
    the fast swap or the diffusion path, optionally enhance faces, then encode
    the video with the guide's audio when it has any.

    Args:
        guide_video_raw: Gradio video value; resolved with ``_extract_video_path``.
        face_image: Reference face as a PIL image or array data.
        prompt: Text prompt; required unless the mode starts with "Fast".
        duration: Requested clip length, passed to ``frames_for_duration``.
        fps: Output frame rate.
        lora_strength, seed, condition_strength, denoise_strength: Forwarded
            to ``run_inference`` (diffusion modes only).
        condition_mode: Swap mode label; a "Fast" prefix selects the fast swap.
        enhance_faces: Run GFPGAN over the result when truthy.
        progress: Callable ``progress(fraction, desc=...)``.

    Returns:
        ``(output_path, status)`` on success, ``(None, message)`` when an
        input is missing or the output could not be produced.

    The temporary audio file is always removed. The temporary output file is
    removed when encoding fails or yields an empty file.
    """
    guide_video_path = _extract_video_path(guide_video_raw)
    if not guide_video_path:
        return None, "Please upload a guide video."
    if face_image is None:
        return None, "Please upload a reference face image."
    is_fast = condition_mode.startswith("Fast")
    if not is_fast and (not prompt or not str(prompt).strip()):
        return None, "Please enter a text prompt (diffusion modes only)."
    if not os.path.isfile(guide_video_path):
        return None, f"Guide video path is not a real file: {guide_video_path}"

    progress(0, desc="Aligning reference faceโ€ฆ")
    if not isinstance(face_image, Image.Image):
        face_image = Image.fromarray(face_image)
    aligned_face, align_msg = align_face_image(face_image)

    progress(0.05, desc="Loading guide videoโ€ฆ")
    frames, source_fps = load_video_frames(guide_video_path)
    if len(frames) == 0:
        return None, "Could not read frames from the guide video."
    total_secs = len(frames) / max(source_fps, 1)
    trim_note = f" (trimmed from {total_secs:.1f}s)" if total_secs > duration + 0.5 else ""

    audio_tmp = make_temp_file(".wav")
    try:
        has_audio = extract_audio(guide_video_path, audio_tmp)

        progress(0.10, desc="Resizing framesโ€ฆ")
        orig_h, orig_w = frames.shape[1], frames.shape[2]
        target_w, target_h = compute_target_size(orig_w, orig_h, DEFAULT_RESOLUTION)
        frames = resize_frames(frames, target_w, target_h)

        # Cut to the requested length, or repeat the last frame to reach it.
        # NOTE(review): frames are taken consecutively with no resampling from
        # source_fps to fps here - confirm load_video_frames / save_video
        # account for a differing source frame rate.
        n_frames = frames_for_duration(fps, duration)
        if len(frames) >= n_frames:
            frames = frames[:n_frames]
        else:
            pad = np.stack([frames[-1]] * (n_frames - len(frames)))
            frames = np.concatenate([frames, pad], axis=0)

        if is_fast:
            # InsightFace inswapper - deterministic per-frame swap, no prompt needed.
            # Full-quality face image (not the tight aligned crop) gives inswapper
            # more landmarks to work with.
            progress(0.15, desc="Fast swap (InsightFace)โ€ฆ")
            cropped, swap_msg = fast_swap_video(
                frames,
                face_image,
                progress_cb=lambda frac, msg: progress(0.15 + frac * 0.7, desc=msg),
            )
            align_msg = f"{align_msg} {swap_msg}"
        else:
            progress(0.15, desc="Compositing reference face stripโ€ฆ")
            composed = compose_frames(
                frames,
                aligned_face,
                region_position="left",
                region_size_px=REGION_SIZE,
            )
            progress(0.20, desc="Running LTX diffusionโ€ฆ")
            generated = run_inference(
                _pipeline_state,
                composed,
                prompt=prompt,
                fps=fps,
                lora_strength=lora_strength,
                seed=int(seed),
                condition_mode=condition_mode,
                condition_strength=condition_strength,
                denoise_strength=denoise_strength,
                progress_cb=lambda msg: progress(0.20, desc=msg),
            )
            progress(0.90, desc="Cropping reserved regionโ€ฆ")
            cropped = crop_reserved_region(
                generated,
                region_position="left",
                region_size_px=REGION_SIZE,
                output_size=(target_w, target_h),
            )

        if enhance_faces:
            progress(0.92, desc="Enhancing faces (GFPGAN)โ€ฆ")
            cropped = enhance_video_frames(cropped)

        progress(0.95, desc="Encoding output videoโ€ฆ")
        out_path = make_temp_file(".mp4")
        try:
            save_video(
                cropped,
                fps=fps,
                output_path=out_path,
                audio_path=audio_tmp if has_audio else None,
                audio_duration=duration,
            )
        except Exception:
            # Don't leave the placeholder behind when encoding blows up.
            _discard_temp_file(out_path)
            raise

        # make_temp_file() already created out_path, so existence alone proves
        # nothing; an encoder that wrote nothing leaves a zero-byte file.
        if not os.path.isfile(out_path) or os.path.getsize(out_path) == 0:
            _discard_temp_file(out_path)
            return None, f"Output file was not created: {out_path}"

        progress(1.0, desc="Done.")
        return out_path, f"Generation complete.{trim_note} {align_msg}"
    finally:
        # Assumes save_video has finished reading the audio once it returns.
        _discard_temp_file(audio_tmp)
# ── UI ────────────────────────────────────────────────────────────────────────
# ============ Quick Start Scenarios (one-click: sets the swap mode + all tuning, and
# tells you what the GUIDE VIDEO should be) ============
# "Fast swap" is the reliable InsightFace path and ignores the prompt, so those
# scenarios leave the prompt untouched (None). The experimental V2V one sets a template.
def _fast_swap_preset(duration, fps, enhance, hint):
    """Build one fast-swap scenario; only length, fps, enhancement and hint vary."""
    return {
        "mode": "Fast swap (InsightFace) โ€” recommended",
        "cond": 0.7,
        "denoise": 1.0,
        "lora": 1.0,
        "duration": duration,
        "fps": fps,
        "enhance": enhance,
        "prompt": None,  # fast swap leaves the prompt box untouched
        "hint": hint,
    }


# Scenario name -> settings applied by apply_faceoff_scenario. Keys: "mode"
# (swap-mode label), "cond", "denoise", "lora", "duration", "fps", "enhance",
# "prompt" (None = leave the prompt alone) and "hint" (guide-video advice).
FACEOFF_SCENARIOS = {
    "Talking head / vlog (best quality)": _fast_swap_preset(
        4, 24, True,
        "a guide video of someone talking to camera, head-and-shoulders framing, face clearly visible and well lit, minimal fast motion.",
    ),
    "Full-body performance / dance": _fast_swap_preset(
        5, 24, True,
        "a guide video with the full body in frame and the face clearly visible. Even lighting; avoid heavy motion blur on the face.",
    ),
    "Close-up (max face fidelity)": _fast_swap_preset(
        3, 24, True,
        "a tight close-up of the face, sharp and well lit, looking near the camera. This gives the best identity match.",
    ),
    "Quick preview (fast + cheap)": _fast_swap_preset(
        2, 16, False,
        "any short clip. Low fps + 2s to test the swap quickly before spending GPU time on the full render.",
    ),
    "Experimental restyle (V2V diffusion)": {
        "mode": "Guide video (V2V) โ€” experimental",
        "cond": 0.6,
        "denoise": 1.0,
        "lora": 1.0,
        "duration": 3,
        "fps": 24,
        "enhance": False,
        "prompt": (
            "head_swap:\n"
            "FACE: [click 'Auto-describe face' to fill this in from your reference]\n\n"
            "ACTION: the person moves naturally, matching the guide video's motion."
        ),
        "hint": "a short 2-4s clip. This mode re-renders through diffusion โ€” identity transfer is weaker/experimental. Click 'Auto-describe face' first.",
    },
}
def apply_faceoff_scenario(name):
    """Translate a scenario name into nine Gradio updates.

    The updates are ordered: prompt, swap mode, guide adherence, denoise
    strength, LoRA strength, duration, fps, enhance checkbox, hint markdown.
    An unknown (or empty) scenario yields nine no-op updates. A scenario
    whose prompt is ``None`` leaves the prompt box unchanged.
    """
    preset = FACEOFF_SCENARIOS.get(name)
    if not preset:
        return (gr.update(),) * 9

    if preset["prompt"] is None:
        prompt_update = gr.update()
    else:
        prompt_update = gr.update(value=preset["prompt"])

    # Preset keys in the same order as the output components after the prompt.
    setting_keys = ("mode", "cond", "denoise", "lora", "duration", "fps", "enhance")
    hint_text = f"**Best guide video for this scenario:** {preset['hint']}"
    return (
        prompt_update,
        *(gr.update(value=preset[key]) for key in setting_keys),
        gr.update(value=hint_text),
    )
# Gradio UI: inputs, scenario presets, tuning controls, outputs and event wiring.
# NOTE(review): the nesting of the layout containers below (Row / Column / Group)
# was reconstructed from a copy without indentation - confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# FaceOff-FaceSwapper")
    gr.Markdown(
        "Upload a guide video and a reference face. The face is composited into a "
        "chroma strip on every frame and the video is re-rendered with the swapped head. "
        "**Consent required** โ€” only process people who have agreed to it."
    )
    # Inputs: guide video next to the reference face and its describe button.
    with gr.Row():
        guide_video = gr.Video(label="Guide Video", sources=["upload"])
        with gr.Column():
            face_image = gr.Image(label="Reference Face Image", type="pil")
            describe_btn = gr.Button("Auto-describe face", size="sm")
    # One-click presets backed by FACEOFF_SCENARIOS.
    with gr.Group():
        gr.Markdown("**๐Ÿš€ Quick Start โ€” pick a scenario, everything gets set up for you**")
        with gr.Row():
            scenario_dd = gr.Dropdown(
                choices=list(FACEOFF_SCENARIOS.keys()),
                label="Scenario", value=None, scale=3,
            )
            scenario_btn = gr.Button("Apply", variant="secondary", scale=1)
        scenario_hint = gr.Markdown("")
    prompt = gr.Textbox(
        label="Prompt",
        lines=4,
        placeholder="head_swap:\nFACE: ...\n\nACTION: ...",
    )
    with gr.Row():
        duration = gr.Slider(1, 10, value=4, step=0.5, label="Duration (seconds)")
        fps = gr.Slider(8, 30, value=24, step=1, label="FPS")
    with gr.Row():
        # generate() only tests whether the chosen label starts with "Fast";
        # the scenario presets and the example row reuse these exact labels.
        condition_mode = gr.Radio(
            ["Fast swap (InsightFace) โ€” recommended", "Guide video (V2V) โ€” experimental", "First frame only (I2V) โ€” experimental"],
            value="Fast swap (InsightFace) โ€” recommended",
            label="Swap mode",
            info="Fast swap: deterministic face swap on every frame, no prompt needed โ€” this is the mode that reliably works. "
            "The diffusion modes re-render the video via the BFS LoRA and are experimental: identity transfer is currently weak.",
        )
    with gr.Row():
        condition_strength = gr.Slider(
            0.3, 1.0, value=0.7, step=0.05,
            label="Guide adherence (V2V)",
            info="How strongly every frame is pulled back to the guide video. "
            "High values preserve the original face too โ€” lower this if the head isn't swapping.",
        )
        denoise_strength = gr.Slider(
            0.5, 1.0, value=1.0, step=0.05,
            label="Denoise strength (V2V)",
            info="How much of the video is re-rendered. Keep at 1.0 for head swap; "
            "lower only to stay very close to the guide.",
        )
    with gr.Row():
        lora_strength = gr.Slider(0.0, 2.0, value=1.0, step=0.05, label="LoRA Strength")
        seed = gr.Number(value=42, label="Seed")
    enhance_faces = gr.Checkbox(
        label="Enhance faces with GFPGAN (adds ~30s)",
        value=False,
    )
    run_btn = gr.Button("Generate", variant="primary")
    output_video = gr.Video(label="Output Video")
    status = gr.Textbox(label="Status", interactive=False)
    # Event wiring. The describe button writes its result into the prompt box.
    describe_btn.click(
        fn=auto_describe_face,
        inputs=[face_image],
        outputs=[prompt],
    )
    # Output order must match the nine updates returned by apply_faceoff_scenario.
    scenario_btn.click(
        fn=apply_faceoff_scenario,
        inputs=[scenario_dd],
        outputs=[prompt, condition_mode, condition_strength, denoise_strength,
                 lora_strength, duration, fps, enhance_faces, scenario_hint],
    )
    # Input order must match generate()'s positional parameters.
    run_btn.click(
        fn=generate,
        inputs=[
            guide_video,
            face_image,
            prompt,
            duration,
            fps,
            lora_strength,
            seed,
            condition_mode,
            condition_strength,
            denoise_strength,
            enhance_faces,
        ],
        outputs=[output_video, status],
    )
    # Single example row; the guide video slot is None, so the user supplies one.
    gr.Examples(
        examples=[
            [
                None,
                "examples/example_face.png",
                (
                    "head_swap:\n"
                    "FACE: Male, fair skin, approximately 25-30 years old, short light brown hair,\n"
                    "blue eyes, clean-shaven, athletic build, wearing a navy blue t-shirt.\n\n"
                    "ACTION: A person walks confidently toward the camera in an outdoor plaza,\n"
                    "arms crossed, smiling."
                ),
                4,
                24,
                1.0,
                42,
                "Fast swap (InsightFace) โ€” recommended",
                0.7,
                1.0,
                False,
            ]
        ],
        inputs=[guide_video, face_image, prompt, duration, fps, lora_strength, seed, condition_mode, condition_strength, denoise_strength, enhance_faces],
        label="Example (upload your own guide video to generate)",
    )
# Start the app; show_error=True surfaces handler errors in the browser.
demo.launch(show_error=True, ssr_mode=False)