post-conference-report / scripts /download_transcripts.py
Kacper Łukawski
Allow downloading a playlist from YT
070f7e4
#!/usr/bin/env python3
"""Download transcripts from YouTube videos and save them as text files."""
import argparse
import logging
import re
import sys
from pathlib import Path
from urllib.parse import parse_qs, urlparse
import yt_dlp
from docx import Document
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from youtube_transcript_api import (
NoTranscriptFound,
TranscriptsDisabled,
VideoUnavailable,
YouTubeTranscriptApi,
)
# Configure the root logger at import time: INFO level, with timestamp,
# level name and logger name in front of every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
# Module-level logger shared by the functions in this script.
logger = logging.getLogger(__name__)
# Language codes passed to the transcript lookups in fetch_transcript when
# searching for an English transcript.
_ENGLISH_CODES = ["en", "en-US", "en-GB"]
def parse_args() -> argparse.Namespace:
    """Parse ``sys.argv`` and return the populated namespace.

    Recognised arguments:
      * ``videos`` -- one or more positional YouTube URLs or video IDs.
      * ``-o`` / ``--output`` -- output directory as a ``Path``
        (defaults to ``transcripts``).
      * ``-v`` / ``--verbose`` -- boolean flag, ``False`` unless given.
    """
    cli = argparse.ArgumentParser(
        description="Download YouTube video transcripts to text files."
    )
    # One (flags, keyword options) pair per add_argument call.
    argument_specs = (
        (
            ("videos",),
            {"nargs": "+", "metavar": "URL_OR_ID", "help": "YouTube URLs or video IDs"},
        ),
        (
            ("-o", "--output"),
            {"type": Path, "default": Path("transcripts"), "metavar": "DIR"},
        ),
        (
            ("-v", "--verbose"),
            {"action": "store_true"},
        ),
    )
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
def extract_video_id(url_or_id: str) -> str:
    """Return the 11-character YouTube video ID contained in *url_or_id*.

    Accepts either a bare video ID or an ``http``/``https`` URL whose host is
    ``youtu.be``, ``youtube.com`` or ``m.youtube.com`` (each optionally
    prefixed with ``www.``). On the ``youtube.com`` hosts the ID is taken from
    a ``/shorts/<id>``, ``/embed/<id>`` or ``/live/<id>`` path, and otherwise
    from the ``v`` query parameter.

    Raises:
        ValueError: if the host is not a recognised YouTube host, the URL
            carries no video ID, or the extracted value is not 11 characters
            drawn from ``[a-zA-Z0-9_-]``.
    """
    parsed = urlparse(url_or_id)
    if parsed.scheme in ("http", "https"):
        # removeprefix drops only the literal "www." prefix. str.lstrip("www.")
        # strips any leading run of "w" and "." characters, which would turn an
        # unrelated host such as "wyoutube.com" into "youtube.com".
        host = parsed.netloc.lower().removeprefix("www.")
        if host == "youtu.be":
            vid = parsed.path.lstrip("/").split("/")[0]
        elif host in ("youtube.com", "m.youtube.com"):
            # URL forms that carry the ID as the path segment after a prefix.
            path_prefix = next(
                (p for p in ("/shorts/", "/embed/", "/live/") if parsed.path.startswith(p)),
                None,
            )
            if path_prefix is not None:
                vid = parsed.path[len(path_prefix):].split("/")[0]
            else:
                candidates = parse_qs(parsed.query).get("v", [])
                if not candidates:
                    raise ValueError(f"No video ID in URL: {url_or_id}")
                vid = candidates[0]
        else:
            raise ValueError(f"Unrecognised YouTube host: {parsed.netloc}")
        # Defensive: cut off any query remnants glued to the ID.
        vid = vid.split("&")[0].split("?")[0]
    else:
        vid = url_or_id.strip()
    if not re.fullmatch(r"[a-zA-Z0-9_-]{11}", vid):
        raise ValueError(f"Does not look like a video ID: {vid!r}")
    return vid
