""" Face Swapper HD Professional - Optimized for Hugging Face Spaces Author: AI Assistant Version: 2.2.0 License: MIT """ import os import sys import gc import json import warnings from contextlib import contextmanager from dataclasses import dataclass from enum import Enum from typing import Optional, Tuple, List, Any, Dict, Generator from types import ModuleType # --- PARCHE DE COMPATIBILIDAD TOTAL PARA TORCHVISION (Versión 2026) --- # Este bloque debe ejecutarse antes que cualquier importación de IA try: import torchvision.transforms.functional as F # Creamos el módulo fantasma que las librerías antiguas (GFPGAN/facexlib) buscan mock_ft = ModuleType("torchvision.transforms.functional_tensor") # Inyectamos las funciones críticas para procesamiento profesional mock_ft.rgb_to_grayscale = F.rgb_to_grayscale mock_ft.to_tensor = F.to_tensor mock_ft.normalize = F.normalize mock_ft.convert_image_dtype = F.convert_image_dtype # Lo registramos globalmente en el sistema sys.modules["torchvision.transforms.functional_tensor"] = mock_ft print("✅ Parche de compatibilidad Vision aplicado exitosamente.") except Exception as e: print(f"⚠️ Aviso: No se pudo aplicar el parche: {e}") # ----------------------------------------------------------------------- import cv2 import gradio as gr import numpy as np from pathlib import Path import insightface from insightface.app import FaceAnalysis # Configuración inicial warnings.filterwarnings('ignore') os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Logger profesional import logging import time import traceback # ============================================================================ # CONFIGURACIÓN Y CONSTANTES # ============================================================================ class ModelStatus(Enum): """Estado de los modelos.""" NOT_LOADED = "not_loaded" LOADING = "loading" LOADED = "loaded" ERROR = "error" @dataclass class AppConfig: """Configuración de la aplicación.""" model_dir: Path = Path(".") swapper_model: Path = 
Path("inswapper_128.onnx") enhancer_model: Path = Path("GFPGANv1.4.pth") det_size: Tuple[int, int] = (640, 640) max_image_size: int = 2048 device: str = "cpu" providers: List[str] = None def __post_init__(self): if self.providers is None: self.providers = ['CPUExecutionProvider'] # ============================================================================ # MANEJO DE LOGGING PROFESIONAL # ============================================================================ class ColorFormatter(logging.Formatter): """Formateador de logs con colores.""" COLORS = { 'DEBUG': '\033[36m', # Cyan 'INFO': '\033[32m', # Verde 'WARNING': '\033[33m', # Amarillo 'ERROR': '\033[31m', # Rojo 'CRITICAL': '\033[41m', # Rojo fondo } RESET = '\033[0m' def format(self, record): color = self.COLORS.get(record.levelname, '') record.levelname = f"{color}{record.levelname}{self.RESET}" record.msg = f"{color}{record.msg}{self.RESET}" return super().format(record) def setup_logger(name: str) -> logging.Logger: """Configura un logger profesional.""" logger = logging.getLogger(name) if not logger.handlers: logger.setLevel(logging.INFO) # Handler para consola con colores console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) console_formatter = ColorFormatter( '%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) # Handler para archivo file_handler = logging.FileHandler('face_swap.log', encoding='utf-8') file_handler.setLevel(logging.DEBUG) file_formatter = logging.Formatter( '%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) return logger logger = setup_logger(__name__) # ============================================================================ # MANEJO DE MODELOS CON PATRÓN SINGLETON # 
# ============================================================================

