tushar310 commited on
Commit
95b2ad6
·
verified ·
1 Parent(s): af6b4e3

version 1

Browse files
Files changed (7) hide show
  1. README.md +58 -8
  2. app.py +198 -0
  3. extract_coordinates.py +26 -0
  4. extract_face.py +23 -0
  5. merge_lips.py +30 -0
  6. pipeline.py +323 -0
  7. requirements.txt +5 -0
README.md CHANGED
@@ -1,13 +1,63 @@
1
- ---
2
- title: Face Crop
3
- emoji: 👁
4
- colorFrom: yellow
5
- colorTo: green
6
  sdk: gradio
7
- sdk_version: 6.6.0
8
  app_file: app.py
9
  pinned: false
10
- license: mit
11
  ---
12
 
13
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ title: Dub Module Gradio App
 
 
 
3
  sdk: gradio
 
4
  app_file: app.py
5
  pinned: false
 
6
  ---
7
 
8
+ # Dub Module Gradio App (HF Space Ready)
9
+
10
+ This folder provides a Hugging Face Spaces-ready Gradio app for the pipeline in `instructions.txt`.
11
+
12
+ Implemented workflow:
13
+ - Step 1: Extract face and lip coordinates from the original video.
14
+ - Step 2: Extract cropped face video using face coordinates and allow downloads of:
15
+ - face coordinates (`.pkl`)
16
+ - lip coordinates (`.pkl`)
17
+ - cropped face video (`.mp4`)
18
+ - Step 3: Manual only (not part of the app).
19
+ - Step 4: Merge manual Step 3 output back into the original video and generate final downloadable video.
20
+ - Step 5: Not part of the app.
21
+
22
+ ## Files
23
+ - `app.py`: Gradio interface (Step 1, 2, and 4).
24
+ - `pipeline.py`: Core logic shared by UI and CLI.
25
+ - `extract_coordinates.py`: CLI wrapper for Step 1.
26
+ - `extract_face.py`: CLI wrapper for Step 2.
27
+ - `merge_lips.py`: CLI wrapper for Step 4.
28
+ - `requirements.txt`: Python dependencies.
29
+
30
+ ## Local Run
31
+
32
+ ```bash
33
+ pip install -r requirements.txt
34
+ python app.py
35
+ ```
36
+
37
+ ## Hugging Face Space Setup
38
+ 1. Create a new Gradio Space.
39
+ 2. Upload all files from this folder to the root of the Space.
40
+ 3. Ensure `README.md` and `requirements.txt` are present.
41
+ 4. Space will auto-build and run `app.py`.
42
+
43
+ ## CLI Usage (Optional)
44
+
45
+ Step 1:
46
+ ```bash
47
+ python extract_coordinates.py --video input.mp4 --output-dir outputs
48
+ ```
49
+
50
+ Step 2:
51
+ ```bash
52
+ python extract_face.py --video input.mp4 --face-coords outputs/face_coords_avg.pkl --output outputs/cropped_face.mp4
53
+ ```
54
+
55
+ Step 4:
56
+ ```bash
57
+ python merge_lips.py --original-video input.mp4 --lip-synced-video lipsynced_face.mp4 --face-coords outputs/face_coords_avg.pkl --lip-coords outputs/lip_coords_avg.pkl --output outputs/final.mp4 --audio tts.wav
58
+ ```
59
+
60
+ ## Notes
61
+ - Coordinates are generated per video and should not be reused across unrelated videos.
62
+ - If no external audio is uploaded in Step 4, the app attempts to pull audio from the lip-synced video, then from the original video.
63
+ - Generated files are stored in `work/` during runtime.
app.py ADDED
@@ -0,0 +1,198 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+
3
+ import gradio as gr
4
+
5
+ from pipeline import (
6
+ copy_file_to_dir,
7
+ extract_coordinates,
8
+ extract_face_video,
9
+ make_run_dir,
10
+ merge_lips,
11
+ )
12
+
13
+ BASE_DIR = Path(__file__).resolve().parent
14
+ WORK_DIR = BASE_DIR / "work"
15
+ WORK_DIR.mkdir(parents=True, exist_ok=True)
16
+
17
+
18
+ def _normalize_upload_path(file_obj):
19
+ if file_obj is None:
20
+ return None
21
+ if isinstance(file_obj, str):
22
+ return file_obj
23
+ return str(file_obj)
24
+
25
+
26
def run_step1(original_video):
    """Step 1 handler: extract averaged face/lip coordinates from a video.

    Returns (status message, face pkl path, lip pkl path). On failure the
    two paths are None and the status carries the error text.
    """
    try:
        source = _normalize_upload_path(original_video)
        if not source:
            raise ValueError("Please upload an original video.")

        # Stage the upload into a fresh per-run directory.
        workspace = make_run_dir(WORK_DIR, "step1")
        staged_video = copy_file_to_dir(source, workspace)

        face_pkl, lip_pkl, face_box, lip_box = extract_coordinates(
            video_path=str(staged_video),
            output_dir=str(workspace),
            face_name="face_coords_avg.pkl",
            lip_name="lip_coords_avg.pkl",
        )

        message = f"Step 1 completed. Face bbox: {face_box}. Lip bbox: {lip_box}."
        return message, face_pkl, lip_pkl
    except Exception as exc:
        return f"Step 1 failed: {exc}", None, None
