File size: 4,471 Bytes
50bca25
55e47e9
50bca25
0838a22
 
5a82288
dad9df4
37b4a51
55e47e9
70564c8
5a82288
 
 
 
dad9df4
55e47e9
 
 
 
50bca25
5a82288
 
 
 
 
 
50bca25
5a82288
 
 
 
70564c8
c2b1385
 
55e47e9
 
 
 
 
50bca25
 
55e47e9
 
 
 
 
 
 
 
 
50bca25
 
 
55e47e9
 
50bca25
 
 
 
 
 
 
 
 
 
 
 
 
3b4c543
50bca25
55e47e9
70564c8
65813e8
 
55e47e9
70564c8
50bca25
 
 
70564c8
50bca25
55e47e9
 
5a82288
55e47e9
50bca25
 
55e47e9
 
50bca25
 
55e47e9
50bca25
 
55e47e9
50bca25
55e47e9
 
 
50bca25
55e47e9
 
50bca25
 
 
 
 
 
 
0838a22
50bca25
 
55e47e9
 
5a82288
3b4c543
55e47e9
70564c8
50bca25
 
55e47e9
50bca25
 
37b4a51
 
70564c8
55e47e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from pytubefix import YouTube
from pytubefix.cli import on_progress
from moviepy.audio.io.AudioFileClip import AudioFileClip
import os

app = FastAPI(title="YouTube Downloader API")

BASE_YT_URL = "https://www.youtube.com/watch?v="

def get_full_url(video_id: str) -> str:
    return f"{BASE_YT_URL}{video_id}"

def estimate_compressed_size(original_size_mb, bitrate_factor=0.6):
    if not original_size_mb:
        return "?"
    compressed = original_size_mb * bitrate_factor
    return f"{round(compressed, 2)} MB (est.)"

def convert_to_mp3(temp_path, title, quality):
    output_dir = "downloads"
    os.makedirs(output_dir, exist_ok=True)
    mp3_path = os.path.join(output_dir, f"{title} ({quality}).mp3")
    clip = AudioFileClip(temp_path)
    bitrate = {"High": "320k", "Medium": "192k", "Low": "128k"}[quality]
    clip.write_audiofile(mp3_path, bitrate=bitrate)
    clip.close()
    os.remove(temp_path)
    return mp3_path

def get_download_options(video_id: str):
    url = get_full_url(video_id)
    yt = YouTube(url, on_progress_callback=on_progress)
    streams = yt.streams.order_by('resolution').desc()
    choices = []
    for s in streams:
        res = s.resolution or "Audio only"
        type_ = "video+audio" if s.is_progressive else (
            "video only" if s.includes_video_track else "audio only")
        ext = s.mime_type.split("/")[-1]
        size_mb = round(s.filesize / 1048576, 2) if s.filesize else None
        est_size = estimate_compressed_size(size_mb)
        choices.append({
            "label": f"{res} | {type_} | {ext} | {est_size}",
            "resolution": res,
            "type": type_,
            "extension": ext
        })
    choices.append({"label": "Convert to MP3 – High Quality (320kbps)", "type": "mp3_high"})
    choices.append({"label": "Convert to MP3 – Medium Quality (192kbps)", "type": "mp3_medium"})
    choices.append({"label": "Convert to MP3 – Low Quality (128kbps)", "type": "mp3_low"})
    return choices, yt.title


# ---------- Request Schemas ----------
class VideoRequest(BaseModel):
    video_id: str


class DownloadRequest(BaseModel):
    video_id: str
    choice: str


@app.post("/get_choices")
def api_get_choices(req: VideoRequest):
    try:
        choices, title = get_download_options(req.video_id)
        return {"title": title, "choices": choices}
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.post("/download")
def api_download(req: DownloadRequest):
    try:
        url = get_full_url(req.video_id)
        output_dir = "downloads"
        os.makedirs(output_dir, exist_ok=True)
        yt = YouTube(url, on_progress_callback=on_progress)

        # MP3 Download
        if "mp3" in req.choice.lower():
            stream = yt.streams.filter(only_audio=True).first()
            if not stream:
                raise HTTPException(status_code=400, detail="No audio stream available")

            temp_path = stream.download(output_path=output_dir, filename="temp.mp4")

            if "high" in req.choice.lower():
                mp3_path = convert_to_mp3(temp_path, yt.title, "High")
            elif "medium" in req.choice.lower():
                mp3_path = convert_to_mp3(temp_path, yt.title, "Medium")
            else:
                mp3_path = convert_to_mp3(temp_path, yt.title, "Low")

            return FileResponse(mp3_path, filename=os.path.basename(mp3_path))

        # MP4 Download
        parts = req.choice.split(" | ")
        res = parts[0]
        ext = parts[2]
        stream = yt.streams.filter(res=res, mime_type=f"video/{ext}").first() or \
                 yt.streams.filter(res=res).first()

        if not stream:
            raise HTTPException(status_code=400, detail="Selected quality not available")

        output_path = stream.download(output_path=output_dir)
        return FileResponse(output_path, filename=os.path.basename(output_path))

    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.post("/")
def home():
    return {"message": "YouTube Downloader API running! Use /get_choices and /download with POST JSON body."}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=7860)