"""MoE / Salamandra processing routes.

Pipeline overview for a processed video identified by its SHA1 hash:
1. Build an intermediate SRT where gaps between subtitles are marked
   ``[silenci]`` (see :func:`generate_srt_con_silencios`).
2. ``Add_AD`` replaces those silence blocks with scene audio-descriptions
   taken from the video-analysis JSON.
3. ``Free_Narration`` asks the local Salamandra LLM for a ~100-word free
   narration of the whole video.
4. ``Valoracion_Final`` asks an OpenAI model to grade the final SRT against
   UNE 153020 accessibility criteria, producing a CSV.

Download endpoints expose the three artifacts (``result.srt``,
``free_narration.txt``, ``evaluation.csv``).
"""

import os
import io
import re
import ast
import json
import tempfile
from pathlib import Path
from typing import List, Dict, Counter

# --- Third-Party Libraries ---
import cv2
import torch
from fastapi import APIRouter, UploadFile, File, Query, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from transformers import AutoModelForCausalLM, AutoTokenizer
from openai import OpenAI

# --- Internal Modules / Project Imports ---
from svision_client import (
    extract_scenes,
    add_ocr_and_faces,
    keyframes_every_second_extraction,
    extract_descripcion_escena
)
from asr_client import (
    extract_audio_from_video,
    diarize_audio,
    transcribe_long_audio,
    transcribe_short_audio,
    identificar_veu
)
from storage.common import validate_token
from storage.files.file_manager import FileManager
from storage.embeddings_routers import get_embeddings_json
from main_process.main_router import (
    get_initial_info_path,
    get_initial_srt_path
)

EMBEDDINGS_ROOT = Path("/data/embeddings")
MEDIA_ROOT = Path("/data/media")

# NOTE(review): CUDA_VISIBLE_DEVICES only takes effect if no CUDA context has
# been created yet; torch is imported above, so confirm nothing initializes
# CUDA before this module is imported.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

router = APIRouter(prefix="/moe", tags=["MoE Process"])

HF_TOKEN = os.getenv("HF_TOKEN")
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")


class DataHub:
    """Holds the per-video analysis JSON produced by earlier pipeline stages.

    Attributes:
        video: Parsed JSON dict; the pipeline reads the keys
            ``info_escenas`` (scene list with ``start``/``end``/``descripcion``)
            and ``full_transcription``.
    """

    def __init__(self, video_analysis_json: str):
        print("DataHub inicializando con JSON:", video_analysis_json)
        self.video = json.loads(
            Path(video_analysis_json).read_text(encoding='utf-8')
        )


class NState(dict):
    """Mutable pipeline state passed between processing steps (a plain dict)."""
    pass


# ---------------- LLM used for free_narration ----------------
class SalamandraClient:
    """Local Salamandra instruct model used to generate the free narration."""

    def __init__(self, model_id: str = "BSC-LT/salamandra-7b-instruct"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map="auto",
            torch_dtype=torch.bfloat16
        )

    def chat(self, prompt) -> str:
        """Generate a completion for ``prompt`` and return only the new text.

        The prompt tokens are stripped from the output so callers receive
        just the model's continuation.
        """
        encodings = self.tokenizer(prompt, return_tensors="pt", padding=True)
        inputs = encodings["input_ids"].to(self.model.device)
        attention_mask = encodings["attention_mask"].to(self.model.device)
        # NOTE(review): temperature/top_k/top_p are ignored unless
        # do_sample=True is passed; with greedy decoding these are no-ops —
        # confirm whether sampling was intended.
        outputs = self.model.generate(
            input_ids=inputs,
            attention_mask=attention_mask,
            pad_token_id=self.tokenizer.pad_token_id,
            max_new_tokens=300,   # raise if longer narrations are needed
            temperature=0.01,
            top_k=50,
            top_p=0.9
        )
        print(self.tokenizer.decode(outputs[0], skip_special_tokens=True))
        print("Separación")
        # Cut off the prompt part: keep only the newly generated tokens.
        generated_tokens = outputs[0][inputs.shape[1]:]
        return self.tokenizer.decode(generated_tokens, skip_special_tokens=True)


