#!/usr/bin/env python3
# alignment_subprocess.py - Subprocess para alineación de WhisperX
import os
import sys
import json
import time
import gc
import torch
import torchaudio
import whisperx
import tempfile
from pathlib import Path


def _inject_local_ffmpeg():
    """Ensure the bundled ffmpeg bin folder is on PATH for audio loading in this subprocess.

    Safe no-op if no bundled ffmpeg is found (the system PATH is used instead).
    """
    try:
        base_dir = Path(__file__).parent
        # Known locations of the bundled ffmpeg, checked in order.
        candidates = [
            base_dir / 'ffmpeg' / 'ffmpeg-8.0-essentials_build' / 'bin',
            base_dir / 'ffmpeg' / 'bin',
        ]
        for candidate in candidates:
            if candidate.is_dir():
                bin_path = str(candidate)
                # Prepend only if not already present so repeated runs don't grow PATH.
                if bin_path not in os.environ.get('PATH', ''):
                    os.environ['PATH'] = bin_path + os.pathsep + os.environ.get('PATH', '')
                    print(f"🔧 FFmpeg agregado al PATH (align): {bin_path}")
                for exe in ('ffmpeg.exe', 'ffprobe.exe'):
                    if (candidate / exe).is_file():
                        print(f"✅ Detectado {exe} en {candidate}")
                break
        else:
            # for/else: no candidate directory existed.
            print("⚠️ FFmpeg local no encontrado para alineación; se usará PATH del sistema")
    except Exception as e:
        # Best-effort: PATH injection must never break the subprocess.
        print(f"⚠️ Error inyectando FFmpeg (align): {e}")


_inject_local_ffmpeg()


def _get_audio_duration_seconds(path: str) -> float:
    """Return the duration of *path* in seconds.

    Tries ``torchaudio.info`` first; falls back to ``ffprobe`` if available
    on PATH. Returns 0.0 on any failure (callers treat 0.0 as "unknown").
    """
    try:
        info = torchaudio.info(path)
        if info.num_frames and info.sample_rate:
            return float(info.num_frames) / float(info.sample_rate)
    except Exception:
        pass
    # Fallback to ffprobe if available.
    try:
        from shutil import which
        if which("ffprobe"):
            import subprocess
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path
            ]
            # Timeout guards against a wedged ffprobe hanging the whole
            # subprocess; TimeoutExpired is absorbed by the except below.
            out = subprocess.check_output(cmd, text=True, timeout=10).strip()
            return float(out)
    except Exception:
        pass
    return 0.0


