Update src/tools/youtube_transcript.py
Browse files
src/tools/youtube_transcript.py
CHANGED
|
@@ -0,0 +1,72 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from langchain.tools import tool
|
| 2 |
+
|
| 3 |
+
try:
|
| 4 |
+
from youtube_transcript_api import YouTubeTranscriptApi
|
| 5 |
+
except Exception:
|
| 6 |
+
YouTubeTranscriptApi = None
|
| 7 |
+
|
| 8 |
+
import re
|
| 9 |
+
from urllib.parse import urlparse, parse_qs
|
| 10 |
+
|
| 11 |
+
|
| 12 |
+
def _extract_video_id(url_or_id: str) -> str | None:
|
| 13 |
+
s = (url_or_id or "").strip()
|
| 14 |
+
if re.fullmatch(r"[A-Za-z0-9_-]{11}", s):
|
| 15 |
+
return s
|
| 16 |
+
u = urlparse(s)
|
| 17 |
+
# youtu.be/<id>
|
| 18 |
+
if u.netloc.endswith("youtu.be"):
|
| 19 |
+
vid = u.path.strip("/").split("/")[0]
|
| 20 |
+
return vid if re.fullmatch(r"[A-Za-z0-9_-]{11}", vid) else None
|
| 21 |
+
# watch?v=<id>
|
| 22 |
+
qs = parse_qs(u.query or "")
|
| 23 |
+
if "v" in qs:
|
| 24 |
+
vid = qs["v"][0]
|
| 25 |
+
return vid if re.fullmatch(r"[A-Za-z0-9_-]{11}", vid) else None
|
| 26 |
+
# /embed/<id>, /shorts/<id>, /v/<id>
|
| 27 |
+
for pref in ("/embed/", "/shorts/", "/v/"):
|
| 28 |
+
if u.path.startswith(pref):
|
| 29 |
+
vid = u.path[len(pref):].split("/")[0]
|
| 30 |
+
return vid if re.fullmatch(r"[A-Za-z0-9_-]{11}", vid) else None
|
| 31 |
+
return None
|
| 32 |
+
|
| 33 |
+
|
| 34 |
+
@tool
|
| 35 |
+
def extract_youtube_transcript(url: str, chars: int = 1000) -> str:
|
| 36 |
+
"""
|
| 37 |
+
Simple YouTube transcript fetcher.
|
| 38 |
+
|
| 39 |
+
Input:
|
| 40 |
+
- url: Regular YouTube URL (or the 11-char video_id).
|
| 41 |
+
- chars: Return the first `chars` characters of the transcript.
|
| 42 |
+
|
| 43 |
+
Output:
|
| 44 |
+
- String with the transcript (trimmed to `chars`), or an error string:
|
| 45 |
+
"yt_error:<reason>"
|
| 46 |
+
"""
|
| 47 |
+
if YouTubeTranscriptApi is None:
|
| 48 |
+
return "yt_error:missing_dependency"
|
| 49 |
+
|
| 50 |
+
vid = _extract_video_id(url)
|
| 51 |
+
if not vid:
|
| 52 |
+
return "yt_error:id_not_found"
|
| 53 |
+
|
| 54 |
+
try:
|
| 55 |
+
api = YouTubeTranscriptApi()
|
| 56 |
+
# New API returns a list of FetchedTranscriptSnippet objects
|
| 57 |
+
snippets = api.fetch(vid)
|
| 58 |
+
|
| 59 |
+
parts = []
|
| 60 |
+
for s in snippets:
|
| 61 |
+
# Support both object (new) and dict (old) shapes
|
| 62 |
+
text = getattr(s, "text", None)
|
| 63 |
+
if text is None and isinstance(s, dict):
|
| 64 |
+
text = s.get("text")
|
| 65 |
+
if not text:
|
| 66 |
+
continue
|
| 67 |
+
parts.append(text.replace("\n", " ").strip())
|
| 68 |
+
|
| 69 |
+
full_text = " ".join(p for p in parts if p)
|
| 70 |
+
return full_text[: max(0, int(chars))]
|
| 71 |
+
except Exception as e:
|
| 72 |
+
return f"yt_error:{type(e).__name__}:{e}"
|