Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Literal, Union # Import Union | |
| # --- FastAPI Imports --- | |
| from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Body | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel, HttpUrl, Field, field_validator # Import field_validator | |
| # --- yt-dlp Import --- | |
| from yt_dlp import YoutubeDL | |
| # --- Logging Configuration --- | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --- Constants --- | |
| DOWNLOAD_DIR = Path('downloads') # Use pathlib for paths | |
| COOKIE_FILE = 'www.youtube.com_cookies.txt' # Define cookie file path | |
| # --- Create Download Directory --- | |
| DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) | |
| # --- FastAPI App Initialization --- | |
| app = FastAPI( | |
| title="YouTube Downloader API", | |
| description="API to fetch info and download audio/video from YouTube using yt-dlp.", | |
| version="1.4.0", # Incremented version | |
| ) | |
| # --- Mount Static Files Directory --- | |
| app.mount("/downloads", StaticFiles(directory=DOWNLOAD_DIR), name="downloads") | |
| # --- Pydantic Models for Request/Response Validation --- | |
| class UrlRequest(BaseModel): | |
| """Request model for endpoints needing just a URL.""" | |
| url: HttpUrl | |
| # Define allowed quality string literals (including numerical ones) | |
| AllowedQualityStr = Literal['best', '240', '480', '720', '1080', '1440', '2160'] | |
| class MaxDownloadRequest(BaseModel): | |
| """Request model for the /max endpoint.""" | |
| url: HttpUrl | |
| # Accept 'best' or specific numerical resolutions as strings | |
| quality: Optional[AllowedQualityStr] = 'best' | |
| class InfoResponse(BaseModel): | |
| """Response model for the /get-info endpoint.""" | |
| title: Optional[str] = None | |
| thumbnail: Optional[str] = None | |
| duration: Optional[float] = None | |
| channel: Optional[str] = None | |
| class DownloadResponse(BaseModel): | |
| """Response model for download endpoints.""" | |
| url: str | |
| filename: str | |
| message: Optional[str] = None | |
| class ErrorResponse(BaseModel): | |
| """Standard error response model.""" | |
| detail: str | |
| # --- Helper Function for Download --- | |
| def perform_download(ydl_opts: dict, url: str, file_path: Path): | |
| """Synchronously downloads using yt-dlp.""" | |
| try: | |
| logger.info(f"Starting download for URL: {url} with options: {ydl_opts}") | |
| ydl_opts['outtmpl'] = str(file_path.with_suffix('.%(ext)s')) | |
| with YoutubeDL(ydl_opts) as ydl: | |
| ydl.extract_info(url, download=True) | |
| logger.info(f"Download finished successfully for URL: {url}") | |
| downloaded_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*")) | |
| if not downloaded_files: | |
| logger.error(f"Download completed but no file found for stem: {file_path.stem}") | |
| part_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*.part")) | |
| for part_file in part_files: | |
| try: | |
| os.remove(part_file) | |
| logger.info(f"Removed leftover part file: {part_file}") | |
| except OSError as rm_err: | |
| logger.error(f"Error removing part file {part_file}: {rm_err}") | |
| raise RuntimeError(f"Could not find downloaded file for {url}") | |
| return downloaded_files[0] | |
| except Exception as e: | |
| logger.error(f"yt-dlp download failed for URL {url}: {e}", exc_info=True) | |
| possible_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*")) | |
| for f in possible_files: | |
| if f.is_file(): | |
| try: | |
| os.remove(f) | |
| logger.info(f"Removed potentially incomplete/failed file: {f}") | |
| except OSError as rm_err: | |
| logger.error(f"Error removing file {f}: {rm_err}") | |
| raise | |
| # --- API Endpoints --- | |
| async def root(): | |
| """Root endpoint providing basic API info.""" | |
| return {"message": "YouTube Downloader API. Use /docs for documentation."} | |
| async def get_info(payload: UrlRequest = Body(...)): | |
| """ | |
| Extracts video information (title, thumbnail, duration, channel) from a given URL. | |
| """ | |
| logger.info(f"Received /get-info request for URL: {payload.url}") | |
| ydl_opts = {} | |
| if os.path.exists(COOKIE_FILE): | |
| ydl_opts['cookiefile'] = COOKIE_FILE | |
| logger.info("Using cookie file.") | |
| else: | |
| logger.warning(f"Cookie file '{COOKIE_FILE}' not found. Some videos might require login/cookies.") | |
| try: | |
| # Use str(payload.url) to pass the URL string to yt-dlp | |
| with YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(str(payload.url), download=False) | |
| return InfoResponse( | |
| title=info.get('title'), | |
| thumbnail=info.get('thumbnail'), | |
| duration=info.get('duration'), | |
| channel=info.get('channel') | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error fetching info for {payload.url}: {e}", exc_info=True) | |
| raise HTTPException(status_code=500, detail=f"Failed to extract video info: {str(e)}") | |
| '''@app.post( | |
| "/download", | |
| response_model=DownloadResponse, | |
| responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}} | |
| ) | |
| async def download_audio(request: Request, payload: UrlRequest = Body(...)): | |
| """ | |
| Downloads the audio track of a video as an MP3 file (128kbps). | |
| """ | |
| logger.info(f"Received /download (audio) request for URL: {payload.url}") | |
| unique_id = str(uuid.uuid4()) | |
| file_path_stem = DOWNLOAD_DIR / unique_id | |
| ydl_opts = { | |
| 'format': '140/m4a/bestaudio/best', | |
| 'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')), | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'mp3', | |
| 'preferredquality': '128', | |
| }], | |
| 'noplaylist': True, | |
| 'quiet': False, | |
| 'progress_hooks': [lambda d: logger.debug(f"Download progress: {d['status']} - {d.get('_percent_str', '')}")], | |
| } | |
| if os.path.exists(COOKIE_FILE): | |
| ydl_opts['cookiefile'] = COOKIE_FILE | |
| logger.info("Using cookie file for audio download.") | |
| else: | |
| logger.warning(f"Cookie file '{COOKIE_FILE}' not found for audio download.") | |
| try: | |
| # Use str(payload.url) to pass the URL string to the helper | |
| final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem) | |
| final_filename = final_file_path.name | |
| download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}" | |
| logger.info(f"Audio download complete for {payload.url}. URL: {download_url}") | |
| return DownloadResponse(url=download_url, filename=final_filename) | |
| except Exception as e: | |
| # Error logged in perform_download | |
| raise HTTPException(status_code=500, detail=f"Audio download failed: {str(e)}") | |
| ''' | |
| async def download_video_max_quality(request: Request, payload: MaxDownloadRequest = Body(...)): | |
| """ | |
| Downloads the video in the specified quality or 'best' available, handling | |
| both landscape and portrait videos correctly. Attempts H.264 codec for 1080 | |
| and lower. Merges video and audio into MP4. | |
| Accepted qualities: 'best', '240', '480', '720', '1080', '1440', '2160'. | |
| Quality number (as string) refers to the maximum dimension (height or width). | |
| """ | |
| logger.info(f"Received /max (video) request for URL: {payload.url} with quality: {payload.quality}") | |
| unique_id = str(uuid.uuid4()) | |
| file_path_stem = DOWNLOAD_DIR / unique_id | |
| # --- Determine yt-dlp Format Selector based on Quality and Codec Preference --- | |
| quality_str = payload.quality # Quality is now guaranteed to be a string from AllowedQualityStr | |
| format_selector = None | |
| max_dim = 0 # Initialize max_dim | |
| if quality_str == 'best': | |
| format_selector = 'bestvideo+bestaudio/best' # Best video and audio, merged if possible | |
| logger.info("Using format selector for 'best' quality.") | |
| else: | |
| # Quality is a numerical string ('240', '480', etc.) | |
| try: | |
| # Convert the validated string quality to an integer for logic | |
| max_dim = int(quality_str) | |
| except ValueError: | |
| # This should not happen if Pydantic validation works, but good practice | |
| logger.error(f"Internal error: Could not convert validated quality string '{quality_str}' to int. Falling back to 'best'.") | |
| format_selector = 'bestvideo+bestaudio/best' | |
| # Set max_dim to a high value to skip specific logic below if format_selector is set | |
| max_dim = 99999 | |
| # Only proceed if format_selector wasn't set in the except block | |
| long_edge = int(max_dim * 1.8) | |
| if not format_selector: | |
| # --- Codec Preference Logic --- | |
| if max_dim <= 1080: | |
| # Prefer H.264 (avc1) for 1080 or lower max dimension | |
| logger.info(f"Attempting H.264 codec for requested quality (max dimension): {max_dim}") | |
| format_selector = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best' | |
| #f'bestvideo[height<={long_edge}]/bestvideo[width<={long_edge}]+bestaudio/' | |
| #f'best[height<={long_edge}]/best[width<={long_edge}]' | |
| else: | |
| # For > 1080 max dimension, prioritize best available codec | |
| logger.info(f"Attempting best available codec for requested quality (max dimension): {max_dim}") | |
| format_selector = f'bestvideo[height<={long_edge}][width<={long_edge}]+bestaudio/best' | |
| logger.info(f"Using format selector: '{format_selector}'") | |
| # --- yt-dlp Options for Video Download --- | |
| ydl_opts = { | |
| 'format': format_selector, | |
| 'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')), | |
| 'merge_output_format': 'mp4', # Merge into MP4 container | |
| 'noplaylist': False, | |
| 'quiet': True, | |
| #'verbose': True, | |
| 'noprogress': True | |
| } | |
| if os.path.exists(COOKIE_FILE): | |
| ydl_opts['cookiefile'] = COOKIE_FILE | |
| logger.info("Using cookie file for video download.") | |
| else: | |
| logger.warning(f"Cookie file '{COOKIE_FILE}' not found for video download.") | |
| try: | |
| # Use str(payload.url) to pass the URL string to the helper | |
| final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem) | |
| final_filename = final_file_path.name | |
| download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}" | |
| logger.info(f"Video download complete for {payload.url}. URL: {download_url}") | |
| # Changed 'download_url=' to 'url=' | |
| return DownloadResponse(url=download_url, filename=final_filename) | |
| except Exception as e: | |
| # Error logged in perform_download | |
| raise HTTPException(status_code=500, detail=f"Video download failed: {str(e)}") | |
| # --- Optional: Cleanup Task --- | |
| async def cleanup_old_files(directory: Path, max_age_seconds: int): | |
| """Removes files older than max_age_seconds in the background.""" | |
| import time | |
| now = time.time() | |
| count = 0 | |
| try: | |
| for item in directory.iterdir(): | |
| if item.is_file(): | |
| try: | |
| if now - item.stat().st_mtime > max_age_seconds: | |
| os.remove(item) | |
| logger.info(f"Cleaned up old file: {item.name}") | |
| count += 1 | |
| except OSError as e: | |
| logger.error(f"Error removing file {item}: {e}") | |
| if count > 0: | |
| logger.info(f"Background cleanup finished. Removed {count} old files.") | |
| else: | |
| logger.info("Background cleanup finished. No old files found.") | |
| except Exception as e: | |
| logger.error(f"Error during background file cleanup: {e}", exc_info=True) | |
| async def trigger_cleanup(background_tasks: BackgroundTasks): | |
| """Manually trigger a cleanup of files older than 1 day.""" | |
| logger.info("Triggering background cleanup of old download files.") | |
| background_tasks.add_task(cleanup_old_files, DOWNLOAD_DIR, 86400) # 1 day | |
| return {"message": "Background cleanup task scheduled."} | |