def align_segments_subprocess(result_data, audio_path, device="cuda"):
    """Run WhisperX segment alignment in a separate subprocess to avoid CUDA deadlocks.

    Args:
        result_data: dict with ``segments`` (list) and ``language`` (str) from
            a previous transcription step.
        audio_path: path to the (possibly pre-processed) working audio file.
        device: ``"cuda"`` or ``"cpu"``; silently falls back to CPU when CUDA
            is unavailable.

    Side effects:
        Writes ``alignment_result.json`` in the current working directory with
        either the aligned segments or error details (including traceback).

    Returns:
        True on success, False on failure.
    """
    print("🚀 Iniciando alineación en subprocess...")
    try:
        # Validate that we actually have segments to align.
        if not result_data or not result_data.get("segments"):
            print("❌ No hay segmentos para alinear")
            return False

        language = result_data.get("language", "es")
        segments = result_data.get("segments", [])
        print(f"📐 Preparando alineación para {len(segments)} segmentos en idioma {language}")

        # Fall back to CPU if CUDA was requested but is unavailable.
        if device == "cuda" and not torch.cuda.is_available():
            device = "cpu"
            print("⚠️ CUDA no disponible, usando CPU para alineación")

        if device == "cuda" and torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
            free_mem = torch.cuda.mem_get_info()[0] / 1024**3
            print(f"📊 Memoria CUDA inicial: {free_mem:.1f}GB libres")

        # Measure the working audio duration and the max segment end time so we
        # can detect a truncated working file and retry with the original.
        work_audio_path = str(audio_path)
        orig_audio_path = os.environ.get("ORIGINAL_AUDIO_PATH", "")
        seg_max_end = 0.0
        try:
            seg_max_end = max([(s.get("end") or 0.0) for s in segments]) if segments else 0.0
        except Exception:
            seg_max_end = 0.0
        work_duration = _get_audio_duration_seconds(work_audio_path)
        print(f"📏 Duración audio trabajo: {work_duration:.2f}s | Máx fin segmentos: {seg_max_end:.2f}s")

        # Load audio (robust to load_audio returning either an array or a tuple).
        print(f"📁 Cargando audio: {work_audio_path}")
        try:
            audio_ret = whisperx.load_audio(work_audio_path)
            if isinstance(audio_ret, (list, tuple)):
                audio = audio_ret[0]
            else:
                audio = audio_ret
        except Exception as e:
            raise RuntimeError(f"Error cargando audio con whisperx.load_audio: {e}")

        # 🧹 Aggressive memory cleanup before loading the alignment model.
        if device == "cuda":
            print("🧹 Limpiando memoria antes de cargar modelo de alineación...")
            for _ in range(3):
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
                gc.collect()
                time.sleep(0.1)

        # Load the language-specific alignment model.
        print(f"📐 Cargando modelo de alineación para {language} en {device}...")
        model_a, metadata = whisperx.load_align_model(language_code=language, device=device)

        if device == "cuda":
            # NOTE: assumes CUDA device index 0.
            allocated = torch.cuda.memory_allocated(0) / 1024**3
            print(f"📊 Memoria tras cargar modelo: {allocated:.1f}GB asignada")

        # Track whether we fell back to the original (untrimmed) audio.
        retried_with_original = False
        audio_path_used = work_audio_path

        def _run_align(audio_array):
            print(f"🎯 Alineando {len(segments)} segmentos...")
            return whisperx.align(segments, model_a, metadata, audio_array, device)

        try:
            # Pre-check: if the working audio looks shorter than the segments
            # (with 0.25s slack), align against the ORIGINAL audio instead.
            if work_duration and seg_max_end and (work_duration + 0.25 < seg_max_end) and orig_audio_path and os.path.isfile(orig_audio_path):
                print("⚠️ Audio de trabajo parece más corto que los segmentos. Reintentando con audio ORIGINAL...")
                try:
                    audio_ret2 = whisperx.load_audio(orig_audio_path)
                    audio2 = audio_ret2[0] if isinstance(audio_ret2, (list, tuple)) else audio_ret2
                    aligned = _run_align(audio2)
                    retried_with_original = True
                    audio_path_used = orig_audio_path
                except Exception as e2:
                    print(f"⚠️ Falló alineación con original en pre-chequeo: {e2}. Probando audio de trabajo igualmente...")
                    aligned = _run_align(audio)
            else:
                aligned = _run_align(audio)
        except Exception as e:
            # Alignment failed: retry once with the original audio if we
            # haven't tried it yet and it exists.
            if (not retried_with_original) and orig_audio_path and os.path.isfile(orig_audio_path):
                print(f"⚠️ Error en alineación inicial: {e}. Reintentando con audio ORIGINAL...")
                try:
                    audio_ret2 = whisperx.load_audio(orig_audio_path)
                    audio2 = audio_ret2[0] if isinstance(audio_ret2, (list, tuple)) else audio_ret2
                    aligned = _run_align(audio2)
                    retried_with_original = True
                    audio_path_used = orig_audio_path
                except Exception as e2:
                    # Add context and re-raise if the retry also failed.
                    raise RuntimeError(f"Error durante whisperx.align (reintento con original también falló): {e}; reintento: {e2}")
            else:
                # Add context and re-raise for the outer handler.
                raise RuntimeError(f"Error durante whisperx.align: {e}")

        # 🧹 Release the alignment model and audio buffers.
        print("🧹 Liberando modelo de alineación...")
        del model_a
        del metadata
        try:
            del audio
        except Exception:
            pass
        if device == "cuda":
            for _ in range(3):
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
                gc.collect()
                time.sleep(0.1)
            final_allocated = torch.cuda.memory_allocated(0) / 1024**3
            print(f"📊 Memoria final: {final_allocated:.1f}GB asignada")

        # Persist the result for the parent process.
        aligned_segments = aligned.get("segments", [])
        result_data_aligned = {
            "success": True,
            "result": {
                "segments": aligned_segments,
                "language": language
            },
            "aligned_count": len(aligned_segments),
            "device": device,
            "audio_path": str(audio_path),
            "audio_path_used": audio_path_used,
            "retried_with_original": retried_with_original,
            "audio_duration_sec": work_duration,
            "segments_max_end_sec": seg_max_end
        }
        with open("alignment_result.json", "w", encoding="utf-8") as f:
            json.dump(result_data_aligned, f, ensure_ascii=False, indent=2)

        print(f"✅ Alineación completada: {len(aligned_segments)} segmentos alineados")
        return True

    except Exception as e:
        # 🧹 Clean up GPU memory on failure.
        if device == "cuda" and torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
        import traceback
        tb = traceback.format_exc()
        # Persist the error with the full traceback for diagnosis.
        error_data = {
            "success": False,
            "error": str(e),
            "traceback": tb,
            "audio_path": str(audio_path)
        }
        try:
            with open("alignment_result.json", "w", encoding="utf-8") as f:
                json.dump(error_data, f, ensure_ascii=False, indent=2)
        except Exception as save_err:
            print(f"⚠️ No se pudo escribir alignment_result.json: {save_err}")
        print(f"❌ Error en alineación: {e}")
        print(f"❌ Traceback: {tb}")
        return False


