V0pr0S's picture
Initial commit for FaceFusion-Face-Swap-Hyperswap
74f4dcc
import itertools
import shutil
import signal
import sys
import tempfile
# В начало core.py добавим импорты:
import os
import traceback
from facefusion.vision import read_static_image, write_image
from facefusion.face_store import get_reference_faces
from facefusion.face_analyser import get_many_faces, get_one_face
from facefusion.processors.core import get_processors_modules
from facefusion.vision import start_processing, end_processing
from time import time
import numpy
from facefusion import benchmarker, cli_helper, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, logger, process_manager, state_manager, video_manager, voice_extractor, wording
from facefusion.args import apply_args, collect_job_args, reduce_job_args, reduce_step_args
from facefusion.common_helper import get_first
from facefusion.content_analyser import analyse_image, analyse_video
from facefusion.download import conditional_download_hashes, conditional_download_sources
from facefusion.exit_helper import hard_exit, signal_exit
from facefusion.face_analyser import get_average_face, get_many_faces, get_one_face
from facefusion.face_selector import sort_and_filter_faces
from facefusion.face_store import append_reference_face, clear_reference_faces, get_reference_faces
from facefusion.ffmpeg import copy_image, extract_frames, finalize_image, merge_video, replace_audio, restore_audio
from facefusion.filesystem import filter_audio_paths, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern
from facefusion.jobs import job_helper, job_manager, job_runner
from facefusion.jobs.job_list import compose_job_list
from facefusion.memory import limit_system_memory
from facefusion.processors.core import get_processors_modules
from facefusion.program import create_program
from facefusion.program_helper import validate_args
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, move_temp_file, resolve_temp_frame_paths
from facefusion.types import Args, ErrorCode
from facefusion.vision import pack_resolution, read_image, read_static_images, read_video_frame, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution
from datetime import datetime
def cli() -> None:
if pre_check():
signal.signal(signal.SIGINT, signal_exit)
program = create_program()
if validate_args(program):
args = vars(program.parse_args())
apply_args(args, state_manager.init_item)
if state_manager.get_item('command'):
logger.init(state_manager.get_item('log_level'))
route(args)
else:
program.print_help()
else:
hard_exit(2)
else:
hard_exit(2)
def route(args : Args) -> None:
system_memory_limit = state_manager.get_item('system_memory_limit')
if system_memory_limit and system_memory_limit > 0:
limit_system_memory(system_memory_limit)
if state_manager.get_item('command') == 'force-download':
error_code = force_download()
return hard_exit(error_code)
if state_manager.get_item('command') == 'benchmark':
if not common_pre_check() or not processors_pre_check() or not benchmarker.pre_check():
return hard_exit(2)
benchmarker.render()
if state_manager.get_item('command') in [ 'job-list', 'job-create', 'job-submit', 'job-submit-all', 'job-delete', 'job-delete-all', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' ]:
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)
error_code = route_job_manager(args)
hard_exit(error_code)
if state_manager.get_item('command') == 'run':
# Импортируем существующий модуль и наш launcher
import facefusion.uis.core as ui
from facefusion.uis.launcher import launch_ui
# Сохраняем существующие проверки
if not common_pre_check() or not processors_pre_check():
return hard_exit(2)
for ui_layout in ui.get_ui_layouts_modules(state_manager.get_item('ui_layouts')):
if not ui_layout.pre_check():
return hard_exit(2)
# Инициализируем базовые компоненты UI
ui.init()
# Вместо ui.launch() используем наш launcher
launch_ui(args)
if state_manager.get_item('command') == 'headless-run':
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)
error_core = process_headless(args)
hard_exit(error_core)
if state_manager.get_item('command') == 'batch-run':
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)
error_core = process_batch(args)
hard_exit(error_core)
if state_manager.get_item('command') in [ 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' ]:
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)
error_code = route_job_runner()
hard_exit(error_code)
def pre_check() -> bool:
if sys.version_info < (3, 10):
logger.error(wording.get('python_not_supported').format(version = '3.10'), __name__)
return False
if not shutil.which('curl'):
logger.error(wording.get('curl_not_installed'), __name__)
return False
if not shutil.which('ffmpeg'):
logger.error(wording.get('ffmpeg_not_installed'), __name__)
return False
return True
def common_pre_check() -> bool:
common_modules =\
[
content_analyser,
face_classifier,
face_detector,
face_landmarker,
face_masker,
face_recognizer,
voice_extractor
]
return all(module.pre_check() for module in common_modules)
def processors_pre_check() -> bool:
for processor_module in get_processors_modules(state_manager.get_item('processors')):
if not processor_module.pre_check():
return False
return True
def force_download() -> ErrorCode:
common_modules =\
[
content_analyser,
face_classifier,
face_detector,
face_landmarker,
face_masker,
face_recognizer,
voice_extractor
]
available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
processor_modules = get_processors_modules(available_processors)
for module in common_modules + processor_modules:
if hasattr(module, 'create_static_model_set'):
for model in module.create_static_model_set(state_manager.get_item('download_scope')).values():
model_hash_set = model.get('hashes')
model_source_set = model.get('sources')
if model_hash_set and model_source_set:
if not conditional_download_hashes(model_hash_set) or not conditional_download_sources(model_source_set):
return 1
return 0
def route_job_manager(args : Args) -> ErrorCode:
if state_manager.get_item('command') == 'job-list':
job_headers, job_contents = compose_job_list(state_manager.get_item('job_status'))
if job_contents:
cli_helper.render_table(job_headers, job_contents)
return 0
return 1
if state_manager.get_item('command') == 'job-create':
if job_manager.create_job(state_manager.get_item('job_id')):
logger.info(wording.get('job_created').format(job_id = state_manager.get_item('job_id')), __name__)
return 0
logger.error(wording.get('job_not_created').format(job_id = state_manager.get_item('job_id')), __name__)
return 1
if state_manager.get_item('command') == 'job-submit':
if job_manager.submit_job(state_manager.get_item('job_id')):
logger.info(wording.get('job_submitted').format(job_id = state_manager.get_item('job_id')), __name__)
return 0
logger.error(wording.get('job_not_submitted').format(job_id = state_manager.get_item('job_id')), __name__)
return 1
if state_manager.get_item('command') == 'job-submit-all':
if job_manager.submit_jobs(state_manager.get_item('halt_on_error')):
logger.info(wording.get('job_all_submitted'), __name__)
return 0
logger.error(wording.get('job_all_not_submitted'), __name__)
return 1
if state_manager.get_item('command') == 'job-delete':
if job_manager.delete_job(state_manager.get_item('job_id')):
logger.info(wording.get('job_deleted').format(job_id = state_manager.get_item('job_id')), __name__)
return 0
logger.error(wording.get('job_not_deleted').format(job_id = state_manager.get_item('job_id')), __name__)
return 1
if state_manager.get_item('command') == 'job-delete-all':
if job_manager.delete_jobs(state_manager.get_item('halt_on_error')):
logger.info(wording.get('job_all_deleted'), __name__)
return 0
logger.error(wording.get('job_all_not_deleted'), __name__)
return 1
if state_manager.get_item('command') == 'job-add-step':
step_args = reduce_step_args(args)
if job_manager.add_step(state_manager.get_item('job_id'), step_args):
logger.info(wording.get('job_step_added').format(job_id = state_manager.get_item('job_id')), __name__)
return 0
logger.error(wording.get('job_step_not_added').format(job_id = state_manager.get_item('job_id')), __name__)
return 1
if state_manager.get_item('command') == 'job-remix-step':
step_args = reduce_step_args(args)
if job_manager.remix_step(state_manager.get_item('job_id'), state_manager.get_item('step_index'), step_args):
logger.info(wording.get('job_remix_step_added').format(job_id = state_manager.get_item('job_id'), step_index = state_manager.get_item('step_index')), __name__)
return 0
logger.error(wording.get('job_remix_step_not_added').format(job_id = state_manager.get_item('job_id'), step_index = state_manager.get_item('step_index')), __name__)
return 1
if state_manager.get_item('command') == 'job-insert-step':
step_args = reduce_step_args(args)
if job_manager.insert_step(state_manager.get_item('job_id'), state_manager.get_item('step_index'), step_args):
logger.info(wording.get('job_step_inserted').format(job_id = state_manager.get_item('job_id'), step_index = state_manager.get_item('step_index')), __name__)
return 0
logger.error(wording.get('job_step_not_inserted').format(job_id = state_manager.get_item('job_id'), step_index = state_manager.get_item('step_index')), __name__)
return 1
if state_manager.get_item('command') == 'job-remove-step':
if job_manager.remove_step(state_manager.get_item('job_id'), state_manager.get_item('step_index')):
logger.info(wording.get('job_step_removed').format(job_id = state_manager.get_item('job_id'), step_index = state_manager.get_item('step_index')), __name__)
return 0
logger.error(wording.get('job_step_not_removed').format(job_id = state_manager.get_item('job_id'), step_index = state_manager.get_item('step_index')), __name__)
return 1
return 1
def route_job_runner() -> ErrorCode:
if state_manager.get_item('command') == 'job-run':
logger.info(wording.get('running_job').format(job_id = state_manager.get_item('job_id')), __name__)
if job_runner.run_job(state_manager.get_item('job_id'), process_step):
logger.info(wording.get('processing_job_succeed').format(job_id = state_manager.get_item('job_id')), __name__)
return 0
logger.info(wording.get('processing_job_failed').format(job_id = state_manager.get_item('job_id')), __name__)
return 1
if state_manager.get_item('command') == 'job-run-all':
logger.info(wording.get('running_jobs'), __name__)
if job_runner.run_jobs(process_step, state_manager.get_item('halt_on_error')):
logger.info(wording.get('processing_jobs_succeed'), __name__)
return 0
logger.info(wording.get('processing_jobs_failed'), __name__)
return 1
if state_manager.get_item('command') == 'job-retry':
logger.info(wording.get('retrying_job').format(job_id = state_manager.get_item('job_id')), __name__)
if job_runner.retry_job(state_manager.get_item('job_id'), process_step):
logger.info(wording.get('processing_job_succeed').format(job_id = state_manager.get_item('job_id')), __name__)
return 0
logger.info(wording.get('processing_job_failed').format(job_id = state_manager.get_item('job_id')), __name__)
return 1
if state_manager.get_item('command') == 'job-retry-all':
logger.info(wording.get('retrying_jobs'), __name__)
if job_runner.retry_jobs(process_step, state_manager.get_item('halt_on_error')):
logger.info(wording.get('processing_jobs_succeed'), __name__)
return 0
logger.info(wording.get('processing_jobs_failed'), __name__)
return 1
return 2
def process_headless(args : Args) -> ErrorCode:
job_id = job_helper.suggest_job_id('headless')
step_args = reduce_step_args(args)
if job_manager.create_job(job_id) and job_manager.add_step(job_id, step_args) and job_manager.submit_job(job_id) and job_runner.run_job(job_id, process_step):
return 0
return 1
def process_batch(args : Args) -> ErrorCode:
job_id = job_helper.suggest_job_id('batch')
step_args = reduce_step_args(args)
job_args = reduce_job_args(args)
source_paths = resolve_file_pattern(job_args.get('source_pattern'))
target_paths = resolve_file_pattern(job_args.get('target_pattern'))
if job_manager.create_job(job_id):
if source_paths and target_paths:
for index, (source_path, target_path) in enumerate(itertools.product(source_paths, target_paths)):
step_args['source_paths'] = [ source_path ]
step_args['target_path'] = target_path
step_args['output_path'] = job_args.get('output_pattern').format(index = index)
if not job_manager.add_step(job_id, step_args):
return 1
if job_manager.submit_job(job_id) and job_runner.run_job(job_id, process_step):
return 0
if not source_paths and target_paths:
for index, target_path in enumerate(target_paths):
step_args['target_path'] = target_path
step_args['output_path'] = job_args.get('output_pattern').format(index = index)
if not job_manager.add_step(job_id, step_args):
return 1
if job_manager.submit_job(job_id) and job_runner.run_job(job_id, process_step):
return 0
return 1
def process_step(job_id: str, step_index: int, step_args: Args) -> bool:
start_time = time()
try:
clear_reference_faces()
step_total = job_manager.count_step_total(job_id)
job_args = collect_job_args()
step_args = {**job_args, **step_args} # Дефолты из job_args, затем пользовательские из step_args (step_args имеет приоритет)
apply_args(step_args, state_manager.set_item)
# Явно установите face_selector_index из step_args, если он есть
if 'face_selector_index' in step_args:
state_manager.set_item('face_selector_index', step_args['face_selector_index'])
# Добавлено: получаем текущую дату и время
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(wording.get('processing_step').format(step_current=step_index + 1, step_total=step_total) + f" at {current_time}", __name__)
if common_pre_check() and processors_pre_check():
error_code = conditional_process()
if error_code == 0:
seconds = '{:.2f}'.format((time() - start_time) % 60)
logger.info(wording.get('processing_image_succeed').format(seconds = seconds), __name__)
return error_code == 0
else:
return False
except Exception as e:
return False
def conditional_process() -> int:
processors_list = state_manager.get_item('processors')
# print("[DEBUG] Starting conditional_process")
# print(f"[DEBUG] face_selector_index from state_manager: {state_manager.get_item('face_selector_index')}")
try:
if is_image(state_manager.get_item('target_path')):
target_vision_frame = read_static_image(state_manager.get_item('target_path'))
if target_vision_frame is None:
# print("[DEBUG] Failed to read target image")
return 1
# Получаем source face
source_paths = state_manager.get_item('source_paths')
source_frames = read_static_images(source_paths)
source_faces = get_many_faces(source_frames)
source_face = get_one_face(source_faces)
if not source_face:
# print("[DEBUG] No source face found")
return 1
# Получаем target faces
target_faces = get_many_faces([target_vision_frame])
if not target_faces:
# print("[DEBUG] No target faces found")
return 1
# Сохраняем target faces как reference
clear_reference_faces()
for face in target_faces:
append_reference_face(face=face)
# print(f"[DEBUG] Added {len(target_faces)} reference faces")
# Обрабатываем frame
modified_frame = target_vision_frame.copy()
# print(f"[DEBUG] About to process with processors: {processors_list}")
for processor_module in get_processors_modules(processors_list):
if processor_module.pre_process('output'):
face_selector_index = state_manager.get_item('face_selector_index') or 0
# print(f"[DEBUG] face_selector_index: {face_selector_index}")
inputs = {
'reference_faces': get_reference_faces(), # Оставь вызов функции, как было
'source_face': source_face,
'target_vision_frame': modified_frame,
'face_selector_index': face_selector_index, # Добавь это
'face_selector_mode': state_manager.get_item('face_selector_mode'), # Добавь это
'face_selector_order': state_manager.get_item('face_selector_order'), # Добавь это
}
# print(f"[DEBUG] Calling process_frame for {processor_module.__name__} with inputs keys: {list(inputs.keys())}")
result = processor_module.process_frame(inputs)
if result is not None:
modified_frame = result
# print(f"[DEBUG] Processor {processor_module.__name__} succeeded")
else:
print(f"[DEBUG] Processor {processor_module.__name__} returned None")
# Добавлено: Читаем и логируем ошибку из файла (если есть)
error_file = os.path.join(tempfile.gettempdir(), 'facefusion_error.txt')
if os.path.exists(error_file):
with open(error_file, 'r') as f:
error_message = f.read().strip()
logger.error(error_message, __name__) # Вывод в TERMINAL UI через logger
os.remove(error_file) # Удаляем файл
# Сохраняем результат
output_path = state_manager.get_item('output_path')
success = write_image(output_path, modified_frame)
# print(f"[DEBUG] Write image success: {success} to {output_path}")
if not success:
return 1
return 0
except Exception as e:
# print(f"[DEBUG] Exception in conditional_process: {str(e)}")
traceback.print_exc()
return 1
def conditional_append_reference_faces() -> None:
if 'reference' in state_manager.get_item('face_selector_mode'):
target_path = state_manager.get_item('target_path')
if is_image(target_path):
target_frame = read_static_image(target_path)
if target_frame is not None:
target_faces = get_many_faces([target_frame])
if target_faces:
clear_reference_faces()
for face in target_faces:
append_reference_face(face=face)
# process image
def process_image(start_time : float) -> ErrorCode:
if analyse_image(state_manager.get_item('target_path')):
return 3
logger.debug(wording.get('clearing_temp'), __name__)
clear_temp_directory(state_manager.get_item('target_path'))
logger.debug(wording.get('creating_temp'), __name__)
create_temp_directory(state_manager.get_item('target_path'))
process_manager.start()
temp_image_resolution = pack_resolution(restrict_image_resolution(state_manager.get_item('target_path'), unpack_resolution(state_manager.get_item('output_image_resolution'))))
logger.info(wording.get('copying_image').format(resolution = temp_image_resolution), __name__)
if copy_image(state_manager.get_item('target_path'), temp_image_resolution):
logger.debug(wording.get('copying_image_succeed'), __name__)
else:
logger.error(wording.get('copying_image_failed'), __name__)
process_manager.end()
return 1
temp_image_path = get_temp_file_path(state_manager.get_item('target_path'))
for processor_module in get_processors_modules(state_manager.get_item('processors')):
logger.info(wording.get('processing'), processor_module.__name__)
processor_module.process_image(state_manager.get_item('source_paths'), temp_image_path, temp_image_path)
processor_module.post_process()
if is_process_stopping():
process_manager.end()
return 4
# здесь изменяем
base, _ = os.path.splitext(state_manager.get_item('output_path'))
png_output_path = base + ".png"
state_manager.set_item('output_path', png_output_path)
logger.info(wording.get('finalizing_image').format(resolution = state_manager.get_item('output_image_resolution')), __name__)
# Вместо bool pull возвращаем путь
final_path = finalize_image(
state_manager.get_item('target_path'),
state_manager.get_item('output_path'),
state_manager.get_item('output_image_resolution')
)
if final_path:
logger.debug(wording.get('finalizing_image_succeed'), __name__)
# Обновляем путь в state_manager, чтобы Gradio GUI увидел новый файл
state_manager.set_item('output_path', final_path)
else:
logger.warn(wording.get('finalizing_image_skipped'), __name__)
logger.debug(wording.get('clearing_temp'), __name__)
clear_temp_directory(state_manager.get_item('target_path'))
# Проверяем существование файла по обновленному пути
if is_image(state_manager.get_item('output_path')):
seconds = '{:.2f}'.format((time() - start_time) % 60)
logger.info(wording.get('processing_image_succeed').format(seconds = seconds), __name__)
else:
logger.error(f"Output file not found: {state_manager.get_item('output_path')}", __name__)
process_manager.end()
return 1
process_manager.end()
return 0
def process_video(start_time : float) -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
if analyse_video(state_manager.get_item('target_path'), trim_frame_start, trim_frame_end):
return 3
logger.debug(wording.get('clearing_temp'), __name__)
clear_temp_directory(state_manager.get_item('target_path'))
logger.debug(wording.get('creating_temp'), __name__)
create_temp_directory(state_manager.get_item('target_path'))
process_manager.start()
temp_video_resolution = pack_resolution(restrict_video_resolution(state_manager.get_item('target_path'), unpack_resolution(state_manager.get_item('output_video_resolution'))))
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
logger.info(wording.get('extracting_frames').format(resolution = temp_video_resolution, fps = temp_video_fps), __name__)
if extract_frames(state_manager.get_item('target_path'), temp_video_resolution, temp_video_fps, trim_frame_start, trim_frame_end):
logger.debug(wording.get('extracting_frames_succeed'), __name__)
else:
if is_process_stopping():
process_manager.end()
return 4
logger.error(wording.get('extracting_frames_failed'), __name__)
process_manager.end()
return 1
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('target_path'))
if temp_frame_paths:
for processor_module in get_processors_modules(state_manager.get_item('processors')):
logger.info(wording.get('processing'), processor_module.__name__)
processor_module.process_video(state_manager.get_item('source_paths'), temp_frame_paths)
processor_module.post_process()
if is_process_stopping():
return 4
else:
logger.error(wording.get('temp_frames_not_found'), __name__)
process_manager.end()
return 1
logger.info(wording.get('merging_video').format(resolution = state_manager.get_item('output_video_resolution'), fps = state_manager.get_item('output_video_fps')), __name__)
if merge_video(state_manager.get_item('target_path'), temp_video_fps, state_manager.get_item('output_video_resolution'), state_manager.get_item('output_video_fps'), trim_frame_start, trim_frame_end):
logger.debug(wording.get('merging_video_succeed'), __name__)
else:
if is_process_stopping():
process_manager.end()
return 4
logger.error(wording.get('merging_video_failed'), __name__)
process_manager.end()
return 1
if state_manager.get_item('output_audio_volume') == 0:
logger.info(wording.get('skipping_audio'), __name__)
move_temp_file(state_manager.get_item('target_path'), state_manager.get_item('output_path'))
else:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
if source_audio_path:
if replace_audio(state_manager.get_item('target_path'), source_audio_path, state_manager.get_item('output_path')):
video_manager.clear_video_pool()
logger.debug(wording.get('replacing_audio_succeed'), __name__)
else:
video_manager.clear_video_pool()
if is_process_stopping():
process_manager.end()
return 4
logger.warn(wording.get('replacing_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('target_path'), state_manager.get_item('output_path'))
else:
if restore_audio(state_manager.get_item('target_path'), state_manager.get_item('output_path'), trim_frame_start, trim_frame_end):
video_manager.clear_video_pool()
logger.debug(wording.get('restoring_audio_succeed'), __name__)
else:
video_manager.clear_video_pool()
if is_process_stopping():
process_manager.end()
return 4
logger.warn(wording.get('restoring_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('target_path'), state_manager.get_item('output_path'))
logger.debug(wording.get('clearing_temp'), __name__)
clear_temp_directory(state_manager.get_item('target_path'))
if is_video(state_manager.get_item('output_path')):
seconds = '{:.2f}'.format((time() - start_time))
logger.info(wording.get('processing_video_succeed').format(seconds = seconds), __name__)
else:
logger.error(wording.get('processing_video_failed'), __name__)
process_manager.end()
return 1
process_manager.end()
return 0
def is_process_stopping() -> bool:
if process_manager.is_stopping():
process_manager.end()
logger.info(wording.get('processing_stopped'), __name__)
return process_manager.is_pending()