hs / facefusion /vision.py
V0pr0S's picture
Initial commit for FaceFusion-Face-Swap-Hyperswap
74f4dcc
import math
from functools import lru_cache
from typing import List, Optional, Tuple
import cv2
import numpy
from cv2.typing import Size
import facefusion.choices
from facefusion.common_helper import is_windows
from facefusion.filesystem import get_file_extension, is_image, is_video
from facefusion.thread_helper import thread_semaphore
from facefusion.types import Duration, Fps, Orientation, Resolution, VisionFrame
from facefusion.video_manager import get_video_capture
# === ДОБАВЬТЕ ЭТИ ИМПОРТЫ В НАЧАЛО ФАЙЛА ===
from PIL import Image
import tempfile
import os
import traceback
import time
# ==========================================
_png_cache = {} # кэш для хранения путей к PNG файлам
_frame_cache = {} # кэш для фреймов
_is_processing = False # Флаг для отслеживания процесса обработки
def start_processing():
global _is_processing
_is_processing = True
def end_processing():
global _is_processing
_is_processing = False
# Функции для работы с кэшем
def clear_caches():
"""
Очищает все кэши
"""
global _png_cache, _frame_cache
_png_cache.clear()
_frame_cache.clear()
read_static_image.cache_clear() # Очищаем LRU кэш
def get_cached_frame(image_path: str) -> Optional[VisionFrame]:
"""
Получает кэшированный фрейм или читает его заново
"""
if image_path in _frame_cache:
return _frame_cache[image_path].copy()
return None
def cache_frame(image_path: str, frame: VisionFrame) -> None:
"""
Кэширует фрейм
"""
_frame_cache[image_path] = frame.copy()
# Модифицируем read_static_image:
@lru_cache()
def read_static_image(image_path: str) -> Optional[VisionFrame]:
# Сначала проверяем кэш фреймов
cached_frame = get_cached_frame(image_path)
if cached_frame is not None:
print(f"[FaceFusion Cache] Using cached frame for '{os.path.basename(image_path)}'")
return cached_frame
frame = read_image(image_path)
if frame is not None:
cache_frame(image_path, frame)
return frame
def read_static_images(image_paths : List[str]) -> List[VisionFrame]:
frames = []
if image_paths:
for image_path in image_paths:
frames.append(read_static_image(image_path))
return frames
def get_or_create_png(original_path: str) -> str:
"""
Возвращает путь к PNG версии файла, создавая его при необходимости.
"""
if not original_path or not os.path.exists(original_path):
# print(f"[DEBUG] Invalid or non-existent path: {original_path}")
return None
if 'temp.png' in original_path or original_path.lower().endswith('.png'):
return original_path
file_name = os.path.basename(original_path)
try:
# Читаем через PIL сначала
with Image.open(original_path) as img:
if img.mode != 'RGB':
img = img.convert('RGB')
# Создаем временный файл
temp_png_path = os.path.join(
tempfile.gettempdir(),
f'ff_temp_{hash(original_path)}_{os.getpid()}.png'
)
# Сохраняем через PIL
img.save(temp_png_path, "PNG")
print(f"[FaceFusion PNG Converter] Successfully created PNG for '{file_name}'")
# Проверяем результат
if os.path.exists(temp_png_path):
return temp_png_path
except Exception as e:
print(f"[FaceFusion PNG Converter] Error converting '{file_name}': {e}")
# Если PIL не сработал, пробуем через OpenCV
try:
if is_windows():
image_buffer = numpy.fromfile(original_path, dtype=numpy.uint8)
image = cv2.imdecode(image_buffer, cv2.IMREAD_COLOR)
else:
image = cv2.imread(original_path)
if image is not None:
temp_png_path = os.path.join(
tempfile.gettempdir(),
f'ff_temp_{hash(original_path)}_{os.getpid()}_cv.png'
)
cv2.imwrite(temp_png_path, image)
return temp_png_path
except Exception as e:
print(f"[FaceFusion PNG Converter] Error with OpenCV fallback for '{file_name}': {e}")
return None
# Добавим функцию очистки кэша:
def cleanup_png_cache():
"""
Очищает временные PNG файлы и кэш.
"""
for original_path, png_path in _png_cache.items():
if png_path != original_path: # Не удаляем оригинальные PNG файлы
try:
if os.path.exists(png_path):
os.remove(png_path)
print(f"[FaceFusion Cache] Cleaned up cached PNG for '{os.path.basename(original_path)}'")
except Exception as e:
print(f"[FaceFusion Cache] Warning: Could not remove '{png_path}'. Error: {e}")
_png_cache.clear()
#--------------------- end PNG cashing ------
def read_image(image_path: str) -> Optional[VisionFrame]:
"""
Читает изображение из файла.
"""
if not is_image(image_path):
return None
try:
if is_windows():
image_buffer = numpy.fromfile(image_path, dtype=numpy.uint8)
image = cv2.imdecode(image_buffer, cv2.IMREAD_COLOR)
else:
image = cv2.imread(image_path)
if image is None:
# print(f"[DEBUG] Failed to read image: {image_path}")
return None
return image
except Exception as e:
# print(f"[DEBUG] Error reading image: {e}")
return None
def write_image(image_path: str, vision_frame: VisionFrame) -> bool:
# print(f"[DEBUG] Writing image to: {image_path}")
# print(f"[DEBUG] Frame shape: {vision_frame.shape if vision_frame is not None else 'None'}")
if not image_path or vision_frame is None:
# print("[DEBUG] Invalid image path or frame")
return False
try:
if is_windows():
# print("[DEBUG] Writing with Windows method")
image_file_extension = get_file_extension(image_path)
_, encoded_frame = cv2.imencode(image_file_extension, vision_frame)
encoded_frame.tofile(image_path)
else:
# print("[DEBUG] Writing with cv2.imwrite")
cv2.imwrite(image_path, vision_frame)
success = os.path.exists(image_path)
# print(f"[DEBUG] Write success: {success}")
return success
except Exception as e:
# print(f"[DEBUG] Error writing image: {e}")
traceback.print_exc()
return False
def detect_image_resolution(image_path : str) -> Optional[Resolution]:
if is_image(image_path):
image = read_image(image_path)
if image is not None: # Добавлена проверка на None, так как read_image теперь может вернуть None
height, width = image.shape[:2]
if width > 0 and height > 0:
return width, height
return None
def restrict_image_resolution(image_path : str, resolution : Resolution) -> Resolution:
if is_image(image_path):
image_resolution = detect_image_resolution(image_path)
if image_resolution and image_resolution < resolution: # Добавлена проверка на image_resolution
return image_resolution
return resolution
def create_image_resolutions(resolution : Resolution) -> List[str]:
resolutions = []
temp_resolutions = []
if resolution:
width, height = resolution
temp_resolutions.append(normalize_resolution(resolution))
for image_template_size in facefusion.choices.image_template_sizes:
temp_resolutions.append(normalize_resolution((width * image_template_size, height * image_template_size)))
temp_resolutions = sorted(set(temp_resolutions))
for temp_resolution in temp_resolutions:
resolutions.append(pack_resolution(temp_resolution))
return resolutions
def read_video_frame(video_path : str, frame_number : int = 0) -> Optional[VisionFrame]:
if is_video(video_path):
video_capture = get_video_capture(video_path)
if video_capture.isOpened():
frame_total = video_capture.get(cv2.CAP_PROP_FRAME_COUNT)
with thread_semaphore():
video_capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_vision_frame, vision_frame = video_capture.read()
if has_vision_frame:
return vision_frame
return None
def count_video_frame_total(video_path : str) -> int:
if is_video(video_path):
video_capture = get_video_capture(video_path)
if video_capture.isOpened():
with thread_semaphore():
video_frame_total = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
return video_frame_total
return 0
def predict_video_frame_total(video_path : str, fps : Fps, trim_frame_start : int, trim_frame_end : int) -> int:
if is_video(video_path):
video_fps = detect_video_fps(video_path)
extract_frame_total = count_trim_frame_total(video_path, trim_frame_start, trim_frame_end) * fps / video_fps
return math.floor(extract_frame_total)
return 0
def detect_video_fps(video_path : str) -> Optional[float]:
if is_video(video_path):
video_capture = get_video_capture(video_path)
if video_capture.isOpened():
with thread_semaphore():
video_fps = video_capture.get(cv2.CAP_PROP_FPS)
return video_fps
return None
def restrict_video_fps(video_path : str, fps : Fps) -> Fps:
if is_video(video_path):
video_fps = detect_video_fps(video_path)
if video_fps < fps:
return video_fps
return fps
def detect_video_duration(video_path : str) -> Duration:
video_frame_total = count_video_frame_total(video_path)
video_fps = detect_video_fps(video_path)
if video_frame_total and video_fps:
return video_frame_total / video_fps
return 0
def count_trim_frame_total(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> int:
trim_frame_start, trim_frame_end = restrict_trim_frame(video_path, trim_frame_start, trim_frame_end)
return trim_frame_end - trim_frame_start
def restrict_trim_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
video_frame_total = count_video_frame_total(video_path)
if isinstance(trim_frame_start, int):
trim_frame_start = max(0, min(trim_frame_start, video_frame_total))
if isinstance(trim_frame_end, int):
trim_frame_end = max(0, min(trim_frame_end, video_frame_total))
if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int):
return trim_frame_start, trim_frame_end
if isinstance(trim_frame_start, int):
return trim_frame_start, video_frame_total
if isinstance(trim_frame_end, int):
return 0, trim_frame_end
return 0, video_frame_total
def detect_video_resolution(video_path : str) -> Optional[Resolution]:
if is_video(video_path):
video_capture = get_video_capture(video_path)
if video_capture.isOpened():
with thread_semaphore():
width = video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)
height = video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)
return int(width), int(height)
return None
def restrict_video_resolution(video_path : str, resolution : Resolution) -> Resolution:
if is_video(video_path):
video_resolution = detect_video_resolution(video_path)
if video_resolution < resolution:
return video_resolution
return resolution
def create_video_resolutions(resolution : Resolution) -> List[str]:
resolutions = []
temp_resolutions = []
if resolution:
width, height = resolution
temp_resolutions.append(normalize_resolution(resolution))
for video_template_size in facefusion.choices.video_template_sizes:
if width > height:
temp_resolutions.append(normalize_resolution((video_template_size * width / height, video_template_size)))
else:
temp_resolutions.append(normalize_resolution((video_template_size, video_template_size * height / width)))
temp_resolutions = sorted(set(temp_resolutions))
for temp_resolution in temp_resolutions:
resolutions.append(pack_resolution(temp_resolution))
return resolutions
def normalize_resolution(resolution : Tuple[float, float]) -> Resolution:
width, height = resolution
if width > 0 and height > 0:
normalize_width = round(width / 2) * 2
normalize_height = round(height / 2) * 2
return normalize_width, normalize_height
return 0, 0
def pack_resolution(resolution: Resolution) -> str:
if resolution is None:
# print("[DEBUG] Warning: Received None resolution in pack_resolution")
return "1024x1024" # возвращаем значение по умолчанию вместо "0x0"
try:
width, height = normalize_resolution(resolution)
return str(width) + 'x' + str(height)
except Exception as e:
# print(f"[DEBUG] Error in pack_resolution: {e}")
return "1024x1024" # возвращаем значение по умолчанию
def unpack_resolution(resolution : str) -> Resolution:
width, height = map(int, resolution.split('x'))
return width, height
def detect_frame_orientation(vision_frame : VisionFrame) -> Orientation:
height, width = vision_frame.shape[:2]
if width > height:
return 'landscape'
return 'portrait'
def restrict_frame(vision_frame : VisionFrame, resolution : Resolution) -> VisionFrame:
height, width = vision_frame.shape[:2]
restrict_width, restrict_height = resolution
if height > restrict_height or width > restrict_width:
scale = min(restrict_height / height, restrict_width / width)
new_width = int(width * scale)
new_height = int(height * scale)
return cv2.resize(vision_frame, (new_width, new_height))
return vision_frame
def fit_frame(vision_frame : VisionFrame, resolution: Resolution) -> VisionFrame:
fit_width, fit_height = resolution
height, width = vision_frame.shape[:2]
scale = min(fit_height / height, fit_width / width)
new_width = int(width * scale)
new_height = int(height * scale)
paste_vision_frame = cv2.resize(vision_frame, (new_width, new_height))
x_pad = (fit_width - new_width) // 2
y_pad = (fit_height - new_height) // 2
temp_vision_frame = numpy.pad(paste_vision_frame, ((y_pad, fit_height - new_height - y_pad), (x_pad, fit_width - new_width - x_pad), (0, 0)))
return temp_vision_frame
def normalize_frame_color(vision_frame: VisionFrame) -> VisionFrame:
if vision_frame is None:
# print("[DEBUG] Warning: Received None vision_frame in normalize_frame_color")
return None
try:
return cv2.cvtColor(vision_frame, cv2.COLOR_BGR2RGB)
except Exception as e:
# print(f"[DEBUG] Error in normalize_frame_color: {e}")
return vision_frame
def conditional_match_frame_color(source_vision_frame : VisionFrame, target_vision_frame : VisionFrame) -> VisionFrame:
histogram_factor = calc_histogram_difference(source_vision_frame, target_vision_frame)
target_vision_frame = blend_vision_frames(target_vision_frame, match_frame_color(source_vision_frame, target_vision_frame), histogram_factor)
return target_vision_frame
def match_frame_color(source_vision_frame : VisionFrame, target_vision_frame : VisionFrame) -> VisionFrame:
color_difference_sizes = numpy.linspace(16, target_vision_frame.shape[0], 3, endpoint = False)
for color_difference_size in color_difference_sizes:
source_vision_frame = equalize_frame_color(source_vision_frame, target_vision_frame, normalize_resolution((color_difference_size, color_difference_size)))
target_vision_frame = equalize_frame_color(source_vision_frame, target_vision_frame, target_vision_frame.shape[:2][::-1])
return target_vision_frame
def equalize_frame_color(source_vision_frame : VisionFrame, target_vision_frame : VisionFrame, size : Size) -> VisionFrame:
source_frame_resize = cv2.resize(source_vision_frame, size, interpolation = cv2.INTER_AREA).astype(numpy.float32)
target_frame_resize = cv2.resize(target_vision_frame, size, interpolation = cv2.INTER_AREA).astype(numpy.float32)
color_difference_vision_frame = numpy.subtract(source_frame_resize, target_frame_resize)
color_difference_vision_frame = cv2.resize(color_difference_vision_frame, target_vision_frame.shape[:2][::-1], interpolation = cv2.INTER_CUBIC)
target_vision_frame = numpy.add(target_vision_frame, color_difference_vision_frame).clip(0, 255).astype(numpy.uint8)
return target_vision_frame
def calc_histogram_difference(source_vision_frame : VisionFrame, target_vision_frame : VisionFrame) -> float:
histogram_source = cv2.calcHist([cv2.cvtColor(source_vision_frame, cv2.COLOR_BGR2HSV)], [ 0, 1 ], None, [ 50, 60 ], [ 0, 180, 0, 256 ])
histogram_target = cv2.calcHist([cv2.cvtColor(target_vision_frame, cv2.COLOR_BGR2HSV)], [ 0, 1 ], None, [ 50, 60 ], [ 0, 180, 0, 256 ])
histogram_difference = float(numpy.interp(cv2.compareHist(histogram_source, histogram_target, cv2.HISTCMP_CORREL), [ -1, 1 ], [ 0, 1 ]))
return histogram_difference
def blend_vision_frames(source_vision_frame : VisionFrame, target_vision_frame : VisionFrame, blend_factor : float) -> VisionFrame:
blend_vision_frame = cv2.addWeighted(source_vision_frame, 1 - blend_factor, target_vision_frame, blend_factor, 0)
return blend_vision_frame
def create_tile_frames(vision_frame : VisionFrame, size : Size) -> Tuple[List[VisionFrame], int, int]:
vision_frame = numpy.pad(vision_frame, ((size[1], size[1]), (size[1], size[1]), (0, 0)))
tile_width = size[0] - 2 * size[2]
pad_size_bottom = size[2] + tile_width - vision_frame.shape[0] % tile_width
pad_size_right = size[2] + tile_width - vision_frame.shape[1] % tile_width
pad_vision_frame = numpy.pad(vision_frame, ((size[2], pad_size_bottom), (size[2], pad_size_right), (0, 0)))
pad_height, pad_width = pad_vision_frame.shape[:2]
row_range = range(size[2], pad_height - size[2], tile_width)
col_range = range(size[2], pad_width - size[2], tile_width)
tile_vision_frames = []
for row_vision_frame in row_range:
top = row_vision_frame - size[2]
bottom = row_vision_frame + size[2] + tile_width
for column_vision_frame in col_range:
left = column_vision_frame - size[2]
right = column_vision_frame + size[2] + tile_width
tile_vision_frames.append(pad_vision_frame[top:bottom, left:right, :])
return tile_vision_frames, pad_width, pad_height
def merge_tile_frames(tile_vision_frames : List[VisionFrame], temp_width : int, temp_height : int, pad_width : int, pad_height : int, size : Size) -> VisionFrame:
merge_vision_frame = numpy.zeros((pad_height, pad_width, 3)).astype(numpy.uint8)
tile_width = tile_vision_frames[0].shape[1] - 2 * size[2]
tiles_per_row = min(pad_width // tile_width, len(tile_vision_frames))
for index, tile_vision_frame in enumerate(tile_vision_frames):
tile_vision_frame = tile_vision_frame[size[2]:-size[2], size[2]:-size[2]]
row_index = index // tiles_per_row
col_index = index % tiles_per_row
top = row_index * tile_vision_frame.shape[0]
bottom = top + tile_vision_frame.shape[0]
left = col_index * tile_vision_frame.shape[1]
right = left + tile_vision_frame.shape[1]
merge_vision_frame[top:bottom, left:right, :] = tile_vision_frame
merge_vision_frame = merge_vision_frame[size[1] : size[1] + temp_height, size[1]: size[1] + temp_width, :]
return merge_vision_frame