import gradio as gr import torch import yt_dlp import os import subprocess import json from transformers import AutoTokenizer, AutoModelForCausalLM import moviepy.editor as mp import langdetect import uuid import time import random import tempfile import shutil import traceback import sys # --- CONFIGURACIÓN INICIAL CON DEBUGGING --- print("🔧 DEBUGGING MODE ACTIVATED") print("🐍 Python version:", sys.version) print("📦 Starting intensive debugging version...") # Verificar dependencias críticas dependencies_status = {} try: import curl_cffi dependencies_status['curl_cffi'] = f"✅ Available (v{curl_cffi.__version__})" CURL_CFFI_AVAILABLE = True except ImportError as e: dependencies_status['curl_cffi'] = f"❌ Not available: {str(e)}" CURL_CFFI_AVAILABLE = False try: import yt_dlp dependencies_status['yt_dlp'] = f"✅ Available (v{yt_dlp.__version__})" except Exception as e: dependencies_status['yt_dlp'] = f"❌ Error: {str(e)}" # Verificar impersonation targets try: with yt_dlp.YoutubeDL() as ydl: # Test para ver qué targets están disponibles print("🧪 Testing impersonation targets...") test_result = subprocess.run(['yt-dlp', '--list-impersonate-targets'], capture_output=True, text=True, timeout=10) if test_result.returncode == 0: targets = test_result.stdout dependencies_status['impersonation'] = f"✅ Available targets:\n{targets[:200]}..." else: dependencies_status['impersonation'] = f"❌ Command failed: {test_result.stderr}" except Exception as e: dependencies_status['impersonation'] = f"❌ Test failed: {str(e)}" print("\n📊 DEPENDENCY STATUS:") for dep, status in dependencies_status.items(): print(f" {dep}: {status}") # Carga del modelo model_path = "Qwen/Qwen2.5-7B-Instruct" print(f"\n🤖 Loading model {model_path}...") try: tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, trust_remote_code=True) model = model.eval() print("✅ Model loaded successfully") dependencies_status['model'] = "✅ Loaded" except Exception as e: print(f"❌ Model loading failed: {e}") dependencies_status['model'] = f"❌ Failed: {str(e)}" model = None tokenizer = None # --- FUNCIONES DE DEBUGGING --- def debug_log(message, level="INFO"): """Log con timestamp""" timestamp = time.strftime("%H:%M:%S") print(f"[{timestamp}] {level}: {message}") def test_yt_dlp_basic(): """Test básico de yt-dlp""" debug_log("Testing basic yt-dlp functionality...") try: # Test con video público conocido test_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" # Rick Roll - siempre funciona ydl_opts = {'quiet': True, 'no_warnings': True} with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(test_url, download=False) if info: debug_log("✅ Basic yt-dlp test PASSED") return True else: debug_log("❌ Basic yt-dlp test FAILED - no info returned") return False except Exception as e: debug_log(f"❌ Basic yt-dlp test FAILED: {str(e)}", "ERROR") return False def test_vimeo_access(): """Test específico de acceso a Vimeo""" debug_log("Testing Vimeo access...") try: # Video público de test de Vimeo test_url = "https://vimeo.com/34741214" ydl_opts = { 'quiet': True, 'no_warnings': True, 'socket_timeout': 15 } # Test 1: Sin impersonation debug_log("Test 1: Basic Vimeo access (no impersonation)") with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(test_url, download=False) if info: debug_log("✅ Basic Vimeo access: SUCCESS") return "basic_success" except Exception as e: debug_log(f"❌ Basic Vimeo access failed: {str(e)}", "ERROR") # Test 2: Con impersonation si está disponible if CURL_CFFI_AVAILABLE: try: debug_log("Test 2: Vimeo with chrome120 impersonation") ydl_opts['impersonate'] = 'chrome120' with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(test_url, download=False) if info: debug_log("✅ Vimeo with impersonation: SUCCESS") return "impersonation_success" except Exception as e2: debug_log(f"❌ Vimeo with impersonation failed: {str(e2)}", "ERROR") return f"failed: {str(e)}" # Ejecutar tests al inicio basic_test_result = test_yt_dlp_basic() vimeo_test_result = test_vimeo_access() dependencies_status['basic_test'] = "✅ Passed" if basic_test_result else "❌ Failed" dependencies_status['vimeo_test'] = vimeo_test_result # --- FUNCIONES AUXILIARES --- def generate_unique_filename(extension): return f"{uuid.uuid4()}{extension}" def cleanup_files(*files): for file in files: if file and os.path.exists(file): try: os.remove(file) debug_log(f"Cleaned up: {file}") except OSError as e: debug_log(f"Cleanup error: {file} - {e}", "ERROR") def clean_url(url): """Limpiar URL como en código exitoso""" cleaned = url.split('?')[0] if '?' in url else url debug_log(f"URL cleaned: {url} → {cleaned}") return cleaned # --- FUNCIÓN PRINCIPAL CON DEBUGGING INTENSIVO --- def download_video_debug(url): """ Función con debugging intensivo para identificar exactamente dónde falla """ debug_log(f"🎯 STARTING DEBUG DOWNLOAD", "INFO") debug_log(f"Original URL: {url}") # Limpiar URL clean_url_value = clean_url(url) debug_log(f"Cleaned URL: {clean_url_value}") # Crear directorio temporal with tempfile.TemporaryDirectory() as temp_dir: debug_log(f"Temp directory: {temp_dir}") temp_filename = generate_unique_filename("") debug_log(f"Temp filename base: {temp_filename}") # Lista de métodos a probar methods = [ { 'name': 'Chrome120 Impersonation', 'opts': { 'format': 'bestaudio/best', 'outtmpl': os.path.join(temp_dir, f'{temp_filename}_method1.%(ext)s'), 'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav', 'preferredquality': '64'}], 'quiet': True, 'no_warnings': True, 'retries': 1, 'socket_timeout': 30, 'impersonate': 'chrome120', 'postprocessor_args': ['-ar', '16000', '-ac', '1'] } }, { 'name': 'Chrome Generic', 'opts': { 'format': 'bestaudio/best', 'outtmpl': os.path.join(temp_dir, f'{temp_filename}_method2.%(ext)s'), 'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav'}], 'quiet': True, 'no_warnings': True, 'retries': 1, 'socket_timeout': 30, 'impersonate': 'chrome' } }, { 'name': 'No Impersonation', 'opts': { 'format': 'bestaudio/best', 'outtmpl': os.path.join(temp_dir, f'{temp_filename}_method3.%(ext)s'), 'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav'}], 'quiet': True, 'no_warnings': True, 'retries': 1, 'socket_timeout': 45 } }, { 'name': 'Basic Fallback', 'opts': { 'format': 'best', 'outtmpl': os.path.join(temp_dir, f'{temp_filename}_method4.%(ext)s'), 'quiet': False, 'no_warnings': False, 'retries': 1, 'socket_timeout': 60 } } ] # Probar cada método for i, method in enumerate(methods): method_name = method['name'] ydl_opts = method['opts'] debug_log(f"\n🔄 METHOD {i+1}: {method_name}") debug_log(f"Options: {json.dumps(ydl_opts, indent=2)}") try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: # Paso 1: Extraer información debug_log("Step 1: Extracting video info...") try: info_dict = ydl.extract_info(clean_url_value, download=False) if not info_dict: raise Exception("No info_dict returned") video_title = info_dict.get('title', 'Unknown') duration = info_dict.get('duration') uploader = info_dict.get('uploader', 'Unknown') debug_log(f"✅ Info extracted successfully:") debug_log(f" Title: {video_title}") debug_log(f" Duration: {duration} seconds") debug_log(f" Uploader: {uploader}") # Verificar duración if duration and duration > 1800: # 30 minutos raise Exception(f"Video too long: {duration}s > 1800s") except Exception as extract_error: debug_log(f"❌ Info extraction failed: {str(extract_error)}", "ERROR") debug_log(f"Full traceback: {traceback.format_exc()}", "ERROR") continue # Paso 2: Descargar debug_log("Step 2: Downloading audio...") try: ydl.download([clean_url_value]) debug_log("✅ Download command completed") except Exception as download_error: debug_log(f"❌ Download failed: {str(download_error)}", "ERROR") debug_log(f"Full traceback: {traceback.format_exc()}", "ERROR") continue # Paso 3: Verificar archivos descargados debug_log("Step 3: Checking downloaded files...") files_found = [] for filename in os.listdir(temp_dir): file_path = os.path.join(temp_dir, filename) file_size = os.path.getsize(file_path) files_found.append((filename, file_size)) debug_log(f" Found: {filename} ({file_size} bytes)") # Buscar archivo de audio válido for filename, file_size in files_found: if filename.endswith(('.wav', '.mp3', '.m4a', '.webm')) and file_size >= 1024: source_path = os.path.join(temp_dir, filename) final_path = generate_unique_filename(".wav") shutil.copy2(source_path, final_path) debug_log(f"✅ SUCCESS with {method_name}!") debug_log(f" Final file: {final_path} ({file_size} bytes)") return final_path, f"Success with {method_name}" debug_log(f"❌ No valid audio file found from {method_name}", "ERROR") except Exception as method_error: debug_log(f"❌ Method {method_name} failed completely: {str(method_error)}", "ERROR") debug_log(f"Full traceback: {traceback.format_exc()}", "ERROR") continue # Si llegamos aquí, todos los métodos fallaron debug_log("❌ ALL METHODS FAILED", "ERROR") raise Exception("All download methods failed. Check debug logs for details.") def transcribe_audio_debug(file_path): """Transcripción con debugging""" debug_log(f"🎤 Starting transcription: {file_path}") try: if not os.path.exists(file_path): raise FileNotFoundError(f"Audio file not found: {file_path}") file_size = os.path.getsize(file_path) debug_log(f"Audio file size: {file_size} bytes") if file_size < 1000: raise Exception("Audio file too small") # Usar whisper directamente para debugging output_file = generate_unique_filename(".json") command = [ "insanely-fast-whisper", "--file-name", file_path, "--device-id", "cpu", "--model-name", "openai/whisper-large-v3", "--task", "transcribe", "--timestamp", "chunk", "--transcript-path", output_file, "--batch-size", "1", ] debug_log(f"Whisper command: {' '.join(command)}") result = subprocess.run( command, check=True, capture_output=True, text=True, timeout=300 ) debug_log(f"Whisper stdout: {result.stdout}") if result.stderr: debug_log(f"Whisper stderr: {result.stderr}") # Leer resultado with open(output_file, "r", encoding='utf-8') as f: transcription_data = json.load(f) result_text = transcription_data.get("text", "").strip() cleanup_files(output_file) debug_log(f"Transcription completed: {len(result_text)} characters") return result_text except Exception as e: debug_log(f"Transcription error: {str(e)}", "ERROR") debug_log(f"Full traceback: {traceback.format_exc()}", "ERROR") raise def generate_summary_debug(transcription): """Resumen con debugging""" if not transcription or len(transcription.strip()) < 20: return "⚠️ Transcription too short" if not model: return "⚠️ Model not available" try: detected_language = langdetect.detect(transcription) debug_log(f"Detected language: {detected_language}") except: detected_language = "en" text = transcription[:8000] prompt = f"Summarize this in {detected_language} (150-250 words): {text}" try: response, _ = model.chat(tokenizer, prompt, history=[]) return response except Exception as e: debug_log(f"Summary error: {str(e)}", "ERROR") return f"Summary error: {str(e)}" # --- FUNCIONES DE INTERFAZ CON DEBUGGING --- def process_video_debug(url): """Función principal con debugging completo""" if not url or not url.strip(): return "❌ No URL provided", "❌ No URL" url = url.strip() debug_log(f"\n{'='*60}") debug_log(f"🎯 PROCESSING VIDEO WITH FULL DEBUGGING") debug_log(f"URL: {url}") debug_log(f"Dependencies: {json.dumps(dependencies_status, indent=2)}") debug_log(f"{'='*60}") audio_file = None try: # Download con debugging audio_file, download_method = download_video_debug(url) debug_log(f"Download successful with: {download_method}") # Transcription con debugging transcription = transcribe_audio_debug(audio_file) success_msg = f"✅ Success with {download_method} ({len(transcription)} chars)" debug_log(f"FINAL SUCCESS: {success_msg}") return transcription, success_msg except Exception as e: error_msg = str(e) debug_log(f"FINAL ERROR: {error_msg}", "ERROR") debug_log(f"Full traceback: {traceback.format_exc()}", "ERROR") # Error detallado detailed_error = f"""❌ DETAILED ERROR REPORT: Primary Error: {error_msg} System Status: {json.dumps(dependencies_status, indent=2)} Debug Info: - curl-cffi: {dependencies_status.get('curl_cffi', 'Unknown')} - Basic test: {dependencies_status.get('basic_test', 'Unknown')} - Vimeo test: {dependencies_status.get('vimeo_test', 'Unknown')} Check console logs for full debugging information.""" return detailed_error, f"❌ Error: {error_msg[:50]}..." finally: if audio_file and os.path.exists(audio_file): cleanup_files(audio_file) # --- INTERFAZ GRADIO CON DEBUGGING --- with gr.Blocks(theme=gr.themes.Soft(), title="🔧 Debug Mode") as demo: gr.Markdown("# 🔧 DEBUG MODE - Intensive Diagnostics") # Status de dependencias status_md = "## 📊 System Status\n" for dep, status in dependencies_status.items(): status_md += f"- **{dep}**: {status}\n" gr.Markdown(status_md) with gr.Row(): url_input = gr.Textbox( label="Video URL for Debug", placeholder="https://vimeo.com/34741214 (test video)", info="Try with this test URL first" ) url_button = gr.Button("🔧 Debug Process", variant="primary") with gr.Row(): transcription_output = gr.Textbox( label="📝 Transcription / Debug Output", lines=20, interactive=True ) summary_output = gr.Textbox( label="📊 Summary", lines=10 ) status_output = gr.Textbox( label="🔍 Debug Status", interactive=False, lines=3 ) summary_button = gr.Button("📝 Generate Summary", variant="secondary") gr.Markdown(""" ## 🔍 Debug Instructions 1. **Try the test URL first**: `https://vimeo.com/34741214` 2. **Check console logs** in your browser (F12 → Console) 3. **Report any errors** with the full debug output This version will tell us **exactly** where and why it's failing. """) # Event handlers url_button.click( fn=process_video_debug, inputs=[url_input], outputs=[transcription_output, status_output] ) summary_button.click( fn=generate_summary_debug, inputs=[transcription_output], outputs=[summary_output] ) debug_log("🎨 Gradio interface ready in DEBUG MODE") demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)