from fastapi import FastAPI, HTTPException, Query, Header from fastapi.responses import FileResponse, JSONResponse import os, tempfile, shutil, re, json, time, urllib.request # ตั้งค่า Piped instance ผ่าน Variables ได้ (PIPED_API) PRIMARY = (os.getenv("PIPED_API") or "").strip() PIPED_LIST = [x for x in [ PRIMARY, "https://piped.mha.fi", "https://piped.garudalinux.org", "https://piped.projectsegfau.lt", ] if x] UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36" API_KEY = os.getenv("API_KEY", "") app = FastAPI() @app.get("/") def root(): return {"ok": True, "usage": "/download?url="} def norm_url(u: str) -> str: m = re.search(r"shorts/([A-Za-z0-9_-]{6,})", u) return f"https://www.youtube.com/watch?v={m.group(1)}" if m else u def vid_from(u: str) -> str | None: m = re.search(r"(?:v=|shorts/)([A-Za-z0-9_-]{6,})", u) return m.group(1) if m else None def http_json(url: str, timeout=30): req = urllib.request.Request(url, headers={"User-Agent": UA, "Accept": "application/json"}) with urllib.request.urlopen(req, timeout=timeout) as r: raw = r.read() return json.loads(raw.decode("utf-8")) def pick_mp4_url(d: dict) -> str: streams = (d.get("videoStreams") or []) + (d.get("formatStreams") or []) # เลือก proxyUrl ก่อน ถ้าไม่มีค่อยใช้ url for prefer_proxy in (True, False): for s in streams: mime = ((s.get("mimeType") or "") + " " + (s.get("codec") or "")).lower() if "mp4" not in mime: continue url = s.get("proxyUrl") if prefer_proxy else s.get("url") if url: return url raise RuntimeError("no mp4 stream") def piped_download(video_id: str, dst: str) -> str: last_err = None for base in PIPED_LIST: for attempt in (1, 2): try: # local=true ขอให้ Piped proxy ให้ (เลี่ยง googlevideo ตรง ๆ) meta = http_json(f"{base.rstrip('/')}/api/v1/streams/{video_id}?local=true") file_url = pick_mp4_url(meta) req = urllib.request.Request(file_url, headers={"User-Agent": UA, "Referer": base}) with urllib.request.urlopen(req, timeout=180) as r, open(dst, "wb") as f: shutil.copyfileobj(r, f) return dst except Exception as e: last_err = f"{base} try {attempt}: {e}" time.sleep(1) continue raise RuntimeError(last_err or "piped failed") @app.get("/download") def download( url: str = Query(..., min_length=10), x_api_key: str | None = Header(default=None), ): if API_KEY and x_api_key != API_KEY: raise HTTPException(status_code=401, detail="Unauthorized") url = norm_url(url) vid = vid_from(url) if not vid: return JSONResponse({"error": f"bad url: {url}"}, status_code=400) tmp = tempfile.mkdtemp() try: dst = os.path.join(tmp, f"{vid}.mp4") try: piped_download(vid, dst) return FileResponse(dst, media_type="video/mp4", filename=os.path.basename(dst)) except Exception as e: return JSONResponse({"error": f"piped failed: {e}"}, status_code=500) finally: shutil.rmtree(tmp, ignore_errors=True)