|
|
import shutil |
|
|
from fastapi import FastAPI, HTTPException, Request, Body |
|
|
from deezspot.deezloader import DeeLogin |
|
|
from deezspot.spotloader import SpoLogin |
|
|
import requests |
|
|
import os |
|
|
import logging |
|
|
from typing import Optional |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from dotenv import load_dotenv |
|
|
from pydantic import BaseModel, Field, HttpUrl |
|
|
from urllib.parse import quote |
|
|
import uuid |
|
|
import gc |
|
|
from typing import Literal |
|
|
import urllib.parse |
|
|
from fastapi.responses import JSONResponse |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = FastAPI(title="Deezer API") |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
os.makedirs("downloads", exist_ok=True) |
|
|
app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads") |
|
|
|
|
|
|
|
|
DEEZER_API_URL = "https://api.deezer.com" |
|
|
|
|
|
|
|
|
ARL_TOKEN = os.getenv('ARL') |
|
|
|
|
|
|
|
|
class DownloadRequest(BaseModel): |
|
|
url: str |
|
|
quality: str |
|
|
arl: str |
|
|
|
|
|
|
|
|
def convert_deezer_short_link_async(short_link: str) -> str: |
|
|
try: |
|
|
response = requests.get(short_link, allow_redirects=True) |
|
|
return response.url |
|
|
except requests.RequestException as e: |
|
|
print(f"An error occurred: {e}") |
|
|
return "" |
|
|
|
|
|
|
|
|
@app.middleware("http") |
|
|
async def log_errors_middleware(request: Request, call_next): |
|
|
try: |
|
|
return await call_next(request) |
|
|
except Exception as e: |
|
|
logger.error("An unhandled exception occurred!", exc_info=True) |
|
|
return JSONResponse(status_code=500, content={'detail': 'Internal Server Error'}) |
|
|
|
|
|
|
|
|
@app.get("/") |
|
|
def read_root(): |
|
|
return {"message": "running"} |
|
|
|
|
|
|
|
|
|
|
|
def get_track_info(track_id: str): |
|
|
try: |
|
|
response = requests.get(f"{DEEZER_API_URL}/track/{track_id}") |
|
|
if response.status_code != 200: |
|
|
raise HTTPException(status_code=404, detail="Track not found") |
|
|
return response.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
logger.error(f"Network error fetching track metadata: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching track metadata: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
@app.get("/track/{track_id}") |
|
|
def get_track(track_id: str): |
|
|
return get_track_info(track_id) |
|
|
|
|
|
|
|
|
|
|
|
@app.post("/download/track") |
|
|
def download_track(request: Request, download_request: DownloadRequest): |
|
|
try: |
|
|
|
|
|
if download_request.arl is None or download_request.arl.strip() == "": |
|
|
ARL = ARL_TOKEN |
|
|
else: |
|
|
ARL = download_request.arl |
|
|
logger.info(f'arl: {ARL}') |
|
|
url = download_request.url |
|
|
if 'dzr.page' in url or 'deezer.page' in url or 'link.deezer' in url: |
|
|
url = convert_deezer_short_link_async(url) |
|
|
quality = download_request.quality |
|
|
dl = DeeLogin(arl=ARL) |
|
|
logger.info(f'track_url: {url}') |
|
|
|
|
|
if quality not in ["MP3_320", "MP3_128", "FLAC"]: |
|
|
raise HTTPException(status_code=400, detail="Invalid quality specified") |
|
|
|
|
|
|
|
|
track_id = url.split("/")[-1] |
|
|
|
|
|
|
|
|
track_info = get_track_info(track_id) |
|
|
track_link = track_info.get("link") |
|
|
if not track_link: |
|
|
raise HTTPException(status_code=404, detail="Track link not found") |
|
|
|
|
|
|
|
|
track_title = track_info.get("title", "track") |
|
|
artist_name = track_info.get("artist", {}).get("name", "unknown") |
|
|
file_extension = "flac" if quality == "FLAC" else "mp3" |
|
|
|
|
|
|
|
|
for root, dirs, files in os.walk("downloads"): |
|
|
for file in files: |
|
|
os.remove(os.path.join(root, file)) |
|
|
for dir in dirs: |
|
|
shutil.rmtree(os.path.join(root, dir)) |
|
|
|
|
|
|
|
|
dl.download_trackdee( |
|
|
link_track=track_link, |
|
|
output_dir="downloads", |
|
|
quality_download=quality, |
|
|
recursive_quality=False, |
|
|
recursive_download=False |
|
|
) |
|
|
|
|
|
|
|
|
filepath = None |
|
|
for root, dirs, files in os.walk("downloads"): |
|
|
for file in files: |
|
|
if file.endswith(f'.{file_extension}'): |
|
|
filepath = os.path.join(root, file) |
|
|
break |
|
|
if filepath: |
|
|
break |
|
|
|
|
|
if not filepath: |
|
|
raise HTTPException(status_code=500, detail=f"{file_extension} file not found after download") |
|
|
|
|
|
file_size = os.path.getsize(filepath) |
|
|
logger.info(f"Downloaded file size: {file_size} bytes") |
|
|
|
|
|
|
|
|
relative_path = quote(str(os.path.relpath(filepath, "downloads"))) |
|
|
base_url = str(request.base_url).rstrip('/') |
|
|
|
|
|
if base_url.startswith("http://"): |
|
|
base_url = base_url.replace("http://", "https://", 1) |
|
|
|
|
|
download_url = f"{base_url}/downloads/{relative_path}" |
|
|
logger.info(f"Download successful: {download_url}") |
|
|
gc.collect() |
|
|
return {"download_url": download_url} |
|
|
except Exception as e: |
|
|
logger.error(f"Error downloading track: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
class AlbumRequest(BaseModel): |
|
|
id: str |
|
|
|
|
|
|
|
|
|
|
|
@app.post("/z_album") |
|
|
def fetch_album(request: AlbumRequest): |
|
|
album_id = request.id |
|
|
try: |
|
|
response = requests.get(f"{DEEZER_API_URL}/album/{album_id}") |
|
|
response.raise_for_status() |
|
|
album_data = response.json() |
|
|
tracks = album_data.get("tracks", {}).get("data", []) |
|
|
result = [] |
|
|
for track in tracks: |
|
|
title = track.get("title") |
|
|
link = track.get("link") |
|
|
if title and link: |
|
|
result.append({ |
|
|
"title": title, |
|
|
"link": link |
|
|
}) |
|
|
return result |
|
|
except requests.exceptions.RequestException as e: |
|
|
logger.error(f"Network error fetching album: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching album: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
class PlaylistRequest(BaseModel): |
|
|
id: str |
|
|
|
|
|
|
|
|
|
|
|
@app.post("/z_playlist") |
|
|
def fetch_playlist(request: PlaylistRequest): |
|
|
playlist_id = request.id |
|
|
try: |
|
|
response = requests.get(f"{DEEZER_API_URL}/playlist/{playlist_id}") |
|
|
response.raise_for_status() |
|
|
playlist_data = response.json() |
|
|
tracks = playlist_data.get("tracks", {}).get("data", []) |
|
|
result = [] |
|
|
for track in tracks: |
|
|
title = track.get("title") |
|
|
link = track.get("link") |
|
|
if title and link: |
|
|
result.append({ |
|
|
"title": title, |
|
|
"link": link |
|
|
}) |
|
|
return result |
|
|
except requests.exceptions.RequestException as e: |
|
|
logger.error(f"Network error fetching album: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching album: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
@app.get("/z_search") |
|
|
def search_tracks(query: str, limit: Optional[int] = 10): |
|
|
try: |
|
|
response = requests.get(f"{DEEZER_API_URL}/search", params={"q": query, "limit": limit}) |
|
|
return response.json() |
|
|
except requests.exceptions.RequestException as e: |
|
|
logger.error(f"Network error searching tracks: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
except Exception as e: |
|
|
logger.error(f"Error searching tracks: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
class SpotDlRequest(BaseModel): |
|
|
url: HttpUrl = Field(..., description="The URL to be processed.") |
|
|
quality: Literal["128", "320", "FLAC"] = Field( |
|
|
..., |
|
|
description="The desired quality. Currently, only '128' is supported for link generation." |
|
|
) |
|
|
|
|
|
|
|
|
class SpotDlResponse(BaseModel): |
|
|
download_url: str |
|
|
|
|
|
|
|
|
class ErrorResponse(BaseModel): |
|
|
detail: str |
|
|
|
|
|
|
|
|
@app.post( |
|
|
"/spot_dl", |
|
|
response_model=SpotDlResponse, |
|
|
responses={ |
|
|
400: {"model": ErrorResponse, "description": "Bad Request - Invalid input"}, |
|
|
422: {"model": ErrorResponse, "description": "Validation Error - Input data is not valid"} |
|
|
}, |
|
|
summary="Generate SpotDL Link", |
|
|
description="Accepts a URL and quality, returns a processed URL if quality is '128', " |
|
|
"otherwise returns an error for higher qualities." |
|
|
) |
|
|
async def create_spot_dl_link(request: SpotDlRequest = Body(...)): |
|
|
print(f"Received request: url='{request.url}', quality='{request.quality}'") |
|
|
|
|
|
if request.quality == "128": |
|
|
encoded_url = urllib.parse.quote(str(request.url), safe='') |
|
|
output_url = f"https://velynapi.vercel.app/api/downloader/spotifydl?url={encoded_url}" |
|
|
return SpotDlResponse(download_url=output_url) |
|
|
elif request.quality == "320" or request.quality == "FLAC": |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=f"Quality '{request.quality}' is for Premium Users Only. '128' is allowed." |
|
|
) |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=f"Invalid quality value: '{request.quality}'. Allowed values are '128', '320', 'FLAC'." |
|
|
) |
|
|
|