#!/usr/bin/env python3
"""
Hugging Face Spaces Entry Point for AudioScribe Backend
Integrated FastAPI + Gradio on a single port - FIXED VERSION
"""
import gradio as gr
import subprocess
import sys
import os
import asyncio
import time
import json
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import tempfile

# Import our main FastAPI app components
try:
    from main import (app as fastapi_app, transcription_pipeline, summarization_pipeline,
                      is_youtube_url, download_youtube_audio, preprocess_audio, generate_summary)
    print("✅ Successfully imported from main.py")
except ImportError as e:
    print(f"⚠️ Could not import from main.py: {e}")
    fastapi_app = None
def check_system_dependencies():
    """Check if required system dependencies are available"""
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
        print("✅ ffmpeg is available")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("⚠️ ffmpeg not found")
    try:
        result = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True, check=True)
        print(f"✅ yt-dlp version: {result.stdout.strip()}")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("⚠️ yt-dlp not found - installing...")
        subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "yt-dlp"], check=True)

# Run system checks
print("🔧 Checking system dependencies...")
check_system_dependencies()
# Create a service status tracker
service_status = {
    "fastapi_ready": False,
    "models_loaded": False,
    "startup_time": time.time(),
    "error": None
}

def get_service_status():
    """Get current service status"""
    try:
        if fastapi_app is None:
            return "❌ FastAPI app not available"
        # Check if we can access the models
        from main import transcription_pipeline, summarization_pipeline
        transcription_ready = transcription_pipeline is not None
        summarization_ready = summarization_pipeline is not None
        elapsed = time.time() - service_status["startup_time"]
        if transcription_ready:
            service_status["models_loaded"] = True
            return f"✅ AudioScribe is ready! (startup took {elapsed:.1f}s)"
        else:
            return f"🤖 Loading AI models... ({elapsed:.1f}s elapsed)"
    except Exception as e:
        return f"❌ Service error: {str(e)}"
def transcribe_youtube_direct(youtube_url, mode):
    """Direct YouTube transcription with proper error handling"""
    try:
        if not youtube_url or not youtube_url.strip():
            return "Error", "Please provide a YouTube URL", "❌ No URL provided"

        # Validate YouTube URL
        if not any(domain in youtube_url.lower() for domain in ['youtube.com', 'youtu.be']):
            return "Error", "Please provide a valid YouTube URL", "❌ Invalid URL format"

        # Check if models are loaded
        try:
            from main import transcription_pipeline, summarization_pipeline
            if transcription_pipeline is None:
                elapsed = time.time() - service_status["startup_time"]
                return (
                    "⏳ Models Loading",
                    f"AI models are still loading... ({elapsed:.0f}s elapsed)\n\nThis usually takes 1-2 minutes on first startup. Please wait and try again.",
                    "🤖 Please wait - transcription model is loading"
                )
        except ImportError:
            return "Error", "Backend not properly initialized", "❌ System error"

        print(f"🎬 Processing YouTube URL: {youtube_url}")

        # Download audio with comprehensive error handling
        try:
            audio_path = download_youtube_audio(youtube_url)
        except Exception as e:
            error_msg = str(e)
            # Handle specific network error types
            if "Network Access Restricted" in error_msg or "YouTube Access Blocked" in error_msg:
                return (
                    "🌐 Network Restriction",
                    "**🚫 YouTube Blocked by Hugging Face**\n\n" +
                    "This Space cannot access YouTube due to network policies. " +
                    "This is a **platform limitation**, not a bug.\n\n" +
                    "**✅ Easy Workaround:**\n" +
                    "1. Download the video's audio using any YouTube downloader\n" +
                    "2. Upload the audio file using the '🎵 Audio Upload' tab above\n" +
                    "3. Get the same transcription and AI study notes!\n\n" +
                    "*This restriction exists in most cloud platforms for resource management.*",
                    "📁 Use Audio Upload instead"
                )
            elif "timeout" in error_msg.lower():
                return (
                    "⏱️ Network Timeout",
                    "The download took too long. This might be due to network restrictions or a large video.\n\n**Try:** Upload the audio file directly instead.",
                    "⏱️ Try audio upload"
                )
            elif "unavailable" in error_msg.lower() or "private" in error_msg.lower():
                return (
                    "📹 Video Unavailable",
                    "This video is unavailable, private, or has been removed from YouTube.",
                    "📹 Video not accessible"
                )
            elif "403" in error_msg or "forbidden" in error_msg.lower():
                return (
                    "🔒 Access Denied",
                    "This video is age-restricted, geo-blocked, or requires special permissions.",
                    "🔒 Video restricted"
                )
            else:
                return (
                    "❌ Download Failed",
                    f"YouTube download error: {error_msg[:300]}...\n\n**Alternative:** Try uploading the audio file directly.",
                    "❌ Use audio upload instead"
                )

        # Continue with transcription if download succeeded
        try:
            file_size = os.path.getsize(audio_path) if os.path.exists(audio_path) else 0

            # Preprocess audio
            processed_audio = preprocess_audio(audio_path)

            # Transcribe
            result = transcription_pipeline(processed_audio)

            # Extract transcription text
            if isinstance(result, dict) and "text" in result:
                transcription = result["text"].strip()
            elif isinstance(result, dict) and "chunks" in result:
                transcription = " ".join([chunk["text"] for chunk in result["chunks"]]).strip()
            else:
                transcription = str(result).strip()

            if not transcription:
                return "Error", "No speech detected in YouTube video", "❌ No speech found"

            # Generate summary/notes
            summary = generate_summary(transcription, mode)

            # Cleanup temporary files
            try:
                os.unlink(audio_path)
                if processed_audio != audio_path:
                    os.unlink(processed_audio)
            except OSError:
                pass
            return (
                transcription,
                summary,
                f"✅ Success! File size: {file_size:,} bytes"
            )
        except Exception as e:
            return (
                "Error",
                f"Transcription processing failed: {str(e)}",
                "❌ Processing error"
            )
    except Exception as e:
        print(f"❌ YouTube transcription error: {e}")
        return (
            "Error",
            f"Processing failed: {str(e)}\n\n**Try:** Upload an audio file instead.",
            "❌ Use audio upload"
        )
def transcribe_audio_direct(audio_file):
    """Direct audio transcription without HTTP calls"""
    try:
        if audio_file is None:
            return "Error", "No audio file provided", "❌ No file uploaded"

        # Check if models are loaded
        try:
            from main import transcription_pipeline, summarization_pipeline
            if transcription_pipeline is None:
                elapsed = time.time() - service_status["startup_time"]
                return (
                    "⏳ Models Loading",
                    f"AI models are still loading... ({elapsed:.0f}s elapsed)\n\nThis usually takes 1-2 minutes on first startup. Please wait and try again.",
                    "🤖 Please wait - transcription model is loading"
                )
        except ImportError:
            return "Error", "Backend not properly initialized", "❌ System error"

        print(f"🎵 Processing audio file: {audio_file}")

        # Get file info
        file_size = os.path.getsize(audio_file) if os.path.exists(audio_file) else 0
        filename = os.path.basename(audio_file)

        # Preprocess audio
        processed_audio = preprocess_audio(audio_file)

        # Transcribe
        result = transcription_pipeline(processed_audio)

        # Extract transcription text
        if isinstance(result, dict) and "text" in result:
            transcription = result["text"].strip()
        elif isinstance(result, dict) and "chunks" in result:
            transcription = " ".join([chunk["text"] for chunk in result["chunks"]]).strip()
        else:
            transcription = str(result).strip()

        if not transcription:
            return "Error", "No speech detected in audio file", "❌ No speech found"

        # Generate summary
        summary = generate_summary(transcription)

        # Cleanup temporary files
        try:
            if processed_audio != audio_file:
                os.unlink(processed_audio)
        except OSError:
            pass
        return (
            transcription,
            summary,
            f"✅ Success! File: {filename}, Size: {file_size:,} bytes"
        )
    except Exception as e:
        print(f"❌ Audio transcription error: {e}")
        return "Error", f"Audio transcription failed: {str(e)}", "❌ Processing failed"
# Create Gradio interface
with gr.Blocks(
    title="🎵 AudioScribe - AI Audio Transcription",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        font-family: 'Segoe UI', system-ui, sans-serif;
    }
    .gr-button {
        background: linear-gradient(90deg, #4f46e5 0%, #7c3aed 100%);
        border: none;
    }
    .gr-button:hover {
        background: linear-gradient(90deg, #4338ca 0%, #6d28d9 100%);
        transform: translateY(-1px);
        box-shadow: 0 4px 12px rgba(0,0,0,0.15);
    }
    """
) as demo:
    gr.HTML("""
    <div style="text-align: center; padding: 20px;">
        <h1 style="color: #4f46e5; margin-bottom: 10px;">🎵 AudioScribe</h1>
        <p style="color: #6b7280; font-size: 18px;">AI-Powered Audio Transcription & Summarization</p>
        <p style="color: #9ca3af;">Convert audio/video files and YouTube videos to text with AI-generated summaries</p>
    </div>
    """)
    # Service status display
    with gr.Row():
        startup_display = gr.Textbox(
            label="📊 Service Status",
            value=get_service_status(),
            interactive=False
        )
        refresh_btn = gr.Button("🔄 Refresh Status", size="sm")

    with gr.Tabs():
        # YouTube Tab
        with gr.TabItem("🎬 YouTube Video", elem_id="youtube-tab"):
            gr.HTML("<h3>📹 Transcribe YouTube Videos</h3>")
            with gr.Row():
                with gr.Column(scale=2):
                    youtube_url = gr.Textbox(
                        label="YouTube URL",
                        placeholder="https://www.youtube.com/watch?v=...",
                        lines=1
                    )
                    mode = gr.Radio(
                        choices=["summarize", "notes"],
                        value="summarize",
                        label="Output Mode",
                        info="Choose between summary or detailed study notes"
                    )
                    youtube_btn = gr.Button("🚀 Transcribe YouTube Video", variant="primary")
                with gr.Column(scale=1):
                    youtube_status = gr.Textbox(label="Status", interactive=False)
            with gr.Row():
                youtube_transcription = gr.Textbox(
                    label="📝 Transcription",
                    lines=8,
                    max_lines=15
                )
                youtube_summary = gr.Textbox(
                    label="📋 Summary/Notes",
                    lines=8,
                    max_lines=15
                )

        # Audio Upload Tab
        with gr.TabItem("🎵 Audio Upload", elem_id="upload-tab"):
            gr.HTML("<h3>📁 Upload Audio/Video Files</h3>")
            with gr.Row():
                with gr.Column(scale=2):
                    audio_file = gr.Audio(
                        label="Upload Audio/Video File",
                        type="filepath"
                    )
                    upload_btn = gr.Button("🚀 Transcribe Audio", variant="primary")
                with gr.Column(scale=1):
                    upload_status = gr.Textbox(label="Status", interactive=False)
            with gr.Row():
                upload_transcription = gr.Textbox(
                    label="📝 Transcription",
                    lines=8,
                    max_lines=15
                )
                upload_summary = gr.Textbox(
                    label="📋 Summary",
                    lines=8,
                    max_lines=15
                )

    # Event handlers
    youtube_btn.click(
        fn=transcribe_youtube_direct,
        inputs=[youtube_url, mode],
        outputs=[youtube_transcription, youtube_summary, youtube_status]
    )
    upload_btn.click(
        fn=transcribe_audio_direct,
        inputs=[audio_file],
        outputs=[upload_transcription, upload_summary, upload_status]
    )
    refresh_btn.click(
        fn=get_service_status,
        outputs=[startup_display]
    )

    gr.HTML("""
    <div style="text-align: center; padding: 20px; margin-top: 40px; border-top: 1px solid #e5e7eb;">
        <p style="color: #6b7280;">
            Built with ❤️ using FastAPI, OpenAI Whisper, and OpenRouter API<br>
            🔗 <strong>FastAPI Endpoints:</strong> /docs, /health, /transcribe, /transcribe-youtube
        </p>
    </div>
    """)
# Mount the Gradio UI onto the FastAPI app so both share one port
if fastapi_app:
    app = gr.mount_gradio_app(fastapi_app, demo, path="/")
else:
    app = demo
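# Illustrative client sketch (a minimal example, not used by the app itself).
# Because the Gradio UI is mounted on the FastAPI app above, the REST endpoints
# listed in the footer (/docs, /health, /transcribe, /transcribe-youtube) stay
# reachable on the same port. The helper below only sketches how a client might
# call /transcribe; the multipart field name "file" and the JSON response shape
# are assumptions about main.py, not confirmed here.
def _example_transcribe_request(audio_path, base_url="http://localhost:7860"):
    """Upload a local audio file to the /transcribe endpoint and return the parsed JSON."""
    with open(audio_path, "rb") as f:
        response = requests.post(f"{base_url}/transcribe", files={"file": f}, timeout=600)
    response.raise_for_status()
    return response.json()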
if __name__ == "__main__":
    print("🎵 Starting AudioScribe with integrated FastAPI + Gradio...")
    port = int(os.environ.get("PORT", 7860))
    if fastapi_app:
        # Run with FastAPI + Gradio integration
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=port)
    else:
        # Fallback to Gradio only
        demo.launch(
            server_name="0.0.0.0",
            server_port=port,
            share=False,
            show_error=True
        )