File size: 5,790 Bytes
95dbfc5
deb38be
95dbfc5
 
 
2d64c58
95dbfc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ed4b33
 
 
 
 
 
 
 
 
2d64c58
 
 
 
 
 
 
 
 
 
 
 
 
 
39f8e54
 
 
 
 
 
 
2d64c58
 
39f8e54
 
 
3ed4b33
95dbfc5
 
 
 
 
 
 
 
 
 
deb38be
 
95dbfc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from typing import Any, List, Callable
import os
import cv2
import insightface
import threading
from huggingface_hub import hf_hub_download

import DeepFakeAI.globals
import DeepFakeAI.processors.frame.core as frame_processors
from DeepFakeAI import wording
from DeepFakeAI.core import update_status
from DeepFakeAI.face_analyser import get_one_face, get_many_faces, find_similar_faces
from DeepFakeAI.face_reference import get_face_reference, set_face_reference
from DeepFakeAI.typing import Face, Frame
from DeepFakeAI.utilities import conditional_download, resolve_relative_path, is_image, is_video

FRAME_PROCESSOR = None
THREAD_LOCK = threading.Lock()
NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_SWAPPER'


def get_frame_processor() -> Any:
	global FRAME_PROCESSOR

	with THREAD_LOCK:
		if FRAME_PROCESSOR is None:
			# 1) Honor explicit override path if provided via env INSWAPPER_PATH
			override_path = os.environ.get('INSWAPPER_PATH')
			if override_path and os.path.exists(override_path):
				model_path = override_path
			else:
				# 2) Prefer local cached path inside repo
				local_dir = resolve_relative_path('../.assets/models')
				local_model_path = os.path.join(local_dir, 'inswapper_128.onnx')
				if not os.path.exists(local_model_path):
					# Try Hugging Face Hub first to avoid 404s on GitHub mirrors
					token = os.environ.get('TOKEN') or os.environ.get('HF_TOKEN')
					for repo_id in [
						'zihaomu/inswapper_128.onnx',
						'linyi/inswapper_128.onnx',
						'banodoco/inswapper_128.onnx',
					]:
						try:
							model_path = hf_hub_download(repo_id=repo_id, filename='inswapper_128.onnx', token=token)
							break
						except Exception:
							model_path = None  # keep trying
					# If HF Hub failed, try public mirrors as a last resort
					if not model_path:
						conditional_download(local_dir, [
							'https://huggingface.co/zihaomu/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
							'https://huggingface.co/linyi/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
							'https://huggingface.co/banodoco/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
							'https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx'
						])
						model_path = local_model_path
				else:
					model_path = local_model_path
				# export to env for downstream usage
				if model_path and os.path.exists(model_path):
					os.environ['INSWAPPER_PATH'] = model_path
			FRAME_PROCESSOR = insightface.model_zoo.get_model(model_path, providers = DeepFakeAI.globals.execution_providers)
	return FRAME_PROCESSOR


def clear_frame_processor() -> None:
	global FRAME_PROCESSOR

	FRAME_PROCESSOR = None


def pre_check() -> bool:
	# Ensure directory exists; actual download handled in get_frame_processor
	_ = resolve_relative_path('../.assets/models')
	return True


def pre_process() -> bool:
	if not is_image(DeepFakeAI.globals.source_path):
		update_status(wording.get('select_image_source') + wording.get('exclamation_mark'), NAME)
		return False
	elif not get_one_face(cv2.imread(DeepFakeAI.globals.source_path)):
		update_status(wording.get('no_source_face_detected') + wording.get('exclamation_mark'), NAME)
		return False
	if not is_image(DeepFakeAI.globals.target_path) and not is_video(DeepFakeAI.globals.target_path):
		update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
		return False
	return True


def post_process() -> None:
	clear_frame_processor()


def swap_face(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
	return get_frame_processor().get(temp_frame, target_face, source_face, paste_back = True)


def process_frame(source_face : Face, reference_face : Face, temp_frame : Frame) -> Frame:
	if 'reference' in DeepFakeAI.globals.face_recognition:
		similar_faces = find_similar_faces(temp_frame, reference_face, DeepFakeAI.globals.reference_face_distance)
		if similar_faces:
			for similar_face in similar_faces:
				temp_frame = swap_face(source_face, similar_face, temp_frame)
	if 'many' in DeepFakeAI.globals.face_recognition:
		many_faces = get_many_faces(temp_frame)
		if many_faces:
			for target_face in many_faces:
				temp_frame = swap_face(source_face, target_face, temp_frame)
	return temp_frame


def process_frames(source_path : str, temp_frame_paths : List[str], update: Callable[[], None]) -> None:
	source_face = get_one_face(cv2.imread(source_path))
	reference_face = get_face_reference() if 'reference' in DeepFakeAI.globals.face_recognition else None
	for temp_frame_path in temp_frame_paths:
		temp_frame = cv2.imread(temp_frame_path)
		result_frame = process_frame(source_face, reference_face, temp_frame)
		cv2.imwrite(temp_frame_path, result_frame)
		if update:
			update()


def process_image(source_path : str, target_path : str, output_path : str) -> None:
	source_face = get_one_face(cv2.imread(source_path))
	target_frame = cv2.imread(target_path)
	reference_face = get_one_face(target_frame, DeepFakeAI.globals.reference_face_position) if 'reference' in DeepFakeAI.globals.face_recognition else None
	result_frame = process_frame(source_face, reference_face, target_frame)
	cv2.imwrite(output_path, result_frame)


def process_video(source_path : str, temp_frame_paths : List[str]) -> None:
	conditional_set_face_reference(temp_frame_paths)
	frame_processors.process_video(source_path, temp_frame_paths, process_frames)


def conditional_set_face_reference(temp_frame_paths : List[str]) -> None:
	if 'reference' in DeepFakeAI.globals.face_recognition and not get_face_reference():
		reference_frame = cv2.imread(temp_frame_paths[DeepFakeAI.globals.reference_frame_number])
		reference_face = get_one_face(reference_frame, DeepFakeAI.globals.reference_face_position)
		set_face_reference(reference_face)