Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # alignment_subprocess.py - Subprocess para alineación de WhisperX | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import gc | |
| import torch | |
| import torchaudio | |
| import whisperx | |
| import tempfile | |
| from pathlib import Path | |
| def _inject_local_ffmpeg(): | |
| """Ensure bundled ffmpeg bin folder is in PATH for audio loading in subprocess. | |
| Safe no-op if not found.""" | |
| try: | |
| base_dir = Path(__file__).parent | |
| candidates = [ | |
| base_dir / 'ffmpeg' / 'ffmpeg-8.0-essentials_build' / 'bin', | |
| base_dir / 'ffmpeg' / 'bin', | |
| ] | |
| for c in candidates: | |
| if c.is_dir(): | |
| bin_path = str(c) | |
| if bin_path not in os.environ.get('PATH', ''): | |
| os.environ['PATH'] = bin_path + os.pathsep + os.environ.get('PATH', '') | |
| print(f"🔧 FFmpeg agregado al PATH (align): {bin_path}") | |
| for exe in ('ffmpeg.exe','ffprobe.exe'): | |
| if (c / exe).is_file(): | |
| print(f"✅ Detectado {exe} en {c}") | |
| break | |
| else: | |
| print("⚠️ FFmpeg local no encontrado para alineación; se usará PATH del sistema") | |
| except Exception as e: | |
| print(f"⚠️ Error inyectando FFmpeg (align): {e}") | |
| _inject_local_ffmpeg() | |
| def _get_audio_duration_seconds(path: str) -> float: | |
| """Try to get duration (seconds) using torchaudio.info; fallback to ffprobe if present. | |
| Returns 0.0 on failure.""" | |
| try: | |
| info = torchaudio.info(path) | |
| if info.num_frames and info.sample_rate: | |
| return float(info.num_frames) / float(info.sample_rate) | |
| except Exception: | |
| pass | |
| # Fallback to ffprobe if available | |
| try: | |
| from shutil import which | |
| if which("ffprobe"): | |
| import subprocess | |
| cmd = [ | |
| "ffprobe", "-v", "error", "-show_entries", "format=duration", | |
| "-of", "default=noprint_wrappers=1:nokey=1", path | |
| ] | |
| out = subprocess.check_output(cmd, text=True).strip() | |
| return float(out) | |
| except Exception: | |
| pass | |
| return 0.0 | |
def align_segments_subprocess(result_data, audio_path, device="cuda"):
    """Run WhisperX word-level alignment, isolated in its own subprocess
    to avoid a CUDA deadlock in the parent process.

    Writes the outcome (aligned segments on success, error + traceback on
    failure) to ``alignment_result.json`` in the current working directory.

    Args:
        result_data: Transcription dict with ``segments`` (list of dicts
            with at least ``end`` times) and optionally ``language``
            (defaults to ``"es"``).
        audio_path: Path to the working (possibly preprocessed) audio file.
        device: ``"cuda"`` or ``"cpu"``; silently falls back to CPU when
            CUDA is unavailable.

    Returns:
        bool: True when alignment succeeded and the result file was
        written, False otherwise.
    """
    print("🚀 Iniciando alineación en subprocess...")
    try:
        # Validate that we actually have segments to align
        if not result_data or not result_data.get("segments"):
            print("❌ No hay segmentos para alinear")
            return False
        language = result_data.get("language", "es")
        segments = result_data.get("segments", [])
        print(f"📐 Preparando alineación para {len(segments)} segmentos en idioma {language}")
        # Detect the actually available device
        if device == "cuda" and not torch.cuda.is_available():
            device = "cpu"
            print("⚠️ CUDA no disponible, usando CPU para alineación")
        if device == "cuda" and torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
            free_mem = torch.cuda.mem_get_info()[0] / 1024**3
            print(f"📊 Memoria CUDA inicial: {free_mem:.1f}GB libres")
        # Measure working-audio duration and the latest segment end time;
        # a mismatch hints the working audio was trimmed vs. the original.
        work_audio_path = str(audio_path)
        # Parent process may export the untrimmed source for retry purposes.
        orig_audio_path = os.environ.get("ORIGINAL_AUDIO_PATH", "")
        seg_max_end = 0.0
        try:
            seg_max_end = max([(s.get("end") or 0.0) for s in segments]) if segments else 0.0
        except Exception:
            seg_max_end = 0.0
        work_duration = _get_audio_duration_seconds(work_audio_path)
        print(f"📏 Duración audio trabajo: {work_duration:.2f}s | Máx fin segmentos: {seg_max_end:.2f}s")
        # Load audio (robust to different whisperx.load_audio return shapes)
        print(f"📁 Cargando audio: {work_audio_path}")
        try:
            audio_ret = whisperx.load_audio(work_audio_path)
            if isinstance(audio_ret, (list, tuple)):
                audio = audio_ret[0]
            else:
                audio = audio_ret
        except Exception as e:
            raise RuntimeError(f"Error cargando audio con whisperx.load_audio: {e}")
        # 🧹 Clean up GPU memory before loading the alignment model
        if device == "cuda":
            print("🧹 Limpiando memoria antes de cargar modelo de alineación...")
            for _ in range(3):
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
                gc.collect()
                time.sleep(0.1)
        # Load the language-specific alignment model
        print(f"📐 Cargando modelo de alineación para {language} en {device}...")
        model_a, metadata = whisperx.load_align_model(language_code=language, device=device)
        if device == "cuda":
            allocated = torch.cuda.memory_allocated(0) / 1024**3
            print(f"📊 Memoria tras cargar modelo: {allocated:.1f}GB asignada")
        # Track whether we fell back to the original audio file
        retried_with_original = False
        audio_path_used = work_audio_path

        def _run_align(audio_array):
            # Closure over segments/model_a/metadata/device defined above.
            print(f"🎯 Alineando {len(segments)} segmentos...")
            return whisperx.align(segments, model_a, metadata, audio_array, device)

        try:
            # Pre-check: working audio appears shorter than the segments,
            # so try the original audio first (0.25s tolerance).
            if work_duration and seg_max_end and (work_duration + 0.25 < seg_max_end) and orig_audio_path and os.path.isfile(orig_audio_path):
                print("⚠️ Audio de trabajo parece más corto que los segmentos. Reintentando con audio ORIGINAL...")
                try:
                    audio_ret2 = whisperx.load_audio(orig_audio_path)
                    audio2 = audio_ret2[0] if isinstance(audio_ret2, (list, tuple)) else audio_ret2
                    aligned = _run_align(audio2)
                    retried_with_original = True
                    audio_path_used = orig_audio_path
                except Exception as e2:
                    print(f"⚠️ Falló alineación con original en pre-chequeo: {e2}. Probando audio de trabajo igualmente...")
                    aligned = _run_align(audio)
            else:
                aligned = _run_align(audio)
        except Exception as e:
            # Alignment error: if we have not tried the original audio yet
            # and it exists, retry exactly once with it.
            if (not retried_with_original) and orig_audio_path and os.path.isfile(orig_audio_path):
                print(f"⚠️ Error en alineación inicial: {e}. Reintentando con audio ORIGINAL...")
                try:
                    audio_ret2 = whisperx.load_audio(orig_audio_path)
                    audio2 = audio_ret2[0] if isinstance(audio_ret2, (list, tuple)) else audio_ret2
                    aligned = _run_align(audio2)
                    retried_with_original = True
                    audio_path_used = orig_audio_path
                except Exception as e2:
                    # Add context and re-raise if the retry also failed
                    raise RuntimeError(f"Error durante whisperx.align (reintento con original también falló): {e}; reintento: {e2}")
            else:
                # Add context and re-raise for the outer handler
                raise RuntimeError(f"Error durante whisperx.align: {e}")
        # 🧹 Release the alignment model from memory
        print("🧹 Liberando modelo de alineación...")
        del model_a
        del metadata
        try:
            del audio
        except Exception:
            pass
        if device == "cuda":
            for _ in range(3):
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
                gc.collect()
                time.sleep(0.1)
            final_allocated = torch.cuda.memory_allocated(0) / 1024**3
            print(f"📊 Memoria final: {final_allocated:.1f}GB asignada")
        # Persist the result for the parent process
        aligned_segments = aligned.get("segments", [])
        result_data_aligned = {
            "success": True,
            "result": {
                "segments": aligned_segments,
                "language": language
            },
            "aligned_count": len(aligned_segments),
            "device": device,
            "audio_path": str(audio_path),
            "audio_path_used": audio_path_used,
            "retried_with_original": retried_with_original,
            "audio_duration_sec": work_duration,
            "segments_max_end_sec": seg_max_end
        }
        with open("alignment_result.json", "w", encoding="utf-8") as f:
            json.dump(result_data_aligned, f, ensure_ascii=False, indent=2)
        print(f"✅ Alineación completada: {len(aligned_segments)} segmentos alineados")
        return True
    except Exception as e:
        # 🧹 Clean GPU memory on error
        if device == "cuda" and torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
        import traceback
        tb = traceback.format_exc()
        # Persist the error with a full traceback for diagnosis
        error_data = {
            "success": False,
            "error": str(e),
            "traceback": tb,
            "audio_path": str(audio_path)
        }
        try:
            with open("alignment_result.json", "w", encoding="utf-8") as f:
                json.dump(error_data, f, ensure_ascii=False, indent=2)
        except Exception as save_err:
            print(f"⚠️ No se pudo escribir alignment_result.json: {save_err}")
        print(f"❌ Error en alineación: {e}")
        print(f"❌ Traceback: {tb}")
        return False
