Spaces:
Running
Running
| """ | |
| Video processor — extracts frames from an input video, applies face or body | |
| swap to each frame, then re-encodes the result with FFmpeg (audio preserved). | |
| Speed optimisations | |
| ------------------- | |
| * Source face is detected **once** before the loop (never per-frame). | |
| * Target face detection is cached and reused for DET_INTERVAL frames — faces | |
| don't move much between consecutive frames at normal frame rates. | |
| * Video frames are capped at 720p for processing (upscaled back for writing). | |
| * A hard cap of MAX_FRAMES is enforced to keep processing times reasonable on | |
| free CPU tiers. | |
| """ | |
| import cv2 | |
| import os | |
| import tempfile | |
| import numpy as np | |
| from pathlib import Path | |
| MAX_FRAMES = 600 # largest number of frames accepted per run (~20 s at 30 fps) | |
| DET_INTERVAL = 1 # detect faces every frame — caching causes flicker when the face moves | |
| class VideoProcessor: | |
| def __init__( | |
| self, | |
| face_swapper=None, | |
| body_swapper=None, | |
| ): | |
| self.face_swapper = face_swapper | |
| self.body_swapper = body_swapper | |
| # ββ Public API ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process_video( | |
| self, | |
| source_bgr: np.ndarray, | |
| video_path: str, | |
| mode: str = "face", # "face" | "body" | |
| enhance: bool = False, | |
| blend_strength: float = 0.85, | |
| fast_mode: bool = False, # skip every other frame (~2x speed) | |
| start_frame: int = 0, # resume from this frame index | |
| progress=None, | |
| ) -> tuple[str | None, str]: | |
| """ | |
| Process every frame of *video_path*, applying the selected swap mode. | |
| Set *start_frame* > 0 to resume after a dropped connection. | |
| Partial output is always saved β even if processing is interrupted. | |
| Returns: | |
| (output_path, status_message) | |
| """ | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return None, "Could not open video file." | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # Clamp start_frame | |
| start_frame = max(0, min(start_frame, total_frames - 1)) | |
| remaining = total_frames - start_frame | |
| if remaining > MAX_FRAMES: | |
| cap.release() | |
| return None, ( | |
| f"Segment starting at frame {start_frame} has {remaining} frames β " | |
| f"maximum allowed is {MAX_FRAMES} (~{MAX_FRAMES / fps:.0f} s at {fps:.0f} fps). " | |
| "Increase the start frame or trim the video." | |
| ) | |
| # ββ Pre-compute source face once (big win for face-swap mode) βββββββββ | |
| source_face = None | |
| if mode == "face" and self.face_swapper: | |
| source_face = self.face_swapper.get_source_face(source_bgr) | |
| if source_face is None: | |
| cap.release() | |
| return None, "No face detected in source image." | |
| # ββ Seek to start_frame β use FFmpeg cut for instant seek ββββββββββββββ | |
| # cap.set(POS_FRAMES) is slow: OpenCV decodes every frame up to the | |
| # target. FFmpeg keyframe-seeks in milliseconds. | |
| segment_path = None | |
| if start_frame > 0: | |
| start_time = start_frame / fps | |
| segment_path = tempfile.mktemp(suffix="_segment.mp4") | |
| try: | |
| import ffmpeg as _ffmpeg | |
| ( | |
| _ffmpeg.input(video_path, ss=start_time) | |
| .output(segment_path, c="copy", avoid_negative_ts="make_zero") | |
| .overwrite_output() | |
| .run(quiet=True) | |
| ) | |
| cap.release() | |
| cap = cv2.VideoCapture(segment_path) | |
| print(f"[VideoProcessor] Resumed via FFmpeg cut at frame {start_frame} ({start_time:.2f}s)") | |
| except Exception as e: | |
| print(f"[VideoProcessor] FFmpeg seek failed ({e}), falling back to slow seek") | |
| segment_path = None | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame) | |
| # Use AVI + XVID for the intermediate file β far more reliable than | |
| # mp4v on Linux (HF Spaces). FFmpeg converts it to H.264/mp4 after. | |
| # XVID/MJPG require even dimensions β round down if necessary. | |
| enc_w = width - (width % 2) | |
| enc_h = height - (height % 2) | |
| raw_out_path = tempfile.mktemp(suffix="_raw.avi") | |
| fourcc = cv2.VideoWriter_fourcc(*"XVID") | |
| writer = cv2.VideoWriter(raw_out_path, fourcc, fps, (enc_w, enc_h)) | |
| if not writer.isOpened(): | |
| # XVID not available β fall back to MJPG | |
| raw_out_path = tempfile.mktemp(suffix="_raw.avi") | |
| fourcc = cv2.VideoWriter_fourcc(*"MJPG") | |
| writer = cv2.VideoWriter(raw_out_path, fourcc, fps, (enc_w, enc_h)) | |
| frame_idx = start_frame # absolute frame number in the source video | |
| processed = 0 | |
| errors = 0 | |
| cached_tgt_faces = None | |
| last_result = None | |
| try: | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if progress is not None and total_frames > 0: | |
| progress( | |
| (frame_idx - start_frame) / remaining, | |
| f"Frame {frame_idx + 1} / {total_frames} " | |
| f"(resume at {frame_idx} if interrupted)", | |
| ) | |
| # Fast mode: skip odd frames β write the ORIGINAL frame (not a | |
| # duplicate) so motion stays smooth with no stutter or blur. | |
| # Only applies to face swap; body swap needs every frame. | |
| if fast_mode and mode == "face" and (frame_idx - start_frame) % 2 == 1: | |
| writer.write(frame) # original frame keeps motion fluid | |
| frame_idx += 1 | |
| continue | |
| # Only re-detect target faces every DET_INTERVAL frames | |
| use_cache = (mode == "face") and (frame_idx % DET_INTERVAL != 0) and (cached_tgt_faces is not None) | |
| result_frame, new_faces = self._process_frame( | |
| source_bgr, frame, mode, enhance, blend_strength, | |
| source_face=source_face, | |
| cached_target_faces=cached_tgt_faces if use_cache else None, | |
| ) | |
| if mode == "face" and new_faces is not None: | |
| cached_tgt_faces = new_faces if new_faces else cached_tgt_faces | |
| if result_frame is not None: | |
| # Ensure frame matches writer dimensions (even crop if needed) | |
| rf_h, rf_w = result_frame.shape[:2] | |
| if rf_w != enc_w or rf_h != enc_h: | |
| result_frame = cv2.resize(result_frame, (enc_w, enc_h), interpolation=cv2.INTER_LINEAR) | |
| writer.write(result_frame) | |
| last_result = result_frame | |
| processed += 1 | |
| else: | |
| frm = frame[:enc_h, :enc_w] if (frame.shape[1] > enc_w or frame.shape[0] > enc_h) else frame | |
| if frm.shape[1] != enc_w or frm.shape[0] != enc_h: | |
| frm = cv2.resize(frm, (enc_w, enc_h), interpolation=cv2.INTER_LINEAR) | |
| writer.write(frm) | |
| last_result = frm | |
| errors += 1 | |
| frame_idx += 1 | |
| except Exception as loop_err: | |
| print(f"[VideoProcessor] Loop interrupted at frame {frame_idx}: {loop_err}") | |
| finally: | |
| cap.release() | |
| writer.release() | |
| if segment_path: | |
| try: | |
| os.unlink(segment_path) | |
| except OSError: | |
| pass | |
| frames_done = frame_idx - start_frame | |
| if frames_done == 0: | |
| try: | |
| os.unlink(raw_out_path) | |
| except OSError: | |
| pass | |
| return None, f"No frames processed. Try resuming from frame {start_frame}." | |
| # Re-encode with H.264 and merge original audio via FFmpeg | |
| # Pass start_time so audio lines up with the resumed segment | |
| start_time = start_frame / fps | |
| final_path = self._ffmpeg_encode(video_path, raw_out_path, audio_start=start_time) | |
| try: | |
| os.unlink(raw_out_path) | |
| except OSError: | |
| pass | |
| partial = frames_done < remaining | |
| status = ( | |
| f"{'Partial β ' if partial else ''}Frames {start_frame}β{frame_idx - 1} " | |
| f"({processed} swapped{', ' + str(errors) + ' skipped' if errors else ''}). " | |
| + (f"Resume from frame {frame_idx} to continue." if partial else "Done.") | |
| ) | |
| return final_path, status | |
| # ββ Internal helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _process_frame( | |
| self, | |
| source_bgr: np.ndarray, | |
| frame: np.ndarray, | |
| mode: str, | |
| enhance: bool, | |
| blend_strength: float, | |
| source_face=None, | |
| cached_target_faces=None, | |
| ): | |
| """Returns (result_frame_or_None, detected_faces_or_None).""" | |
| try: | |
| if mode == "face" and self.face_swapper: | |
| result, faces = self.face_swapper.swap_frame( | |
| frame, | |
| source_face, | |
| cached_target_faces=cached_target_faces, | |
| enhance=enhance, | |
| ) | |
| return result, faces | |
| elif mode == "body" and self.body_swapper: | |
| result, _ = self.body_swapper.swap( | |
| source_bgr, frame, blend_strength=blend_strength | |
| ) | |
| return result, None | |
| except Exception as e: | |
| print(f"[VideoProcessor] Frame error: {e}") | |
| return None, None | |
| def _ffmpeg_encode(original_video_path: str, processed_raw_path: str, audio_start: float = 0.0) -> str: | |
| """ | |
| Re-encode processed frames as H.264 mp4 and merge the original audio. | |
| audio_start: seconds into the original audio (for resumed segments). | |
| Returns the output path; raises if encoding fails so caller can report it. | |
| """ | |
| final_path = tempfile.mktemp(suffix="_output.mp4") | |
| try: | |
| import ffmpeg | |
| import subprocess | |
| video_in = ffmpeg.input(processed_raw_path) | |
| audio_in = ffmpeg.input(original_video_path) | |
| # Build output streams | |
| streams = [video_in.video] | |
| # Only attach audio if the source has an audio track | |
| try: | |
| probe = ffmpeg.probe(original_video_path) | |
| has_audio = any(s["codec_type"] == "audio" for s in probe["streams"]) | |
| except Exception: | |
| has_audio = False | |
| if has_audio: | |
| if audio_start > 0: | |
| audio_in = ffmpeg.input(original_video_path, ss=audio_start) | |
| streams.append(audio_in.audio) | |
| out_kwargs = dict( | |
| vcodec="libx264", | |
| crf=18, | |
| preset="fast", | |
| pix_fmt="yuv420p", | |
| **{"vf": "unsharp=3:3:0.3:3:3:0.0"}, # subtle luma sharpening, no ringing | |
| ) | |
| if has_audio: | |
| out_kwargs.update(acodec="aac", audio_bitrate="192k") | |
| ( | |
| ffmpeg.output(*streams, final_path, **out_kwargs) | |
| .overwrite_output() | |
| .run(quiet=False, capture_stdout=True, capture_stderr=True) | |
| ) | |
| # Validate output | |
| if not os.path.exists(final_path) or os.path.getsize(final_path) < 1024: | |
| raise RuntimeError("FFmpeg produced an empty output file.") | |
| return final_path | |
| except ffmpeg.Error as e: | |
| stderr = e.stderr.decode(errors="replace") if e.stderr else "" | |
| print(f"[VideoProcessor] FFmpeg error:\n{stderr}") | |
| # Return the raw file as fallback so the user gets something | |
| return processed_raw_path | |
| except Exception as e: | |
| print(f"[VideoProcessor] FFmpeg encode failed: {e}") | |
| return processed_raw_path | |