zer / app.py
xtristan's picture
Update app.py
b36e841 verified
import shutil
from fastapi import FastAPI, HTTPException, Request, Body
from deezspot.deezloader import DeeLogin
from deezspot.spotloader import SpoLogin
import requests
import os
import logging
from typing import Optional
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
from pydantic import BaseModel, Field, HttpUrl
from urllib.parse import quote
import uuid
import gc
from typing import Literal
import urllib.parse
from fastapi.responses import JSONResponse
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Deezer API")
# Load environment variables
load_dotenv()
# Mount a static files directory to serve downloaded files
os.makedirs("downloads", exist_ok=True)
app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads")
# Deezer API base URL
DEEZER_API_URL = "https://api.deezer.com"
# Deezer ARL token (required for deezspot downloads)
ARL_TOKEN = os.getenv('ARL')
class DownloadRequest(BaseModel):
url: str
quality: str
arl: str
def convert_deezer_short_link_async(short_link: str) -> str:
try:
response = requests.get(short_link, allow_redirects=True)
return response.url
except requests.RequestException as e:
print(f"An error occurred: {e}")
return ""
@app.middleware("http")
async def log_errors_middleware(request: Request, call_next):
try:
return await call_next(request)
except Exception as e:
logger.error("An unhandled exception occurred!", exc_info=True)
return JSONResponse(status_code=500, content={'detail': 'Internal Server Error'})
@app.get("/")
def read_root():
return {"message": "running"}
# Helper function to get track info
def get_track_info(track_id: str):
try:
response = requests.get(f"{DEEZER_API_URL}/track/{track_id}")
if response.status_code != 200:
raise HTTPException(status_code=404, detail="Track not found")
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching track metadata: {e}")
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Error fetching track metadata: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Fetch track metadata from Deezer API
@app.get("/track/{track_id}")
def get_track(track_id: str):
return get_track_info(track_id)
# Download a track and return a download URL (NO RATE LIMITS)
@app.post("/download/track")
def download_track(request: Request, download_request: DownloadRequest):
try:
# Use provided ARL or fallback to env
if download_request.arl is None or download_request.arl.strip() == "":
ARL = ARL_TOKEN
else:
ARL = download_request.arl
logger.info(f'arl: {ARL}')
url = download_request.url
if 'dzr.page' in url or 'deezer.page' in url or 'link.deezer' in url:
url = convert_deezer_short_link_async(url)
quality = download_request.quality
dl = DeeLogin(arl=ARL)
logger.info(f'track_url: {url}')
if quality not in ["MP3_320", "MP3_128", "FLAC"]:
raise HTTPException(status_code=400, detail="Invalid quality specified")
# Extract track_id
track_id = url.split("/")[-1]
# Fetch track info
track_info = get_track_info(track_id)
track_link = track_info.get("link")
if not track_link:
raise HTTPException(status_code=404, detail="Track link not found")
# Sanitize filename
track_title = track_info.get("title", "track")
artist_name = track_info.get("artist", {}).get("name", "unknown")
file_extension = "flac" if quality == "FLAC" else "mp3"
# Clear the downloads directory
for root, dirs, files in os.walk("downloads"):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
shutil.rmtree(os.path.join(root, dir))
# Download the track
dl.download_trackdee(
link_track=track_link,
output_dir="downloads",
quality_download=quality,
recursive_quality=False,
recursive_download=False
)
# Find the file
filepath = None
for root, dirs, files in os.walk("downloads"):
for file in files:
if file.endswith(f'.{file_extension}'):
filepath = os.path.join(root, file)
break
if filepath:
break
if not filepath:
raise HTTPException(status_code=500, detail=f"{file_extension} file not found after download")
file_size = os.path.getsize(filepath)
logger.info(f"Downloaded file size: {file_size} bytes")
# Return the download URL
relative_path = quote(str(os.path.relpath(filepath, "downloads")))
base_url = str(request.base_url).rstrip('/')
if base_url.startswith("http://"):
base_url = base_url.replace("http://", "https://", 1)
download_url = f"{base_url}/downloads/{relative_path}"
logger.info(f"Download successful: {download_url}")
gc.collect()
return {"download_url": download_url}
except Exception as e:
logger.error(f"Error downloading track: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Pydantic model for album request
class AlbumRequest(BaseModel):
id: str
# Fetch album data
@app.post("/z_album")
def fetch_album(request: AlbumRequest):
album_id = request.id
try:
response = requests.get(f"{DEEZER_API_URL}/album/{album_id}")
response.raise_for_status()
album_data = response.json()
tracks = album_data.get("tracks", {}).get("data", [])
result = []
for track in tracks:
title = track.get("title")
link = track.get("link")
if title and link:
result.append({
"title": title,
"link": link
})
return result
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching album: {e}")
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Error fetching album: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Pydantic model for playlist request
class PlaylistRequest(BaseModel):
id: str
# Fetch playlist data
@app.post("/z_playlist")
def fetch_playlist(request: PlaylistRequest):
playlist_id = request.id
try:
response = requests.get(f"{DEEZER_API_URL}/playlist/{playlist_id}")
response.raise_for_status()
playlist_data = response.json()
tracks = playlist_data.get("tracks", {}).get("data", [])
result = []
for track in tracks:
title = track.get("title")
link = track.get("link")
if title and link:
result.append({
"title": title,
"link": link
})
return result
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching album: {e}")
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Error fetching album: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Search tracks using Deezer API
@app.get("/z_search")
def search_tracks(query: str, limit: Optional[int] = 10):
try:
response = requests.get(f"{DEEZER_API_URL}/search", params={"q": query, "limit": limit})
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Network error searching tracks: {e}")
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Error searching tracks: {e}")
raise HTTPException(status_code=500, detail=str(e))
# --- Request Body Model ---
class SpotDlRequest(BaseModel):
url: HttpUrl = Field(..., description="The URL to be processed.")
quality: Literal["128", "320", "FLAC"] = Field(
...,
description="The desired quality. Currently, only '128' is supported for link generation."
)
# --- Response Body Model ---
class SpotDlResponse(BaseModel):
download_url: str
# --- Error Response Model ---
class ErrorResponse(BaseModel):
detail: str
# --- API Endpoint ---
@app.post(
"/spot_dl",
response_model=SpotDlResponse,
responses={
400: {"model": ErrorResponse, "description": "Bad Request - Invalid input"},
422: {"model": ErrorResponse, "description": "Validation Error - Input data is not valid"}
},
summary="Generate SpotDL Link",
description="Accepts a URL and quality, returns a processed URL if quality is '128', "
"otherwise returns an error for higher qualities."
)
async def create_spot_dl_link(request: SpotDlRequest = Body(...)):
print(f"Received request: url='{request.url}', quality='{request.quality}'")
if request.quality == "128":
encoded_url = urllib.parse.quote(str(request.url), safe='')
output_url = f"https://velynapi.vercel.app/api/downloader/spotifydl?url={encoded_url}"
return SpotDlResponse(download_url=output_url)
elif request.quality == "320" or request.quality == "FLAC":
raise HTTPException(
status_code=400,
detail=f"Quality '{request.quality}' is for Premium Users Only. '128' is allowed."
)
else:
raise HTTPException(
status_code=400,
detail=f"Invalid quality value: '{request.quality}'. Allowed values are '128', '320', 'FLAC'."
)