"""Gradio app: download a YouTube video, transcribe it, and classify the text.

Pipeline: yt-dlp download -> ffmpeg audio extraction -> Whisper transcription
-> zero-shot classification with ``facebook/bart-large-mnli``.
"""

import glob
import logging
import os
import subprocess
from typing import Optional, Tuple
from urllib.parse import urlparse

import gradio as gr
import whisper
import yt_dlp
from transformers import pipeline

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Fixed working-file names, relative to the current working directory.
# NOTE(review): because the names are fixed, two requests processed at the
# same time would overwrite each other's files - confirm the app is only
# ever run with a single worker.
TEMP_VIDEO_PATH = "temp_video.mp4"
TEMP_AUDIO_PATH = "temp_audio.mp3"
# Matches everything yt-dlp can write for the 'temp_video.%(ext)s' template.
TEMP_VIDEO_GLOB = "temp_video.*"

# Registrable domains accepted by is_valid_youtube_url (subdomains allowed).
YOUTUBE_DOMAINS = ("youtube.com", "youtu.be")

CANDIDATE_LABELS = [
    "educational",
    "entertainment",
    "news",
    "political",
    "religious",
    "technical",
    "advertisement",
    "social",
]


# Initialize components at startup
def initialize_components():
    """Load the Whisper model and the zero-shot classifier.

    Returns:
        A ``(whisper_model, classifier)`` tuple.
    """
    logger.info("Loading Whisper model...")
    whisper_model = whisper.load_model("base")

    logger.info("Loading classifier...")
    classifier = pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",
        device="cpu",  # Explicitly set to CPU for Hugging Face Spaces
    )
    return whisper_model, classifier


# Global initialization (runs once, at import time)
whisper_model, classifier = initialize_components()


def clean_temp_files():
    """Remove the temporary video/audio files, best effort.

    Besides the two canonical files this also removes anything matching
    ``temp_video.*``: the download template is ``temp_video.%(ext)s``, so a
    non-mp4 result, a ``.part`` file, or a per-format intermediate would
    otherwise be left behind. Failures are logged, never raised.
    """
    candidates = set(glob.glob(TEMP_VIDEO_GLOB))
    candidates.update((TEMP_VIDEO_PATH, TEMP_AUDIO_PATH))

    for path in sorted(candidates):
        try:
            os.remove(path)
        except FileNotFoundError:
            continue  # Nothing to clean up for this name.
        except OSError as e:
            logger.warning("Could not remove %s: %s", path, e)
        else:
            logger.info("Removed temporary file: %s", path)


def is_valid_youtube_url(url) -> bool:
    """Return True if *url* is an http(s) URL on a YouTube host.

    The host must be exactly ``youtube.com`` / ``youtu.be`` or a subdomain of
    one of them (``www.``, ``m.``, ``music.`` ...). A substring test is not
    used because it would accept hosts such as ``youtube.com.evil.example``.
    """
    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url.strip())
        # .hostname is lower-cased and excludes any port or userinfo.
        host = parsed.hostname
    except ValueError as e:
        logger.error("URL validation error: %s", e)
        return False

    if parsed.scheme not in ("http", "https") or not host:
        return False

    host = host.rstrip(".")  # Tolerate a fully-qualified trailing dot.
    return any(
        host == domain or host.endswith("." + domain)
        for domain in YOUTUBE_DOMAINS
    )


def download_video(video_url) -> Tuple[Optional[str], Optional[str]]:
    """Download a YouTube video to ``temp_video.mp4``.

    Args:
        video_url: URL of the video to download.

    Returns:
        ``(path, None)`` on success, ``(None, message)`` on a known failure,
        or ``(None, None)`` when the download finished without raising but
        the expected file is missing.
    """
    # Only pass a cookie file to yt-dlp when COOKIES_PATH points at a real file.
    cookies_path = os.getenv('COOKIES_PATH')
    cookiefile = (
        cookies_path if cookies_path and os.path.exists(cookies_path) else None
    )

    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': 'temp_video.%(ext)s',
        'quiet': False,
        'no_warnings': False,
        'merge_output_format': 'mp4',
        'retries': 3,
        'socket_timeout': 30,
        'extract_flat': False,
        'ignoreerrors': True,
        'cookiefile': cookiefile,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Check availability first. This is advisory only: if the probe
            # itself fails we still attempt the download below.
            try:
                info = ydl.extract_info(video_url, download=False)
            except Exception as e:
                logger.warning("Video info check failed: %s", e)
                info = None

            if info is None:
                # With 'ignoreerrors' enabled a failed probe can come back
                # as None instead of raising.
                logger.warning("Video info check returned no metadata")
            else:
                if info.get('availability') == 'unavailable':
                    return None, "Video is unavailable (private, deleted, or region-locked)"
                # 'age_limit' may be present but None; treat that as 0.
                if (info.get('age_limit') or 0) > 0 and not cookiefile:
                    return None, "Age-restricted content detected (try adding cookies.txt)"

            # Download the video
            try:
                ydl.download([video_url])
            except yt_dlp.utils.DownloadError as e:
                return None, f"Download failed: {str(e)}"

            filename = TEMP_VIDEO_PATH if os.path.exists(TEMP_VIDEO_PATH) else None
            return filename, None
    except Exception as e:
        logger.error("Download error: %s", e)
        return None, f"Download system error: {str(e)}"