def expand_input(url_or_id: str) -> list[str]:
    """Return video IDs for a single video URL/ID or all videos in a playlist.

    A bare 11-character video ID is returned as-is without calling yt-dlp.
    Any other input is passed to yt-dlp with ``extract_flat`` enabled; a
    result whose ``_type`` is ``"playlist"`` yields the IDs of its entries,
    and any other result yields its own ``id``.

    Raises:
        ValueError: if yt-dlp returns no metadata, or a non-playlist result
            carries no ``id``.
        Exception: whatever yt-dlp raises during extraction propagates
            unchanged.
    """
    candidate = url_or_id.strip()
    if re.fullmatch(r"[a-zA-Z0-9_-]{11}", candidate):
        return [candidate]
    opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(url_or_id, download=False)
        # Mirror the guard in fetch_title: fail with a clear message instead
        # of an AttributeError on a missing result.
        if not info:
            raise ValueError(f"yt-dlp returned no metadata for {url_or_id!r}")
        if info.get("_type") == "playlist":
            # Skip falsy entries and entries lacking an ID.
            ids = [e["id"] for e in (info.get("entries") or []) if e and e.get("id")]
            logger.info("Playlist %r: %d videos", info.get("title", url_or_id), len(ids))
            return ids
        video_id = info.get("id")
        if not video_id:
            raise ValueError(f"yt-dlp returned no video ID for {url_or_id!r}")
        return [video_id]
def fetch_title(video_id: str) -> str:
    """Return the title yt-dlp reports for the video *video_id*.

    The lookup is metadata-only: ``extract_info`` is called with
    ``download=False`` on the video's watch URL.

    Raises:
        RuntimeError: if yt-dlp yields no metadata or the metadata has no
            ``title`` key.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    ydl_options = dict(
        quiet=True,
        no_warnings=True,
        extract_flat=True,
        skip_download=True,
    )
    with yt_dlp.YoutubeDL(ydl_options) as downloader:
        metadata = downloader.extract_info(watch_url, download=False)
    if metadata and "title" in metadata:
        return metadata["title"]
    raise RuntimeError("yt-dlp returned no title")
def fetch_transcript(video_id: str) -> str:
    """Return the transcript of *video_id* as a single space-joined string.

    Selection order:
      1. a manually created transcript in one of ``_ENGLISH_CODES``;
      2. an auto-generated transcript in one of ``_ENGLISH_CODES``;
      3. the first transcript the listing yields, in whatever language
         (logged as a warning).

    Raises:
        NoTranscriptFound: if the listing yields no transcript at all.
        Exception: errors raised by ``YouTubeTranscriptApi.list`` or
            ``Transcript.fetch`` propagate unchanged.
    """
    api = YouTubeTranscriptApi()
    transcript_list = api.list(video_id)
    transcript = None
    try:
        transcript = transcript_list.find_manually_created_transcript(_ENGLISH_CODES)
        logger.debug("Using manually-created English transcript for %s", video_id)
    except NoTranscriptFound:
        pass
    if transcript is None:
        try:
            transcript = transcript_list.find_generated_transcript(_ENGLISH_CODES)
            logger.debug("Using auto-generated English transcript for %s", video_id)
        except NoTranscriptFound:
            pass
    if transcript is None:
        available = list(transcript_list)
        if not available:
            # NoTranscriptFound takes (video_id, requested_language_codes,
            # transcript_data); omitting the third argument would raise
            # TypeError instead of the intended exception.
            raise NoTranscriptFound(video_id, _ENGLISH_CODES, transcript_list)
        transcript = available[0]
        logger.warning(
            "No English transcript for %s; using language %s", video_id, transcript.language_code
        )
    entries = transcript.fetch()
    return " ".join(entry.text for entry in entries)
def sanitize_filename(title: str, video_id: str) -> str:
    """Turn *title* into a filename stem, falling back to *video_id*.

    Characters in ``<>:"/\\|?*`` and ASCII control characters become ``_``,
    whitespace runs become ``_``, repeated underscores are collapsed, and
    leading/trailing ``_`` and ``.`` are removed. The result is limited to
    200 characters. If nothing is left, *video_id* is returned instead.
    """
    name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", title)
    name = re.sub(r"\s+", "_", name)
    name = re.sub(r"_+", "_", name)
    # Strip again after truncating: the 200-character cut can land right
    # after a separator and leave a dangling "_" or "." at the end.
    name = name.strip("_.")[:200].rstrip("_.")
    return name if name else video_id
def _add_hyperlink(paragraph, text: str, url: str) -> None:
    """Append a hyperlink run to *paragraph* that shows *text* and targets *url*.

    An external hyperlink relationship for *url* is registered on the
    paragraph's part, then a ``w:hyperlink`` element is built by hand
    (``w:r`` containing a ``w:rPr`` with the ``Hyperlink`` run style and a
    ``w:t`` holding *text*) and appended to the paragraph's XML element.
    """
    rel_id = paragraph.part.relate_to(
        url,
        "http://schemas.openxmlformats.org/officeDocument/2006/relationships/hyperlink",
        is_external=True,
    )

    # Run properties: apply the "Hyperlink" character style.
    char_style = OxmlElement("w:rStyle")
    char_style.set(qn("w:val"), "Hyperlink")
    run_props = OxmlElement("w:rPr")
    run_props.append(char_style)

    # Visible text of the link.
    text_node = OxmlElement("w:t")
    text_node.text = text

    # Assemble the run: properties first, then text.
    run_el = OxmlElement("w:r")
    run_el.append(run_props)
    run_el.append(text_node)

    # Wrap the run in the hyperlink element bound to the relationship.
    link_el = OxmlElement("w:hyperlink")
    link_el.set(qn("r:id"), rel_id)
    link_el.append(run_el)

    paragraph._p.append(link_el)
def save_transcript(text: str, title: str, video_id: str, stem: str, output_dir: Path) -> Path:
    """Write a transcript to ``<output_dir>/<stem>.docx`` and return that path.

    The document contains a level-1 heading with *title*, a paragraph with a
    hyperlink to the video's watch URL, a level-2 "Transcript" heading, and
    one paragraph holding *text*. *output_dir* is created if it is missing.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    output_dir.mkdir(parents=True, exist_ok=True)
    target = output_dir / f"{stem}.docx"

    document = Document()
    document.add_heading(title, level=1)
    # The URL serves as both the visible text and the link target.
    _add_hyperlink(document.add_paragraph(), watch_url, watch_url)
    document.add_heading("Transcript", level=2)
    document.add_paragraph(text)
    document.save(target)
    return target