# Only used for the final evaluation step:
class GPT5Client:
    """Thin wrapper over the OpenAI chat-completions API."""

    def __init__(self, api_key: str):
        if not api_key:
            raise RuntimeError("Missing key in environment for GPT-5 client")
        self.cli = OpenAI(api_key=api_key)

    def chat(self, messages: list, model: str = 'gpt-4o-mini') -> str:
        """Send ``messages`` to the model and return the stripped reply text."""
        print("GPT5Client.chat llamado con", len(messages), "mensajes")
        r = self.cli.chat.completions.create(
            model=model, messages=messages, temperature=0
        )
        return r.choices[0].message.content.strip()


def get_video_duration(video_path: str) -> float:
    """Return the total duration of the video in seconds.

    Falls back to 25 fps when OpenCV reports 0 fps; returns 0.0 when the
    frame count is unavailable.

    Raises:
        RuntimeError: if the video file cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"No s'ha pogut obrir el vídeo: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    cap.release()
    return total_frames / fps if total_frames > 0 else 0.0


def srt_time_to_seconds(s):
    """Convert an SRT timestamp ('HH:MM:SS,mmm') to seconds as a float."""
    h, m, rest = s.split(":")
    sec, ms = rest.split(",")  # renamed from `s` to avoid shadowing the param
    return int(h) * 3600 + int(m) * 60 + float(sec) + int(ms) / 1000


def seconds_to_srt_time(seconds):
    """Convert seconds (float) to an SRT timestamp 'HH:MM:SS,mmm'."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    ms = int((seconds - int(seconds)) * 1000)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"


def generate_srt_con_silencios(path_srt_original, path_srt_silences, video_path):
    """Rewrite an SRT file inserting ``[silenci]`` blocks for every gap.

    A silence block is emitted before each subtitle that starts after the
    previous one ended, plus a trailing silence if the video outlasts the
    last subtitle. The renumbered result is written to
    ``path_srt_silences``.
    """
    # Total video duration bounds the trailing silence block.
    duracio_total = get_video_duration(video_path)

    with open(path_srt_original, "r", encoding="utf-8-sig") as f:
        srt_text = f.read()

    blocks = srt_text.strip().split("\n\n")
    prev = 0
    srt_entries = []
    idx = 1
    for block in blocks:
        lines = block.split("\n")
        time_range = lines[1]
        print(time_range)
        content = " ".join(line.strip() for line in lines[2:])
        start_str, end_str = time_range.split(" --> ")
        start_sec = srt_time_to_seconds(start_str)
        end_sec = srt_time_to_seconds(end_str)

        # Insert a silence block for any gap before this subtitle.
        if prev < start_sec:
            srt_entries.append(
                f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(start_sec)}\n[silenci]\n"
            )
            idx += 1

        # The subtitle itself, renumbered.
        srt_entries.append(
            f"{idx}\n{seconds_to_srt_time(start_sec)} --> {seconds_to_srt_time(end_sec)}\n{content}\n"
        )
        idx += 1
        prev = end_sec

    # Trailing silence if the video is longer than the last subtitle.
    if prev < duracio_total:
        srt_entries.append(
            f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(duracio_total)}\n[silenci]\n"
        )

    with open(path_srt_silences, "w", encoding="utf-8") as f:
        f.write("\n".join(srt_entries))


