train / app.py
saeid1999's picture
Update app.py
65813e8 verified
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from pytubefix import YouTube
from pytubefix.cli import on_progress
from moviepy.audio.io.AudioFileClip import AudioFileClip
import os
app = FastAPI(title="YouTube Downloader API")
BASE_YT_URL = "https://www.youtube.com/watch?v="
def get_full_url(video_id: str) -> str:
return f"{BASE_YT_URL}{video_id}"
def estimate_compressed_size(original_size_mb, bitrate_factor=0.6):
if not original_size_mb:
return "?"
compressed = original_size_mb * bitrate_factor
return f"{round(compressed, 2)} MB (est.)"
def convert_to_mp3(temp_path, title, quality):
output_dir = "downloads"
os.makedirs(output_dir, exist_ok=True)
mp3_path = os.path.join(output_dir, f"{title} ({quality}).mp3")
clip = AudioFileClip(temp_path)
bitrate = {"High": "320k", "Medium": "192k", "Low": "128k"}[quality]
clip.write_audiofile(mp3_path, bitrate=bitrate)
clip.close()
os.remove(temp_path)
return mp3_path
def get_download_options(video_id: str):
url = get_full_url(video_id)
yt = YouTube(url, on_progress_callback=on_progress)
streams = yt.streams.order_by('resolution').desc()
choices = []
for s in streams:
res = s.resolution or "Audio only"
type_ = "video+audio" if s.is_progressive else (
"video only" if s.includes_video_track else "audio only")
ext = s.mime_type.split("/")[-1]
size_mb = round(s.filesize / 1048576, 2) if s.filesize else None
est_size = estimate_compressed_size(size_mb)
choices.append({
"label": f"{res} | {type_} | {ext} | {est_size}",
"resolution": res,
"type": type_,
"extension": ext
})
choices.append({"label": "Convert to MP3 – High Quality (320kbps)", "type": "mp3_high"})
choices.append({"label": "Convert to MP3 – Medium Quality (192kbps)", "type": "mp3_medium"})
choices.append({"label": "Convert to MP3 – Low Quality (128kbps)", "type": "mp3_low"})
return choices, yt.title
# ---------- Request Schemas ----------
class VideoRequest(BaseModel):
video_id: str
class DownloadRequest(BaseModel):
video_id: str
choice: str
@app.post("/get_choices")
def api_get_choices(req: VideoRequest):
try:
choices, title = get_download_options(req.video_id)
return {"title": title, "choices": choices}
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/download")
def api_download(req: DownloadRequest):
try:
url = get_full_url(req.video_id)
output_dir = "downloads"
os.makedirs(output_dir, exist_ok=True)
yt = YouTube(url, on_progress_callback=on_progress)
# MP3 Download
if "mp3" in req.choice.lower():
stream = yt.streams.filter(only_audio=True).first()
if not stream:
raise HTTPException(status_code=400, detail="No audio stream available")
temp_path = stream.download(output_path=output_dir, filename="temp.mp4")
if "high" in req.choice.lower():
mp3_path = convert_to_mp3(temp_path, yt.title, "High")
elif "medium" in req.choice.lower():
mp3_path = convert_to_mp3(temp_path, yt.title, "Medium")
else:
mp3_path = convert_to_mp3(temp_path, yt.title, "Low")
return FileResponse(mp3_path, filename=os.path.basename(mp3_path))
# MP4 Download
parts = req.choice.split(" | ")
res = parts[0]
ext = parts[2]
stream = yt.streams.filter(res=res, mime_type=f"video/{ext}").first() or \
yt.streams.filter(res=res).first()
if not stream:
raise HTTPException(status_code=400, detail="Selected quality not available")
output_path = stream.download(output_path=output_dir)
return FileResponse(output_path, filename=os.path.basename(output_path))
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/")
def home():
return {"message": "YouTube Downloader API running! Use /get_choices and /download with POST JSON body."}
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=7860)