# --- Standard library ---
import asyncio
import json
import logging
import os
import subprocess
import uuid
from pathlib import Path
from typing import Optional

# --- Third-party ---
import yt_dlp
from fastapi import Body, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel, HttpUrl

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logging is configured once at import time; the module logs under the
# fixed name "main".
logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s:%(name)s:%(message)s",
)
logger = logging.getLogger("main")

# ---------------------------------------------------------------------------
# App setup
# ---------------------------------------------------------------------------
app = FastAPI(
    title="yt-dlp API",
    description="Download videos, fetch info, and stream HLS via yt-dlp + Deno/EJS.",
    version="1.0.0",
)

# CORS is fully open: every origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Downloaded media is written here (relative to the working directory); the
# directory is created at import time if it does not exist yet.
DOWNLOAD_DIR = Path("downloads")
DOWNLOAD_DIR.mkdir(exist_ok=True)

# Cookie jar handed to yt-dlp when the file is present on disk.
COOKIE_FILE = "www.youtube.com_cookies.txt"

# Map human-friendly quality labels → yt-dlp format selectors
# For video qualities, we target the max dimension (height or width)
# and prefer H.264 for ≤1080, best codec for higher.
QUALITY_MAP: dict[str, str] = {
    # --- Video ---
    # Capped entries bound BOTH height and width at 1.8x the label
    # (e.g. 2160 * 1.8 = 3888). Labels up to 1080 try an H.264 ("avc")
    # stream first and then fall back to any codec within the same cap.
    "best": "bestvideo+bestaudio/best",
    "2160": "bestvideo[height<=3888][width<=3888]+bestaudio/best",
    "1440": "bestvideo[height<=2592][width<=2592]+bestaudio/best",
    "1080": (
        "bestvideo[vcodec^=avc][height<=1944][width<=1944]+bestaudio"
        "/bestvideo[height<=1944][width<=1944]+bestaudio"
        "/best"
    ),
    "720": (
        "bestvideo[vcodec^=avc][height<=1296][width<=1296]+bestaudio"
        "/bestvideo[height<=1296][width<=1296]+bestaudio"
        "/best"
    ),
    "480": (
        "bestvideo[vcodec^=avc][height<=864][width<=864]+bestaudio"
        "/bestvideo[height<=864][width<=864]+bestaudio"
        "/best"
    ),
    "360": (
        "bestvideo[vcodec^=avc][height<=648][width<=648]+bestaudio"
        "/bestvideo[height<=648][width<=648]+bestaudio"
        "/best"
    ),
    "240": (
        "bestvideo[vcodec^=avc][height<=432][width<=432]+bestaudio"
        "/bestvideo[height<=432][width<=432]+bestaudio"
        "/best"
    ),
    # --- Audio only ---
    "mp3": "bestaudio/best",
    "m4a": "bestaudio[ext=m4a]/bestaudio/best",
    "wav": "bestaudio/best",
    "flac": "bestaudio/best",
    "opus": "bestaudio[ext=webm]/bestaudio/best",
}

# Quality labels that request an audio-only result.
AUDIO_FORMATS = {"mp3", "m4a", "wav", "flac", "opus"}

# Every label a client may send in a `quality` field.
ALLOWED_QUALITIES = set(QUALITY_MAP)


# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
# NOTE: the models are documented with comments rather than docstrings so the
# generated API schema stays exactly as it was.

# Request body with the media URL, a quality label and the H.264 preference.
class DownloadRequest(BaseModel):
    url: HttpUrl
    quality: str = "best"  # any key from QUALITY_MAP
    prefer_h264: bool = True  # ignored for audio / >1080 (already baked in)


# Request body with the media URL and the "flat" extraction switch.
class InfoRequest(BaseModel):
    url: HttpUrl
    flat: bool = False  # True = fast playlist-level info only


# Request body with the media URL and a quality label.
class HLSRequest(BaseModel):
    url: HttpUrl
    quality: str = "best"


# Response describing a stored file: its URL, name, format label and size.
class DownloadResponse(BaseModel):
    url: str
    filename: str
    format: str
    filesize_approx: Optional[int] = None


# Response describing a stored file plus optional title/duration/thumbnail.
class HLSResponse(BaseModel):
    url: str
    filename: str
    title: Optional[str] = None
    duration: Optional[float] = None
    thumbnail: Optional[str] = None


# Error payload shape referenced by the routes' `responses` declarations.
class ErrorResponse(BaseModel):
    detail: str
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Suffixes of yt-dlp's in-progress / bookkeeping files; never a finished output.
_TEMP_SUFFIXES = (".part", ".ytdl")

# Highest numeric quality label for which an H.264-first selector is applied.
_H264_MAX_LABEL = 1080