class Add_AD:
    """Replaces ``[silenci]`` SRT blocks with scene audio-descriptions (AD)."""

    def __init__(self, data: DataHub):
        self.data = data

    def __call__(self, state: NState, srt_modified_silence,
                 srt_modified_silence_con_ad) -> NState:
        """Inject AD text into silence gaps of an SRT file.

        Args:
            state: Pipeline state; ``srt_con_audiodescripcion`` is set on it.
            srt_modified_silence: Path of the silence-annotated input SRT.
            srt_modified_silence_con_ad: Path where the AD-enriched SRT is
                written.

        Returns:
            The (mutated) state.
        """
        print("Add_Ad.__call__ iniciado")

        with open(srt_modified_silence, "r", encoding="utf-8") as f:
            srt_text = f.read()

        # Scene descriptions extracted during video analysis.
        frames = self.data.video.get('info_escenas', {})

        # Parse the SRT into blocks: index, start, end, text.
        pattern = re.compile(
            r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)",
            re.S
        )
        srt_blocks = []
        for match in pattern.finditer(srt_text):
            srt_blocks.append({
                "index": int(match.group(1)),
                "start": srt_time_to_seconds(match.group(2)),
                "end": srt_time_to_seconds(match.group(3)),
                "text": match.group(4).strip()
            })

        srt_blocks_modified = []
        index = 1
        for block in srt_blocks:
            if "[silenci]" in block["text"]:
                start_block = block["start"]
                end_block = block["end"]
                for frame in frames:
                    if start_block >= end_block:
                        break  # silence already covered by an AD entry
                    if frame.get("start") <= start_block and frame.get("end") >= end_block:
                        # Scene spans the whole silence: one AD entry covers it.
                        srt_blocks_modified.append({
                            "index": index,
                            "start": start_block,
                            "end": end_block,
                            "text": f"(AD): {frame.get('descripcion', '')}"
                        })
                        index += 1
                        start_block = end_block
                    elif frame.get("start") <= start_block < frame.get("end"):
                        # BUG FIX: the original source read
                        # `elif start_block=end_block:` — a SyntaxError
                        # (assignment in a condition). This condition, "the
                        # scene overlaps the start of the remaining silence",
                        # is the most plausible intent — TODO confirm with
                        # the original author.
                        srt_blocks_modified.append({
                            "index": index,
                            "start": start_block,
                            "end": end_block,
                            "text": f"(AD): {frame.get('descripcion', '')}"
                        })
                        start_block = end_block
                        index += 1
            else:
                # Non-silence blocks are copied through, renumbered.
                srt_blocks_modified.append({
                    "index": index,
                    "start": block["start"],
                    "end": block["end"],
                    "text": block["text"]
                })
                index += 1

        # Rebuild the final SRT text.
        srt_final = ""
        for block in srt_blocks_modified:
            start_tc = seconds_to_srt_time(block["start"])
            end_tc = seconds_to_srt_time(block["end"])
            srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n"

        with open(srt_modified_silence_con_ad, "w", encoding="utf-8") as f:
            f.write(srt_final)

        state['srt_con_audiodescripcion'] = srt_final
        return state


class Free_Narration:
    """Generates a ~100-word free narration of the video via Salamandra."""

    def __init__(self, data: DataHub):
        self.data = data

    def __call__(self, state: NState, srt_original_silence_con_ad,
                 story_path) -> NState:
        """Build the narration prompt, query the LLM and persist the result.

        Args:
            state: Pipeline state; must contain a ``llm_Salamandra`` callable.
                ``free_narration`` is set on it.
            srt_original_silence_con_ad: Path of the AD-enriched SRT.
            story_path: Path where the narration text is written.

        Returns:
            The (mutated) state.
        """
        print("Free_Narration.__call__ iniciado")

        frames = self.data.video.get('info_escenas', [])
        # .get() for consistency with Add_AD (tolerates missing keys).
        descriptions = [frame.get("descripcion", "") for frame in frames]
        full_transcription = self.data.video.get('full_transcription', [])

        # NOTE(review): the diarization text is read but never used below —
        # confirm whether it was meant to be part of the prompt.
        with open(srt_original_silence_con_ad, "r", encoding="utf-8-sig") as f:
            diarization_text = f.read()

        prompt = f"""
La teva tasca és elaborar una descripció lliure d'un vídeo d'unes 100 paraules a partir de la informació següent:
1.) A partir del vídeo s'han extret captures de pantalla en els moments en què es canviava d'escena i tens una descripció de cadascuna d'elles a: {descriptions}
2.) La transcripció completa del vídeo és: {full_transcription}
Per tant, a partir de tota aquesta informació, genera'm la història completa, intentant incloure els personatges identificats i la trama general de la història.
"""

        out = state['llm_Salamandra'](prompt)
        print(out)
        with open(story_path, "w", encoding="utf-8-sig") as f:
            f.write(out)
        state['free_narration'] = out
        return state


