| import pickle |
| import shutil |
| import subprocess |
| import uuid |
| from pathlib import Path |
| from typing import Optional, Sequence, Tuple |
|
|
| import cv2 |
| import imageio_ffmpeg |
| import numpy as np |
|
|
# Landmark indices selecting the lip region in MediaPipe FaceMesh's landmark
# set (used below in extract_coordinates to build the lip bounding box).
# NOTE(review): these look like the outer-lip contour of the FaceMesh
# topology — confirm against the official mediapipe face-mesh landmark map
# before changing.
LIP_INDICES = [
    61, 146, 91, 181, 84, 17, 314, 405, 321, 375,
    291, 409, 270, 269, 267, 0, 37, 39, 40, 185,
]
|
|
|
|
| def _create_face_mesh(): |
| |
| |
| try: |
| from mediapipe.python.solutions.face_mesh import FaceMesh |
| except Exception: |
| try: |
| import mediapipe as mp |
|
|
| FaceMesh = mp.solutions.face_mesh.FaceMesh |
| except Exception as exc: |
| raise ImportError( |
| "MediaPipe FaceMesh import failed. Install mediapipe==0.10.14." |
| ) from exc |
|
|
| return FaceMesh( |
| static_image_mode=False, |
| max_num_faces=1, |
| refine_landmarks=True, |
| min_detection_confidence=0.8, |
| ) |
|
|
|
|
def ensure_dir(path: Path) -> Path:
    """Create directory *path* (including missing parents) and return it.

    Idempotent: an already-existing directory is left untouched.
    """
    path.mkdir(parents=True, exist_ok=True)
    return path
|
|
|
|
def make_run_dir(base_dir: Path, prefix: str) -> Path:
    """Create and return a uniquely-named run directory under *base_dir*.

    The directory is named ``{prefix}_{32-char random hex}`` so concurrent
    runs never collide; *base_dir* itself is created first when missing.
    """
    unique_name = f"{prefix}_{uuid.uuid4().hex}"
    return ensure_dir(ensure_dir(base_dir) / unique_name)
