DesgrabadorAI / alignment_subprocess.py
JoaquinZ's picture
Upload 19 files
e641d41 verified
#!/usr/bin/env python3
# alignment_subprocess.py - Subprocess para alineación de WhisperX
import os
import sys
import json
import time
import gc
import torch
import torchaudio
import whisperx
import tempfile
from pathlib import Path
def _inject_local_ffmpeg(base_dir=None):
    """Prepend the bundled FFmpeg ``bin`` folder to ``PATH`` when one exists.

    Two locations are probed, in order, below *base_dir*:
    ``ffmpeg/ffmpeg-8.0-essentials_build/bin`` and ``ffmpeg/bin``. The first
    one that is a directory is prepended to ``os.environ['PATH']`` unless it
    is already one of the PATH entries. When neither exists, a notice is
    printed and PATH is left untouched.

    This is best effort: any exception is reported on stdout and swallowed.

    Args:
        base_dir: Folder expected to contain the ``ffmpeg`` directory.
            Defaults to the folder that holds this file.
    """
    try:
        root = Path(__file__).parent if base_dir is None else Path(base_dir)
        candidates = (
            root / 'ffmpeg' / 'ffmpeg-8.0-essentials_build' / 'bin',
            root / 'ffmpeg' / 'bin',
        )
        bin_dir = next((c for c in candidates if c.is_dir()), None)
        if bin_dir is None:
            print("⚠️ FFmpeg local no encontrado para alineación; se usará PATH del sistema")
            return

        bin_path = str(bin_dir)
        current_path = os.environ.get('PATH', '')
        # Compare against whole PATH entries. A substring test on the raw PATH
        # string would treat ".../ffmpeg/bin_old" as if ".../ffmpeg/bin" were
        # already present and skip the injection.
        known_entries = {
            os.path.normcase(entry)
            for entry in current_path.split(os.pathsep)
            if entry
        }
        if os.path.normcase(bin_path) not in known_entries:
            # Avoid a trailing separator when PATH was empty: an empty PATH
            # entry means "current directory" on POSIX systems.
            os.environ['PATH'] = (
                bin_path + os.pathsep + current_path if current_path else bin_path
            )
            print(f"🔧 FFmpeg agregado al PATH (align): {bin_path}")

        # Purely informational: report which Windows executables are bundled.
        for exe in ('ffmpeg.exe', 'ffprobe.exe'):
            if (bin_dir / exe).is_file():
                print(f"✅ Detectado {exe} en {bin_dir}")
    except Exception as e:
        print(f"⚠️ Error inyectando FFmpeg (align): {e}")
# Runs at import time so PATH is already patched when the functions below
# load audio.
_inject_local_ffmpeg()
def _get_audio_duration_seconds(path: str) -> float:
    """Return the length of the audio file at *path*, in seconds.

    The duration is first derived from ``torchaudio.info`` (frame count
    divided by sample rate). If that raises, or reports a zero/empty frame
    count or sample rate, the ``ffprobe`` executable is queried instead,
    provided it can be found on ``PATH``. Every failure is swallowed; a
    return value of ``0.0`` means the duration could not be determined.
    """
    # Preferred source: metadata reported by torchaudio.
    try:
        meta = torchaudio.info(path)
        if meta.num_frames and meta.sample_rate:
            return float(meta.num_frames) / float(meta.sample_rate)
    except Exception:
        pass

    # Second chance: ask ffprobe for the container duration.
    try:
        import subprocess
        from shutil import which

        if not which("ffprobe"):
            return 0.0
        probe_cmd = [
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            path,
        ]
        raw_output = subprocess.check_output(probe_cmd, text=True)
        return float(raw_output.strip())
    except Exception:
        return 0.0
def _load_audio_array(path):
    """Load *path* with ``whisperx.load_audio`` and return only the samples.

    Accepts either a bare array or a list/tuple whose first element is the
    array, so differing return signatures are tolerated.
    """
    loaded = whisperx.load_audio(path)
    if isinstance(loaded, (list, tuple)):
        return loaded[0]
    return loaded


def _flush_cuda_cache(rounds=3):
    """Repeatedly empty the CUDA cache and run the garbage collector."""
    for _ in range(rounds):
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        gc.collect()
        time.sleep(0.1)


