FocusFlow Assistant committed on
Commit
cecb03b
·
1 Parent(s): f723af8

Replace YouTube fetch with timedtext API - no API keys needed, works from any server

Browse files
Files changed (2) hide show
  1. app.py +1 -7
  2. backend/rag_engine.py +119 -212
app.py CHANGED
@@ -1149,13 +1149,7 @@ if not st.session_state.focus_mode:
1149
  resp = requests.post(f"{API_URL}/ingest_url", json={"url": url_input}, headers=get_headers(), timeout=120)
1150
  if resp.status_code == 200:
1151
  data = resp.json()
1152
- message = data.get('message', 'Success')
1153
- # Show caption type info if available
1154
- if "auto-generated" in message:
1155
- st.success(f"✅ {message}")
1156
- st.info("ℹ️ Transcript extracted using auto-generated captions. Quality may vary — auto-captions can have errors.")
1157
- else:
1158
- st.success(f"✅ {message}")
1159
  time.sleep(1)
1160
  st.rerun()
1161
  else:
 
1149
  resp = requests.post(f"{API_URL}/ingest_url", json={"url": url_input}, headers=get_headers(), timeout=120)
1150
  if resp.status_code == 200:
1151
  data = resp.json()
1152
+ st.success(f"✅ YouTube transcript extracted successfully.")
 
 
 
 
 
 
1153
  time.sleep(1)
1154
  st.rerun()
1155
  else:
backend/rag_engine.py CHANGED
@@ -1,9 +1,11 @@
1
  import os
 
 
2
  from langchain_community.document_loaders import PyPDFLoader
3
  from langchain_text_splitters import RecursiveCharacterTextSplitter
4
  from langchain_chroma import Chroma
5
  from langchain_community.llms import Ollama
6
- from backend.config import get_llm, get_embeddings, has_youtube_api_key, YOUTUBE_API_KEY
7
  from langchain_core.documents import Document
8
  import logging
9
  import time
@@ -12,206 +14,125 @@ import re
12
  # Configure logger FIRST
13
  logger = logging.getLogger(__name__)
14
 
15
- # YouTube transcript support
16
- try:
17
- from youtube_transcript_api import YouTubeTranscriptApi
18
- from youtube_transcript_api import TranscriptsDisabled, NoTranscriptFound, InvalidVideoId
19
- HAS_YOUTUBE_API = True
20
- except ImportError:
21
- HAS_YOUTUBE_API = False
22
- logger.warning("youtube-transcript-api not installed - YouTube local fallback will not work")
23
-
24
  CACHE_DIR = "./chroma_db"
25
 
26
 
27
- def _parse_srt_to_text(srt_content: str) -> str:
28
- """Parse SRT subtitle format to plain text."""
29
- lines = srt_content.split('\n')
30
- text_lines = []
31
- for line in lines:
32
- line = line.strip()
33
- # Skip empty lines, sequence numbers, and timestamp lines
34
- if not line:
35
- continue
36
- if line.isdigit():
37
- continue
38
- if re.match(r'\d{2}:\d{2}:\d{2}', line):
39
- continue
40
- text_lines.append(line)
41
- return ' '.join(text_lines)
42
-
43
-
44
- def _fetch_youtube_transcript(video_id: str) -> tuple:
45
  """
46
- Fetch YouTube transcript using the best available method.
47
- Returns (transcript_text, caption_type) tuple.
48
-
49
- Method A: YouTube Data API v3 (when YOUTUBE_API_KEY is set - reliable in cloud)
50
- Method B: youtube-transcript-api fallback (local mode)
51
  """
