# Spaces: Build error
import gc
import os
import pickle
import shutil
import tarfile
import time
from io import BytesIO
from pathlib import Path

import gradio as gr
import numpy as np
import requests
import tensorflow as tf
from datasets import load_dataset
from deepface import DeepFace
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from PIL import Image
from PIL import ImageOps
# GPU configuration.
# Query the physical devices once and reuse the result for both the log line
# and the configuration below.
gpus = tf.config.list_physical_devices('GPU')
print("Dispositivos GPU disponibles:", gpus)

if gpus:
    try:
        # Enable memory growth on every detected GPU.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("✅ GPU configurada correctamente")
        # Make only the first GPU visible to TensorFlow.
        tf.config.set_visible_devices(gpus[0], 'GPU')
        print(f"✅ Usando GPU: {gpus[0]}")
    except RuntimeError as e:
        # Best effort: report the problem and keep TensorFlow's defaults.
        print(f"⚠️ Error configurando GPU: {e}")
    # Mixed precision is only enabled when a GPU was detected: without one the
    # float16 compute path brings no speed-up and is typically slower, so the
    # CPU path keeps the default float32 policy.
    tf.keras.mixed_precision.set_global_policy('mixed_float16')
else:
    print("⚠️ No se detectó GPU, usando CPU")
# 🔁 Reset the temporary working folders
def clean_temp_dirs():
    """Leave empty ``embeddings`` and ``batches`` folders in the working directory.

    An existing folder is removed together with its contents and then
    recreated; a missing folder is simply created.
    """
    print("🧹 Limpiando carpetas temporales...")
    for folder in ("embeddings", "batches"):
        target = Path(folder)
        # is_dir() is already False for a path that does not exist.
        if target.is_dir():
            shutil.rmtree(target)
            print(f"✅ Carpeta eliminada: {folder}")
        target.mkdir(exist_ok=True)


# Runs once at import time so every start begins from empty folders.
clean_temp_dirs()
# 📁 Parameters
DATASET_ID = "Segizu/facial-recognition-preview"  # dataset repo id used for uploads and downloads
EMBEDDINGS_SUBFOLDER = "embeddings"               # folder inside the repo holding the archives
LOCAL_EMB_DIR = Path("embeddings")                # local folder for per-image .pkl files
LOCAL_EMB_DIR.mkdir(exist_ok=True)

# Optional access token read from the environment; when present it is also
# sent as a Bearer header on plain HTTP requests.
HF_TOKEN = os.environ.get("HF_TOKEN")
headers = {}
if HF_TOKEN:
    headers["Authorization"] = f"Bearer {HF_TOKEN}"

# 💾 Configuration
MAX_TEMP_STORAGE_GB = 40  # compared against get_folder_size(".") to force an early upload
UPLOAD_EVERY = 50
def get_folder_size(path):
    """Return the combined size of all files below *path*, in GiB.

    The tree is walked recursively with ``os.walk``. Entries whose size
    cannot be read (for example a dangling symlink, or a file that was
    deleted while the walk was in progress) are skipped instead of
    aborting the whole measurement.

    Args:
        path: Directory to measure.

    Returns:
        Total size as a float number of GiB (bytes / 1024**3).
    """
    total_bytes = 0
    for dirpath, _, filenames in os.walk(path):
        for filename in filenames:
            try:
                total_bytes += os.path.getsize(os.path.join(dirpath, filename))
            except OSError:
                # The entry vanished or points nowhere; it occupies no
                # measurable space, so ignore it.
                continue
    return total_bytes / (1024 ** 3)
