ayloll committed on
Commit
85563e1
·
verified ·
1 Parent(s): ae93310

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +204 -25
app.py CHANGED
@@ -3,44 +3,80 @@ from transformers import pipeline
3
  import yt_dlp
4
  import whisper
5
  import os
 
6
  from urllib.parse import urlparse
7
 
8
- # Delete temporary files
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  def clean_temp_files():
 
10
  temp_files = ["temp_video.mp4", "temp_audio.mp3"]
11
  for file in temp_files:
12
  if os.path.exists(file):
13
- os.remove(file)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
- # Enhanced YouTube downloader with error handling
16
  def download_video(video_url):
 
17
  try:
18
  ydl_opts = {
19
  'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
20
  'outtmpl': 'temp_video.%(ext)s',
21
- 'quiet': True,
22
- 'no_warnings': True,
23
  'merge_output_format': 'mp4',
24
  'retries': 3,
25
  'socket_timeout': 30,
26
  'extract_flat': False,
27
  'ignoreerrors': True,
28
- 'cookiefile': 'cookies.txt' if os.path.exists('cookies.txt') else None,
29
  }
30
 
31
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
32
- # First check if video is available
33
  try:
34
  info = ydl.extract_info(video_url, download=False)
35
  if info.get('availability') == 'unavailable':
36
  return None, "Video is unavailable (private, deleted, or region-locked)"
37
-
38
- if info.get('age_limit', 0) > 0:
39
- return None, "Age-restricted content (try with cookies)"
40
- except:
41
- pass
42
-
43
- # Try to download
44
  try:
45
  ydl.download([video_url])
46
  filename = 'temp_video.mp4' if os.path.exists('temp_video.mp4') else None
@@ -49,27 +85,170 @@ def download_video(video_url):
49
  return None, f"Download failed: {str(e)}"
50
 
51
  except Exception as e:
52
- return None, f"Error: {str(e)}"
 
53
 
54
- # [Rest of your functions (extract_audio, transcribe_audio, classify_content) remain the same...]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
 
56
- # Main processing function with enhanced error handling
57
  def process_video(video_url):
 
58
  clean_temp_files()
59
 
60
  if not video_url or len(video_url.strip()) == 0:
61
  return "Please enter a valid YouTube URL", ""
62
 
63
  if not is_valid_youtube_url(video_url):
64
- return "Please enter a valid YouTube URL", ""
65
 
66
- # Download video
67
- video_path, download_error = download_video(video_url)
68
- if not video_path:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  clean_temp_files()
70
- error_msg = download_error or "Failed to download video"
71
- return error_msg, ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
- # [Rest of your processing logic remains the same...]
74
 
75
- # [Rest of your Gradio interface code remains the same...]
 
 
 
 
 
 
 
 
 
3
  import yt_dlp
4
  import whisper
5
  import os
6
+ import logging
7
  from urllib.parse import urlparse
8
 
9
# Configure logging: the root logger is set to INFO, and a module-level
# logger is created for the status/warning/error messages emitted by the
# functions in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
12
+
13
+ # Initialize components at startup
14
def initialize_components():
    """Load the speech-to-text model and the zero-shot classifier.

    Returns:
        tuple: ``(whisper_model, classifier)`` where the first item is the
        result of ``whisper.load_model("base")`` and the second is a
        transformers ``pipeline`` built for ``"zero-shot-classification"``
        with the ``facebook/bart-large-mnli`` model.
    """
    logger.info("Loading Whisper model...")
    speech_model = whisper.load_model("base")

    logger.info("Loading classifier...")
    zero_shot = pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",
    )

    return speech_model, zero_shot
23
+
24
# Global initialization: both models are loaded once, when this module is
# executed, and are then read as module-level globals by transcribe_audio()
# (whisper_model) and classify_content() (classifier).
whisper_model, classifier = initialize_components()
26
+
def clean_temp_files():
    """Delete the scratch media files used by the pipeline, if present.

    Removal is best-effort: a file that cannot be deleted is reported with a
    warning log entry and skipped, so a failed removal does not raise.
    """
    for leftover in ("temp_video.mp4", "temp_audio.mp3"):
        if not os.path.exists(leftover):
            continue
        try:
            os.remove(leftover)
            logger.info(f"Removed temporary file: {leftover}")
        except Exception as e:
            logger.warning(f"Could not remove {leftover}: {e}")
