|
|
import os |
|
|
import csv |
|
|
import json |
|
|
import logging |
|
|
import shutil |
|
|
from pathlib import Path |
|
|
from typing import TypedDict, Annotated, List, Dict, Union |
|
|
from langgraph.graph import StateGraph, END |
|
|
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage |
|
|
from langchain_openai import ChatOpenAI |
|
|
from operator import itemgetter |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Filesystem layout -------------------------------------------------------
# All intermediate artifacts (SRT iterations, CSV evaluations, thinking logs)
# are written into a "temp" directory next to this script.
BASE_DIR = Path(__file__).resolve().parent
TEMP_DIR = BASE_DIR / "temp"
TEMP_DIR.mkdir(exist_ok=True)

LOG_FILE = TEMP_DIR / "finetuning.log"

# Log simultaneously to the console and to temp/finetuning.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s: %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_FILE, encoding="utf-8")
    ],
)
logger = logging.getLogger(__name__)

# Fail fast at import time if the OpenAI key is missing: every agent below
# depends on the shared `llm` client.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise EnvironmentError("OPENAI_API_KEY no est谩 configurada. Define la variable de entorno antes de ejecutar finetuning.py.")

# Shared LLM client used by all agent nodes. temperature=0.3 keeps rewrites
# mostly stable while allowing some rephrasing freedom.
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Demo input SRT: dialogue cues tagged [Name] and audio-description cues
# tagged (AD). Cues 1/2 and 3/4 have overlapping timestamps on purpose,
# which gives the critic agent something concrete to flag.
INITIAL_SRT_CONTENT = """
1
00:00:00,000 --> 00:00:05,340
[Sandra] Per貌 de veritat crec que aquest projecte canviar脿 la nostra nota final.

2
00:00:04,340 --> 00:00:05,790
[Luc铆a] Hem de donar-ho tot.

3
00:00:05,790 --> 00:00:08,790
[Sandra] Ho s茅, ho s茅.

4
00:00:08,000 --> 00:00:10,000
(AD) De sobte, s贸n al parc.

5
00:00:10,000 --> 00:00:14,000
(AD) Ara tallen menjar i fan una amanida a una cuina.
"""
|
|
|
|
|
|
|
|
# Visual-context ground truth for the narrator: one entry per SRT cue.
# "dialog" entries carry the spoken line; "visual_context" entries describe
# what is on screen and are the source material for the (AD) cues.
CONTEXT_JSON_CONTENT = """
{
  "segments": [
    {"id": 1, "start": "00:00:00,000", "end": "00:00:05,340", "type": "dialog", "text": "[Sandra] Per貌 de veritat crec que aquest projecte canviar脿 la nostra nota final."},
    {"id": 2, "start": "00:00:04,340", "end": "00:00:05,790", "type": "dialog", "text": "[Luc铆a] Hem de donar-ho tot."},
    {"id": 3, "start": "00:00:05,790", "end": "00:00:08,790", "type": "dialog", "text": "[Sandra] Ho s茅, ho s茅."},
    {"id": 4, "start": "00:00:08,000", "end": "00:00:10,000", "type": "visual_context", "text": "Cambio de escena a un parque. Personajes caminando."},
    {"id": 5, "start": "00:00:10,000", "end": "00:00:14,000", "type": "visual_context", "text": "Escena en una cocina. Los personajes est谩n cortando vegetales y haciendo una ensalada."}
  ]
}
"""
|
|
|
|
|
|
|
|
|
|
|
# Rulebook injected verbatim into the critic agent's prompt: the UNE
# audio-description rules the SRT is judged against.
UNE_RULES = """
### Reglas UNE de Audiodescripci贸n (Para el Cr铆tico)
1. **Objetividad y Foco Visual:** La descripci贸n debe ser puramente objetiva, describiendo solo lo que se ve. Debe priorizar la acci贸n y los elementos relevantes (personajes, objetos, localizaci贸n).
2. **Tiempo y Espacio (Sincronizaci贸n):** Las audiodescripciones (AD) deben insertarse en los silencios del di谩logo. El tiempo de la AD (entre START y END) debe ser suficiente para narrar el contenido sin solaparse con el di谩logo o la m煤sica importante.
3. **Concisi贸n y Claridad:** Usar lenguaje simple y conciso. Evitar redundancias y juicios de valor.
4. **Formato:** Cada segmento de AD debe tener un formato SRT v谩lido, incluyendo el marcador (AD) al principio de la l铆nea de texto.
5. **Utilidad:** Cada segmento de AD debe ser 煤til para la comprensi贸n y nunca ser redundante. En caso de repetir algo ya explicado antes, mejor no decir nada.
"""
|
|
|
|
|
# Criteria (in Catalan) that the structured evaluation asks the LLM to score
# from 0 to 7; these exact names are also the keys of CRITERIA_WEIGHTS.
EVALUATION_CRITERIA = [
    "Precisi贸 Descriptiva",
    "Sincronitzaci贸 Temporal",
    "Claredat i Concisi贸",
    "Inclusi贸 de Di脿leg/So",
    "Contextualitzaci贸",
    "Flux i Ritme de la Narraci贸",
]
|
|
|
|
|
# Weights for the weighted evaluation mean. Temporal synchronization is
# weighted 4x, making timing quality dominate the best-iteration selection.
CRITERIA_WEIGHTS = {
    "Precisi贸 Descriptiva": 1,
    "Sincronitzaci贸 Temporal": 4,
    "Claredat i Concisi贸": 1,
    "Inclusi贸 de Di脿leg/So": 1,
    "Contextualitzaci贸": 1,
    "Flux i Ritme de la Narraci贸": 1,
}
|
|
|
|
|
|
|
|
def setup_files(initial_srt_content: str, context_json_content: str):
    """Seed TEMP_DIR with the initial SRT and the visual-context JSON."""
    seeds = (
        ("une_ad_0.srt", initial_srt_content),
        ("json_ad.json", context_json_content),
    )
    for filename, payload in seeds:
        (TEMP_DIR / filename).write_text(payload, encoding="utf-8")
    logger.info("Ficheros iniciales 'une_ad_0.srt' y 'json_ad.json' creados.")