def preprocess_image(img: Image.Image) -> np.ndarray:
    """Turn a PIL image into the array that is fed to the embedding model.

    Steps:
      1. Apply the EXIF orientation, if any, so the pixels are upright.
      2. Convert to RGB.
      3. Ask DeepFace (retinaface backend) for a 160x160 face crop and
         return the first one.
      4. If detection raises or returns nothing, return the whole image
         resized to 160x160 instead.

    Args:
        img: Input image.

    Returns:
        The first detected face as returned by ``DeepFace.extract_faces``,
        or the resized full image as an array when no face is available.
    """
    # ImageOps.exif_transpose is the public Pillow API for EXIF rotation. The
    # previous private `_getexif()` call is not available on every image
    # class, and the resulting AttributeError was silently swallowed, so the
    # rotation was skipped. Orientation is handled before the RGB conversion
    # so it is applied to the image exactly as it was received.
    try:
        img = ImageOps.exif_transpose(img)
    except Exception as e:
        # Best effort: malformed EXIF must not block processing.
        print(f"⚠️ No se pudo aplicar la orientación EXIF: {e}")

    if img.mode != 'RGB':
        img = img.convert('RGB')

    try:
        # NOTE(review): confirm against the installed DeepFace version that
        # `extract_faces` accepts `target_size`, and which value range the
        # returned `face` array uses; the fallback below returns the raw
        # pixel values of the resized image.
        face_objs = DeepFace.extract_faces(
            img_path=np.array(img),
            target_size=(160, 160),
            detector_backend='retinaface',
            enforce_detection=False
        )
        if face_objs:
            return face_objs[0]['face']
    except Exception as e:
        # Detection is optional; log the reason and fall back to the
        # full image below.
        print(f"⚠️ Detección de rostro fallida, usando la imagen completa: {e}")

    # No face was returned (or detection failed): use the resized original.
    img_resized = img.resize((160, 160), Image.Resampling.LANCZOS)
    return np.array(img_resized)
# ✅ Load the CSV listing from the dataset.
# The CSV-specific options are kept together; they expose a single column
# named "image" (read later as batch["image"]).
# NOTE(review): presumably header=0 makes the first CSV row a header row that
# is replaced by column_names — confirm against the `datasets` CSV loader.
_CSV_OPTIONS = {"column_names": ["image"], "header": 0}
dataset = load_dataset(
    "csv",
    data_files="metadata.csv",
    split="train",
    **_CSV_OPTIONS,
)
def _collect_uploaded_names():
    """Inspect the dataset repo and report what has already been uploaded.

    Every ``<EMBEDDINGS_SUBFOLDER>/batch_NNN.tar.gz`` archive found in the
    repo is downloaded and its member names are read (without unpickling
    anything).

    Returns:
        A tuple ``(names, next_batch_index)`` where ``names`` is the set of
        record names (member file names without ``.pkl``) found in the
        archives and ``next_batch_index`` is one past the highest batch
        number present, so new archives never overwrite existing ones.

    Raises:
        Exception: whatever ``list_repo_files`` raises when the repo cannot
            be listed.
    """
    prefix = f"{EMBEDDINGS_SUBFOLDER}/batch_"
    suffix = ".tar.gz"
    uploaded_names = set()
    next_batch_index = 0
    for remote_path in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN):
        if not (remote_path.startswith(prefix) and remote_path.endswith(suffix)):
            continue
        index_text = remote_path[len(prefix):-len(suffix)]
        if index_text.isdigit():
            next_batch_index = max(next_batch_index, int(index_text) + 1)
        try:
            local_copy = hf_hub_download(
                repo_id=DATASET_ID,
                repo_type="dataset",
                filename=remote_path,
                token=HF_TOKEN
            )
            with tarfile.open(local_copy, "r:gz") as tar:
                uploaded_names.update(Path(member).stem for member in tar.getnames())
        except Exception as e:
            # The batch number is still reserved above; only the skip list is
            # incomplete, so the images of this archive may be recomputed.
            print(f"⚠️ No se pudo leer {remote_path}: {e}")
    return uploaded_names, next_batch_index


def _upload_batch(batch_files, batch_index, archive_dir, final=False):
    """Pack *batch_files* into one ``.tar.gz``, upload it and delete the local copies.

    Args:
        batch_files: Paths of the per-image ``.pkl`` files to pack.
        batch_index: Number used in the archive name (``batch_NNN.tar.gz``).
        archive_dir: Local folder where the archive is written before upload.
        final: Only changes the wording of the progress messages.
    """
    archive_path = archive_dir / f"batch_{batch_index:03}.tar.gz"
    with tarfile.open(archive_path, "w:gz") as tar:
        for file in batch_files:
            # Store members flat, by file name only.
            tar.add(file, arcname=file.name)
    print(f"📦 Empaquetado{' final' if final else ''}: {archive_path}")
    upload_file(
        path_or_fileobj=str(archive_path),
        path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
        repo_id=DATASET_ID,
        repo_type="dataset",
        token=HF_TOKEN
    )
    if not final:
        print(f"✅ Subido: {archive_path.name}")
    # Local files are removed only after the upload call returned.
    for file in batch_files:
        file.unlink()
    archive_path.unlink()
    if final:
        print("✅ Subida y limpieza final")
    else:
        print("🧹 Limpieza completada tras subida")


