Spaces:
Sleeping
Sleeping
| import itertools | |
| import shutil | |
| import signal | |
| import sys | |
| import tempfile | |
# Additional imports added at the top of core.py:
| import os | |
| import traceback | |
| from facefusion.vision import read_static_image, write_image | |
| from facefusion.face_store import get_reference_faces | |
| from facefusion.face_analyser import get_many_faces, get_one_face | |
| from facefusion.processors.core import get_processors_modules | |
| from facefusion.vision import start_processing, end_processing | |
| from time import time | |
| import numpy | |
| from facefusion import benchmarker, cli_helper, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, logger, process_manager, state_manager, video_manager, voice_extractor, wording | |
| from facefusion.args import apply_args, collect_job_args, reduce_job_args, reduce_step_args | |
| from facefusion.common_helper import get_first | |
| from facefusion.content_analyser import analyse_image, analyse_video | |
| from facefusion.download import conditional_download_hashes, conditional_download_sources | |
| from facefusion.exit_helper import hard_exit, signal_exit | |
| from facefusion.face_analyser import get_average_face, get_many_faces, get_one_face | |
| from facefusion.face_selector import sort_and_filter_faces | |
| from facefusion.face_store import append_reference_face, clear_reference_faces, get_reference_faces | |
| from facefusion.ffmpeg import copy_image, extract_frames, finalize_image, merge_video, replace_audio, restore_audio | |
| from facefusion.filesystem import filter_audio_paths, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern | |
| from facefusion.jobs import job_helper, job_manager, job_runner | |
| from facefusion.jobs.job_list import compose_job_list | |
| from facefusion.memory import limit_system_memory | |
| from facefusion.processors.core import get_processors_modules | |
| from facefusion.program import create_program | |
| from facefusion.program_helper import validate_args | |
| from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, move_temp_file, resolve_temp_frame_paths | |
| from facefusion.types import Args, ErrorCode | |
| from facefusion.vision import pack_resolution, read_image, read_static_images, read_video_frame, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution | |
| from datetime import datetime | |
def cli() -> None:
    """Validate the environment and the arguments, then dispatch the command.

    Calls ``hard_exit(2)`` when ``pre_check`` or ``validate_args`` fails and
    prints the program help when no command is present in the state.
    """
    if not pre_check():
        hard_exit(2)
        return

    # Register the project's exit helper as the SIGINT handler.
    signal.signal(signal.SIGINT, signal_exit)
    program = create_program()

    if not validate_args(program):
        hard_exit(2)
        return

    args = vars(program.parse_args())
    apply_args(args, state_manager.init_item)

    if state_manager.get_item('command'):
        logger.init(state_manager.get_item('log_level'))
        route(args)
        return

    program.print_help()
