"""Preprocessing manager routes.

Exposes the FastAPI router used to upload videos, track background jobs,
serve intermediate files and turn a reviewed casting into on-disk identity
folders plus ChromaDB indices.
"""
from __future__ import annotations

import os
import shutil
import uuid
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Tuple

from fastapi import APIRouter, UploadFile, File, Form, BackgroundTasks, HTTPException, Body
from fastapi.responses import FileResponse

from video_processing import process_video_pipeline
from audio_tools import process_audio_for_video, extract_audio_ffmpeg, embed_voice_segments, VoiceEmbedder
from casting_loader import ensure_chroma, build_faces_index, build_voices_index
from narration_system import NarrationSystem
from llm_router import load_yaml, LLMRouter
from character_detection import detect_characters_from_video
from vision_tools import FaceOfImageEmbedding
from pipelines.audiodescription import generate as ad_generate

ROOT = Path("/tmp/veureu")
ROOT.mkdir(parents=True, exist_ok=True)
TEMP_ROOT = Path("/tmp/temp")
TEMP_ROOT.mkdir(parents=True, exist_ok=True)
VIDEOS_ROOT = Path("/tmp/data/videos")
VIDEOS_ROOT.mkdir(parents=True, exist_ok=True)
IDENTITIES_ROOT = Path("/tmp/characters")
IDENTITIES_ROOT.mkdir(parents=True, exist_ok=True)

# Extensions tried, in order of preference, when picking a sample face image.
_IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp")
# Extensions accepted when picking a sample voice clip.
_AUDIO_EXTS = (".wav", ".flac", ".mp3")


class JobStatus(str, Enum):
    """Lifecycle states of a background job stored in ``jobs``."""

    QUEUED = "queued"
    PROCESSING = "processing"
    DONE = "done"
    FAILED = "failed"


# In-memory job registry keyed by job id.
jobs: Dict[str, dict] = {}

router = APIRouter(tags=["Preprocessing Manager"])


def _is_safe_component(name: object) -> bool:
    """Return True when *name* is usable as a single path component."""
    if not isinstance(name, str) or name in ("", ".", ".."):
        return False
    return not any(sep in name for sep in ("/", "\\", "\x00"))


def _resolve_under(root: Path, *parts: str) -> Optional[Path]:
    """Join *parts* below *root*; return None if the result escapes *root*."""
    try:
        base = root.resolve()
        candidate = base.joinpath(*parts).resolve()
        candidate.relative_to(base)
    except ValueError:
        # relative_to() raises when the candidate is outside the root;
        # resolve() raises on embedded NUL bytes.
        return None
    return candidate


def _file_response_under(root: Path, *parts: str) -> FileResponse:
    """Serve a regular file located below *root*, or raise a 404."""
    file_path = _resolve_under(root, *parts)
    # is_file() rather than exists(): a directory must not reach FileResponse.
    if file_path is None or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)


def _save_uploaded_video(video: UploadFile) -> Tuple[str, Path]:
    """Store an uploaded video as ``VIDEOS_ROOT/<stem>.mp4``.

    Returns the video name (stem of the uploaded filename) and the
    destination path. Raises HTTP 400 when no usable filename was sent.
    """
    video_name = Path(video.filename or "").stem
    if not _is_safe_component(video_name):
        raise HTTPException(status_code=400, detail="Invalid video filename")
    dst_video = VIDEOS_ROOT / f"{video_name}.mp4"
    with dst_video.open("wb") as f:
        shutil.copyfileobj(video.file, f)
    return video_name, dst_video


@router.post("/create_initial_casting")
async def create_initial_casting(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    max_groups: int = Form(default=3),
    min_cluster_size: int = Form(default=3),
    face_sensitivity: float = Form(default=0.5),
    voice_max_groups: int = Form(default=3),
    voice_min_cluster_size: int = Form(default=3),
    voice_sensitivity: float = Form(default=0.5),
    max_frames: int = Form(default=100),
):
    """Save the uploaded video, register a queued job and schedule it.

    Returns ``{"job_id": ...}``; the job is executed by ``process_video_job``
    as a background task. Raises HTTP 400 if the upload has no usable
    filename.
    """
    video_name, dst_video = _save_uploaded_video(video)

    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "id": job_id,
        "status": JobStatus.QUEUED,
        "video_path": str(dst_video),
        "video_name": video_name,
        "max_groups": int(max_groups),
        "min_cluster_size": int(min_cluster_size),
        "face_sensitivity": float(face_sensitivity),
        "voice_max_groups": int(voice_max_groups),
        "voice_min_cluster_size": int(voice_min_cluster_size),
        "voice_sensitivity": float(voice_sensitivity),
        "max_frames": int(max_frames),
        "created_at": datetime.now().isoformat(),
        "results": None,
        "error": None,
    }
    print(f"[{job_id}] Job creado para vídeo: {video_name}")
    background_tasks.add_task(process_video_job, job_id)
    return {"job_id": job_id}


