File size: 5,790 Bytes
95dbfc5 deb38be 95dbfc5 2d64c58 95dbfc5 3ed4b33 2d64c58 39f8e54 2d64c58 39f8e54 3ed4b33 95dbfc5 deb38be 95dbfc5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
from typing import Any, List, Callable
import os
import cv2
import insightface
import threading
from huggingface_hub import hf_hub_download
import DeepFakeAI.globals
import DeepFakeAI.processors.frame.core as frame_processors
from DeepFakeAI import wording
from DeepFakeAI.core import update_status
from DeepFakeAI.face_analyser import get_one_face, get_many_faces, find_similar_faces
from DeepFakeAI.face_reference import get_face_reference, set_face_reference
from DeepFakeAI.typing import Face, Frame
from DeepFakeAI.utilities import conditional_download, resolve_relative_path, is_image, is_video
FRAME_PROCESSOR = None
THREAD_LOCK = threading.Lock()
NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_SWAPPER'
def get_frame_processor() -> Any:
global FRAME_PROCESSOR
with THREAD_LOCK:
if FRAME_PROCESSOR is None:
# 1) Honor explicit override path if provided via env INSWAPPER_PATH
override_path = os.environ.get('INSWAPPER_PATH')
if override_path and os.path.exists(override_path):
model_path = override_path
else:
# 2) Prefer local cached path inside repo
local_dir = resolve_relative_path('../.assets/models')
local_model_path = os.path.join(local_dir, 'inswapper_128.onnx')
if not os.path.exists(local_model_path):
# Try Hugging Face Hub first to avoid 404s on GitHub mirrors
token = os.environ.get('TOKEN') or os.environ.get('HF_TOKEN')
for repo_id in [
'zihaomu/inswapper_128.onnx',
'linyi/inswapper_128.onnx',
'banodoco/inswapper_128.onnx',
]:
try:
model_path = hf_hub_download(repo_id=repo_id, filename='inswapper_128.onnx', token=token)
break
except Exception:
model_path = None # keep trying
# If HF Hub failed, try public mirrors as a last resort
if not model_path:
conditional_download(local_dir, [
'https://huggingface.co/zihaomu/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
'https://huggingface.co/linyi/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
'https://huggingface.co/banodoco/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
'https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx'
])
model_path = local_model_path
else:
model_path = local_model_path
# export to env for downstream usage
if model_path and os.path.exists(model_path):
os.environ['INSWAPPER_PATH'] = model_path
FRAME_PROCESSOR = insightface.model_zoo.get_model(model_path, providers = DeepFakeAI.globals.execution_providers)
return FRAME_PROCESSOR
def clear_frame_processor() -> None:
global FRAME_PROCESSOR
FRAME_PROCESSOR = None
def pre_check() -> bool:
# Ensure directory exists; actual download handled in get_frame_processor
_ = resolve_relative_path('../.assets/models')
return True
def pre_process() -> bool:
if not is_image(DeepFakeAI.globals.source_path):
update_status(wording.get('select_image_source') + wording.get('exclamation_mark'), NAME)
return False
elif not get_one_face(cv2.imread(DeepFakeAI.globals.source_path)):
update_status(wording.get('no_source_face_detected') + wording.get('exclamation_mark'), NAME)
return False
if not is_image(DeepFakeAI.globals.target_path) and not is_video(DeepFakeAI.globals.target_path):
update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
return False
return True
def post_process() -> None:
clear_frame_processor()
def swap_face(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
return get_frame_processor().get(temp_frame, target_face, source_face, paste_back = True)
def process_frame(source_face : Face, reference_face : Face, temp_frame : Frame) -> Frame:
if 'reference' in DeepFakeAI.globals.face_recognition:
similar_faces = find_similar_faces(temp_frame, reference_face, DeepFakeAI.globals.reference_face_distance)
if similar_faces:
for similar_face in similar_faces:
temp_frame = swap_face(source_face, similar_face, temp_frame)
if 'many' in DeepFakeAI.globals.face_recognition:
many_faces = get_many_faces(temp_frame)
if many_faces:
for target_face in many_faces:
temp_frame = swap_face(source_face, target_face, temp_frame)
return temp_frame
def process_frames(source_path : str, temp_frame_paths : List[str], update: Callable[[], None]) -> None:
source_face = get_one_face(cv2.imread(source_path))
reference_face = get_face_reference() if 'reference' in DeepFakeAI.globals.face_recognition else None
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
result_frame = process_frame(source_face, reference_face, temp_frame)
cv2.imwrite(temp_frame_path, result_frame)
if update:
update()
def process_image(source_path : str, target_path : str, output_path : str) -> None:
source_face = get_one_face(cv2.imread(source_path))
target_frame = cv2.imread(target_path)
reference_face = get_one_face(target_frame, DeepFakeAI.globals.reference_face_position) if 'reference' in DeepFakeAI.globals.face_recognition else None
result_frame = process_frame(source_face, reference_face, target_frame)
cv2.imwrite(output_path, result_frame)
def process_video(source_path : str, temp_frame_paths : List[str]) -> None:
conditional_set_face_reference(temp_frame_paths)
frame_processors.process_video(source_path, temp_frame_paths, process_frames)
def conditional_set_face_reference(temp_frame_paths : List[str]) -> None:
if 'reference' in DeepFakeAI.globals.face_recognition and not get_face_reference():
reference_frame = cv2.imread(temp_frame_paths[DeepFakeAI.globals.reference_frame_number])
reference_face = get_one_face(reference_frame, DeepFakeAI.globals.reference_face_position)
set_face_reference(reference_face)
|