|
|
import yt_dlp |
|
|
import requests |
|
|
import tempfile |
|
|
import os |
|
|
import json |
|
|
import time |
|
|
from fastapi import FastAPI, Query |
|
|
from fastapi.responses import JSONResponse, FileResponse |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
def process(url, audio=False): |
|
|
tmpdir = tempfile.mkdtemp() |
|
|
filename = os.path.join(tmpdir, "%(title)s.%(ext)s") |
|
|
ydl_opts = { |
|
|
"outtmpl": filename, |
|
|
"quiet": True, |
|
|
"format": "bestaudio/best" if audio else "bestvideo+bestaudio/best", |
|
|
"postprocessors": [ |
|
|
{"key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "192"} |
|
|
] if audio else [ |
|
|
{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"} |
|
|
], |
|
|
} |
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
|
info = ydl.extract_info(url, download=True) |
|
|
final_file = ydl.prepare_filename(info) |
|
|
if audio: |
|
|
final_file = os.path.splitext(final_file)[0] + ".mp3" |
|
|
else: |
|
|
final_file = os.path.splitext(final_file)[0] + ".mp4" |
|
|
video_id = info.get("id") |
|
|
dislikes = {} |
|
|
try: |
|
|
r = requests.get(f"https://returnyoutubedislikeapi.com/votes?videoId={video_id}") |
|
|
dislikes = r.json() |
|
|
except: |
|
|
pass |
|
|
metadata = { |
|
|
"title": info.get("title"), |
|
|
"uploader": info.get("uploader"), |
|
|
"duration": info.get("duration"), |
|
|
"view_count": info.get("view_count"), |
|
|
"like_count": info.get("like_count"), |
|
|
"dislikes": dislikes.get("dislikes"), |
|
|
"id": video_id, |
|
|
"ext": "mp3" if audio else "mp4", |
|
|
"download_path": final_file |
|
|
} |
|
|
return metadata |
|
|
|
|
|
@app.get("/download") |
|
|
async def download(url: str = Query(...), type: str = Query("video")): |
|
|
audio = type == "audio" |
|
|
meta = process(url, audio) |
|
|
return JSONResponse(meta) |
|
|
|
|
|
@app.get("/file") |
|
|
async def file(path: str): |
|
|
if os.path.exists(path): |
|
|
resp = FileResponse(path, filename=os.path.basename(path)) |
|
|
def cleanup(): |
|
|
time.sleep(2) |
|
|
if os.path.exists(path): |
|
|
os.remove(path) |
|
|
os.rmdir(os.path.dirname(path)) |
|
|
import threading |
|
|
threading.Thread(target=cleanup).start() |
|
|
return resp |
|
|
return JSONResponse({"error": "file not found"}, status_code=404) |