ytdlp / main.py
Rianofc's picture
Upload 4 files
156d0d9 verified
from fastapi import FastAPI, Request, Query
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
import yt_dlp
import os
import subprocess
from datetime import datetime, timedelta
import asyncio
from urllib.parse import quote, unquote
import json
import io
app = FastAPI(title="YouTube dan Spotify Downloader API", description="API untuk mengunduh video dan audio dari YouTube dan Spotify.", version="1.0.0")
OUTPUT_DIR = "output"
SPOTIFY_OUTPUT_DIR = "spotify_output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(SPOTIFY_OUTPUT_DIR, exist_ok=True)
COOKIES_FILE = "yt.txt"
async def delete_file_after_delay(file_path: str, delay: int = 600):
await asyncio.sleep(delay)
try:
os.remove(file_path)
print(f"File {file_path} telah dihapus setelah {delay} detik.")
except Exception as e:
print(f"Gagal menghapus file {file_path}: {e}")
@app.middleware("http")
async def log_requests(request: Request, call_next):
response = await call_next(request)
log_message = (
f"IP: {request.client.host}, "
f"Time: {datetime.now()}, "
f"URL: {request.url}, "
f"Status: {response.status_code}, "
f"Method: {request.method}"
)
print(log_message)
return response
@app.get("/", summary="Root Endpoint", description="Menampilkan halaman index.html.")
async def root():
html_path = "/root/ytnew/index.html"
if os.path.exists(html_path):
return FileResponse(html_path)
else:
return {"error": "index.html file not found"}
@app.get("/search/", summary="Pencarian Video YouTube", description="Mencari video YouTube berdasarkan kata kunci.")
async def search_video(query: str = Query(..., description="Kata kunci pencarian untuk video YouTube")):
try:
ydl_opts = {'quiet': True, 'cookiefile': COOKIES_FILE}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
search_result = ydl.extract_info(f"ytsearch5:{query}", download=False)
videos = [{"title": v["title"], "url": v["webpage_url"], "id": v["id"]} for v in search_result['entries']]
print(f"Endpoint: search, Query: {query}, Status: Success")
return {"results": videos}
except Exception as e:
print(f"Endpoint: search, Query: {query}, Error: {str(e)}")
return JSONResponse(status_code=500, content={"error": str(e)})
@app.get("/info/", summary="Informasi Video YouTube", description="Mendapatkan informasi tentang video YouTube berdasarkan URL.")
async def get_info(url: str = Query(..., description="URL video YouTube")):
try:
ydl_opts = {'quiet': True, 'cookiefile': COOKIES_FILE}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
formats = info.get('formats', [])
resolutions = []
for fmt in formats:
if fmt.get('vcodec') != 'none':
resolution = f"{fmt['height']}p" if fmt.get('height') else "Unknown"
resolutions.append({
"resolution": resolution,
"ext": fmt.get('ext', 'Unknown'),
"size": fmt.get('filesize_approx', 'Unknown')
})
print(f"Endpoint: info, URL: {url}, Resolutions: {resolutions}")
return {
"title": info['title'],
"duration": info['duration'],
"views": info.get('view_count', 'N/A'),
"resolutions": resolutions,
"thumbnail": info.get('thumbnail'),
}
except Exception as e:
print(f"Endpoint: info, URL: {url}, Error: {str(e)}")
return JSONResponse(status_code=500, content={"error": str(e)})
@app.get("/download/", summary="Unduhan Video YouTube", description="Mengunduh video YouTube.")
async def download_video(
url: str = Query(..., description="URL video YouTube"),
resolution: int = Query(720, description="Resolusi video yang diinginkan (misalnya, 720, 1080)"),
mode: str = Query("url", description="Mode unduhan: 'url' atau 'buffer'")
):
try:
ydl_opts_info = {'quiet': True, 'cookiefile': COOKIES_FILE}
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl_info:
info = ydl_info.extract_info(url, download=False)
sanitized_title = "".join(c if c.isalnum() or c in [' ', '.', '_'] else "_" for c in info['title'])
file_name = f"{sanitized_title}.mp4"
final_path = os.path.join(OUTPUT_DIR, file_name)
video_path = os.path.join(OUTPUT_DIR, "video.mp4")
audio_path = os.path.join(OUTPUT_DIR, "audio.m4a")
ydl_opts_video = {
'format': f'bestvideo[height={resolution}]',
'outtmpl': video_path
}
ydl_opts_audio = {
'format': 'bestaudio',
'outtmpl': audio_path
}
with yt_dlp.YoutubeDL(ydl_opts_video) as video_ydl:
video_ydl.download([url])
with yt_dlp.YoutubeDL(ydl_opts_audio) as audio_ydl:
audio_ydl.download([url])
subprocess.run([
'ffmpeg', '-y', '-i', video_path, '-i', audio_path,
'-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental',
final_path
], check=True)
os.remove(video_path)
os.remove(audio_path)
if mode == "url":
return {
"title": info['title'],
"thumbnail": info['thumbnail'],
"resolution": resolution,
"filesize": os.path.getsize(final_path),
"author": "nauval",
"download_url": f"https://ytdlpyton.nvlgroup.my.id/download/file/{quote(file_name)}"
}
elif mode == "buffer":
with open(final_path, "rb") as f:
video_buffer = io.BytesIO(f.read())
return StreamingResponse(video_buffer, media_type="video/mp4", headers={"Content-Disposition": f"attachment; filename={file_name}"})
else:
return JSONResponse(status_code=400, content={"error": "Invalid download mode. Use 'url' or 'buffer'."})
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@app.get("/download/file/{filename}", summary="Unduhan File Video", description="Mengunduh file video dari server.")
async def download_file(filename: str):
file_path = os.path.join(OUTPUT_DIR, filename)
if os.path.exists(file_path):
return FileResponse(file_path, media_type="video/mp4", filename=unquote(filename))
else:
return JSONResponse(status_code=404, content={"error": "File not found"})
@app.get("/spotify/download/", summary="Unduhan Lagu Spotify", description="Mengunduh lagu dari Spotify.")
async def spotify_download(url: str = Query(..., description="URL lagu Spotify")):
try:
subprocess.run(['spotdl', 'download', url, '--output', SPOTIFY_OUTPUT_DIR], check=True)
file_name = os.listdir(SPOTIFY_OUTPUT_DIR)[0]
file_path = os.path.join(SPOTIFY_OUTPUT_DIR, file_name)
download_url = f"https://ytdlpyton.nvlgroup.my.id/download/file/{quote(file_name)}"
return {"download_url": download_url}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
from fastapi import BackgroundTasks
@app.get("/download/audio/", summary="Unduhan Audio YouTube", description="Mengunduh audio dari video YouTube.")
async def download_audio(
url: str = Query(..., description="URL video YouTube"),
mode: str = Query("url", description="Mode unduhan: 'url' atau 'buffer'"),
background_tasks: BackgroundTasks = BackgroundTasks
):
try:
ydl_opts = {
'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s_downloadbynauval.%(ext)s'),
'format': 'bestaudio/best',
'cookiefile': COOKIES_FILE,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
output_filename = f"{info['title']}_downloadbynauval.mp3"
file_path = os.path.join(OUTPUT_DIR, output_filename)
if not os.path.exists(file_path):
raise FileNotFoundError("File hasil konversi tidak ditemukan.")
asyncio.create_task(delete_file_after_delay(file_path))
if mode == "url":
return {
"title": info['title'],
"thumbnail": info['thumbnail'],
"filesize": os.path.getsize(file_path),
"author": "nauval",
"download_url": f"https://ytdlpyton.nvlgroup.my.id/download/file/{quote(output_filename)}"
}
elif mode == "buffer":
with open(file_path, "rb") as f:
audio_buffer = io.BytesIO(f.read())
return StreamingResponse(audio_buffer, media_type="audio/mp3", headers={"Content-Disposition": f"attachment; filename={output_filename}"})
else:
return JSONResponse(status_code=400, content={"error": "Invalid download mode. Use 'url' or 'buffer'."})
except Exception as e:
import traceback
print(f"Endpoint: download/audio, URL: {url}, Error: {traceback.format_exc()}")
return JSONResponse(status_code=500, content={"error": str(e)})