engine / main_process /moe_router.py
VeuReu's picture
Update main_process/moe_router.py
102b5e7 verified
raw
history blame
24.2 kB
import os
import io
import re
import ast
import json
import tempfile
from pathlib import Path
from typing import List, Dict, Counter
# --- Third-Party Libraries ---
import cv2
import torch
from fastapi import APIRouter, UploadFile, File, Query, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from transformers import AutoModelForCausalLM, AutoTokenizer
from openai import OpenAI
# --- Internal Modules / Project Imports ---
from svision_client import (
extract_scenes,
add_ocr_and_faces,
keyframes_every_second_extraction,
extract_descripcion_escena
)
from asr_client import (
extract_audio_from_video,
diarize_audio,
transcribe_long_audio,
transcribe_short_audio,
identificar_veu
)
from storage.common import validate_token
from storage.files.file_manager import FileManager
from storage.embeddings_routers import get_embeddings_json
from main_process.main_router import (
get_initial_info_path,
get_initial_srt_path
)
# Root folders for persisted embeddings and per-video media workspaces.
EMBEDDINGS_ROOT = Path("/data/embeddings")
MEDIA_ROOT = Path("/data/media")
# Pin model work to GPU 1 (must be set before CUDA is first initialized).
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
router = APIRouter(prefix="/moe", tags=["MoE Process"])
# Credentials come from the environment; either may be None if unset.
HF_TOKEN = os.getenv("HF_TOKEN")
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
class DataHub:
    """Container that exposes the parsed video-analysis JSON as ``self.video``."""

    def __init__(self, video_analysis_json: str):
        # Load the whole analysis file once; downstream stages read from it.
        print("DataHub inicializando con JSON:", video_analysis_json)
        raw_text = Path(video_analysis_json).read_text(encoding="utf-8")
        self.video = json.loads(raw_text)
class NState(dict):
    """Mutable pipeline state passed between processing stages (a plain dict)."""
# ---------------- LLM used for the free narration ----------------
class SalamandraClient:
    """Local Salamandra-7B instruct wrapper used to write the free narration."""

    def __init__(self, model_id="BSC-LT/salamandra-7b-instruct"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        # Causal LMs often ship without a pad token; tokenizing with
        # padding=True in chat() raises in that case, so fall back to EOS.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map="auto",
            torch_dtype=torch.bfloat16
        )

    def chat(self, prompt) -> str:
        """Generate a completion for *prompt*; return only the new text.

        Args:
            prompt: plain-text prompt string.

        Returns:
            The generated continuation, with the prompt tokens stripped.
        """
        encodings = self.tokenizer(
            prompt,
            return_tensors="pt",
            padding=True,
        )
        input_ids = encodings["input_ids"].to(self.model.device)
        attention_mask = encodings["attention_mask"].to(self.model.device)
        outputs = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            pad_token_id=self.tokenizer.pad_token_id,
            max_new_tokens=300,   # raise for longer narrations
            temperature=0.01,     # near-deterministic output
            top_k=50,             # sample only from the most likely tokens
            top_p=0.9
        )
        # Debug trace of the full decoded sequence (prompt + completion).
        print(self.tokenizer.decode(outputs[0], skip_special_tokens=True))
        print("Separaci贸")
        # Strip the prompt tokens so callers only see the generated part.
        generated_tokens = outputs[0][input_ids.shape[1]:]
        return self.tokenizer.decode(generated_tokens, skip_special_tokens=True)
# This is only used for the final evaluation step.
class GPT5Client:
    """Thin wrapper around the OpenAI chat-completions API."""

    def __init__(self, api_key: str):
        # Fail fast when no key is configured (was an f-string with no
        # placeholder; plain string carries the same message).
        if not api_key:
            raise RuntimeError("Missing key in environment for GPT-5 client")
        self.cli = OpenAI(api_key=api_key)

    def chat(self, messages: list, model: str = 'gpt-4o-mini') -> str:
        """Send *messages* (OpenAI chat format) and return the stripped reply text."""
        print("GPT5Client.chat llamado con", len(messages), "mensajes")
        # temperature=0 keeps the evaluation output deterministic.
        r = self.cli.chat.completions.create(model=model, messages=messages, temperature=0)
        return r.choices[0].message.content.strip()