if __name__ == "__main__":
    try:
        if len(sys.argv) < 3:
            print("❌ Error: Faltan argumentos")
            # FIX: the usage line omitted the two required positional
            # arguments validated above (transcription_file, audio_path).
            print("Uso: python alignment_subprocess.py <transcription_file> <audio_path> [device]")
            sys.exit(1)

        transcription_file = sys.argv[1]
        audio_path = sys.argv[2]
        device = sys.argv[3] if len(sys.argv) > 3 else "cuda"

        print(f"🚀 Iniciando alineación subprocess: {audio_path}")
        print(f"📄 Archivo transcripción: {transcription_file}")
        print(f"🖥️ Device: {device}")

        # Load the transcription produced by the previous stage.
        with open(transcription_file, "r", encoding="utf-8") as f:
            result_data = json.load(f)

        if not result_data.get("success"):
            print("❌ Los datos de transcripción no son válidos")
            sys.exit(1)

        transcription_result = result_data.get("result", {})

        # Run alignment; exit code signals success to the parent process.
        success = align_segments_subprocess(transcription_result, audio_path, device)
        if success:
            print("✅ Alineación subprocess completada exitosamente")
            sys.exit(0)
        else:
            print("❌ Alineación subprocess falló")
            sys.exit(1)

    except Exception as e:
        print(f"❌ Error crítico en alignment subprocess: {e}")
        import traceback
        print(f"❌ Traceback: {traceback.format_exc()}")
        # Persist the error so the parent process can read it.
        error_data = {
            "success": False,
            "error": f"Error crítico: {str(e)}",
            "traceback": traceback.format_exc()
        }
        try:
            with open("alignment_result.json", "w", encoding="utf-8") as f:
                json.dump(error_data, f, ensure_ascii=False, indent=2)
        except Exception as save_error:
            print(f"❌ No se pudo guardar error: {save_error}")
        sys.exit(1)