class ModelManager:
    """Single shared manager that loads the models on demand and caches them."""

    _instance = None
    _initialized = False

    def __new__(cls):
        # Always hand back the same instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every ModelManager() call; set up state only once.
        if not self._initialized:
            self.config = AppConfig()
            self.status = ModelStatus.NOT_LOADED
            self._face_analyzer = None
            self._swapper = None
            self._enhancer = None
            self._has_gfpgan = False
            self._load_times = {}
            self._initialized = True
            self._check_gfpgan_availability()

    def _check_gfpgan_availability(self) -> None:
        """Record whether the ``gfpgan`` package can be found, without importing it."""
        try:
            import importlib.util
            gfpgan_spec = importlib.util.find_spec("gfpgan")
            if gfpgan_spec is not None:
                self._has_gfpgan = True
                logger.info("GFPGAN está disponible en el sistema")
            else:
                logger.warning("GFPGAN no encontrado, mejora HD deshabilitada")
        except Exception as e:
            logger.warning(f"Error verificando GFPGAN: {e}")
            self._has_gfpgan = False

    @property
    def has_gfpgan(self) -> bool:
        """Whether the ``gfpgan`` package was found."""
        return self._has_gfpgan

    @contextmanager
    def timer(self, operation: str) -> Generator[None, None, None]:
        """Measure the wrapped block and store the duration under *operation*.

        Nothing is stored or logged when the block raises.
        """
        start_time = time.time()
        yield
        elapsed = time.time() - start_time
        self._load_times[operation] = elapsed
        logger.info(f"{operation} completado en {elapsed:.2f}s")

    def validate_models(self) -> Tuple[bool, Dict[str, str]]:
        """Check that the model files exist.

        Returns:
            ``(ok, missing)`` where *missing* maps ``'swapper'`` /
            ``'enhancer'`` to the path that was not found. The enhancer file
            is checked only when the ``gfpgan`` package is available.
        """
        missing = {}

        if not self.config.swapper_model.exists():
            missing['swapper'] = str(self.config.swapper_model)

        if self._has_gfpgan and not self.config.enhancer_model.exists():
            missing['enhancer'] = str(self.config.enhancer_model)

        return len(missing) == 0, missing

    def load_models(self) -> Tuple[bool, str]:
        """Load the face analyzer, the swapper and, when possible, GFPGAN.

        Returns:
            ``(success, message)``. On failure the status becomes ``ERROR``
            and *message* carries the error text; no exception is raised.
        """
        if self.status == ModelStatus.LOADED:
            return True, "Modelos ya cargados"

        if self.status == ModelStatus.LOADING:
            return False, "Los modelos ya se están cargando"

        self.status = ModelStatus.LOADING
        logger.info("Iniciando carga de modelos...")

        try:
            # Fail before the expensive analyzer load when the required
            # swapper file is absent.
            if not self.config.swapper_model.exists():
                raise FileNotFoundError(
                    f"No se encontró el modelo de intercambio: "
                    f"{self.config.swapper_model}"
                )

            # 1. Face analyzer
            with self.timer("Carga FaceAnalyzer"):
                self._face_analyzer = FaceAnalysis(
                    name='buffalo_l',
                    providers=self.config.providers
                )
                self._face_analyzer.prepare(
                    ctx_id=0,
                    det_size=self.config.det_size
                )

            # 2. Swap model
            with self.timer("Carga modelo de intercambio"):
                self._swapper = insightface.model_zoo.get_model(
                    str(self.config.swapper_model),
                    download=False
                )

            # 3. GFPGAN is optional: a failure here only disables the
            #    enhancement step instead of aborting the whole load.
            if self._has_gfpgan and self.config.enhancer_model.exists():
                try:
                    with self.timer("Carga GFPGAN"):
                        from gfpgan import GFPGANer
                        self._enhancer = GFPGANer(
                            model_path=str(self.config.enhancer_model),
                            upscale=1,
                            arch='clean',
                            channel_multiplier=2,
                            bg_upsampler=None,
                            device=self.config.device
                        )
                except Exception as e:
                    self._enhancer = None
                    logger.warning(
                        f"GFPGAN no se pudo cargar, mejora HD deshabilitada: {e}"
                    )

            self.status = ModelStatus.LOADED
            total_time = sum(self._load_times.values())
            summary = (
                f"✅ Modelos cargados exitosamente en {total_time:.2f}s\n"
                f" • FaceAnalyzer: {self._load_times.get('Carga FaceAnalyzer', 0):.2f}s\n"
                f" • Face Swapper: {self._load_times.get('Carga modelo de intercambio', 0):.2f}s\n"
                f" • GFPGAN: {self._load_times.get('Carga GFPGAN', 0):.2f}s"
            )
            logger.info(summary)
            return True, summary

        except Exception as e:
            self.status = ModelStatus.ERROR
            logger.error(f"Error cargando modelos: {e}")
            logger.error(traceback.format_exc())
            return False, f"Error al cargar modelos: {str(e)}"

    def get_models(self) -> Tuple[Any, Any, Any]:
        """Return ``(face_analyzer, swapper, enhancer)``, loading them if needed.

        The enhancer is ``None`` when GFPGAN was not loaded.

        Raises:
            RuntimeError: If loading fails or is already in progress.
        """
        if self.status != ModelStatus.LOADED:
            success, msg = self.load_models()
            if not success:
                raise RuntimeError(msg)
        return self._face_analyzer, self._swapper, self._enhancer

    def cleanup(self) -> None:
        """Drop the model references and reset the status to ``NOT_LOADED``."""
        logger.info("Limpiando recursos de modelos...")
        self._face_analyzer = None
        self._swapper = None
        self._enhancer = None
        gc.collect()
        self.status = ModelStatus.NOT_LOADED


