ayloll's picture
Update app.py
bfc16ca verified
import logging
import os
import subprocess
from urllib.parse import urlparse

import gradio as gr
import whisper
import yt_dlp
from transformers import pipeline
# Configure logging: set the root logger to INFO and create the module-level
# logger that the functions in this file use for progress and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize components at startup
def initialize_components():
    """Load the speech-recognition and zero-shot classification models.

    Returns:
        A ``(whisper_model, classifier)`` tuple: the Whisper ``"base"`` model
        and a ``zero-shot-classification`` pipeline built on
        ``facebook/bart-large-mnli`` and pinned to the CPU.
    """
    logger.info("Loading Whisper model...")
    speech_model = whisper.load_model("base")

    logger.info("Loading classifier...")
    classifier_options = {
        "model": "facebook/bart-large-mnli",
        "device": "cpu",  # Explicitly set to CPU for Hugging Face Spaces
    }
    zero_shot = pipeline("zero-shot-classification", **classifier_options)

    return speech_model, zero_shot
# Global initialization: both models are loaded once, when this module is
# executed, and stored in module-level names. transcribe_audio() reads
# `whisper_model` and classify_content() reads `classifier`.
whisper_model, classifier = initialize_components()
def clean_temp_files():
    """Remove the temporary media files left behind by a pipeline run.

    Deletes ``temp_video.mp4`` and ``temp_audio.mp3`` from the current working
    directory. Cleanup is best-effort: a file that does not exist is skipped
    silently, and any other OS-level failure is logged as a warning instead of
    being raised.
    """
    for temp_file in ("temp_video.mp4", "temp_audio.mp3"):
        try:
            # Delete directly rather than checking for existence first, so a
            # file that vanishes between the check and the removal cannot
            # produce a spurious warning.
            os.remove(temp_file)
        except FileNotFoundError:
            continue
        except OSError as e:
            logger.warning(f"Could not remove {temp_file}: {e}")
        else:
            logger.info(f"Removed temporary file: {temp_file}")
def is_valid_youtube_url(url):
    """Return True when *url* is an http(s) URL hosted on YouTube.

    The host must be exactly ``youtube.com`` or ``youtu.be``, or a subdomain
    of one of them (``www.``, ``m.``, ``music.`` ...). The check is made on
    the parsed hostname instead of searching the raw network location for a
    substring, so look-alike hosts such as ``youtube.com.evil.example``,
    ``notyoutube.com`` or ``youtube.com@evil.example`` are rejected, and the
    port, userinfo and letter case of the host are ignored.

    Args:
        url: The URL string to check.

    Returns:
        True if the URL is acceptable; False otherwise, including when the
        value cannot be parsed as a URL.
    """
    youtube_domains = ('youtube.com', 'youtu.be')
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return False
        # `hostname` is lower-cased and excludes any port or credentials.
        host = parsed.hostname
        if not host:
            return False
        return any(
            host == domain or host.endswith('.' + domain)
            for domain in youtube_domains
        )
    except (ValueError, TypeError, AttributeError) as e:
        logger.error(f"URL validation error: {e}")
        return False