def get_video_duration(video_path: str) -> float:
    """Return the total duration of *video_path* in seconds (0.0 when no frames)."""
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        raise RuntimeError(f"No s'ha pogut obrir el v铆deo: {video_path}")
    # OpenCV reports 0 for unknown FPS; assume PAL's 25 fps in that case.
    frame_rate = capture.get(cv2.CAP_PROP_FPS) or 25.0
    frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    capture.release()
    if frame_count <= 0:
        return 0.0
    return frame_count / frame_rate
def generate_srt_con_silencios(path_srt_original, path_srt_silences, video_path):
    """Rewrite an SRT file, inserting explicit ``[silenci]`` blocks in the gaps.

    Reads the original SRT, renumbers all entries, and adds a ``[silenci]``
    entry for every gap between consecutive subtitles, plus a trailing one if
    the video outlasts the final subtitle.

    Args:
        path_srt_original: path of the source SRT file.
        path_srt_silences: path where the silence-annotated SRT is written.
        video_path: video file, used to measure the total duration.
    """
    # Total video duration bounds the trailing silence block.
    duracio_total = get_video_duration(video_path)
    with open(path_srt_original, "r", encoding="utf-8-sig") as f:
        srt_text = f.read()
    srt_entries = []
    prev = 0
    idx = 1
    for block in srt_text.strip().split("\n\n"):
        lines = block.split("\n")
        # Guard: an empty SRT yields one empty block ([''] -> lines[1] would
        # raise IndexError in the original code); skip malformed blocks.
        if len(lines) < 2 or " --> " not in lines[1]:
            continue
        start_str, end_str = lines[1].split(" --> ")
        start_sec = srt_time_to_seconds(start_str)
        end_sec = srt_time_to_seconds(end_str)
        content = " ".join(line.strip() for line in lines[2:])
        # Insert a silence entry for the gap before this subtitle, if any.
        if prev < start_sec:
            srt_entries.append(
                f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(start_sec)}\n[silenci]\n"
            )
            idx += 1
        # The subtitle itself, renumbered.
        srt_entries.append(
            f"{idx}\n{seconds_to_srt_time(start_sec)} --> {seconds_to_srt_time(end_sec)}\n{content}\n"
        )
        idx += 1
        prev = end_sec
    # Trailing silence if the video is longer than the last subtitle.
    if prev < duracio_total:
        srt_entries.append(
            f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(duracio_total)}\n[silenci]\n"
        )
    with open(path_srt_silences, "w", encoding="utf-8") as f:
        f.write("\n".join(srt_entries))
def srt_time_to_seconds(s):
    """Convert an SRT timecode ``HH:MM:SS,mmm`` to seconds as a float."""
    hours, minutes, tail = s.split(":")
    secs, millis = tail.split(",")
    return 3600 * int(hours) + 60 * int(minutes) + float(secs) + int(millis) / 1000