52
-
53
- # --- METHOD A: YouTube Data API v3 (cloud-reliable) ---
54
- if has_youtube_api_key():
55
- logger.info("Using YouTube Data API v3 (API key found)")
56
- try:
57
- from googleapiclient.discovery import build
58
- from googleapiclient.errors import HttpError
59
-
60
- youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
61
-
62
- # Get available caption tracks
63
- captions_response = youtube.captions().list(
64
- part='snippet',
65
- videoId=video_id
66
- ).execute()
67
-
68
- caption_items = captions_response.get('items', [])
69
-
70
- if not caption_items:
71
- raise ValueError(
72
- "No captions found for this video. "
73
- "The video may have captions disabled by the creator."
74
- )
75
-
76
- # Select best caption track by priority
77
- selected_track = None
78
- caption_type = "manual"
79
-
80
- # Priority 1: Manual English
81
- for item in caption_items:
82
- snippet = item['snippet']
83
- if snippet.get('language') == 'en' and snippet.get('trackKind') == 'standard':
84
- selected_track = item
85
- caption_type = "manual"
86
- break
87
-
88
- # Priority 2: Auto-generated English
89
- if not selected_track:
90
- for item in caption_items:
91
- snippet = item['snippet']
92
- if snippet.get('language') == 'en' and snippet.get('trackKind') == 'asr':
93
- selected_track = item
94
- caption_type = "auto-generated"
95
- break
96
-
97
- # Priority 3: Any manual caption
98
- if not selected_track:
99
- for item in caption_items:
100
- snippet = item['snippet']
101
- if snippet.get('trackKind') == 'standard':
102
- selected_track = item
103
- caption_type = "manual"
104
- break
105
-
106
- # Priority 4: Any auto-generated caption
107
- if not selected_track:
108
- for item in caption_items:
109
- selected_track = item
110
- caption_type = "auto-generated"
111
- break
112
-
113
- if not selected_track:
114
- raise ValueError(
115
- "No captions found for this video. "
116
- "The video may have captions disabled by the creator."
117
- )
118
-
119
- logger.info(f"Selected caption track: {selected_track['snippet'].get('language')} ({caption_type})")
120
-
121
- # Download the caption track
122
- caption_id = selected_track['id']
123
- subtitle_response = youtube.captions().download(
124
- id=caption_id,
125
- tfmt='srt'
126
- ).execute()
127
-
128
- # Parse SRT to plain text
129
- srt_text = subtitle_response.decode('utf-8') if isinstance(subtitle_response, bytes) else str(subtitle_response)
130
- transcript_text = _parse_srt_to_text(srt_text)
131
-
132
- if not transcript_text or len(transcript_text) < 50:
133
- raise ValueError(
134
- "Transcript is too short or empty. "
135
- "Try a different video with more spoken content."
136
- )
137
-
138
- logger.info(f"YouTube Data API: extracted {len(transcript_text)} chars ({caption_type})")
139
- return transcript_text, caption_type
140
-
141
- except HttpError as e:
142
- if e.resp.status == 403:
143
- if 'quotaExceeded' in str(e):
144
- raise ValueError(
145
- "YouTube API quota exceeded. "
146
- "Please try again later or upload a PDF instead."
147
- )
148
- # captions().download requires OAuth for third-party videos
149
- # Fall through to Method B
150
- logger.warning(f"YouTube Data API forbidden (likely needs OAuth for captions download): {e}")
151
- logger.info("Falling back to youtube-transcript-api...")
152
- elif e.resp.status == 404:
153
- raise ValueError(
154
- "Could not access this video. "
155
- "It may be private, deleted, or region-restricted."
156
- )
157
- else:
158
- logger.error(f"YouTube Data API error: {e}")
159
- logger.info("Falling back to youtube-transcript-api...")
160
- except ValueError:
161
- raise
162
- except Exception as e:
163
- logger.error(f"YouTube Data API unexpected error: {e}")
164
- logger.info("Falling back to youtube-transcript-api...")
165
-
166
- # --- METHOD B: youtube-transcript-api fallback (local / API fallback) ---
167
- if not HAS_YOUTUBE_API:
168
  raise ValueError(
169
- "YouTube transcript libraries not available. "
170
- "Please upload a PDF instead."
171
  )
172
-
173
- logger.info("Using youtube-transcript-api (local fallback)")
174
- ytt = YouTubeTranscriptApi()
175
-
 
 
 
 
 
 
 
176
  try:
