Spaces:
Sleeping
Sleeping
| """yt-dlp transcript proxy — HuggingFace Space API | |
| Bypasses HF's YouTube DNS block via DNS-over-HTTPS resolution. | |
| """ | |
| import json, re, os, ssl, socket | |
| from urllib.request import Request, urlopen | |
| from fastapi import FastAPI, Query, HTTPException | |
| from fastapi.responses import PlainTextResponse | |
| import yt_dlp | |
# FastAPI application object for this service.
app = FastAPI(title="yt-dlp transcript proxy")
# Watch-page URL template; the endpoints fill the placeholder with the
# requested video ID via str.format.
YT_URL = "https://www.youtube.com/watch?v={}"
| # --------------------------------------------------------------------------- | |
| # DNS-over-HTTPS bootstrap — resolve YouTube IPs at startup | |
| # --------------------------------------------------------------------------- | |
# Hostnames that _bootstrap_dns resolves through DNS-over-HTTPS at startup.
YOUTUBE_HOSTS = [
    "www.youtube.com",
    "youtube.com",
    "youtubei.googleapis.com",
    "www.google.com",  # for consent redirects
]
def _resolve_via_doh(hostname):
    """Look up *hostname* through Google's DNS-over-HTTPS JSON endpoint.

    Returns the ``data`` field of every answer whose ``type`` is 1 (the
    record type requested with ``type=A``). Any failure -- network error,
    bad JSON, unexpected response shape -- is printed and yields ``[]``.
    """
    try:
        query_url = f"https://dns.google/resolve?name={hostname}&type=A"
        with urlopen(query_url, timeout=10) as response:
            payload = json.loads(response.read())
        addresses = []
        for answer in payload.get("Answer", []):
            if answer.get("type") == 1:
                addresses.append(answer["data"])
        return addresses
    except Exception as e:
        # Best-effort lookup: the caller treats an empty list as "unresolved".
        print(f"[DNS] DoH failed for {hostname}: {e}")
        return []
def _bootstrap_dns():
    """Patch DNS resolution for YouTube hosts.

    Every name in ``YOUTUBE_HOSTS`` is resolved with ``_resolve_via_doh`` and
    pinned to the first address returned. The pins are installed by appending
    to ``/etc/hosts``; if that file cannot be written for any OS-level reason,
    ``socket.getaddrinfo`` is replaced with a wrapper that answers for the
    pinned hosts and delegates everything else to the original function.
    When no host resolves, a warning is printed and nothing is patched.
    """
    ip_map = {}
    for host in YOUTUBE_HOSTS:
        ips = _resolve_via_doh(host)
        if ips:
            ip_map[host] = ips[0]
            print(f"[DNS] {host} -> {ips[0]}")
    if not ip_map:
        print("[DNS] WARNING: No YouTube IPs resolved")
        return

    # Try /etc/hosts first
    try:
        with open("/etc/hosts", "a") as f:
            f.write("\n# YouTube DoH resolution\n")
            f.writelines(f"{ip} {host}\n" for host, ip in ip_map.items())
    except OSError as e:
        # PermissionError is only one way this can fail: a read-only
        # filesystem or a missing file raises other OSError subclasses, and
        # those must also fall through to the socket patch instead of
        # aborting the import of this module.
        print(f"[DNS] Could not patch /etc/hosts: {e}")
    else:
        print("[DNS] Patched /etc/hosts")
        return

    # Fallback: monkey-patch socket.getaddrinfo
    _orig = socket.getaddrinfo

    def _patched(host, port, *args, **kwargs):
        if host in ip_map:
            # Single IPv4/TCP entry for a pinned host; the family/type
            # arguments of the request are not consulted here.
            return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (ip_map[host], port or 443))]
        return _orig(host, port, *args, **kwargs)

    socket.getaddrinfo = _patched
    print(f"[DNS] Socket patched for: {list(ip_map.keys())}")
# Runs once at import time, before any endpoint can be called, so the DNS
# patch is in place for every later lookup made by this process.
_bootstrap_dns()
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _base_opts():
    """Return a fresh dict of the options handed to ``yt_dlp.YoutubeDL``.

    A new dict is built on every call, so callers may mutate the result.
    """
    options = dict(quiet=True, no_warnings=True, skip_download=True)
    options["socket_timeout"] = 30
    options["nocheckcertificate"] = True
    return options
def _ssl_ctx():
    """Build an SSL context with hostname checks and cert verification off."""
    insecure = ssl.create_default_context()
    # Order matters: check_hostname has to be cleared before verify_mode may
    # be lowered to CERT_NONE.
    insecure.check_hostname = False
    insecure.verify_mode = ssl.CERT_NONE
    return insecure