@router.get("/jobs/{job_id}/status")
def get_job_status(job_id: str):
    """Report the status of a job, plus its results/error when present.

    Raises HTTP 404 when the job id is unknown.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    status = job["status"]
    response = {"status": status.value if isinstance(status, JobStatus) else str(status)}
    if job.get("results") is not None:
        response["results"] = job["results"]
    if job.get("error"):
        response["error"] = job["error"]
    return response


@router.get("/files/{video_name}/{char_id}/{filename}")
def serve_character_file(video_name: str, char_id: str, filename: str):
    """Serve ``TEMP_ROOT/<video_name>/characters/<char_id>/<filename>``.

    Raises HTTP 404 when the file is missing or the path leaves TEMP_ROOT.
    """
    return _file_response_under(TEMP_ROOT, video_name, "characters", char_id, filename)


@router.get("/audio/{video_name}/{filename}")
def serve_audio_file(video_name: str, filename: str):
    """Serve ``TEMP_ROOT/<video_name>/clips/<filename>``.

    Raises HTTP 404 when the file is missing or the path leaves TEMP_ROOT.
    """
    return _file_response_under(TEMP_ROOT, video_name, "clips", filename)


@router.post("/load_casting")
async def load_casting(
    faces_dir: str = Form("identities/faces"),
    voices_dir: str = Form("identities/voices"),
    db_dir: str = Form("chroma_db"),
    drop_collections: bool = Form(False),
):
    """Build the ``index_faces`` and ``index_voices`` collections.

    Returns ``{"ok": True, "faces": ..., "voices": ...}`` with the values
    returned by the two index builders.
    """
    # NOTE(review): all three directories are chosen by the client and are not
    # restricted to a root - confirm this endpoint is only exposed to trusted
    # callers.
    client = ensure_chroma(Path(db_dir))
    n_faces = build_faces_index(
        Path(faces_dir), client, collection_name="index_faces", drop=drop_collections
    )
    n_voices = build_voices_index(
        Path(voices_dir), client, collection_name="index_voices", drop=drop_collections
    )
    return {"ok": True, "faces": n_faces, "voices": n_voices}


def _speaker_name(cluster: dict) -> str:
    """Return the cluster's stripped name, or ``SPEAKER_NN`` from its label."""
    name = cluster.get("name")
    if isinstance(name, str) and name.strip():
        return name.strip()
    try:
        label = int(cluster.get("label", 0))
    except (TypeError, ValueError):
        # A missing or non-numeric label must not abort the whole request.
        label = 0
    return f"SPEAKER_{label:02d}"


def _copy_files(src_dir: Path, names: Iterable[object], dst_dir: Path) -> None:
    """Copy the named regular files from *src_dir* into *dst_dir* (best effort)."""
    for name in names:
        if not _is_safe_component(name):
            print(f"[finalize_casting] WARN - skipping unsafe file name: {name!r}")
            continue
        src = src_dir / name
        if not src.is_file():
            continue
        try:
            shutil.copy2(src, dst_dir / name)
        except OSError as e:
            print(f"[finalize_casting] WARN - could not copy {src}: {e}")


def _copy_kept_faces(characters: Iterable[object], faces_out: Path) -> None:
    """Copy each character's kept images into ``faces_out/<name>/``."""
    for ch in characters:
        if not isinstance(ch, dict):
            continue
        raw_name = ch.get("name")
        ch_name = (raw_name.strip() if isinstance(raw_name, str) else "") or "Unknown"
        ch_folder = ch.get("folder")
        if not isinstance(ch_folder, str) or not ch_folder or not os.path.isdir(ch_folder):
            continue
        if not _is_safe_component(ch_name):
            print(f"[finalize_casting] WARN - skipping unsafe character name: {ch_name!r}")
            continue
        dst_dir = faces_out / ch_name
        dst_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): ch_folder is a client-supplied server path; only the
        # file names inside it are validated here.
        _copy_files(Path(ch_folder), ch.get("kept_files") or [], dst_dir)


