|
|
import cv2 |
|
|
import os |
|
|
import shutil |
|
|
import time |
|
|
from SinglePhoto import FaceSwapper |
|
|
|
|
|
def extract_frames(video_path, frames_dir):
    """Dump every frame of a video to ``frames_dir`` as ``frame_NNNNN.jpg``.

    Extraction is resumable: frames whose index is below the highest index
    already present in ``frames_dir`` are read from the video but not written
    again.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.
        frames_dir: Directory receiving the JPEG frames; created if missing.

    Returns:
        A list with the path of every frame of the video, in order, including
        frames that were already on disk from a previous run. The list is
        empty when the video cannot be opened or contains no frames.
    """
    os.makedirs(frames_dir, exist_ok=True)

    # Highest frame index already on disk; -1 means nothing extracted yet.
    last_idx = -1
    prefix, suffix = "frame_", ".jpg"
    for name in os.listdir(frames_dir):
        if not (name.startswith(prefix) and name.endswith(suffix)):
            continue
        try:
            found_idx = int(name[len(prefix):-len(suffix)])
        except ValueError:
            # Stray file such as "frame_backup.jpg": not one of ours, ignore it.
            continue
        last_idx = max(last_idx, found_idx)

    frame_paths = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open video: {video_path}")
            return frame_paths

        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg")
            # ">=" rather than ">": the newest file on disk may have been
            # truncated if the previous run was interrupted mid-write, so it
            # is written again instead of being trusted.
            if idx >= last_idx:
                cv2.imwrite(frame_path, frame)
            frame_paths.append(frame_path)
            idx += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()
    return frame_paths
|
|
|
|
|
def frames_to_video(frames_dir, output_video_path, fps):
    """Encode the ``.jpg`` files of a directory into an ``mp4v`` video.

    Frames are written in lexicographic order of their file names, and the
    output frame size is taken from the first frame.

    Args:
        frames_dir: Directory containing the ``.jpg`` frames.
        output_video_path: Destination path of the video file.
        fps: Frame rate of the output video. A missing or non-positive value
            is replaced by 30.0 and a warning is printed.

    Returns:
        None. Prints a message and returns without writing anything when the
        directory has no ``.jpg`` files, when the first frame cannot be read,
        or when the video writer cannot be opened.
    """
    frames = sorted(
        os.path.join(frames_dir, f)
        for f in os.listdir(frames_dir)
        if f.endswith('.jpg')
    )
    if not frames:
        print("No frames found in directory.")
        return

    # cv2.imread signals failure by returning None instead of raising.
    first_frame = cv2.imread(frames[0])
    if first_frame is None:
        print(f"Could not read first frame: {frames[0]}")
        return
    height, width = first_frame.shape[:2]

    # A capture that failed to open reports an FPS of 0; do not hand that to
    # the writer.
    if not fps or fps <= 0:
        fallback_fps = 30.0
        print(f"Invalid FPS value {fps!r}; falling back to {fallback_fps}.")
        fps = fallback_fps

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    if not out.isOpened():
        out.release()
        print(f"Could not open video writer for {output_video_path}")
        return

    try:
        for frame_path in frames:
            frame = cv2.imread(frame_path)
            if frame is None:
                print(f"Skipping unreadable frame: {frame_path}")
                continue
            # The writer expects every frame to have the size it was opened
            # with, so bring any odd-sized frame to that size first.
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            out.write(frame)
    finally:
        # Always finalize the container, even if a frame failed mid-way.
        out.release()
|
|
|
|
|
def _prompt_target_face_idx():
    """Ask for the target face index until an integer is given; empty means 1."""
    while True:
        user_input = input("Enter target_face_idx (default is 1): ").strip()
        if user_input == "":
            return 1
        try:
            return int(user_input)
        except ValueError:
            print("Invalid input. Please enter an integer value.")


def _swapped_name_for(frame_name):
    """Map ``frame_00001.jpg`` to ``swapped_00001.jpg``; other names only gain the prefix."""
    if frame_name.startswith("frame_"):
        return "swapped_" + frame_name[len("frame_"):]
    return "swapped_" + frame_name


def _swap_frame(swapper, source_image_path, source_face_idx, frame_path, dest_face_idx, frame_no):
    """Swap one frame, retrying with target face 1 if the requested face is missing.

    The retry is triggered by a ``ValueError`` whose message contains
    "Target image contains"; any other error propagates to the caller.
    """
    try:
        return swapper.swap_faces(
            source_path=source_image_path,
            source_face_idx=source_face_idx,
            target_path=frame_path,
            target_face_idx=dest_face_idx
        )
    except ValueError as ve:
        if "Target image contains" not in str(ve):
            raise
        print(f"\nFrame {frame_no}: Target face idx {dest_face_idx} not found, trying with idx 1.", end='\r')
        return swapper.swap_faces(
            source_path=source_image_path,
            source_face_idx=source_face_idx,
            target_path=frame_path,
            target_face_idx=1
        )