177
- # PRIORITY 1: Try manual English captions
178
- logger.info("Trying manual English captions...")
179
- transcript = ytt.fetch(video_id, languages=['en'])
180
- transcript_text = ' '.join([t.text for t in transcript])
181
- logger.info(f"Got manual English transcript ({len(transcript_text)} chars)")
182
- return transcript_text, "manual"
183
- except Exception as e1:
184
- logger.info(f"Manual English not available: {e1}")
185
- try:
186
- # PRIORITY 2: Try any available transcript (includes auto-generated)
187
- logger.info("Trying any available transcript...")
188
- transcript_list = ytt.list(video_id)
189
- first_available = next(iter(transcript_list))
190
- transcript = first_available.fetch()
191
- transcript_text = ' '.join([t.text for t in transcript])
192
- logger.info(f"Got fallback transcript ({len(transcript_text)} chars)")
193
- return transcript_text, "auto-generated"
194
- except (TranscriptsDisabled, NoTranscriptFound):
195
- raise ValueError(
196
- "No captions found for this video. "
197
- "This video may have captions disabled entirely. "
198
- "Try a different video or upload a PDF instead."
199
- )
200
- except InvalidVideoId:
201
- raise ValueError(f"Invalid YouTube video ID: {video_id}. Please check the URL.")
202
- except StopIteration:
203
- raise ValueError(
204
- "No captions found for this video. "
205
- "This video may have captions disabled entirely. "
206
- "Try a different video or upload a PDF instead."
207
- )
208
- except Exception as e2:
209
- logger.error(f"All transcript fetch attempts failed: {e2}")
210
- raise ValueError(
211
- "No captions found for this video. "
212
- "This video may have captions disabled entirely. "
213
- "Try a different video or upload a PDF instead."
214
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
215
 
216
 
217
 
@@ -302,13 +223,10 @@ def ingest_document(file_path: str):
302
  def ingest_url(url: str):
303
  """
304
  Ingests content from a URL (YouTube or Web).
305
- Uses YouTube Data API v3 in cloud mode, youtube-transcript-api locally.
306
  """
307
  from langchain_community.document_loaders import WebBaseLoader
308
 
309
- if not HAS_YOUTUBE_API and not has_youtube_api_key() and ("youtube.com" in url or "youtu.be" in url):
310
- raise ValueError("YouTube support not available - no API key or transcript library found")
311
-
312
  docs = []
313
  title = url
314
 
@@ -331,18 +249,8 @@ def ingest_url(url: str):
331
 
332
  logger.info(f"Extracted video ID: {video_id}")
333
 
334
- # Fetch transcript using the appropriate method
335
- transcript_text, caption_type = _fetch_youtube_transcript(video_id)
336
-
337
- # Clean up transcript text
338
- transcript_text = re.sub(r'\[.*?\]', '', transcript_text) # Remove [Music], [Applause], etc.
339
- transcript_text = re.sub(r'\s+', ' ', transcript_text).strip() # Normalize whitespace
340
-
341
- if len(transcript_text) < 50:
342
- raise ValueError(
343
- "Transcript is too short or empty after cleanup. "
344
- "Try a different video with more spoken content."
345
- )
346
 
347
  # Create a document from the transcript
348
  docs = [Document(
@@ -350,11 +258,10 @@ def ingest_url(url: str):
350
  metadata={
351
  "source": url,
352
  "title": f"YouTube: {video_id}",
353
- "type": "youtube",
354
- "caption_type": caption_type
355
  }
356
  )]
357
- title = f"YouTube Video: {video_id} ({caption_type} captions)"
358
 
359
  else:
360
  # Regular web page
 
1
  import os
2
+ import json
3
+ import requests as http_requests
4
  from langchain_community.document_loaders import PyPDFLoader
5
  from langchain_text_splitters import RecursiveCharacterTextSplitter
6
  from langchain_chroma import Chroma
7
  from langchain_community.llms import Ollama
8
+ from backend.config import get_llm, get_embeddings
9
  from langchain_core.documents import Document
10
  import logging
11
  import time
 
14
  # Configure logger FIRST
15
  logger = logging.getLogger(__name__)
16
 
 
 
 
 
 
 
 
 
 
17
  CACHE_DIR = "./chroma_db"
18
 
19
 
20
def _fetch_youtube_transcript(video_id: str) -> str:
    """
    Fetch a YouTube transcript via the public timedtext endpoint.
    Works from any server — no API keys or OAuth required.

    Args:
        video_id: The YouTube video ID (the ``v=`` URL parameter).

    Returns:
        Cleaned transcript text (bracketed cues removed, whitespace
        normalized), guaranteed to be at least 50 characters long.

    Raises:
        ValueError: If the video is inaccessible, has no captions, the
            caption data cannot be parsed, a network request fails, or
            the resulting transcript is too short to be useful.
    """
    # Step 1: Fetch the YouTube watch page to discover caption tracks.
    page_url = f"https://www.youtube.com/watch?v={video_id}"
    # Browser-like headers: presumably YouTube serves a reduced page to
    # clients that look like bots — TODO confirm which headers are required.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/120.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9"
    }

    try:
        response = http_requests.get(page_url, headers=headers, timeout=30)
    except http_requests.RequestException as e:
        # Preserve the function's error contract: callers handle ValueError,
        # so network failures must not leak raw requests exceptions.
        raise ValueError(f"Network error while contacting YouTube: {e}") from e

    if response.status_code != 200:
        raise ValueError(
            "Could not access this YouTube video. "
            "It may be private or region-restricted."
        )

    # Step 2: Extract captionTracks from the embedded player JSON.
    # NOTE(review): the non-greedy match assumes no literal ']' appears
    # inside the array's string values — holds for YouTube's current
    # page format, but is inherently fragile against markup changes.
    match = re.search(r'"captionTracks":(\[.*?\])', response.text)

    if not match:
        raise ValueError(
            "No captions available for this video. "
            "The creator may have disabled captions. "
            "Try a different video or upload a PDF instead."
        )

    try:
        caption_tracks = json.loads(match.group(1))
    except json.JSONDecodeError:
        raise ValueError(
            "Failed to parse caption data. "
            "Try a different video or upload a PDF instead."
        )

    if not caption_tracks:
        raise ValueError(
            "No captions available for this video. "
            "The creator may have disabled captions. "
            "Try a different video or upload a PDF instead."
        )

    # Step 3: Pick the best caption track by priority:
    #   1) manual English  2) any English (incl. auto-generated 'asr')
    #   3) any available track.
    selected = None

    # Priority 1: Manual English captions ('asr' marks auto-generated).
    for track in caption_tracks:
        if track.get('languageCode', '') == 'en' and track.get('kind', '') != 'asr':
            selected = track
            break

    # Priority 2: Auto-generated English captions.
    if not selected:
        for track in caption_tracks:
            if track.get('languageCode', '') == 'en':
                selected = track
                break

    # Priority 3: Any available track.
    if not selected:
        selected = caption_tracks[0]

    logger.info(f"Selected caption track: {selected.get('languageCode')} (kind={selected.get('kind', 'standard')})")

    # Step 4: Download the selected caption track in JSON3 format.
    caption_url = selected.get('baseUrl')
    if not caption_url:
        raise ValueError("Could not retrieve caption URL.")

    try:
        caption_response = http_requests.get(
            caption_url + "&fmt=json3",
            headers=headers,
            timeout=30
        )
    except http_requests.RequestException as e:
        raise ValueError(f"Network error while downloading captions: {e}") from e

    if caption_response.status_code != 200:
        raise ValueError("Failed to download captions.")

    # Step 5: Parse the JSON3 caption format.
    # Catch ValueError (json.JSONDecodeError's base class) so both the
    # stdlib and requests' own decode-error variants are handled across
    # requests versions.
    try:
        caption_data = caption_response.json()
    except ValueError:
        raise ValueError("Failed to parse caption data.")

    # JSON3 payload: a list of timed 'events', each holding text 'segs'.
    text_parts = []
    for event in caption_data.get('events', []):
        for seg in event.get('segs', []):
            utf8 = seg.get('utf8', '')
            # Skip bare newline segments; real text is joined with spaces.
            if utf8 and utf8 != '\n':
                text_parts.append(utf8)

    transcript_text = ' '.join(text_parts)

    # Step 6: Clean the text — drop cues like [Music] / [Applause] and
    # collapse runs of whitespace into single spaces.
    transcript_text = re.sub(r'\[.*?\]', '', transcript_text)
    transcript_text = re.sub(r'\s+', ' ', transcript_text).strip()

    if len(transcript_text) < 50:
        raise ValueError(
            "Transcript is too short or empty. "
            "Try a different video with more spoken content."
        )

    logger.info(f"Timedtext API: extracted {len(transcript_text)} chars")
    return transcript_text
 
137
 
138
 
 
223
  def ingest_url(url: str):
224
  """
225
  Ingests content from a URL (YouTube or Web).
226
+ Uses YouTube's timedtext API for transcripts — no API keys needed.
227
  """
228
  from langchain_community.document_loaders import WebBaseLoader
229
 
 
 
 
230
  docs = []
231
  title = url
232
 
 
249
 
250
  logger.info(f"Extracted video ID: {video_id}")
251
 
252
+ # Fetch transcript using timedtext API
253
+ transcript_text = _fetch_youtube_transcript(video_id)
 
 
 
 
 
 
 
 
 
 
254
 
255
  # Create a document from the transcript
256
  docs = [Document(
 
258
  metadata={
259
  "source": url,
260
  "title": f"YouTube: {video_id}",
261
+ "type": "youtube"
 
262
  }
263
  )]
264
+ title = f"YouTube Video: {video_id}"
265
 
266
  else:
267
  # Regular web page