Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import shutil | |
| from fastapi import FastAPI, Depends, HTTPException, Security, Request | |
| from fastapi.security.api_key import APIKeyHeader | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from sqlalchemy.orm import Session | |
| import yt_dlp | |
| import database | |
# --- Configuration & Initial Setup ---
# Admin secret read from the environment; this is None when ADMIN_API_KEY is unset.
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY")

app = FastAPI(title="Secure Video Downloader API", version="1.0")
database.create_db_and_tables()

DOWNLOADS_DIR = "downloads"
# exist_ok=True avoids the check-then-create race of os.path.exists() + os.makedirs().
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
# Serve the contents of DOWNLOADS_DIR over HTTP under "/downloads".
app.mount(f"/{DOWNLOADS_DIR}", StaticFiles(directory=DOWNLOADS_DIR), name=DOWNLOADS_DIR)
| # --- Pydantic Models for Request Bodies --- | |
class URLRequest(BaseModel):
    """Request body that carries a single video URL."""

    url: str
class DownloadRequest(BaseModel):
    """Request body for a download: the video URL plus the chosen format id."""

    url: str
    format_id: str
# ******** THIS IS A NEW CHANGE ********
class AdminKeyRequest(BaseModel):
    """Request body that carries the admin key in JSON form."""

    admin_key: str
# --- Security & API Key Verification ---
# Header scheme: client API keys are read from the "X-API-Key" request header.
API_KEY_HEADER = APIKeyHeader(name="X-API-Key")
def get_api_key(
    api_key_header: str = Security(API_KEY_HEADER),
    db: Session = Depends(database.get_db),
):
    """Look up the key sent in the X-API-Key header and return its record.

    Returns:
        The matching ``database.ApiKey`` row when one exists and is active.

    Raises:
        HTTPException: 403 when no row matches the header value or the
            matching row is not active.
    """
    record = (
        db.query(database.ApiKey)
        .filter(database.ApiKey.key == api_key_header)
        .first()
    )
    # Success path first; anything else is rejected below.
    if record and record.is_active:
        return record
    raise HTTPException(status_code=403, detail="Invalid or inactive API Key")
| # --- API Endpoints --- | |
# NOTE(review): no route decorator is visible on this function in this view —
# confirm it is registered on `app`.
def read_root():
    """Return the static welcome payload that points clients at /docs."""
    welcome = "Welcome to the Video Downloader API. Use /docs for documentation."
    return {"message": welcome}
# ******** THIS ENDPOINT HAS BEEN UPDATED ********
# NOTE(review): no route decorator is visible on this function in this view —
# confirm it is registered on `app`.
def create_api_key(request: AdminKeyRequest):
    """Admin-only endpoint to create a new API key. Expects admin_key in JSON body.

    Args:
        request: Body whose ``admin_key`` must equal the configured
            ``ADMIN_API_KEY``.

    Returns:
        A dict with the newly generated ``api_key`` and a confirmation
        ``message``.

    Raises:
        HTTPException: 403 when the supplied admin key does not match, or
            when no admin key is configured on the server.
    """
    import secrets  # local import: keeps this block self-contained

    expected = ADMIN_API_KEY
    # An unset or empty ADMIN_API_KEY must never authorize anyone (otherwise an
    # empty submitted key would match an empty configured secret). The
    # comparison is constant-time so response timing does not leak how much of
    # the key matched; bytes are used because compare_digest rejects non-ASCII
    # str arguments.
    authorized = bool(expected) and secrets.compare_digest(
        request.admin_key.encode("utf-8"), expected.encode("utf-8")
    )
    if not authorized:
        raise HTTPException(status_code=403, detail="Unauthorized: Invalid admin key")

    db = database.SessionLocal()
    try:
        new_key_str = str(uuid.uuid4())
        new_key_obj = database.ApiKey(key=new_key_str)
        db.add(new_key_obj)
        db.commit()
        db.refresh(new_key_obj)
        return {"api_key": new_key_str, "message": "API Key created successfully."}
    finally:
        # Always release the manually created session, even if the commit fails.
        db.close()
# NOTE(review): no route decorator is visible on this function in this view —
# confirm it is registered on `app`.
def get_video_info(request: URLRequest):
    """Fetches available formats and video metadata for a given URL.

    Nothing is downloaded; yt-dlp is asked for metadata only.

    Returns:
        A dict with ``title``, ``thumbnail`` and a ``formats`` list. Each
        format entry has ``format_id``, ``resolution`` ('audio' when absent),
        ``ext``, ``note`` and ``filesize_mb`` ("N/A" when the size is missing
        or zero).

    Raises:
        HTTPException: 500 carrying the message of any exception raised
            while extracting or formatting.
    """
    bytes_per_mb = 1024 * 1024
    options = {'quiet': True, 'no_warnings': True}

    def size_in_mb(fmt):
        # Missing or zero sizes are reported as "N/A" instead of 0.0.
        size = fmt.get('filesize')
        return round(size / bytes_per_mb, 2) if size else "N/A"

    try:
        with yt_dlp.YoutubeDL(options) as ydl:
            info = ydl.extract_info(request.url, download=False)
        formats = [
            {
                "format_id": fmt.get('format_id'),
                "resolution": fmt.get('resolution', 'audio'),
                "ext": fmt.get('ext'),
                "note": fmt.get('format_note', ''),
                "filesize_mb": size_in_mb(fmt),
            }
            for fmt in info.get('formats', [])
        ]
        return {
            "title": info.get('title'),
            "thumbnail": info.get('thumbnail'),
            "formats": formats,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): no route decorator is visible on this function in this view —
# confirm it is registered on `app`.
def download_video(
    request_data: DownloadRequest,
    request: Request,
    api_key: database.ApiKey = Depends(get_api_key),
    db: Session = Depends(database.get_db),
):
    """Downloads a video with a specific format and returns a direct link.

    The downloads directory is emptied before each download, so only the most
    recently downloaded file is available at any time.

    Args:
        request_data: Source URL and the format id to fetch.
        request: Incoming request; its base URL is used to build the link.
        api_key: Key record resolved by ``get_api_key``; its usage counter is
            incremented after a successful download.
        db: Session used to persist the usage counter.

    Returns:
        A dict with ``download_url``, the raw ``filename`` and the updated
        ``usage_count``.

    Raises:
        HTTPException: 500 with a "Download failed: ..." detail when the
            download or the bookkeeping raises.
    """
    from urllib.parse import quote  # local import: keeps this block self-contained

    # NOTE(review): wiping the directory per request also removes files that
    # other clients may still be fetching — confirm this is intended.
    shutil.rmtree(DOWNLOADS_DIR, ignore_errors=True)
    # rmtree is best-effort (ignore_errors=True), so the directory may survive;
    # exist_ok prevents a FileExistsError in that case.
    os.makedirs(DOWNLOADS_DIR, exist_ok=True)

    output_template = os.path.join(DOWNLOADS_DIR, '%(title)s.%(ext)s')
    ydl_opts = {
        'format': request_data.format_id,
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(request_data.url, download=True)
            filename = os.path.basename(ydl.prepare_filename(info))

        # Count usage only after the download itself succeeded.
        api_key.usage_count += 1
        db.commit()

        base_url = str(request.base_url).rstrip('/')
        # Percent-encode the filename: titles routinely contain spaces, '#',
        # '?' or non-ASCII characters that would otherwise break the link.
        download_url = f"{base_url}/{DOWNLOADS_DIR}/{quote(filename)}"
        return {"download_url": download_url, "filename": filename, "usage_count": api_key.usage_count}
    except Exception as e:
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}") from e