#!/usr/bin/env python3
"""Headless command-line entry point for Roop face swapping.

Parses CLI arguments, downloads the required model files, builds FaceSets
from the source/target media and runs the batch processing pipeline without
the UI.
"""
import os
import sys
import shutil
import argparse
import warnings
from typing import List
import platform
import signal
import torch
import onnxruntime
import pathlib
from time import time

import roop.globals
import roop.metadata
import roop.utilities as util
import roop.util_ffmpeg as ffmpeg
from settings import Settings
from roop.face_util import extract_face_images
from roop.ProcessEntry import ProcessEntry
from roop.ProcessMgr import ProcessMgr
from roop.ProcessOptions import ProcessOptions
from roop.capturer import get_video_frame_total
from roop.FaceSet import FaceSet

# Shared ProcessMgr instance; created lazily and torn down by release_resources().
process_mgr = None

# On ROCm the torch reference is dropped so it cannot be used alongside the
# ROCm ONNX execution provider.
if 'ROCMExecutionProvider' in roop.globals.execution_providers:
    del torch

warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')


def parse_args():
    """Parse the command-line arguments for a headless Roop run."""
    parser = argparse.ArgumentParser(description="Run Roop from the command line")
    parser.add_argument('--source_path', type=str, required=True, help="Path to the source file")
    parser.add_argument('--target_path', type=str, required=True, help="Path to the target file")
    parser.add_argument('--output_path', type=str, required=True, help="Path to save the output file")
    parser.add_argument('--execution_provider', type=str, default='CPUExecutionProvider', help="Execution provider for ONNX runtime")
    parser.add_argument('--max_memory', type=int, default=None, help="Max memory to use (in GB)")
    parser.add_argument('--distance_threshold', type=float, default=0.6, help="Distance threshold for face matching")
    parser.add_argument('--blend_ratio', type=float, default=0.5, help="Blend ratio for face swapping")
    parser.add_argument('--face_swap_mode', type=str, default='replace', help="Face swap mode")
    parser.add_argument('--output_image_format', type=str, default='png', help="Output image format")
    parser.add_argument('--output_video_format', type=str, default='mp4', help="Output video format")
    parser.add_argument('--execution_threads', type=int, default=8, help="Number of threads to use for execution")
    parser.add_argument('--skip_audio', action='store_true', help="Skip audio when processing video")
    return parser.parse_args()


def encode_execution_providers(execution_providers: List[str]) -> List[str]:
    """Map full ONNX provider names (e.g. 'CUDAExecutionProvider') to short codes ('cuda')."""
    return [execution_provider.replace('ExecutionProvider', '').lower()
            for execution_provider in execution_providers]


def decode_execution_providers(execution_providers: List[str]) -> List[str]:
    """Resolve short provider codes back to the full names available in this ONNX runtime."""
    available = onnxruntime.get_available_providers()
    return [provider for provider, encoded_execution_provider
            in zip(available, encode_execution_providers(available))
            if any(execution_provider in encoded_execution_provider
                   for execution_provider in execution_providers)]


def suggest_max_memory() -> int:
    """Default memory cap in GB (smaller on macOS)."""
    if platform.system().lower() == 'darwin':
        return 4
    return 16


def suggest_execution_providers() -> List[str]:
    """Short codes for every provider this ONNX runtime build supports."""
    return encode_execution_providers(onnxruntime.get_available_providers())


def suggest_execution_threads() -> int:
    """Thread count suited to the active provider (DML/ROCm are single-threaded here)."""
    if 'DmlExecutionProvider' in roop.globals.execution_providers:
        return 1
    if 'ROCMExecutionProvider' in roop.globals.execution_providers:
        return 1
    return 8


def limit_resources() -> None:
    """Apply the configured process memory cap, if any."""
    # limit memory usage
    if roop.globals.max_memory:
        memory = roop.globals.max_memory * 1024 ** 3
        if platform.system().lower() == 'darwin':
            # NOTE(review): 1024 ** 6 yields exabytes, not GB — looks like it
            # should be 1024 ** 3; kept as-is to preserve existing behavior.
            memory = roop.globals.max_memory * 1024 ** 6
        if platform.system().lower() == 'windows':
            import ctypes
            kernel32 = ctypes.windll.kernel32  # type: ignore[attr-defined]
            kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
        else:
            import resource
            resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))