def base_ydl_opts() -> dict:
    """Return the yt-dlp options shared by every endpoint.

    A fresh dict is built on each call so callers may mutate it freely.
    ``cookiefile`` is set only when ``COOKIE_FILE`` exists on disk; whether it
    was found is logged on every call.
    """
    opts: dict = {
        # NOTE(review): confirm the installed yt-dlp recognises this option
        # name -- TODO verify against the YoutubeDL parameter list.
        "javascript_runtime": "deno",
        "extractor_args": {
            "youtube": {
                "player_client": ["web", "tv"],
            }
        },
        "quiet": True,
        "noprogress": True,
        "noplaylist": True,
    }
    if os.path.exists(COOKIE_FILE):
        opts["cookiefile"] = COOKIE_FILE
        logger.info("Cookie file found, using it.")
    else:
        logger.warning(f"Cookie file '{COOKIE_FILE}' not found.")
    return opts


def resolve_format_selector(quality: str) -> tuple[str, bool]:
    """Translate a quality label into a yt-dlp format selector.

    The label is matched case-insensitively with surrounding whitespace
    ignored.

    Returns:
        ``(format_selector, is_audio_only)``.

    Raises:
        HTTPException: 400 when the label is not a key of ``QUALITY_MAP``.
    """
    q = quality.lower().strip()
    if q not in QUALITY_MAP:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown quality '{quality}'. Allowed: {sorted(ALLOWED_QUALITIES)}",
        )
    return QUALITY_MAP[q], q in AUDIO_FORMATS


def _find_output_file(stem: Path, missing_detail: str) -> Path:
    """Return the finished file written for *stem*, or raise HTTP 500.

    The output extension is not known ahead of time, so every ``<stem>.*``
    file is considered, temp files are skipped, and the largest survivor wins
    should more than one exist.
    """
    candidates = [
        f
        for f in stem.parent.glob(f"{stem.name}.*")
        if f.suffix not in _TEMP_SUFFIXES
    ]
    if not candidates:
        raise HTTPException(status_code=500, detail=missing_detail)
    return max(candidates, key=lambda f: f.stat().st_size)


def _h264_first_selector(selector: str) -> str:
    """Prefix *selector* with H.264 + AAC alternatives that keep its size caps.

    The ``[height<=N][width<=N]`` filters of the first alternative (if any)
    are copied onto the H.264 alternatives so the requested quality limit is
    still enforced; the original selector remains the final fallback.
    """
    caps = ""
    start = selector.find("[height<=")
    if start != -1:
        end = selector.find("+", start)
        if end != -1:
            caps = selector[start:end]
    return (
        f"bestvideo[vcodec^=avc]{caps}+bestaudio[acodec^=mp4a]"
        f"/best[vcodec^=avc]{caps}/({selector})"
    )


def perform_download(ydl_opts: dict, url: str, stem: Path) -> Path:
    """Run a yt-dlp download synchronously and return the output file path.

    Blocking; the routes call it through an executor.

    Raises:
        HTTPException: 500 when yt-dlp reports a download error or when no
            finished output file can be found afterwards.
    """
    logger.info(f"Starting download: {url}")
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"yt-dlp download error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    final = _find_output_file(stem, "Download completed but output file not found.")
    logger.info(f"Download complete: {final}")
    return final


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/")
async def root():
    # Static status payload; performs no work.
    return {"status": "ok", "message": "yt-dlp API is running. See /docs for usage."}


# ── /download ────────────────────────────────────────────────────────────────
@app.post(
    "/download",
    response_model=DownloadResponse,
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
    summary="Download a video or audio file",
    description=(
        "Download media from any yt-dlp supported URL. "
        "Supported qualities: best, 2160, 1440, 1080, 720, 480, 360, 240, mp3, m4a, wav, flac, opus."
    ),
)
async def download(request: Request, payload: DownloadRequest = Body(...)):
    """Download the requested media into ``DOWNLOAD_DIR`` and describe it.

    Audio labels extract/transcode to the requested codec. Video labels merge
    to MP4 and, when ``prefer_h264`` is set and the label is ``best`` or at
    most 1080, try H.264 video + AAC audio first within the requested size cap.

    Raises:
        HTTPException: 400 for an unknown quality, 500 for download failures.
    """
    logger.info(f"/download url={payload.url} quality={payload.quality}")

    format_selector, is_audio = resolve_format_selector(payload.quality)
    # Normalise exactly like resolve_format_selector so a padded label such as
    # " mp3 " cannot validate and then be used verbatim as a codec name.
    quality = payload.quality.lower().strip()

    stem = DOWNLOAD_DIR / str(uuid.uuid4())

    opts = base_ydl_opts()
    opts["format"] = format_selector
    opts["outtmpl"] = f"{stem}.%(ext)s"

    if is_audio:
        # Transcode to the requested audio format
        target_ext = quality  # e.g. "mp3"
        opts["postprocessors"] = [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": target_ext,
                "preferredquality": "192" if target_ext == "mp3" else "0",
            }
        ]
        logger.info(f"Audio mode: extracting as {target_ext}")
    else:
        # Above 1080 the map deliberately takes the best codec, so forcing
        # H.264 there would defeat the requested resolution.
        h264_applies = quality == "best" or (
            quality.isdigit() and int(quality) <= _H264_MAX_LABEL
        )
        if payload.prefer_h264 and h264_applies:
            opts["format"] = _h264_first_selector(format_selector)

        opts["merge_output_format"] = "mp4"
        opts["postprocessors"] = opts.get("postprocessors", []) + [
            {
                "key": "FFmpegVideoConvertor",
                # Key spelling kept exactly as the original code passes it.
                "preferedformat": "mp4",
            }
        ]

    # yt-dlp is blocking; run it on the default executor.
    loop = asyncio.get_running_loop()
    final_path = await loop.run_in_executor(
        None, perform_download, opts, str(payload.url), stem
    )

    filename = final_path.name
    filesize = final_path.stat().st_size if final_path.exists() else None
    download_url = f"{str(request.base_url).rstrip('/')}/downloads/{filename}"

    return DownloadResponse(
        url=download_url,
        filename=filename,
        format=payload.quality,
        filesize_approx=filesize,
    )


