# NOTE: "Spaces: Sleeping" is the Hugging Face Spaces status banner captured by the
# page scrape — it is not part of the program and is kept here only as a comment.
| """ | |
| Face Swapper HD Professional - Optimized for Hugging Face Spaces | |
| Author: AI Assistant | |
| Version: 2.2.0 | |
| License: MIT | |
| """ | |
| import os | |
| import sys | |
| import gc | |
| import json | |
| import warnings | |
| from contextlib import contextmanager | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Optional, Tuple, List, Any, Dict, Generator | |
| from types import ModuleType | |
# --- FULL COMPATIBILITY PATCH FOR TORCHVISION (2026 version) ---
# This block must run before any AI-library import: legacy libraries
# (GFPGAN/facexlib) still import the private module
# `torchvision.transforms.functional_tensor`, which no longer exists in
# current torchvision releases.
try:
    import torchvision.transforms.functional as F
    # Create the ghost module that the legacy libraries look up.
    mock_ft = ModuleType("torchvision.transforms.functional_tensor")
    # Re-export the critical functions under the old module path.
    mock_ft.rgb_to_grayscale = F.rgb_to_grayscale
    mock_ft.to_tensor = F.to_tensor
    mock_ft.normalize = F.normalize
    mock_ft.convert_image_dtype = F.convert_image_dtype
    # Register it globally so future imports of the old path succeed.
    sys.modules["torchvision.transforms.functional_tensor"] = mock_ft
    print("✅ Parche de compatibilidad Vision aplicado exitosamente.")
except Exception as e:
    # Best-effort: the app can still run if torchvision itself is missing.
    print(f"⚠️ Aviso: No se pudo aplicar el parche: {e}")
# -----------------------------------------------------------------------
| import cv2 | |
| import gradio as gr | |
| import numpy as np | |
| from pathlib import Path | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
# Initial configuration: silence library warnings and TensorFlow C++ logs.
warnings.filterwarnings('ignore')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# Dependencies for the logging setup below.
import logging
import time
import traceback
| # ============================================================================ | |
| # CONFIGURACIÓN Y CONSTANTES | |
| # ============================================================================ | |
class ModelStatus(Enum):
    """Lifecycle states of the ML models handled by ModelManager."""
    NOT_LOADED = "not_loaded"  # initial state, also after cleanup()
    LOADING = "loading"        # load in progress; guards against re-entry
    LOADED = "loaded"          # all models ready for inference
    ERROR = "error"            # a load attempt failed
@dataclass
class AppConfig:
    """Application configuration.

    Bug fix: the original class declared dataclass-style annotated fields and a
    ``__post_init__`` hook but was missing the ``@dataclass`` decorator (already
    imported at the top of the file), so the hook never ran and ``providers``
    stayed ``None``. Restoring the decorator makes ``__post_init__`` fire.
    """
    model_dir: Path = Path(".")                       # directory holding model files
    swapper_model: Path = Path("inswapper_128.onnx")  # InsightFace swapper weights
    enhancer_model: Path = Path("GFPGANv1.4.pth")     # GFPGAN enhancer weights
    det_size: Tuple[int, int] = (640, 640)            # face-detector input size
    max_image_size: int = 2048                        # max side length before downscaling
    device: str = "cpu"                               # inference device
    providers: Optional[List[str]] = None             # ONNX Runtime execution providers

    def __post_init__(self) -> None:
        # Default to CPU execution when no providers were supplied.
        if self.providers is None:
            self.providers = ['CPUExecutionProvider']