50
+
51
+
52
def run_step2(original_video, face_coords, lip_coords):
    """Step 2 handler: crop the face region out of the original video.

    Lip coordinates are optional; when supplied they are staged and
    returned so the user can download them alongside the cropped video.
    Returns (status, preview path, video file, face pkl, lip pkl or None).
    """
    try:
        video_src = _normalize_upload_path(original_video)
        face_src = _normalize_upload_path(face_coords)
        lip_src = _normalize_upload_path(lip_coords)

        if not video_src:
            raise ValueError("Please upload the original video.")
        if not face_src:
            raise ValueError("Please upload face coordinates (.pkl).")

        # Stage inputs under a fresh per-run directory with canonical names.
        workspace = make_run_dir(WORK_DIR, "step2")
        staged_video = copy_file_to_dir(video_src, workspace)
        staged_face = copy_file_to_dir(face_src, workspace, "face_coords_avg.pkl")
        staged_lip = copy_file_to_dir(lip_src, workspace, "lip_coords_avg.pkl") if lip_src else None

        cropped_path = workspace / "cropped_face.mp4"
        extract_face_video(
            video_path=str(staged_video),
            face_coords_path=str(staged_face),
            output_path=str(cropped_path),
        )

        status = "Step 2 completed. Download cropped face video and coordinate files below."
        lip_out = str(staged_lip) if staged_lip else None
        # Cropped path is returned twice: once for preview, once for download.
        return status, str(cropped_path), str(cropped_path), str(staged_face), lip_out
    except Exception as exc:
        return f"Step 2 failed: {exc}", None, None, None, None
82
+
83
+
84
def run_step4(original_video, lip_synced_video, face_coords, lip_coords, audio_file):
    """Step 4 handler: merge the lip-synced face back into the original video.

    All four non-audio inputs are required; the audio file is optional
    (merge_lips falls back to the lip-synced / original video audio).
    Returns (status, preview path, download path); on failure the paths
    are None and the status carries the error text.
    """
    try:
        original_path = _normalize_upload_path(original_video)
        lipsynced_path = _normalize_upload_path(lip_synced_video)
        face_path = _normalize_upload_path(face_coords)
        lip_path = _normalize_upload_path(lip_coords)
        audio_path = _normalize_upload_path(audio_file)

        if not original_path:
            raise ValueError("Please upload the original video.")
        if not lipsynced_path:
            raise ValueError("Please upload the lip-synced face video from Step 3.")
        if not face_path:
            raise ValueError("Please upload face coordinates (.pkl).")
        if not lip_path:
            raise ValueError("Please upload lip coordinates (.pkl).")

        # Stage all inputs under a fresh per-run directory with canonical names.
        run_dir = make_run_dir(WORK_DIR, "step4")
        local_original = copy_file_to_dir(original_path, run_dir, "original_video.mp4")
        local_lipsynced = copy_file_to_dir(lipsynced_path, run_dir, "lip_synced_face_video.mp4")
        local_face = copy_file_to_dir(face_path, run_dir, "face_coords_avg.pkl")
        local_lip = copy_file_to_dir(lip_path, run_dir, "lip_coords_avg.pkl")

        # Audio is optional; keep its original filename when provided.
        local_audio = None
        if audio_path:
            local_audio = copy_file_to_dir(audio_path, run_dir)

        final_path = run_dir / "final_synced_output.mp4"
        final_video_path, audio_used = merge_lips(
            original_video_path=str(local_original),
            lip_synced_video_path=str(local_lipsynced),
            face_coords_path=str(local_face),
            lip_coords_path=str(local_lip),
            final_output_path=str(final_path),
            audio_path=str(local_audio) if local_audio else None,
        )

        status = f"Step 4 completed. Final video generated. Audio source used: {audio_used}"
        # Final path is returned twice: once for preview, once for download.
        return status, final_video_path, final_video_path
    except Exception as exc:
        return f"Step 4 failed: {exc}", None, None