def seconds_to_srt_time(seconds):
    """Format a non-negative duration in seconds as an SRT timecode ``HH:MM:SS,mmm``."""
    whole = int(seconds)
    millis = int((seconds - whole) * 1000)
    minutes, secs = divmod(whole, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"
class Add_AD:
    """Fill ``[silenci]`` gaps in an SRT file with audio-description (AD) blocks.

    Scene data comes from the analysis JSON held by the DataHub
    (``info_escenas`` entries with ``start``/``end`` and ``descripcion`` keys
    -- assumed from usage below; TODO confirm schema against producer).
    """

    def __init__(self, data: DataHub):
        # Parsed video-analysis JSON (see DataHub.video).
        self.data = data

    def __call__(self, state: NState, srt_modified_silence, srt_modified_silence_con_ad) -> NState:
        """Read the silence-annotated SRT, inject (AD) lines, write the result.

        Args:
            state: shared pipeline state; 'srt_con_audiodescripcion' is set on it.
            srt_modified_silence: path of the input SRT containing [silenci] blocks.
            srt_modified_silence_con_ad: path where the enriched SRT is written.

        Returns:
            The same state dict, updated with the final SRT text.
        """
        print("Add_Ad.__call__ iniciado")
        # Read the input SRT (already annotated with [silenci] blocks).
        with open(srt_modified_silence, "r", encoding="utf-8") as f:
            srt_text = f.read()
        # Scene descriptions extracted from the video.
        # NOTE(review): default is {} here but [] in Free_Narration; iterating
        # an empty dict yields nothing so behavior matches, but [] is clearer.
        frames = self.data.video.get('info_escenas', {})
        # Parse the SRT text into blocks: index, start/end (seconds), text.
        srt_blocks = []
        srt_blocks_modified=[]
        pattern = re.compile(
            r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)",
            re.S
        )
        for match in pattern.finditer(srt_text):
            index = int(match.group(1))
            start = srt_time_to_seconds(match.group(2))
            end = srt_time_to_seconds(match.group(3))
            text = match.group(4).strip()
            srt_blocks.append({
                "index": index,
                "start": start,
                "end": end,
                "text": text
            })
        index=1
        # Walk every block; silence blocks are replaced by overlapping scene ADs.
        for block in srt_blocks:
            if "[silenci]" in block["text"]:
                start_block = block["start"]
                end_block = block["end"]
                # NOTE(review): if no frame overlaps this gap, the silence block
                # is dropped from the output entirely -- confirm this is intended.
                for frame in frames:
                    # Scene fully covers the remaining gap: one AD block for it.
                    # NOTE(review): no break here, so several fully-covering
                    # scenes would each emit a duplicate AD block -- verify.
                    if frame.get("start")<=start_block and frame.get("end")>=end_block:
                        srt_blocks_modified.append({
                            "index":index,
                            "start": start_block,
                            "end": end_block,
                            "text": f"(AD): {frame.get('descripcion', '')}"
                        })
                        index+=1
                    # Scene ends inside the gap: AD up to the scene end, then
                    # advance the gap start so later scenes fill the remainder.
                    elif start_block<frame.get("end")<end_block:
                        srt_blocks_modified.append({
                            "index":index,
                            "start": start_block,
                            "end": frame.get("end"),
                            "text": f"(AD): {frame.get('descripcion', '')}"
                        })
                        start_block=frame.get("end")
                        index+=1
                    # Scene starts exactly at the gap and extends past its end.
                    elif start_block==frame.get("start") and start_block<end_block and frame.get("end")>=end_block:
                        srt_blocks_modified.append({
                            "index":index,
                            "start": start_block,
                            "end": end_block,
                            "text": f"(AD): {frame.get('descripcion', '')}"
                        })
                        start_block=end_block
                        index+=1
            else:
                # Regular subtitle: keep text and timing, renumbered.
                srt_blocks_modified.append({
                    "index": index,
                    "start": block["start"],
                    "end": block["end"],
                    "text": block["text"]
                })
                index+=1
        # Rebuild the final SRT text from the modified blocks.
        srt_final = ""
        for block in srt_blocks_modified:
            start_tc = seconds_to_srt_time(block["start"])
            end_tc = seconds_to_srt_time(block["end"])
            srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n"
        # Persist the enriched SRT.
        with open(srt_modified_silence_con_ad, "w", encoding="utf-8") as f:
            f.write(srt_final)
        # Expose the result in the shared pipeline state.
        state['srt_con_audiodescripcion'] = srt_final
        return state
