| import os |
| import io |
| import re |
| import ast |
| import json |
| import tempfile |
| from pathlib import Path |
| from typing import List, Dict, Counter |
|
|
| |
| import cv2 |
| import torch |
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException |
| from fastapi.responses import JSONResponse, StreamingResponse, FileResponse |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| from openai import OpenAI |
|
|
| |
| from svision_client import ( |
| extract_scenes, |
| add_ocr_and_faces, |
| keyframes_every_second_extraction, |
| extract_descripcion_escena |
| ) |
|
|
| from asr_client import ( |
| extract_audio_from_video, |
| diarize_audio, |
| transcribe_long_audio, |
| transcribe_short_audio, |
| identificar_veu |
| ) |
|
|
| from storage.common import validate_token |
| from storage.files.file_manager import FileManager |
| from storage.embeddings_routers import get_embeddings_json |
|
|
| from main_process.main_router import ( |
| get_initial_info_path, |
| get_initial_srt_path |
| ) |
|
|
| EMBEDDINGS_ROOT = Path("/data/embeddings") |
| MEDIA_ROOT = Path("/data/media") |
| os.environ["CUDA_VISIBLE_DEVICES"] = "1" |
| router = APIRouter(prefix="/salamandra", tags=["Salamandra Process"]) |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| OPEN_AI_KEY = os.getenv("OPEN_AI_KEY") |
|
|
| class DataHub: |
| def __init__(self, video_analysis_json: str): |
| print("DataHub inicializando con JSON:", video_analysis_json) |
| self.video = json.loads(Path(video_analysis_json).read_text(encoding='utf-8')) |
|
|
| class NState(dict): |
| pass |
|
|
| |
| class SalamandraClient: |
| def __init__(self, model_id="BSC-LT/salamandra-7b-instruct"): |
| self.tokenizer = AutoTokenizer.from_pretrained(model_id) |
| self.model = AutoModelForCausalLM.from_pretrained( |
| model_id, |
| device_map="auto", |
| torch_dtype=torch.bfloat16 |
| ) |
|
|
| def chat(self, prompt) -> str: |
| encodings = self.tokenizer( |
| prompt, |
| return_tensors="pt", |
| padding=True, |
| ) |
|
|
| inputs = encodings["input_ids"].to(self.model.device) |
| attention_mask = encodings["attention_mask"].to(self.model.device) |
|
|
| outputs = self.model.generate( |
| input_ids=inputs, |
| attention_mask=attention_mask, |
| pad_token_id=self.tokenizer.pad_token_id, |
| max_new_tokens=300, |
| temperature=0.01, |
| top_k=50, |
| top_p=0.9 |
| ) |
| print(self.tokenizer.decode(outputs[0], skip_special_tokens=True)) |
| print("Separaci贸n") |
| |
| generated_tokens = outputs[0][inputs.shape[1]:] |
| return self.tokenizer.decode(generated_tokens, skip_special_tokens=True) |
|
|
| |
| class GPT5Client: |
| def __init__(self, api_key: str): |
| key = api_key |
| if not key: |
| raise RuntimeError(f"Missing key in environment for GPT-5 client") |
| self.cli = OpenAI(api_key=key) |
|
|
| def chat(self, messages: list, model: str = 'gpt-4o-mini') -> str: |
| print("GPT5Client.chat llamado con", len(messages), "mensajes") |
| r = self.cli.chat.completions.create(model=model, messages=messages,temperature=0) |
| content = r.choices[0].message.content.strip() |
| return content |
|
|
|
|
| def get_video_duration(video_path: str) -> float: |
| """ |
| Devuelve la duraci贸n total del v铆deo en segundos. |
| """ |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| raise RuntimeError(f"No s'ha pogut obrir el v铆deo: {video_path}") |
|
|
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0 |
| cap.release() |
|
|
| duration_sec = total_frames / fps if total_frames > 0 else 0.0 |
| return duration_sec |
|
|
| def generate_srt_con_silencios(path_srt_original, path_srt_silences, video_path): |
| |
| duracio_total = get_video_duration(video_path) |
|
|
| with open(path_srt_original, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read() |
| |
| blocks = srt_text.strip().split("\n\n") |
| prev = 0 |
| srt_entries = [] |
| idx = 1 |
|
|
| for block in blocks: |
| lines = block.split("\n") |
| time_range = lines[1] |
| print(time_range) |
| content = " ".join(line.strip() for line in lines[2:]) |
| |
| start_str, end_str = time_range.split(" --> ") |
| start_sec = srt_time_to_seconds(start_str) |
| end_sec = srt_time_to_seconds(end_str) |
|
|
| |
| if prev < start_sec: |
| srt_entries.append( |
| f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(start_sec)}\n[silenci]\n" |
| ) |
| idx += 1 |
|
|
| |
| srt_entries.append( |
| f"{idx}\n{seconds_to_srt_time(start_sec)} --> {seconds_to_srt_time(end_sec)}\n{content}\n" |
| ) |
| idx += 1 |
| prev = end_sec |
|
|
| |
| if prev < duracio_total: |
| srt_entries.append( |
| f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(duracio_total)}\n[silenci]\n" |
| ) |
|
|
| |
| with open(path_srt_silences, "w", encoding="utf-8") as f: |
| f.write("\n".join(srt_entries)) |
|
|
| def srt_time_to_seconds(s): |
| h, m, rest = s.split(":") |
| s, ms = rest.split(",") |
| return int(h)*3600 + int(m)*60 + float(s) + int(ms)/1000 |
|
|
| def seconds_to_srt_time(seconds): |
| h = int(seconds // 3600) |
| m = int((seconds % 3600) // 60) |
| s = int(seconds % 60) |
| ms = int((seconds - int(seconds)) * 1000) |
| return f"{h:02}:{m:02}:{s:02},{ms:03}" |
|
|
| class Add_AD: |
| def __init__(self, data: DataHub): |
| self.data = data |
|
|
| def __call__(self, state: NState, srt_modified_silence, srt_modified_silence_con_ad) -> NState: |
| print("Add_Ad.__call__ iniciado") |
|
|
| |
| with open(srt_modified_silence, "r", encoding="utf-8") as f: |
| srt_text = f.read() |
|
|
| |
| frames = self.data.video.get('info_escenas', {}) |
|
|
| |
| srt_blocks = [] |
| srt_blocks_modified=[] |
| pattern = re.compile( |
| r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)", |
| re.S |
| ) |
|
|
| for match in pattern.finditer(srt_text): |
| index = int(match.group(1)) |
| start = srt_time_to_seconds(match.group(2)) |
| end = srt_time_to_seconds(match.group(3)) |
| text = match.group(4).strip() |
| srt_blocks.append({ |
| "index": index, |
| "start": start, |
| "end": end, |
| "text": text |
| }) |
|
|
| index=1 |
| |
| for block in srt_blocks: |
| if "[silenci]" in block["text"]: |
| start_block = block["start"] |
| end_block = block["end"] |
|
|
| for frame in frames: |
| if frame.get("start")<=start_block and frame.get("end")>=end_block: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": end_block, |
| "text": f"(AD): {frame.get('descripcion', '')}" |
| }) |
| index+=1 |
|
|
| elif start_block<frame.get("end")<end_block: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": frame.get("end"), |
| "text": f"(AD): {frame.get('descripcion', '')}" |
| }) |
| start_block=frame.get("end") |
| index+=1 |
| |
| elif start_block==frame.get("start") and start_block<end_block and frame.get("end")>=end_block: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": end_block, |
| "text": f"(AD): {frame.get('descripcion', '')}" |
| }) |
| start_block=end_block |
| index+=1 |
|
|
| else: |
| srt_blocks_modified.append({ |
| "index": index, |
| "start": block["start"], |
| "end": block["end"], |
| "text": block["text"] |
| }) |
| index+=1 |
|
|
| |
| srt_final = "" |
|
|
| for block in srt_blocks_modified: |
| start_tc = seconds_to_srt_time(block["start"]) |
| end_tc = seconds_to_srt_time(block["end"]) |
| srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n" |
|
|
| |
| with open(srt_modified_silence_con_ad, "w", encoding="utf-8") as f: |
| f.write(srt_final) |
|
|
| |
| state['srt_con_audiodescripcion'] = srt_final |
| return state |
|
|
| class Free_Narration: |
| def __init__(self, data: DataHub): |
| self.data = data |
|
|
| def __call__(self, state: NState, srt_original_silence_con_ad, story_path) -> NState: |
| print("Free_Narration.__call__ iniciado") |
|
|
| descriptions=[] |
| frames = self.data.video.get('info_escenas', []) |
| for frame in frames: |
| descriptions.append(frame["descripcion"]) |
|
|
| full_transcription = self.data.video.get('full_transcription', []) |
|
|
| with open(srt_original_silence_con_ad, "r", encoding="utf-8-sig") as f: |
| diarization_text = f.read() |
| |
| prompt = f""" |
| La teva tasca 茅s elaborar una descripci贸 lliure d'un v铆deo d'unes 100 paraules a partir de la informaci贸 seg眉ent: |
| 1.) A partir del v铆deo s'han extret captures de pantalla en els moments en qu猫 es canviava d'escena i tens una descripci贸 de cadascuna d'elles a: {descriptions} |
| 2.) La transcripci贸 completa del v铆deo 茅s: {full_transcription} |
| Per tant, a partir de tota aquesta informaci贸, genera'm la hist貌ria completa, intentant incloure els personatges identificats i la trama general de la hist貌ria. |
| """ |
| out = state['llm_Salamandra'](prompt) |
| print(out) |
|
|
| with open(story_path, "w", encoding="utf-8-sig") as f: |
| f.write(out) |
|
|
| state['free_narration'] = out |
|
|
| return state |
| |
| class Valoracion_Final: |
| def __call__(self, state, srt_final, csv_evaluacion): |
| print("Valoracion_Final.__call__ iniciat") |
|
|
| |
| with open(srt_final, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read().strip() |
|
|
| |
| prompt = f""" |
| Ets un avaluador expert en accessibilitat audiovisual segons la NORMA UNE 153020. |
| |
| Analitza el seg眉ent fitxer SRT i avalua'l segons les caracter铆stiques indicades. |
| Per a cada caracter铆stica, assigna una puntuaci贸 del 0 al 7 i una justificaci贸 breu i espec铆fica, |
| seguint el format establert. |
| |
| SRT a analitzar: |
| {srt_text} |
| |
| Format de sortida: |
| Caracteristica,Valoracio (0-7),Justificacio |
| |
| Les caracter铆stiques a avaluar s贸n: |
| - Precisi贸 Descriptiva: Avalua si la descripci贸 visual dels plans, accions i context 茅s exacta i coherent amb el contingut esperat. |
| - Sincronitzaci贸 Temporal: Avalua si el text apareix i desapareix al moment adequat segons el contingut visual o sonor. |
| - Claredat i Concisi贸: Analitza si el llenguatge 茅s clar, natural i sense redund脿ncies. |
| - Inclusi贸 de Di脿leg/So: Determina si es recullen correctament els di脿legs, sons i elements musicals rellevants. |
| - Contextualitzaci贸: Avalua si el context (ambient, espai, personatges, situacions) est脿 ben representat. |
| - Flux i Ritme de la Narraci贸: Avalua la flu茂desa de la lectura i la coher猫ncia temporal entre segments. |
| |
| Respon nom茅s amb la taula CSV, sense cap text addicional. |
| """ |
|
|
| |
| messages = [ |
| {"role": "system", "content": "Ets un assistent expert en accessibilitat audiovisual i normativa UNE 153020."}, |
| {"role": "user", "content": prompt} |
| ] |
|
|
| |
| out = state['llm_GPT'](messages) |
|
|
| out_text = str(out).strip() |
|
|
| |
| with open(csv_evaluacion, "w", encoding="utf-8-sig") as f: |
| f.write(out_text) |
|
|
| return state |
|
|
| @router.post("/generate_salamadra_result", tags=["Salamandra Process"]) |
| async def generate_salamadra_result( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Generate all Salamandra output files (final SRT, free narration, and evaluation CSV) |
| for a processed video identified by its SHA1 hash. |
| |
| This endpoint orchestrates the full Salamandra processing pipeline: |
| - Validates the access token. |
| - Locates the processed video and its associated metadata. |
| - Generates an intermediate SRT file enriched with silence markers. |
| - Runs the Salamandra logic to produce: |
| * A finalized SRT subtitle file (`result.srt`) |
| * A free-narration text file (`free_narration.txt`) |
| * An evaluation CSV (`evaluation.csv`) |
| - Ensures the expected directory structure exists, creating folders if necessary. |
| - Uses both GPT-based and Salamandra-based LLMs to generate narrative and evaluation content. |
| |
| Args: |
| sha1 (str): The SHA1 hash that identifies the media processing workspace. |
| token (str): Authorization token required to execute Salamandra operations. |
| |
| Raises: |
| HTTPException: |
| - 404 if the SHA1 folder does not exist. |
| - 404 if the `clip` folder is missing. |
| - 404 if no MP4 file is found inside the clip folder. |
| |
| Processing Steps: |
| 1. Validates that all required folders exist (`sha1`, `clip`, `result/Salamandra`). |
| 2. Retrieves the input video and initial metadata (original SRT, info JSON). |
| 3. Creates temporary enriched SRT with silence detection. |
| 4. Runs Add_AD, Free_Narration, and Valoracion_Final modules. |
| 5. Generates the final Salamandra output files: |
| - result.srt |
| - free_narration.txt |
| - evaluation.csv |
| |
| Returns: |
| dict: A JSON response indicating successful generation: |
| { |
| "status": "ok", |
| "message": "Salamandra SRT, free_narration and CSV evaluation generated" |
| } |
| """ |
| validate_token(token) |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| clip_folder = sha1_folder / "clip" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
|
|
| if not clip_folder.exists() or not clip_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Clip folder not found") |
|
|
| |
| mp4_files = list(clip_folder.glob("*.mp4")) |
| if not mp4_files: |
| raise HTTPException(status_code=404, detail="No MP4 files found") |
| video_path = clip_folder / mp4_files[0] |
|
|
| |
| srt_original = get_initial_srt_path(sha1) |
|
|
| |
| informacion_json = get_initial_info_path(sha1) |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamdra_folder = result_folder / "Salamandra" |
| salamdra_folder.mkdir(parents=True, exist_ok=True) |
| srt_final = salamdra_folder / "result.srt" |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamdra_folder = result_folder / "Salamandra" |
| salamdra_folder.mkdir(parents=True, exist_ok=True) |
| free_narration_salamandra = salamdra_folder / "free_narration.txt" |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamdra_folder = result_folder / "Salamandra" |
| salamdra_folder.mkdir(parents=True, exist_ok=True) |
| csv_evaluacion = salamdra_folder / "evaluation.csv" |
| |
| |
| srt_name = sha1 + "_srt" |
| tmp = tempfile.NamedTemporaryFile(mode="w+", suffix=".srt", prefix=srt_name + "_", delete=False) |
| |
| generate_srt_con_silencios(srt_original, tmp.name, video_path) |
| |
| datahub=DataHub(informacion_json) |
| add_ad = Add_AD(datahub) |
| free_narration = Free_Narration(datahub) |
| valoracion_final = Valoracion_Final() |
| |
| GPTclient = GPT5Client(api_key=OPEN_AI_KEY) |
| salamandraclient = SalamandraClient() |
| |
| state = { |
| "llm_GPT": GPTclient.chat, |
| "llm_Salamandra": salamandraclient.chat |
| } |
| |
| state = add_ad(state, tmp.name, srt_final) |
| state= free_narration(state, srt_final, free_narration_salamandra) |
| state = valoracion_final(state, srt_final, csv_evaluacion) |
| tmp.close() |
|
|
| return {"status": "ok", "message": "Salamandra SRT, free_narration and CSV evaluation generated"} |
|
|
| @router.get("/download_salamadra_srt", tags=["Salamandra Process"]) |
| def download_salamadra_srt( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download the final SRT subtitle file generated by the Salamandra processing pipeline. |
| |
| This endpoint retrieves the file `result.srt` associated with a specific SHA1 hash. |
| It validates the authorization token, checks the expected folder structure, and |
| returns the subtitle file if it exists. |
| |
| Args: |
| sha1 (str): The SHA1 identifier corresponding to the processed media folder. |
| token (str): Authorization token required to access the resource. |
| |
| Raises: |
| HTTPException: |
| - 404 if any of the required directories (SHA1 folder, result folder, Salamandra folder) |
| are missing. |
| - 404 if the `result.srt` file is not found. |
| |
| Returns: |
| FileResponse: The SRT file (`result.srt`) with media type `text/srt`. |
| """ |
| validate_token(token) |
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamandra_folder = result_folder / "Salamandra" |
| salamandra_folder.mkdir(parents=True, exist_ok=True) |
| srt_final = salamandra_folder / "result.srt" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
| if not result_folder.exists() or not result_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="result folder not found") |
| if not salamandra_folder.exists() or not salamandra_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Salamandra folder not found") |
| if not srt_final.exists() or not srt_final.is_file(): |
| raise HTTPException(status_code=404, detail="result.srt SRT not found") |
|
|
| return FileResponse( |
| path=srt_final, |
| media_type="text/srt", |
| filename="result.srt" |
| ) |
|
|
| @router.get("/download_salamadra_free_narration", tags=["Salamandra Process"]) |
| def download_salamadra_free_narration( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download the free narration text file generated by the Salamandra process. |
| |
| This endpoint retrieves `free_narration.txt` from the Salamandra result directory |
| associated with a specific SHA1 hash. The token is validated before accessing the |
| file system. If the file or required folders do not exist, appropriate HTTP |
| errors are returned. |
| |
| Args: |
| sha1 (str): The SHA1 identifier for the processed media folder. |
| token (str): Authorization token required to access the file. |
| |
| Raises: |
| HTTPException: |
| - 404 if the SHA1 folder, result folder, or Salamandra folder is missing. |
| - 404 if `free_narration.txt` is not found. |
| |
| Returns: |
| FileResponse: The free narration text file with media type `text/srt`. |
| """ |
| validate_token(token) |
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamandra_folder = result_folder / "Salamandra" |
| salamandra_folder.mkdir(parents=True, exist_ok=True) |
| free_narration_salamandra = salamandra_folder / "free_narration.txt" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
| if not result_folder.exists() or not result_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="result folder not found") |
| if not salamandra_folder.exists() or not salamandra_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Salamandra folder not found") |
| if not free_narration_salamandra.exists() or not free_narration_salamandra.is_file(): |
| raise HTTPException(status_code=404, detail="free_narration.txt not found") |
|
|
| return FileResponse( |
| path=free_narration_salamandra, |
| media_type="text/srt", |
| filename="free_narration.tx" |
| ) |
|
|
| @router.get("/download_salamadra_csv_evaluation", tags=["Salamandra Process"]) |
| def download_salamadra_csv_evaluation( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download the evaluation CSV generated by the Salamandra processing workflow. |
| |
| This endpoint returns the `evaluation.csv` file corresponding to the given SHA1 hash. |
| It performs token validation and ensures that the folder structure and file exist. |
| If any element is missing, a 404 HTTP error is raised. |
| |
| Args: |
| sha1 (str): The SHA1 identifier representing the processed media directory. |
| token (str): Authorization token required for file retrieval. |
| |
| Raises: |
| HTTPException: |
| - 404 if the SHA1 folder, result folder, or Salamandra folder does not exist. |
| - 404 if the `evaluation.csv` file is missing. |
| |
| Returns: |
| FileResponse: The evaluation CSV file with media type `text/srt`. |
| """ |
| validate_token(token) |
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamandra_folder = result_folder / "Salamandra" |
| salamandra_folder.mkdir(parents=True, exist_ok=True) |
| csv_evaluacion = salamandra_folder / "evaluation.csv" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
| if not result_folder.exists() or not result_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="result folder not found") |
| if not salamandra_folder.exists() or not salamandra_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Salamandra folder not found") |
| if not csv_evaluacion.exists() or not csv_evaluacion.is_file(): |
| raise HTTPException(status_code=404, detail="evaluation.csv CSV not found") |
|
|
| return FileResponse( |
| path=csv_evaluacion, |
| media_type="text/srt", |
| filename="evaluation.csv" |
| ) |