125
+
126
+
127
+ with gr.Blocks(title="Dub Module - Steps 1, 2, and 4") as demo:
128
+ gr.Markdown(
129
+ """
130
+ # Dub Module Gradio App (HF Ready)
131
+ This app implements Step 1, Step 2, and Step 4 from your pipeline.
132
+ - Step 3 must be done manually outside this app.
133
+ - Step 5 is not included.
134
+ """
135
+ )
136
+
137
+ with gr.Tab("Step 1 - Extract Coordinates"):
138
+ gr.Markdown("Upload the original video to generate `face_coords_avg.pkl` and `lip_coords_avg.pkl`.")
139
+ s1_video = gr.File(label="Original Video", file_types=["video"], type="filepath")
140
+ s1_run = gr.Button("Run Step 1")
141
+ s1_status = gr.Textbox(label="Status", interactive=False)
142
+ s1_face = gr.File(label="Face Coordinates (.pkl)")
143
+ s1_lip = gr.File(label="Lip Coordinates (.pkl)")
144
+ s1_run.click(fn=run_step1, inputs=[s1_video], outputs=[s1_status, s1_face, s1_lip])
145
+
146
+ with gr.Tab("Step 2 - Extract Cropped Face Video"):
147
+ gr.Markdown(
148
+ "Upload original video and face coordinates. Lip coordinates are optional here, "
149
+ "but if provided they are returned for download as requested."
150
+ )
151
+ s2_video = gr.File(label="Original Video", file_types=["video"], type="filepath")
152
+ s2_face = gr.File(label="Face Coordinates (.pkl)", file_types=[".pkl"], type="filepath")
153
+ s2_lip = gr.File(label="Lip Coordinates (.pkl) - optional", file_types=[".pkl"], type="filepath")
154
+ s2_run = gr.Button("Run Step 2")
155
+ s2_status = gr.Textbox(label="Status", interactive=False)
156
+ s2_preview = gr.Video(label="Cropped Face Video Preview")
157
+ s2_video_file = gr.File(label="Download Cropped Face Video")
158
+ s2_face_out = gr.File(label="Download Face Coordinates")
159
+ s2_lip_out = gr.File(label="Download Lip Coordinates")
160
+ s2_run.click(
161
+ fn=run_step2,
162
+ inputs=[s2_video, s2_face, s2_lip],
163
+ outputs=[s2_status, s2_preview, s2_video_file, s2_face_out, s2_lip_out],
164
+ )
165
+
166
+ with gr.Tab("Step 3 - Manual (Outside App)"):
167
+ gr.Markdown(
168
+ """
169
+ Run your Step 3 lip-sync process manually using the cropped face video from Step 2.
170
+ After Step 3, return to Step 4 and upload:
171
+ 1. Original video
172
+ 2. Lip-synced face video from your external tool
173
+ 3. Face coordinates pkl
174
+ 4. Lip coordinates pkl
175
+ 5. Optional audio file used during lip-sync
176
+ """
177
+ )
178
+
179
+ with gr.Tab("Step 4 - Merge and Final Output"):
180
+ gr.Markdown("Merge the lip-synced lips back to original video and download the final output.")
181
+ s4_original = gr.File(label="Original Video", file_types=["video"], type="filepath")
182
+ s4_lipsynced = gr.File(label="Lip-synced Face Video", file_types=["video"], type="filepath")
183
+ s4_face = gr.File(label="Face Coordinates (.pkl)", file_types=[".pkl"], type="filepath")
184
+ s4_lip = gr.File(label="Lip Coordinates (.pkl)", file_types=[".pkl"], type="filepath")
185
+ s4_audio = gr.File(label="Audio from Step 3 (optional)", file_types=["audio"], type="filepath")
186
+ s4_run = gr.Button("Run Step 4")
187
+ s4_status = gr.Textbox(label="Status", interactive=False)
188
+ s4_preview = gr.Video(label="Final Video Preview")
189
+ s4_file = gr.File(label="Download Final Video")
190
+ s4_run.click(
191
+ fn=run_step4,
192
+ inputs=[s4_original, s4_lipsynced, s4_face, s4_lip, s4_audio],
193
+ outputs=[s4_status, s4_preview, s4_file],
194
+ )
195
+
196
+
197
+ if __name__ == "__main__":
198
+ demo.launch()
extract_coordinates.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+
3
+ from pipeline import extract_coordinates
4
+
5
+
6
def main() -> None:
    """CLI for Step 1: write averaged face/lip bounding-box pickles."""
    parser = argparse.ArgumentParser(description="Step 1: Extract face/lip coordinates from a video.")
    parser.add_argument("--video", required=True, help="Path to input video")
    parser.add_argument("--output-dir", default=".", help="Directory to store output pkl files")
    parser.add_argument("--face-name", default="face_coords_avg.pkl", help="Output face coordinates filename")
    parser.add_argument("--lip-name", default="lip_coords_avg.pkl", help="Output lip coordinates filename")
    opts = parser.parse_args()

    face_pkl, lip_pkl, face_box, lip_box = extract_coordinates(
        video_path=opts.video,
        output_dir=opts.output_dir,
        face_name=opts.face_name,
        lip_name=opts.lip_name,
    )
    print(f"Face coordinates: {face_pkl} -> {face_box}")
    print(f"Lip coordinates: {lip_pkl} -> {lip_box}")


