MalikSahib1's picture
Update app.py
d936599 verified
import os
import secrets
import shutil
import uuid
from urllib.parse import quote

import yt_dlp
from fastapi import FastAPI, Depends, HTTPException, Security, Request
from fastapi.security.api_key import APIKeyHeader
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from sqlalchemy.orm import Session

import database
# --- Configuration & Initial Setup ---
# Secret expected by /admin/create-key. os.getenv returns None when the
# environment variable is not set.
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY")

app = FastAPI(title="Secure Video Downloader API", version="1.0")
database.create_db_and_tables()

# Directory that receives downloaded files; it is also the URL prefix under
# which those files are served back to clients.
DOWNLOADS_DIR = "downloads"
# exist_ok=True replaces the earlier "check, then create" sequence, which could
# raise FileExistsError if another process created the directory in between.
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
app.mount(f"/{DOWNLOADS_DIR}", StaticFiles(directory=DOWNLOADS_DIR), name=DOWNLOADS_DIR)
# --- Pydantic Models for Request Bodies ---
# Request body for /info: carries only the URL of the video to inspect.
# (Documented with comments rather than a docstring so the generated
# OpenAPI schema stays unchanged.)
class URLRequest(BaseModel):
    # Handed to yt-dlp unchanged by the /info endpoint.
    url: str
# Request body for /download: which video to fetch and in which format.
# (Documented with comments rather than a docstring so the generated
# OpenAPI schema stays unchanged.)
class DownloadRequest(BaseModel):
    # Handed to yt-dlp unchanged by the /download endpoint.
    url: str
    # Used as yt-dlp's 'format' option by the /download endpoint.
    format_id: str
# ******** THIS IS A NEW CHANGE ********
# Request body for /admin/create-key: the admin secret travels in the JSON
# body. (Documented with comments rather than a docstring so the generated
# OpenAPI schema stays unchanged.)
class AdminKeyRequest(BaseModel):
    # Compared against the ADMIN_API_KEY setting by the endpoint.
    admin_key: str
# --- Security & API Key Verification ---
# Header scheme: clients present their key in the "X-API-Key" request header.
API_KEY_HEADER = APIKeyHeader(name="X-API-Key")


def get_api_key(
    api_key_header: str = Security(API_KEY_HEADER),
    db: Session = Depends(database.get_db),
):
    """Resolve the presented API key to its database record.

    Args:
        api_key_header: Key value taken from the X-API-Key header.
        db: Database session used for the lookup.

    Returns:
        The matching ``database.ApiKey`` row.

    Raises:
        HTTPException: 403 when no row matches the key or the matching row
            is not active.
    """
    matching_keys = db.query(database.ApiKey).filter(
        database.ApiKey.key == api_key_header
    )
    record = matching_keys.first()
    # Success path first: the key must both exist and be flagged active.
    if record and record.is_active:
        return record
    raise HTTPException(status_code=403, detail="Invalid or inactive API Key")
# --- API Endpoints ---
# Landing route: returns a fixed greeting that points callers at /docs.
# (No docstring on purpose: FastAPI would publish it as the operation
# description, which the original endpoint does not have.)
@app.get("/")
def read_root():
    greeting = "Welcome to the Video Downloader API. Use /docs for documentation."
    return {"message": greeting}
# ******** THIS ENDPOINT HAS BEEN UPDATED ********
@app.post("/admin/create-key")
def create_api_key(request: AdminKeyRequest):
    """Admin-only endpoint to create a new API key. Expects admin_key in JSON body.

    Returns the newly generated key together with a confirmation message.
    Responds with 403 when the supplied admin key does not match the
    configured one, or when no admin key is configured at all.
    """
    # Refuse outright when the server has no admin key (unset or empty):
    # otherwise an empty ADMIN_API_KEY would be matched by an empty admin_key.
    # compare_digest keeps the comparison constant-time so response timing
    # does not leak how much of the key was guessed correctly; the values are
    # encoded because compare_digest rejects non-ASCII str arguments.
    if not ADMIN_API_KEY or not secrets.compare_digest(
        request.admin_key.encode("utf-8"), ADMIN_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Unauthorized: Invalid admin key")

    db = database.SessionLocal()
    try:
        new_key_str = str(uuid.uuid4())
        db.add(database.ApiKey(key=new_key_str))
        db.commit()
        # The response only needs the key string generated above, so the row
        # is not reloaded after the commit.
        return {"api_key": new_key_str, "message": "API Key created successfully."}
    finally:
        # This session is opened manually, so it must always be closed here.
        db.close()
@app.post("/info", dependencies=[Depends(get_api_key)])
def get_video_info(request: URLRequest):
    """Fetches available formats and video metadata for a given URL."""

    def summarize(fmt):
        # Reduce one yt-dlp format entry to the fields exposed by the API.
        # The size is converted to megabytes (two decimals); a missing or
        # zero 'filesize' is reported as "N/A".
        size_bytes = fmt.get('filesize')
        if size_bytes:
            size_mb = round(size_bytes / (1024 * 1024), 2)
        else:
            size_mb = "N/A"
        return {
            "format_id": fmt.get('format_id'),
            "resolution": fmt.get('resolution', 'audio'),
            "ext": fmt.get('ext'),
            "note": fmt.get('format_note', ''),
            "filesize_mb": size_mb,
        }

    options = {'quiet': True, 'no_warnings': True}
    try:
        # download=False: only metadata is extracted, nothing is saved.
        with yt_dlp.YoutubeDL(options) as downloader:
            metadata = downloader.extract_info(request.url, download=False)
        formats = [summarize(fmt) for fmt in metadata.get('formats', [])]
        return {
            "title": metadata.get('title'),
            "thumbnail": metadata.get('thumbnail'),
            "formats": formats,
        }
    except Exception as e:
        # Any failure is reported to the client as a 500 carrying its message.
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/download")
def download_video(
    request_data: DownloadRequest,
    request: Request,
    api_key: database.ApiKey = Depends(get_api_key),
    db: Session = Depends(database.get_db),
):
    """Downloads a video with a specific format and returns a direct link.

    The downloads directory is emptied before every download. On success the
    calling API key's usage counter is incremented and the response contains
    `download_url`, `filename` and `usage_count`. Any failure is reported as
    a 500 whose detail starts with "Download failed:".
    """
    # NOTE(review): clearing the shared directory on every request removes
    # files from earlier requests and can interfere with a download that is
    # still running for another client. Confirm this is intended before
    # serving concurrent users.
    shutil.rmtree(DOWNLOADS_DIR, ignore_errors=True)
    # rmtree ignores errors, so the directory may still exist afterwards;
    # exist_ok=True keeps that case from raising FileExistsError.
    os.makedirs(DOWNLOADS_DIR, exist_ok=True)

    output_template = os.path.join(DOWNLOADS_DIR, '%(title)s.%(ext)s')
    ydl_opts = {
        'format': request_data.format_id,
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(request_data.url, download=True)
            filename = os.path.basename(ydl.prepare_filename(info))

        # Usage is only counted once the download itself has succeeded.
        api_key.usage_count += 1
        db.commit()

        base_url = str(request.base_url).rstrip('/')
        # Percent-encode the file name: titles may contain spaces, '#', '?'
        # or '%', which would otherwise produce a link that does not resolve
        # to the file. The raw name is still returned as "filename".
        download_url = f"{base_url}/{DOWNLOADS_DIR}/{quote(filename)}"
        return {"download_url": download_url, "filename": filename, "usage_count": api_key.usage_count}
    except Exception as e:
        # Chain the original exception so the root cause stays in tracebacks.
        raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}") from e