def build_database():
    """Compute a Facenet embedding for every image URL in ``dataset`` and upload them.

    Each image is downloaded, preprocessed and embedded; the result is
    pickled locally as ``image_<row>.pkl`` (name, PIL image, embedding).
    The pickles are packed into ``.tar.gz`` archives and uploaded to the
    dataset repo whenever ``archive_batch_size`` files are pending or the
    working directory exceeds ``MAX_TEMP_STORAGE_GB``; a last partial
    archive is uploaded at the end.

    Resuming: the set of already uploaded record names is read once from
    the remote archives, those rows are skipped, and new archives continue
    numbering after the highest existing batch. (Previously the check
    probed ``batch_<batch_index>`` once per image and never advanced the
    index, so a single existing ``batch_000`` caused every image to be
    skipped.)

    A failure on a single image is logged and the loop continues with the
    next one. A failure of the final upload propagates to the caller.
    """
    print(f"📊 Uso actual de almacenamiento temporal INICIO: {get_folder_size('.'):.2f} GB")
    print("🔄 Generando embeddings...")
    batch_size = 10
    archive_batch_size = 50
    archive_dir = Path("batches")
    archive_dir.mkdir(exist_ok=True)

    try:
        uploaded_names, batch_index = _collect_uploaded_names()
    except Exception as e:
        # Without the remote listing we can neither skip finished rows nor
        # pick a free batch number, so stop instead of overwriting archives.
        print(f"❌ No se pudo consultar el repositorio remoto: {e}")
        return

    batch_files = []
    total_batches = (len(dataset) + batch_size - 1) // batch_size
    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        print(f"📦 Lote {i // batch_size + 1}/{total_batches}")
        for j, image_url in enumerate(batch["image"]):
            # Rows without an http(s) URL (including a stray "image" header
            # value) cannot be downloaded.
            if not isinstance(image_url, str) or not image_url.startswith("http"):
                print(f"⚠️ Saltando {i + j} - URL inválida: {image_url}")
                continue
            name = f"image_{i + j}"
            if name in uploaded_names:
                print(f"⏩ Ya existe en remoto: {name}.pkl")
                continue
            filename = LOCAL_EMB_DIR / f"{name}.pkl"
            try:
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")
                img_processed = preprocess_image(img)
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False
                )[0]["embedding"]
                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)
                del img_processed
                gc.collect()
                if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
                    _upload_batch(batch_files, batch_index, archive_dir)
                    batch_files = []
                    batch_index += 1
                    # Short pause between consecutive uploads.
                    time.sleep(2)
                    print(f"📊 Uso actual FINAL: {get_folder_size('.'):.2f} GB")
            except Exception as e:
                # Keep going: one bad image (or a failed upload, which is
                # retried on the next image) must not stop the whole run.
                print(f"❌ Error en {name}: {e}")
                continue

    # Upload whatever is left after the last full archive.
    if batch_files:
        _upload_batch(batch_files, batch_index, archive_dir, final=True)