| # ============================================================================ | |
| # MANEJO DE LOGGING PROFESIONAL | |
| # ============================================================================ | |
class ColorFormatter(logging.Formatter):
    """Log formatter that wraps the level name and message in ANSI colors.

    Bug fix: the original ``format`` mutated ``record.levelname`` and
    ``record.msg`` in place. A LogRecord is shared by every handler attached to
    the logger, so the plain file handler received ANSI escape codes. This
    version formats a shallow copy and leaves the original record pristine.
    """

    COLORS = {
        'DEBUG': '\033[36m',     # Cyan
        'INFO': '\033[32m',      # Green
        'WARNING': '\033[33m',   # Yellow
        'ERROR': '\033[31m',     # Red
        'CRITICAL': '\033[41m',  # Red background
    }
    RESET = '\033[0m'

    def format(self, record: logging.LogRecord) -> str:
        color = self.COLORS.get(record.levelname, '')
        # Work on a copy so sibling handlers see the unmodified record.
        colored = logging.makeLogRecord(record.__dict__)
        colored.levelname = f"{color}{record.levelname}{self.RESET}"
        # Render args into the message up front, then clear them so the
        # base formatter does not try to %-format the colored string again.
        colored.msg = f"{color}{record.getMessage()}{self.RESET}"
        colored.args = None
        return super().format(colored)
def setup_logger(name: str) -> logging.Logger:
    """Return a logger with a colored console handler and a plain file handler.

    Idempotent: if the named logger already has handlers attached, it is
    returned unchanged so repeated calls do not duplicate output.
    """
    log = logging.getLogger(name)
    if log.handlers:
        return log

    log.setLevel(logging.INFO)
    line_fmt = '%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s'
    date_fmt = '%Y-%m-%d %H:%M:%S'

    # Colored output on stdout.
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(ColorFormatter(line_fmt, datefmt=date_fmt))
    log.addHandler(console)

    # Persistent plain-text log file (no ANSI codes).
    to_file = logging.FileHandler('face_swap.log', encoding='utf-8')
    to_file.setLevel(logging.DEBUG)
    to_file.setFormatter(logging.Formatter(line_fmt, datefmt=date_fmt))
    log.addHandler(to_file)

    return log