| # --------------------------------------------------------------------------- | |
| # Endpoints | |
| # --------------------------------------------------------------------------- | |
def health():
    """Report service identity, yt-dlp version and YouTube DNS status.

    ``youtube_dns`` is True when ``socket.getaddrinfo`` succeeds for
    ``www.youtube.com`` and False when it raises.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is registered on `app`.
    try:
        socket.getaddrinfo("www.youtube.com", 443)
    except Exception:
        youtube_resolves = False
    else:
        youtube_resolves = True
    return {
        "ok": True,
        "service": "yt-dlp-transcript-proxy",
        "yt_dlp_version": yt_dlp.version.__version__,
        "youtube_dns": youtube_resolves,
    }
def list_subs(v: str = Query(..., description="YouTube video ID")):
    """List available subtitle languages.

    Returns the video's title and duration, a mapping of each manual
    subtitle language to the extensions offered for it, and at most the
    first 20 automatic-caption language codes.

    Raises:
        HTTPException: status 400 when yt-dlp raises ``DownloadError``.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is registered on `app`.
    try:
        with yt_dlp.YoutubeDL(_base_opts()) as downloader:
            meta = downloader.extract_info(YT_URL.format(v), download=False)
    except yt_dlp.utils.DownloadError as e:
        raise HTTPException(status_code=400, detail=str(e))

    manual_tracks = meta.get("subtitles") or {}
    auto_tracks = meta.get("automatic_captions") or {}

    manual_exts = {}
    for code, formats in manual_tracks.items():
        manual_exts[code] = [entry["ext"] for entry in formats]

    return {
        "video_id": v,
        "title": meta.get("title"),
        "duration": meta.get("duration"),
        "manual": manual_exts,
        "auto": list(auto_tracks.keys())[:20],
    }
def get_transcript(
    v: str = Query(..., description="YouTube video ID"),
    lang: str = Query("en", description="Language code"),
    auto: bool = Query(True, description="Include auto-generated captions"),
    fmt: str = Query("json", description="Output: json or text"),
):
    """Extract transcript with timestamps.

    Looks for a ``json3`` track for ``lang`` in the manual subtitles and,
    when ``auto`` is true, in the automatic captions as well (manual wins).
    The track is downloaded and parsed with ``_parse_json3``.

    Returns:
        A ``PlainTextResponse`` holding the space-joined text when ``fmt`` is
        ``"text"``; for any other ``fmt`` a dict with video metadata, the
        joined text and the list of timed segments.

    Raises:
        HTTPException: 400 when yt-dlp raises ``DownloadError``; 404 when no
            usable track exists for ``lang`` in the searched sources; 502
            when downloading or decoding the subtitle data fails.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is registered on `app`.
    try:
        with yt_dlp.YoutubeDL(_base_opts()) as ydl:
            info = ydl.extract_info(YT_URL.format(v), download=False)
    except yt_dlp.utils.DownloadError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Find subtitle URL (prefer manual, fallback to auto)
    sources = ["subtitles", "automatic_captions"] if auto else ["subtitles"]
    sub_url = None
    for src in sources:
        tracks = (info.get(src) or {}).get(lang) or []
        sub_url = next(
            (t.get("url") for t in tracks if t.get("ext") == "json3"),
            None,
        )
        if sub_url:
            break

    if not sub_url:
        # Report languages from every source that was actually searched;
        # listing only manual subtitles is misleading when auto captions
        # were included in the lookup.
        available = sorted(
            {code for src in sources for code in (info.get(src) or {})}
        )
        raise HTTPException(
            status_code=404,
            detail=f"No subtitles for lang={lang}. Available: {available}",
        )

    # Fetch subtitle data
    try:
        req = Request(sub_url, headers={"User-Agent": "Mozilla/5.0"})
        with urlopen(req, context=_ssl_ctx(), timeout=30) as r:
            raw = json.loads(r.read())
    except Exception as e:
        raise HTTPException(
            status_code=502, detail=f"Subtitle fetch failed: {e}"
        ) from e

    segments = _parse_json3(raw)
    full_text = " ".join(s["text"] for s in segments)
    if fmt == "text":
        return PlainTextResponse(full_text)
    return {
        "video_id": v,
        "title": info.get("title"),
        "channel": info.get("channel"),
        "duration": info.get("duration"),
        "language": lang,
        "segment_count": len(segments),
        "full_text": full_text,
        "segments": segments,
    }
def _parse_json3(raw):
    """Convert a decoded ``json3`` subtitle document into timed segments.

    Each entry of ``raw["events"]`` that has a non-empty ``segs`` list is
    turned into ``{"start", "end", "text"}``: the ``utf8`` pieces are
    concatenated, stripped, and newlines become spaces; times are converted
    from milliseconds to seconds rounded to two decimals. Events whose text
    ends up empty are dropped.
    """
    parsed = []
    for event in raw.get("events", []):
        pieces = event.get("segs")
        if pieces:
            joined = "".join(piece.get("utf8", "") for piece in pieces)
            caption = joined.strip().replace("\n", " ")
            if caption:
                begin_ms = event.get("tStartMs", 0)
                finish_ms = begin_ms + event.get("dDurationMs", 0)
                parsed.append({
                    "start": round(begin_ms / 1000, 2),
                    "end": round(finish_ms / 1000, 2),
                    "text": caption,
                })
    return parsed