# yt-dlp-proxy / app.py
# Uploaded via huggingface_hub by korakot (commit 154046b, verified)
"""yt-dlp transcript proxy — HuggingFace Space API
Bypasses HF's YouTube DNS block via DNS-over-HTTPS resolution.
"""
import json, re, os, ssl, socket
from urllib.request import Request, urlopen
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import PlainTextResponse
import yt_dlp
# FastAPI application object; all endpoints below are registered on it.
app = FastAPI(title="yt-dlp transcript proxy")
# Watch-page URL template; filled with a YouTube video ID via str.format.
YT_URL = "https://www.youtube.com/watch?v={}"
# ---------------------------------------------------------------------------
# DNS-over-HTTPS bootstrap — resolve YouTube IPs at startup
# ---------------------------------------------------------------------------
# Hosts that must resolve for yt-dlp to reach YouTube; each is resolved
# through DNS-over-HTTPS at startup by _bootstrap_dns().
YOUTUBE_HOSTS = [
    "www.youtube.com", "youtube.com",
    "youtubei.googleapis.com",
    "www.google.com",  # for consent redirects
]
def _resolve_via_doh(hostname):
    """Resolve *hostname* to IPv4 addresses via Google DNS-over-HTTPS.

    Returns a list of dotted-quad strings (type-1 / A records only).
    Best-effort: any failure is logged and an empty list is returned.
    """
    query = f"https://dns.google/resolve?name={hostname}&type=A"
    try:
        with urlopen(query, timeout=10) as resp:
            payload = json.loads(resp.read())
        # type == 1 filters the answer section down to A records.
        return [rec["data"] for rec in payload.get("Answer", []) if rec.get("type") == 1]
    except Exception as e:
        print(f"[DNS] DoH failed for {hostname}: {e}")
        return []
def _bootstrap_dns():
    """Patch DNS resolution for YouTube hosts.

    Resolves each entry in YOUTUBE_HOSTS via DNS-over-HTTPS, then makes the
    results visible to the whole process by either:
      1. appending them to /etc/hosts (preferred: affects every resolver), or
      2. monkey-patching socket.getaddrinfo as a fallback when /etc/hosts
         cannot be written (e.g. read-only container filesystems).
    Best-effort: logs a warning and returns without raising when nothing
    resolves.
    """
    ip_map = {}
    for host in YOUTUBE_HOSTS:
        ips = _resolve_via_doh(host)
        if ips:
            # First answer is good enough; we only need one reachable IP.
            ip_map[host] = ips[0]
            print(f"[DNS] {host} -> {ips[0]}")
    if not ip_map:
        print("[DNS] WARNING: No YouTube IPs resolved")
        return
    # Try /etc/hosts first
    try:
        with open("/etc/hosts", "a") as f:
            f.write("\n# YouTube DoH resolution\n")
            for host, ip in ip_map.items():
                f.write(f"{ip} {host}\n")
        print("[DNS] Patched /etc/hosts")
        return
    # BUG FIX: a read-only filesystem raises OSError(EROFS), which is NOT a
    # PermissionError; catching only PermissionError let startup crash there
    # instead of falling through to the socket patch. OSError covers both.
    except OSError:
        pass
    # Fallback: monkey-patch socket.getaddrinfo
    _orig = socket.getaddrinfo
    def _patched(host, port, *args, **kwargs):
        if host in ip_map:
            # proto=6 is TCP; default the port to 443 when none was given.
            return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (ip_map[host], port or 443))]
        return _orig(host, port, *args, **kwargs)
    socket.getaddrinfo = _patched
    print(f"[DNS] Socket patched for: {list(ip_map.keys())}")
# Run at import time so every subsequent connection (yt-dlp, urllib)
# already sees the patched resolution.
_bootstrap_dns()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _base_opts():
return {
"quiet": True,
"no_warnings": True,
"skip_download": True,
"socket_timeout": 30,
"nocheckcertificate": True,
}
def _ssl_ctx():
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/")
def health():
dns_ok = False
try:
socket.getaddrinfo("www.youtube.com", 443)
dns_ok = True
except Exception:
pass
return {
"ok": True,
"service": "yt-dlp-transcript-proxy",
"yt_dlp_version": yt_dlp.version.__version__,
"youtube_dns": dns_ok,
}
@app.get("/subs")
def list_subs(v: str = Query(..., description="YouTube video ID")):
"""List available subtitle languages."""
opts = _base_opts()
try:
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(YT_URL.format(v), download=False)
except yt_dlp.utils.DownloadError as e:
raise HTTPException(status_code=400, detail=str(e))
manual = info.get("subtitles") or {}
auto = info.get("automatic_captions") or {}
return {
"video_id": v,
"title": info.get("title"),
"duration": info.get("duration"),
"manual": {lang: [f["ext"] for f in fmts] for lang, fmts in manual.items()},
"auto": list(auto.keys())[:20],
}
@app.get("/transcript")
def get_transcript(
v: str = Query(..., description="YouTube video ID"),
lang: str = Query("en", description="Language code"),
auto: bool = Query(True, description="Include auto-generated captions"),
fmt: str = Query("json", description="Output: json or text"),
):
"""Extract transcript with timestamps."""
opts = _base_opts()
try:
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(YT_URL.format(v), download=False)
except yt_dlp.utils.DownloadError as e:
raise HTTPException(status_code=400, detail=str(e))
# Find subtitle URL (prefer manual, fallback to auto)
sub_url = None
sources = ["subtitles", "automatic_captions"] if auto else ["subtitles"]
for src in sources:
subs = info.get(src) or {}
if lang in subs:
for f in subs[lang]:
if f.get("ext") == "json3":
sub_url = f.get("url")
break
if sub_url:
break
if not sub_url:
available = list((info.get("subtitles") or {}).keys())
raise HTTPException(
status_code=404,
detail=f"No subtitles for lang={lang}. Available: {available}"
)
# Fetch subtitle data
try:
req = Request(sub_url, headers={"User-Agent": "Mozilla/5.0"})
with urlopen(req, context=_ssl_ctx(), timeout=30) as r:
raw = json.loads(r.read())
except Exception as e:
raise HTTPException(status_code=502, detail=f"Subtitle fetch failed: {e}")
segments = _parse_json3(raw)
if fmt == "text":
return PlainTextResponse(" ".join(s["text"] for s in segments))
return {
"video_id": v,
"title": info.get("title"),
"channel": info.get("channel"),
"duration": info.get("duration"),
"language": lang,
"segment_count": len(segments),
"full_text": " ".join(s["text"] for s in segments),
"segments": segments,
}
def _parse_json3(raw):
events = raw.get("events", [])
segments = []
for ev in events:
segs = ev.get("segs")
if not segs:
continue
text = "".join(s.get("utf8", "") for s in segs).strip().replace("\n", " ")
if not text:
continue
start_ms = ev.get("tStartMs", 0)
dur_ms = ev.get("dDurationMs", 0)
segments.append({
"start": round(start_ms / 1000, 2),
"end": round((start_ms + dur_ms) / 1000, 2),
"text": text,
})
return segments