Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""Download transcripts from YouTube videos and save them as Word documents.

Each transcript is written to a ``.docx`` file that contains the video title,
a hyperlink to the video, and the transcript text.
"""
| import argparse | |
| import logging | |
| import re | |
| import sys | |
| from pathlib import Path | |
| from urllib.parse import parse_qs, urlparse | |
| import yt_dlp | |
| from docx import Document | |
| from docx.oxml import OxmlElement | |
| from docx.oxml.ns import qn | |
| from youtube_transcript_api import ( | |
| NoTranscriptFound, | |
| TranscriptsDisabled, | |
| VideoUnavailable, | |
| YouTubeTranscriptApi, | |
| ) | |
# Configure the root logger at import time; INFO by default, main() switches
# it to DEBUG when --verbose is given.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)

# Language codes passed to the English transcript lookups in fetch_transcript().
_ENGLISH_CODES = ["en", "en-US", "en-GB"]
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description="Download YouTube video transcripts to text files.") | |
| parser.add_argument("videos", nargs="+", metavar="URL_OR_ID", help="YouTube URLs or video IDs") | |
| parser.add_argument("-o", "--output", type=Path, default=Path("transcripts"), metavar="DIR") | |
| parser.add_argument("-v", "--verbose", action="store_true") | |
| return parser.parse_args() | |
def extract_video_id(url_or_id: str) -> str:
    """Return the 11-character video ID named by a YouTube URL or a bare ID.

    Recognised URL forms (http/https, optional ``www.`` prefix and port):
    ``youtu.be/<id>``, ``youtube.com/watch?v=<id>``, and the path forms
    ``/shorts/<id>``, ``/embed/<id>`` and ``/live/<id>`` on ``youtube.com``,
    ``m.youtube.com`` and ``music.youtube.com``. Anything that is not an
    http(s) URL is treated as a bare ID after stripping whitespace.

    Args:
        url_or_id: A YouTube URL or a video ID.

    Returns:
        The video ID.

    Raises:
        ValueError: If the host is not a recognised YouTube host, the URL
            carries no video ID, or the result is not 11 ID characters.
    """
    parsed = urlparse(url_or_id)
    if parsed.scheme in ("http", "https"):
        # ``hostname`` is lower-cased and excludes any port. removeprefix()
        # drops only a literal "www." -- str.lstrip("www.") would strip any
        # leading run of "w"/"." characters and accept e.g. "w.youtube.com".
        host = (parsed.hostname or "").removeprefix("www.")
        if host == "youtu.be":
            vid = parsed.path.lstrip("/").split("/")[0]
        elif host in ("youtube.com", "m.youtube.com", "music.youtube.com"):
            path_prefix = next(
                (p for p in ("/shorts/", "/embed/", "/live/") if parsed.path.startswith(p)),
                None,
            )
            if path_prefix is not None:
                vid = parsed.path[len(path_prefix):].split("/")[0]
            else:
                candidates = parse_qs(parsed.query).get("v", [])
                if not candidates:
                    raise ValueError(f"No video ID in URL: {url_or_id}")
                vid = candidates[0]
        else:
            raise ValueError(f"Unrecognised YouTube host: {parsed.netloc}")
        # Defensive: drop any query residue that ended up glued to the ID.
        vid = vid.split("&")[0].split("?")[0]
    else:
        vid = url_or_id.strip()

    if not re.fullmatch(r"[a-zA-Z0-9_-]{11}", vid):
        raise ValueError(f"Does not look like a video ID: {vid!r}")
    return vid
def expand_input(url_or_id: str) -> list[str]:
    """Resolve one command-line input to the list of video IDs it names.

    A bare 11-character ID is returned as-is (whitespace stripped) without
    any network access. Anything else is handed to yt-dlp: a playlist yields
    the IDs of its entries, any other result yields its single ``id``.
    """
    candidate = url_or_id.strip()
    if re.fullmatch(r"[a-zA-Z0-9_-]{11}", candidate) is not None:
        return [candidate]

    ydl_options = {"quiet": True, "no_warnings": True, "extract_flat": True}
    with yt_dlp.YoutubeDL(ydl_options) as downloader:
        info = downloader.extract_info(url_or_id, download=False)
        if info.get("_type") != "playlist":
            return [info["id"]]

        # Keep only playlist entries that actually carry an ID.
        ids = []
        for entry in info.get("entries") or []:
            if entry.get("id"):
                ids.append(entry["id"])
        logger.info("Playlist %r: %d videos", info.get("title", url_or_id), len(ids))
        return ids
def fetch_title(video_id: str) -> str:
    """Look up the title of a video through yt-dlp without downloading it.

    Raises:
        RuntimeError: If yt-dlp returns nothing or a result without a title.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    ydl_options = dict(
        quiet=True,
        no_warnings=True,
        extract_flat=True,
        skip_download=True,
    )
    with yt_dlp.YoutubeDL(ydl_options) as downloader:
        info = downloader.extract_info(watch_url, download=False)

    if info and "title" in info:
        return info["title"]
    raise RuntimeError("yt-dlp returned no title")