if __name__ == "__main__":
    try:
        # CLI: <transcription_result.json> <audio_path> [device]
        argv = sys.argv
        if len(argv) < 3:
            print("❌ Error: Faltan argumentos")
            print("Uso: python alignment_subprocess.py <transcription_result.json> <audio_path> [device]")
            sys.exit(1)
        result_file, wav_path = argv[1], argv[2]
        run_device = argv[3] if len(argv) > 3 else "cuda"
        print(f"🚀 Iniciando alineación subprocess: {wav_path}")
        print(f"📄 Archivo transcripción: {result_file}")
        print(f"🖥️ Device: {run_device}")
        # Read the transcription produced by the previous stage
        with open(result_file, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
        if not payload.get("success"):
            print("❌ Los datos de transcripción no son válidos")
            sys.exit(1)
        # Run alignment; its boolean outcome becomes the process exit code
        if align_segments_subprocess(payload.get("result", {}), wav_path, run_device):
            print("✅ Alineación subprocess completada exitosamente")
            sys.exit(0)
        print("❌ Alineación subprocess falló")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error crítico en alignment subprocess: {e}")
        import traceback
        print(f"❌ Traceback: {traceback.format_exc()}")
        # Persist the failure so the parent process can read it
        error_data = {
            "success": False,
            "error": f"Error crítico: {str(e)}",
            "traceback": traceback.format_exc()
        }
        try:
            with open("alignment_result.json", "w", encoding="utf-8") as fh:
                json.dump(error_data, fh, ensure_ascii=False, indent=2)
        except Exception as save_error:
            print(f"❌ No se pudo guardar error: {save_error}")
        sys.exit(1)