class Free_Narration:
    """Ask the Salamandra LLM for a ~100-word free narration of the video."""

    def __init__(self, data: DataHub):
        # Parsed video-analysis JSON (see DataHub.video).
        self.data = data

    def __call__(self, state: NState, srt_original_silence_con_ad, story_path) -> NState:
        """Build the narration prompt, query the LLM, and persist the story.

        Args:
            state: shared pipeline state; must contain 'llm_Salamandra' callable.
            srt_original_silence_con_ad: path of the AD-enriched SRT file.
            story_path: destination file for the generated narration.

        Returns:
            The same state dict with 'free_narration' set.
        """
        print("Free_Narration.__call__ iniciado")
        scene_info = self.data.video.get('info_escenas', [])
        descriptions = [frame["descripcion"] for frame in scene_info]
        full_transcription = self.data.video.get('full_transcription', [])
        # NOTE(review): read but never used below -- confirm whether the
        # diarization text was meant to feed the prompt.
        with open(srt_original_silence_con_ad, "r", encoding="utf-8-sig") as f:
            diarization_text = f.read()
        prompt = f"""
La teva tasca 茅s elaborar una descripci贸 lliure d'un v铆deo d'unes 100 paraules a partir de la informaci贸 seg眉ent:
1.) A partir del v铆deo s'han extret captures de pantalla en els moments en qu猫 es canviava d'escena i tens una descripci贸 de cadascuna d'elles a: {descriptions}
2.) La transcripci贸 completa del v铆deo 茅s: {full_transcription}
Per tant, a partir de tota aquesta informaci贸, genera'm la hist貌ria completa, intentant incloure els personatges identificats i la trama general de la hist貌ria.
"""
        out = state['llm_Salamandra'](prompt)
        print(out)
        with open(story_path, "w", encoding="utf-8-sig") as f:
            f.write(out)
        state['free_narration'] = out
        return state
class Valoracion_Final:
    """Score the final SRT against UNE 153020 accessibility criteria via the GPT client."""

    def __call__(self, state, srt_final, csv_evaluacion):
        """Evaluate *srt_final* and write the model's CSV verdict to *csv_evaluacion*.

        Args:
            state: shared pipeline state; must contain an 'llm_GPT' callable
                that accepts an OpenAI-style message list and returns text.
            srt_final: path of the SRT file to evaluate.
            csv_evaluacion: destination path for the CSV evaluation.

        Returns:
            The (unmodified) state dict.
        """
        print("Valoracion_Final.__call__ iniciat")
        # Load the subtitle file under evaluation.
        with open(srt_final, "r", encoding="utf-8-sig") as f:
            srt_text = f.read().strip()
        # Evaluation prompt (UNE 153020 criteria, CSV-only answer requested).
        prompt = f"""
Ets un avaluador expert en accessibilitat audiovisual segons la NORMA UNE 153020.
Analitza el seg眉ent fitxer SRT i avalua'l segons les caracter铆stiques indicades.
Per a cada caracter铆stica, assigna una puntuaci贸 del 0 al 7 i una justificaci贸 breu i espec铆fica,
seguint el format establert.
SRT a analitzar:
{srt_text}
Format de sortida:
Caracteristica,Valoracio (0-7),Justificacio
Les caracter铆stiques a avaluar s贸n:
- Precisi贸 Descriptiva: Avalua si la descripci贸 visual dels plans, accions i context 茅s exacta i coherent amb el contingut esperat.
- Sincronitzaci贸 Temporal: Avalua si el text apareix i desapareix al moment adequat segons el contingut visual o sonor.
- Claredat i Concisi贸: Analitza si el llenguatge 茅s clar, natural i sense redund脿ncies.
- Inclusi贸 de Di脿leg/So: Determina si es recullen correctament els di脿legs, sons i elements musicals rellevants.
- Contextualitzaci贸: Avalua si el context (ambient, espai, personatges, situacions) est脿 ben representat.
- Flux i Ritme de la Narraci贸: Avalua la flu茂desa de la lectura i la coher猫ncia temporal entre segments.
Respon nom茅s amb la taula CSV, sense cap text addicional.
"""
        # system + user message pair, OpenAI chat format.
        evaluator = state['llm_GPT']
        reply = evaluator([
            {"role": "system", "content": "Ets un assistent expert en accessibilitat audiovisual i normativa UNE 153020."},
            {"role": "user", "content": prompt}
        ])
        out_text = str(reply).strip()
        # Persist the CSV verdict.
        with open(csv_evaluacion, "w", encoding="utf-8-sig") as f:
            f.write(out_text)
        return state
