tecuts commited on
Commit
40b49ef
·
verified ·
1 Parent(s): 4e02b71

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +171 -170
app.py CHANGED
@@ -1,4 +1,8 @@
1
- from flask import Flask, render_template, jsonify, request
 
 
 
 
2
  from ytmusicapi import YTMusic
3
  import os
4
  import logging
@@ -7,43 +11,64 @@ from datetime import datetime, timedelta
7
  import time
8
  import asyncio
9
  import cloudscraper
10
- from pydantic import BaseModel
11
  from urllib.parse import urlparse, parse_qs
12
  from collections import defaultdict
13
  import threading
 
14
 
15
- app = Flask(__name__)
16
- ytmusic = YTMusic()
 
 
 
17
 
 
18
 
19
- # Configure logging
20
  logging.basicConfig(level=logging.INFO)
21
  logger = logging.getLogger(__name__)
22
 
23
- @app.route('/')
24
- def index():
25
- return render_template('index.html')
26
-
27
- @app.route('/search', methods=['POST'])
28
- def search():
29
- query = request.json.get('query', '')
30
- search_results = ytmusic.search(query, filter="songs")
31
- return jsonify(search_results)
32
-
33
- @app.route('/searcht', methods=['POST'])
34
- def searcht():
35
- query = request.json.get('query', '')
36
- logger.info(f"serch query: {query}")
37
- search_results = ytmusic.search(query, filter="songs")
38
- first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {}
39
- return jsonify(first_song)
40
 
 
 
 
 
41
 
42
- def extract_amazon_track_id(url: str):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  """
44
  Extracts track ID from various Amazon Music URL formats.
45
  """
46
- if "music.amazon.com" not in url: # MODIFIED: Slight logic inversion for early exit compared to original, same effective outcome.
47
  return None
48
 
49
  parsed_url = urlparse(url)
@@ -52,153 +77,135 @@ def extract_amazon_track_id(url: str):
52
  if "trackAsin" in query_params:
53
  return query_params["trackAsin"][0]
54
 
55
- path_parts = parsed_url.path.split('/') # MODIFIED: Changed from simple `url.split` to more robust path parsing for Case 2.
56
  if "tracks" in path_parts:
57
  try:
58
  track_id_index = path_parts.index("tracks") + 1
59
  if track_id_index < len(path_parts):
60
- return path_parts[track_id_index] # MODIFIED: Accessing specific part after "tracks".
61
  except (ValueError, IndexError):
62
  pass
63
 
64
- logger.warning(f"Could not extract Amazon track ID from URL: {url}") # ADDED: Logging for when no ID is found.
65
  return None
66
 
67
-
68
- def get_song_link_info(url: str):
69
  """
70
  Fetches track information from the Song.link API.
71
  Uses requests.get() which is a blocking call.
72
  """
73
- api_base_url = "https://api.song.link/v1-alpha.1/links" # ADDED: Defined base URL for clarity.
74
- params = {"userCountry": "US"} # MODIFIED: Using a params dictionary for requests.get().
75
 
76
  if "music.amazon.com" in url:
77
  track_id = extract_amazon_track_id(url)
78
  if track_id:
79
- params["platform"] = "amazonMusic" # MODIFIED: Populating params dict.
80
  params["id"] = track_id
81
  params["type"] = "song"
82
  else:
83
- params["url"] = url # MODIFIED: Populating params dict.
84
  else:
85
- params["url"] = url # MODIFIED: Populating params dict.
86
 
87
- try: # ADDED: try-except block for robust error handling during API call.
88
- logger.info(f"Querying Song.link API with params: {params}") # ADDED: Logging the API query.
89
- response = requests.get(api_base_url, params=params, timeout=10) # MODIFIED: Call uses base_url and params. ADDED: timeout.
90
- response.raise_for_status() # ADDED: Checks for HTTP errors (4xx or 5xx responses).
91
  return response.json()
92
- except requests.exceptions.RequestException as e: # ADDED: Catching network/request related exceptions.
93
- logger.error(f"Error fetching from Song.link API: {e}") # ADDED: Logging the specific error.
94
  return None
95
 