class Valoracion_Final:
    """Grades the final SRT against UNE 153020 criteria via the GPT client."""

    def __call__(self, state, srt_final, csv_evaluacion):
        """Ask the LLM for a CSV evaluation of ``srt_final`` and persist it.

        Args:
            state: Pipeline state; must contain a ``llm_GPT`` callable that
                accepts an OpenAI-style messages list.
            srt_final: Path of the SRT file to evaluate.
            csv_evaluacion: Path where the CSV evaluation is written.

        Returns:
            The state, unchanged apart from side effects.
        """
        print("Valoracion_Final.__call__ iniciat")

        with open(srt_final, "r", encoding="utf-8-sig") as f:
            srt_text = f.read().strip()

        prompt = f"""
Ets un avaluador expert en accessibilitat audiovisual segons la NORMA UNE 153020.
Analitza el següent fitxer SRT i avalua'l segons les característiques indicades.
Per a cada característica, assigna una puntuació del 0 al 7 i una justificació breu i específica, seguint el format establert.
SRT a analitzar:
{srt_text}
Format de sortida:
Caracteristica,Valoracio (0-7),Justificacio
Les característiques a avaluar són:
- Precisió Descriptiva: Avalua si la descripció visual dels plans, accions i context és exacta i coherent amb el contingut esperat.
- Sincronització Temporal: Avalua si el text apareix i desapareix al moment adequat segons el contingut visual o sonor.
- Claredat i Concisió: Analitza si el llenguatge és clar, natural i sense redundàncies.
- Inclusió de Diàleg/So: Determina si es recullen correctament els diàlegs, sons i elements musicals rellevants.
- Contextualització: Avalua si el context (ambient, espai, personatges, situacions) està ben representat.
- Flux i Ritme de la Narració: Avalua la fluïdesa de la lectura i la coherència temporal entre segments.
Respon només amb la taula CSV, sense cap text addicional.
"""

        # Structured messages for the model (system + user roles).
        messages = [
            {"role": "system", "content": "Ets un assistent expert en accessibilitat audiovisual i normativa UNE 153020."},
            {"role": "user", "content": prompt}
        ]

        out = state['llm_GPT'](messages)
        out_text = str(out).strip()

        with open(csv_evaluacion, "w", encoding="utf-8-sig") as f:
            f.write(out_text)
        return state


def _salamandra_result_dir(sha1: str, create: bool = False) -> Path:
    """Return ``<MEDIA_ROOT>/<sha1>/result/Salamandra``, optionally creating it."""
    folder = MEDIA_ROOT / sha1 / "result" / "Salamandra"
    if create:
        folder.mkdir(parents=True, exist_ok=True)
    return folder


def _serve_salamandra_file(sha1: str, filename: str, media_type: str,
                           missing_detail: str) -> FileResponse:
    """Resolve one Salamandra artifact and return it, or raise 404.

    BUG FIX: the original download endpoints created the result folders
    (``mkdir``) before checking their existence, which made the folder 404
    checks unreachable and mutated the filesystem on GET requests. This
    helper only reads.
    """
    sha1_folder = MEDIA_ROOT / sha1
    result_folder = sha1_folder / "result"
    salamandra_folder = result_folder / "Salamandra"
    target = salamandra_folder / filename

    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not result_folder.is_dir():
        raise HTTPException(status_code=404, detail="result folder not found")
    if not salamandra_folder.is_dir():
        raise HTTPException(status_code=404, detail="Salamandra folder not found")
    if not target.is_file():
        raise HTTPException(status_code=404, detail=missing_detail)

    return FileResponse(path=target, media_type=media_type, filename=filename)