logger = setup_logger(__name__)
| # ============================================================================ | |
| # MANEJO DE MODELOS CON PATRÓN SINGLETON | |
| # ============================================================================ | |
class ModelManager:
    """Centralized model manager with caching and lazy loading (singleton)."""

    _instance = None
    _initialized = False

    def __new__(cls):
        # Classic singleton: every construction returns the one shared instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Guard so a repeated ModelManager() call does not reset loaded models.
        if not self._initialized:
            self.config = AppConfig()
            self.status = ModelStatus.NOT_LOADED
            self._face_analyzer = None
            self._swapper = None
            self._enhancer = None
            self._has_gfpgan = False
            self._load_times = {}
            self._initialized = True
            self._check_gfpgan_availability()

    def _check_gfpgan_availability(self) -> None:
        """Safely detect whether the optional ``gfpgan`` package is importable."""
        try:
            import importlib.util
            gfpgan_spec = importlib.util.find_spec("gfpgan")
            if gfpgan_spec is not None:
                self._has_gfpgan = True
                logger.info("GFPGAN está disponible en el sistema")
            else:
                logger.warning("GFPGAN no encontrado, mejora HD deshabilitada")
        except Exception as e:
            logger.warning(f"Error verificando GFPGAN: {e}")
            self._has_gfpgan = False

    def has_gfpgan(self) -> bool:
        """Return True when the GFPGAN package is available."""
        return self._has_gfpgan

    @contextmanager
    def timer(self, operation: str) -> Generator[None, None, None]:
        """Context manager that records the wall-clock time of *operation*.

        Bug fix: the original was a bare generator function used as
        ``with self.timer(...)``, which raises (a generator has no
        ``__enter__``). The ``@contextmanager`` decorator — already imported
        at the top of the file — makes it a real context manager, and the
        ``finally`` records the elapsed time even if the body raises.
        """
        start_time = time.time()
        try:
            yield
        finally:
            elapsed = time.time() - start_time
            self._load_times[operation] = elapsed
            logger.info(f"{operation} completado en {elapsed:.2f}s")

    def validate_models(self) -> Tuple[bool, Dict[str, str]]:
        """Check that the model weight files exist on disk.

        Returns (all_present, missing) where *missing* maps a model role to
        the path that was not found.
        """
        missing = {}
        # The swapper weights are mandatory.
        if not self.config.swapper_model.exists():
            missing['swapper'] = str(self.config.swapper_model)
        # GFPGAN weights are only required when the package is installed.
        if self._has_gfpgan and not self.config.enhancer_model.exists():
            missing['enhancer'] = str(self.config.enhancer_model)
        return len(missing) == 0, missing

    def load_models(self) -> Tuple[bool, str]:
        """Load all required models; returns (success, human-readable summary)."""
        if self.status == ModelStatus.LOADED:
            return True, "Modelos ya cargados"
        if self.status == ModelStatus.LOADING:
            return False, "Los modelos ya se están cargando"
        self.status = ModelStatus.LOADING
        logger.info("Iniciando carga de modelos...")
        try:
            # 1. Face analyzer (detection + embeddings).
            with self.timer("Carga FaceAnalyzer"):
                self._face_analyzer = FaceAnalysis(
                    name='buffalo_l',
                    providers=self.config.providers
                )
                self._face_analyzer.prepare(
                    ctx_id=0,
                    det_size=self.config.det_size
                )
            # 2. Face-swapping model.
            with self.timer("Carga modelo de intercambio"):
                self._swapper = insightface.model_zoo.get_model(
                    str(self.config.swapper_model),
                    download=False
                )
            # 3. Optional GFPGAN enhancer (only if package + weights exist).
            if self._has_gfpgan and self.config.enhancer_model.exists():
                with self.timer("Carga GFPGAN"):
                    from gfpgan import GFPGANer
                    self._enhancer = GFPGANer(
                        model_path=str(self.config.enhancer_model),
                        upscale=1,
                        arch='clean',
                        channel_multiplier=2,
                        bg_upsampler=None,
                        device=self.config.device
                    )
            self.status = ModelStatus.LOADED
            total_time = sum(self._load_times.values())
            summary = (
                f"✅ Modelos cargados exitosamente en {total_time:.2f}s\n"
                f" • FaceAnalyzer: {self._load_times.get('Carga FaceAnalyzer', 0):.2f}s\n"
                f" • Face Swapper: {self._load_times.get('Carga modelo de intercambio', 0):.2f}s\n"
                f" • GFPGAN: {self._load_times.get('Carga GFPGAN', 0):.2f}s"
            )
            logger.info(summary)
            return True, summary
        except Exception as e:
            self.status = ModelStatus.ERROR
            logger.error(f"Error cargando modelos: {e}")
            logger.error(traceback.format_exc())
            return False, f"Error al cargar modelos: {str(e)}"

    def get_models(self) -> Tuple[Any, Any, Any]:
        """Return (analyzer, swapper, enhancer), loading them on first use.

        Raises RuntimeError when loading fails.
        """
        if self.status != ModelStatus.LOADED:
            success, msg = self.load_models()
            if not success:
                raise RuntimeError(msg)
        return self._face_analyzer, self._swapper, self._enhancer

    def cleanup(self) -> None:
        """Release model references and force a garbage-collection pass."""
        logger.info("Limpiando recursos de modelos...")
        self._face_analyzer = None
        self._swapper = None
        self._enhancer = None
        gc.collect()
        self.status = ModelStatus.NOT_LOADED