@router.post("/generate_moe_result", tags=["MoE Process"])
async def generate_salamadra_result(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Generate all MoE output files (final SRT, free narration, evaluation CSV)
    for a processed video identified by its SHA1 hash.

    Pipeline:
        1. Validate the token and the sha1/clip folder structure.
        2. Build a temporary SRT enriched with [silenci] gap markers.
        3. Run Add_AD, Free_Narration and Valoracion_Final to produce
           result.srt, free_narration.txt and evaluation.csv under
           ``<MEDIA_ROOT>/<sha1>/result/Salamandra/``.

    Args:
        sha1 (str): SHA1 hash identifying the media processing workspace.
        token (str): Authorization token.

    Raises:
        HTTPException: 404 when the SHA1 folder, the clip folder, or the
            MP4 file inside the clip folder is missing.

    Returns:
        dict: {"status": "ok", "message": "..."} on success.
    """
    validate_token(token)
    # NOTE(review): file_manager is never used below; kept in case the
    # FileManager constructor has side effects -- confirm and remove.
    file_manager = FileManager(MEDIA_ROOT)
    sha1_folder = MEDIA_ROOT / sha1
    clip_folder = sha1_folder / "clip"
    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not clip_folder.is_dir():
        raise HTTPException(status_code=404, detail="Clip folder not found")
    # Locate the input video. glob() already yields full paths, so the old
    # `clip_folder / mp4_files[0]` re-join was redundant.
    mp4_files = list(clip_folder.glob("*.mp4"))
    if not mp4_files:
        raise HTTPException(status_code=404, detail="No MP4 files found")
    video_path = mp4_files[0]
    # Initial artifacts produced by the main pipeline.
    srt_original = get_initial_srt_path(sha1)
    informacion_json = get_initial_info_path(sha1)
    # Output folder, created once (was duplicated three times).
    salamandra_folder = sha1_folder / "result" / "Salamandra"
    salamandra_folder.mkdir(parents=True, exist_ok=True)
    srt_final = salamandra_folder / "result.srt"
    free_narration_salamandra = salamandra_folder / "free_narration.txt"
    csv_evaluacion = salamandra_folder / "evaluation.csv"
    # Temporary silence-annotated SRT; the original code leaked it
    # (delete=False with no unlink), so clean it up in finally.
    tmp = tempfile.NamedTemporaryFile(
        mode="w+", suffix=".srt", prefix=sha1 + "_srt_", delete=False
    )
    try:
        generate_srt_con_silencios(srt_original, tmp.name, video_path)
        datahub = DataHub(informacion_json)
        state = {
            "llm_GPT": GPT5Client(api_key=OPEN_AI_KEY).chat,
            "llm_Salamandra": SalamandraClient().chat
        }
        state = Add_AD(datahub)(state, tmp.name, srt_final)
        state = Free_Narration(datahub)(state, srt_final, free_narration_salamandra)
        state = Valoracion_Final()(state, srt_final, csv_evaluacion)
    finally:
        tmp.close()
        os.unlink(tmp.name)
    return {"status": "ok", "message": "Salamandra SRT, free_narration and CSV evaluation generated"}
@router.get("/download_moe_srt", tags=["MoE Process"])
def download_salamadra_srt(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the final SRT subtitle file generated by the Salamandra pipeline.

    Args:
        sha1 (str): SHA1 identifier of the processed media folder.
        token (str): Authorization token.

    Raises:
        HTTPException: 404 if the SHA1/result/Salamandra folders or the
            `result.srt` file are missing.

    Returns:
        FileResponse: the `result.srt` file.
    """
    validate_token(token)
    # NOTE(review): file_manager is never used; kept in case FileManager's
    # constructor has side effects -- confirm and remove.
    file_manager = FileManager(MEDIA_ROOT)
    sha1_folder = MEDIA_ROOT / sha1
    result_folder = sha1_folder / "result"
    salamandra_folder = result_folder / "Salamandra"
    srt_final = salamandra_folder / "result.srt"
    # A download endpoint must not create directories: the previous
    # mkdir(exist_ok=True) calls made the folder 404 checks below dead code.
    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not result_folder.is_dir():
        raise HTTPException(status_code=404, detail="result folder not found")
    if not salamandra_folder.is_dir():
        raise HTTPException(status_code=404, detail="Salamandra folder not found")
    if not srt_final.is_file():
        raise HTTPException(status_code=404, detail="result.srt SRT not found")
    return FileResponse(
        path=srt_final,
        media_type="text/srt",
        filename="result.srt"
    )
@router.get("/download_moe_free_narration", tags=["MoE Process"])
def download_salamadra_free_narration(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the free-narration text file generated by the Salamandra process.

    Args:
        sha1 (str): SHA1 identifier of the processed media folder.
        token (str): Authorization token.

    Raises:
        HTTPException: 404 if the SHA1/result/Salamandra folders or the
            `free_narration.txt` file are missing.

    Returns:
        FileResponse: the free-narration text file (`text/plain`).
    """
    validate_token(token)
    # NOTE(review): file_manager is never used; kept in case FileManager's
    # constructor has side effects -- confirm and remove.
    file_manager = FileManager(MEDIA_ROOT)
    sha1_folder = MEDIA_ROOT / sha1
    result_folder = sha1_folder / "result"
    salamandra_folder = result_folder / "Salamandra"
    free_narration_salamandra = salamandra_folder / "free_narration.txt"
    # A download endpoint must not create directories: the previous
    # mkdir(exist_ok=True) calls made the folder 404 checks below dead code.
    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not result_folder.is_dir():
        raise HTTPException(status_code=404, detail="result folder not found")
    if not salamandra_folder.is_dir():
        raise HTTPException(status_code=404, detail="Salamandra folder not found")
    if not free_narration_salamandra.is_file():
        raise HTTPException(status_code=404, detail="free_narration.txt not found")
    return FileResponse(
        path=free_narration_salamandra,
        # Was "text/srt" for a .txt file; also fixes the truncated
        # download name "free_narration.tx".
        media_type="text/plain",
        filename="free_narration.txt"
    )
@router.get("/download_moe_csv_evaluation", tags=["MoE Process"])
def download_salamadra_csv_evaluation(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the evaluation CSV generated by the Salamandra workflow.

    Args:
        sha1 (str): SHA1 identifier of the processed media directory.
        token (str): Authorization token.

    Raises:
        HTTPException: 404 if the SHA1/result/Salamandra folders or the
            `evaluation.csv` file are missing.

    Returns:
        FileResponse: the evaluation CSV file (`text/csv`).
    """
    validate_token(token)
    # NOTE(review): file_manager is never used; kept in case FileManager's
    # constructor has side effects -- confirm and remove.
    file_manager = FileManager(MEDIA_ROOT)
    sha1_folder = MEDIA_ROOT / sha1
    result_folder = sha1_folder / "result"
    salamandra_folder = result_folder / "Salamandra"
    csv_evaluacion = salamandra_folder / "evaluation.csv"
    # A download endpoint must not create directories: the previous
    # mkdir(exist_ok=True) calls made the folder 404 checks below dead code.
    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not result_folder.is_dir():
        raise HTTPException(status_code=404, detail="result folder not found")
    if not salamandra_folder.is_dir():
        raise HTTPException(status_code=404, detail="Salamandra folder not found")
    if not csv_evaluacion.is_file():
        raise HTTPException(status_code=404, detail="evaluation.csv CSV not found")
    return FileResponse(
        path=csv_evaluacion,
        media_type="text/csv",  # was "text/srt" for a CSV file
        filename="evaluation.csv"
    )