if __name__ == "__main__":
    main()
extract_face.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+
3
+ from pipeline import extract_face_video
4
+
5
+
6
def main() -> None:
    """CLI for Step 2: crop the face region out of a video."""
    parser = argparse.ArgumentParser(description="Step 2: Extract cropped face video using face coordinates.")
    parser.add_argument("--video", required=True, help="Path to original video")
    parser.add_argument("--face-coords", required=True, help="Path to face coordinates pkl")
    parser.add_argument("--output", default="extracted_face.mp4", help="Output cropped video path")
    opts = parser.parse_args()

    result = extract_face_video(
        video_path=opts.video,
        face_coords_path=opts.face_coords,
        output_path=opts.output,
    )
    print(f"Cropped face video: {result}")


if __name__ == "__main__":
    main()
merge_lips.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+
3
+ from pipeline import merge_lips
4
+
5
+
6
def main() -> None:
    """CLI for Step 4: merge lip-synced lips back into the original video."""
    parser = argparse.ArgumentParser(description="Step 4: Merge lip-synced face region back into original video.")
    parser.add_argument("--original-video", required=True, help="Path to original video")
    parser.add_argument("--lip-synced-video", required=True, help="Path to lip-synced face video from external module")
    parser.add_argument("--face-coords", required=True, help="Path to face coordinates pkl")
    parser.add_argument("--lip-coords", required=True, help="Path to lip coordinates pkl")
    parser.add_argument("--output", default="final_synced_output.mp4", help="Output final merged video path")
    parser.add_argument("--audio", default=None, help="Optional external audio path from Step 3")
    opts = parser.parse_args()

    merged_path, audio_used = merge_lips(
        original_video_path=opts.original_video,
        lip_synced_video_path=opts.lip_synced_video,
        face_coords_path=opts.face_coords,
        lip_coords_path=opts.lip_coords,
        final_output_path=opts.output,
        audio_path=opts.audio,
    )
    print(f"Final merged video: {merged_path}")
    print(f"Audio source used: {audio_used}")


if __name__ == "__main__":
    main()
pipeline.py ADDED
@@ -0,0 +1,323 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pickle
2
+ import shutil
3
+ import subprocess
4
+ import uuid
5
+ from pathlib import Path
6
+ from typing import Optional, Sequence, Tuple
7
+
8
+ import cv2
9
+ import imageio_ffmpeg
10
+ import mediapipe as mp
11
+ import numpy as np
12
+
13
+ LIP_INDICES = [
14
+ 61, 146, 91, 181, 84, 17, 314, 405, 321, 375,
15
+ 291, 409, 270, 269, 267, 0, 37, 39, 40, 185,
16
+ ]
17
+
18
+
19
def ensure_dir(path: Path) -> Path:
    """Create the directory *path* (and any missing parents), then return it."""
    if not path.is_dir():
        path.mkdir(parents=True, exist_ok=True)
    return path