| # ============================================================================ | |
| # PROCESAMIENTO DE IMÁGENES | |
| # ============================================================================ | |
class ImageProcessor:
    """Stateless image-processing helpers.

    Bug fix: the original methods were defined without ``self`` and without
    ``@staticmethod``, so instance calls such as
    ``self.image_processor.validate_image(source_img, "Origen")`` bound the
    ImageProcessor instance to the first parameter and shifted every argument.
    Declaring them ``@staticmethod`` restores the intended call signature for
    both instance and class access.
    """

    @staticmethod
    def validate_image(image: np.ndarray, name: str = "imagen") -> Tuple[bool, str]:
        """Validate that *image* is a processable (H, W, 3) numpy array.

        Returns (is_valid, message); the message explains the verdict.
        """
        if image is None:
            return False, f"{name}: No proporcionada"
        if not isinstance(image, np.ndarray):
            return False, f"{name}: Debe ser numpy array"
        if len(image.shape) != 3 or image.shape[2] != 3:
            return False, f"{name}: Formato debe ser (H, W, 3)"
        h, w = image.shape[:2]
        if h == 0 or w == 0:
            return False, f"{name}: Dimensiones inválidas"
        if h * w > 4096 * 4096:  # 16MP cap
            return False, f"{name}: Demasiado grande ({h}x{w})"
        return True, f"{name}: Válida ({h}x{w})"

    @staticmethod
    def resize_if_needed(image: np.ndarray, max_size: int = 2048) -> np.ndarray:
        """Downscale *image* so its longest side is at most *max_size* pixels."""
        h, w = image.shape[:2]
        if max(h, w) > max_size:
            scale = max_size / max(h, w)
            new_h, new_w = int(h * scale), int(w * scale)
            logger.info(f"Redimensionando imagen de {w}x{h} a {new_w}x{new_h}")
            # INTER_AREA is the recommended interpolation for shrinking.
            return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
        return image

    @staticmethod
    def convert_color(image: np.ndarray, to_bgr: bool = True) -> np.ndarray:
        """Convert between RGB (Gradio) and BGR (OpenCV/InsightFace) channel order."""
        if to_bgr:
            return cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
| # ============================================================================ | |
| # CORE DEL FACE SWAPPER | |
| # ============================================================================ | |
class FaceSwapperCore:
    """Main face-swap pipeline: validate -> detect -> swap -> (optional) enhance."""

    def __init__(self):
        self.model_manager = ModelManager()
        self.image_processor = ImageProcessor()

    @contextmanager
    def process_timer(self, operation: str):
        """Context manager that logs the wall-clock time of a processing step.

        Bug fix: the original was a bare generator used as
        ``with self.process_timer(...)``, which raises because a generator has
        no ``__enter__``. The ``@contextmanager`` decorator (imported at the
        top of the file) makes it a real context manager; the ``finally``
        ensures the time is logged even when the step raises.
        """
        start = time.time()
        try:
            yield
        finally:
            elapsed = time.time() - start
            logger.info(f"{operation}: {elapsed:.2f}s")

    def process(
        self,
        source_img: np.ndarray,
        target_img: np.ndarray,
        enhance: bool = False
    ) -> Tuple[Optional[np.ndarray], str]:
        """Swap the most confident source face onto every face in the target.

        Args:
            source_img: RGB image providing the face to transplant.
            target_img: RGB image whose faces are replaced.
            enhance: apply GFPGAN enhancement when the model is available.

        Returns:
            (result_rgb, status_message); result is None on failure.
        """
        try:
            valid, msg = self.image_processor.validate_image(source_img, "Origen")
            if not valid:
                return None, f"❌ {msg}"
            valid, msg = self.image_processor.validate_image(target_img, "Destino")
            if not valid:
                return None, f"❌ {msg}"
            analyzer, swapper, enhancer = self.model_manager.get_models()
            with self.process_timer("Procesamiento completo"):
                # Gradio delivers RGB; OpenCV/InsightFace expect BGR.
                src_bgr = self.image_processor.convert_color(source_img, to_bgr=True)
                dst_bgr = self.image_processor.convert_color(target_img, to_bgr=True)
                src_bgr = self.image_processor.resize_if_needed(src_bgr)
                dst_bgr = self.image_processor.resize_if_needed(dst_bgr)
                with self.process_timer("Detección de caras"):
                    src_faces = analyzer.get(src_bgr)
                    dst_faces = analyzer.get(dst_bgr)
                if not src_faces:
                    return None, "⚠️ No se detectaron caras en la imagen origen"
                if not dst_faces:
                    return None, "⚠️ No se detectaron caras en la imagen destino"
                # Use the detection with the highest confidence as the donor face.
                src_face = max(src_faces, key=lambda x: x.det_score)
                result_img = dst_bgr.copy()
                with self.process_timer("Intercambio facial"):
                    for face in dst_faces:
                        result_img = swapper.get(result_img, face, src_face, paste_back=True)
                if enhance and enhancer is not None:
                    with self.process_timer("Mejora GFPGAN"):
                        try:
                            _, _, result_img = enhancer.enhance(
                                result_img, has_aligned=False, only_center_face=False, paste_back=True
                            )
                        except Exception as e:
                            # Best-effort enhancement: keep the un-enhanced swap on failure.
                            logger.warning(f"Error en mejora GFPGAN: {e}")
            result_rgb = self.image_processor.convert_color(result_img, to_bgr=False)
            stats = f"✅ Procesamiento completado\n • Caras: {len(dst_faces)}\n • Mejora HD: {'Sí' if enhance and enhancer else 'No'}"
            return result_rgb, stats
        except Exception as e:
            logger.error(f"Error en procesamiento: {e}")
            logger.error(traceback.format_exc())
            return None, f"❌ Error: {str(e)}"