# ============================================================================
# IMAGE PROCESSING
# ============================================================================

class ImageProcessor:
    """Stateless image helpers: validation, resizing and color conversion."""

    @staticmethod
    def validate_image(image: np.ndarray, name: str = "imagen") -> Tuple[bool, str]:
        """Check that *image* is a non-empty ``(H, W, 3)`` numpy array.

        Args:
            image: Candidate image (may be ``None``).
            name: Label used as prefix of the returned message.

        Returns:
            ``(valid, message)``.
        """
        if image is None:
            return False, f"{name}: No proporcionada"

        if not isinstance(image, np.ndarray):
            return False, f"{name}: Debe ser numpy array"

        if len(image.shape) != 3 or image.shape[2] != 3:
            return False, f"{name}: Formato debe ser (H, W, 3)"

        h, w = image.shape[:2]
        if h == 0 or w == 0:
            return False, f"{name}: Dimensiones inválidas"

        if h * w > 4096 * 4096:  # pixel-count limit
            return False, f"{name}: Demasiado grande ({h}x{w})"

        return True, f"{name}: Válida ({h}x{w})"

    @staticmethod
    def resize_if_needed(image: np.ndarray, max_size: int = 2048) -> np.ndarray:
        """Downscale *image* so that its longest side is at most *max_size*.

        The aspect ratio is kept. The input object itself is returned when
        no resize is needed.
        """
        h, w = image.shape[:2]
        longest = max(h, w)
        if longest <= max_size:
            return image

        scale = max_size / longest
        # Clamp to 1 pixel: for extreme aspect ratios the short side would
        # otherwise truncate to 0, which is not a valid target size.
        new_h = max(1, int(h * scale))
        new_w = max(1, int(w * scale))
        logger.info(f"Redimensionando imagen de {w}x{h} a {new_w}x{new_h}")
        return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)

    @staticmethod
    def convert_color(image: np.ndarray, to_bgr: bool = True) -> np.ndarray:
        """Convert RGB to BGR (``to_bgr=True``) or BGR to RGB."""
        if to_bgr:
            return cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)


# ============================================================================
# FACE SWAPPER CORE
# ============================================================================