96
- def extract_url(links_by_platform: dict, platform: str):
97
  """
98
  Extracts a specific platform URL from Song.link API response.
99
  """
100
- # MODIFIED: Added .get("url") for safer access to prevent KeyError if "url" key is missing.
101
  if platform in links_by_platform and links_by_platform[platform].get("url"):
102
  return links_by_platform[platform]["url"]
103
- logger.warning(f"No URL found for platform '{platform}' in links: {links_by_platform.keys()}") # ADDED: Logging if platform URL not found.
 
104
  return None
105
 
106
- @app.route('/match', methods=['POST'])
107
- def match(): # MODIFIED: Changed from `async def` to `def` for synchronous Flask.
108
  """
109
  Matches a given music track URL to a YouTube Music URL.
110
- Expects a JSON body with "url".
111
  """
112
- data = request.get_json()
113
- if not data: # ADDED: Check for empty JSON payload.
114
- logger.error("Match endpoint: No JSON payload received.") # ADDED: Logging.
115
- return jsonify({"detail": "No JSON payload received."}), 400 # MODIFIED: Flask-style JSON error response.
116
-
117
- track_url = data.get('url')
118
- # MODIFIED: Added more specific validation for track_url presence and type.
119
- if not track_url or not isinstance(track_url, str):
120
- logger.error(f"Match endpoint: Invalid or missing URL: {track_url}") # ADDED: Logging.
121
- return jsonify({"detail": "Valid 'url' string is required in request body."}), 400 # MODIFIED: Flask-style JSON error response.
122
-
123
- logger.info(f"Match endpoint: Processing URL: {track_url}") # ADDED: Logging.
124
 
125
  track_info = get_song_link_info(track_url)
126
  if not track_info:
127
- logger.error(f"Match endpoint: Could not fetch track info for URL: {track_url}") # ADDED: Logging.
128
- # MODIFIED: Flask-style JSON error response instead of HTTPException.
129
- return jsonify({"detail": "Could not fetch track info from Song.link API."}), 404
130
 
131
- entity_unique_id = track_info.get("entityUniqueId") # MODIFIED: Used .get() for safer access.
132
  title = None
133
  artist = None
134
 
135
- # MODIFIED: More robust extraction of title and artist with checks and logging.
136
  if entity_unique_id and entity_unique_id in track_info.get("entitiesByUniqueId", {}):
137
  main_entity = track_info["entitiesByUniqueId"][entity_unique_id]
138
  title = main_entity.get("title")
139
  artist = main_entity.get("artistName")
140
- logger.info(f"Match endpoint: Found main entity - Title: '{title}', Artist: '{artist}'") # ADDED: Logging.
141
  else:
142
- logger.warning(f"Match endpoint: Could not find main entity details for {track_url} using entityUniqueId: {entity_unique_id}") # ADDED: Logging.
143
- # ADDED: Fallback logic to find title/artist from other entities if main one fails.
 
144
  for entity_id, entity_data in track_info.get("entitiesByUniqueId", {}).items():
145
  if entity_data.get("title") and entity_data.get("artistName"):
146
  title = entity_data.get("title")
147
  artist = entity_data.get("artistName")
148
- logger.info(f"Match endpoint: Using fallback entity - Title: '{title}', Artist: '{artist}' from entity ID {entity_id}") # ADDED: Logging.
149
  break
150
- if not title or not artist: # ADDED: Check if title/artist still not found after fallback.
151
- logger.error(f"Match endpoint: Could not determine title and artist for URL: {track_url}") # ADDED: Logging.
152
- return jsonify({"detail": "Could not determine title and artist from Song.link info."}), 404 # MODIFIED: Flask-style JSON error.
153
-
154
 
155
- youtube_url = extract_url(track_info.get("linksByPlatform", {}), "youtube") # MODIFIED: Used .get() for safer access.
 
 
156
 
 
 
157
  if youtube_url:
158
  video_id = None
159
- # MODIFIED: Improved video_id extraction from youtube_url, handles direct watch links and youtu.be, and strips extra params.
160
  if "v=" in youtube_url:
161
  video_id = youtube_url.split("v=")[1].split("&")[0]
