# Andro0s — "Update app.py" (commit 3e1e1f1, verified)
"""
Face Swapper HD Professional - Optimized for Hugging Face Spaces
Author: AI Assistant
Version: 2.2.0
License: MIT
"""
import os
import sys
import gc
import json
import warnings
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple, List, Any, Dict, Generator
from types import ModuleType
# --- Compatibility shim for torchvision.transforms.functional_tensor ---
# Must run before any library that still imports the legacy module path
# (the original author names GFPGAN/facexlib) gets imported.
try:
    try:
        # If this torchvision build still ships the real module, keep it:
        # replacing it with the small stand-in below would hide every other
        # function the real module provides.
        import torchvision.transforms.functional_tensor  # noqa: F401
    except ImportError:
        import torchvision.transforms.functional as F

        # Stand-in module exposing the helpers that legacy callers look up.
        mock_ft = ModuleType("torchvision.transforms.functional_tensor")
        for _name in (
            "rgb_to_grayscale",
            "to_tensor",
            "normalize",
            "convert_image_dtype",
        ):
            setattr(mock_ft, _name, getattr(F, _name))
        # Register it so later `import` statements resolve to the stand-in.
        sys.modules["torchvision.transforms.functional_tensor"] = mock_ft
        print("✅ Parche de compatibilidad Vision aplicado exitosamente.")
except Exception as e:
    # Best effort: the app can still start without the shim (or without
    # torchvision at all); report the reason and carry on.
    print(f"⚠️ Aviso: No se pudo aplicar el parche: {e}")
# -----------------------------------------------------------------------
import cv2
import gradio as gr
import numpy as np
from pathlib import Path
import insightface
from insightface.app import FaceAnalysis
# Configuración inicial
warnings.filterwarnings('ignore')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# Logger profesional
import logging
import time
import traceback
# ============================================================================
# CONFIGURACIÓN Y CONSTANTES
# ============================================================================
class ModelStatus(Enum):
    """Lifecycle state of the model set."""

    NOT_LOADED = "not_loaded"  # nothing loaded yet, or resources were released
    LOADING = "loading"        # a load is currently in progress
    LOADED = "loaded"          # every required model is ready
    ERROR = "error"            # the last load attempt raised
@dataclass
class AppConfig:
    """Application settings: model file locations and inference options.

    ``providers`` may be omitted (or passed as ``None``), in which case it
    defaults to CPU-only execution. A caller-supplied sequence is copied so
    the configuration never shares a list with its caller.
    """

    model_dir: Path = Path(".")
    swapper_model: Path = Path("inswapper_128.onnx")
    enhancer_model: Path = Path("GFPGANv1.4.pth")
    det_size: Tuple[int, int] = (640, 640)
    max_image_size: int = 2048
    device: str = "cpu"
    # None means "use the CPU default"; resolved in __post_init__ so every
    # instance gets its own list object.
    providers: Optional[List[str]] = None

    def __post_init__(self):
        if self.providers is None:
            self.providers = ['CPUExecutionProvider']
        else:
            self.providers = list(self.providers)
# ============================================================================
# MANEJO DE LOGGING PROFESIONAL
# ============================================================================
class ColorFormatter(logging.Formatter):
    """Formatter that wraps the level name and message in ANSI colors."""

    COLORS = {
        'DEBUG': '\033[36m',     # cyan
        'INFO': '\033[32m',      # green
        'WARNING': '\033[33m',   # yellow
        'ERROR': '\033[31m',     # red
        'CRITICAL': '\033[41m',  # red background
    }
    RESET = '\033[0m'

    def format(self, record):
        """Return the colored rendering of *record*.

        The record object is shared by every handler attached to the logger,
        so its fields are restored afterwards; otherwise the escape codes
        would leak into handlers that run later (e.g. a plain file handler).
        """
        color = self.COLORS.get(record.levelname, '')
        plain_levelname, plain_msg = record.levelname, record.msg
        record.levelname = f"{color}{plain_levelname}{self.RESET}"
        record.msg = f"{color}{plain_msg}{self.RESET}"
        try:
            return super().format(record)
        finally:
            record.levelname = plain_levelname
            record.msg = plain_msg
