Swapper / processors /video_processor.py
devkunalnaik's picture
Fix flickering: detect faces every frame, no per-frame enhancement in video
47c8f24
Raw
History Blame Contribute Delete
12.7 kB
"""
Video processor — extracts frames from an input video, applies face or body
swap to each frame, then re-encodes the result with FFmpeg (audio preserved).
Speed optimisations
-------------------
* Source face is detected **once** before the loop (never per-frame).
* Target face detection can be cached and reused for DET_INTERVAL frames — faces
don't move much between consecutive frames at normal frame rates. DET_INTERVAL
is currently 1, so detection runs on every frame to avoid flicker.
* Video frames are capped at 720p for processing (upscaled back for writing).
* A hard cap of MAX_FRAMES is enforced to keep processing times reasonable on
free CPU tiers.
"""
import cv2
import os
import tempfile
import numpy as np
from pathlib import Path
# Upper bound on the number of frames a single process_video() call accepts;
# longer segments are rejected before any processing starts.
MAX_FRAMES = 600 # ~20 s at 30 fps
# Target faces are re-detected on frames whose index is a multiple of this
# value; cached detections may be reused on the frames in between.
DET_INTERVAL = 1 # detect faces every frame — caching causes flicker when face moves
class VideoProcessor:
def __init__(
self,
face_swapper=None,
body_swapper=None,
):
self.face_swapper = face_swapper
self.body_swapper = body_swapper
# ── Public API ────────────────────────────────────────────────────────────
def process_video(
self,
source_bgr: np.ndarray,
video_path: str,
mode: str = "face", # "face" | "body"
enhance: bool = False,
blend_strength: float = 0.85,
fast_mode: bool = False, # skip every other frame (~2x speed)
start_frame: int = 0, # resume from this frame index
progress=None,
) -> tuple[str | None, str]:
"""
Process every frame of *video_path*, applying the selected swap mode.
Set *start_frame* > 0 to resume after a dropped connection.
Partial output is always saved β€” even if processing is interrupted.
Returns:
(output_path, status_message)
"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, "Could not open video file."
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Clamp start_frame
start_frame = max(0, min(start_frame, total_frames - 1))
remaining = total_frames - start_frame
if remaining > MAX_FRAMES:
cap.release()
return None, (
f"Segment starting at frame {start_frame} has {remaining} frames β€” "
f"maximum allowed is {MAX_FRAMES} (~{MAX_FRAMES / fps:.0f} s at {fps:.0f} fps). "
"Increase the start frame or trim the video."
)
# ── Pre-compute source face once (big win for face-swap mode) ─────────
source_face = None
if mode == "face" and self.face_swapper:
source_face = self.face_swapper.get_source_face(source_bgr)
if source_face is None:
cap.release()
return None, "No face detected in source image."
# ── Seek to start_frame β€” use FFmpeg cut for instant seek ──────────────
# cap.set(POS_FRAMES) is slow: OpenCV decodes every frame up to the
# target. FFmpeg keyframe-seeks in milliseconds.
segment_path = None
if start_frame > 0:
start_time = start_frame / fps
segment_path = tempfile.mktemp(suffix="_segment.mp4")
try:
import ffmpeg as _ffmpeg
(
_ffmpeg.input(video_path, ss=start_time)
.output(segment_path, c="copy", avoid_negative_ts="make_zero")
.overwrite_output()
.run(quiet=True)
)
cap.release()
cap = cv2.VideoCapture(segment_path)
print(f"[VideoProcessor] Resumed via FFmpeg cut at frame {start_frame} ({start_time:.2f}s)")
except Exception as e:
print(f"[VideoProcessor] FFmpeg seek failed ({e}), falling back to slow seek")
segment_path = None
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
# Use AVI + XVID for the intermediate file β€” far more reliable than
# mp4v on Linux (HF Spaces). FFmpeg converts it to H.264/mp4 after.
# XVID/MJPG require even dimensions β€” round down if necessary.
enc_w = width - (width % 2)
enc_h = height - (height % 2)
raw_out_path = tempfile.mktemp(suffix="_raw.avi")
fourcc = cv2.VideoWriter_fourcc(*"XVID")
writer = cv2.VideoWriter(raw_out_path, fourcc, fps, (enc_w, enc_h))
if not writer.isOpened():
# XVID not available β€” fall back to MJPG
raw_out_path = tempfile.mktemp(suffix="_raw.avi")
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
writer = cv2.VideoWriter(raw_out_path, fourcc, fps, (enc_w, enc_h))
frame_idx = start_frame # absolute frame number in the source video
processed = 0
errors = 0
cached_tgt_faces = None
last_result = None
try:
while True:
ret, frame = cap.read()
if not ret:
break
if progress is not None and total_frames > 0:
progress(
(frame_idx - start_frame) / remaining,
f"Frame {frame_idx + 1} / {total_frames} "
f"(resume at {frame_idx} if interrupted)",
)
# Fast mode: skip odd frames β€” write the ORIGINAL frame (not a
# duplicate) so motion stays smooth with no stutter or blur.
# Only applies to face swap; body swap needs every frame.
if fast_mode and mode == "face" and (frame_idx - start_frame) % 2 == 1:
writer.write(frame) # original frame keeps motion fluid
frame_idx += 1
continue
# Only re-detect target faces every DET_INTERVAL frames
use_cache = (mode == "face") and (frame_idx % DET_INTERVAL != 0) and (cached_tgt_faces is not None)
result_frame, new_faces = self._process_frame(
source_bgr, frame, mode, enhance, blend_strength,
source_face=source_face,
cached_target_faces=cached_tgt_faces if use_cache else None,
)
if mode == "face" and new_faces is not None:
cached_tgt_faces = new_faces if new_faces else cached_tgt_faces
if result_frame is not None:
# Ensure frame matches writer dimensions (even crop if needed)
rf_h, rf_w = result_frame.shape[:2]
if rf_w != enc_w or rf_h != enc_h:
result_frame = cv2.resize(result_frame, (enc_w, enc_h), interpolation=cv2.INTER_LINEAR)
writer.write(result_frame)
last_result = result_frame
processed += 1
else:
frm = frame[:enc_h, :enc_w] if (frame.shape[1] > enc_w or frame.shape[0] > enc_h) else frame
if frm.shape[1] != enc_w or frm.shape[0] != enc_h:
frm = cv2.resize(frm, (enc_w, enc_h), interpolation=cv2.INTER_LINEAR)
writer.write(frm)
last_result = frm
errors += 1
frame_idx += 1
except Exception as loop_err:
print(f"[VideoProcessor] Loop interrupted at frame {frame_idx}: {loop_err}")
finally:
cap.release()
writer.release()
if segment_path:
try:
os.unlink(segment_path)
except OSError:
pass
frames_done = frame_idx - start_frame
if frames_done == 0:
try:
os.unlink(raw_out_path)
except OSError:
pass
return None, f"No frames processed. Try resuming from frame {start_frame}."
# Re-encode with H.264 and merge original audio via FFmpeg
# Pass start_time so audio lines up with the resumed segment
start_time = start_frame / fps
final_path = self._ffmpeg_encode(video_path, raw_out_path, audio_start=start_time)
try:
os.unlink(raw_out_path)
except OSError:
pass
partial = frames_done < remaining
status = (
f"{'Partial β€” ' if partial else ''}Frames {start_frame}–{frame_idx - 1} "
f"({processed} swapped{', ' + str(errors) + ' skipped' if errors else ''}). "
+ (f"Resume from frame {frame_idx} to continue." if partial else "Done.")
)
return final_path, status
# ── Internal helpers ──────────────────────────────────────────────────────
def _process_frame(
self,
source_bgr: np.ndarray,
frame: np.ndarray,
mode: str,
enhance: bool,
blend_strength: float,
source_face=None,
cached_target_faces=None,
):
"""Returns (result_frame_or_None, detected_faces_or_None)."""
try:
if mode == "face" and self.face_swapper:
result, faces = self.face_swapper.swap_frame(
frame,
source_face,
cached_target_faces=cached_target_faces,
enhance=enhance,
)
return result, faces
elif mode == "body" and self.body_swapper:
result, _ = self.body_swapper.swap(
source_bgr, frame, blend_strength=blend_strength
)
return result, None
except Exception as e:
print(f"[VideoProcessor] Frame error: {e}")
return None, None
@staticmethod
def _ffmpeg_encode(original_video_path: str, processed_raw_path: str, audio_start: float = 0.0) -> str:
"""
Re-encode processed frames as H.264 mp4 and merge the original audio.
audio_start: seconds into the original audio (for resumed segments).
Returns the output path; raises if encoding fails so caller can report it.
"""
final_path = tempfile.mktemp(suffix="_output.mp4")
try:
import ffmpeg
import subprocess
video_in = ffmpeg.input(processed_raw_path)
audio_in = ffmpeg.input(original_video_path)
# Build output streams
streams = [video_in.video]
# Only attach audio if the source has an audio track
try:
probe = ffmpeg.probe(original_video_path)
has_audio = any(s["codec_type"] == "audio" for s in probe["streams"])
except Exception:
has_audio = False
if has_audio:
if audio_start > 0:
audio_in = ffmpeg.input(original_video_path, ss=audio_start)
streams.append(audio_in.audio)
out_kwargs = dict(
vcodec="libx264",
crf=18,
preset="fast",
pix_fmt="yuv420p",
**{"vf": "unsharp=3:3:0.3:3:3:0.0"}, # subtle luma sharpening, no ringing
)
if has_audio:
out_kwargs.update(acodec="aac", audio_bitrate="192k")
(
ffmpeg.output(*streams, final_path, **out_kwargs)
.overwrite_output()
.run(quiet=False, capture_stdout=True, capture_stderr=True)
)
# Validate output
if not os.path.exists(final_path) or os.path.getsize(final_path) < 1024:
raise RuntimeError("FFmpeg produced an empty output file.")
return final_path
except ffmpeg.Error as e:
stderr = e.stderr.decode(errors="replace") if e.stderr else ""
print(f"[VideoProcessor] FFmpeg error:\n{stderr}")
# Return the raw file as fallback so the user gets something
return processed_raw_path
except Exception as e:
print(f"[VideoProcessor] FFmpeg encode failed: {e}")
return processed_raw_path