class FaceSwapperCore:
    """Runs the face-swap pipeline on a source/target image pair."""

    def __init__(self):
        self.model_manager = ModelManager()
        self.image_processor = ImageProcessor()

    @contextmanager
    def process_timer(self, operation: str):
        """Log how long the wrapped block took (only when it does not raise)."""
        start = time.time()
        yield
        elapsed = time.time() - start
        logger.info(f"{operation}: {elapsed:.2f}s")

    def process(
        self,
        source_img: np.ndarray,
        target_img: np.ndarray,
        enhance: bool = False
    ) -> Tuple[Optional[np.ndarray], str]:
        """Swap the best-scoring source face onto every face of the target.

        Args:
            source_img: RGB image providing the face.
            target_img: RGB image whose faces are replaced.
            enhance: Run the GFPGAN enhancer on the result when it is loaded.

        Returns:
            ``(result_rgb, message)``; the image is ``None`` on any failure
            and *message* then describes the problem. No exception escapes.
        """
        try:
            for image, label in ((source_img, "Origen"), (target_img, "Destino")):
                valid, msg = self.image_processor.validate_image(image, label)
                if not valid:
                    return None, f"❌ {msg}"

            analyzer, swapper, enhancer = self.model_manager.get_models()
            # Honor the configured size limit rather than the helper's default.
            max_size = self.model_manager.config.max_image_size
            # Set only when the enhancer really produced the output image.
            enhanced = False

            with self.process_timer("Procesamiento completo"):
                src_bgr = self.image_processor.convert_color(source_img, to_bgr=True)
                dst_bgr = self.image_processor.convert_color(target_img, to_bgr=True)

                src_bgr = self.image_processor.resize_if_needed(src_bgr, max_size)
                dst_bgr = self.image_processor.resize_if_needed(dst_bgr, max_size)

                with self.process_timer("Detección de caras"):
                    src_faces = analyzer.get(src_bgr)
                    dst_faces = analyzer.get(dst_bgr)

                if not src_faces:
                    return None, "⚠️ No se detectaron caras en la imagen origen"
                if not dst_faces:
                    return None, "⚠️ No se detectaron caras en la imagen destino"

                # Use the source face with the highest detection score.
                src_face = max(src_faces, key=lambda x: x.det_score)
                result_img = dst_bgr.copy()

                with self.process_timer("Intercambio facial"):
                    for face in dst_faces:
                        result_img = swapper.get(
                            result_img, face, src_face, paste_back=True
                        )

                if enhance and enhancer is not None:
                    with self.process_timer("Mejora GFPGAN"):
                        # Best effort: on failure keep the un-enhanced swap.
                        try:
                            _, _, restored = enhancer.enhance(
                                result_img,
                                has_aligned=False,
                                only_center_face=False,
                                paste_back=True
                            )
                            if restored is not None:
                                result_img = restored
                                enhanced = True
                        except Exception as e:
                            logger.warning(f"Error en mejora GFPGAN: {e}")

                result_rgb = self.image_processor.convert_color(result_img, to_bgr=False)

            stats = (
                f"✅ Procesamiento completado\n"
                f" • Caras: {len(dst_faces)}\n"
                f" • Mejora HD: {'Sí' if enhanced else 'No'}"
            )
            return result_rgb, stats

        except Exception as e:
            logger.error(f"Error en procesamiento: {e}")
            logger.error(traceback.format_exc())
            return None, f"❌ Error: {str(e)}"


# ============================================================================
# USER INTERFACE AND APP
# ============================================================================

def create_app() -> "gr.Blocks":
    """Build the Gradio interface and wire the button to the swap pipeline."""
    model_manager = ModelManager()
    swapper_core = FaceSwapperCore()

    with gr.Blocks(title="Face Swapper HD Pro", theme=gr.themes.Soft(primary_hue="purple")) as app:
        # Same text as the original literal, which spanned several physical
        # lines; written with escapes so it is a valid string.
        gr.Markdown("\n\n🎭 Face Swapper HD Pro\n\n")

        with gr.Row():
            with gr.Column():
                source_img = gr.Image(label="Cara Origen", type="numpy")
                target_img = gr.Image(label="Imagen Destino", type="numpy")
                enhance = gr.Checkbox(label="✨ Mejora HD (GFPGAN)", value=model_manager.has_gfpgan)
                process_btn = gr.Button("🚀 Intercambiar Rostros", variant="primary")

            with gr.Column():
                result_img = gr.Image(label="Resultado", type="numpy")
                status_output = gr.Textbox(label="Estado", interactive=False)
                stats_output = gr.JSON(label="Métricas")

        def process_wrapper(source, target, enhance_hd):
            # Return order matches `outputs` below: image, JSON stats, status text.
            result, message = swapper_core.process(source, target, enhance_hd)
            stats = {"status": "success" if result is not None else "error", "message": message}
            return result, stats, message

        process_btn.click(
            fn=process_wrapper,
            inputs=[source_img, target_img, enhance],
            outputs=[result_img, stats_output, status_output]
        )

    return app


def main():
    """Start the app on ``PORT`` (default 7860); exit with status 1 on failure."""
    try:
        logger.info("🚀 Iniciando Face Swapper HD Pro")
        port = int(os.environ.get("PORT", 7860))
        app = create_app()
        app.launch(
            server_name="0.0.0.0",
            server_port=port,
            show_error=True
        )
    except Exception as e:
        logger.error(f"💥 Error fatal: {e}")
        # Keep the traceback, as the other error handlers in this file do.
        logger.error(traceback.format_exc())
        sys.exit(1)


if __name__ == "__main__":
    main()