22
+
23
+
24
def make_run_dir(base_dir: Path, prefix: str) -> Path:
    """Create and return a unique run directory under *base_dir*.

    The name is ``{prefix}_{random hex}`` so concurrent runs never collide.
    """
    base_dir.mkdir(parents=True, exist_ok=True)
    run_dir = base_dir / f"{prefix}_{uuid.uuid4().hex}"
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir
27
+
28
+
29
def copy_file_to_dir(source_path: str, target_dir: Path, target_name: Optional[str] = None) -> Path:
    """Copy *source_path* into *target_dir*, optionally under a new name.

    Returns the destination path. Raises FileNotFoundError when the
    source file does not exist.
    """
    src = Path(source_path)
    if not src.exists():
        raise FileNotFoundError(f"Input file not found: {source_path}")

    dest_name = src.name if target_name is None else target_name
    dest = target_dir / dest_name
    shutil.copy2(src, dest)
    return dest
40
+
41
+
42
def get_bbox(
    landmarks,
    indices: Sequence[int],
    iw: int,
    ih: int,
    scale_w: float = 1.5,
    scale_h: float = 1.5,
    top_padding: int = 0,
) -> Tuple[int, int, int, int]:
    """Compute an expanded, frame-clipped bounding box around landmark points.

    Args:
        landmarks: Indexable sequence of objects with normalized ``.x``/``.y``
            attributes (MediaPipe-style, assumed in [0, 1] — confirm at call sites).
        indices: Landmark indices to include in the box.
        iw, ih: Frame width and height in pixels.
        scale_w, scale_h: Expansion factors applied to the tight box.
        top_padding: Extra pixels of headroom added above the box.

    Returns:
        (x, y, w, h) in pixel coordinates, clipped to the frame.
    """
    coords = [(landmarks[i].x * iw, landmarks[i].y * ih) for i in indices]
    x_min, y_min = np.min(coords, axis=0)
    x_max, y_max = np.max(coords, axis=0)

    w = x_max - x_min
    h = y_max - y_min
    new_w = int(w * scale_w)
    new_h = int(h * scale_h)

    # Center the expanded box on the tight one, clipping at the frame edge.
    x = max(0, int(x_min - (new_w - w) // 2))
    # Bug fix: clamp AFTER subtracting top_padding. The original computed
    # `max(0, ...) - top_padding`, which could return a negative y; a
    # negative y silently wraps when used to slice a frame array.
    y = max(0, int(y_min - (new_h - h) // 2) - top_padding)
    new_w = min(new_w, iw - x)
    new_h = min(new_h + top_padding, ih - y)
    return (x, y, new_w, new_h)
65
+
66
+
67
+ def _load_coords(coords_path: str) -> Tuple[int, int, int, int]:
68
+ with open(coords_path, "rb") as handle:
69
+ coords = pickle.load(handle)
70
+
71
+ if len(coords) != 4:
72
+ raise ValueError(f"Invalid coordinates in {coords_path}: expected 4 values, got {len(coords)}")
73
+
74
+ return tuple(int(v) for v in coords)
75
+
76
+
77
def extract_coordinates(
    video_path: str,
    output_dir: str,
    face_name: str = "face_coords_avg.pkl",
    lip_name: str = "lip_coords_avg.pkl",
) -> Tuple[str, str, Tuple[int, int, int, int], Tuple[int, int, int, int]]:
    """Step 1: average per-frame face/lip boxes over a video and pickle them.

    Runs MediaPipe FaceMesh on every frame, computes a face box and a lip
    box for each detected frame, clips the lip box to lie inside the face
    box, averages the boxes over all frames, and writes each average as a
    pickled (x, y, w, h) tuple into *output_dir*.

    Returns (face pkl path, lip pkl path, avg face box, avg lip box).
    Raises ValueError when the video cannot be opened or no face is found.
    """
    output_root = ensure_dir(Path(output_dir))
    face_out = output_root / face_name
    lip_out = output_root / lip_name

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")

    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.8,
    )

    face_bbox_list = []
    lip_bbox_list = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # MediaPipe expects RGB; OpenCV decodes frames as BGR.
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(image_rgb)

        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                ih, iw, _ = frame.shape
                # Face box: all landmarks, mild 1.2x expansion.
                face_bbox = get_bbox(
                    face_landmarks.landmark,
                    range(len(face_landmarks.landmark)),
                    iw,
                    ih,
                    scale_w=1.2,
                    scale_h=1.2,
                )
                # Lip box: lip landmarks only, larger expansion plus
                # extra headroom above the mouth.
                lip_bbox_unclipped = get_bbox(
                    face_landmarks.landmark,
                    LIP_INDICES,
                    iw,
                    ih,
                    scale_w=1.5,
                    scale_h=1.5,
                    top_padding=20,
                )

                x_face, y_face, w_face, h_face = face_bbox
                x_lip, y_lip, w_lip, h_lip = lip_bbox_unclipped

                # Clip the lip box so it never extends beyond the face box.
                x_lip = max(x_face, x_lip)
                y_lip = max(y_face, y_lip)
                w_lip = min(w_lip, x_face + w_face - x_lip)
                h_lip = min(h_lip, y_face + h_face - y_lip)

                # Keep only frames where a valid lip region remains.
                if w_lip > 0 and h_lip > 0:
                    face_bbox_list.append(face_bbox)
                    lip_bbox_list.append((x_lip, y_lip, w_lip, h_lip))

    cap.release()
    face_mesh.close()

    if not face_bbox_list or not lip_bbox_list:
        raise ValueError("No faces detected in the video. Check the video quality and framing.")

    # Average across all detected frames to get one stable box per video.
    avg_face_bbox = np.mean(np.array(face_bbox_list), axis=0).astype(int)
    avg_lip_bbox = np.mean(np.array(lip_bbox_list), axis=0).astype(int)

    with open(face_out, "wb") as handle:
        pickle.dump(tuple(int(v) for v in avg_face_bbox), handle)
    with open(lip_out, "wb") as handle:
        pickle.dump(tuple(int(v) for v in avg_lip_bbox), handle)

    return (
        str(face_out),
        str(lip_out),
        tuple(int(v) for v in avg_face_bbox),
        tuple(int(v) for v in avg_lip_bbox),
    )
163
+
164
+
165
def extract_face_video(video_path: str, face_coords_path: str, output_path: str) -> str:
    """Step 2: write a video cropped to the stored face bounding box.

    Reads (x, y, w, h) from *face_coords_path*, clamps the box to the
    frame, crops every frame of *video_path* and writes the result to
    *output_path* (mp4v codec).

    Returns *output_path*. Raises ValueError when the video cannot be
    opened or no frames were written.
    """
    x, y, w, h = _load_coords(face_coords_path)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        # Some containers report no FPS; fall back to a common default.
        fps = 25.0

    frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Clamp the stored box so the crop always stays inside the frame,
    # even when the coordinates came from a differently sized video.
    x = max(0, min(x, frame_w - 1))
    y = max(0, min(y, frame_h - 1))
    w = max(1, min(w, frame_w - x))
    h = max(1, min(h, frame_h - y))

    out = cv2.VideoWriter(
        output_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (w, h),
    )

    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        face_img = frame[y:y + h, x:x + w]
        out.write(face_img)
        frame_count += 1

    cap.release()
    out.release()

    if frame_count == 0:
        raise ValueError("No frames were written for cropped face video.")

    return output_path
207
+
208
+
209
def _mux_audio(video_no_audio: str, audio_source: str, output_path: str) -> bool:
    """Mux the first audio stream of *audio_source* onto *video_no_audio*.

    Video is stream-copied, audio is re-encoded to AAC, and the output is
    truncated to the shorter input. Returns True when ffmpeg exits 0 and
    the output file exists.
    """
    cmd = [
        imageio_ffmpeg.get_ffmpeg_exe(),
        "-y",
        "-i", video_no_audio,
        "-i", audio_source,
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-c:v", "copy",
        "-c:a", "aac",
        "-shortest",
        output_path,
    ]
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    return proc.returncode == 0 and Path(output_path).exists()
231
+
232
+
233
def merge_lips(
    original_video_path: str,
    lip_synced_video_path: str,
    face_coords_path: str,
    lip_coords_path: str,
    final_output_path: str,
    audio_path: Optional[str] = None,
) -> Tuple[str, str]:
    """Step 4: paste the lip region from a lip-synced face video onto the original.

    For every original frame, the lip region is located inside the
    corresponding lip-synced face frame (via the lip box expressed
    relative to the face box), resized to the lip box of the original
    frame, and pasted in place. Audio is then muxed from, in order of
    preference: *audio_path*, the lip-synced video, the original video.

    Returns (final path, audio source used), where the audio source is the
    path actually muxed in, or "none" when every mux attempt failed.
    Raises ValueError when either video cannot be opened or no frames
    were written.
    """
    x_face, y_face, w_face, h_face = _load_coords(face_coords_path)
    x_lip, y_lip, w_lip, h_lip = _load_coords(lip_coords_path)

    # Lip box expressed relative to the face box, so it can be located
    # inside the (cropped) lip-synced face frames.
    lip_rel_x = (x_lip - x_face) / max(1, w_face)
    lip_rel_y = (y_lip - y_face) / max(1, h_face)
    lip_rel_w = w_lip / max(1, w_face)
    lip_rel_h = h_lip / max(1, h_face)

    original_cap = cv2.VideoCapture(original_video_path)
    lip_synced_cap = cv2.VideoCapture(lip_synced_video_path)

    if not original_cap.isOpened():
        raise ValueError(f"Could not open original video: {original_video_path}")
    if not lip_synced_cap.isOpened():
        raise ValueError(f"Could not open lip-synced video: {lip_synced_video_path}")

    fps = original_cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        # Some containers report no FPS; fall back to a common default.
        fps = 25.0

    frame_w = int(original_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(original_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Write the merged (silent) video first; audio is muxed in afterwards.
    intermediate_path = str(Path(final_output_path).with_name("merged_no_audio.mp4"))
    out_final = cv2.VideoWriter(
        intermediate_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (frame_w, frame_h),
    )

    frames_written = 0
    while original_cap.isOpened():
        ret, original_frame = original_cap.read()
        if not ret:
            break

        # If the lip-synced video is shorter, remaining original frames
        # pass through unmodified.
        ret_lip, lip_synced_frame = lip_synced_cap.read()
        if ret_lip:
            # Map the relative lip box into pixel coordinates of the
            # lip-synced face frame.
            lip_x_in_face = int(lip_rel_x * w_face)
            lip_y_in_face = int(lip_rel_y * h_face)
            lip_w_in_face = int(lip_rel_w * w_face)
            lip_h_in_face = int(lip_rel_h * h_face)

            # Clamp to the actual lip-synced frame bounds.
            lip_x_in_face = max(0, lip_x_in_face)
            lip_y_in_face = max(0, lip_y_in_face)
            lip_w_in_face = max(1, min(lip_w_in_face, lip_synced_frame.shape[1] - lip_x_in_face))
            lip_h_in_face = max(1, min(lip_h_in_face, lip_synced_frame.shape[0] - lip_y_in_face))

            lip_synced_lip = lip_synced_frame[
                lip_y_in_face:lip_y_in_face + lip_h_in_face,
                lip_x_in_face:lip_x_in_face + lip_w_in_face,
            ]

            if lip_synced_lip.size > 0:
                # Clamp the target lip box to the original frame, resize
                # the extracted lips to it, and paste in place.
                target_x = max(0, min(x_lip, frame_w - 1))
                target_y = max(0, min(y_lip, frame_h - 1))
                target_w = max(1, min(w_lip, frame_w - target_x))
                target_h = max(1, min(h_lip, frame_h - target_y))
                lip_synced_lip_resized = cv2.resize(lip_synced_lip, (target_w, target_h))
                original_frame[target_y:target_y + target_h, target_x:target_x + target_w] = lip_synced_lip_resized

        out_final.write(original_frame)
        frames_written += 1

    original_cap.release()
    lip_synced_cap.release()
    out_final.release()

    if frames_written == 0:
        raise ValueError("No frames written while merging lips.")

    # Try audio sources in preference order; first successful mux wins.
    audio_candidates = []
    if audio_path:
        audio_candidates.append(audio_path)
    audio_candidates.extend([lip_synced_video_path, original_video_path])

    for candidate in audio_candidates:
        if candidate and Path(candidate).exists() and _mux_audio(intermediate_path, candidate, final_output_path):
            return final_output_path, candidate

    # Best effort: ship the silent merge when no audio could be muxed.
    shutil.copy2(intermediate_path, final_output_path)
    return final_output_path, "none"
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ gradio>=5.0.0
2
+ opencv-python-headless>=4.8.0
3
+ mediapipe>=0.10.0
4
+ numpy>=1.24.0
5
+ imageio-ffmpeg>=0.4.9