# 🔍 Similarity search
def _iter_archive_records(file_path):
    """Yield the records pickled inside one embeddings archive of the dataset repo.

    The archive is downloaded into memory and read member by member; nothing
    is written to disk, so there are no temporary files to clean up and no
    member path can escape an extraction folder.

    Args:
        file_path: Path of the ``.tar.gz`` archive inside the dataset repo.

    Yields:
        The unpickled content of every regular ``*.pkl`` member.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(
        f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
        headers=headers,
        timeout=30
    )
    response.raise_for_status()
    with tarfile.open(fileobj=BytesIO(response.content), mode="r:gz") as tar:
        for member in tar:
            if not (member.isfile() and member.name.endswith(".pkl")):
                continue
            # SECURITY: pickle.load can execute arbitrary code. This is only
            # acceptable while the dataset repo is written exclusively by
            # trusted code (build_database); do not point it at foreign data.
            yield pickle.load(tar.extractfile(member))


def find_similar_faces(uploaded_image: Image.Image):
    """Return the stored faces most similar to *uploaded_image*.

    The query image is preprocessed and embedded with Facenet (first with
    ``enforce_detection=True``, then without it if that fails). Every
    embeddings archive in the dataset repo is then scanned and each record
    is scored as ``1 / (1 + euclidean_distance)``.

    Args:
        uploaded_image: PIL image from the UI, or ``None`` if nothing was
            uploaded.

    Returns:
        A tuple ``(gallery, summary)``: ``gallery`` is a list of
        ``(image_array, caption)`` pairs for the five best matches and
        ``summary`` is the same captions joined by newlines. On any failure
        ``gallery`` is empty and ``summary`` holds the message to display.
    """
    if uploaded_image is None:
        return [], "⚠ Por favor, sube una imagen primero"

    try:
        print("🔄 Procesando imagen de entrada...")
        if uploaded_image.mode != 'RGB':
            uploaded_image = uploaded_image.convert('RGB')
        print(f"📐 Dimensiones de la imagen: {uploaded_image.size}")
        img_processed = preprocess_image(uploaded_image)
        print("✅ Imagen preprocesada correctamente")
        # Try strict detection first; fall back to the lenient mode.
        try:
            query_embedding = DeepFace.represent(
                img_path=img_processed,
                model_name="Facenet",
                enforce_detection=True,
                detector_backend='retinaface'
            )[0]["embedding"]
            print("✅ Rostro detectado con enforce_detection=True")
        except Exception as e:
            print(f"⚠ No se pudo detectar rostro con enforce_detection=True, intentando con False: {str(e)}")
            query_embedding = DeepFace.represent(
                img_path=img_processed,
                model_name="Facenet",
                enforce_detection=False,
                detector_backend='retinaface'
            )[0]["embedding"]
            print("✅ Embedding generado con enforce_detection=False")
        del img_processed
        gc.collect()
    except Exception as e:
        print(f"❌ Error en procesamiento de imagen: {str(e)}")
        return [], f"⚠ Error procesando imagen: {str(e)}"

    print("🔍 Buscando similitudes en la base de datos...")
    try:
        embedding_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
        ]
        print(f"📁 Encontrados {len(embedding_files)} archivos de embeddings")
    except Exception as e:
        print(f"❌ Error obteniendo archivos: {str(e)}")
        return [], f"⚠ Error obteniendo archivos: {str(e)}"

    top_k = 5
    query_vector = np.array(query_embedding)  # converted once, reused for every record
    best = []      # running top-k as (score, name, image) tuples
    compared = 0   # number of records scored, for the log line

    # The archives are walked in groups of ten only to report progress.
    batch_size = 10
    total_batches = (len(embedding_files) + batch_size - 1) // batch_size
    for i in range(0, len(embedding_files), batch_size):
        print(f"📦 Procesando lote {i//batch_size + 1}/{total_batches}")
        for file_path in embedding_files[i:i + batch_size]:
            try:
                for record in _iter_archive_records(file_path):
                    dist = np.linalg.norm(query_vector - np.array(record["embedding"]))
                    best.append((1 / (1 + dist), record["name"], record["img"]))
                    compared += 1
            except Exception as e:
                # A broken archive is reported and skipped; records read
                # before the failure are kept.
                print(f"⚠ Error procesando {file_path}: {e}")
            # Sort on the score only: comparing whole tuples would fall
            # through to the image on ties and raise. Trimming after every
            # archive keeps at most top_k images in memory.
            best.sort(key=lambda item: item[0], reverse=True)
            del best[top_k:]

    if not best:
        return [], "⚠ No se encontraron similitudes en la base de datos"

    print(f"✅ Encontradas {compared} similitudes")
    gallery = [(np.array(img), f"{name} - Similitud: {sim:.2f}") for sim, name, img in best]
    summary = "\n".join([f"{name} - Similitud: {sim:.2f}" for sim, name, _ in best])
    return gallery, summary
# 🎛️ Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## 🔍 Reconocimiento facial con DeepFace + ZeroGPU")

    # Search controls: the query image next to its trigger button.
    with gr.Row():
        image_input = gr.Image(label="📤 Sube una imagen", type="pil")
        find_btn = gr.Button("🔎 Buscar similares")

    # Search results: the matching faces and a text summary of their scores.
    gallery = gr.Gallery(label="📸 Rostros similares")
    summary = gr.Textbox(label="🧠 Detalle", lines=6)
    find_btn.click(
        fn=find_similar_faces,
        inputs=image_input,
        outputs=[gallery, summary],
    )

    # Maintenance: rebuild the embeddings database; the handler takes no
    # inputs and updates no components.
    with gr.Row():
        build_btn = gr.Button("⚙️ Construir base de embeddings (usa GPU)")
    build_btn.click(fn=build_database, inputs=[], outputs=[])

demo.launch(share=True)