Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from transformers import pipeline | |
| import yt_dlp | |
| import whisper | |
| import os | |
| import logging | |
| from urllib.parse import urlparse | |
# Logging setup: request INFO-level output from the root logger and create
# the module-level logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| # Initialize components at startup | |
def initialize_components():
    """Load the speech-to-text model and the zero-shot classifier.

    Returns:
        A ``(whisper_model, classifier)`` tuple: the Whisper "base" model and
        a ``facebook/bart-large-mnli`` zero-shot-classification pipeline.
    """
    logger.info("Loading Whisper model...")
    speech_model = whisper.load_model("base")

    logger.info("Loading classifier...")
    zero_shot = pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",
        # Explicitly set to CPU for Hugging Face Spaces
        device="cpu",
    )

    return speech_model, zero_shot
# Load both models once, at import time; the transcription and classification
# functions below read these module-level names.
(whisper_model, classifier) = initialize_components()
def clean_temp_files():
    """Delete the intermediate video/audio files from the working directory.

    Missing files are skipped; a failed removal is logged as a warning and
    does not abort the remaining removals.
    """
    for leftover in ("temp_video.mp4", "temp_audio.mp3"):
        if not os.path.exists(leftover):
            continue
        try:
            os.remove(leftover)
        except Exception as e:
            logger.warning(f"Could not remove {leftover}: {e}")
        else:
            logger.info(f"Removed temporary file: {leftover}")
def is_valid_youtube_url(url):
    """Return True when *url* is an http(s) URL whose host is YouTube.

    The host must be exactly ``youtube.com`` / ``youtu.be`` or a subdomain of
    one of them (``www.``, ``m.``, ``music.`` ...). The comparison is done on
    the parsed hostname, case-insensitively, so look-alike hosts such as
    ``notyoutube.com``, ``youtube.com.evil.example`` or a
    ``youtube.com@evil.example`` userinfo trick are rejected.

    Args:
        url: The URL string to check.

    Returns:
        True if the URL is acceptable, False otherwise (including when the
        URL cannot be parsed).
    """
    youtube_domains = ('youtube.com', 'youtu.be')
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return False
        # ``hostname`` drops userinfo and port and is already lower-cased;
        # a trailing dot (fully-qualified form) is tolerated.
        host = (parsed.hostname or '').rstrip('.')
        if not host:
            return False
        return any(
            host == domain or host.endswith('.' + domain)
            for domain in youtube_domains
        )
    except Exception as e:
        logger.error(f"URL validation error: {e}")
        return False
def download_video(video_url):
    """Download a YouTube video with yt-dlp, targeting ``temp_video.mp4``.

    A metadata-only probe runs first so that unavailable or age-restricted
    videos can be reported with a specific message; a failed probe is logged
    and the download is attempted anyway.

    Args:
        video_url: URL of the video to download.

    Returns:
        A ``(filename, error)`` tuple. On success ``filename`` is
        ``'temp_video.mp4'`` and ``error`` is None. On a reported failure
        ``filename`` is None and ``error`` is a message. If the download ran
        without raising but ``temp_video.mp4`` does not exist, both are None.
    """
    try:
        # Read the environment once; only use the cookie file if it exists.
        cookies_path = os.getenv('COOKIES_PATH')
        cookiefile = cookies_path if cookies_path and os.path.exists(cookies_path) else None

        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'outtmpl': 'temp_video.%(ext)s',
            'quiet': False,
            'no_warnings': False,
            'merge_output_format': 'mp4',
            'retries': 3,
            'socket_timeout': 30,
            'extract_flat': False,
            'ignoreerrors': True,
            'cookiefile': cookiefile,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Check availability first
            try:
                info = ydl.extract_info(video_url, download=False)
            except Exception as e:
                logger.warning(f"Video info check failed: {e}")
                info = None

            if info is None:
                # With 'ignoreerrors' enabled the probe may hand back None
                # rather than raising; fall through to the download attempt.
                logger.warning("Video info check returned no metadata")
            else:
                # NOTE(review): confirm 'unavailable' is a value yt-dlp
                # actually reports in the 'availability' field.
                if info.get('availability') == 'unavailable':
                    return None, "Video is unavailable (private, deleted, or region-locked)"
                # 'age_limit' may be present but None; treat that as 0.
                if (info.get('age_limit') or 0) > 0 and not cookiefile:
                    return None, "Age-restricted content detected (try adding cookies.txt)"

            # Download the video
            try:
                ydl.download([video_url])
            except yt_dlp.utils.DownloadError as e:
                return None, f"Download failed: {str(e)}"

            filename = 'temp_video.mp4' if os.path.exists('temp_video.mp4') else None
            return filename, None
    except Exception as e:
        logger.error(f"Download error: {e}")
        return None, f"Download system error: {str(e)}"
def extract_audio(video_path):
    """Extract the audio track of *video_path* into ``temp_audio.mp3``.

    ffmpeg is invoked with an argument list (no shell), and its exit status
    is checked so that a failed run is not mistaken for success just because
    an older ``temp_audio.mp3`` is still on disk.

    Args:
        video_path: Path to the downloaded video file.

    Returns:
        ``"temp_audio.mp3"`` on success; None if the input is missing, ffmpeg
        fails or cannot be started, or no output file was produced.
    """
    # Local import keeps this block self-contained; the module's import
    # section does not include subprocess.
    import subprocess

    if not video_path:
        return None
    try:
        if not os.path.exists(video_path):
            return None
        audio_path = "temp_audio.mp3"
        # Global options (-y, -loglevel) go before the input so ffmpeg does
        # not treat them as trailing options after the output file.
        cmd = [
            "ffmpeg", "-y", "-loglevel", "error",
            "-i", video_path,
            "-vn", "-acodec", "libmp3lame", "-q:a", "2",
            audio_path,
        ]
        completed = subprocess.run(cmd, check=False)
        if completed.returncode != 0:
            logger.error(f"Audio extraction error: ffmpeg exited with code {completed.returncode}")
            return None
        return audio_path if os.path.exists(audio_path) else None
    except Exception as e:
        logger.error(f"Audio extraction error: {e}")
        return None