|
|
|
|
|
|
|
|
def _strip_markdown_fences(content: str) -> str: |
|
|
"""Elimina fences ```...``` alrededor de una respuesta JSON si existen.""" |
|
|
text = content.strip() |
|
|
if text.startswith("```"): |
|
|
lines = text.splitlines() |
|
|
|
|
|
lines = lines[1:] |
|
|
|
|
|
while lines and lines[-1].strip() == "```": |
|
|
lines.pop() |
|
|
text = "\n".join(lines).strip() |
|
|
return text |
|
|
|
|
|
|
|
|
def generate_evaluation_report(srt_content: str, iteration: int) -> tuple[float, float, Path]:
    """Ask the LLM for a structured UNE evaluation of an SRT and persist it as CSV.

    Args:
        srt_content: Full text of the SRT file to evaluate.
        iteration: Current reflection-loop iteration; names the output CSV.

    Returns:
        ``(mean_score, weighted_mean, eval_path)`` — both means are on the
        0-7 scale; ``eval_path`` is the written ``eval_{iteration}.csv``.
    """
    criteria_formatted = "\n".join(f"- {name}" for name in EVALUATION_CRITERIA)
    prompt = (
        "Actua com un auditor UNE. Avalua l'SRT generat, puntuant cada caracter铆stica de 0 a 7 "
        "segons la qualitat observada. D贸nega justificaci贸 breve per貌 concreta per a cada cas. "
        "Les caracter铆stiques obligat貌ries s贸n:\n"
        f"{criteria_formatted}\n"
        "Retorna 脷NICAMENT un array JSON d'objectes amb les claus: "
        "'caracteristica', 'valoracio' (nombre enter de 0 a 7) i 'justificacio'."
    )

    response = llm.invoke(
        [
            SystemMessage(content=prompt),
            HumanMessage(
                content=(
                    "# SRT AVALUAT\n"
                    f"{srt_content}\n\n"
                    "Assegura't de complir el format indicat."
                )
            ),
        ]
    )

    cleaned = _strip_markdown_fences(response.content)
    try:
        data = json.loads(cleaned)
        if not isinstance(data, list):
            raise ValueError("La resposta no 茅s una llista.")
    except Exception as exc:
        logger.error(
            "Error al generar l'avaluaci贸 estructurada: %s. Resposta original: %s",
            exc,
            response.content,
        )
        # Fallback row so the CSV and the means are still produced.
        data = [
            {
                "caracteristica": "Avaluaci贸 fallida",
                "valoracio": 1,
                "justificacio": "No s'ha pogut obtenir l'avaluaci贸 del LLM.",
            }
        ]

    # FIX: keep only dict rows. A syntactically valid JSON list may contain
    # strings/numbers; the original code guarded the scoring loop but not the
    # CSV loop, so such rows crashed `item.get(...)` with AttributeError.
    rows = [item for item in data if isinstance(item, dict)]

    eval_path = TEMP_DIR / f"eval_{iteration}.csv"
    with eval_path.open("w", encoding="utf-8", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Caracteristica", "Valoracio (0-7)", "Justificacio"])
        for item in rows:
            writer.writerow(
                [
                    item.get("caracteristica", ""),
                    item.get("valoracio", 0),
                    item.get("justificacio", ""),
                ]
            )

    scores = []
    weighted_sum = 0.0
    total_weight = 0.0

    for entry in rows:
        try:
            score = float(entry.get("valoracio", 0))
        except (TypeError, ValueError):
            # Non-numeric score from the LLM counts as 0.
            score = 0.0
        scores.append(score)

        # Criteria not present in CRITERIA_WEIGHTS default to weight 1.
        weight = CRITERIA_WEIGHTS.get(entry.get("caracteristica", ""), 1)
        weighted_sum += score * weight
        total_weight += weight

    mean_score = sum(scores) / len(scores) if scores else 0.0
    weighted_mean = weighted_sum / total_weight if total_weight else mean_score
    return mean_score, weighted_mean, eval_path
|
|
|
|
|
|
|
|
class ReflectionState(TypedDict):
    """State threaded through every node of the reflection-loop graph."""
    # 0-based loop counter; the critic node increments it each cycle.
    iteration: int
    # Path to the most recent SRT written/corrected this cycle.
    current_srt_path: str
    # Critic output: 'reliability_percentage' (float) and 'qualitative_critique' (str).
    critic_report: Dict[str, Union[float, str]]
    # Running agent transcript. NOTE(review): typed List[SystemMessage] but the
    # agents append AIMessage instances — confirm intended type.
    history: List[SystemMessage]
    # Weighted evaluation mean (0-7) from the latest critic pass.
    evaluation_mean: float
    # Bookkeeping for the best-scoring iteration seen so far.
    best_iteration: int
    best_weighted_mean: float
    best_srt_path: str
    best_eval_path: str
|
|
|
|
|
|
|
|
def narrator_agent(state: ReflectionState):
    """
    Agent that generates or rewrites the SRT.
    - On cycle 0, it forwards the pre-generated initial SRT unchanged.
    - On cycles > 0, it rewrites the SRT based on critic_report.
    """
    iteration = state["iteration"]
    critic_report = state["critic_report"]
    history = state["history"]

    # Inputs: visual-context JSON plus the SRT from the previous cycle.
    json_context = (TEMP_DIR / "json_ad.json").read_text(encoding="utf-8")
    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")

    if iteration == 0:
        # First pass: the initial SRT already exists on disk, so it is passed
        # through as-is. NOTE(review): this prompt is built but never sent to
        # the model — the llm.invoke call only happens in the else branch.
        prompt = (
            "Ets un Narrador expert en Audiodescripci贸 (AD). La teva tasca inicial 茅s generar "
            "un fitxer SRT d'audiodescripcions basat en el JSON de context visual. "
            "TOT I AIX脥, per a aquesta primera iteraci贸, l'SRT ja s'ha generat. "
            "Simplement retorna el contingut de 'une_ad_0.srt' com si fos la teva sortida. "
            "Assegura't que totes les audiodescripcions estiguin en catal脿 i que cadascuna pugui ser locutada "
            "dins del temps disponible (utilitza un m脿xim aproximat d'11 car脿cters per segon). Si el tram de temps "
            "茅s massa curt (<1.5s), combina'l amb el bloc d'AD m茅s proper i ajusta els timestamps perqu猫 la narraci贸 sigui fluida. "
            "Evita redund脿ncies: no repeteixis informaci贸 ja descrita en segments d'AD anteriors o al di脿leg, i elimina qualsevol detall que no sigui essencial."
        )
        output_srt = current_srt
        reflection_text = "Generaci贸n inicial. No hay reflexi贸n."
    else:
        # Rewrite pass: feed the critic's report, the visual context and the
        # previous SRT back to the model and ask for a corrected full SRT.
        prompt = (
            "Ets un Narrador expert en Audiodescripci贸 (AD). Has rebut una cr铆tica sobre la teva 煤ltima versi贸 de l'SRT. "
            "La teva tasca 茅s REESCRIURE el contingut d'audiodescripci贸 (l铆nies amb '(AD)') del fitxer SRT, "
            "assegurant que sigui coherent amb el JSON de context i, sobretot, que CORREGEIXIS TOTS els problemes "
            "mencionats a l'Informe Cr铆tic adjunt. Mant茅n intactes els di脿legs (l铆nies amb [Nom]) i escriu totes les audiodescripcions en catal脿 natural. "
            "Garanteix que cada bloc d'AD pugui ser locutat dins del seu interval temporal disponible considerant un m脿xim d'11 car脿cters per segon. "
            "Si l'interval 茅s massa curt (<1.5s), fusiona'l amb el bloc d'AD anterior o posterior m茅s proper i ajusta els timestamps perqu猫 quedin cont铆nues. "
            "Prefereix frases concises i accionables, prioritzant la informaci贸 visual essencial, i elimina redund脿ncies amb AD anteriors o amb els di脿legs."
        )

        input_content = f"""
# INFORME CR脥TICO
Porcentaje de Fiabilidad Anterior: {critic_report.get('reliability_percentage')}
Cr铆tica Cualitativa: {critic_report.get('qualitative_critique')}

# JSON DE CONTEXTO VISUAL (Gu铆a para la AD)
{json_context}

# 脷LTIMO ARCHIVO SRT GENERADO (une_ad_{iteration-1}.srt)
{current_srt}

REGLAS: Tu respuesta debe ser *SOLAMENTE* el contenido completo del nuevo archivo SRT (incluyendo di谩logos), sin ning煤n comentario o explicaci贸n adicional.
"""

        response = llm.invoke(
            [
                SystemMessage(content=prompt),
                HumanMessage(content=input_content)
            ]
        )

        output_srt = response.content
        reflection_text = f"Reescrito en base al informe cr铆tico: {critic_report.get('qualitative_critique', 'N/A')}"

    # Persist this cycle's SRT and a short reflection note for the critic to append to.
    new_srt_path = TEMP_DIR / f"une_ad_{iteration}.srt"
    new_srt_path.write_text(output_srt, encoding="utf-8")

    (TEMP_DIR / f"thinking_{iteration}.txt").write_text(reflection_text, encoding="utf-8")

    logger.info(f"Narrador: Generada la versi贸n {iteration} del SRT en '{new_srt_path}'.")

    new_history = history + [AIMessage(content=f"Narrador v{iteration} completado. Raz贸n de reflexi贸n: {reflection_text}")]
    return {
        "iteration": iteration,
        "current_srt_path": str(new_srt_path),
        "history": new_history,
        "evaluation_mean": state.get("evaluation_mean", 0.0),
        "best_iteration": state.get("best_iteration", -1),
        "best_weighted_mean": state.get("best_weighted_mean", 0.0),
        "best_srt_path": state.get("best_srt_path", str(new_srt_path)),
        "best_eval_path": state.get("best_eval_path", str(TEMP_DIR / f"eval_{iteration}.csv")),
    }
|
|
|
|
|
def identity_manager_agent(state: ReflectionState):
    """
    Agent that manages the user's identity.

    NOTE(review): dead code — a second `identity_manager_agent` (the
    casting.csv-based one) is defined later in this module and shadows this
    definition before the graph registers its nodes. This version asks the
    LLM for a user-identity JSON, logs the result, and returns the state
    essentially unchanged.
    """
    iteration = state["iteration"]
    history = state["history"]
    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")

    prompt = (
        "Ets un gestor d'identitats. La teva tasca 茅s verificar la identitat de l'usuari "
        "i assegurar-te que les seves dades estiguin actualitzades."
    )

    input_content = f"""
# 脷LTIMO ARCHIVO SRT GENERADO (une_ad_{iteration}.srt):
{current_srt}

REGLAS: Tu respuesta debe ser *SOLAMENTE* un objeto JSON con la informaci贸n de la identidad del usuario.
"""

    response = llm.invoke(
        [
            SystemMessage(content=prompt),
            HumanMessage(content=input_content)
        ]
    )

    # Parse the reply; fall back to an error marker dict on any failure.
    try:
        cleaned_response = _strip_markdown_fences(response.content)
        identity_info = json.loads(cleaned_response)
        if not isinstance(identity_info, dict):
            raise ValueError("Estructura JSON incorrecta.")
    except Exception as e:
        logger.error(f"Error al parsear el JSON de la identidad: {e}. Respuesta: {response.content}")
        identity_info = {"error": "No s'ha pogut obtenir la informaci贸 d'identitat."}

    # NOTE(review): identity_info is parsed but never persisted or returned.
    logger.info(f"Identity Manager: Informaci贸n de identidad actualizada.")

    new_history = history + [AIMessage(content=f"Identity Manager v{iteration} completado.")]
    return {
        "iteration": iteration,
        "current_srt_path": state["current_srt_path"],
        "history": new_history,
        "evaluation_mean": state.get("evaluation_mean", 0.0),
        "best_iteration": state.get("best_iteration", -1),
        "best_weighted_mean": state.get("best_weighted_mean", 0.0),
        "best_srt_path": state.get("best_srt_path", state["current_srt_path"]),
        "best_eval_path": state.get("best_eval_path", str(TEMP_DIR / f"eval_{iteration}.csv")),
    }
|
|
|
|
|
def critic_agent(state: ReflectionState):
    """
    Agent that evaluates the narrator's SRT against the UNE rules.

    Produces a reliability percentage plus a qualitative critique, runs the
    structured CSV evaluation, updates the "best iteration" bookkeeping, and
    advances the loop counter.
    """
    iteration = state["iteration"]
    history = state["history"]
    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")

    prompt = (
        "Ets un Cr铆tic d'Audiodescripci贸 molt estricte. La teva tasca 茅s avaluar l'SRT adjunt "
        "煤nicament segons les Regles UNE proporcionades. L'avaluaci贸 ha de ser doble: "
        "1. **Num猫rica**: Un percentatge de fiabilitat (ex. 85.5) de 0 a 100%. "
        "2. **Qualitativa**: Una cr铆tica constructiva sobre les principals mancances de les AD respecte a les regles. "
        "Has de ser EXTREMADAMENT estricte amb la sincronitzaci贸 (sense solapament amb el di脿leg), "
        "amb l'adequaci贸 temporal (velocitat m脿xima recomanada d'11 car脿cters per segon) i amb l'abs猫ncia de redund脿ncies. "
        "Comprova tamb茅 que totes les audiodescripcions estan escrites en catal脿 natural."
    )

    input_content = f"""
# REGLAS UNE DE AUDIODESCRIPCI脫N:
{UNE_RULES}

# ARCHIVO SRT A EVALUAR (une_ad_{iteration}.srt):
{current_srt}

REGLAS DE RESPUESTA:
Tu respuesta debe ser *SOLAMENTE* un objeto JSON con dos claves:
1. "reliability_percentage": (float) El porcentaje de fiabilidad.
2. "qualitative_critique": (string) La cr铆tica cualitativa y sugerencias de mejora.
Ejemplo de respuesta: {{"reliability_percentage": 75.0, "qualitative_critique": "El segmento 4 se solapa 0.34s con el di谩logo de Sandra. El segmento 5 es demasiado gen茅rico y no describe bien la acci贸n."}}
"""

    response = llm.invoke(
        [
            SystemMessage(content=prompt),
            HumanMessage(content=input_content)
        ]
    )

    # Parse the critic's JSON; fall back to a low-reliability stub on failure
    # so the loop can continue rather than crash.
    try:
        cleaned_response = _strip_markdown_fences(response.content)
        report = json.loads(cleaned_response)
        if not isinstance(report, dict) or 'reliability_percentage' not in report:
            raise ValueError("Estructura JSON incorrecta.")
    except Exception as e:
        logger.error(f"Error al parsear el JSON del Cr铆tico: {e}. Respuesta: {response.content}")
        report = {"reliability_percentage": 1.0, "qualitative_critique": "El Cr铆tico no devolvi贸 un JSON v谩lido. Reintentar."}

    logger.info(f"Cr铆tico: Evaluaci贸n completada. Fiabilidad: {report.get('reliability_percentage')}%.")

    # Structured 0-7 scoring (simple and weighted means), saved to eval_{i}.csv.
    mean_score, weighted_mean, eval_path = generate_evaluation_report(current_srt, iteration)

    # Append the scores to this iteration's thinking log, if the narrator wrote one.
    thinking_path = TEMP_DIR / f"thinking_{iteration}.txt"
    if thinking_path.exists():
        previous_text = thinking_path.read_text(encoding="utf-8")
        thinking_path.write_text(
            (
                f"{previous_text}\n\nMitjana simple d'avaluaci贸: {mean_score:.2f} / 7"
                f"\nMitjana ponderada d'avaluaci贸: {weighted_mean:.2f} / 7"
            ),
            encoding="utf-8",
        )

    # Track the best-scoring iteration so the pipeline can ship it at the end.
    best_iteration = state.get("best_iteration", -1)
    best_weighted_mean = state.get("best_weighted_mean", -1.0)
    best_srt_path = state.get("best_srt_path", state["current_srt_path"])
    best_eval_path = state.get("best_eval_path", str(TEMP_DIR / f"eval_{iteration}.csv"))

    if weighted_mean > best_weighted_mean:
        best_iteration = iteration
        best_weighted_mean = weighted_mean
        best_srt_path = state["current_srt_path"]
        best_eval_path = str(eval_path)

    new_history = history + [
        AIMessage(
            content=(
                "Cr铆tico v{iter} completado. Fiabilidad: {reliab}%. "
                "Mitjana simple: {mean:.2f}/7. Mitjana ponderada: {wmean:.2f}/7"
            ).format(
                iter=iteration,
                reliab=report.get("reliability_percentage"),
                mean=mean_score,
                wmean=weighted_mean,
            )
        )
    ]
    # The critic advances the loop counter for the next narrator pass.
    return {
        "iteration": iteration + 1,
        "critic_report": report,
        "history": new_history,
        "evaluation_mean": weighted_mean,
        "best_iteration": best_iteration,
        "best_weighted_mean": best_weighted_mean,
        "best_srt_path": best_srt_path,
        "best_eval_path": best_eval_path,
    }
|
|
|
|
|
def identity_manager_agent(state: ReflectionState):
    """
    Agent that checks speaker coherence between the SRT, casting.csv and the
    visual context, correcting [Name] tags and logging the changes.

    NOTE(review): this re-definition shadows the earlier function of the same
    name; it is the one actually registered on the graph. Skipped (state
    returned untouched) when casting.csv does not exist.
    """
    iteration = state["iteration"]

    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")
    casting_path = TEMP_DIR / "casting.csv"
    json_context = (TEMP_DIR / "json_ad.json").read_text(encoding="utf-8")

    # Optional input: without a cast list there is nothing to verify.
    if not casting_path.exists():
        logger.warning("Casting.csv no encontrado. Saltando identity_manager.")
        return state

    casting_content = casting_path.read_text(encoding="utf-8")

    prompt = (
        "Ets un Identity Manager. La teva tasca 茅s:\n"
        "1. Verificar que les assignacions de parlants a l'SRT coincideixen amb casting.csv\n"
        "2. Comprovar que els parlants assignats s贸n coherents amb el context visual de json_ad.json\n"
        "3. Si trobes inconsist猫ncies, re-assigna els parlants corregint les etiquetes [Nom]\n"
        "4. Justifica canvis al fitxer identity_log.txt\n"
        "\n"
        "Dades d'entrada:\n"
        f"- CASTING.CSV:\n{casting_content}\n"
        f"- JSON CONTEXT:\n{json_context}\n"
        f"- SRT ACTUAL:\n{current_srt}\n"
        "\n"
        "REGLES:\n"
        "- Nom茅s modifica les l铆nies de di脿leg (ex: [Nom])\n"
        "- Mant茅 la numeraci贸 i timestamps\n"
        "- Si no hi ha canvis, retorna l'SRT original\n"
        "\n"
        "Format de sortida:\n"
        "```json\n"
        "{{\n"
        " \"srt_content\": \"<nou contingut SRT>\",\n"
        " \"log_message\": \"<explicaci贸 canvis o 'Sense canvis'>\"\n"
        "}}\n"
        "```"
    )

    response = llm.invoke([SystemMessage(content=prompt)])

    try:
        cleaned = _strip_markdown_fences(response.content)
        data = json.loads(cleaned)
        new_srt = data["srt_content"]
        log_msg = data["log_message"]

        # Persist the justification for this iteration's (non-)changes.
        log_path = TEMP_DIR / f"identity_log_{iteration}.txt"
        log_path.write_text(f"Iteraci贸 {iteration}: {log_msg}", encoding="utf-8")

        # Only write a corrected SRT when the LLM actually changed something.
        if new_srt != current_srt:
            new_srt_path = TEMP_DIR / f"une_ad_{iteration}_corrected.srt"
            new_srt_path.write_text(new_srt, encoding="utf-8")
            logger.info(f"Identity Manager: Correccions aplicades. Detalls: {log_msg}")
            return {
                **state,
                "current_srt_path": str(new_srt_path)
            }

    except Exception as e:
        logger.error(f"Error en identity_manager: {e}")

    # No changes, or the LLM reply could not be parsed: pass state through.
    return state
|
|
|
|
|
def background_descriptor_agent(state: ReflectionState):
    """
    Agent that checks scenery coherence between the SRT's (AD) lines and
    scenarios.csv, replacing generic descriptions with official scenario names.

    Skipped (state returned untouched) when scenarios.csv does not exist.
    """
    iteration = state["iteration"]

    current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8")
    scenarios_path = TEMP_DIR / "scenarios.csv"

    # Optional input: without a scenario list there is nothing to verify.
    if not scenarios_path.exists():
        logger.warning("Scenarios.csv no encontrado. Saltando background_descriptor.")
        return state

    scenarios_content = scenarios_path.read_text(encoding="utf-8")

    prompt = (
        "Ets un Background Descriptor. La teva tasca 茅s:\n"
        "1. Verificar que les descripcions d'escenaris a l'SRT coincideixen amb scenarios.csv\n"
        "2. Si trobes coincid猫ncies, reempla莽a les descripcions gen猫riques pel nom oficial de l'escenari\n"
        "3. Justifica canvis al fitxer background_log.txt\n"
        "\n"
        "Dades d'entrada:\n"
        f"- SCENARIOS.CSV:\n{scenarios_content}\n"
        f"- SRT ACTUAL:\n{current_srt}\n"
        "\n"
        "REGLES:\n"
        "- Nom茅s modifica l铆nies d'audiodescripci贸 (ex: (AD) ...)\n"
        "- Mant茅 la numeraci贸 i timestamps\n"
        "- Si no hi ha canvis, retorna l'SRT original\n"
        "\n"
        "Format de sortida:\n"
        "```json\n"
        "{{\n"
        " \"srt_content\": \"<nou contingut SRT>\",\n"
        " \"log_message\": \"<explicaci贸 canvis o 'Sense canvis'>\"\n"
        "}}\n"
        "```"
    )

    response = llm.invoke([SystemMessage(content=prompt)])

    try:
        cleaned = _strip_markdown_fences(response.content)
        data = json.loads(cleaned)
        new_srt = data["srt_content"]
        log_msg = data["log_message"]

        # Persist the justification for this iteration's (non-)changes.
        log_path = TEMP_DIR / f"background_log_{iteration}.txt"
        log_path.write_text(f"Iteraci贸 {iteration}: {log_msg}", encoding="utf-8")

        # Only write a corrected SRT when the LLM actually changed something.
        if new_srt != current_srt:
            new_srt_path = TEMP_DIR / f"une_ad_{iteration}_scenario_corrected.srt"
            new_srt_path.write_text(new_srt, encoding="utf-8")
            logger.info(f"Background Descriptor: Correccions aplicades. Detalls: {log_msg}")
            return {
                **state,
                "current_srt_path": str(new_srt_path)
            }

    except Exception as e:
        logger.error(f"Error en background_descriptor: {e}")

    # No changes, or the LLM reply could not be parsed: pass state through.
    return state
|
|
|
|
|
|
|
|
|
|
|
def should_continue(state: ReflectionState) -> str:
    """Routing predicate: 'end' once quality or iteration limits are hit, else 'continue'."""
    MAX_ITERATIONS = 5
    MIN_AVERAGE_SCORE = 6.0

    iteration = state["iteration"]
    mean_score = state.get("evaluation_mean", 0.0)

    # Quality gate first: a good-enough weighted mean stops the loop early.
    if mean_score >= MIN_AVERAGE_SCORE:
        logger.info(f"FIN: Mitjana ponderada d'avaluaci贸 assolida ({mean_score:.2f} >= {MIN_AVERAGE_SCORE}).")
        return "end"

    # Hard cap on the number of reflection cycles.
    if iteration >= MAX_ITERATIONS:
        logger.info(f"FIN: S'ha assolit el m脿xim d'iteracions ({iteration} / {MAX_ITERATIONS}).")
        return "end"

    logger.info(f"CONTINUAR: Iteraci贸 {iteration} / {MAX_ITERATIONS}. Mitjana ponderada actual: {mean_score:.2f} / 7.")
    return "continue"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Seed state for the graph: iteration 0 points at the pre-generated
# une_ad_0.srt, and the "best" trackers start at -1 so any real evaluation
# immediately becomes the best one.
initial_state: ReflectionState = {
    "iteration": 0,
    "current_srt_path": str(TEMP_DIR / "une_ad_0.srt"),
    "critic_report": {"reliability_percentage": 0.0, "qualitative_critique": "Inicializando el proceso."},
    "history": [],
    "evaluation_mean": 0.0,
    "best_iteration": -1,
    "best_weighted_mean": -1.0,
    "best_srt_path": str(TEMP_DIR / "une_ad_0.srt"),
    "best_eval_path": str(TEMP_DIR / "eval_0.csv"),
}
|
|
|
|
|
|
|
|
# Build the reflection graph:
#   narrator -> identity_manager -> background_descriptor -> critic
# looping back from the critic to the narrator until should_continue()
# routes to END.
workflow = StateGraph(ReflectionState)

workflow.add_node("narrator", narrator_agent)
# NOTE(review): identity_manager_agent is defined twice in this module; the
# second (casting.csv-based) definition is the one bound here.
workflow.add_node("identity_manager", identity_manager_agent)
workflow.add_node("background_descriptor", background_descriptor_agent)
workflow.add_node("critic", critic_agent)

workflow.set_entry_point("narrator")
workflow.add_edge("narrator", "identity_manager")
workflow.add_edge("identity_manager", "background_descriptor")
workflow.add_edge("background_descriptor", "critic")

workflow.add_conditional_edges(
    "critic",
    should_continue,
    {
        "continue": "narrator",
        "end": END
    }
)

# Compiled, runnable graph used by run_reflection_pipeline().
app = workflow.compile()
|
|
|
|
|
|
|
|
def generate_free_ad_from_srt(srt_path: Path) -> Path:
    """Generate a detailed free-form narration from the final SRT.

    Writes the narration to temp/free_ad.txt and returns that path.
    """
    srt_content = srt_path.read_text(encoding="utf-8")
    prompt = (
        "Actua com una narradora professional d'audiodescripcions lliures. "
        "A partir de l'SRT proporcionat, escriu un text narratiu en catal脿 que descrigui "
        "de manera exhaustiva i fluida tot el que succeeix a la pe莽a audiovisual. "
        "Inclou accions, aparen莽a, gestos, canvis d'escena i qualsevol detall rellevant, "
        "sense limitar-te a les restriccions temporals del format SRT. "
        "Evita repetir literalment els di脿legs, per貌 contextualitza'ls quan sigui 煤til. "
        "La narraci贸 ha de ser clara, coherent i apta per ser locutada com una narraci贸 lliure."
    )

    response = llm.invoke(
        [
            SystemMessage(content=prompt),
            HumanMessage(
                content=(
                    "# SRT FINAL\n"
                    f"{srt_content}\n\n"
                    "Respon 煤nicamente con la narraci贸 lliure sin cap comentario adicional."
                )
            ),
        ]
    )

    # The raw model reply is the narration; no post-processing is applied.
    free_ad_path = TEMP_DIR / "free_ad.txt"
    free_ad_path.write_text(response.content, encoding="utf-8")
    logger.info(f"Narraci贸 lliure generada en '{free_ad_path}'.")
    return free_ad_path
|
|
|
|
|
def run_reflection_pipeline(srt_content: str, context_json: str | None = None) -> str:
    """Run the finetuning graph over an SRT and return the final SRT text.

    Args:
        srt_content: Initial SRT content.
        context_json: Visual context as JSON; CONTEXT_JSON_CONTENT is used when None.

    Returns:
        Content of the SRT selected as the best iteration.
    """
    # NOTE(review): an empty-string context_json also falls back to the default
    # because of the `or` — confirm that is intended.
    setup_files(srt_content, context_json or CONTEXT_JSON_CONTENT)

    logger.info("--- Comenzando el Bucle de Finetuning (run_reflection_pipeline) ---")

    # NOTE(review): uses the shared module-level `initial_state` template on
    # every call — confirm langgraph copies it rather than mutating it.
    final_state = app.invoke(initial_state)

    logger.info("\n--- Bucle Finalizado ---")

    # Pull out the best-scoring iteration tracked by the critic.
    best_iteration = final_state.get("best_iteration", -1)
    best_weighted_mean = final_state.get("best_weighted_mean", 0.0)
    best_srt_path = Path(final_state.get("best_srt_path", final_state["current_srt_path"]))
    best_eval_path = Path(final_state.get("best_eval_path", TEMP_DIR / "eval_0.csv"))

    # Canonical output locations for the winning iteration.
    final_srt_path = TEMP_DIR / "une_ad.srt"
    final_eval_path = TEMP_DIR / "eval.csv"

    # Best-effort copies: failures are logged, not raised.
    try:
        shutil.copy(best_srt_path, final_srt_path)
        logger.info(f"SRT final copiado a '{final_srt_path}'.")
    except Exception as exc:
        logger.error(f"No se pudo copiar el SRT final: {exc}")

    try:
        shutil.copy(best_eval_path, final_eval_path)
        logger.info(f"Evaluaci贸n final copiada a '{final_eval_path}'.")
    except Exception as exc:
        logger.error(f"No se pudo copiar el CSV final: {exc}")

    logger.info(
        "Resultado 贸ptimo en iteraci贸n %s (mitjana ponderada %.2f/7)",
        best_iteration,
        best_weighted_mean,
    )

    # NOTE(review): raises FileNotFoundError if the copy above failed.
    return final_srt_path.read_text(encoding="utf-8")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the full reflection loop on the bundled demo SRT and context.
    final_srt_text = run_reflection_pipeline(INITIAL_SRT_CONTENT, CONTEXT_JSON_CONTENT)

    # Best-effort: derive a free-form narration from the final SRT on disk.
    free_ad_path: Union[Path, None] = None
    try:
        final_srt_path = TEMP_DIR / "une_ad.srt"
        free_ad_path = generate_free_ad_from_srt(final_srt_path)
    except Exception as exc:
        logger.error(f"No s'ha pogut generar la narraci贸 lliure: {exc}")

    print("\n--- Contenido del SRT Final ---")
    print(final_srt_text)

    if free_ad_path is not None:
        print("\n--- Narraci贸 Lliure ---")
        print(free_ad_path.read_text(encoding="utf-8"))
|