# ── /get-info ────────────────────────────────────────────────────────────────
@app.post(
    "/get-info",
    summary="Fetch raw media info without downloading",
    description=(
        "Returns the raw yt-dlp info dict for the given URL as JSON. "
        "Set flat=true for fast playlist-level info."
    ),
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
)
async def get_info(payload: InfoRequest = Body(...)):
    """Return yt-dlp's info dict for the URL without downloading media.

    Raises:
        HTTPException: 500 when extraction fails or yields no info.
    """
    logger.info(f"/get-info url={payload.url} flat={payload.flat}")

    opts = base_ydl_opts()
    opts["skip_download"] = True
    if payload.flat:
        opts["extract_flat"] = True

    def _extract():
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(str(payload.url), download=False)

    try:
        info = await asyncio.get_running_loop().run_in_executor(None, _extract)
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"yt-dlp info extraction error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    if info is None:
        raise HTTPException(status_code=500, detail="yt-dlp returned no info.")

    # Round-trip through json so the response holds only JSON-native values;
    # anything json cannot encode is stringified via default=str.
    try:
        clean = json.loads(json.dumps(info, default=str))
    except (TypeError, ValueError, RecursionError) as e:
        # Best effort: still answer, but leave a trace of what went wrong.
        logger.warning(f"Info dict could not be serialized: {e}")
        clean = {
            "error": "Info dict could not be fully serialized.",
            "title": info.get("title"),
        }

    return JSONResponse(content=clean)


# ── /hls ─────────────────────────────────────────────────────────────────────
@app.post(
    "/hls",
    response_model=HLSResponse,
    summary="Download video and return metadata + download link",
    description=(
        "Extracts metadata and downloads the video in one call. "
        "Returns the local download URL and video info."
    ),
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
)
async def get_hls(request: Request, payload: HLSRequest = Body(...)):
    """Download the media and return its local URL plus basic metadata.

    Raises:
        HTTPException: 400 for an unknown quality, 500 when yt-dlp fails,
            returns no info, or leaves no output file.
    """
    logger.info(f"/hls (download mode) url={payload.url} quality={payload.quality}")

    format_selector, _ = resolve_format_selector(payload.quality)
    stem = DOWNLOAD_DIR / str(uuid.uuid4())

    opts = base_ydl_opts()
    opts["format"] = format_selector
    opts["outtmpl"] = f"{stem}.%(ext)s"
    opts["merge_output_format"] = "mp4"

    def _extract_and_download():
        # A single call both downloads the media and returns its info dict.
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(str(payload.url), download=True)

    try:
        info = await asyncio.get_running_loop().run_in_executor(
            None, _extract_and_download
        )
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"yt-dlp error in /hls: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    if info is None:
        raise HTTPException(status_code=500, detail="yt-dlp returned no info.")

    final_path = _find_output_file(stem, "Download failed or file not found.")
    filename = final_path.name
    download_url = f"{str(request.base_url).rstrip('/')}/downloads/{filename}"

    return HLSResponse(
        url=download_url,
        filename=filename,
        title=info.get("title"),
        duration=info.get("duration"),
        thumbnail=info.get("thumbnail"),
    )


# ── /downloads/{filename} ─────────────────────────────────────────────────────
@app.get(
    "/downloads/{filename}",
    summary="Serve a previously downloaded file",
)
async def serve_file(filename: str):
    """Serve files from the downloads directory."""
    # Basic path traversal guard: keep only the final path component.
    safe_name = Path(filename).name
    file_path = DOWNLOAD_DIR / safe_name

    # is_file() rather than exists(): names such as ".." survive Path.name and
    # resolve to a directory, which must be answered with 404, not served.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found.")

    return FileResponse(
        path=file_path,
        filename=safe_name,
        media_type="application/octet-stream",
    )