duck3-create Claude Opus 4.6 committed on
Commit
49a1ac5
·
1 Parent(s): 0e7d362

Add retry logic for YouTube transcript fetching

Browse files

Retries up to 3 times with 1.5s delay to handle intermittent
YouTube API blocks on cloud server IPs (Railway).
Skips retry for non-retryable errors (no subtitles, disabled).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (1) hide show
  1. main.py +64 -43
main.py CHANGED
@@ -1,12 +1,17 @@
 
1
  from fastapi import FastAPI
2
  from fastapi.responses import FileResponse, JSONResponse
3
  from fastapi.middleware.cors import CORSMiddleware
4
  from pydantic import BaseModel
5
  from youtube_transcript_api import YouTubeTranscriptApi
6
  import re
 
7
  import asyncio
8
  from concurrent.futures import ThreadPoolExecutor
9
 
 
 
 
10
  app = FastAPI(title="YouTube Transcript Extractor")
11
 
12
  app.add_middleware(
@@ -52,6 +57,9 @@ KOREAN_FILLERS = {
52
 
53
  NOISE_PATTERN = re.compile(r"^\[.*\]$")
54
 
 
 
 
55
 
56
  def denoise_text(text: str) -> str:
57
  lines = text.split("\n")
@@ -73,49 +81,62 @@ def denoise_text(text: str) -> str:
73
 
74
 
75
  def _fetch_transcript(video_id: str, language: str, denoise: bool, fmt: str) -> dict:
76
- try:
77
- languages = [language]
78
- if language == "ko":
79
- languages.append("en")
80
- elif language == "en":
81
- languages.append("ko")
82
-
83
- data = _yt_api.fetch(video_id, languages=languages)
84
-
85
- if fmt == "json":
86
- entries = [
87
- {"text": e.text, "start": e.start, "duration": e.duration}
88
- for e in data
89
- ]
90
- if denoise:
91
- deduped = []
92
- prev_text = None
93
- for entry in entries:
94
- t = entry["text"].strip()
95
- if t in KOREAN_FILLERS or NOISE_PATTERN.match(t):
96
- continue
97
- if t == prev_text:
98
- continue
99
- if t:
100
- entry["text"] = t
101
- deduped.append(entry)
102
- prev_text = t
103
- entries = deduped
104
- return {"transcript": entries, "error": None}
105
- else:
106
- text = "\n".join(e.text for e in data)
107
- if denoise:
108
- text = denoise_text(text)
109
- return {"transcript": text, "error": None}
110
- except Exception as e:
111
- error_msg = str(e)
112
- if "No transcripts" in error_msg or "Could not retrieve" in error_msg:
113
- error_msg = "μžλ§‰μ„ 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
114
- elif "disabled" in error_msg.lower():
115
- error_msg = "이 μ˜μƒμ€ μžλ§‰μ΄ λΉ„ν™œμ„±ν™”λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€."
116
- elif "unavailable" in error_msg.lower():
117
- error_msg = "μ˜μƒμ„ 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
118
- return {"transcript": None, "error": error_msg}
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
 
121
  @app.post("/api/transcripts")
 
1
+ import logging
2
  from fastapi import FastAPI
3
  from fastapi.responses import FileResponse, JSONResponse
4
  from fastapi.middleware.cors import CORSMiddleware
5
  from pydantic import BaseModel
6
  from youtube_transcript_api import YouTubeTranscriptApi
7
  import re
8
+ import time
9
  import asyncio
10
  from concurrent.futures import ThreadPoolExecutor
11
 
12
+ logging.basicConfig(level=logging.INFO)
13
+ logger = logging.getLogger(__name__)
14
+
15
  app = FastAPI(title="YouTube Transcript Extractor")
16
 
17
  app.add_middleware(
 
57
 
58
  NOISE_PATTERN = re.compile(r"^\[.*\]$")
59
 
60
+ MAX_RETRIES = 3
61
+ RETRY_DELAY = 1.5
62
+
63
 
64
  def denoise_text(text: str) -> str:
65
  lines = text.split("\n")
 
81
 
82
 
83
  def _fetch_transcript(video_id: str, language: str, denoise: bool, fmt: str) -> dict:
84
+ languages = [language]
85
+ if language == "ko":
86
+ languages.append("en")
87
+ elif language == "en":
88
+ languages.append("ko")
89
+
90
+ last_error = None
91
+ for attempt in range(1, MAX_RETRIES + 1):
92
+ try:
93
+ data = _yt_api.fetch(video_id, languages=languages)
94
+
95
+ if fmt == "json":
96
+ entries = [
97
+ {"text": e.text, "start": e.start, "duration": e.duration}
98
+ for e in data
99
+ ]
100
+ if denoise:
101
+ deduped = []
102
+ prev_text = None
103
+ for entry in entries:
104
+ t = entry["text"].strip()
105
+ if t in KOREAN_FILLERS or NOISE_PATTERN.match(t):
106
+ continue
107
+ if t == prev_text:
108
+ continue
109
+ if t:
110
+ entry["text"] = t
111
+ deduped.append(entry)
112
+ prev_text = t
113
+ entries = deduped
114
+ return {"transcript": entries, "error": None}
115
+ else:
116
+ text = "\n".join(e.text for e in data)
117
+ if denoise:
118
+ text = denoise_text(text)
119
+ return {"transcript": text, "error": None}
120
+ except Exception as e:
121
+ last_error = str(e)
122
+ logger.error(f"Attempt {attempt}/{MAX_RETRIES} failed for {video_id}: {last_error}")
123
+
124
+ # Don't retry if video genuinely has no subtitles
125
+ if "No transcripts" in last_error or "disabled" in last_error.lower():
126
+ break
127
+
128
+ if attempt < MAX_RETRIES:
129
+ time.sleep(RETRY_DELAY)
130
+
131
+ # All retries exhausted or non-retryable error
132
+ error_msg = last_error or "Unknown error"
133
+ if "No transcripts" in error_msg or "Could not retrieve" in error_msg:
134
+ error_msg = f"μžλ§‰μ„ 찾을 수 μ—†μŠ΅λ‹ˆλ‹€. ({error_msg[:120]})"
135
+ elif "disabled" in error_msg.lower():
136
+ error_msg = "이 μ˜μƒμ€ μžλ§‰μ΄ λΉ„ν™œμ„±ν™”λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€."
137
+ elif "unavailable" in error_msg.lower():
138
+ error_msg = "μ˜μƒμ„ 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
139
+ return {"transcript": None, "error": error_msg}
140
 
141
 
142
  @app.post("/api/transcripts")