|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.responses import FileResponse, JSONResponse |
|
|
from pydantic import BaseModel |
|
|
from pytubefix import YouTube |
|
|
from pytubefix.cli import on_progress |
|
|
from moviepy.audio.io.AudioFileClip import AudioFileClip |
|
|
import os |
|
|
|
|
|
app = FastAPI(title="YouTube Downloader API") |
|
|
|
|
|
BASE_YT_URL = "https://www.youtube.com/watch?v=" |
|
|
|
|
|
def get_full_url(video_id: str) -> str: |
|
|
return f"{BASE_YT_URL}{video_id}" |
|
|
|
|
|
def estimate_compressed_size(original_size_mb, bitrate_factor=0.6): |
|
|
if not original_size_mb: |
|
|
return "?" |
|
|
compressed = original_size_mb * bitrate_factor |
|
|
return f"{round(compressed, 2)} MB (est.)" |
|
|
|
|
|
def convert_to_mp3(temp_path, title, quality): |
|
|
output_dir = "downloads" |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
mp3_path = os.path.join(output_dir, f"{title} ({quality}).mp3") |
|
|
clip = AudioFileClip(temp_path) |
|
|
bitrate = {"High": "320k", "Medium": "192k", "Low": "128k"}[quality] |
|
|
clip.write_audiofile(mp3_path, bitrate=bitrate) |
|
|
clip.close() |
|
|
os.remove(temp_path) |
|
|
return mp3_path |
|
|
|
|
|
def get_download_options(video_id: str): |
|
|
url = get_full_url(video_id) |
|
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
|
streams = yt.streams.order_by('resolution').desc() |
|
|
choices = [] |
|
|
for s in streams: |
|
|
res = s.resolution or "Audio only" |
|
|
type_ = "video+audio" if s.is_progressive else ( |
|
|
"video only" if s.includes_video_track else "audio only") |
|
|
ext = s.mime_type.split("/")[-1] |
|
|
size_mb = round(s.filesize / 1048576, 2) if s.filesize else None |
|
|
est_size = estimate_compressed_size(size_mb) |
|
|
choices.append({ |
|
|
"label": f"{res} | {type_} | {ext} | {est_size}", |
|
|
"resolution": res, |
|
|
"type": type_, |
|
|
"extension": ext |
|
|
}) |
|
|
choices.append({"label": "Convert to MP3 – High Quality (320kbps)", "type": "mp3_high"}) |
|
|
choices.append({"label": "Convert to MP3 – Medium Quality (192kbps)", "type": "mp3_medium"}) |
|
|
choices.append({"label": "Convert to MP3 – Low Quality (128kbps)", "type": "mp3_low"}) |
|
|
return choices, yt.title |
|
|
|
|
|
|
|
|
|
|
|
class VideoRequest(BaseModel): |
|
|
video_id: str |
|
|
|
|
|
|
|
|
class DownloadRequest(BaseModel): |
|
|
video_id: str |
|
|
choice: str |
|
|
|
|
|
|
|
|
@app.post("/get_choices") |
|
|
def api_get_choices(req: VideoRequest): |
|
|
try: |
|
|
choices, title = get_download_options(req.video_id) |
|
|
return {"title": title, "choices": choices} |
|
|
except Exception as e: |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
return JSONResponse(status_code=500, content={"error": str(e)}) |
|
|
|
|
|
|
|
|
@app.post("/download") |
|
|
def api_download(req: DownloadRequest): |
|
|
try: |
|
|
url = get_full_url(req.video_id) |
|
|
output_dir = "downloads" |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
|
|
|
|
|
|
|
if "mp3" in req.choice.lower(): |
|
|
stream = yt.streams.filter(only_audio=True).first() |
|
|
if not stream: |
|
|
raise HTTPException(status_code=400, detail="No audio stream available") |
|
|
|
|
|
temp_path = stream.download(output_path=output_dir, filename="temp.mp4") |
|
|
|
|
|
if "high" in req.choice.lower(): |
|
|
mp3_path = convert_to_mp3(temp_path, yt.title, "High") |
|
|
elif "medium" in req.choice.lower(): |
|
|
mp3_path = convert_to_mp3(temp_path, yt.title, "Medium") |
|
|
else: |
|
|
mp3_path = convert_to_mp3(temp_path, yt.title, "Low") |
|
|
|
|
|
return FileResponse(mp3_path, filename=os.path.basename(mp3_path)) |
|
|
|
|
|
|
|
|
parts = req.choice.split(" | ") |
|
|
res = parts[0] |
|
|
ext = parts[2] |
|
|
stream = yt.streams.filter(res=res, mime_type=f"video/{ext}").first() or \ |
|
|
yt.streams.filter(res=res).first() |
|
|
|
|
|
if not stream: |
|
|
raise HTTPException(status_code=400, detail="Selected quality not available") |
|
|
|
|
|
output_path = stream.download(output_path=output_dir) |
|
|
return FileResponse(output_path, filename=os.path.basename(output_path)) |
|
|
|
|
|
except Exception as e: |
|
|
return JSONResponse(status_code=500, content={"error": str(e)}) |
|
|
|
|
|
|
|
|
@app.post("/") |
|
|
def home(): |
|
|
return {"message": "YouTube Downloader API running! Use /get_choices and /download with POST JSON body."} |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run("app:app", host="0.0.0.0", port=7860) |
|
|
|