def fetch_transcript(video_id: str) -> str:
    """Fetch a video's transcript and return it as one space-joined string.

    Selection order: a manually-created English transcript, then an
    auto-generated English one, then the first transcript of any language
    that the video offers (logged as a warning).

    Args:
        video_id: The YouTube video ID.

    Returns:
        The text of all transcript entries joined with single spaces.

    Raises:
        NoTranscriptFound: If the video lists no transcripts at all.
        Exception: Whatever ``YouTubeTranscriptApi`` raises while listing or
            fetching (the caller handles ``TranscriptsDisabled`` and
            ``VideoUnavailable`` explicitly).
    """
    api = YouTubeTranscriptApi()
    transcript_list = api.list(video_id)

    transcript = None
    lookups = (
        (transcript_list.find_manually_created_transcript, "manually-created"),
        (transcript_list.find_generated_transcript, "auto-generated"),
    )
    for find, label in lookups:
        try:
            transcript = find(_ENGLISH_CODES)
        except NoTranscriptFound:
            continue
        logger.debug("Using %s English transcript for %s", label, video_id)
        break

    if transcript is None:
        # No English transcript: fall back to the first one in any language.
        transcript = next(iter(transcript_list), None)
        if transcript is None:
            # NoTranscriptFound takes (video_id, requested_language_codes,
            # transcript_data); omitting the third argument raises TypeError
            # instead of the exception the caller is waiting for.
            raise NoTranscriptFound(video_id, _ENGLISH_CODES, transcript_list)
        logger.warning(
            "No English transcript for %s; using language %s", video_id, transcript.language_code
        )

    entries = transcript.fetch()
    return " ".join(entry.text for entry in entries)
def sanitize_filename(title: str, video_id: str) -> str:
    """Turn a video title into a filename stem, falling back to the video ID.

    Reserved punctuation, path separators and ASCII control characters become
    underscores, whitespace runs become a single underscore, repeated
    underscores are collapsed, and leading/trailing ``_`` and ``.`` are
    removed. The result is limited to 200 characters.

    Args:
        title: The raw video title.
        video_id: Returned instead when nothing usable is left of the title.

    Returns:
        The sanitised stem (no extension), or ``video_id`` if it is empty.
    """
    name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", title)
    name = re.sub(r"\s+", "_", name)
    name = re.sub(r"_+", "_", name)
    name = name.strip("_.")
    # Cutting at 200 characters can expose a separator at the new end, so
    # trim the tail again after truncating.
    name = name[:200].rstrip("_.")
    return name if name else video_id