def process_video(url_or_id: str, output_dir: Path) -> bool:
    """Download and save the transcript for one video.

    Steps: resolve the video ID, look up the title (falling back to the ID
    when that fails), fetch the transcript, and write the ``.docx`` file into
    *output_dir*. Every failure is logged rather than raised.

    Returns:
        ``True`` if the file was written, ``False`` otherwise.
    """
    # 1. Resolve the ID; without it nothing else can proceed.
    try:
        video_id = extract_video_id(url_or_id)
    except ValueError as err:
        logger.error("Could not parse video ID from %r: %s", url_or_id, err)
        return False

    # 2. The title is best-effort: a failed lookup only affects naming.
    try:
        title = fetch_title(video_id)
    except Exception as err:
        logger.warning("Could not fetch title for %s: %s — using video ID as filename", video_id, err)
        title = video_id
    title_stem = sanitize_filename(title, video_id)

    # 3. Fetch the transcript text, reporting known failure modes explicitly.
    try:
        text = fetch_transcript(video_id)
    except TranscriptsDisabled:
        logger.error("Transcripts are disabled for %s", video_id)
        return False
    except VideoUnavailable:
        logger.error("Video unavailable: %s", video_id)
        return False
    except NoTranscriptFound:
        logger.error("No transcript available for %s", video_id)
        return False
    except Exception as err:
        logger.error("Unexpected error fetching transcript for %s: %s", video_id, err)
        return False

    # 4. Persist the document.
    try:
        saved_path = save_transcript(text, title, video_id, title_stem, output_dir)
        logger.info("Saved: %s", saved_path)
    except Exception as err:
        logger.error("Could not write file for %s: %s", video_id, err)
        return False
    return True
def main() -> None:
    """Run the CLI: expand inputs, process every video, report the outcome.

    Each command-line input is expanded into video IDs (a playlist yields
    several). Inputs that cannot be expanded are logged and counted as
    failures. The process exits with status 1 if fewer transcripts were
    downloaded than were attempted.
    """
    args = parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    video_ids: list[str] = []
    expand_failures = 0
    for inp in args.videos:
        try:
            expanded = expand_input(inp)
        except Exception as err:
            logger.error("Could not expand %r: %s", inp, err)
            expand_failures += 1
        else:
            video_ids.extend(expanded)

    # Process every video, then tally the successes.
    outcomes = []
    for vid in video_ids:
        outcomes.append(process_video(vid, args.output))
    succeeded = outcomes.count(True)

    # An input that failed to expand counts as one failed attempt.
    attempted = len(outcomes) + expand_failures
    logger.info("%d/%d transcripts downloaded", succeeded, attempted)
    if succeeded < attempted:
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()