| import os |
| import uuid |
| import json |
| import asyncio |
| import logging |
| import subprocess |
| from pathlib import Path |
| from typing import Optional |
|
|
| import yt_dlp |
| from fastapi import FastAPI, HTTPException, Request, Body |
| from fastapi.responses import JSONResponse, FileResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, HttpUrl |
|
|
| |
| |
| |
| logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s") |
| logger = logging.getLogger("main") |
|
|
| |
| |
| |
| app = FastAPI( |
| title="yt-dlp API", |
| description="Download videos, fetch info, and stream HLS via yt-dlp + Deno/EJS.", |
| version="1.0.0", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| |
| |
| DOWNLOAD_DIR = Path("downloads") |
| DOWNLOAD_DIR.mkdir(exist_ok=True) |
|
|
| COOKIE_FILE = "www.youtube.com_cookies.txt" |
|
|
| |
| |
| |
| QUALITY_MAP: dict[str, str] = { |
| |
| "best": "bestvideo+bestaudio/best", |
| "2160": "bestvideo[height<=3888][width<=3888]+bestaudio/best", |
| "1440": "bestvideo[height<=2592][width<=2592]+bestaudio/best", |
| "1080": "bestvideo[vcodec^=avc][height<=1944][width<=1944]+bestaudio/bestvideo[height<=1944][width<=1944]+bestaudio/best", |
| "720": "bestvideo[vcodec^=avc][height<=1296][width<=1296]+bestaudio/bestvideo[height<=1296][width<=1296]+bestaudio/best", |
| "480": "bestvideo[vcodec^=avc][height<=864][width<=864]+bestaudio/bestvideo[height<=864][width<=864]+bestaudio/best", |
| "360": "bestvideo[vcodec^=avc][height<=648][width<=648]+bestaudio/bestvideo[height<=648][width<=648]+bestaudio/best", |
| "240": "bestvideo[vcodec^=avc][height<=432][width<=432]+bestaudio/bestvideo[height<=432][width<=432]+bestaudio/best", |
| |
| "mp3": "bestaudio/best", |
| "m4a": "bestaudio[ext=m4a]/bestaudio/best", |
| "wav": "bestaudio/best", |
| "flac": "bestaudio/best", |
| "opus": "bestaudio[ext=webm]/bestaudio/best", |
| } |
|
|
| AUDIO_FORMATS = {"mp3", "m4a", "wav", "flac", "opus"} |
|
|
| ALLOWED_QUALITIES = set(QUALITY_MAP.keys()) |
|
|
|
|
| |
| |
| |
| class DownloadRequest(BaseModel): |
| url: HttpUrl |
| quality: str = "best" |
| prefer_h264: bool = True |
|
|
|
|
| class InfoRequest(BaseModel): |
| url: HttpUrl |
| flat: bool = False |
|
|
|
|
| class HLSRequest(BaseModel): |
| url: HttpUrl |
| quality: str = "best" |
|
|
|
|
| class DownloadResponse(BaseModel): |
| url: str |
| filename: str |
| format: str |
| filesize_approx: Optional[int] = None |
|
|
|
|
| class HLSResponse(BaseModel): |
| url: str |
| filename: str |
| title: Optional[str] = None |
| duration: Optional[float] = None |
| thumbnail: Optional[str] = None |
|
|
|
|
| class ErrorResponse(BaseModel): |
| detail: str |
|
|
|
|
| |
| |
| |
| def base_ydl_opts() -> dict: |
| """Common yt-dlp options shared across all calls.""" |
| opts: dict = { |
| "javascript_runtime": "deno", |
| "extractor_args": { |
| "youtube": { |
| "player_client": ["web", "tv"], |
| } |
| }, |
| "quiet": True, |
| "noprogress": True, |
| "noplaylist": True, |
| } |
| if os.path.exists(COOKIE_FILE): |
| opts["cookiefile"] = COOKIE_FILE |
| logger.info("Cookie file found, using it.") |
| else: |
| logger.warning(f"Cookie file '{COOKIE_FILE}' not found.") |
| return opts |
|
|
|
|
| def resolve_format_selector(quality: str) -> tuple[str, bool]: |
| """ |
| Returns (format_selector, is_audio_only). |
| Raises HTTPException 400 if quality is unknown. |
| """ |
| q = quality.lower().strip() |
| if q not in QUALITY_MAP: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Unknown quality '{quality}'. Allowed: {sorted(ALLOWED_QUALITIES)}", |
| ) |
| return QUALITY_MAP[q], q in AUDIO_FORMATS |
|
|
|
|
| def perform_download(ydl_opts: dict, url: str, stem: Path) -> Path: |
| """Run yt-dlp download synchronously and return the output file path.""" |
| logger.info(f"Starting download: {url}") |
| try: |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
| ydl.download([url]) |
| except yt_dlp.utils.DownloadError as e: |
| logger.error(f"yt-dlp download error: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
| candidates = list(stem.parent.glob(f"{stem.name}.*")) |
| |
| candidates = [f for f in candidates if not f.suffix in (".part", ".ytdl")] |
| if not candidates: |
| raise HTTPException(status_code=500, detail="Download completed but output file not found.") |
|
|
| |
| final = max(candidates, key=lambda f: f.stat().st_size) |
| logger.info(f"Download complete: {final}") |
| return final |
|
|
|
|
| |
| |
| |
|
|
| @app.get("/") |
| async def root(): |
| return {"status": "ok", "message": "yt-dlp API is running. See /docs for usage."} |
|
|
|
|
| |
|
|
| @app.post( |
| "/download", |
| response_model=DownloadResponse, |
| responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}, |
| summary="Download a video or audio file", |
| description=( |
| "Download media from any yt-dlp supported URL. " |
| "Supported qualities: best, 2160, 1440, 1080, 720, 480, 360, 240, mp3, m4a, wav, flac, opus." |
| ), |
| ) |
| async def download(request: Request, payload: DownloadRequest = Body(...)): |
| logger.info(f"/download url={payload.url} quality={payload.quality}") |
|
|
| format_selector, is_audio = resolve_format_selector(payload.quality) |
| quality_lower = payload.quality.lower() |
|
|
| unique_id = str(uuid.uuid4()) |
| stem = DOWNLOAD_DIR / unique_id |
|
|
| opts = base_ydl_opts() |
| opts["format"] = format_selector |
| opts["outtmpl"] = str(stem) + ".%(ext)s" |
|
|
| if is_audio: |
| |
| target_ext = quality_lower |
| opts["postprocessors"] = [ |
| { |
| "key": "FFmpegExtractAudio", |
| "preferredcodec": target_ext, |
| "preferredquality": "192" if target_ext == "mp3" else "0", |
| } |
| ] |
| logger.info(f"Audio mode: extracting as {target_ext}") |
| else: |
| |
| |
| if quality_lower == "best" or quality_lower.isdigit(): |
| |
| original_fmt = opts.get("format", "bestvideo+bestaudio/best") |
| |
| opts["format"] = f"bestvideo[vcodec^=avc]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/({original_fmt})" |
| |
| opts["merge_output_format"] = "mp4" |
| |
| opts["postprocessors"] = opts.get("postprocessors", []) + [ |
| { |
| "key": "FFmpegVideoConvertor", |
| "preferedformat": "mp4", |
| } |
| ] |
|
|
| loop = asyncio.get_event_loop() |
| final_path = await loop.run_in_executor(None, perform_download, opts, str(payload.url), stem) |
|
|
| filename = final_path.name |
| filesize = final_path.stat().st_size if final_path.exists() else None |
| download_url = f"{str(request.base_url).rstrip('/')}/downloads/{filename}" |
|
|
| return DownloadResponse( |
| url=download_url, |
| filename=filename, |
| format=payload.quality, |
| filesize_approx=filesize, |
| ) |
|
|
|
|
| |
|
|
| @app.post( |
| "/get-info", |
| summary="Fetch raw media info without downloading", |
| description=( |
| "Returns the raw yt-dlp info dict for the given URL as JSON. " |
| "Set flat=true for fast playlist-level info." |
| ), |
| responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}, |
| ) |
| async def get_info(payload: InfoRequest = Body(...)): |
| logger.info(f"/get-info url={payload.url} flat={payload.flat}") |
|
|
| opts = base_ydl_opts() |
| opts["skip_download"] = True |
|
|
| if payload.flat: |
| opts["extract_flat"] = True |
|
|
| try: |
| def _extract(): |
| with yt_dlp.YoutubeDL(opts) as ydl: |
| return ydl.extract_info(str(payload.url), download=False) |
|
|
| loop = asyncio.get_event_loop() |
| info = await loop.run_in_executor(None, _extract) |
| except yt_dlp.utils.DownloadError as e: |
| logger.error(f"yt-dlp info extraction error: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| if info is None: |
| raise HTTPException(status_code=500, detail="yt-dlp returned no info.") |
|
|
| |
| |
| sanitized = yt_dlp.utils.sanitize_filename |
| try: |
| clean = json.loads(json.dumps(info, default=str)) |
| except Exception: |
| clean = {"error": "Info dict could not be fully serialized.", "title": info.get("title")} |
|
|
| return JSONResponse(content=clean) |
|
|
|
|
| |
|
|
| @app.post( |
| "/hls", |
| response_model=HLSResponse, |
| summary="Download video and return metadata + download link", |
| description=( |
| "Extracts metadata and downloads the video in one call. " |
| "Returns the local download URL and video info." |
| ), |
| responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}, |
| ) |
| async def get_hls(request: Request, payload: HLSRequest = Body(...)): |
| logger.info(f"/hls (download mode) url={payload.url} quality={payload.quality}") |
|
|
| format_selector, _ = resolve_format_selector(payload.quality) |
| unique_id = str(uuid.uuid4()) |
| stem = DOWNLOAD_DIR / unique_id |
|
|
| opts = base_ydl_opts() |
| opts["format"] = format_selector |
| opts["outtmpl"] = str(stem) + ".%(ext)s" |
| opts["merge_output_format"] = "mp4" |
|
|
| try: |
| def _extract_and_download(): |
| with yt_dlp.YoutubeDL(opts) as ydl: |
| |
| info = ydl.extract_info(str(payload.url), download=True) |
| return info |
|
|
| loop = asyncio.get_event_loop() |
| info = await loop.run_in_executor(None, _extract_and_download) |
| except yt_dlp.utils.DownloadError as e: |
| logger.error(f"yt-dlp error in /hls: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| if info is None: |
| raise HTTPException(status_code=500, detail="yt-dlp returned no info.") |
|
|
| |
| candidates = list(stem.parent.glob(f"{stem.name}.*")) |
| candidates = [f for f in candidates if not f.suffix in (".part", ".ytdl")] |
| if not candidates: |
| raise HTTPException(status_code=500, detail="Download failed or file not found.") |
| |
| final_path = max(candidates, key=lambda f: f.stat().st_size) |
| filename = final_path.name |
| download_url = f"{str(request.base_url).rstrip('/')}/downloads/{filename}" |
|
|
| return HLSResponse( |
| url=download_url, |
| filename=filename, |
| title=info.get("title"), |
| duration=info.get("duration"), |
| thumbnail=info.get("thumbnail"), |
| ) |
|
|
|
|
| |
|
|
| @app.get( |
| "/downloads/{filename}", |
| summary="Serve a previously downloaded file", |
| ) |
| async def serve_file(filename: str): |
| """Serve files from the downloads directory.""" |
| |
| safe_name = Path(filename).name |
| file_path = DOWNLOAD_DIR / safe_name |
|
|
| if not file_path.exists(): |
| raise HTTPException(status_code=404, detail="File not found.") |
|
|
| return FileResponse( |
| path=file_path, |
| filename=safe_name, |
| media_type="application/octet-stream", |
| ) |
|
|