def transcribe_audio(audio_path):
    """Run the module-level Whisper model over *audio_path*.

    Returns:
        The transcript text, or None when the file does not exist or the
        transcription raises (the error is logged).
    """
    try:
        if os.path.exists(audio_path):
            # fp16 is disabled explicitly for this call.
            transcript = whisper_model.transcribe(audio_path, fp16=False)
            return transcript['text']
        return None
    except Exception as e:
        logger.error(f"Transcription error: {e}")
        return None
def classify_content(text):
    """Pick the best-matching content category for *text*.

    The module-level zero-shot classifier scores the text against a fixed set
    of eight labels, and the first label/score pair of its result is returned.

    Returns:
        ``(label, score)`` for the first entry of the classifier result, or
        ``(None, None)`` when the text is empty/whitespace or classification
        raises (the error is logged).
    """
    try:
        has_content = bool(text) and len(text.strip()) != 0
        if not has_content:
            return None, None

        candidate_categories = [
            "educational", "entertainment", "news", "political",
            "religious", "technical", "advertisement", "social"
        ]
        outcome = classifier(
            text,
            candidate_labels=candidate_categories,
            hypothesis_template="This text is about {}."
        )
        best_label = outcome['labels'][0]
        best_score = outcome['scores'][0]
        return best_label, best_score
    except Exception as e:
        logger.error(f"Classification error: {e}")
        return None, None
def process_video(video_url):
    """Run the full pipeline: download, extract audio, transcribe, classify.

    Args:
        video_url: The URL typed by the user; surrounding whitespace is
            ignored.

    Returns:
        A ``(transcription_text, category_text)`` tuple for the two output
        boxes. On failure the first element carries the error message and the
        second is empty, except when only classification fails: then the
        transcription is still returned alongside a failure note.
    """
    # Remove leftovers from any earlier run before starting.
    clean_temp_files()

    # Pasted URLs often carry stray whitespace; strip it before validating so
    # an otherwise valid link is not rejected.
    video_url = (video_url or "").strip()
    if not video_url:
        return "Please enter a valid YouTube URL", ""
    if not is_valid_youtube_url(video_url):
        return "Please enter a valid YouTube URL (should start with https://youtube.com or https://youtu.be)", ""

    try:
        # Download video
        video_path, download_error = download_video(video_url)
        if not video_path:
            return download_error or "Failed to download video", ""

        # Extract audio
        audio_path = extract_audio(video_path)
        if not audio_path:
            return "Failed to extract audio from video", ""

        # Transcribe
        transcription = transcribe_audio(audio_path)
        if not transcription:
            return "Failed to transcribe audio (may be no speech detected)", ""

        # Classify
        category, confidence = classify_content(transcription)
        if not category:
            return transcription, "Failed to classify content"

        # Format results
        classification_result = f"{category} (confidence: {confidence:.2%})"
        return transcription, classification_result
    except Exception as e:
        logger.error(f"Processing error: {e}")
        return f"An error occurred: {str(e)}", ""
    finally:
        # Single cleanup point for every exit path above.
        clean_temp_files()
def create_app():
    """Assemble the Gradio Blocks UI and return it (not yet launched).

    Layout, top to bottom: a heading, the URL input, the Analyze/Clear
    buttons, the transcription and category outputs side by side, and two
    example URLs. "Analyze Video" runs ``process_video``; "Clear" empties
    both output boxes.
    """
    page_css = ".gradio-container {max-width: 800px !important}"
    heading_markdown = (
        "\n"
        "# ▶️ YouTube Content Analyzer\n"
        "Enter a YouTube video URL to get transcription and content classification\n"
    )
    sample_urls = [
        ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],  # Rick Astley
        ["https://youtu.be/J---aiyznGQ"]  # Keyboard Cat
    ]

    def reset_outputs():
        # One empty string per output component.
        return ["", ""]

    with gr.Blocks(title="YouTube Content Analyzer", css=page_css) as demo:
        gr.Markdown(heading_markdown)

        with gr.Row():
            url_box = gr.Textbox(
                label="YouTube URL",
                placeholder="Enter YouTube video URL here...",
                max_lines=1
            )

        with gr.Row():
            analyze_button = gr.Button("Analyze Video", variant="primary")
            clear_button = gr.Button("Clear")

        with gr.Row():
            with gr.Column():
                transcript_box = gr.Textbox(
                    label="Transcription",
                    interactive=True,
                    lines=10,
                    max_lines=20
                )
            with gr.Column():
                category_box = gr.Textbox(
                    label="Content Category",
                    interactive=False
                )

        # Examples
        gr.Examples(
            examples=sample_urls,
            inputs=url_box,
            label="Try these examples:"
        )

        # Button actions
        result_boxes = [transcript_box, category_box]
        analyze_button.click(
            fn=process_video,
            inputs=url_box,
            outputs=result_boxes
        )
        clear_button.click(
            fn=reset_outputs,
            inputs=None,
            outputs=result_boxes
        )

    return demo
if __name__ == "__main__":
    # Serve on all interfaces at port 7860, without a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app = create_app()
    app.launch(**launch_options)