|
|
|
|
| def copy_file_to_dir(source_path: str, target_dir: Path, target_name: Optional[str] = None) -> Path: |
| source = Path(source_path) |
| if not source.exists(): |
| raise FileNotFoundError(f"Input file not found: {source_path}") |
|
|
| if target_name is None: |
| target_name = source.name |
|
|
| target_path = target_dir / target_name |
| shutil.copy2(source, target_path) |
| return target_path |
|
|
|
|
def get_bbox(
    landmarks,
    indices: Sequence[int],
    iw: int,
    ih: int,
    scale_w: float = 1.5,
    scale_h: float = 1.5,
    top_padding: int = 0,
) -> Tuple[int, int, int, int]:
    """Compute an expanded pixel bounding box over selected landmarks.

    *landmarks* are normalized (0..1) points exposing ``.x``/``.y``; the
    tight box over ``indices`` is enlarged by ``scale_w``/``scale_h`` about
    its center, shifted up by ``top_padding`` pixels, and clipped to the
    ``iw`` x ``ih`` image.

    Returns:
        ``(x, y, width, height)`` as integers.
    """
    xs = [landmarks[i].x * iw for i in indices]
    ys = [landmarks[i].y * ih for i in indices]
    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)

    box_w = x_max - x_min
    box_h = y_max - y_min
    scaled_w = int(box_w * scale_w)
    scaled_h = int(box_h * scale_h)

    # Re-center the enlarged box, then clip it to the image bounds.
    x = max(0, int(x_min - (scaled_w - box_w) // 2))
    y = max(0, int(y_min - (scaled_h - box_h) // 2) - top_padding)
    clipped_w = min(scaled_w, iw - x)
    clipped_h = min(scaled_h + top_padding, ih - y)
    return (x, y, clipped_w, clipped_h)
|
|
|
|
| def _load_coords(coords_path: str) -> Tuple[int, int, int, int]: |
| with open(coords_path, "rb") as handle: |
| coords = pickle.load(handle) |
|
|
| if len(coords) != 4: |
| raise ValueError(f"Invalid coordinates in {coords_path}: expected 4 values, got {len(coords)}") |
|
|
| return tuple(int(v) for v in coords) |
|
|
|
|
def extract_coordinates(
    video_path: str,
    output_dir: str,
    face_name: str = "face_coords_avg.pkl",
    lip_name: str = "lip_coords_avg.pkl",
) -> Tuple[str, str, Tuple[int, int, int, int], Tuple[int, int, int, int]]:
    """Detect per-frame face and lip boxes in a video and save their averages.

    Runs MediaPipe FaceMesh over every frame, builds a padded face box (all
    landmarks, 1.2x scale) and a padded lip box (LIP_INDICES, 1.5x scale plus
    20px top padding) per frame, clips the lip box to lie inside the face
    box, then averages both over all frames and pickles the averaged
    ``(x, y, w, h)`` tuples into *output_dir*.

    Returns:
        ``(face_pickle_path, lip_pickle_path, avg_face_bbox, avg_lip_bbox)``.

    Raises:
        ValueError: if the video cannot be opened or no face was detected
            in any frame.
    """
    output_root = ensure_dir(Path(output_dir))
    face_out = output_root / face_name
    lip_out = output_root / lip_name


    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")


    face_mesh = _create_face_mesh()


    # Per-frame boxes accumulated for averaging.
    face_bbox_list = []
    lip_bbox_list = []


    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break


        # OpenCV decodes BGR; MediaPipe expects RGB.
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(image_rgb)


        if results.multi_face_landmarks:
            # FaceMesh is configured with max_num_faces=1, so this loop
            # processes at most one face per frame.
            for face_landmarks in results.multi_face_landmarks:
                ih, iw, _ = frame.shape
                face_bbox = get_bbox(
                    face_landmarks.landmark,
                    range(len(face_landmarks.landmark)),
                    iw,
                    ih,
                    scale_w=1.2,
                    scale_h=1.2,
                )
                lip_bbox_unclipped = get_bbox(
                    face_landmarks.landmark,
                    LIP_INDICES,
                    iw,
                    ih,
                    scale_w=1.5,
                    scale_h=1.5,
                    top_padding=20,
                )


                x_face, y_face, w_face, h_face = face_bbox
                x_lip, y_lip, w_lip, h_lip = lip_bbox_unclipped


                # Clip the lip box so it always lies fully inside the face
                # box (intersection of the two rectangles).
                x_lip = max(x_face, x_lip)
                y_lip = max(y_face, y_lip)
                w_lip = min(w_lip, x_face + w_face - x_lip)
                h_lip = min(h_lip, y_face + h_face - y_lip)


                # Keep the frame only when the intersection is non-empty.
                if w_lip > 0 and h_lip > 0:
                    face_bbox_list.append(face_bbox)
                    lip_bbox_list.append((x_lip, y_lip, w_lip, h_lip))


    cap.release()
    face_mesh.close()


    if not face_bbox_list or not lip_bbox_list:
        raise ValueError("No faces detected in the video. Check the video quality and framing.")


    # Average over all detected frames to get a single stable crop box.
    avg_face_bbox = np.mean(np.array(face_bbox_list), axis=0).astype(int)
    avg_lip_bbox = np.mean(np.array(lip_bbox_list), axis=0).astype(int)


    with open(face_out, "wb") as handle:
        pickle.dump(tuple(int(v) for v in avg_face_bbox), handle)
    with open(lip_out, "wb") as handle:
        pickle.dump(tuple(int(v) for v in avg_lip_bbox), handle)


    return (
        str(face_out),
        str(lip_out),
        tuple(int(v) for v in avg_face_bbox),
        tuple(int(v) for v in avg_lip_bbox),
    )
|
|
|
|
def extract_face_video(video_path: str, face_coords_path: str, output_path: str) -> str:
    """Crop the averaged face region out of *video_path* frame by frame.

    The crop rectangle comes from the pickled coords at *face_coords_path*
    and is clamped to the source frame size before use.  Writes an
    mp4v-encoded, audio-less video to *output_path*.

    Returns:
        *output_path*.

    Raises:
        ValueError: if the input video or the output writer cannot be
            opened, or no frames end up written.
    """
    x, y, w, h = _load_coords(face_coords_path)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 25.0  # fallback when the container reports no/invalid FPS

        frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Clamp the crop box so the slice below can never leave the frame.
        x = max(0, min(x, frame_w - 1))
        y = max(0, min(y, frame_h - 1))
        w = max(1, min(w, frame_w - x))
        h = max(1, min(h, frame_h - y))

        out = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            fps,
            (w, h),
        )
        # Bug fix: VideoWriter failures were previously undetected —
        # out.write() silently no-ops on an unopened writer while the frame
        # counter still advances, so the frame_count check below could not
        # catch it. Fail fast instead.
        if not out.isOpened():
            raise ValueError(f"Could not open video writer for: {output_path}")

        try:
            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(frame[y:y + h, x:x + w])
                frame_count += 1
        finally:
            # Bug fix: release the writer even when an exception fires
            # mid-loop (previously leaked on error paths).
            out.release()
    finally:
        cap.release()

    if frame_count == 0:
        raise ValueError("No frames were written for cropped face video.")

    return output_path
|
|
|
|
def _mux_audio(video_no_audio: str, audio_source: str, output_path: str) -> bool:
    """Mux the audio track of *audio_source* onto *video_no_audio* via ffmpeg.

    Video is stream-copied, audio is re-encoded to AAC, and the output is
    trimmed to the shorter stream.

    Returns:
        True only when ffmpeg exits cleanly and the output file exists.
    """
    command = [
        imageio_ffmpeg.get_ffmpeg_exe(),
        "-y",
        "-i", video_no_audio,
        "-i", audio_source,
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-c:v", "copy",
        "-c:a", "aac",
        "-shortest",
        output_path,
    ]
    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    return proc.returncode == 0 and Path(output_path).exists()
|
|
|
|
def merge_lips(
    original_video_path: str,
    lip_synced_video_path: str,
    face_coords_path: str,
    lip_coords_path: str,
    final_output_path: str,
    audio_path: Optional[str] = None,
) -> Tuple[str, str]:
    """Paste the lip region of a lip-synced face video back onto the original.

    For every original frame, the lip patch is cut from the corresponding
    frame of *lip_synced_video_path* (assumed to be the cropped-face video,
    so lip coordinates are remapped relative to the face box), resized to
    the original lip box, and composited in place.  The silent result is
    then muxed with audio from the first working source among *audio_path*,
    the lip-synced video, and the original video.

    Returns:
        ``(final_output_path, audio_source_used)`` where the second element
        is ``"none"`` when no audio could be muxed.

    Raises:
        ValueError: if either video cannot be opened or no frames were
            merged.
    """
    x_face, y_face, w_face, h_face = _load_coords(face_coords_path)
    x_lip, y_lip, w_lip, h_lip = _load_coords(lip_coords_path)


    # Lip box expressed relative to the face crop; max(1, ...) guards
    # against division by zero for degenerate face boxes.
    lip_rel_x = (x_lip - x_face) / max(1, w_face)
    lip_rel_y = (y_lip - y_face) / max(1, h_face)
    lip_rel_w = w_lip / max(1, w_face)
    lip_rel_h = h_lip / max(1, h_face)


    original_cap = cv2.VideoCapture(original_video_path)
    lip_synced_cap = cv2.VideoCapture(lip_synced_video_path)


    if not original_cap.isOpened():
        raise ValueError(f"Could not open original video: {original_video_path}")
    if not lip_synced_cap.isOpened():
        raise ValueError(f"Could not open lip-synced video: {lip_synced_video_path}")


    fps = original_cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        fps = 25.0  # fallback when the container reports no/invalid FPS


    frame_w = int(original_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(original_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))


    # Silent merged video written next to the final output; audio is muxed
    # in afterwards.
    intermediate_path = str(Path(final_output_path).with_name("merged_no_audio.mp4"))
    out_final = cv2.VideoWriter(
        intermediate_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (frame_w, frame_h),
    )


    frames_written = 0
    while original_cap.isOpened():
        ret, original_frame = original_cap.read()
        if not ret:
            break


        # If the lip-synced stream is shorter, remaining original frames
        # pass through unmodified.
        ret_lip, lip_synced_frame = lip_synced_cap.read()
        if ret_lip:
            # Map the relative lip box into lip-synced-frame pixel coords.
            # NOTE(review): this uses the averaged face size (w_face/h_face)
            # rather than the actual lip-synced frame size — presumably they
            # match because the lip-synced video was produced from the face
            # crop; verify against the pipeline that generates it.
            lip_x_in_face = int(lip_rel_x * w_face)
            lip_y_in_face = int(lip_rel_y * h_face)
            lip_w_in_face = int(lip_rel_w * w_face)
            lip_h_in_face = int(lip_rel_h * h_face)


            # Clamp the patch to the actual lip-synced frame bounds.
            lip_x_in_face = max(0, lip_x_in_face)
            lip_y_in_face = max(0, lip_y_in_face)
            lip_w_in_face = max(1, min(lip_w_in_face, lip_synced_frame.shape[1] - lip_x_in_face))
            lip_h_in_face = max(1, min(lip_h_in_face, lip_synced_frame.shape[0] - lip_y_in_face))


            lip_synced_lip = lip_synced_frame[
                lip_y_in_face:lip_y_in_face + lip_h_in_face,
                lip_x_in_face:lip_x_in_face + lip_w_in_face,
            ]


            if lip_synced_lip.size > 0:
                # Clamp the destination box to the original frame, resize
                # the patch to fit, and composite it in place.
                target_x = max(0, min(x_lip, frame_w - 1))
                target_y = max(0, min(y_lip, frame_h - 1))
                target_w = max(1, min(w_lip, frame_w - target_x))
                target_h = max(1, min(h_lip, frame_h - target_y))
                lip_synced_lip_resized = cv2.resize(lip_synced_lip, (target_w, target_h))
                original_frame[target_y:target_y + target_h, target_x:target_x + target_w] = lip_synced_lip_resized


        out_final.write(original_frame)
        frames_written += 1


    original_cap.release()
    lip_synced_cap.release()
    out_final.release()


    if frames_written == 0:
        raise ValueError("No frames written while merging lips.")


    # Audio fallback chain: explicit audio_path first, then the lip-synced
    # video's track, then the original video's track.
    audio_candidates = []
    if audio_path:
        audio_candidates.append(audio_path)
    audio_candidates.extend([lip_synced_video_path, original_video_path])


    for candidate in audio_candidates:
        if candidate and Path(candidate).exists() and _mux_audio(intermediate_path, candidate, final_output_path):
            return final_output_path, candidate


    # All muxing attempts failed: ship the silent video as the final output.
    shutil.copy2(intermediate_path, final_output_path)
    return final_output_path, "none"
|
|
|