def _write_alignment_result(payload):
    """Serialize *payload* to ``alignment_result.json`` in the working directory."""
    with open("alignment_result.json", "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)


def align_segments_subprocess(result_data, audio_path, device="cuda"):
    """Align transcription segments with WhisperX and save the outcome to disk.

    Intended to run in a separate process so that alignment cannot deadlock
    CUDA in the calling process.

    If the ``ORIGINAL_AUDIO_PATH`` environment variable names an existing
    file, that audio is used instead of *audio_path* when the working audio
    looks shorter than the last segment end, and it is tried once as a
    fallback when alignment on the working audio raises.

    Args:
        result_data: Mapping with a ``"segments"`` list and, optionally, a
            ``"language"`` code (defaults to ``"es"``).
        audio_path: Path of the working audio file.
        device: ``"cuda"`` or ``"cpu"``. ``"cuda"`` is downgraded to
            ``"cpu"`` when CUDA is not available.

    Returns:
        ``True`` when alignment succeeded, ``False`` otherwise. In both cases
        a record is written to ``alignment_result.json``: the aligned
        segments plus diagnostics on success, or ``"success": False`` with an
        ``"error"`` message and ``"traceback"`` text on failure.
    """
    print("🚀 Iniciando alineación en subprocess...")
    try:
        # Nothing to align: report it through the result file like every
        # other failure, so a file left by an earlier run is not mistaken for
        # the outcome of this one.
        if not result_data or not result_data.get("segments"):
            print("❌ No hay segmentos para alinear")
            try:
                _write_alignment_result({
                    "success": False,
                    "error": "No hay segmentos para alinear",
                    "traceback": "",
                    "audio_path": str(audio_path),
                })
            except Exception as save_err:
                print(f"⚠️ No se pudo escribir alignment_result.json: {save_err}")
            return False

        language = result_data.get("language", "es")
        segments = result_data.get("segments", [])
        print(f"📐 Preparando alineación para {len(segments)} segmentos en idioma {language}")

        # Pick the device that is actually usable.
        if device == "cuda" and not torch.cuda.is_available():
            device = "cpu"
            print("⚠️ CUDA no disponible, usando CPU para alineación")
        if device == "cuda":
            torch.cuda.empty_cache()
            gc.collect()
            free_mem = torch.cuda.mem_get_info()[0] / 1024**3
            print(f"📊 Memoria CUDA inicial: {free_mem:.1f}GB libres")

        # Compare the working audio length with the last segment end.
        work_audio_path = str(audio_path)
        orig_audio_path = os.environ.get("ORIGINAL_AUDIO_PATH", "")
        try:
            seg_max_end = max(((s.get("end") or 0.0) for s in segments), default=0.0)
        except Exception:
            seg_max_end = 0.0
        work_duration = _get_audio_duration_seconds(work_audio_path)
        print(f"📏 Duración audio trabajo: {work_duration:.2f}s | Máx fin segmentos: {seg_max_end:.2f}s")

        print(f"📁 Cargando audio: {work_audio_path}")
        try:
            audio = _load_audio_array(work_audio_path)
        except Exception as e:
            raise RuntimeError(f"Error cargando audio con whisperx.load_audio: {e}") from e

        # Free as much GPU memory as possible before the alignment model loads.
        if device == "cuda":
            print("🧹 Limpiando memoria antes de cargar modelo de alineación...")
            _flush_cuda_cache()

        print(f"📐 Cargando modelo de alineación para {language} en {device}...")
        model_a, metadata = whisperx.load_align_model(language_code=language, device=device)
        if device == "cuda":
            allocated = torch.cuda.memory_allocated(0) / 1024**3
            print(f"📊 Memoria tras cargar modelo: {allocated:.1f}GB asignada")

        retried_with_original = False  # True once the original audio produced the result
        tried_original = False         # True once the original audio was attempted at all
        audio_path_used = work_audio_path
        original_available = bool(orig_audio_path) and os.path.isfile(orig_audio_path)

        def _run_align(audio_array):
            print(f"🎯 Alineando {len(segments)} segmentos...")
            return whisperx.align(segments, model_a, metadata, audio_array, device)

        def _align_with_original():
            return _run_align(_load_audio_array(orig_audio_path))

        try:
            # Pre-check: the working audio seems shorter than the segments
            # (0.25 s tolerance), so prefer the original recording.
            if (work_duration and seg_max_end
                    and (work_duration + 0.25 < seg_max_end)
                    and original_available):
                print("⚠️ Audio de trabajo parece más corto que los segmentos. Reintentando con audio ORIGINAL...")
                tried_original = True
                try:
                    aligned = _align_with_original()
                    retried_with_original = True
                    audio_path_used = orig_audio_path
                except Exception as e2:
                    print(f"⚠️ Falló alineación con original en pre-chequeo: {e2}. Probando audio de trabajo igualmente...")
                    aligned = _run_align(audio)
            else:
                aligned = _run_align(audio)
        except Exception as e:
            # Retry with the original audio exactly once: skip the retry when
            # it was already attempted in the pre-check or is not available.
            if tried_original or not original_available:
                raise RuntimeError(f"Error durante whisperx.align: {e}") from e
            print(f"⚠️ Error en alineación inicial: {e}. Reintentando con audio ORIGINAL...")
            try:
                aligned = _align_with_original()
                retried_with_original = True
                audio_path_used = orig_audio_path
            except Exception as e2:
                raise RuntimeError(
                    f"Error durante whisperx.align (reintento con original también falló): {e}; reintento: {e2}"
                ) from e2

        # Drop the model and audio before the final memory report.
        print("🧹 Liberando modelo de alineación...")
        del model_a
        del metadata
        del audio
        if device == "cuda":
            _flush_cuda_cache()
            final_allocated = torch.cuda.memory_allocated(0) / 1024**3
            print(f"📊 Memoria final: {final_allocated:.1f}GB asignada")

        # Persist the result.
        aligned_segments = aligned.get("segments", [])
        _write_alignment_result({
            "success": True,
            "result": {
                "segments": aligned_segments,
                "language": language,
            },
            "aligned_count": len(aligned_segments),
            "device": device,
            "audio_path": str(audio_path),
            "audio_path_used": audio_path_used,
            "retried_with_original": retried_with_original,
            "audio_duration_sec": work_duration,
            "segments_max_end_sec": seg_max_end,
        })
        print(f"✅ Alineación completada: {len(aligned_segments)} segmentos alineados")
        return True
    except Exception as e:
        # Release GPU memory on failure as well.
        if device == "cuda" and torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
        import traceback
        tb = traceback.format_exc()
        # Store the error with its full traceback for diagnosis.
        try:
            _write_alignment_result({
                "success": False,
                "error": str(e),
                "traceback": tb,
                "audio_path": str(audio_path),
            })
        except Exception as save_err:
            print(f"⚠️ No se pudo escribir alignment_result.json: {save_err}")
        print(f"❌ Error en alineación: {e}")
        print(f"❌ Traceback: {tb}")
        return False
def _save_cli_error(message, tb=""):
    """Best-effort write of a failure record to ``alignment_result.json``."""
    error_data = {
        "success": False,
        "error": message,
        "traceback": tb,
    }
    try:
        with open("alignment_result.json", "w", encoding="utf-8") as f:
            json.dump(error_data, f, ensure_ascii=False, indent=2)
    except Exception as save_error:
        print(f"❌ No se pudo guardar error: {save_error}")


def _main(argv):
    """Run the command-line entry point and return the process exit code.

    Usage: ``python alignment_subprocess.py <transcription_result.json>
    <audio_path> [device]`` (``device`` defaults to ``"cuda"``).

    The transcription file must be a JSON object with a truthy ``"success"``
    flag; its ``"result"`` entry is handed to ``align_segments_subprocess``.

    Returns:
        0 when alignment succeeded, 1 otherwise (missing arguments, invalid
        transcription data, alignment failure or any unexpected exception).
    """
    import traceback

    try:
        if len(argv) < 3:
            print("❌ Error: Faltan argumentos")
            print("Uso: python alignment_subprocess.py <transcription_result.json> <audio_path> [device]")
            return 1

        transcription_file = argv[1]
        audio_path = argv[2]
        device = argv[3] if len(argv) > 3 else "cuda"
        print(f"🚀 Iniciando alineación subprocess: {audio_path}")
        print(f"📄 Archivo transcripción: {transcription_file}")
        print(f"🖥️ Device: {device}")

        # Load the transcription data.
        with open(transcription_file, "r", encoding="utf-8") as f:
            result_data = json.load(f)
        if not result_data.get("success"):
            print("❌ Los datos de transcripción no son válidos")
            # Record the failure in the result file, as the other failure
            # paths do, instead of leaving whatever file was there before.
            _save_cli_error("Los datos de transcripción no son válidos")
            return 1
        transcription_result = result_data.get("result", {})

        # Run the alignment.
        if align_segments_subprocess(transcription_result, audio_path, device):
            print("✅ Alineación subprocess completada exitosamente")
            return 0
        print("❌ Alineación subprocess falló")
        return 1
    except Exception as e:
        tb = traceback.format_exc()
        # Persist the error for the parent process before printing, so it is
        # recorded even if writing to the console fails.
        _save_cli_error(f"Error crítico: {str(e)}", tb)
        print(f"❌ Error crítico en alignment subprocess: {e}")
        print(f"❌ Traceback: {tb}")
        return 1


if __name__ == "__main__":
    sys.exit(_main(sys.argv))