def _add_hyperlink(paragraph, text: str, url: str) -> None:
    """Append an external hyperlink showing *text* and pointing at *url*.

    The link is built directly as OOXML: a ``w:hyperlink`` element that
    references an external relationship registered on the paragraph's part and
    wraps one run carrying the ``Hyperlink`` run style.
    """
    relationship_id = paragraph.part.relate_to(
        url,
        "http://schemas.openxmlformats.org/officeDocument/2006/relationships/hyperlink",
        is_external=True,
    )

    # Run properties: <w:rPr><w:rStyle w:val="Hyperlink"/></w:rPr>
    run_style = OxmlElement("w:rStyle")
    run_style.set(qn("w:val"), "Hyperlink")
    run_props = OxmlElement("w:rPr")
    run_props.append(run_style)

    text_node = OxmlElement("w:t")
    text_node.text = text

    # Properties come first, then the text node.
    run_element = OxmlElement("w:r")
    run_element.append(run_props)
    run_element.append(text_node)

    link = OxmlElement("w:hyperlink")
    link.set(qn("r:id"), relationship_id)
    link.append(run_element)
    paragraph._p.append(link)
def save_transcript(text: str, title: str, video_id: str, stem: str, output_dir: Path) -> Path:
    """Write a transcript to ``<output_dir>/<stem>.docx`` and return that path.

    The document holds a level-1 heading with the title, a paragraph with a
    hyperlink to the video, a level-2 "Transcript" heading, and the transcript
    text as one paragraph. ``output_dir`` is created if it does not exist.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    destination = output_dir / f"{stem}.docx"
    video_url = f"https://www.youtube.com/watch?v={video_id}"

    document = Document()
    document.add_heading(title, level=1)
    # The watch URL is used both as the link target and as its visible text.
    _add_hyperlink(document.add_paragraph(), video_url, video_url)
    document.add_heading("Transcript", level=2)
    document.add_paragraph(text)
    document.save(destination)
    return destination
def process_video(url_or_id: str, output_dir: Path) -> bool:
    """Download and save the transcript for one video.

    Every failure is logged rather than raised. A title that cannot be
    fetched is not fatal: the video ID is used as title and filename instead.

    Args:
        url_or_id: A YouTube URL or video ID.
        output_dir: Directory the ``.docx`` file is written to.

    Returns:
        ``True`` if the transcript was saved, ``False`` otherwise.
    """
    try:
        video_id = extract_video_id(url_or_id)
    except ValueError as err:
        logger.error("Could not parse video ID from %r: %s", url_or_id, err)
        return False

    # Best effort: fall back to the ID when the title lookup fails.
    try:
        title = fetch_title(video_id)
    except Exception as err:
        logger.warning("Could not fetch title for %s: %s — using video ID as filename", video_id, err)
        title = video_id
    title_stem = sanitize_filename(title, video_id)

    try:
        text = fetch_transcript(video_id)
    except TranscriptsDisabled:
        logger.error("Transcripts are disabled for %s", video_id)
        return False
    except VideoUnavailable:
        logger.error("Video unavailable: %s", video_id)
        return False
    except NoTranscriptFound:
        logger.error("No transcript available for %s", video_id)
        return False
    except Exception as err:
        logger.error("Unexpected error fetching transcript for %s: %s", video_id, err)
        return False

    try:
        saved_path = save_transcript(text, title, video_id, title_stem, output_dir)
        logger.info("Saved: %s", saved_path)
    except Exception as err:
        logger.error("Could not write file for %s: %s", video_id, err)
        return False
    return True
def main() -> None:
    """Run the CLI: expand the inputs, process every video, report the tally.

    Exits with status 1 when any input could not be expanded or any video
    could not be processed.
    """
    args = parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Each input is either a single video or a playlist of several.
    video_ids: list[str] = []
    expand_failures = 0
    for raw_input in args.videos:
        try:
            expanded = expand_input(raw_input)
        except Exception as err:
            logger.error("Could not expand %r: %s", raw_input, err)
            expand_failures += 1
        else:
            video_ids.extend(expanded)

    success = 0
    for vid in video_ids:
        if process_video(vid, args.output):
            success += 1

    # An input that failed to expand counts as one failed transcript.
    total = len(video_ids) + expand_failures
    logger.info("%d/%d transcripts downloaded", success, total)
    if success < total:
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()