def download_video(video_url):
    """Download a YouTube video to ``temp_video.mp4`` using yt-dlp.

    The video's metadata is inspected first so that unavailable or
    age-restricted videos produce a specific message; a failure of that
    pre-check is only logged and the download is still attempted.

    Args:
        video_url: URL of the video to fetch.

    Returns:
        A ``(filename, error)`` tuple. ``filename`` is ``'temp_video.mp4'``
        when that file exists after the download, otherwise None. ``error``
        is a human-readable message when a specific failure was identified,
        otherwise None.
    """
    try:
        # Only hand a cookie file to yt-dlp when COOKIES_PATH points at a
        # file that actually exists.
        cookies_path = os.getenv('COOKIES_PATH')
        cookiefile = cookies_path if cookies_path and os.path.exists(cookies_path) else None

        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'outtmpl': 'temp_video.%(ext)s',
            'quiet': False,
            'no_warnings': False,
            'merge_output_format': 'mp4',
            'retries': 3,
            'socket_timeout': 30,
            'extract_flat': False,
            'ignoreerrors': True,
            'cookiefile': cookiefile,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Check availability first
            try:
                info = ydl.extract_info(video_url, download=False)
                if info is None:
                    # With 'ignoreerrors' enabled a failed extraction yields
                    # None instead of raising, so handle it explicitly.
                    logger.warning("Video info check failed: no metadata returned")
                elif info.get('availability') == 'unavailable':
                    return None, "Video is unavailable (private, deleted, or region-locked)"
                elif (info.get('age_limit') or 0) > 0 and not cookiefile:
                    # `or 0` keeps the comparison valid when age_limit is None.
                    return None, "Age-restricted content detected (try adding cookies.txt)"
            except Exception as e:
                logger.warning(f"Video info check failed: {e}")

            # Download the video
            try:
                ydl.download([video_url])
            except yt_dlp.utils.DownloadError as e:
                return None, f"Download failed: {str(e)}"

            if os.path.exists('temp_video.mp4'):
                return 'temp_video.mp4', None
            # No exception was raised but the expected file is missing;
            # the caller supplies its own generic failure message.
            logger.warning("Download finished without producing temp_video.mp4")
            return None, None
    except Exception as e:
        logger.error(f"Download error: {e}")
        return None, f"Download system error: {str(e)}"
def extract_audio(video_path):
    """Extract the audio track of a video file to ``temp_audio.mp3``.

    ffmpeg is invoked with an argument list (no shell), so the path is never
    interpreted by a command interpreter. The result is accepted only when
    ffmpeg exits successfully, so a partial or stale output file is not
    reported as a success.

    Args:
        video_path: Path of the video file to read.

    Returns:
        ``"temp_audio.mp3"`` when extraction succeeded and the file exists;
        None when the input is missing, ffmpeg fails or cannot be started,
        or no output file was produced.
    """
    audio_path = "temp_audio.mp3"
    try:
        if not video_path or not os.path.exists(video_path):
            return None

        cmd = [
            "ffmpeg",
            "-y",                    # overwrite an existing output file
            "-loglevel", "error",
            "-i", video_path,
            "-vn",                   # drop the video stream
            "-acodec", "libmp3lame",
            "-q:a", "2",
            audio_path,
        ]
        completed = subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",  # undecodable ffmpeg output must not abort us
            check=False,
        )
        if completed.returncode != 0:
            logger.error(
                f"Audio extraction error: ffmpeg exited with code "
                f"{completed.returncode}: {completed.stderr.strip()}"
            )
            return None

        return audio_path if os.path.exists(audio_path) else None
    except Exception as e:
        # Boundary handler: callers rely on None rather than an exception
        # (for example when the ffmpeg executable is not installed).
        logger.error(f"Audio extraction error: {e}")
        return None
def transcribe_audio(audio_path):
    """Transcribe an audio file with the module-level Whisper model.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The ``'text'`` entry of Whisper's result, or None when the file does
        not exist or transcription raises.
    """
    try:
        if os.path.exists(audio_path):
            # fp16=False is passed through to Whisper's transcribe call.
            transcript = whisper_model.transcribe(audio_path, fp16=False)
            return transcript['text']
        return None
    except Exception as err:
        logger.error(f"Transcription error: {err}")
        return None
def classify_content(text):
    """Pick the best-matching category for a transcript.

    The module-level zero-shot classifier scores the text against a fixed
    list of eight candidate labels.

    Args:
        text: The transcript to classify.

    Returns:
        A ``(label, score)`` tuple holding the first label and first score of
        the classifier's result, or ``(None, None)`` when the text is empty
        or blank, or when classification raises.
    """
    try:
        if not text or not text.strip():
            return None, None

        candidate_labels = [
            "educational",
            "entertainment",
            "news",
            "political",
            "religious",
            "technical",
            "advertisement",
            "social",
        ]
        prediction = classifier(
            text,
            candidate_labels=candidate_labels,
            hypothesis_template="This text is about {}.",
        )
        top_label = prediction['labels'][0]
        top_score = prediction['scores'][0]
        return top_label, top_score
    except Exception as err:
        logger.error(f"Classification error: {err}")
        return None, None
