| import os |
| import sys |
| import importlib |
| import psutil |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from queue import Queue |
| from types import ModuleType |
| from typing import Any, List, Callable |
| from tqdm import tqdm |
|
|
| import DeepFakeAI.globals |
| from DeepFakeAI import wording |
|
|
| FRAME_PROCESSORS_MODULES : List[ModuleType] = [] |
| FRAME_PROCESSORS_METHODS =\ |
| [ |
| 'get_frame_processor', |
| 'clear_frame_processor', |
| 'pre_check', |
| 'pre_process', |
| 'process_frame', |
| 'process_frames', |
| 'process_image', |
| 'process_video', |
| 'post_process' |
| ] |
|
|
|
|
| def load_frame_processor_module(frame_processor : str) -> Any: |
| try: |
| frame_processor_module = importlib.import_module('DeepFakeAI.processors.frame.modules.' + frame_processor) |
| for method_name in FRAME_PROCESSORS_METHODS: |
| if not hasattr(frame_processor_module, method_name): |
| raise NotImplementedError |
| except ModuleNotFoundError: |
| sys.exit(wording.get('frame_processor_not_loaded').format(frame_processor = frame_processor)) |
| except NotImplementedError: |
| sys.exit(wording.get('frame_processor_not_implemented').format(frame_processor = frame_processor)) |
| return frame_processor_module |
|
|
|
|
| def get_frame_processors_modules(frame_processors : List[str]) -> List[ModuleType]: |
| global FRAME_PROCESSORS_MODULES |
|
|
| if not FRAME_PROCESSORS_MODULES: |
| for frame_processor in frame_processors: |
| frame_processor_module = load_frame_processor_module(frame_processor) |
| FRAME_PROCESSORS_MODULES.append(frame_processor_module) |
| return FRAME_PROCESSORS_MODULES |
|
|
|
|
| def clear_frame_processors_modules() -> None: |
| global FRAME_PROCESSORS_MODULES |
|
|
| for frame_processor_module in get_frame_processors_modules(DeepFakeAI.globals.frame_processors): |
| frame_processor_module.clear_frame_processor() |
| FRAME_PROCESSORS_MODULES = [] |
|
|
|
|
| def multi_process_frame(source_path : str, temp_frame_paths : List[str], process_frames: Callable[[str, List[str], Any], None], update: Callable[[], None]) -> None: |
| with ThreadPoolExecutor(max_workers = DeepFakeAI.globals.execution_thread_count) as executor: |
| futures = [] |
| queue = create_queue(temp_frame_paths) |
| queue_per_future = max(len(temp_frame_paths) // DeepFakeAI.globals.execution_thread_count * DeepFakeAI.globals.execution_queue_count, 1) |
| while not queue.empty(): |
| future = executor.submit(process_frames, source_path, pick_queue(queue, queue_per_future), update) |
| futures.append(future) |
| for future in as_completed(futures): |
| future.result() |
|
|
|
|
| def create_queue(temp_frame_paths : List[str]) -> Queue[str]: |
| queue: Queue[str] = Queue() |
| for frame_path in temp_frame_paths: |
| queue.put(frame_path) |
| return queue |
|
|
|
|
| def pick_queue(queue : Queue[str], queue_per_future : int) -> List[str]: |
| queues = [] |
| for _ in range(queue_per_future): |
| if not queue.empty(): |
| queues.append(queue.get()) |
| return queues |
|
|
|
|
| def process_video(source_path : str, frame_paths : List[str], process_frames : Callable[[str, List[str], Any], None]) -> None: |
| progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' |
| total = len(frame_paths) |
| with tqdm(total = total, desc = wording.get('processing'), unit = 'frame', dynamic_ncols = True, bar_format = progress_bar_format) as progress: |
| multi_process_frame(source_path, frame_paths, process_frames, lambda: update_progress(progress)) |
|
|
|
|
| def update_progress(progress : Any = None) -> None: |
| process = psutil.Process(os.getpid()) |
| memory_usage = process.memory_info().rss / 1024 / 1024 / 1024 |
| progress.set_postfix( |
| { |
| 'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB', |
| 'execution_providers': DeepFakeAI.globals.execution_providers, |
| 'execution_thread_count': DeepFakeAI.globals.execution_thread_count, |
| 'execution_queue_count': DeepFakeAI.globals.execution_queue_count |
| }) |
| progress.refresh() |
| progress.update(1) |
|
|
|
|
| def get_device() -> str: |
| if 'CUDAExecutionProvider' in DeepFakeAI.globals.execution_providers: |
| return 'cuda' |
| if 'CoreMLExecutionProvider' in DeepFakeAI.globals.execution_providers: |
| return 'mps' |
| return 'cpu' |
|
|