| # ============================================================================ | |
| # INTERFAZ DE USUARIO Y APP | |
| # ============================================================================ | |
def create_app() -> gr.Blocks:
    """Build the Gradio Blocks UI and wire it to the swap pipeline.

    Fix: the GFPGAN checkbox originally received ``value=model_manager.has_gfpgan``
    — the bound method object rather than its boolean result. Gradio treats a
    callable value as a per-load factory, so the UI only worked by accident;
    calling the method makes the intended static default explicit.
    """
    model_manager = ModelManager()
    swapper_core = FaceSwapperCore()
    with gr.Blocks(title="Face Swapper HD Pro", theme=gr.themes.Soft(primary_hue="purple")) as app:
        gr.Markdown("<div align='center'><h1>🎭 Face Swapper HD Pro</h1></div>")
        with gr.Row():
            with gr.Column():
                source_img = gr.Image(label="Cara Origen", type="numpy")
                target_img = gr.Image(label="Imagen Destino", type="numpy")
                # Enable HD enhancement by default only when GFPGAN is installed.
                enhance = gr.Checkbox(label="✨ Mejora HD (GFPGAN)", value=model_manager.has_gfpgan())
                process_btn = gr.Button("🚀 Intercambiar Rostros", variant="primary")
            with gr.Column():
                result_img = gr.Image(label="Resultado", type="numpy")
                status_output = gr.Textbox(label="Estado", interactive=False)
                stats_output = gr.JSON(label="Métricas")

        def process_wrapper(source, target, enhance_hd):
            # Adapt the core (image, message) result to the three UI outputs.
            result, message = swapper_core.process(source, target, enhance_hd)
            stats = {"status": "success" if result is not None else "error", "message": message}
            return result, stats, message

        process_btn.click(
            fn=process_wrapper,
            inputs=[source_img, target_img, enhance],
            outputs=[result_img, stats_output, status_output]
        )
    return app
def main():
    """Entry point: build the UI and launch the web server.

    The listening port is taken from the PORT environment variable
    (defaulting to 7860, the Hugging Face Spaces convention).
    """
    try:
        logger.info("🚀 Iniciando Face Swapper HD Pro")
        server_port = int(os.environ.get("PORT", 7860))
        gradio_app = create_app()
        gradio_app.launch(
            server_name="0.0.0.0",  # listen on all interfaces (container-friendly)
            server_port=server_port,
            show_error=True,
        )
    except Exception as e:
        # Fatal startup failure: log it and exit with a non-zero status.
        logger.error(f"💥 Error fatal: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()