def route(args : Args) -> None:
    """Dispatch the command stored in the state manager to its handler.

    Most branches finish through ``hard_exit`` with the handler's error code;
    the ``benchmark`` and ``run`` branches fall through to the remaining
    checks once they are done.
    """
    job_manager_commands = ( 'job-list', 'job-create', 'job-submit', 'job-submit-all', 'job-delete', 'job-delete-all', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' )
    job_runner_commands = ( 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' )

    def read_command():
        # Read the command again for every check rather than caching it.
        return state_manager.get_item('command')

    def require_jobs() -> None:
        # Exit with code 1 when the jobs directory cannot be initialised.
        if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
            hard_exit(1)

    memory_limit = state_manager.get_item('system_memory_limit')
    if memory_limit and memory_limit > 0:
        limit_system_memory(memory_limit)

    if read_command() == 'force-download':
        return hard_exit(force_download())

    if read_command() == 'benchmark':
        if not (common_pre_check() and processors_pre_check() and benchmarker.pre_check()):
            return hard_exit(2)
        benchmarker.render()

    if read_command() in job_manager_commands:
        require_jobs()
        hard_exit(route_job_manager(args))

    if read_command() == 'run':
        # Import the existing UI module together with the custom launcher.
        import facefusion.uis.core as ui
        from facefusion.uis.launcher import launch_ui

        # Keep the existing pre-checks.
        if not (common_pre_check() and processors_pre_check()):
            return hard_exit(2)
        for layout_module in ui.get_ui_layouts_modules(state_manager.get_item('ui_layouts')):
            if not layout_module.pre_check():
                return hard_exit(2)
        # Initialise the base UI components.
        ui.init()
        # The custom launcher is used in place of ui.launch().
        launch_ui(args)

    if read_command() == 'headless-run':
        require_jobs()
        hard_exit(process_headless(args))

    if read_command() == 'batch-run':
        require_jobs()
        hard_exit(process_batch(args))

    if read_command() in job_runner_commands:
        require_jobs()
        hard_exit(route_job_runner())
def pre_check() -> bool:
    """Check the Python version and the presence of curl and ffmpeg.

    Logs the first failing requirement and returns False; returns True when
    every requirement is met.
    """
    if sys.version_info < (3, 10):
        failure = wording.get('python_not_supported').format(version = '3.10')
    elif not shutil.which('curl'):
        failure = wording.get('curl_not_installed')
    elif not shutil.which('ffmpeg'):
        failure = wording.get('ffmpeg_not_installed')
    else:
        return True

    logger.error(failure, __name__)
    return False
def common_pre_check() -> bool:
    """Run ``pre_check`` on each common module, stopping at the first failure."""
    for common_module in (
        content_analyser,
        face_classifier,
        face_detector,
        face_landmarker,
        face_masker,
        face_recognizer,
        voice_extractor,
    ):
        if not common_module.pre_check():
            return False
    return True
def processors_pre_check() -> bool:
    """Run ``pre_check`` on each configured processor, stopping at the first failure."""
    selected_processors = get_processors_modules(state_manager.get_item('processors'))
    return all(processor.pre_check() for processor in selected_processors)
def force_download() -> ErrorCode:
    """Download the hashes and sources of every static model.

    Covers the common modules and every processor found under
    ``facefusion/processors/modules``. Returns 1 as soon as one download
    fails, otherwise 0.
    """
    common_modules =\
    [
        content_analyser,
        face_classifier,
        face_detector,
        face_landmarker,
        face_masker,
        face_recognizer,
        voice_extractor
    ]
    processor_names = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
    all_modules = common_modules + get_processors_modules(processor_names)

    for module in all_modules:
        # Modules without a static model set have nothing to download.
        if not hasattr(module, 'create_static_model_set'):
            continue
        model_set = module.create_static_model_set(state_manager.get_item('download_scope'))

        for model in model_set.values():
            hash_set = model.get('hashes')
            source_set = model.get('sources')

            if not (hash_set and source_set):
                continue
            if not (conditional_download_hashes(hash_set) and conditional_download_sources(source_set)):
                return 1

    return 0
def route_job_manager(args : Args) -> ErrorCode:
    """Execute one job management command and log its outcome.

    Returns 0 when the command succeeded and 1 when it failed or when the
    command is not one of the job management commands.
    """
    def state(key):
        return state_manager.get_item(key)

    def report(succeeded, success_key, failure_key, *field_names) -> ErrorCode:
        # Log the success wording at info level or the failure wording at error level.
        # The format fields are read from the state only after the job manager call.
        write_log, wording_key = (logger.info, success_key) if succeeded else (logger.error, failure_key)
        message = wording.get(wording_key)
        if field_names:
            message = message.format(**{ field_name: state(field_name) for field_name in field_names })
        write_log(message, __name__)
        return 0 if succeeded else 1

    command = state('command')

    if command == 'job-list':
        job_headers, job_contents = compose_job_list(state('job_status'))
        if not job_contents:
            return 1
        cli_helper.render_table(job_headers, job_contents)
        return 0

    if command == 'job-create':
        return report(job_manager.create_job(state('job_id')), 'job_created', 'job_not_created', 'job_id')

    if command == 'job-submit':
        return report(job_manager.submit_job(state('job_id')), 'job_submitted', 'job_not_submitted', 'job_id')

    if command == 'job-submit-all':
        return report(job_manager.submit_jobs(state('halt_on_error')), 'job_all_submitted', 'job_all_not_submitted')

    if command == 'job-delete':
        return report(job_manager.delete_job(state('job_id')), 'job_deleted', 'job_not_deleted', 'job_id')

    if command == 'job-delete-all':
        return report(job_manager.delete_jobs(state('halt_on_error')), 'job_all_deleted', 'job_all_not_deleted')

    if command == 'job-add-step':
        step_args = reduce_step_args(args)
        return report(job_manager.add_step(state('job_id'), step_args), 'job_step_added', 'job_step_not_added', 'job_id')

    if command == 'job-remix-step':
        step_args = reduce_step_args(args)
        return report(job_manager.remix_step(state('job_id'), state('step_index'), step_args), 'job_remix_step_added', 'job_remix_step_not_added', 'job_id', 'step_index')

    if command == 'job-insert-step':
        step_args = reduce_step_args(args)
        return report(job_manager.insert_step(state('job_id'), state('step_index'), step_args), 'job_step_inserted', 'job_step_not_inserted', 'job_id', 'step_index')

    if command == 'job-remove-step':
        return report(job_manager.remove_step(state('job_id'), state('step_index')), 'job_step_removed', 'job_step_not_removed', 'job_id', 'step_index')

    return 1
def route_job_runner() -> ErrorCode:
    """Execute one job runner command and log its outcome.

    Returns 0 when the run or retry succeeded, 1 when it failed and 2 when the
    command is not one of the job runner commands. Both outcomes are logged at
    info level.
    """
    command = state_manager.get_item('command')

    if command in ( 'job-run', 'job-retry' ):
        if command == 'job-run':
            announce_key, run_single = 'running_job', job_runner.run_job
        else:
            announce_key, run_single = 'retrying_job', job_runner.retry_job

        logger.info(wording.get(announce_key).format(job_id = state_manager.get_item('job_id')), __name__)
        succeeded = run_single(state_manager.get_item('job_id'), process_step)
        outcome_key = 'processing_job_succeed' if succeeded else 'processing_job_failed'
        logger.info(wording.get(outcome_key).format(job_id = state_manager.get_item('job_id')), __name__)
        return 0 if succeeded else 1

    if command in ( 'job-run-all', 'job-retry-all' ):
        if command == 'job-run-all':
            announce_key, run_many = 'running_jobs', job_runner.run_jobs
        else:
            announce_key, run_many = 'retrying_jobs', job_runner.retry_jobs

        logger.info(wording.get(announce_key), __name__)
        succeeded = run_many(process_step, state_manager.get_item('halt_on_error'))
        outcome_key = 'processing_jobs_succeed' if succeeded else 'processing_jobs_failed'
        logger.info(wording.get(outcome_key), __name__)
        return 0 if succeeded else 1

    return 2
def process_headless(args : Args) -> ErrorCode:
    """Create, fill, submit and run a one-step job built from the arguments.

    Returns 0 when every stage succeeds and 1 as soon as one stage fails.
    """
    headless_job_id = job_helper.suggest_job_id('headless')
    headless_step_args = reduce_step_args(args)

    if not job_manager.create_job(headless_job_id):
        return 1
    if not job_manager.add_step(headless_job_id, headless_step_args):
        return 1
    if not job_manager.submit_job(headless_job_id):
        return 1
    if not job_runner.run_job(headless_job_id, process_step):
        return 1
    return 0
def process_batch(args : Args) -> ErrorCode:
    """Build and run a job with one step per source/target combination.

    When source paths resolve, every source is combined with every target;
    otherwise one step is added per target with the source paths left as they
    are. Returns 0 when the job ran successfully, otherwise 1.
    """
    job_id = job_helper.suggest_job_id('batch')
    step_args = reduce_step_args(args)
    job_args = reduce_job_args(args)
    source_paths = resolve_file_pattern(job_args.get('source_pattern'))
    target_paths = resolve_file_pattern(job_args.get('target_pattern'))

    if not job_manager.create_job(job_id):
        return 1
    if not target_paths:
        return 1

    with_sources = bool(source_paths)
    if with_sources:
        combinations = itertools.product(source_paths, target_paths)
    else:
        combinations = ((None, target_path) for target_path in target_paths)

    for index, (source_path, target_path) in enumerate(combinations):
        if with_sources:
            step_args['source_paths'] = [ source_path ]
        step_args['target_path'] = target_path
        step_args['output_path'] = job_args.get('output_pattern').format(index = index)
        if not job_manager.add_step(job_id, step_args):
            return 1

    if job_manager.submit_job(job_id) and job_runner.run_job(job_id, process_step):
        return 0
    return 1
def process_step(job_id: str, step_index: int, step_args: Args) -> bool:
    """Run a single job step and report whether it succeeded.

    The job-level arguments are merged with the step arguments (step values
    win), applied to the state manager, and then ``conditional_process`` is run
    once the pre-checks pass.

    Returns True only when ``conditional_process`` returns 0. Any exception is
    logged and turned into False so the job runner receives a plain result.
    """
    start_time = time()
    try:
        clear_reference_faces()
        step_total = job_manager.count_step_total(job_id)
        # Defaults from the job arguments first, then the step's own values on top.
        step_args = {**collect_job_args(), **step_args}
        apply_args(step_args, state_manager.set_item)
        # Set face_selector_index explicitly when the step provides one.
        if 'face_selector_index' in step_args:
            state_manager.set_item('face_selector_index', step_args['face_selector_index'])
        # Append the current date and time to the progress message.
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logger.info(wording.get('processing_step').format(step_current=step_index + 1, step_total=step_total) + f" at {current_time}", __name__)

        if not (common_pre_check() and processors_pre_check()):
            return False

        error_code = conditional_process()
        if error_code != 0:
            return False

        # Report the full elapsed time; a modulo 60 would wrap runs longer than a minute.
        seconds = '{:.2f}'.format(time() - start_time)
        logger.info(wording.get('processing_image_succeed').format(seconds = seconds), __name__)
        return True
    except Exception as exception:
        # Surface the failure instead of dropping it silently.
        logger.error(str(exception), __name__)
        logger.debug(traceback.format_exc(), __name__)
        return False
def conditional_process() -> int:
    """Run the configured processors on the image target and write the result.

    Reads the target image and the source images, stores every face detected
    in the target as a reference face, passes the frame through each processor
    whose ``pre_process('output')`` succeeds, and writes the final frame to the
    output path.

    Returns 0 on success and 1 on any failure: unreadable or non-image target,
    no source face, no target face, failed write, or an exception.
    """
    processors_list = state_manager.get_item('processors')

    try:
        target_path = state_manager.get_item('target_path')
        if not is_image(target_path):
            # Only image targets are handled here; fail explicitly instead of falling through.
            logger.error(f"Target is not a supported image: {target_path}", __name__)
            return 1

        target_vision_frame = read_static_image(target_path)
        if target_vision_frame is None:
            return 1

        # Resolve the source face.
        source_paths = state_manager.get_item('source_paths')
        source_frames = read_static_images(source_paths)
        source_faces = get_many_faces(source_frames)
        source_face = get_one_face(source_faces)
        if not source_face:
            return 1

        # Detect the target faces.
        target_faces = get_many_faces([target_vision_frame])
        if not target_faces:
            return 1

        # Store the target faces as reference faces.
        clear_reference_faces()
        for face in target_faces:
            append_reference_face(face=face)

        # Pass the frame through the processors; each one receives the previous result.
        modified_frame = target_vision_frame.copy()
        for processor_module in get_processors_modules(processors_list):
            if not processor_module.pre_process('output'):
                continue
            face_selector_index = state_manager.get_item('face_selector_index') or 0
            inputs = {
                'reference_faces': get_reference_faces(),
                'source_face': source_face,
                'target_vision_frame': modified_frame,
                'face_selector_index': face_selector_index,
                'face_selector_mode': state_manager.get_item('face_selector_mode'),
                'face_selector_order': state_manager.get_item('face_selector_order'),
            }
            result = processor_module.process_frame(inputs)
            if result is not None:
                modified_frame = result
                continue

            print(f"[DEBUG] Processor {processor_module.__name__} returned None")
            # Read the error message from the shared error file (if present), log it and remove the file.
            error_file = os.path.join(tempfile.gettempdir(), 'facefusion_error.txt')
            if os.path.exists(error_file):
                with open(error_file, 'r') as f:
                    error_message = f.read().strip()
                logger.error(error_message, __name__)
                os.remove(error_file)

        # Write the result.
        output_path = state_manager.get_item('output_path')
        if not write_image(output_path, modified_frame):
            return 1
        return 0
    except Exception as exception:
        # Keep the traceback on stderr and also route the message through the logger.
        traceback.print_exc()
        logger.error(str(exception), __name__)
        return 1
def conditional_append_reference_faces() -> None:
    """Replace the reference faces with the faces found in the image target.

    Does nothing unless the face selector mode contains ``'reference'``, the
    target is an image that can be read, and at least one face is detected.
    """
    if 'reference' not in state_manager.get_item('face_selector_mode'):
        return

    image_path = state_manager.get_item('target_path')
    if not is_image(image_path):
        return

    image_frame = read_static_image(image_path)
    if image_frame is None:
        return

    detected_faces = get_many_faces([image_frame])
    if not detected_faces:
        return

    clear_reference_faces()
    for detected_face in detected_faces:
        append_reference_face(face=detected_face)
| # process image | |
def process_image(start_time : float) -> ErrorCode:
    """Process the image target with every configured processor and finalize it as PNG.

    ``start_time`` is the ``time()`` value at which processing began and is
    used for the elapsed-time log message.

    Returns 3 when ``analyse_image`` returns a truthy value for the target,
    4 when the process is stopping, 1 when copying fails or the output image
    is missing afterwards, and 0 on success.
    """
    if analyse_image(state_manager.get_item('target_path')):
        return 3

    logger.debug(wording.get('clearing_temp'), __name__)
    clear_temp_directory(state_manager.get_item('target_path'))
    logger.debug(wording.get('creating_temp'), __name__)
    create_temp_directory(state_manager.get_item('target_path'))

    process_manager.start()
    temp_image_resolution = pack_resolution(restrict_image_resolution(state_manager.get_item('target_path'), unpack_resolution(state_manager.get_item('output_image_resolution'))))
    logger.info(wording.get('copying_image').format(resolution = temp_image_resolution), __name__)
    if copy_image(state_manager.get_item('target_path'), temp_image_resolution):
        logger.debug(wording.get('copying_image_succeed'), __name__)
    else:
        logger.error(wording.get('copying_image_failed'), __name__)
        process_manager.end()
        return 1

    # Each processor reads and writes the same temporary image in place.
    temp_image_path = get_temp_file_path(state_manager.get_item('target_path'))
    for processor_module in get_processors_modules(state_manager.get_item('processors')):
        logger.info(wording.get('processing'), processor_module.__name__)
        processor_module.process_image(state_manager.get_item('source_paths'), temp_image_path, temp_image_path)
        processor_module.post_process()
    if is_process_stopping():
        process_manager.end()
        return 4

    # Force a .png extension on the output path.
    base, _ = os.path.splitext(state_manager.get_item('output_path'))
    png_output_path = base + ".png"
    state_manager.set_item('output_path', png_output_path)

    logger.info(wording.get('finalizing_image').format(resolution = state_manager.get_item('output_image_resolution')), __name__)
    # finalize_image is used here for the path it returns rather than as a boolean.
    final_path = finalize_image(
        state_manager.get_item('target_path'),
        state_manager.get_item('output_path'),
        state_manager.get_item('output_image_resolution')
    )
    if final_path:
        logger.debug(wording.get('finalizing_image_succeed'), __name__)
        # Store the returned path so the Gradio GUI picks up the new file.
        state_manager.set_item('output_path', final_path)
    else:
        logger.warn(wording.get('finalizing_image_skipped'), __name__)

    logger.debug(wording.get('clearing_temp'), __name__)
    clear_temp_directory(state_manager.get_item('target_path'))

    # Verify the file at the (possibly updated) output path.
    if not is_image(state_manager.get_item('output_path')):
        logger.error(f"Output file not found: {state_manager.get_item('output_path')}", __name__)
        process_manager.end()
        return 1

    # Report the full elapsed time, as process_video does; a modulo 60 would
    # wrap runs longer than a minute.
    seconds = '{:.2f}'.format(time() - start_time)
    logger.info(wording.get('processing_image_succeed').format(seconds = seconds), __name__)
    process_manager.end()
    return 0
def process_video(start_time : float) -> ErrorCode:
    """Extract, process and merge the frames of the video target, then handle its audio.

    ``start_time`` is the ``time()`` value at which processing began and is
    used for the elapsed-time log message.

    Returns 3 when ``analyse_video`` returns a truthy value for the target,
    4 when the process is stopping, 1 on an extraction, merge or output
    failure, and 0 on success.
    """
    target_path = state_manager.get_item('target_path')
    trim_frame_start, trim_frame_end = restrict_trim_frame(target_path, state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))

    if analyse_video(target_path, trim_frame_start, trim_frame_end):
        return 3

    logger.debug(wording.get('clearing_temp'), __name__)
    clear_temp_directory(target_path)
    logger.debug(wording.get('creating_temp'), __name__)
    create_temp_directory(target_path)

    process_manager.start()
    requested_resolution = unpack_resolution(state_manager.get_item('output_video_resolution'))
    temp_video_resolution = pack_resolution(restrict_video_resolution(target_path, requested_resolution))
    temp_video_fps = restrict_video_fps(target_path, state_manager.get_item('output_video_fps'))

    logger.info(wording.get('extracting_frames').format(resolution = temp_video_resolution, fps = temp_video_fps), __name__)
    if not extract_frames(target_path, temp_video_resolution, temp_video_fps, trim_frame_start, trim_frame_end):
        if is_process_stopping():
            process_manager.end()
            return 4
        logger.error(wording.get('extracting_frames_failed'), __name__)
        process_manager.end()
        return 1
    logger.debug(wording.get('extracting_frames_succeed'), __name__)

    temp_frame_paths = resolve_temp_frame_paths(target_path)
    if not temp_frame_paths:
        logger.error(wording.get('temp_frames_not_found'), __name__)
        process_manager.end()
        return 1

    for processor_module in get_processors_modules(state_manager.get_item('processors')):
        logger.info(wording.get('processing'), processor_module.__name__)
        processor_module.process_video(state_manager.get_item('source_paths'), temp_frame_paths)
        processor_module.post_process()
    if is_process_stopping():
        return 4

    logger.info(wording.get('merging_video').format(resolution = state_manager.get_item('output_video_resolution'), fps = state_manager.get_item('output_video_fps')), __name__)
    if not merge_video(target_path, temp_video_fps, state_manager.get_item('output_video_resolution'), state_manager.get_item('output_video_fps'), trim_frame_start, trim_frame_end):
        if is_process_stopping():
            process_manager.end()
            return 4
        logger.error(wording.get('merging_video_failed'), __name__)
        process_manager.end()
        return 1
    logger.debug(wording.get('merging_video_succeed'), __name__)

    if state_manager.get_item('output_audio_volume') == 0:
        logger.info(wording.get('skipping_audio'), __name__)
        move_temp_file(target_path, state_manager.get_item('output_path'))
    else:
        # Use the first audio file among the sources when there is one;
        # otherwise restore the audio of the target itself.
        source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
        if source_audio_path:
            succeed_key, skipped_key = 'replacing_audio_succeed', 'replacing_audio_skipped'
            audio_applied = replace_audio(target_path, source_audio_path, state_manager.get_item('output_path'))
        else:
            succeed_key, skipped_key = 'restoring_audio_succeed', 'restoring_audio_skipped'
            audio_applied = restore_audio(target_path, state_manager.get_item('output_path'), trim_frame_start, trim_frame_end)
        video_manager.clear_video_pool()

        if audio_applied:
            logger.debug(wording.get(succeed_key), __name__)
        else:
            if is_process_stopping():
                process_manager.end()
                return 4
            # The audio step failed: move the temp file to the output path as-is.
            logger.warn(wording.get(skipped_key), __name__)
            move_temp_file(target_path, state_manager.get_item('output_path'))

    logger.debug(wording.get('clearing_temp'), __name__)
    clear_temp_directory(target_path)

    if not is_video(state_manager.get_item('output_path')):
        logger.error(wording.get('processing_video_failed'), __name__)
        process_manager.end()
        return 1

    seconds = f'{time() - start_time:.2f}'
    logger.info(wording.get('processing_video_succeed').format(seconds = seconds), __name__)
    process_manager.end()
    return 0
def is_process_stopping() -> bool:
    """End the process manager when a stop was requested and report whether it is pending."""
    stop_requested = process_manager.is_stopping()
    if stop_requested:
        process_manager.end()
        logger.info(wording.get('processing_stopped'), __name__)
    return process_manager.is_pending()