def _copy_voice_clips(voice_clusters: Iterable[object], clips_dir: Path, voices_out: Path) -> None:
    """Copy each voice cluster's clips into ``voices_out/<name>/``."""
    for vc in voice_clusters:
        if not isinstance(vc, dict):
            continue
        v_name = _speaker_name(vc)
        if not _is_safe_component(v_name):
            print(f"[finalize_casting] WARN - skipping unsafe voice name: {v_name!r}")
            continue
        dst_dir = voices_out / v_name
        dst_dir.mkdir(parents=True, exist_ok=True)
        _copy_files(clips_dir, vc.get("clips") or [], dst_dir)


def _list_identities(root: Path) -> List[str]:
    """Return the sorted names of the sub-directories of *root*."""
    if not root.exists():
        return []
    return sorted(p.name for p in root.iterdir() if p.is_dir())


def _first_image(id_dir: Path) -> Optional[Path]:
    """Pick one image from *id_dir*, preferring extensions in _IMAGE_EXTS order."""
    for ext in _IMAGE_EXTS:
        # sorted() makes the choice deterministic; glob order is arbitrary.
        candidates = sorted(id_dir.glob(f"*{ext}"))
        if candidates:
            return candidates[0]
    return None


def _first_audio(id_dir: Path) -> Optional[Path]:
    """Pick the first audio file (by name) from *id_dir*, if any."""
    audio = sorted(
        p for p in id_dir.iterdir() if p.is_file() and p.suffix.lower() in _AUDIO_EXTS
    )
    return audio[0] if audio else None


def _parse_face_embedding(out: object) -> object:
    """Extract an embedding from a face-embedding response.

    Accepts a flat list of numbers, a nested list (first element is used) or
    a dict with an ``"embedding"`` key; anything else yields None.
    """
    if isinstance(out, list):
        if not out:
            return None
        first = out[0]
        if isinstance(first, (list, tuple)):
            return list(first)
        if isinstance(first, (float, int)):
            return list(out)
        return None
    if isinstance(out, dict) and "embedding" in out:
        return out.get("embedding")
    return None


def _parse_voice_embedding(out: object) -> object:
    """Extract an embedding from a voice-embedding response (list or dict)."""
    if isinstance(out, list):
        return list(out)
    if isinstance(out, dict) and "embedding" in out:
        return out.get("embedding")
    return None


def _collect_embeddings(
    router_llm: Optional[LLMRouter],
    model_key: str,
    api_name: str,
    identities: List[str],
    root: Path,
    pick_sample: Callable[[Path], Optional[Path]],
    parse_output: Callable[[object], object],
) -> List[dict]:
    """Compute one embedding per identity through a remote client.

    Best effort: an identity whose prediction fails is skipped, and any
    failure outside the per-identity call yields an empty list.
    """
    if not identities or router_llm is None:
        return []
    try:
        factory = router_llm.client_factories.get(model_key)  # type: ignore[attr-defined]
        gclient = getattr(factory(), "_client", None) if factory is not None else None
        if gclient is None:
            return []

        entries: List[dict] = []
        for identity in identities:
            id_dir = root / identity
            if not id_dir.is_dir():
                continue
            sample = pick_sample(id_dir)
            if sample is None:
                continue
            try:
                emb = parse_output(gclient.predict(str(sample), api_name=api_name))
                if not emb:
                    continue
                entries.append({"nombre": identity, "embedding": emb})
            except Exception as e:  # remote call boundary: skip this identity
                print(f"[finalize_casting] WARN - {api_name} failed for {identity}: {e}")
        return entries
    except Exception as e:  # keep the endpoint best-effort, but leave a trace
        print(f"[finalize_casting] WARN - could not compute {api_name} embeddings: {e}")
        return []