def extract_audio(video_path) -> Optional[str]:
    """Extract the audio track of *video_path* to ``temp_audio.mp3`` via ffmpeg.

    ffmpeg is invoked with an argument list (no shell), and its exit status
    is checked so that a partially written file is not reported as success.

    Returns:
        The audio file path, or ``None`` if the input is missing or the
        extraction failed.
    """
    if not video_path or not os.path.exists(video_path):
        return None

    cmd = [
        "ffmpeg",
        "-y",                    # Overwrite a stale output file.
        "-loglevel", "error",
        "-i", video_path,
        "-vn",                   # Drop the video stream.
        "-acodec", "libmp3lame",
        "-q:a", "2",
        TEMP_AUDIO_PATH,
    ]
    try:
        completed = subprocess.run(cmd, check=False)
    except (OSError, subprocess.SubprocessError) as e:
        # e.g. the ffmpeg binary is not installed.
        logger.error("Audio extraction error: %s", e)
        return None

    if completed.returncode != 0:
        logger.error(
            "Audio extraction error: ffmpeg exited with code %s",
            completed.returncode,
        )
        return None

    return TEMP_AUDIO_PATH if os.path.exists(TEMP_AUDIO_PATH) else None


def transcribe_audio(audio_path) -> Optional[str]:
    """Transcribe *audio_path* with the global Whisper model.

    Returns:
        The transcribed text, or ``None`` if the file is missing or the
        transcription raised.
    """
    if not audio_path or not os.path.exists(audio_path):
        return None

    try:
        result = whisper_model.transcribe(audio_path, fp16=False)
        return result['text']
    except Exception as e:
        logger.error("Transcription error: %s", e)
        return None


def classify_content(text):
    """Classify *text* into one of ``CANDIDATE_LABELS`` (zero-shot).

    Returns:
        ``(label, score)`` for the top-ranked label, or ``(None, None)`` when
        the text is empty/blank or the classifier raised.
    """
    if not text or not text.strip():
        return None, None

    try:
        result = classifier(
            text,
            candidate_labels=CANDIDATE_LABELS,
            hypothesis_template="This text is about {}.",
        )
        return result['labels'][0], result['scores'][0]
    except Exception as e:
        logger.error("Classification error: %s", e)
        return None, None


def process_video(video_url) -> Tuple[str, str]:
    """Run the full pipeline for one URL.

    Args:
        video_url: The URL entered by the user.

    Returns:
        ``(transcription_or_error_message, classification_text)`` - the two
        strings shown in the UI's output boxes.
    """
    clean_temp_files()  # Start from a clean slate in case a prior run leaked.

    if not video_url or not video_url.strip():
        return "Please enter a valid YouTube URL", ""

    # Validate and download the same, whitespace-trimmed value.
    video_url = video_url.strip()
    if not is_valid_youtube_url(video_url):
        return "Please enter a valid YouTube URL (should start with https://youtube.com or https://youtu.be)", ""

    try:
        # Download video
        video_path, download_error = download_video(video_url)
        if not video_path:
            return download_error or "Failed to download video", ""

        # Extract audio
        audio_path = extract_audio(video_path)
        if not audio_path:
            return "Failed to extract audio from video", ""

        # Transcribe
        transcription = transcribe_audio(audio_path)
        if not transcription:
            return "Failed to transcribe audio (may be no speech detected)", ""

        # Classify
        category, confidence = classify_content(transcription)
        if not category:
            return transcription, "Failed to classify content"

        # Format results
        classification_result = f"{category} (confidence: {confidence:.2%})"
        return transcription, classification_result
    except Exception as e:
        logger.error("Processing error: %s", e)
        return f"An error occurred: {str(e)}", ""
    finally:
        # Single cleanup point for every exit path above.
        clean_temp_files()


def create_app():
    """Build and return the Gradio Blocks interface."""
    with gr.Blocks(
        title="YouTube Content Analyzer",
        css=".gradio-container {max-width: 800px !important}",
    ) as demo:
        gr.Markdown("""
        # ▶️ YouTube Content Analyzer
        Enter a YouTube video URL to get transcription and content classification
        """)

        with gr.Row():
            url_input = gr.Textbox(
                label="YouTube URL",
                placeholder="Enter YouTube video URL here...",
                max_lines=1,
            )

        with gr.Row():
            submit_btn = gr.Button("Analyze Video", variant="primary")
            clear_btn = gr.Button("Clear")

        with gr.Row():
            with gr.Column():
                transcription_output = gr.Textbox(
                    label="Transcription",
                    interactive=True,
                    lines=10,
                    max_lines=20,
                )
            with gr.Column():
                category_output = gr.Textbox(
                    label="Content Category",
                    interactive=False,
                )

        # Examples
        gr.Examples(
            examples=[
                ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],  # Rick Astley
                ["https://youtu.be/J---aiyznGQ"],  # Keyboard Cat
            ],
            inputs=url_input,
            label="Try these examples:",
        )

        # Button actions
        submit_btn.click(
            fn=process_video,
            inputs=url_input,
            outputs=[transcription_output, category_output],
        )
        # Clears the two output boxes only; the URL field is left as typed.
        clear_btn.click(
            fn=lambda: ["", ""],
            inputs=None,
            outputs=[transcription_output, category_output],
        )

    return demo


if __name__ == "__main__":
    app = create_app()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )