|
|
from fastapi import FastAPI, HTTPException, Query, Header |
|
|
from fastapi.responses import FileResponse, JSONResponse |
|
|
import os, tempfile, shutil, re, json, time, urllib.request |
|
|
|
|
|
|
|
|
PRIMARY = (os.getenv("PIPED_API") or "").strip() |
|
|
PIPED_LIST = [x for x in [ |
|
|
PRIMARY, |
|
|
"https://piped.mha.fi", |
|
|
"https://piped.garudalinux.org", |
|
|
"https://piped.projectsegfau.lt", |
|
|
] if x] |
|
|
|
|
|
UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36" |
|
|
API_KEY = os.getenv("API_KEY", "") |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
return {"ok": True, "usage": "/download?url=<youtube-url>"} |
|
|
|
|
|
def norm_url(u: str) -> str: |
|
|
m = re.search(r"shorts/([A-Za-z0-9_-]{6,})", u) |
|
|
return f"https://www.youtube.com/watch?v={m.group(1)}" if m else u |
|
|
|
|
|
def vid_from(u: str) -> str | None: |
|
|
m = re.search(r"(?:v=|shorts/)([A-Za-z0-9_-]{6,})", u) |
|
|
return m.group(1) if m else None |
|
|
|
|
|
def http_json(url: str, timeout=30): |
|
|
req = urllib.request.Request(url, headers={"User-Agent": UA, "Accept": "application/json"}) |
|
|
with urllib.request.urlopen(req, timeout=timeout) as r: |
|
|
raw = r.read() |
|
|
return json.loads(raw.decode("utf-8")) |
|
|
|
|
|
def pick_mp4_url(d: dict) -> str: |
|
|
streams = (d.get("videoStreams") or []) + (d.get("formatStreams") or []) |
|
|
|
|
|
for prefer_proxy in (True, False): |
|
|
for s in streams: |
|
|
mime = ((s.get("mimeType") or "") + " " + (s.get("codec") or "")).lower() |
|
|
if "mp4" not in mime: |
|
|
continue |
|
|
url = s.get("proxyUrl") if prefer_proxy else s.get("url") |
|
|
if url: |
|
|
return url |
|
|
raise RuntimeError("no mp4 stream") |
|
|
|
|
|
def piped_download(video_id: str, dst: str) -> str: |
|
|
last_err = None |
|
|
for base in PIPED_LIST: |
|
|
for attempt in (1, 2): |
|
|
try: |
|
|
|
|
|
meta = http_json(f"{base.rstrip('/')}/api/v1/streams/{video_id}?local=true") |
|
|
file_url = pick_mp4_url(meta) |
|
|
req = urllib.request.Request(file_url, headers={"User-Agent": UA, "Referer": base}) |
|
|
with urllib.request.urlopen(req, timeout=180) as r, open(dst, "wb") as f: |
|
|
shutil.copyfileobj(r, f) |
|
|
return dst |
|
|
except Exception as e: |
|
|
last_err = f"{base} try {attempt}: {e}" |
|
|
time.sleep(1) |
|
|
continue |
|
|
raise RuntimeError(last_err or "piped failed") |
|
|
|
|
|
@app.get("/download") |
|
|
def download( |
|
|
url: str = Query(..., min_length=10), |
|
|
x_api_key: str | None = Header(default=None), |
|
|
): |
|
|
if API_KEY and x_api_key != API_KEY: |
|
|
raise HTTPException(status_code=401, detail="Unauthorized") |
|
|
|
|
|
url = norm_url(url) |
|
|
vid = vid_from(url) |
|
|
if not vid: |
|
|
return JSONResponse({"error": f"bad url: {url}"}, status_code=400) |
|
|
|
|
|
tmp = tempfile.mkdtemp() |
|
|
try: |
|
|
dst = os.path.join(tmp, f"{vid}.mp4") |
|
|
try: |
|
|
piped_download(vid, dst) |
|
|
return FileResponse(dst, media_type="video/mp4", filename=os.path.basename(dst)) |
|
|
except Exception as e: |
|
|
return JSONResponse({"error": f"piped failed: {e}"}, status_code=500) |
|
|
finally: |
|
|
shutil.rmtree(tmp, ignore_errors=True) |
|
|
|