# Upload metadata (Hugging Face): tushar310, "ver 2", commit 0ea43a3 (verified).
import pickle
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Optional, Sequence, Tuple
import cv2
import imageio_ffmpeg
import numpy as np
# MediaPipe FaceMesh landmark indices tracing the outer lip contour;
# used to build the lip bounding box in extract_coordinates().
LIP_INDICES = [
    61, 146, 91, 181, 84, 17, 314, 405, 321, 375,
    291, 409, 270, 269, 267, 0, 37, 39, 40, 185,
]
def _create_face_mesh():
# Compatible across mediapipe package variants where top-level `solutions`
# may not be exposed (seen in some HF builds).
try:
from mediapipe.python.solutions.face_mesh import FaceMesh
except Exception:
try:
import mediapipe as mp
FaceMesh = mp.solutions.face_mesh.FaceMesh
except Exception as exc:
raise ImportError(
"MediaPipe FaceMesh import failed. Install mediapipe==0.10.14."
) from exc
return FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.8,
)
def ensure_dir(path: Path) -> Path:
    """Create directory *path* (including missing parents) and return it.

    Safe to call when the directory already exists.
    """
    path.mkdir(parents=True, exist_ok=True)
    return path
def make_run_dir(base_dir: Path, prefix: str) -> Path:
    """Create and return a unique run directory under *base_dir*.

    The directory is named ``<prefix>_<uuid4 hex>``; *base_dir* itself is
    created first when absent.
    """
    base_dir.mkdir(parents=True, exist_ok=True)
    run_dir = base_dir / f"{prefix}_{uuid.uuid4().hex}"
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir
def copy_file_to_dir(source_path: str, target_dir: Path, target_name: Optional[str] = None) -> Path:
    """Copy *source_path* into *target_dir* and return the destination path.

    Keeps the source file name unless *target_name* is supplied (metadata is
    preserved via copy2). Raises FileNotFoundError when the source is missing.
    """
    src = Path(source_path)
    if not src.exists():
        raise FileNotFoundError(f"Input file not found: {source_path}")
    name = src.name if target_name is None else target_name
    dest = target_dir / name
    shutil.copy2(src, dest)
    return dest