162
- elif "youtu.be/" in youtube_url: # MODIFIED: Handling for youtu.be links if present in song.link
163
  video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
164
 
165
  filename = f"{title} - {artist}" if title and artist else "Unknown Track - Unknown Artist"
166
- logger.info(f"Match endpoint: Found direct YouTube URL: {youtube_url}, Video ID: {video_id}") # ADDED: Logging.
167
- # MODIFIED: Flask-style JSON response instead of returning dict directly.
168
- return jsonify({"url": youtube_url, "filename": filename, "track_id": video_id}), 200
169
  else:
170
- logger.info(f"Match endpoint: No direct YouTube URL. Searching YTMusic with: '{title} - {artist}'") # ADDED: Logging.
171
- # ADDED: Explicit check if title or artist is missing before searching.
172
- if not title or not artist:
173
- logger.error("Match endpoint: Cannot search YTMusic without title and artist.") # ADDED: Logging.
174
- return jsonify({"detail": "Cannot search on YouTube Music without title and artist information."}), 400 # MODIFIED: Flask-style JSON error.
175
-
176
- search_query = f'{title} {artist}' # MODIFIED: Changed from '+' to space for a more natural search query.
177
  search_results = ytmusic.search(search_query, filter="songs")
178
-
179
  if search_results:
180
- # MODIFIED: Improved logic to pick the first song with a videoId using next() and .get().
181
  first_song = next((song for song in search_results if song.get('videoId')), None)
 
182
  if first_song and first_song.get('videoId'):
183
  video_id = first_song["videoId"]
184
- # MODIFIED: Changed ym_url to a standard YouTube watch URL format.
185
  ym_url = f'https://music.youtube.com/watch?v={video_id}'
186
- # MODIFIED: More robust filename generation using .get() and providing fallbacks.
187
- filename = f"{first_song.get('title', title)} - {first_song.get('artists', [{'name': artist}])[0]['name']}"
188
- logger.info(f"Match endpoint: Found YTMusic search result - URL: {ym_url}, Video ID: {video_id}") # ADDED: Logging.
189
- # MODIFIED: Flask-style JSON response.
190
- return jsonify({"filename": filename, "url": ym_url, "track_id": video_id}), 200
 
 
 
 
 
191
  else:
192
- logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results with a videoId.") # ADDED: Logging.
193
- # MODIFIED: Flask-style JSON error response.
194
- return jsonify({"detail": "No matching video ID found on YouTube Music after search."}), 404
195
  else:
196
- logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results.") # ADDED: Logging.
197
- # MODIFIED: Flask-style JSON error response.
198
- return jsonify({"detail": "No results found on YouTube Music for the track."}), 404
199
- # REMOVED: The final `raise HTTPException` was determined to be unreachable and removed.
200
-
201
-
202
 
203
  class ApiRotator:
204
  def __init__(self, apis):
@@ -207,10 +214,9 @@ class ApiRotator:
207
 
208
  def get_prioritized_apis(self):
209
  if self.last_successful_index is not None:
210
- # Move the last successful API to the front
211
  rotated_apis = (
212
- [self.apis[self.last_successful_index]] +
213
- self.apis[:self.last_successful_index] +
214
  self.apis[self.last_successful_index+1:]
215
  )
216
  return rotated_apis
@@ -219,21 +225,18 @@ class ApiRotator:
219
  def update_last_successful(self, index):
220
  self.last_successful_index = index
221
 
222
- # In your function:
223
  api_rotator = ApiRotator([
224
  "https://cobalt-api.ayo.tf",
225
  "https://cobalt-api.kwiatekmiki.com",
226
  "http://34.107.254.11",
227
  "https://dwnld.nichind.dev",
228
  "https://yt.edd1e.xyz/"
229
-
230
  ])
231
 
232
-
233
-
234
  async def get_track_download_url(track_id: str, quality: str) -> str:
235
  apis = api_rotator.get_prioritized_apis()
236
- session = cloudscraper.create_scraper() # Requires cloudscraper package
 
237
  headers = {
238
  "Accept": "application/json",
239
  "Content-Type": "application/json",
@@ -244,12 +247,19 @@ async def get_track_download_url(track_id: str, quality: str) -> str:
244
  try:
245
  logger.info(f"Attempting to get download URL from: {api_url}")
246
  y_url = f"https://youtu.be/{track_id}"
247
- response = session.post(
248
- api_url,
249
- timeout=20,
250
- json={"url": y_url, "audioFormat": "mp3", "downloadMode": "audio", "audioBitrate": quality},
251
- headers=headers
 
 
 
 
 
 
252
  )
 
253
  logger.info(f"Response status: {response.status_code}")
254
  logger.info(f"Response content: {response.content}")
255
 
@@ -259,81 +269,72 @@ async def get_track_download_url(track_id: str, quality: str) -> str:
259
 
260
  if error_code == "error.api.content.video.unavailable":
261
  logger.warning(f"Video unavailable error from {api_url}")
262
- break # Only break for specific error
263
 
264
  if "url" in json_response:
265
  api_rotator.update_last_successful(i)
266
  return json_response["url"]
267
-
268
  except Exception as e:
269
  logger.error(f"Failed with {api_url}: {str(e)}")
270
  continue
271
-
272
- logger.error(f"No download URL found")
273
- return {"error": "Download URL not found"}
274
-
275
-
276
-
277
-
278
-
279
 
 
 
280
 
281
- @app.route('/track_dl', methods=['POST'])
282
- async def track_dl():
283
-
284
- data = request.get_json()
285
- track_id = data.get('track_id')
286
- quality = data.get('quality', '128')
287
-
288
  try:
289
- quality_num = int(quality)
290
- if quality_num > 128 or quality.upper() == 'FLAC':
291
- return jsonify({
292
- "error": "Quality above 128 or FLAC is for Premium users Only.",
293
- "premium": "https://chrunos.com/premium-shortcuts/"
294
- }), 400
295
-
296
- dl_url = await get_track_download_url(track_id, quality)
 
 
 
297
 
298
  if dl_url and "http" in dl_url:
299
-
300
- result = {
301
  "url": dl_url,
302
  "premium": "https://chrunos.com/premium-shortcuts/"
303
  }
304
- return jsonify(result)
305
  else:
306
- return jsonify({
307
- "error": "Failed to Fetch the Track.",
308
- "premium": "https://chrunos.com/premium-shortcuts/"
309
- }), 400
310
-
311
- except ValueError:
312
- return jsonify({
313
- "error": "Invalid quality value provided. It should be a valid integer or FLAC.",
314
- "premium": "https://chrunos.com/premium-shortcuts/"
315
- }), 400
316
-
317
-
318
 
 
 
 
 
 
 
 
 
319
 
320
- @app.route('/get_artist', methods=['GET'])
321
- def get_artist():
322
- artist_id = request.args.get('id')
323
- artist_info = ytmusic.get_artist(artist_id)
324
- return jsonify(artist_info)
325
 
326
- @app.route('/get_album', methods=['GET'])
327
- def get_album():
328
- album_id = request.args.get('id')
329
- album_info = ytmusic.get_album(album_id)
330
- return jsonify(album_info)
331
 
332
- @app.route('/get_song', methods=['GET'])
333
- def get_song():
334
- song_id = request.args.get('id')
335
- song_info = ytmusic.get_song(song_id)
336
- return jsonify(song_info)
337
 
338
  if __name__ == '__main__':
339
- app.run(host='0.0.0.0', port=7860)
 
 
1
+ from fastapi import FastAPI, HTTPException, Request
2
+ from fastapi.responses import HTMLResponse
3
+ from fastapi.staticfiles import StaticFiles
4
+ from fastapi.templating import Jinja2Templates
5
+ from pydantic import BaseModel
6
  from ytmusicapi import YTMusic
7
  import os
8
  import logging
 
11
  import time
12
  import asyncio
13
  import cloudscraper
 
14
  from urllib.parse import urlparse, parse_qs
15
  from collections import defaultdict
16
  import threading
17
+ from typing import Optional, Dict, Any
18
 
19
+ app = FastAPI()
20
+
21
+ # Mount static files and templates
22
+ app.mount("/static", StaticFiles(directory="static"), name="static")
23
+ templates = Jinja2Templates(directory="templates")
24
 
25
+ ytmusic = YTMusic()
26
 
27
+ # Configure logging
28
  logging.basicConfig(level=logging.INFO)
29
  logger = logging.getLogger(__name__)
30
 
31
+ # Pydantic models for request/response validation
32
+ class SearchRequest(BaseModel):
33
+ query: str
34
+
35
+ class MatchRequest(BaseModel):
36
+ url: str
37
+
38
+ class TrackDownloadRequest(BaseModel):
39
+ track_id: str
40
+ quality: str = "128"
 
 
 
 
 
 
 
41
 
42
+ class MatchResponse(BaseModel):
43
+ url: str
44
+ filename: str
45
+ track_id: str
46
 
47
+ class ErrorResponse(BaseModel):
48
+ detail: str
49
+ premium: Optional[str] = None
50
+
51
+ @app.get("/", response_class=HTMLResponse)
52
+ async def index(request: Request):
53
+ return templates.TemplateResponse("index.html", {"request": request})
54
+
55
+ @app.post("/search")
56
+ async def search(request: SearchRequest):
57
+ search_results = ytmusic.search(request.query, filter="songs")
58
+ return search_results
59
+
60
+ @app.post("/searcht")
61
+ async def searcht(request: SearchRequest):
62
+ logger.info(f"search query: {request.query}")
63
+ search_results = ytmusic.search(request.query, filter="songs")
64
+ first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {}
65
+ return first_song
66
+
67
+ def extract_amazon_track_id(url: str) -> Optional[str]:
68
  """
69
  Extracts track ID from various Amazon Music URL formats.
70
  """
71
+ if "music.amazon.com" not in url:
72
  return None
73
 
74
  parsed_url = urlparse(url)
 
77
  if "trackAsin" in query_params:
78
  return query_params["trackAsin"][0]
79
 
80
+ path_parts = parsed_url.path.split('/')
81
  if "tracks" in path_parts:
82
  try:
83
  track_id_index = path_parts.index("tracks") + 1
84
  if track_id_index < len(path_parts):
85
+ return path_parts[track_id_index]
86
  except (ValueError, IndexError):
87
  pass
88
 
89
+ logger.warning(f"Could not extract Amazon track ID from URL: {url}")
90
  return None
91
 
92
+ def get_song_link_info(url: str) -> Optional[Dict[str, Any]]:
 
93
  """
94
  Fetches track information from the Song.link API.
95
  Uses requests.get() which is a blocking call.
96
  """
97
+ api_base_url = "https://api.song.link/v1-alpha.1/links"
98
+ params = {"userCountry": "US"}
99
 
100
  if "music.amazon.com" in url:
101
  track_id = extract_amazon_track_id(url)
102
  if track_id:
103
+ params["platform"] = "amazonMusic"
104
  params["id"] = track_id
105
  params["type"] = "song"
106
  else:
107
+ params["url"] = url
108
  else:
109
+ params["url"] = url
110
 
111
+ try:
112
+ logger.info(f"Querying Song.link API with params: {params}")
113
+ response = requests.get(api_base_url, params=params, timeout=10)
114
+ response.raise_for_status()
115
  return response.json()
116
+ except requests.exceptions.RequestException as e:
117
+ logger.error(f"Error fetching from Song.link API: {e}")
118
  return None
119
 
120
+ def extract_url(links_by_platform: dict, platform: str) -> Optional[str]:
121
  """
122
  Extracts a specific platform URL from Song.link API response.
123
  """
 
124
  if platform in links_by_platform and links_by_platform[platform].get("url"):
125
  return links_by_platform[platform]["url"]
126
+
127
+ logger.warning(f"No URL found for platform '{platform}' in links: {links_by_platform.keys()}")
128
  return None
129
 
130
+ @app.post("/match", response_model=MatchResponse)
131
+ async def match(request: MatchRequest):
132
  """
133
  Matches a given music track URL to a YouTube Music URL.
 
134
  """
135
+ track_url = request.url
136
+ logger.info(f"Match endpoint: Processing URL: {track_url}")
 
 
 
 
 
 
 
 
 
 
137
 
138
  track_info = get_song_link_info(track_url)
139
  if not track_info:
140
+ logger.error(f"Match endpoint: Could not fetch track info for URL: {track_url}")
141
+ raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.")
 
142
 
143
+ entity_unique_id = track_info.get("entityUniqueId")
144
  title = None
145
  artist = None
146
 
 
147
  if entity_unique_id and entity_unique_id in track_info.get("entitiesByUniqueId", {}):
148
  main_entity = track_info["entitiesByUniqueId"][entity_unique_id]
149
  title = main_entity.get("title")
150
  artist = main_entity.get("artistName")
151
+ logger.info(f"Match endpoint: Found main entity - Title: '{title}', Artist: '{artist}'")
152
  else:
153
+ logger.warning(f"Match endpoint: Could not find main entity details for {track_url} using entityUniqueId: {entity_unique_id}")
154
+
155
+ # Fallback logic to find title/artist from other entities
156
  for entity_id, entity_data in track_info.get("entitiesByUniqueId", {}).items():
157
  if entity_data.get("title") and entity_data.get("artistName"):
158
  title = entity_data.get("title")
159
  artist = entity_data.get("artistName")
160
+ logger.info(f"Match endpoint: Using fallback entity - Title: '{title}', Artist: '{artist}' from entity ID {entity_id}")
161
  break
 
 
 
 
162
 
163
+ if not title or not artist:
164
+ logger.error(f"Match endpoint: Could not determine title and artist for URL: {track_url}")
165
+ raise HTTPException(status_code=404, detail="Could not determine title and artist from Song.link info.")
166
 
167
+ youtube_url = extract_url(track_info.get("linksByPlatform", {}), "youtube")
168
+
169
  if youtube_url:
170
  video_id = None
171
+
172
  if "v=" in youtube_url:
173
  video_id = youtube_url.split("v=")[1].split("&")[0]
174
+ elif "youtu.be/" in youtube_url:
175
  video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
176
 
177
  filename = f"{title} - {artist}" if title and artist else "Unknown Track - Unknown Artist"
178
+ logger.info(f"Match endpoint: Found direct YouTube URL: {youtube_url}, Video ID: {video_id}")
179
+
180
+ return MatchResponse(url=youtube_url, filename=filename, track_id=video_id)
181
  else:
182
+ logger.info(f"Match endpoint: No direct YouTube URL. Searching YTMusic with: '{title} - {artist}'")
183
+
184
+ search_query = f'{title} {artist}'
 
 
 
 
185
  search_results = ytmusic.search(search_query, filter="songs")
186
+
187
  if search_results:
 
188
  first_song = next((song for song in search_results if song.get('videoId')), None)
189
+
190
  if first_song and first_song.get('videoId'):
191
  video_id = first_song["videoId"]
 
192
  ym_url = f'https://music.youtube.com/watch?v={video_id}'
193
+
194
+ # Get artist name safely
195
+ artist_name = artist
196
+ if first_song.get('artists') and len(first_song['artists']) > 0:
197
+ artist_name = first_song['artists'][0]['name']
198
+
199
+ filename = f"{first_song.get('title', title)} - {artist_name}"
200
+ logger.info(f"Match endpoint: Found YTMusic search result - URL: {ym_url}, Video ID: {video_id}")
201
+
202
+ return MatchResponse(filename=filename, url=ym_url, track_id=video_id)
203
  else:
204
+ logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results with a videoId.")
205
+ raise HTTPException(status_code=404, detail="No matching video ID found on YouTube Music after search.")
 
206
  else:
207
+ logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results.")
208
+ raise HTTPException(status_code=404, detail="No results found on YouTube Music for the track.")
 
 
 
 
209
 
210
  class ApiRotator:
211
  def __init__(self, apis):
 
214
 
215
  def get_prioritized_apis(self):
216
  if self.last_successful_index is not None:
 
217
  rotated_apis = (
218
+ [self.apis[self.last_successful_index]] +
219
+ self.apis[:self.last_successful_index] +
220
  self.apis[self.last_successful_index+1:]
221
  )
222
  return rotated_apis
 
225
  def update_last_successful(self, index):
226
  self.last_successful_index = index
227
 
 
228
  api_rotator = ApiRotator([
229
  "https://cobalt-api.ayo.tf",
230
  "https://cobalt-api.kwiatekmiki.com",
231
  "http://34.107.254.11",
232
  "https://dwnld.nichind.dev",
233
  "https://yt.edd1e.xyz/"
 
234
  ])
235
 
 
 
236
  async def get_track_download_url(track_id: str, quality: str) -> str:
237
  apis = api_rotator.get_prioritized_apis()
238
+ session = cloudscraper.create_scraper()
239
+
240
  headers = {
241
  "Accept": "application/json",
242
  "Content-Type": "application/json",
 
247
  try:
248
  logger.info(f"Attempting to get download URL from: {api_url}")
249
  y_url = f"https://youtu.be/{track_id}"
250
+
251
+ # Use asyncio to run the blocking request in a thread pool
252
+ loop = asyncio.get_event_loop()
253
+ response = await loop.run_in_executor(
254
+ None,
255
+ lambda: session.post(
256
+ api_url,
257
+ timeout=20,
258
+ json={"url": y_url, "audioFormat": "mp3", "downloadMode": "audio", "audioBitrate": quality},
259
+ headers=headers
260
+ )
261
  )
262
+
263
  logger.info(f"Response status: {response.status_code}")
264
  logger.info(f"Response content: {response.content}")
265
 
 
269
 
270
  if error_code == "error.api.content.video.unavailable":
271
  logger.warning(f"Video unavailable error from {api_url}")
272
+ break
273
 
274
  if "url" in json_response:
275
  api_rotator.update_last_successful(i)
276
  return json_response["url"]
277
+
278
  except Exception as e:
279
  logger.error(f"Failed with {api_url}: {str(e)}")
280
  continue
 
 
 
 
 
 
 
 
281
 
282
+ logger.error(f"No download URL found")
283
+ return ""
284
 
285
+ @app.post("/track_dl")
286
+ async def track_dl(request: TrackDownloadRequest):
 
 
 
 
 
287
  try:
288
+ quality_num = int(request.quality)
289
+ if quality_num > 128 or request.quality.upper() == 'FLAC':
290
+ raise HTTPException(
291
+ status_code=400,
292
+ detail={
293
+ "error": "Quality above 128 or FLAC is for Premium users Only.",
294
+ "premium": "https://chrunos.com/premium-shortcuts/"
295
+ }
296
+ )
297
+
298
+ dl_url = await get_track_download_url(request.track_id, request.quality)
299
 
300
  if dl_url and "http" in dl_url:
301
+ return {
 
302
  "url": dl_url,
303
  "premium": "https://chrunos.com/premium-shortcuts/"
304
  }
 
305
  else:
306
+ raise HTTPException(
307
+ status_code=400,
308
+ detail={
309
+ "error": "Failed to Fetch the Track.",
310
+ "premium": "https://chrunos.com/premium-shortcuts/"
311
+ }
312
+ )
 
 
 
 
 
313
 
314
+ except ValueError:
315
+ raise HTTPException(
316
+ status_code=400,
317
+ detail={
318
+ "error": "Invalid quality value provided. It should be a valid integer or FLAC.",
319
+ "premium": "https://chrunos.com/premium-shortcuts/"
320
+ }
321
+ )
322
 
323
+ @app.get("/get_artist")
324
+ async def get_artist(id: str):
325
+ artist_info = ytmusic.get_artist(id)
326
+ return artist_info
 
327
 
328
+ @app.get("/get_album")
329
+ async def get_album(id: str):
330
+ album_info = ytmusic.get_album(id)
331
+ return album_info
 
332
 
333
+ @app.get("/get_song")
334
+ async def get_song(id: str):
335
+ song_info = ytmusic.get_song(id)
336
+ return song_info
 
337
 
338
  if __name__ == '__main__':
339
+ import uvicorn
340
+ uvicorn.run(app, host='0.0.0.0', port=7860)