def process_video(video_url):
    """Run the full pipeline: download, extract audio, transcribe, classify.

    Args:
        video_url: The YouTube URL entered by the user. Surrounding
            whitespace is ignored.

    Returns:
        A ``(transcription, classification)`` pair of strings for the two
        output boxes. On failure the first element carries the error message
        and the second is empty, except when only classification fails: then
        the transcription is returned with a failure note in the second slot.
    """
    # Normalise once so validation and download see the same value; anything
    # that is not a string is treated like an empty input.
    url = video_url.strip() if isinstance(video_url, str) else ""
    if not url:
        return "Please enter a valid YouTube URL", ""

    if not is_valid_youtube_url(url):
        return "Please enter a valid YouTube URL (should start with https://youtube.com or https://youtu.be)", ""

    # Start from a clean slate in case an earlier run left files behind.
    clean_temp_files()
    try:
        # Download video
        video_path, download_error = download_video(url)
        if not video_path:
            return download_error or "Failed to download video", ""

        # Extract audio
        audio_path = extract_audio(video_path)
        if not audio_path:
            return "Failed to extract audio from video", ""

        # Transcribe
        transcription = transcribe_audio(audio_path)
        if not transcription:
            return "Failed to transcribe audio (may be no speech detected)", ""

        # Classify
        category, confidence = classify_content(transcription)
        if not category:
            return transcription, "Failed to classify content"

        # Format results
        classification_result = f"{category} (confidence: {confidence:.2%})"
        return transcription, classification_result
    except Exception as e:
        logger.error(f"Processing error: {e}")
        return f"An error occurred: {str(e)}", ""
    finally:
        # Single cleanup point covering every exit path above.
        clean_temp_files()
def create_app():
    """Build the Gradio Blocks interface for the analyzer.

    The page holds a URL textbox, "Analyze Video" and "Clear" buttons, two
    output textboxes (transcription and content category) and two example
    URLs. "Analyze Video" runs ``process_video``; "Clear" empties both
    output boxes.

    Returns:
        The assembled ``gr.Blocks`` object, not yet launched.
    """
    page_css = ".gradio-container {max-width: 800px !important}"
    intro_markdown = (
        "\n"
        "# ▶️ YouTube Content Analyzer\n"
        "Enter a YouTube video URL to get transcription and content classification\n"
    )
    sample_urls = [
        ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],  # Rick Astley
        ["https://youtu.be/J---aiyznGQ"],  # Keyboard Cat
    ]

    with gr.Blocks(title="YouTube Content Analyzer", css=page_css) as ui:
        gr.Markdown(intro_markdown)

        # Input row
        with gr.Row():
            url_box = gr.Textbox(
                label="YouTube URL",
                placeholder="Enter YouTube video URL here...",
                max_lines=1,
            )

        # Action buttons
        with gr.Row():
            analyze_button = gr.Button("Analyze Video", variant="primary")
            reset_button = gr.Button("Clear")

        # Result columns
        with gr.Row():
            with gr.Column():
                transcript_box = gr.Textbox(
                    label="Transcription",
                    interactive=True,
                    lines=10,
                    max_lines=20,
                )
            with gr.Column():
                category_box = gr.Textbox(
                    label="Content Category",
                    interactive=False,
                )

        # Examples
        gr.Examples(
            examples=sample_urls,
            inputs=url_box,
            label="Try these examples:",
        )

        # Button actions
        result_boxes = [transcript_box, category_box]
        analyze_button.click(
            fn=process_video,
            inputs=url_box,
            outputs=result_boxes,
        )
        reset_button.click(
            fn=lambda: ["", ""],
            inputs=None,
            outputs=result_boxes,
        )

    return ui
if __name__ == "__main__":
    # Script entry point: build the interface and serve it on 0.0.0.0:7860
    # with Gradio's public share link disabled.
    launch_settings = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app = create_app()
    app.launch(**launch_settings)