def setup_logger(name: str) -> logging.Logger:
    """Return the logger called *name*, configuring it on first use.

    A colored console handler (INFO and above, on stdout) is always attached.
    A UTF-8 file handler writing ``face_swap.log`` (DEBUG and above) is
    attached when the file can be opened; if it cannot, a warning is logged
    and the logger works with the console handler only. A logger that
    already has handlers is returned untouched.
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger

    log_format = '%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'

    # The logger filters before any handler sees a record, so it must be at
    # least as permissive as the most verbose handler (the DEBUG file one).
    logger.setLevel(logging.DEBUG)

    # Console: colored, INFO and above.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(ColorFormatter(log_format, datefmt=date_format))
    logger.addHandler(console_handler)

    # File: plain text, DEBUG and above. Best effort — an unwritable working
    # directory must not prevent the application from starting.
    try:
        file_handler = logging.FileHandler('face_swap.log', encoding='utf-8')
    except OSError as exc:
        logger.warning(f"No se pudo crear el archivo de log 'face_swap.log': {exc}")
        return logger
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
    logger.addHandler(file_handler)
    return logger


logger = setup_logger(__name__)
# ============================================================================
# MANEJO DE MODELOS CON PATRÓN SINGLETON
# ============================================================================
class ModelManager:
    """Singleton that lazily loads and caches the face models.

    Holds the face analyzer, the swapper and (optionally) the GFPGAN
    enhancer, together with a ``ModelStatus`` describing the load state.
    """

    _instance = None
    _initialized = False

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every ModelManager() call even though __new__
        # returns the same object; only the first call may set up state.
        if self._initialized:
            return
        self.config = AppConfig()
        self.status = ModelStatus.NOT_LOADED
        self._face_analyzer = None
        self._swapper = None
        self._enhancer = None
        self._has_gfpgan = False
        self._load_times = {}
        self._initialized = True
        self._check_gfpgan_availability()

    def _check_gfpgan_availability(self) -> None:
        """Record whether the ``gfpgan`` package can be found, without importing it."""
        try:
            import importlib.util
            gfpgan_spec = importlib.util.find_spec("gfpgan")
            if gfpgan_spec is not None:
                self._has_gfpgan = True
                logger.info("GFPGAN está disponible en el sistema")
            else:
                logger.warning("GFPGAN no encontrado, mejora HD deshabilitada")
        except Exception as e:
            logger.warning(f"Error verificando GFPGAN: {e}")
            self._has_gfpgan = False

    @property
    def has_gfpgan(self) -> bool:
        """Whether the ``gfpgan`` package was found at start-up."""
        return self._has_gfpgan

    @contextmanager
    def timer(self, operation: str) -> Generator[None, None, None]:
        """Time the wrapped block and store the duration under *operation*.

        The duration is recorded and logged only when the block finishes
        without raising.
        """
        start_time = time.time()
        yield
        elapsed = time.time() - start_time
        self._load_times[operation] = elapsed
        logger.info(f"{operation} completado en {elapsed:.2f}s")

    def validate_models(self) -> Tuple[bool, Dict[str, str]]:
        """Check that the model files exist.

        Returns:
            ``(ok, missing)`` where *missing* maps ``'swapper'`` and/or
            ``'enhancer'`` to the path that was not found. The enhancer is
            only checked when the ``gfpgan`` package is available.
        """
        missing = {}
        if not self.config.swapper_model.exists():
            missing['swapper'] = str(self.config.swapper_model)
        if self._has_gfpgan and not self.config.enhancer_model.exists():
            missing['enhancer'] = str(self.config.enhancer_model)
        return len(missing) == 0, missing

    def _release_models(self) -> None:
        """Drop every model reference and run a garbage-collection pass."""
        self._face_analyzer = None
        self._swapper = None
        self._enhancer = None
        gc.collect()

    def _load_enhancer(self) -> Any:
        """Load GFPGAN on a best-effort basis.

        Returns the enhancer, or ``None`` when the package or its weights
        are missing or when loading fails. Face swapping does not need the
        enhancer, so a failure here must not abort the whole load.
        """
        if not (self._has_gfpgan and self.config.enhancer_model.exists()):
            return None
        try:
            with self.timer("Carga GFPGAN"):
                from gfpgan import GFPGANer
                return GFPGANer(
                    model_path=str(self.config.enhancer_model),
                    upscale=1,
                    arch='clean',
                    channel_multiplier=2,
                    bg_upsampler=None,
                    device=self.config.device
                )
        except Exception as e:
            logger.warning(f"No se pudo cargar GFPGAN, mejora HD deshabilitada: {e}")
            return None

    def load_models(self) -> Tuple[bool, str]:
        """Load every model that is needed.

        Returns:
            ``(True, summary)`` on success (or when already loaded), and
            ``(False, reason)`` when a load is already in progress or the
            load failed. On failure the status becomes ``ERROR`` and any
            partially loaded model is released; a later call retries.
        """
        if self.status == ModelStatus.LOADED:
            return True, "Modelos ya cargados"
        if self.status == ModelStatus.LOADING:
            return False, "Los modelos ya se están cargando"

        self.status = ModelStatus.LOADING
        # Timings from a previous attempt must not leak into this summary.
        self._load_times.clear()
        logger.info("Iniciando carga de modelos...")
        try:
            # Fail early with a clear message when the mandatory file is absent.
            _, missing = self.validate_models()
            if 'swapper' in missing:
                raise FileNotFoundError(
                    f"No se encontró el modelo de intercambio: {missing['swapper']}"
                )
            if 'enhancer' in missing:
                logger.warning(
                    f"Modelo GFPGAN no encontrado ({missing['enhancer']}), mejora HD deshabilitada"
                )

            # 1. Face detector / analyzer
            with self.timer("Carga FaceAnalyzer"):
                self._face_analyzer = FaceAnalysis(
                    name='buffalo_l',
                    providers=self.config.providers
                )
                self._face_analyzer.prepare(
                    ctx_id=0,
                    det_size=self.config.det_size
                )

            # 2. Swapper model
            with self.timer("Carga modelo de intercambio"):
                self._swapper = insightface.model_zoo.get_model(
                    str(self.config.swapper_model),
                    download=False
                )
            # NOTE(review): get_model appears to return None for a file it
            # cannot classify — treat that as a failed load, not a success.
            if self._swapper is None:
                raise RuntimeError(
                    f"No se pudo cargar el modelo de intercambio: {self.config.swapper_model}"
                )

            # 3. Optional enhancer
            self._enhancer = self._load_enhancer()

            self.status = ModelStatus.LOADED
            total_time = sum(self._load_times.values())
            summary = (
                f"✅ Modelos cargados exitosamente en {total_time:.2f}s\n"
                f" • FaceAnalyzer: {self._load_times.get('Carga FaceAnalyzer', 0):.2f}s\n"
                f" • Face Swapper: {self._load_times.get('Carga modelo de intercambio', 0):.2f}s\n"
                f" • GFPGAN: {self._load_times.get('Carga GFPGAN', 0):.2f}s"
            )
            logger.info(summary)
            return True, summary
        except Exception as e:
            # Do not keep half-initialised models alive after a failure.
            self._release_models()
            self.status = ModelStatus.ERROR
            logger.exception(f"Error cargando modelos: {e}")
            return False, f"Error al cargar modelos: {str(e)}"

    def get_models(self) -> Tuple[Any, Any, Any]:
        """Return ``(face_analyzer, swapper, enhancer)``, loading on demand.

        The enhancer may be ``None``.

        Raises:
            RuntimeError: if the models are not loaded and loading fails.
        """
        if self.status != ModelStatus.LOADED:
            success, msg = self.load_models()
            if not success:
                raise RuntimeError(msg)
        return self._face_analyzer, self._swapper, self._enhancer

    def cleanup(self) -> None:
        """Release the models and reset the status to ``NOT_LOADED``."""
        logger.info("Limpiando recursos de modelos...")
        self._release_models()
        self.status = ModelStatus.NOT_LOADED
# ============================================================================
# PROCESAMIENTO DE IMÁGENES
# ============================================================================
class ImageProcessor:
    """Stateless helpers for validating, resizing and color-converting images."""

    @staticmethod
    def validate_image(
        image: np.ndarray,
        name: str = "imagen",
        max_pixels: int = 4096 * 4096,
    ) -> Tuple[bool, str]:
        """Check that *image* can be processed.

        Args:
            image: Candidate image; must be an ``(H, W, 3)`` numpy array.
            name: Label used as the prefix of the returned message.
            max_pixels: Largest accepted ``H * W`` (default 4096 * 4096).

        Returns:
            ``(ok, message)``; *message* explains the rejection or reports
            the accepted dimensions.
        """
        if image is None:
            return False, f"{name}: No proporcionada"
        if not isinstance(image, np.ndarray):
            return False, f"{name}: Debe ser numpy array"
        if len(image.shape) != 3 or image.shape[2] != 3:
            return False, f"{name}: Formato debe ser (H, W, 3)"
        h, w = image.shape[:2]
        if h == 0 or w == 0:
            return False, f"{name}: Dimensiones inválidas"
        if h * w > max_pixels:
            return False, f"{name}: Demasiado grande ({h}x{w})"
        return True, f"{name}: Válida ({h}x{w})"

    @staticmethod
    def _scaled_size(height: int, width: int, max_size: int) -> Tuple[int, int]:
        """Return ``(height, width)`` shrunk so the longer side is *max_size*.

        Dimensions already within the limit are returned unchanged. Integer
        arithmetic keeps the long side exactly on *max_size*, and each side
        is clamped to at least one pixel so very elongated images never get
        a zero-sized dimension.
        """
        longest = max(height, width)
        if longest <= max_size:
            return height, width
        return (
            max(1, height * max_size // longest),
            max(1, width * max_size // longest),
        )

    @staticmethod
    def resize_if_needed(image: np.ndarray, max_size: int = 2048) -> np.ndarray:
        """Downscale *image* when its longer side exceeds *max_size*.

        The aspect ratio is preserved; an image already within the limit is
        returned as-is (same object).
        """
        h, w = image.shape[:2]
        new_h, new_w = ImageProcessor._scaled_size(h, w, max_size)
        if (new_h, new_w) == (h, w):
            return image
        logger.info(f"Redimensionando imagen de {w}x{h} a {new_w}x{new_h}")
        # cv2.resize takes the target size as (width, height).
        return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)

    @staticmethod
    def convert_color(image: np.ndarray, to_bgr: bool = True) -> np.ndarray:
        """Convert RGB to BGR (``to_bgr=True``) or BGR to RGB (``to_bgr=False``)."""
        if to_bgr:
            return cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# ============================================================================
# CORE DEL FACE SWAPPER
# ============================================================================
class FaceSwapperCore:
    """Runs the face-swap pipeline on a pair of images."""

    def __init__(self):
        self.model_manager = ModelManager()
        self.image_processor = ImageProcessor()

    @contextmanager
    def process_timer(self, operation: str):
        """Log how long the wrapped block took (only when it does not raise)."""
        start = time.time()
        yield
        elapsed = time.time() - start
        logger.info(f"{operation}: {elapsed:.2f}s")

    def process(
        self,
        source_img: np.ndarray,
        target_img: np.ndarray,
        enhance: bool = False
    ) -> Tuple[Optional[np.ndarray], str]:
        """Put the best-scoring face of *source_img* on every face of *target_img*.

        Args:
            source_img: RGB image providing the face.
            target_img: RGB image whose faces are replaced.
            enhance: Run the GFPGAN enhancer on the result when it is loaded.

        Returns:
            ``(result_rgb, message)``. *result_rgb* is ``None`` when
            validation fails, no face is detected, or an error occurs;
            *message* then explains why. Exceptions are logged, not raised.
        """
        try:
            for image, label in ((source_img, "Origen"), (target_img, "Destino")):
                valid, msg = self.image_processor.validate_image(image, label)
                if not valid:
                    return None, f"❌ {msg}"

            analyzer, swapper, enhancer = self.model_manager.get_models()
            # Honour the configured limit instead of the helper's default.
            max_size = self.model_manager.config.max_image_size
            enhanced = False

            with self.process_timer("Procesamiento completo"):
                # The models are fed BGR images; the UI supplies RGB.
                src_bgr = self.image_processor.convert_color(source_img, to_bgr=True)
                dst_bgr = self.image_processor.convert_color(target_img, to_bgr=True)
                src_bgr = self.image_processor.resize_if_needed(src_bgr, max_size)
                dst_bgr = self.image_processor.resize_if_needed(dst_bgr, max_size)

                with self.process_timer("Detección de caras"):
                    src_faces = analyzer.get(src_bgr)
                    dst_faces = analyzer.get(dst_bgr)
                if not src_faces:
                    return None, "⚠️ No se detectaron caras en la imagen origen"
                if not dst_faces:
                    return None, "⚠️ No se detectaron caras en la imagen destino"

                # Use the most confidently detected source face.
                src_face = max(src_faces, key=lambda x: x.det_score)
                result_img = dst_bgr.copy()
                with self.process_timer("Intercambio facial"):
                    for face in dst_faces:
                        result_img = swapper.get(result_img, face, src_face, paste_back=True)

                if enhance and enhancer is not None:
                    with self.process_timer("Mejora GFPGAN"):
                        try:
                            _, _, restored = enhancer.enhance(
                                result_img, has_aligned=False, only_center_face=False, paste_back=True
                            )
                            if restored is not None:
                                result_img = restored
                                enhanced = True
                        except Exception as e:
                            # Enhancement is optional: keep the un-enhanced swap.
                            logger.warning(f"Error en mejora GFPGAN: {e}")

                result_rgb = self.image_processor.convert_color(result_img, to_bgr=False)

            # Report what actually happened, not merely what was requested.
            stats = f"✅ Procesamiento completado\n • Caras: {len(dst_faces)}\n • Mejora HD: {'Sí' if enhanced else 'No'}"
            return result_rgb, stats
        except Exception as e:
            logger.exception(f"Error en procesamiento: {e}")
            return None, f"❌ Error: {str(e)}"
# ============================================================================
# INTERFAZ DE USUARIO Y APP
# ============================================================================
def create_app() -> gr.Blocks:
    """Build the Gradio interface and wire the swap button to the pipeline.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    model_manager = ModelManager()
    swapper_core = FaceSwapperCore()

    with gr.Blocks(title="Face Swapper HD Pro", theme=gr.themes.Soft(primary_hue="purple")) as app:
        gr.Markdown("<div align='center'><h1>🎭 Face Swapper HD Pro</h1></div>")
        with gr.Row():
            with gr.Column():
                source_img = gr.Image(label="Cara Origen", type="numpy")
                target_img = gr.Image(label="Imagen Destino", type="numpy")
                # Offer the option only when GFPGAN is installed; otherwise
                # the box stays unchecked and cannot be toggled.
                enhance = gr.Checkbox(
                    label="✨ Mejora HD (GFPGAN)",
                    value=model_manager.has_gfpgan,
                    interactive=model_manager.has_gfpgan,
                )
                process_btn = gr.Button("🚀 Intercambiar Rostros", variant="primary")
            with gr.Column():
                result_img = gr.Image(label="Resultado", type="numpy")
                status_output = gr.Textbox(label="Estado", interactive=False)
                stats_output = gr.JSON(label="Métricas")

        def process_wrapper(source, target, enhance_hd):
            """Run the swap and shape its result for the three output widgets."""
            result, message = swapper_core.process(source, target, enhance_hd)
            stats = {"status": "success" if result is not None else "error", "message": message}
            return result, stats, message

        # The order of `outputs` must match process_wrapper's return tuple.
        process_btn.click(
            fn=process_wrapper,
            inputs=[source_img, target_img, enhance],
            outputs=[result_img, stats_output, status_output]
        )
    return app
def main():
    """Start the web application.

    Listens on all interfaces on the port given by the ``PORT`` environment
    variable (default 7860). Any start-up failure is logged with its
    traceback and the process exits with status 1.
    """
    try:
        logger.info("🚀 Iniciando Face Swapper HD Pro")
        port = int(os.environ.get("PORT", 7860))
        app = create_app()
        app.launch(
            server_name="0.0.0.0",
            server_port=port,
            show_error=True
        )
    except Exception as e:
        # Keep the traceback: a bare message is rarely enough to diagnose
        # a failed start-up.
        logger.exception(f"💥 Error fatal: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()