def _delete_folders(*folders):
    """Remove each folder independently so one failure does not skip the rest."""
    all_deleted = True
    for folder in folders:
        try:
            shutil.rmtree(folder)
        except OSError as e:
            all_deleted = False
            print(f"Error deleting folders: {e}")
    if all_deleted:
        print("Temporary folders deleted.")


def main():
    """Interactive entry point: extract frames, swap faces, and rebuild the video.

    All paths live under the ``VideoSwapping`` directory. The user picks the
    target face index and one of three modes: extract frames only, swap faces
    on already extracted frames, or both. Both extraction and swapping resume
    from files left by a previous run. After the video is rebuilt the user is
    asked whether to keep the intermediate frame folders.
    """
    video_path = os.path.join("VideoSwapping", "data_dst.mp4")
    source_image_path = os.path.join("VideoSwapping", "data_src.jpg")
    frames_dir = os.path.join("VideoSwapping", "video_frames")
    swapped_dir = os.path.join("VideoSwapping", "swapped_frames")
    output_video_path = os.path.join("VideoSwapping", "output_swapped_video.mp4")
    source_face_idx = 1

    dest_face_idx = _prompt_target_face_idx()

    print("Choose an option:")
    print("1. Extract frames only")
    print("2. Face swap only (requires extracted frames)")
    print("3. Both extract frames and face swap")
    choice = input("Enter 1, 2, or 3: ").strip()
    if choice not in ("1", "2", "3"):
        # Previously an unknown choice fell through without any feedback.
        print("Invalid choice. Please enter 1, 2, or 3.")
        return

    if choice in ("1", "3"):
        print("Extracting frames from video (resuming if needed)...")
        frame_paths = extract_frames(video_path, frames_dir)
        print(f"Extracted {len(frame_paths)} frames to {frames_dir}.")
        if choice == "1":
            return
    else:
        # Swap-only mode works on whatever frames are already on disk.
        if not os.path.isdir(frames_dir):
            print("Frames directory does not exist. Please extract frames first.")
            return
        frame_paths = sorted(
            os.path.join(frames_dir, f)
            for f in os.listdir(frames_dir)
            if f.endswith('.jpg')
        )

    if not frame_paths:
        # Nothing to swap, so do not go on to report a finished video.
        print("No frames available to swap.")
        return

    os.makedirs(swapped_dir, exist_ok=True)
    swapper = FaceSwapper()

    print("Swapping faces on frames...")
    total = len(frame_paths)
    processed = 0  # frames actually swapped in this run (skipped ones excluded)
    start_time = time.time()
    for idx, frame_path in enumerate(frame_paths):
        out_path = os.path.join(swapped_dir, _swapped_name_for(os.path.basename(frame_path)))
        if os.path.exists(out_path):
            # Output left by a previous run: resume by skipping it.
            print(f"Frame {idx+1}/{total} already swapped, skipping...", end='\r')
            continue
        try:
            swapped = _swap_frame(
                swapper, source_image_path, source_face_idx,
                frame_path, dest_face_idx, idx,
            )
            cv2.imwrite(out_path, swapped)
        except Exception as e:
            # Best effort: keep the video continuous by reusing the untouched
            # frame. A plain file copy avoids a second JPEG encode and cannot
            # fail on an undecodable image the way imread + imwrite does.
            print(f"\nFrame {idx}: {e}")
            shutil.copyfile(frame_path, out_path)

        processed += 1
        elapsed = time.time() - start_time
        # Average over frames processed in this run only; dividing by idx + 1
        # would count skipped frames and underestimate the time left on resume.
        avg_time = elapsed / processed
        remaining = avg_time * (total - (idx + 1))
        mins, secs = divmod(int(remaining), 60)
        print(f"Swapping frame {idx+1}/{total} | Est. time left: {mins:02d}:{secs:02d}", end='\r')
    print()

    # Reuse the frame rate of the original video for the output.
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()

    print("Combining swapped frames into video...")
    frames_to_video(swapped_dir, output_video_path, fps)
    print(f"Done! Output video saved as {output_video_path}")

    answer = input("Do you want to keep the extracted frames and swapped images? (y/n): ").strip().lower()
    if answer == 'n':
        _delete_folders(frames_dir, swapped_dir)
    else:
        print("Temporary folders kept.")
|
|
|
|
|
# Start the interactive pipeline only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()