def get_bbox(
    landmarks,
    indices: Sequence[int],
    iw: int,
    ih: int,
    scale_w: float = 1.5,
    scale_h: float = 1.5,
    top_padding: int = 0,
) -> Tuple[int, int, int, int]:
    """Compute an expanded pixel bounding box around selected landmarks.

    Each landmark carries normalized ``.x``/``.y`` attributes. The tight box
    around the points at *indices* is scaled by *scale_w*/*scale_h* about its
    center, shifted up by *top_padding* pixels, and clipped to the
    ``iw`` x ``ih`` frame. Returns ``(x, y, w, h)`` as integers.
    """
    xs = np.array([landmarks[i].x * iw for i in indices])
    ys = np.array([landmarks[i].y * ih for i in indices])
    x_lo, x_hi = xs.min(), xs.max()
    y_lo, y_hi = ys.min(), ys.max()
    box_w = x_hi - x_lo
    box_h = y_hi - y_lo
    grown_w = int(box_w * scale_w)
    grown_h = int(box_h * scale_h)
    # Re-center the grown box on the original one, clamping to the frame.
    left = max(0, int(x_lo - (grown_w - box_w) // 2))
    top = max(0, int(y_lo - (grown_h - box_h) // 2) - top_padding)
    grown_w = min(grown_w, iw - left)
    grown_h = min(grown_h + top_padding, ih - top)
    return (left, top, grown_w, grown_h)
def _load_coords(coords_path: str) -> Tuple[int, int, int, int]:
with open(coords_path, "rb") as handle:
coords = pickle.load(handle)
if len(coords) != 4:
raise ValueError(f"Invalid coordinates in {coords_path}: expected 4 values, got {len(coords)}")
return tuple(int(v) for v in coords)
def extract_coordinates(
    video_path: str,
    output_dir: str,
    face_name: str = "face_coords_avg.pkl",
    lip_name: str = "lip_coords_avg.pkl",
) -> Tuple[str, str, Tuple[int, int, int, int], Tuple[int, int, int, int]]:
    """Detect the face in every frame and pickle time-averaged boxes.

    Runs MediaPipe FaceMesh over *video_path*; for each detected face it
    computes a full-face box (all landmarks, 1.2x scale) and a lip box
    (LIP_INDICES, 1.5x scale, 20 px top padding) clipped to lie inside the
    face box. The per-frame boxes are averaged over the whole clip and
    written as pickled (x, y, w, h) tuples into *output_dir* under
    *face_name* / *lip_name*.

    Returns (face_pickle_path, lip_pickle_path, avg_face_box, avg_lip_box).
    Raises ValueError when the video cannot be opened or no face is found.
    """
    output_root = ensure_dir(Path(output_dir))
    face_out = output_root / face_name
    lip_out = output_root / lip_name
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")
    face_mesh = _create_face_mesh()
    face_bbox_list = []
    lip_bbox_list = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # MediaPipe expects RGB input; OpenCV decodes frames as BGR.
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(image_rgb)
            if not results.multi_face_landmarks:
                continue
            for face_landmarks in results.multi_face_landmarks:
                ih, iw, _ = frame.shape
                face_bbox = get_bbox(
                    face_landmarks.landmark,
                    range(len(face_landmarks.landmark)),
                    iw,
                    ih,
                    scale_w=1.2,
                    scale_h=1.2,
                )
                lip_bbox_unclipped = get_bbox(
                    face_landmarks.landmark,
                    LIP_INDICES,
                    iw,
                    ih,
                    scale_w=1.5,
                    scale_h=1.5,
                    top_padding=20,
                )
                # Clip the lip box to the face box so it can later be
                # expressed relative to the face crop.
                x_face, y_face, w_face, h_face = face_bbox
                x_lip, y_lip, w_lip, h_lip = lip_bbox_unclipped
                x_lip = max(x_face, x_lip)
                y_lip = max(y_face, y_lip)
                w_lip = min(w_lip, x_face + w_face - x_lip)
                h_lip = min(h_lip, y_face + h_face - y_lip)
                if w_lip > 0 and h_lip > 0:
                    face_bbox_list.append(face_bbox)
                    lip_bbox_list.append((x_lip, y_lip, w_lip, h_lip))
    finally:
        # Always release native resources, even if landmark processing raises.
        cap.release()
        face_mesh.close()
    if not face_bbox_list or not lip_bbox_list:
        raise ValueError("No faces detected in the video. Check the video quality and framing.")
    avg_face_bbox = np.mean(np.array(face_bbox_list), axis=0).astype(int)
    avg_lip_bbox = np.mean(np.array(lip_bbox_list), axis=0).astype(int)
    with open(face_out, "wb") as handle:
        pickle.dump(tuple(int(v) for v in avg_face_bbox), handle)
    with open(lip_out, "wb") as handle:
        pickle.dump(tuple(int(v) for v in avg_lip_bbox), handle)
    return (
        str(face_out),
        str(lip_out),
        tuple(int(v) for v in avg_face_bbox),
        tuple(int(v) for v in avg_lip_bbox),
    )
def extract_face_video(video_path: str, face_coords_path: str, output_path: str) -> str:
    """Crop the averaged face region out of *video_path* into *output_path*.

    Loads the pickled face box, clamps it to the frame bounds, and writes an
    mp4v-encoded video of just that region (no audio). Returns *output_path*.
    Raises ValueError when the input cannot be opened, the writer cannot be
    created, or no frames end up written.
    """
    x, y, w, h = _load_coords(face_coords_path)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        # Some containers report no FPS; fall back to a common default.
        fps = 25.0
    frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Clamp the crop so it always lies inside the frame with size >= 1x1.
    x = max(0, min(x, frame_w - 1))
    y = max(0, min(y, frame_h - 1))
    w = max(1, min(w, frame_w - x))
    h = max(1, min(h, frame_h - y))
    out = cv2.VideoWriter(
        output_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (w, h),
    )
    # Fail loudly instead of silently producing an unreadable/empty file.
    if not out.isOpened():
        cap.release()
        raise ValueError(f"Could not open video writer: {output_path}")
    frame_count = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame[y:y + h, x:x + w])
            frame_count += 1
    finally:
        # Always release decoder and encoder handles.
        cap.release()
        out.release()
    if frame_count == 0:
        raise ValueError("No frames were written for cropped face video.")
    return output_path
def _mux_audio(video_no_audio: str, audio_source: str, output_path: str) -> bool:
    """Mux the first audio stream of *audio_source* onto *video_no_audio*.

    The video stream is copied as-is, the audio is re-encoded to AAC, and
    the output is truncated to the shorter stream. Returns True only when
    ffmpeg exits cleanly and the output file exists.
    """
    cmd = [
        imageio_ffmpeg.get_ffmpeg_exe(),
        "-y",
        "-i", video_no_audio,
        "-i", audio_source,
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-c:v", "copy",
        "-c:a", "aac",
        "-shortest",
        output_path,
    ]
    # capture_output=True is equivalent to stdout=PIPE, stderr=PIPE.
    proc = subprocess.run(cmd, capture_output=True, text=True)
    return proc.returncode == 0 and Path(output_path).exists()
def merge_lips(
    original_video_path: str,
    lip_synced_video_path: str,
    face_coords_path: str,
    lip_coords_path: str,
    final_output_path: str,
    audio_path: Optional[str] = None,
) -> Tuple[str, str]:
    """Composite the lip region of the lip-synced face video back onto the
    original full-frame video, then mux in an audio track.

    For each original frame, the matching lip-synced frame is sampled at
    the lip box (mapped into face-crop coordinates), resized to the lip
    box's size in the original frame, and pasted in place. Audio is taken
    from the first working candidate: *audio_path*, the lip-synced video,
    then the original video.

    Returns (final_output_path, audio_source_used); the second item is
    "none" when no audio could be muxed and the silent merge was copied out.
    Raises ValueError when a video cannot be opened or no frames are merged.
    """
    # Both boxes are in original-frame pixel coordinates.
    x_face, y_face, w_face, h_face = _load_coords(face_coords_path)
    x_lip, y_lip, w_lip, h_lip = _load_coords(lip_coords_path)
    # Lip box relative to the face box; multiplied back by the face size in
    # the loop, this locates the lip region inside the face-crop video.
    # max(1, ...) guards against a degenerate zero-size face box.
    lip_rel_x = (x_lip - x_face) / max(1, w_face)
    lip_rel_y = (y_lip - y_face) / max(1, h_face)
    lip_rel_w = w_lip / max(1, w_face)
    lip_rel_h = h_lip / max(1, h_face)
    original_cap = cv2.VideoCapture(original_video_path)
    lip_synced_cap = cv2.VideoCapture(lip_synced_video_path)
    if not original_cap.isOpened():
        raise ValueError(f"Could not open original video: {original_video_path}")
    if not lip_synced_cap.isOpened():
        raise ValueError(f"Could not open lip-synced video: {lip_synced_video_path}")
    fps = original_cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        # Some containers report no FPS; fall back to a common default.
        fps = 25.0
    frame_w = int(original_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(original_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Write the silent merge next to the final output, then mux audio in.
    intermediate_path = str(Path(final_output_path).with_name("merged_no_audio.mp4"))
    out_final = cv2.VideoWriter(
        intermediate_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (frame_w, frame_h),
    )
    frames_written = 0
    while original_cap.isOpened():
        ret, original_frame = original_cap.read()
        if not ret:
            break
        # If the lip-synced video is shorter, remaining original frames
        # pass through untouched.
        ret_lip, lip_synced_frame = lip_synced_cap.read()
        if ret_lip:
            # Project the relative lip box into lip-synced-frame pixels.
            # NOTE(review): assumes the lip-synced frames share the face
            # crop's dimensions; the clamping below absorbs mismatches.
            lip_x_in_face = int(lip_rel_x * w_face)
            lip_y_in_face = int(lip_rel_y * h_face)
            lip_w_in_face = int(lip_rel_w * w_face)
            lip_h_in_face = int(lip_rel_h * h_face)
            # Clamp the sample box to the lip-synced frame, min size 1x1.
            lip_x_in_face = max(0, lip_x_in_face)
            lip_y_in_face = max(0, lip_y_in_face)
            lip_w_in_face = max(1, min(lip_w_in_face, lip_synced_frame.shape[1] - lip_x_in_face))
            lip_h_in_face = max(1, min(lip_h_in_face, lip_synced_frame.shape[0] - lip_y_in_face))
            lip_synced_lip = lip_synced_frame[
                lip_y_in_face:lip_y_in_face + lip_h_in_face,
                lip_x_in_face:lip_x_in_face + lip_w_in_face,
            ]
            if lip_synced_lip.size > 0:
                # Clamp the paste target to the original frame, resize the
                # sampled patch to fit, and overwrite that region in place.
                target_x = max(0, min(x_lip, frame_w - 1))
                target_y = max(0, min(y_lip, frame_h - 1))
                target_w = max(1, min(w_lip, frame_w - target_x))
                target_h = max(1, min(h_lip, frame_h - target_y))
                lip_synced_lip_resized = cv2.resize(lip_synced_lip, (target_w, target_h))
                original_frame[target_y:target_y + target_h, target_x:target_x + target_w] = lip_synced_lip_resized
        out_final.write(original_frame)
        frames_written += 1
    original_cap.release()
    lip_synced_cap.release()
    out_final.release()
    if frames_written == 0:
        raise ValueError("No frames written while merging lips.")
    # Try audio sources in priority order; the first successful mux wins.
    audio_candidates = []
    if audio_path:
        audio_candidates.append(audio_path)
    audio_candidates.extend([lip_synced_video_path, original_video_path])
    for candidate in audio_candidates:
        if candidate and Path(candidate).exists() and _mux_audio(intermediate_path, candidate, final_output_path):
            return final_output_path, candidate
    # No candidate worked: ship the silent merge as the final output.
    shutil.copy2(intermediate_path, final_output_path)
    return final_output_path, "none"