@router.post("/finalize_casting")
async def finalize_casting(
    payload: dict = Body(...),
):
    """Persist the reviewed casting and index it.

    Reads ``video_name``, ``base_dir``, ``characters`` and ``voice_clusters``
    from the JSON payload, copies the kept face images and voice clips into
    ``IDENTITIES_ROOT/<video_name>/{faces,voices}``, rebuilds the ChromaDB
    indices and computes one embedding per identity when a client is
    available.

    Raises HTTP 400 when ``video_name`` or ``base_dir`` is missing or
    invalid. Index and embedding failures are logged and reported as
    zero/empty results instead of failing the request.
    """
    video_name = payload.get("video_name")
    base_dir = payload.get("base_dir")
    characters = payload.get("characters", []) or []
    voice_clusters = payload.get("voice_clusters", []) or []

    if not video_name or not base_dir:
        raise HTTPException(status_code=400, detail="Missing video_name or base_dir")
    # video_name becomes a directory below IDENTITIES_ROOT, so it must be a
    # single path component.
    if not _is_safe_component(video_name) or not isinstance(base_dir, str):
        raise HTTPException(status_code=400, detail="Invalid video_name or base_dir")

    faces_out = IDENTITIES_ROOT / video_name / "faces"
    voices_out = IDENTITIES_ROOT / video_name / "voices"
    faces_out.mkdir(parents=True, exist_ok=True)
    voices_out.mkdir(parents=True, exist_ok=True)

    _copy_kept_faces(characters, faces_out)
    _copy_voice_clips(voice_clusters, Path(base_dir) / "clips", voices_out)

    db_dir = IDENTITIES_ROOT / video_name / "chroma_db"
    try:
        client = ensure_chroma(db_dir)
        n_faces = build_faces_index(
            faces_out,
            client,
            collection_name="index_faces",
            deepface_model="Facenet512",
            drop=True,
        )
        n_voices = build_voices_index(
            voices_out,
            client,
            collection_name="index_voices",
            drop=True,
        )
    except Exception as e:
        print(f"[finalize_casting] WARN - No se pudieron construir índices ChromaDB: {e}")
        n_faces = 0
        n_voices = 0

    face_identities = _list_identities(faces_out)
    voice_identities = _list_identities(voices_out)

    try:
        router_llm = LLMRouter(load_yaml("config.yaml"))
    except Exception as e:
        print(f"[finalize_casting] WARN - could not load LLM router: {e}")
        router_llm = None  # type: ignore

    casting_json = {
        "face_col": _collect_embeddings(
            router_llm,
            "salamandra-vision",
            "/face_image_embedding",
            face_identities,
            faces_out,
            _first_image,
            _parse_face_embedding,
        ),
        "voice_col": _collect_embeddings(
            router_llm,
            "whisper-catalan",
            "/voice_embedding",
            voice_identities,
            voices_out,
            _first_audio,
            _parse_voice_embedding,
        ),
    }

    return {
        "ok": True,
        "video_name": video_name,
        "faces_dir": str(faces_out),
        "voices_dir": str(voices_out),
        "db_dir": str(db_dir),
        "n_faces_embeddings": n_faces,
        "n_voices_embeddings": n_voices,
        "face_identities": face_identities,
        "voice_identities": voice_identities,
        "casting_json": casting_json,
    }


@router.get("/files_scene/{video_name}/{scene_id}/{filename}")
def serve_scene_file(video_name: str, scene_id: str, filename: str):
    """Serve ``TEMP_ROOT/<video_name>/scenes/<scene_id>/<filename>``.

    Raises HTTP 404 when the file is missing or the path leaves TEMP_ROOT.
    """
    return _file_response_under(TEMP_ROOT, video_name, "scenes", scene_id, filename)


@router.post("/detect_scenes")
async def detect_scenes(
    video: UploadFile = File(...),
    max_groups: int = Form(default=3),
    min_cluster_size: int = Form(default=3),
    scene_sensitivity: float = Form(default=0.5),
    frame_interval_sec: float = Form(default=0.5),
):
    """Save the uploaded video and return scene clusters.

    Placeholder: the clustering parameters are accepted but unused and the
    response is always ``{"scene_clusters": []}``.
    """
    _save_uploaded_video(video)

    # TODO: move the existing detect_scenes logic from engine/api.py here.
    # It was left out of this refactor for brevity; keep the current
    # implementation as-is. It will need its own imports (the unused cv2 and
    # numpy imports were dropped from this stub).
    return {"scene_clusters": []}


def process_video_job(job_id: str):
    """Run the background job identified by *job_id*.

    Delegates to ``engine.api.process_video_job`` and returns its result.
    """
    # TODO: move the current implementation from engine/api.py into this
    # function unchanged; it was not duplicated here because of its length.
    # Imported at call time - presumably to avoid a circular import with
    # engine.api; confirm before hoisting it to module level.
    from engine.api import process_video_job as _orig

    return _orig(job_id)