37
+
38
def is_valid_youtube_url(url):
    """Return True when *url* is an http(s) link hosted on YouTube.

    The host must be exactly ``youtube.com`` / ``youtu.be`` or one of their
    subdomains (``www.``, ``m.``, ``music.`` ...). The check is made against
    the parsed hostname rather than by substring search, so look-alike hosts
    such as ``youtube.com.evil.example``, ``notyoutube.com`` or
    ``youtube.com@evil.example`` are rejected.

    Args:
        url: The user-supplied URL string.

    Returns:
        bool: True for a well-formed YouTube URL; False otherwise, including
        for empty or non-string input.
    """
    youtube_domains = ('youtube.com', 'youtu.be')

    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url.strip())
        if parsed.scheme not in ('http', 'https'):
            return False

        # `hostname` is lower-cased and excludes userinfo and port, so it is
        # the only part that is safe to compare against the allow-list.
        host = (parsed.hostname or '').rstrip('.')
        return any(
            host == domain or host.endswith('.' + domain)
            for domain in youtube_domains
        )
    except ValueError as e:
        # urlparse raises ValueError for malformed authorities (e.g. a bad
        # IPv6 literal); treat those as invalid input.
        logger.error(f"URL validation error: {e}")
        return False
51
 
 
52
  def download_video(video_url):
53
+ """Download YouTube video with enhanced error handling"""
54
  try:
55
  ydl_opts = {
56
  'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
57
  'outtmpl': 'temp_video.%(ext)s',
58
+ 'quiet': False,
59
+ 'no_warnings': False,
60
  'merge_output_format': 'mp4',
61
  'retries': 3,
62
  'socket_timeout': 30,
63
  'extract_flat': False,
64
  'ignoreerrors': True,
65
+ 'cookiefile': os.getenv('COOKIES_PATH') if os.getenv('COOKIES_PATH') and os.path.exists(os.getenv('COOKIES_PATH')) else None,
66
  }
67
 
68
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
69
+ # Check availability first
70
  try:
71
  info = ydl.extract_info(video_url, download=False)
72
  if info.get('availability') == 'unavailable':
73
  return None, "Video is unavailable (private, deleted, or region-locked)"
74
+ if info.get('age_limit', 0) > 0 and not ydl_opts['cookiefile']:
75
+ return None, "Age-restricted content detected (try adding cookies.txt)"
76
+ except Exception as e:
77
+ logger.warning(f"Video info check failed: {e}")
78
+
79
+ # Download the video
 
80
  try:
81
  ydl.download([video_url])
82
  filename = 'temp_video.mp4' if os.path.exists('temp_video.mp4') else None
 
85
  return None, f"Download failed: {str(e)}"
86
 
87
  except Exception as e:
88
+ logger.error(f"Download error: {e}")
89
+ return None, f"Download system error: {str(e)}"
90
 
91
def extract_audio(video_path):
    """Extract the audio track of a video into ``temp_audio.mp3`` via ffmpeg.

    ffmpeg is invoked with an argument list (no shell), so characters in
    *video_path* such as quotes or ``;`` cannot be interpreted as shell
    syntax. A non-zero ffmpeg exit status is treated as failure even if an
    output file was left on disk.

    Args:
        video_path: Path of the downloaded video file.

    Returns:
        str | None: ``"temp_audio.mp3"`` on success; None when the input is
        missing, ffmpeg fails or cannot be started, or no output was written.
    """
    # Function-scope import keeps this block self-contained.
    import subprocess

    audio_path = "temp_audio.mp3"
    try:
        if not os.path.exists(video_path):
            return None

        cmd = [
            "ffmpeg",
            "-y",                  # overwrite a stale output file
            "-loglevel", "error",
            "-i", video_path,
            "-vn",                 # drop the video stream
            "-acodec", "libmp3lame",
            "-q:a", "2",
            audio_path,
        ]
        completed = subprocess.run(cmd, check=False)
        if completed.returncode != 0:
            logger.error(
                f"Audio extraction error: ffmpeg exited with status {completed.returncode}"
            )
            return None

        return audio_path if os.path.exists(audio_path) else None
    except Exception as e:
        # Covers ffmpeg not being installed (FileNotFoundError) and bad paths.
        logger.error(f"Audio extraction error: {e}")
        return None
104
+
105
def transcribe_audio(audio_path):
    """Transcribe an audio file with the module-level Whisper model.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The ``'text'`` entry of the result returned by
        ``whisper_model.transcribe(audio_path, fp16=False)``, or None when
        the file does not exist or any exception is raised (the exception is
        logged).
    """
    try:
        if os.path.exists(audio_path):
            transcript = whisper_model.transcribe(audio_path, fp16=False)
            return transcript['text']
        return None
    except Exception as e:
        logger.error(f"Transcription error: {e}")
        return None
