diff --git "a/api.py" "b/api.py" --- "a/api.py" +++ "b/api.py" @@ -1,1425 +1,1428 @@ -from __future__ import annotations -from fastapi import FastAPI, UploadFile, File,Query, Form, BackgroundTasks, HTTPException -from fastapi import Body -from fastapi.responses import JSONResponse, FileResponse -from fastapi.middleware.cors import CORSMiddleware -from pathlib import Path -import shutil -import uvicorn -import json -import uuid -from datetime import datetime -from typing import Dict -from enum import Enum -import os -import yaml -import io - -from video_processing import process_video_pipeline -from audio_tools import process_audio_for_video, extract_audio_ffmpeg, embed_voice_segments, VoiceEmbedder -from casting_loader import ensure_chroma, build_faces_index, build_voices_index -from narration_system import NarrationSystem -from llm_router import load_yaml, LLMRouter -from character_detection import detect_characters_from_video -from vision_tools import FaceOfImageEmbedding - -from pipelines.audiodescription import generate as ad_generate - -from storage.files.file_manager import FileManager -from storage.media_routers import router as media_router -from storage.db_routers import router as db_router -from storage.embeddings_routers import router as embeddings_router -from storage.pending_videos_routers import router as pending_videos_router -from main_process.main_router import router as main_router - -app = FastAPI(title="Veureu Engine API", version="0.2.0") -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -ROOT = Path("/tmp/veureu") -ROOT.mkdir(parents=True, exist_ok=True) -TEMP_ROOT = Path("/tmp/temp") -TEMP_ROOT.mkdir(parents=True, exist_ok=True) -VIDEOS_ROOT = Path("/tmp/data/videos") -VIDEOS_ROOT.mkdir(parents=True, exist_ok=True) -IDENTITIES_ROOT = Path("/tmp/characters") -IDENTITIES_ROOT.mkdir(parents=True, exist_ok=True) - - -# Sistema de jobs asíncronos -class JobStatus(str, Enum): - QUEUED = "queued" - PROCESSING = "processing" - DONE = "done" - FAILED = "failed" - -jobs: Dict[str, dict] = {} - -app.include_router(main_router) -app.include_router(media_router) -app.include_router(db_router) -app.include_router(embeddings_router) -app.include_router(pending_videos_router) - -def describe_image_with_svision(image_path: str, is_face: bool = True) -> tuple[str, str]: - """ - Llama al space svision para describir una imagen (usado en generación de AD). - - Args: - image_path: Ruta absoluta a la imagen - is_face: True si es una cara, False si es una escena - - Returns: - tuple (descripción_completa, nombre_abreviado) - """ - try: - from pathlib import Path as _P - import yaml - from llm_router import LLMRouter - - # Cargar configuración - config_path = _P(__file__).parent / "config.yaml" - if not config_path.exists(): - print(f"[svision] Config no encontrado: {config_path}") - return ("", "") - - with open(config_path, 'r', encoding='utf-8') as f: - cfg = yaml.safe_load(f) or {} - - router = LLMRouter(cfg) - - # Contexto diferente para caras vs escenas - if is_face: - context = { - "task": "describe_person", - "instructions": "Descriu la persona en la imatge. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.", - "max_tokens": 256 - } - else: - context = { - "task": "describe_scene", - "instructions": "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.", - "max_tokens": 128 - } - - # Llamar a svision - descriptions = router.vision_describe([str(image_path)], context=context, model="salamandra-vision") - full_description = descriptions[0] if descriptions else "" - - if not full_description: - return ("", "") - - print(f"[svision] Descripció generada: {full_description[:100]}...") - - return (full_description, "") - - except Exception as e: - print(f"[svision] Error al descriure imatge: {e}") - import traceback - traceback.print_exc() - return ("", "") - -def normalize_face_lighting(image): - """ - Normaliza el brillo de una imagen de cara usando técnicas combinadas: - 1. CLAHE para ecualización adaptativa - 2. Normalización de rango para homogeneizar brillo general - - Esto reduce el impacto de diferentes condiciones de iluminación en los embeddings - y en la visualización de las imágenes. - - Args: - image: Imagen BGR (OpenCV format) - - Returns: - Imagen normalizada en el mismo formato - """ - import cv2 - import numpy as np - - # Paso 1: Convertir a LAB color space (más robusto para iluminación) - lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) - l, a, b = cv2.split(lab) - - # Paso 2: Aplicar CLAHE (Contrast Limited Adaptive Histogram Equalization) al canal L - # Usar clipLimit más alto para normalización más agresiva - clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) - l_clahe = clahe.apply(l) - - # Paso 3: Normalizar el rango del canal L para asegurar distribución uniforme - # Esto garantiza que todas las imágenes tengan un rango de brillo similar - l_min, l_max = l_clahe.min(), l_clahe.max() - if l_max > l_min: - # Estirar el histograma al rango completo [0, 255] - l_normalized = ((l_clahe - l_min) * 255.0 / (l_max - l_min)).astype(np.uint8) - else: - l_normalized = l_clahe - - # Paso 4: Aplicar suavizado suave para reducir ruido introducido por la normalización - l_normalized = cv2.GaussianBlur(l_normalized, (3, 3), 0) - - # Recombinar canales - lab_normalized = cv2.merge([l_normalized, a, b]) - - # Convertir de vuelta a BGR - normalized = cv2.cvtColor(lab_normalized, cv2.COLOR_LAB2BGR) - return normalized - -def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int, sensitivity: float = 0.5) -> np.ndarray: - """ - Clustering jerárquico con silhouette score para encontrar automáticamente el mejor número de clusters. - Selecciona automáticamente el mejor número de clusters (hasta max_groups) usando silhouette score. - Filtra clusters con menos de min_cluster_size muestras (marcados como -1/ruido). - - Args: - X: Array de embeddings (N, D) - max_groups: Número máximo de clusters a formar - min_cluster_size: Tamaño mínimo de cluster válido - sensitivity: Sensibilidad del clustering (0.0-1.0) - - 0.0 = muy agresivo (menos clusters) - - 0.5 = balanceado (recomendado) - - 1.0 = muy permisivo (más clusters) - - Returns: - Array de labels (N,) donde -1 indica ruido - """ - import numpy as np - from scipy.cluster.hierarchy import linkage, fcluster - from sklearn.metrics import silhouette_score - from collections import Counter - - if len(X) == 0: - return np.array([]) - - if len(X) < min_cluster_size: - # Si hay menos muestras que el mínimo, todo es ruido - return np.full(len(X), -1, dtype=int) - - # Linkage usando average linkage (más flexible que ward, menos sensible a outliers) - # Esto ayuda a agrupar mejor la misma persona con diferentes ángulos/expresiones - Z = linkage(X, method='average', metric='cosine') # Cosine similarity para embeddings - - # Encontrar el número óptimo de clusters usando silhouette score - best_n_clusters = 2 - best_score = -1 - - # Probar diferentes números de clusters (de 2 a max_groups) - max_to_try = min(max_groups, len(X) - 1) # No puede haber más clusters que muestras - - if max_to_try >= 2: - for n_clusters in range(2, max_to_try + 1): - trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1 - - # Calcular cuántos clusters válidos tendríamos después del filtrado - trial_counts = Counter(trial_labels) - valid_clusters = sum(1 for count in trial_counts.values() if count >= min_cluster_size) - - # Solo evaluar si hay al menos 2 clusters válidos - if valid_clusters >= 2: - try: - score = silhouette_score(X, trial_labels, metric='cosine') - # Penalización dinámica basada en sensibilidad: - # - sensitivity=0.0 → penalty=0.14 (muy agresivo, menos clusters) - # - sensitivity=0.5 → penalty=0.07 (balanceado, recomendado) - # - sensitivity=1.0 → penalty=0.01 (permisivo, más clusters) - penalty = 0.14 - (sensitivity * 0.13) - adjusted_score = score - (n_clusters * penalty) - - if adjusted_score > best_score: - best_score = adjusted_score - best_n_clusters = n_clusters - except: - pass # Si falla el cálculo, ignorar esta configuración - - # Usar el número óptimo de clusters encontrado - penalty = 0.14 - (sensitivity * 0.13) - print(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), sensitivity={sensitivity:.2f}, penalty={penalty:.3f}, silhouette={best_score:.3f}") - labels = fcluster(Z, t=best_n_clusters, criterion='maxclust') - - # fcluster devuelve labels 1-indexed, convertir a 0-indexed - labels = labels - 1 - - # Filtrar clusters pequeños - label_counts = Counter(labels) - filtered_labels = [] - for lbl in labels: - if label_counts[lbl] >= min_cluster_size: - filtered_labels.append(lbl) - else: - filtered_labels.append(-1) # Ruido - - return np.array(filtered_labels, dtype=int) - -@app.get("/") -def root(): - return {"ok": True, "service": "veureu-engine"} - -@app.post("/process_video") -async def process_video( - video_file: UploadFile = File(...), - config_path: str = Form("config.yaml"), - out_root: str = Form("results"), - db_dir: str = Form("chroma_db"), -): - tmp_video = ROOT / video_file.filename - with tmp_video.open("wb") as f: - shutil.copyfileobj(video_file.file, f) - result = process_video_pipeline(str(tmp_video), config_path=config_path, out_root=out_root, db_dir=db_dir) - return JSONResponse(result) - -@app.post("/create_initial_casting") -async def create_initial_casting( - background_tasks: BackgroundTasks, - video: UploadFile = File(...), - max_groups: int = Form(default=3), - min_cluster_size: int = Form(default=3), - face_sensitivity: float = Form(default=0.5), - voice_max_groups: int = Form(default=3), - voice_min_cluster_size: int = Form(default=3), - voice_sensitivity: float = Form(default=0.5), - max_frames: int = Form(default=100), -): - """ - Crea un job para procesar el vídeo de forma asíncrona usando clustering jerárquico. - Devuelve un job_id inmediatamente. - """ - # Guardar vídeo en carpeta de datos - video_name = Path(video.filename).stem - dst_video = VIDEOS_ROOT / f"{video_name}.mp4" - with dst_video.open("wb") as f: - shutil.copyfileobj(video.file, f) - - # Crear job_id único - job_id = str(uuid.uuid4()) - - # Inicializar el job - jobs[job_id] = { - "id": job_id, - "status": JobStatus.QUEUED, - "video_path": str(dst_video), - "video_name": video_name, - "max_groups": int(max_groups), - "min_cluster_size": int(min_cluster_size), - "face_sensitivity": float(face_sensitivity), - "voice_max_groups": int(voice_max_groups), - "voice_min_cluster_size": int(voice_min_cluster_size), - "voice_sensitivity": float(voice_sensitivity), - "max_frames": int(max_frames), - "created_at": datetime.now().isoformat(), - "results": None, - "error": None - } - - print(f"[{job_id}] Job creado para vídeo: {video_name}") - - # Iniciar procesamiento en background - background_tasks.add_task(process_video_job, job_id) - - # Devolver job_id inmediatamente - return {"job_id": job_id} - -@app.get("/jobs/{job_id}/status") -def get_job_status(job_id: str): - """ - Devuelve el estado actual de un job. - El UI hace polling de este endpoint cada 5 segundos. - """ - if job_id not in jobs: - raise HTTPException(status_code=404, detail="Job not found") - - job = jobs[job_id] - - # Normalizar el estado a string - status_value = job["status"].value if isinstance(job["status"], JobStatus) else str(job["status"]) - response = {"status": status_value} - - # Incluir resultados si existen (evita condiciones de carrera) - if job.get("results") is not None: - response["results"] = job["results"] - - # Incluir error si existe - if job.get("error"): - response["error"] = job["error"] - - return response - -@app.get("/files/{video_name}/{char_id}/{filename}") -def serve_character_file(video_name: str, char_id: str, filename: str): - """ - Sirve archivos estáticos de personajes (imágenes). - Ejemplo: /files/dif_catala_1/char1/representative.jpg - """ - # Las caras se guardan en /tmp/temp/