def release_resources() -> None:
    """Dispose of the shared ProcessMgr and force a garbage-collection pass."""
    import gc
    global process_mgr

    if process_mgr is not None:
        process_mgr.release_resources()
        process_mgr = None
    gc.collect()


def pre_check() -> bool:
    """Verify the Python version and download all required model files.

    Returns False only when the interpreter is too old; a missing ffmpeg is
    reported but does not abort the run.
    """
    if sys.version_info < (3, 9):
        update_status('Python version is not supported - please upgrade to 3.9 or higher.')
        return False

    download_directory_path = util.resolve_relative_path('../models')
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/inswapper_128.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/GFPGANv1.4.onnx'])
    util.conditional_download(download_directory_path, ['https://github.com/csxmli2016/DMDNet/releases/download/v1/DMDNet.pth'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/GPEN-BFR-512.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/restoreformer_plus_plus.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/xseg.onnx'])
    download_directory_path = util.resolve_relative_path('../models/CLIP')
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/rd64-uni-refined.pth'])
    download_directory_path = util.resolve_relative_path('../models/CodeFormer')
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/CodeFormerv0.1.onnx'])
    download_directory_path = util.resolve_relative_path('../models/Frame')
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/deoldify_artistic.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/deoldify_stable.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/isnet-general-use.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/real_esrgan_x4.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/real_esrgan_x2.onnx'])
    util.conditional_download(download_directory_path, ['https://huggingface.co/countfloyd/deepfake/resolve/main/lsdir_x4.onnx'])
    if not shutil.which('ffmpeg'):
        update_status('ffmpeg is not installed.')
    return True


def update_status(message: str) -> None:
    """Report progress to the console."""
    print(message)


def get_processing_plugins(masking_engine):
    """Build the processor-chain dict: face swap, optional mask engine, optional enhancer."""
    processors = {"faceswap": {}}
    if masking_engine is not None:
        processors.update({masking_engine: {}})

    if roop.globals.selected_enhancer == 'GFPGAN':
        processors.update({"gfpgan": {}})
    elif roop.globals.selected_enhancer == 'Codeformer':
        processors.update({"codeformer": {}})
    elif roop.globals.selected_enhancer == 'DMDNet':
        processors.update({"dmdnet": {}})
    elif roop.globals.selected_enhancer == 'GPEN':
        processors.update({"gpen": {}})
    elif roop.globals.selected_enhancer == 'Restoreformer++':
        processors.update({"restoreformer++": {}})
    return processors


def live_swap(frame, options):
    """Swap faces on a single frame (e.g. webcam); returns the input frame on failure."""
    global process_mgr

    if frame is None:
        return frame

    if process_mgr is None:
        process_mgr = ProcessMgr(None)
    process_mgr.initialize(roop.globals.INPUT_FACESETS, roop.globals.TARGET_FACES, options)
    newframe = process_mgr.process_frame(frame)
    if newframe is None:
        return frame
    return newframe


def batch_process_regular(files: List[ProcessEntry], masking_engine: str, new_clip_text: str,
                          use_new_method, imagemask, num_swap_steps, progress, selected_index=0) -> None:
    """Configure ProcessOptions from the current globals and run batch_process()."""
    global process_mgr

    release_resources()
    limit_resources()
    if process_mgr is None:
        process_mgr = ProcessMgr(progress)
    mask = imagemask["layers"][0] if imagemask is not None else None
    # Fall back to the first input face set when the requested index is out of range.
    if len(roop.globals.INPUT_FACESETS) <= selected_index:
        selected_index = 0
    options = ProcessOptions(get_processing_plugins(masking_engine), roop.globals.distance_threshold,
                             roop.globals.blend_ratio, roop.globals.face_swap_mode, selected_index,
                             new_clip_text, mask, num_swap_steps, False)
    process_mgr.initialize(roop.globals.INPUT_FACESETS, roop.globals.TARGET_FACES, options)
    batch_process(files, use_new_method)
    return


def batch_process_with_options(files: List[ProcessEntry], options, progress):
    """Run batch_process() with caller-supplied ProcessOptions (frames/audio handling forced off)."""
    global process_mgr

    release_resources()
    limit_resources()
    if process_mgr is None:
        process_mgr = ProcessMgr(progress)
    process_mgr.initialize(roop.globals.INPUT_FACESETS, roop.globals.TARGET_FACES, options)
    roop.globals.keep_frames = False
    roop.globals.wait_after_extraction = False
    roop.globals.skip_audio = False
    batch_process(files, True)


def batch_process(files: List[ProcessEntry], use_new_method) -> None:
    """Process a list of image/video entries with the already-initialized process_mgr.

    Images are swapped in one run_batch() call; videos are processed either via
    extracted temp frames (when keeping frames / old method) or in memory, then
    audio is restored / a GIF is rebuilt and the result moved to its destination.
    """
    global process_mgr

    roop.globals.processing = True

    # DML/ROCm providers only support a single execution thread.
    max_threads = suggest_execution_threads()
    if max_threads == 1:
        roop.globals.execution_threads = 1

    imagefiles: List[ProcessEntry] = []
    videofiles: List[ProcessEntry] = []

    update_status('Sorting videos/images')

    for index, f in enumerate(files):
        fullname = f.filename
        if util.has_image_extension(fullname):
            destination = util.get_destfilename_from_path(fullname, roop.globals.output_path, f'.{roop.globals.CFG.output_image_format}')
            destination = util.replace_template(destination, index=index)
            pathlib.Path(os.path.dirname(destination)).mkdir(parents=True, exist_ok=True)
            f.finalname = destination
            imagefiles.append(f)
        elif util.is_video(fullname) or util.has_extension(fullname, ['gif']):
            destination = util.get_destfilename_from_path(fullname, roop.globals.output_path, f'__temp.{roop.globals.CFG.output_video_format}')
            f.finalname = destination
            videofiles.append(f)

    if len(imagefiles) > 0:
        update_status('Processing image(s)')
        origimages = []
        fakeimages = []
        for f in imagefiles:
            origimages.append(f.filename)
            fakeimages.append(f.finalname)
        process_mgr.run_batch(origimages, fakeimages, roop.globals.execution_threads)
        origimages.clear()
        fakeimages.clear()

    if len(videofiles) > 0:
        for index, v in enumerate(videofiles):
            if not roop.globals.processing:
                end_processing('Processing stopped!')
                return
            fps = v.fps if v.fps > 0 else util.detect_fps(v.filename)
            if v.endframe == 0:
                v.endframe = get_video_frame_total(v.filename)
            update_status(f'Creating {os.path.basename(v.finalname)} with {fps} FPS...')
            start_processing = time()
            if roop.globals.keep_frames or not use_new_method:
                # Old method: extract frames to disk, swap in place, re-encode.
                util.create_temp(v.filename)
                update_status('Extracting frames...')
                ffmpeg.extract_frames(v.filename, v.startframe, v.endframe, fps)
                if not roop.globals.processing:
                    end_processing('Processing stopped!')
                    return
                temp_frame_paths = util.get_temp_frame_paths(v.filename)
                process_mgr.run_batch(temp_frame_paths, temp_frame_paths, roop.globals.execution_threads)
                if not roop.globals.processing:
                    end_processing('Processing stopped!')
                    return
                if roop.globals.wait_after_extraction:
                    # Let the user inspect/edit the extracted frames before encoding.
                    extract_path = os.path.dirname(temp_frame_paths[0])
                    util.open_folder(extract_path)
                    input("Press any key to continue...")
                    print("Resorting frames to create video")
                    util.sort_rename_frames(extract_path)
                ffmpeg.create_video(v.filename, v.finalname, fps)
                if not roop.globals.keep_frames:
                    util.delete_temp_frames(temp_frame_paths[0])
            else:
                # New method: stream frames through memory; GIFs never carry audio.
                if util.has_extension(v.filename, ['gif']):
                    skip_audio = True
                else:
                    skip_audio = roop.globals.skip_audio
                process_mgr.run_batch_inmem(v.filename, v.finalname, v.startframe, v.endframe,
                                            fps, roop.globals.execution_threads, skip_audio)

            if not roop.globals.processing:
                end_processing('Processing stopped!')
                return

            video_file_name = v.finalname
            if os.path.isfile(video_file_name):
                destination = ''
                if util.has_extension(v.filename, ['gif']):
                    gifname = util.get_destfilename_from_path(v.filename, roop.globals.output_path, '.gif')
                    destination = util.replace_template(gifname, index=index)
                    pathlib.Path(os.path.dirname(destination)).mkdir(parents=True, exist_ok=True)
                    update_status('Creating final GIF')
                    ffmpeg.create_gif_from_video(video_file_name, destination)
                    if os.path.isfile(destination):
                        os.remove(video_file_name)
                else:
                    skip_audio = roop.globals.skip_audio
                    destination = util.replace_template(video_file_name, index=index)
                    pathlib.Path(os.path.dirname(destination)).mkdir(parents=True, exist_ok=True)
                    if not skip_audio:
                        ffmpeg.restore_audio(video_file_name, v.filename, v.startframe, v.endframe, destination)
                        if os.path.isfile(destination):
                            os.remove(video_file_name)
                    else:
                        shutil.move(video_file_name, destination)
                update_status(f'\nProcessing {os.path.basename(destination)} took {time() - start_processing} secs')
            else:
                update_status(f'Failed processing {os.path.basename(v.finalname)}!')
    end_processing('Finished')


def end_processing(msg: str):
    """Report final status and free processing resources."""
    update_status(msg)
    roop.globals.target_folder_path = None
    release_resources()


def destroy() -> None:
    """SIGINT handler: clean temp files, release resources and exit."""
    if roop.globals.target_path:
        util.clean_temp(roop.globals.target_path)
    release_resources()
    sys.exit()


def run() -> None:
    """Main entry point: configure globals from CLI args, build FaceSets and process."""
    args = parse_args()

    roop.globals.source_path = args.source_path
    roop.globals.target_path = args.target_path
    roop.globals.output_path = args.output_path
    roop.globals.execution_providers = decode_execution_providers([args.execution_provider])
    roop.globals.max_memory = args.max_memory
    roop.globals.distance_threshold = args.distance_threshold
    roop.globals.blend_ratio = args.blend_ratio
    roop.globals.face_swap_mode = args.face_swap_mode
    roop.globals.CFG = Settings('config.yaml')
    roop.globals.execution_threads = args.execution_threads
    roop.globals.output_image_format = args.output_image_format
    roop.globals.output_video_format = args.output_video_format
    roop.globals.skip_audio = args.skip_audio
    # batch_process() reads the output formats from CFG, so the CLI values must
    # be propagated there too — otherwise --output_image_format and
    # --output_video_format would be silently ignored.
    roop.globals.CFG.output_image_format = args.output_image_format
    roop.globals.CFG.output_video_format = args.output_video_format

    # Ensure these values are set
    if not roop.globals.video_encoder:
        roop.globals.video_encoder = 'libx264'  # or another suitable default value
    if not roop.globals.video_quality:
        roop.globals.video_quality = 23  # or another suitable default value

    signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())

    if not pre_check():
        return

    # Extract faces from the source and target files and create FaceSet objects
    source_faces = extract_face_images(args.source_path, (False, 0))
    target_faces = extract_face_images(args.target_path, (False, util.has_image_extension(args.target_path)))
    print("Number of targets faces is ", len(target_faces))

    if source_faces:
        source_face_set = FaceSet()
        for face_data in source_faces:
            face = face_data[0]
            face.mask_offsets = (0, 0, 0, 0, 1, 20)
            source_face_set.faces.append(face)
        if len(source_face_set.faces) > 1:
            source_face_set.AverageEmbeddings()
        roop.globals.INPUT_FACESETS.append(source_face_set)

    if target_faces:
        target_face_set = FaceSet()
        for face_data in target_faces:
            face = face_data[0]
            face.mask_offsets = (0, 0, 0, 0, 1, 20)
            target_face_set.faces.append(face)
        if len(target_face_set.faces) > 1:
            target_face_set.AverageEmbeddings()
        roop.globals.TARGET_FACES.append(target_face_set.faces[0])  # Assuming using the first face for target

    # Detect fps and endframe values for the source and target videos
    source_fps = util.detect_fps(args.source_path)
    source_endframe = get_video_frame_total(args.source_path)
    target_fps = util.detect_fps(args.target_path)
    target_endframe = get_video_frame_total(args.target_path)

    # Initialize ProcessEntry objects using detected values
    source_entry = ProcessEntry(
        filename=args.source_path,
        start=0,
        end=source_endframe,
        fps=source_fps
    )
    target_entry = ProcessEntry(
        filename=args.target_path,
        start=0,
        end=target_endframe,
        fps=target_fps
    )

    files = [source_entry, target_entry]
    batch_process_regular(files, None, None, False, None, 1, None)