116
+
117
def classify_content(text):
    """Assign one of a fixed set of topic labels to *text*.

    The module-level zero-shot ``classifier`` is called with eight candidate
    labels and the hypothesis template ``"This text is about {}."``.

    Args:
        text: The transcription to classify.

    Returns:
        tuple: ``(label, score)`` taken from the first entries of the
        classifier's ``'labels'`` and ``'scores'`` lists (presumably the
        top-ranked pair; confirm against the pipeline's output contract), or
        ``(None, None)`` for empty/blank text or when any exception is
        raised (the exception is logged).
    """
    try:
        if not text or not text.strip():
            return None, None

        candidate_topics = [
            "educational", "entertainment", "news", "political",
            "religious", "technical", "advertisement", "social",
        ]
        prediction = classifier(
            text,
            candidate_labels=candidate_topics,
            hypothesis_template="This text is about {}.",
        )

        top_label = prediction['labels'][0]
        top_score = prediction['scores'][0]
        return top_label, top_score
    except Exception as e:
        logger.error(f"Classification error: {e}")
        return None, None
138
 
 
def process_video(video_url):
    """Run the full pipeline: validate, download, extract audio, transcribe, classify.

    Args:
        video_url: The YouTube URL entered by the user.

    Returns:
        tuple[str, str]: ``(transcription, classification)`` on success.
        On failure the first element carries a user-facing error message and
        the second is ``""``; the one exception is a classification failure,
        which returns ``(transcription, "Failed to classify content")``.
    """
    # Start from a clean slate in case a previous run left files behind.
    clean_temp_files()

    if not video_url or not video_url.strip():
        return "Please enter a valid YouTube URL", ""

    # Normalise once so validation and download operate on the same string
    # even when the input has surrounding whitespace or a trailing newline.
    video_url = video_url.strip()

    if not is_valid_youtube_url(video_url):
        return "Please enter a valid YouTube URL (should start with https://youtube.com or https://youtu.be)", ""

    try:
        # Download video
        video_path, download_error = download_video(video_url)
        if not video_path:
            return download_error or "Failed to download video", ""

        # Extract audio
        audio_path = extract_audio(video_path)
        if not audio_path:
            return "Failed to extract audio from video", ""

        # Transcribe
        transcription = transcribe_audio(audio_path)
        if not transcription:
            return "Failed to transcribe audio (may be no speech detected)", ""

        # Classify
        category, confidence = classify_content(transcription)
        if not category:
            return transcription, "Failed to classify content"

        # Format results
        classification_result = f"{category} (confidence: {confidence:.2%})"
        return transcription, classification_result

    except Exception as e:
        logger.error(f"Processing error: {e}")
        return f"An error occurred: {str(e)}", ""
    finally:
        # Single cleanup point for every exit path after validation.
        clean_temp_files()
186
+
187
def create_app():
    """Build the Gradio Blocks UI and wire its buttons.

    The page has a URL textbox, "Analyze Video" and "Clear" buttons, a
    transcription textbox, a category textbox and two example URLs.
    "Analyze Video" calls ``process_video`` with the URL and writes its two
    return values to the output textboxes; "Clear" empties both outputs.

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    page_css = ".gradio-container {max-width: 800px !important}"

    def _blank_outputs():
        # One empty string per output component.
        return ["", ""]

    with gr.Blocks(title="YouTube Content Analyzer", css=page_css) as demo:
        gr.Markdown("""
        # ▶️ YouTube Content Analyzer
        Enter a YouTube video URL to get transcription and content classification
        """)

        with gr.Row():
            url_input = gr.Textbox(
                label="YouTube URL",
                placeholder="Enter YouTube video URL here...",
                max_lines=1,
            )

        with gr.Row():
            submit_btn = gr.Button("Analyze Video", variant="primary")
            clear_btn = gr.Button("Clear")

        with gr.Row():
            with gr.Column():
                transcription_output = gr.Textbox(
                    label="Transcription",
                    interactive=True,
                    lines=10,
                    max_lines=20,
                )

            with gr.Column():
                category_output = gr.Textbox(
                    label="Content Category",
                    interactive=False,
                )

        # Both buttons write to the same pair of components.
        result_boxes = [transcription_output, category_output]

        # Examples
        sample_urls = [
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],  # Rick Astley
            ["https://youtu.be/J---aiyznGQ"],  # Keyboard Cat
        ]
        gr.Examples(
            examples=sample_urls,
            inputs=url_input,
            label="Try these examples:",
        )

        # Button actions
        submit_btn.click(
            fn=process_video,
            inputs=url_input,
            outputs=result_boxes,
        )

        clear_btn.click(
            fn=_blank_outputs,
            inputs=None,
            outputs=result_boxes,
        )

    return demo
245
 
246
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces,
    # port 7860, without a public share link.
    app = create_app()

    # The `enable_queue` argument of launch() was deprecated in Gradio 3.x
    # and is no longer accepted by newer releases, where passing it makes
    # launch() fail with an unexpected-keyword error. Calling queue() on the
    # Blocks object enables request queuing on both old and new versions.
    app.queue()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )