xtristan commited on
Commit
32fe069
·
verified ·
1 Parent(s): b0375ba

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +19 -113
app.py CHANGED
@@ -5,20 +5,14 @@ from deezspot.spotloader import SpoLogin
5
  import requests
6
  import os
7
  import logging
8
- import json
9
  from typing import Optional
10
  from fastapi.staticfiles import StaticFiles
11
  from dotenv import load_dotenv
12
  from pydantic import BaseModel, Field, HttpUrl
13
  from urllib.parse import quote
14
- from pathlib import Path
15
  import uuid
16
- from fastapi import BackgroundTasks
17
- from collections import defaultdict
18
- import time
19
- from datetime import timedelta
20
  import gc
21
- from typing import Dict, Union, Literal
22
  import urllib.parse
23
  from fastapi.responses import JSONResponse
24
 
@@ -31,7 +25,6 @@ app = FastAPI(title="Deezer API")
31
  load_dotenv()
32
 
33
  # Mount a static files directory to serve downloaded files
34
-
35
  os.makedirs("downloads", exist_ok=True)
36
  app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads")
37
 
@@ -56,6 +49,7 @@ def convert_deezer_short_link_async(short_link: str) -> str:
56
  print(f"An error occurred: {e}")
57
  return ""
58
 
 
59
  @app.middleware("http")
60
  async def log_errors_middleware(request: Request, call_next):
61
  try:
@@ -91,71 +85,11 @@ def get_track(track_id: str):
91
  return get_track_info(track_id)
92
 
93
 
94
- # Rate limiting dictionary
95
- class RateLimiter:
96
- def __init__(self, max_requests: int, time_window: timedelta):
97
- self.max_requests = max_requests
98
- self.time_window = time_window
99
- self.requests: Dict[str, list] = defaultdict(list)
100
-
101
- def _cleanup_old_requests(self, user_ip: str) -> None:
102
- """Remove requests that are outside the time window."""
103
- current_time = time.time()
104
- self.requests[user_ip] = [
105
- timestamp for timestamp in self.requests[user_ip]
106
- if current_time - timestamp < self.time_window.total_seconds()
107
- ]
108
-
109
- def is_rate_limited(self, user_ip: str) -> bool:
110
- """Check if the user has exceeded their rate limit."""
111
- self._cleanup_old_requests(user_ip)
112
-
113
- # Get current count after cleanup
114
- current_count = len(self.requests[user_ip])
115
-
116
- # Add current request timestamp (incrementing the count)
117
- current_time = time.time()
118
- self.requests[user_ip].append(current_time)
119
-
120
- # Check if user has exceeded the maximum requests
121
- return (current_count + 1) > self.max_requests
122
-
123
- def get_current_count(self, user_ip: str) -> int:
124
- """Get the current request count for an IP."""
125
- self._cleanup_old_requests(user_ip)
126
- return len(self.requests[user_ip])
127
-
128
-
129
- # Initialize rate limiter with 25 requests per day
130
- rate_limiter = RateLimiter(
131
- max_requests=5,
132
- time_window=timedelta(days=1)
133
- )
134
-
135
-
136
- def get_user_ip(request: Request) -> str:
137
- """Helper function to get user's IP address."""
138
- forwarded = request.headers.get("X-Forwarded-For")
139
- if forwarded:
140
- return forwarded.split(",")[0]
141
- return request.client.host
142
-
143
-
144
- # Download a track and return a download URL
145
  @app.post("/download/track")
146
  def download_track(request: Request, download_request: DownloadRequest):
147
  try:
148
- user_ip = get_user_ip(request)
149
-
150
- if rate_limiter.is_rate_limited(user_ip):
151
- current_count = rate_limiter.get_current_count(user_ip)
152
- raise HTTPException(
153
- status_code=429,
154
- detail={
155
- "detail": "You have exceeded the maximum number of requests per day. Please try again tomorrow or get the Premium version for unlimited downloads at https://chrunos.com/premium-shortcuts/.",
156
- "help": "https://t.me/chrunoss"
157
- }
158
- )
159
  if download_request.arl is None or download_request.arl.strip() == "":
160
  ARL = ARL_TOKEN
161
  else:
@@ -171,7 +105,7 @@ def download_track(request: Request, download_request: DownloadRequest):
171
  if quality not in ["MP3_320", "MP3_128", "FLAC"]:
172
  raise HTTPException(status_code=400, detail="Invalid quality specified")
173
 
174
- # 提取 track_id (假设 url 格式为 https://api.deezer.com/track/{track_id})
175
  track_id = url.split("/")[-1]
176
 
177
  # Fetch track info
@@ -184,7 +118,6 @@ def download_track(request: Request, download_request: DownloadRequest):
184
  track_title = track_info.get("title", "track")
185
  artist_name = track_info.get("artist", {}).get("name", "unknown")
186
  file_extension = "flac" if quality == "FLAC" else "mp3"
187
- expected_filename = f"{artist_name} - {track_title}.{file_extension}".replace("/", "_") # Sanitize filename
188
 
189
  # Clear the downloads directory
190
  for root, dirs, files in os.walk("downloads"):
@@ -193,21 +126,16 @@ def download_track(request: Request, download_request: DownloadRequest):
193
  for dir in dirs:
194
  shutil.rmtree(os.path.join(root, dir))
195
 
196
- # Download the track using deezspot
197
- try:
198
- # 下载文件的代码
199
- dl.download_trackdee(
200
- link_track=track_link,
201
- output_dir="downloads",
202
- quality_download=quality,
203
- recursive_quality=False,
204
- recursive_download=False
205
- )
206
- except Exception as e:
207
- logger.error(f"Error downloading file: {e}")
208
- raise HTTPException(status_code=500, detail="File download failed")
209
-
210
- # Recursively search for the file in the downloads directory
211
  filepath = None
212
  for root, dirs, files in os.walk("downloads"):
213
  for file in files:
@@ -219,25 +147,22 @@ def download_track(request: Request, download_request: DownloadRequest):
219
 
220
  if not filepath:
221
  raise HTTPException(status_code=500, detail=f"{file_extension} file not found after download")
222
- if filepath:
223
- file_size = os.path.getsize(filepath)
224
- logger.info(f"Downloaded file size: {file_size} bytes")
225
 
226
  # Return the download URL
227
  relative_path = quote(str(os.path.relpath(filepath, "downloads")))
228
- # Remove spaces from the relative path
229
- # Auto-detect base URL from request
230
  base_url = str(request.base_url).rstrip('/')
231
  download_url = f"{base_url}/downloads/{relative_path}"
232
  logger.info(f"Download successful: {download_url}")
233
  gc.collect()
234
- return {"download_url": download_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
235
  except Exception as e:
236
  logger.error(f"Error downloading track: {e}")
237
  raise HTTPException(status_code=500, detail=str(e))
238
 
239
 
240
-
241
  # Pydantic model for album request
242
  class AlbumRequest(BaseModel):
243
  id: str
@@ -302,7 +227,6 @@ def fetch_playlist(request: PlaylistRequest):
302
  raise HTTPException(status_code=500, detail=str(e))
303
 
304
 
305
-
306
  # Search tracks using Deezer API
307
  @app.get("/z_search")
308
  def search_tracks(query: str, limit: Optional[int] = 10):
@@ -318,7 +242,6 @@ def search_tracks(query: str, limit: Optional[int] = 10):
318
 
319
 
320
  # --- Request Body Model ---
321
- # Defines the expected structure and validation for the request body.
322
  class SpotDlRequest(BaseModel):
323
  url: HttpUrl = Field(..., description="The URL to be processed.")
324
  quality: Literal["128", "320", "FLAC"] = Field(
@@ -327,12 +250,10 @@ class SpotDlRequest(BaseModel):
327
  )
328
 
329
  # --- Response Body Model ---
330
- # Defines the structure for a successful response.
331
  class SpotDlResponse(BaseModel):
332
  download_url: str
333
 
334
  # --- Error Response Model ---
335
- # Defines the structure for an error response.
336
  class ErrorResponse(BaseModel):
337
  detail: str
338
 
@@ -349,34 +270,19 @@ class ErrorResponse(BaseModel):
349
  "otherwise returns an error for higher qualities."
350
  )
351
  async def create_spot_dl_link(request: SpotDlRequest = Body(...)):
352
- """
353
- Processes a URL and quality to generate a download link.
354
-
355
- - **url**: The URL to process (must be a valid HTTP/HTTPS URL).
356
- - **quality**: The desired quality. Must be one of "128", "320", "FLAC".
357
- Currently, only "128" will result in a successful link generation.
358
- """
359
- # Log the received request for debugging (optional)
360
  print(f"Received request: url='{request.url}', quality='{request.quality}'")
361
 
362
- # Check the quality
363
  if request.quality == "128":
364
- # URL-encode the input URL
365
  encoded_url = urllib.parse.quote(str(request.url), safe='')
366
- # Construct the target URL
367
  output_url = f"https://velynapi.vercel.app/api/downloader/spotifydl?url={encoded_url}"
368
  return SpotDlResponse(download_url=output_url)
369
  elif request.quality == "320" or request.quality == "FLAC":
370
- # If quality is 320 or FLAC, return an error as per requirements
371
  raise HTTPException(
372
  status_code=400,
373
  detail=f"Quality '{request.quality}' is for Premium Users Only. '128' is allowed."
374
  )
375
  else:
376
- # This case should ideally be caught by Pydantic's Literal validation,
377
- # but as a fallback or for more general invalid quality values:
378
  raise HTTPException(
379
  status_code=400,
380
  detail=f"Invalid quality value: '{request.quality}'. Allowed values are '128', '320', 'FLAC'."
381
  )
382
-
 
5
  import requests
6
  import os
7
  import logging
 
8
  from typing import Optional
9
  from fastapi.staticfiles import StaticFiles
