File size: 4,471 Bytes
50bca25 55e47e9 50bca25 0838a22 5a82288 dad9df4 37b4a51 55e47e9 70564c8 5a82288 dad9df4 55e47e9 50bca25 5a82288 50bca25 5a82288 70564c8 c2b1385 55e47e9 50bca25 55e47e9 50bca25 55e47e9 50bca25 3b4c543 50bca25 55e47e9 70564c8 65813e8 55e47e9 70564c8 50bca25 70564c8 50bca25 55e47e9 5a82288 55e47e9 50bca25 55e47e9 50bca25 55e47e9 50bca25 55e47e9 50bca25 55e47e9 50bca25 55e47e9 50bca25 0838a22 50bca25 55e47e9 5a82288 3b4c543 55e47e9 70564c8 50bca25 55e47e9 50bca25 37b4a51 70564c8 55e47e9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from pytubefix import YouTube
from pytubefix.cli import on_progress
from moviepy.audio.io.AudioFileClip import AudioFileClip
import os
app = FastAPI(title="YouTube Downloader API")
BASE_YT_URL = "https://www.youtube.com/watch?v="
def get_full_url(video_id: str) -> str:
return f"{BASE_YT_URL}{video_id}"
def estimate_compressed_size(original_size_mb, bitrate_factor=0.6):
if not original_size_mb:
return "?"
compressed = original_size_mb * bitrate_factor
return f"{round(compressed, 2)} MB (est.)"
def convert_to_mp3(temp_path, title, quality):
output_dir = "downloads"
os.makedirs(output_dir, exist_ok=True)
mp3_path = os.path.join(output_dir, f"{title} ({quality}).mp3")
clip = AudioFileClip(temp_path)
bitrate = {"High": "320k", "Medium": "192k", "Low": "128k"}[quality]
clip.write_audiofile(mp3_path, bitrate=bitrate)
clip.close()
os.remove(temp_path)
return mp3_path
def get_download_options(video_id: str):
url = get_full_url(video_id)
yt = YouTube(url, on_progress_callback=on_progress)
streams = yt.streams.order_by('resolution').desc()
choices = []
for s in streams:
res = s.resolution or "Audio only"
type_ = "video+audio" if s.is_progressive else (
"video only" if s.includes_video_track else "audio only")
ext = s.mime_type.split("/")[-1]
size_mb = round(s.filesize / 1048576, 2) if s.filesize else None
est_size = estimate_compressed_size(size_mb)
choices.append({
"label": f"{res} | {type_} | {ext} | {est_size}",
"resolution": res,
"type": type_,
"extension": ext
})
choices.append({"label": "Convert to MP3 – High Quality (320kbps)", "type": "mp3_high"})
choices.append({"label": "Convert to MP3 – Medium Quality (192kbps)", "type": "mp3_medium"})
choices.append({"label": "Convert to MP3 – Low Quality (128kbps)", "type": "mp3_low"})
return choices, yt.title
# ---------- Request Schemas ----------
class VideoRequest(BaseModel):
video_id: str
class DownloadRequest(BaseModel):
video_id: str
choice: str
@app.post("/get_choices")
def api_get_choices(req: VideoRequest):
try:
choices, title = get_download_options(req.video_id)
return {"title": title, "choices": choices}
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/download")
def api_download(req: DownloadRequest):
try:
url = get_full_url(req.video_id)
output_dir = "downloads"
os.makedirs(output_dir, exist_ok=True)
yt = YouTube(url, on_progress_callback=on_progress)
# MP3 Download
if "mp3" in req.choice.lower():
stream = yt.streams.filter(only_audio=True).first()
if not stream:
raise HTTPException(status_code=400, detail="No audio stream available")
temp_path = stream.download(output_path=output_dir, filename="temp.mp4")
if "high" in req.choice.lower():
mp3_path = convert_to_mp3(temp_path, yt.title, "High")
elif "medium" in req.choice.lower():
mp3_path = convert_to_mp3(temp_path, yt.title, "Medium")
else:
mp3_path = convert_to_mp3(temp_path, yt.title, "Low")
return FileResponse(mp3_path, filename=os.path.basename(mp3_path))
# MP4 Download
parts = req.choice.split(" | ")
res = parts[0]
ext = parts[2]
stream = yt.streams.filter(res=res, mime_type=f"video/{ext}").first() or \
yt.streams.filter(res=res).first()
if not stream:
raise HTTPException(status_code=400, detail="Selected quality not available")
output_path = stream.download(output_path=output_dir)
return FileResponse(output_path, filename=os.path.basename(output_path))
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/")
def home():
return {"message": "YouTube Downloader API running! Use /get_choices and /download with POST JSON body."}
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=7860)
|