Create faceswap_video.py
Browse files- App/faceswap_video.py +132 -0
App/faceswap_video.py
ADDED
|
@@ -0,0 +1,132 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import glob
|
| 2 |
+
import time
|
| 3 |
+
import shutil
|
| 4 |
+
import insightface
|
| 5 |
+
import warnings
|
| 6 |
+
from insightface.app import FaceAnalysis
|
| 7 |
+
import cv2
|
| 8 |
+
import logging
|
| 9 |
+
import tempfile
|
| 10 |
+
import gradio as gr
|
| 11 |
+
from gradio import Video
|
| 12 |
+
from moviepy.editor import VideoFileClip, ImageSequenceClip
|
| 13 |
+
import os
|
| 14 |
+
from gfpgan import GFPGANer # Import the GFPGAN model
|
| 15 |
+
|
| 16 |
+
assert insightface.__version__ >= '0.7'
|
| 17 |
+
|
| 18 |
+
# Ignore specific warnings
|
| 19 |
+
warnings.filterwarnings('ignore')
|
| 20 |
+
|
| 21 |
+
# Set up logging
|
| 22 |
+
logging.basicConfig(filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
|
| 23 |
+
|
| 24 |
+
# Initialize the FaceAnalysis app and swapper
|
| 25 |
+
app = FaceAnalysis(name='buffalo_l')
|
| 26 |
+
app.prepare(ctx_id=0, det_size=(640, 640))
|
| 27 |
+
swapper = insightface.model_zoo.get_model('buffalo_l/inswapper_128.onnx')
|
| 28 |
+
|
| 29 |
+
# Initialize GFPGAN
|
| 30 |
+
gfpgan_model = GFPGANer(
|
| 31 |
+
model_path='buffalo_l/GFPGANv1.4.pth',
|
| 32 |
+
upscale=1,
|
| 33 |
+
arch='clean',
|
| 34 |
+
channel_multiplier=2,
|
| 35 |
+
device='cuda' # Use GPU for processing
|
| 36 |
+
)
|
| 37 |
+
|
| 38 |
+
def load_and_validate_source_image(source_file):
|
| 39 |
+
img_source = cv2.imread(source_file)
|
| 40 |
+
if img_source is None:
|
| 41 |
+
raise ValueError(f"Could not read source image: {source_file}")
|
| 42 |
+
|
| 43 |
+
faces_source = app.get(img_source)
|
| 44 |
+
if len(faces_source) != 1:
|
| 45 |
+
raise ValueError(f"Source image should have exactly one face: {source_file}")
|
| 46 |
+
|
| 47 |
+
return faces_source
|
| 48 |
+
|
| 49 |
+
def break_video_into_frames(destination_file, temp_dir):
|
| 50 |
+
clip = VideoFileClip(destination_file)
|
| 51 |
+
clip.write_images_sequence(f'{temp_dir}/%04d.png')
|
| 52 |
+
clip.close() # Close the clip after it's no longer needed
|
| 53 |
+
|
| 54 |
+
frames = glob.glob(f'{temp_dir}/*.png')
|
| 55 |
+
frames.sort()
|
| 56 |
+
return frames
|
| 57 |
+
|
| 58 |
+
def process_frames(frames, faces_source, temp_dir):
|
| 59 |
+
for frame_path in frames:
|
| 60 |
+
try:
|
| 61 |
+
img_frame = cv2.imread(frame_path)
|
| 62 |
+
if img_frame is None:
|
| 63 |
+
raise ValueError(f"Could not read frame: {frame_path}")
|
| 64 |
+
|
| 65 |
+
faces_frame = app.get(img_frame)
|
| 66 |
+
res = img_frame.copy()
|
| 67 |
+
for idx, face in enumerate(faces_frame):
|
| 68 |
+
res = swapper.get(res, face, faces_source[0], paste_back=True)
|
| 69 |
+
|
| 70 |
+
# Enhance the image using GFPGAN
|
| 71 |
+
_, _, enhanced_img = gfpgan_model.enhance(res, has_aligned=False, only_center_face=False, paste_back=True)
|
| 72 |
+
|
| 73 |
+
# Overwrite the original frame with the face-swapped and enhanced version
|
| 74 |
+
cv2.imwrite(frame_path, enhanced_img)
|
| 75 |
+
logging.info(f"Face swapping and enhancement completed for frame: {frame_path}")
|
| 76 |
+
except Exception as e:
|
| 77 |
+
logging.error(f"Error processing faces: {e}")
|
| 78 |
+
raise ValueError("Error processing faces!") from e
|
| 79 |
+
|
| 80 |
+
def reassemble_video(frames, destination_file, temp_dir):
|
| 81 |
+
clip = VideoFileClip(destination_file)
|
| 82 |
+
fps = clip.fps
|
| 83 |
+
|
| 84 |
+
# Convert frames to a list of file names
|
| 85 |
+
frames = [f'{temp_dir}/{i:04d}.png' for i in range(len(frames))]
|
| 86 |
+
|
| 87 |
+
new_clip = ImageSequenceClip(frames, fps=fps)
|
| 88 |
+
new_clip = new_clip.set_audio(clip.audio) # Keep the original audio
|
| 89 |
+
output_file = f'images/output/video/output_{int(time.time())}.mp4'
|
| 90 |
+
new_clip.write_videofile(output_file)
|
| 91 |
+
new_clip.close() # Close the new clip after it's no longer needed
|
| 92 |
+
clip.close() # Close the clip after it's no longer needed
|
| 93 |
+
|
| 94 |
+
return output_file
|
| 95 |
+
|
| 96 |
+
def process_faces(source_file, destination_file):
|
| 97 |
+
try:
|
| 98 |
+
faces_source = load_and_validate_source_image(source_file)
|
| 99 |
+
with tempfile.TemporaryDirectory() as temp_dir:
|
| 100 |
+
frames = break_video_into_frames(destination_file, temp_dir)
|
| 101 |
+
if frames is None:
|
| 102 |
+
return None
|
| 103 |
+
process_frames(frames, faces_source, temp_dir)
|
| 104 |
+
output_file = reassemble_video(frames, destination_file, temp_dir)
|
| 105 |
+
return output_file
|
| 106 |
+
except Exception as e:
|
| 107 |
+
logging.error(f"Error processing faces: {e}")
|
| 108 |
+
return None, None
|
| 109 |
+
|
| 110 |
+
def img2video(source_image, destination_video):
|
| 111 |
+
source_file_path = "images/source/source.jpg"
|
| 112 |
+
cv2.imwrite(source_file_path, source_image)
|
| 113 |
+
|
| 114 |
+
# Print the original filename
|
| 115 |
+
print(f"Original filename: {destination_video}")
|
| 116 |
+
|
| 117 |
+
# Change the extension back to .mp4 if necessary
|
| 118 |
+
if not destination_video.endswith(".mp4"):
|
| 119 |
+
new_name = destination_video.rsplit(".", 1)[0] + ".mp4"
|
| 120 |
+
os.rename(destination_video, new_name)
|
| 121 |
+
destination_video = new_name
|
| 122 |
+
|
| 123 |
+
destination_file_path = "images/destination/destination.mp4"
|
| 124 |
+
destination_file_path = destination_file_path.replace("\\", "/")
|
| 125 |
+
shutil.copy(destination_video, destination_file_path)
|
| 126 |
+
|
| 127 |
+
# Delete the original file after copying
|
| 128 |
+
os.remove(destination_video)
|
| 129 |
+
|
| 130 |
+
output_file = process_faces(source_file_path, destination_file_path)
|
| 131 |
+
|
| 132 |
+
return output_file
|