"""Video downloader API built on FastAPI and yt-dlp.

Endpoints:
    GET  /                  -- welcome message.
    POST /admin/create-key  -- create a client API key (needs the admin key).
    POST /info              -- list formats/metadata for a URL (needs API key).
    POST /download          -- download one format and return a direct link
                               (needs API key).

Downloaded files are written to ``DOWNLOADS_DIR`` and served as static files
under ``/downloads``.
"""
import os
import secrets
import shutil
import uuid
from urllib.parse import quote

import yt_dlp
from fastapi import FastAPI, Depends, HTTPException, Security, Request
from fastapi.security.api_key import APIKeyHeader
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from sqlalchemy.orm import Session

import database

# --- Configuration & Initial Setup ---
# Read once at import time; when the variable is unset this is None and the
# admin endpoint rejects every request.
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY")

app = FastAPI(title="Secure Video Downloader API", version="1.0")
database.create_db_and_tables()

DOWNLOADS_DIR = "downloads"
# exist_ok avoids the check-then-create race of a separate os.path.exists().
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
app.mount(f"/{DOWNLOADS_DIR}", StaticFiles(directory=DOWNLOADS_DIR), name=DOWNLOADS_DIR)


# --- Pydantic Models for Request Bodies ---
class URLRequest(BaseModel):
    """Request body for ``/info``: the URL to inspect."""

    url: str


class DownloadRequest(BaseModel):
    """Request body for ``/download``: the URL and the format to fetch."""

    url: str
    format_id: str


# Added: request body carrying the admin key.
class AdminKeyRequest(BaseModel):
    """Request body for ``/admin/create-key``."""

    admin_key: str


# --- Security & API Key Verification ---
API_KEY_HEADER = APIKeyHeader(name="X-API-Key")


def get_api_key(
    api_key_header: str = Security(API_KEY_HEADER),
    db: Session = Depends(database.get_db),
):
    """Resolve the ``X-API-Key`` header to its ``database.ApiKey`` row.

    Raises:
        HTTPException: 403 when no row matches the header value or the
            matching row has a falsy ``is_active``.
    """
    db_key = (
        db.query(database.ApiKey)
        .filter(database.ApiKey.key == api_key_header)
        .first()
    )
    if not db_key or not db_key.is_active:
        raise HTTPException(status_code=403, detail="Invalid or inactive API Key")
    return db_key


def _is_valid_admin_key(candidate: str) -> bool:
    """Return True only if *candidate* equals the configured admin key.

    Fails closed when ``ADMIN_API_KEY`` is unset or empty, so an empty
    submitted key can never be accepted. The comparison is constant-time so
    response timing does not reveal how much of the key matched.
    """
    if not ADMIN_API_KEY:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(
        candidate.encode("utf-8"), ADMIN_API_KEY.encode("utf-8")
    )


def _describe_format(fmt: dict) -> dict:
    """Reduce one yt-dlp format entry to the fields returned by ``/info``."""
    filesize = fmt.get('filesize')
    # A missing, None or zero size is reported as "N/A".
    filesize_mb = round(filesize / (1024 * 1024), 2) if filesize else "N/A"
    return {
        "format_id": fmt.get('format_id'),
        "resolution": fmt.get('resolution', 'audio'),
        "ext": fmt.get('ext'),
        "note": fmt.get('format_note', ''),
        "filesize_mb": filesize_mb,
    }


# --- API Endpoints ---
@app.get("/")
def read_root():
    """Return a welcome message pointing at the interactive docs."""
    return {"message": "Welcome to the Video Downloader API. Use /docs for documentation."}


# Updated: the admin key is now read from the JSON request body.
@app.post("/admin/create-key")
def create_api_key(request: AdminKeyRequest):
    """Admin-only endpoint to create a new API key.

    Expects ``admin_key`` in the JSON body.

    Returns:
        A dict with the new key under ``api_key`` and a status ``message``.

    Raises:
        HTTPException: 403 when the supplied admin key does not match
            ``ADMIN_API_KEY``, or when no admin key is configured.
    """
    if not _is_valid_admin_key(request.admin_key):
        raise HTTPException(status_code=403, detail="Unauthorized: Invalid admin key")

    db = database.SessionLocal()
    try:
        new_key_str = str(uuid.uuid4())
        new_key_obj = database.ApiKey(key=new_key_str)
        db.add(new_key_obj)
        db.commit()
        db.refresh(new_key_obj)
        return {"api_key": new_key_str, "message": "API Key created successfully."}
    finally:
        # Always release the session, including when the commit fails.
        db.close()


@app.post("/info", dependencies=[Depends(get_api_key)])
def get_video_info(request: URLRequest):
    """Fetch available formats and video metadata for a given URL.

    Returns:
        A dict with ``title``, ``thumbnail`` and a ``formats`` list; each
        format has ``format_id``, ``resolution``, ``ext``, ``note`` and
        ``filesize_mb``.

    Raises:
        HTTPException: 500 carrying the underlying error text when
            extraction fails.
    """
    ydl_opts = {'quiet': True, 'no_warnings': True}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(request.url, download=False)
        # "or []" also covers a 'formats' key that is present but None.
        formats = [_describe_format(f) for f in info.get('formats') or []]
        return {
            "title": info.get('title'),
            "thumbnail": info.get('thumbnail'),
            "formats": formats,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/download")
def download_video(
    request_data: DownloadRequest,
    request: Request,
    api_key: database.ApiKey = Depends(get_api_key),
    db: Session = Depends(database.get_db),
):
    """Download a video in a specific format and return a direct link.

    On success the calling key's ``usage_count`` is incremented and committed.

    Returns:
        A dict with ``download_url``, ``filename`` and the updated
        ``usage_count``.

    Raises:
        HTTPException: 500 with a ``Download failed: ...`` detail when the
            download or the usage update fails.
    """
    # NOTE(review): the downloads directory is wiped on every request, so a
    # new download invalidates links handed to earlier callers and concurrent
    # requests can delete each other's files. Kept as-is; confirm it is intended.
    shutil.rmtree(DOWNLOADS_DIR, ignore_errors=True)
    # exist_ok: rmtree may have silently failed, or a concurrent request may
    # already have recreated the directory.
    os.makedirs(DOWNLOADS_DIR, exist_ok=True)

    output_template = os.path.join(DOWNLOADS_DIR, '%(title)s.%(ext)s')
    ydl_opts = {
        'format': request_data.format_id,
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(request_data.url, download=True)
            filename = os.path.basename(ydl.prepare_filename(info))

        api_key.usage_count += 1
        db.commit()

        base_url = str(request.base_url).rstrip('/')
        # Titles can contain spaces, '#', '?' or '%'; percent-encode the path
        # segment so the link is valid. The raw name is still returned
        # separately under "filename".
        download_url = f"{base_url}/{DOWNLOADS_DIR}/{quote(filename)}"
        return {
            "download_url": download_url,
            "filename": filename,
            "usage_count": api_key.usage_count,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}") from e