@router.post("/generate_moe_result", tags=["MoE Process"])
async def generate_salamadra_result(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """Generate all MoE output files for a processed video.

    Orchestrates the full Salamandra pipeline:
    - Validates the access token.
    - Locates the processed video and its associated metadata
      (original SRT, info JSON).
    - Generates a temporary SRT enriched with silence markers.
    - Runs Add_AD, Free_Narration and Valoracion_Final to produce:
        * ``result.srt`` — finalized SRT subtitle file
        * ``free_narration.txt`` — free-narration text
        * ``evaluation.csv`` — UNE 153020 evaluation CSV
    - Creates the ``result/Salamandra`` folder structure if needed.

    Args:
        sha1: SHA1 hash identifying the media processing workspace.
        token: Authorization token.

    Raises:
        HTTPException: 404 if the SHA1 folder, the ``clip`` folder, or an
            MP4 file inside it is missing.

    Returns:
        dict: ``{"status": "ok", "message": ...}`` on success.
    """
    validate_token(token)

    sha1_folder = MEDIA_ROOT / sha1
    clip_folder = sha1_folder / "clip"
    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not clip_folder.is_dir():
        raise HTTPException(status_code=404, detail="Clip folder not found")

    mp4_files = list(clip_folder.glob("*.mp4"))
    if not mp4_files:
        raise HTTPException(status_code=404, detail="No MP4 files found")
    # BUG FIX: glob() already yields paths rooted at clip_folder; the original
    # `clip_folder / mp4_files[0]` only worked because joining with an
    # absolute path discards the left operand.
    video_path = mp4_files[0]

    # Inputs from earlier pipeline stages.
    srt_original = get_initial_srt_path(sha1)
    informacion_json = get_initial_info_path(sha1)

    # Output paths (folder-creation logic deduplicated; the original repeated
    # the same mkdir stanza three times).
    salamandra_folder = _salamandra_result_dir(sha1, create=True)
    srt_final = salamandra_folder / "result.srt"
    free_narration_salamandra = salamandra_folder / "free_narration.txt"
    csv_evaluacion = salamandra_folder / "evaluation.csv"

    # Temporary silence-annotated SRT.
    srt_name = sha1 + "_srt"
    tmp = tempfile.NamedTemporaryFile(
        mode="w+", suffix=".srt", prefix=srt_name + "_", delete=False
    )
    try:
        tmp.close()  # the helpers reopen the path by name themselves
        generate_srt_con_silencios(srt_original, tmp.name, str(video_path))

        datahub = DataHub(informacion_json)
        add_ad = Add_AD(datahub)
        free_narration = Free_Narration(datahub)
        valoracion_final = Valoracion_Final()

        state = {
            "llm_GPT": GPT5Client(api_key=OPEN_AI_KEY).chat,
            "llm_Salamandra": SalamandraClient().chat,
        }

        state = add_ad(state, tmp.name, srt_final)
        state = free_narration(state, srt_final, free_narration_salamandra)
        state = valoracion_final(state, srt_final, csv_evaluacion)
    finally:
        # BUG FIX: the delete=False temp file was never removed.
        os.unlink(tmp.name)

    return {
        "status": "ok",
        "message": "Salamandra SRT, free_narration and CSV evaluation generated"
    }


@router.get("/download_moe_srt", tags=["MoE Process"])
def download_salamadra_srt(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """Download the final SRT file generated by the Salamandra pipeline.

    Args:
        sha1: SHA1 identifier of the processed media folder.
        token: Authorization token.

    Raises:
        HTTPException: 404 if any required directory or ``result.srt``
            is missing.

    Returns:
        FileResponse: ``result.srt`` with media type ``text/srt``.
    """
    validate_token(token)
    return _serve_salamandra_file(
        sha1, "result.srt", "text/srt", "result.srt SRT not found"
    )


@router.get("/download_moe_free_narration", tags=["MoE Process"])
def download_salamadra_free_narration(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """Download the free narration text generated by the Salamandra process.

    Args:
        sha1: SHA1 identifier of the processed media folder.
        token: Authorization token.

    Raises:
        HTTPException: 404 if any required directory or
            ``free_narration.txt`` is missing.

    Returns:
        FileResponse: ``free_narration.txt`` with media type ``text/plain``.
    """
    validate_token(token)
    # BUG FIX: the download filename was "free_narration.tx" (typo) and the
    # media type was copy-pasted as "text/srt" for a plain-text file.
    return _serve_salamandra_file(
        sha1, "free_narration.txt", "text/plain", "free_narration.txt not found"
    )


@router.get("/download_moe_csv_evaluation", tags=["MoE Process"])
def download_salamadra_csv_evaluation(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """Download the evaluation CSV generated by the Salamandra workflow.

    Args:
        sha1: SHA1 identifier of the processed media directory.
        token: Authorization token.

    Raises:
        HTTPException: 404 if any required directory or ``evaluation.csv``
            is missing.

    Returns:
        FileResponse: ``evaluation.csv`` with media type ``text/csv``.
    """
    validate_token(token)
    # BUG FIX: media type was copy-pasted as "text/srt" for a CSV file.
    return _serve_salamandra_file(
        sha1, "evaluation.csv", "text/csv", "evaluation.csv CSV not found"
    )