10
  from dotenv import load_dotenv
11
  from pydantic import BaseModel, Field, HttpUrl
12
  from urllib.parse import quote
 
13
  import uuid
 
 
 
 
14
  import gc
15
+ from typing import Literal
16
  import urllib.parse
17
  from fastapi.responses import JSONResponse
18
 
 
25
  load_dotenv()
26
 
27
  # Mount a static files directory to serve downloaded files
 
28
  os.makedirs("downloads", exist_ok=True)
29
  app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads")
30
 
 
49
  print(f"An error occurred: {e}")
50
  return ""
51
 
52
+
53
  @app.middleware("http")
54
  async def log_errors_middleware(request: Request, call_next):
55
  try:
 
85
  return get_track_info(track_id)
86
 
87
 
88
+ # Download a track and return a download URL (NO RATE LIMITS)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  @app.post("/download/track")
90
  def download_track(request: Request, download_request: DownloadRequest):
91
  try:
92
+ # Use provided ARL or fallback to env
 
 
 
 
 
 
 
 
 
 
93
  if download_request.arl is None or download_request.arl.strip() == "":
94
  ARL = ARL_TOKEN
95
  else:
 
105
  if quality not in ["MP3_320", "MP3_128", "FLAC"]:
106
  raise HTTPException(status_code=400, detail="Invalid quality specified")
107
 
108
+ # Extract track_id
109
  track_id = url.split("/")[-1]
110
 
111
  # Fetch track info
 
118
  track_title = track_info.get("title", "track")
119
  artist_name = track_info.get("artist", {}).get("name", "unknown")
120
  file_extension = "flac" if quality == "FLAC" else "mp3"
 
121
 
122
  # Clear the downloads directory
123
  for root, dirs, files in os.walk("downloads"):
 
126
  for dir in dirs:
127
  shutil.rmtree(os.path.join(root, dir))
128
 
129
+ # Download the track
130
+ dl.download_trackdee(
131
+ link_track=track_link,
132
+ output_dir="downloads",
133
+ quality_download=quality,
134
+ recursive_quality=False,
135
+ recursive_download=False
136
+ )
137
+
138
+ # Find the file
 
 
 
 
 
139
  filepath = None
140
  for root, dirs, files in os.walk("downloads"):
141
  for file in files:
 
147
 
148
  if not filepath:
149
  raise HTTPException(status_code=500, detail=f"{file_extension} file not found after download")
150
+
151
+ file_size = os.path.getsize(filepath)
152
+ logger.info(f"Downloaded file size: {file_size} bytes")
153
 
154
  # Return the download URL
155
  relative_path = quote(str(os.path.relpath(filepath, "downloads")))
 
 
156
  base_url = str(request.base_url).rstrip('/')
157
  download_url = f"{base_url}/downloads/{relative_path}"
158
  logger.info(f"Download successful: {download_url}")
159
  gc.collect()
160
+ return {"download_url": download_url}
161
  except Exception as e:
162
  logger.error(f"Error downloading track: {e}")
163
  raise HTTPException(status_code=500, detail=str(e))
164
 
165
 
 
166
  # Pydantic model for album request
167
  class AlbumRequest(BaseModel):
168
  id: str
 
227
  raise HTTPException(status_code=500, detail=str(e))
228
 
229
 
 
230
  # Search tracks using Deezer API
231
  @app.get("/z_search")
232
  def search_tracks(query: str, limit: Optional[int] = 10):
 
242
 
243
 
244
  # --- Request Body Model ---
 
245
  class SpotDlRequest(BaseModel):
246
  url: HttpUrl = Field(..., description="The URL to be processed.")
247
  quality: Literal["128", "320", "FLAC"] = Field(
 
250
  )
251
 
252
  # --- Response Body Model ---
 
253
  class SpotDlResponse(BaseModel):
254
  download_url: str
255
 
256
  # --- Error Response Model ---
 
257
  class ErrorResponse(BaseModel):
258
  detail: str
259
 
 
270
  "otherwise returns an error for higher qualities."
271
  )
272
  async def create_spot_dl_link(request: SpotDlRequest = Body(...)):
 
 
 
 
 
 
 
 
273
  print(f"Received request: url='{request.url}', quality='{request.quality}'")
274
 
 
275
  if request.quality == "128":
 
276
  encoded_url = urllib.parse.quote(str(request.url), safe='')
 
277
  output_url = f"https://velynapi.vercel.app/api/downloader/spotifydl?url={encoded_url}"
278
  return SpotDlResponse(download_url=output_url)
279
  elif request.quality == "320" or request.quality == "FLAC":
 
280
  raise HTTPException(
281
  status_code=400,
282
  detail=f"Quality '{request.quality}' is for Premium Users Only. '128' is allowed."
283
  )
284
  else:
 
 
285
  raise HTTPException(
286
  status_code=400,
287
  detail=f"Invalid quality value: '{request.quality}'. Allowed values are '128', '320', 'FLAC'."
288
  )