tecuts commited on
Commit
5863066
·
verified ·
1 Parent(s): fd5a651

Upload 2 files

Browse files
Files changed (2) hide show
  1. requirements.txt +6 -3
  2. spotube_match.py +342 -0
requirements.txt CHANGED
@@ -1,4 +1,7 @@
1
- requests
2
  fastapi
3
- uvicorn[standard]
4
- python-dotenv
 
 
 
 
 
 
1
  fastapi
2
+ uvicorn
3
+ pydantic
4
+ ytmusicapi
5
+ requests
6
+ dnspython
7
+ python-dotenv
spotube_match.py ADDED
@@ -0,0 +1,342 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import socket
2
+ import sys
3
+ import os
4
+ import base64
5
+ import time
6
+ import asyncio
7
+ import httpx
8
+ import re
9
+ import traceback
10
+ import logging
11
+ import requests
12
+ from difflib import SequenceMatcher
13
+ from urllib.parse import urlparse, parse_qs
14
+ from bs4 import BeautifulSoup
15
+ from ytmusicapi import YTMusic
16
+ from fastapi import FastAPI, HTTPException
17
+ from pydantic import BaseModel
18
+ from typing import Optional, Dict, Any
19
+ from dotenv import load_dotenv
20
+
21
+ # Load environment variables
22
+ load_dotenv()
23
+
24
+ # --- DNS BYPASS PATCH START ---
25
+ BYPASS_DOMAINS = [
26
+ "youtube.com", "music.youtube.com", "googlevideo.com", "youtu.be"
27
+ ]
28
+
29
+ try:
30
+ import dns.resolver
31
+ _original_getaddrinfo = socket.getaddrinfo
32
+
33
+ def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
34
+ if host and any(domain in host for domain in BYPASS_DOMAINS):
35
+ try:
36
+ res = dns.resolver.Resolver()
37
+ res.nameservers = ['8.8.8.8', '1.1.1.1']
38
+ answers = res.resolve(host, 'A')
39
+ ip_address = answers[0].to_text()
40
+ return [(socket.AF_INET, type, proto, '', (ip_address, port))]
41
+ except Exception:
42
+ return _original_getaddrinfo(host, port, family, type, proto, flags)
43
+ return _original_getaddrinfo(host, port, family, type, proto, flags)
44
+
45
+ socket.getaddrinfo = patched_getaddrinfo
46
+ print(f"[INIT] DNS Bypass installed for {len(BYPASS_DOMAINS)} domains.", file=sys.stderr)
47
+ except ImportError:
48
+ print("❌ CRITICAL: dnspython not installed. DNS Bypass failed.", file=sys.stderr)
49
+ # --- DNS BYPASS PATCH END ---
50
+
51
+ # Initialize FastAPI and YTMusic
52
+ app = FastAPI()
53
+ ytmusic = YTMusic()
54
+
55
+ # Configure logging
56
+ logging.basicConfig(level=logging.INFO)
57
+ logger = logging.getLogger(__name__)
58
+
59
+ # --- Pydantic Models ---
60
+
61
+ class SearchRequest(BaseModel):
62
+ query: str
63
+
64
+ class MatchRequest(BaseModel):
65
+ url: str
66
+
67
+ class MatchResponse(BaseModel):
68
+ url: str
69
+ filename: str
70
+ track_id: str
71
+
72
+ # ==========================================
73
+ # TIDAL AUTHENTICATION
74
+ # ==========================================
75
+ TIDAL_CLIENT_ID = os.getenv('TIDAL_CLIENT_ID')
76
+ TIDAL_CLIENT_SECRET = os.getenv('TIDAL_CLIENT_SECRET')
77
+ QOBUZ_APP_ID = os.getenv('QOBUZ_APP_ID')
78
+ QOBUZ_TOKEN = os.getenv('QOBUZ_TOKEN')
79
+
80
+ if not all([TIDAL_CLIENT_ID, TIDAL_CLIENT_SECRET]):
81
+ print("⚠️ Warning: TIDAL_CLIENT_ID or TIDAL_CLIENT_SECRET not found in environment. Tidal search will fail.", file=sys.stderr)
82
+ if not all([QOBUZ_APP_ID, QOBUZ_TOKEN]):
83
+ print("⚠️ Warning: QOBUZ_APP_ID or QOBUZ_TOKEN not found in environment. Qobuz lookup will fail.", file=sys.stderr)
84
+
85
+ cached_tidal_token = None
86
+ token_expiry_time = 0
87
+
88
+ def similar(a: str, b: str) -> float:
89
+ return SequenceMatcher(None, a.lower(), b.lower()).ratio()
90
+
91
+ def clean_title(title: str) -> str:
92
+ title = re.sub(r'(?i)\s*-\s*(single|ep)$', '', title)
93
+ title = re.sub(r'\s*\([^)]*\)', '', title)
94
+ return title.strip()
95
+
96
+ async def get_tidal_access_token():
97
+ global cached_tidal_token, token_expiry_time
98
+ if not TIDAL_CLIENT_ID or not TIDAL_CLIENT_SECRET:
99
+ return None
100
+
101
+ if cached_tidal_token and time.time() < (token_expiry_time - 60):
102
+ return cached_tidal_token
103
+
104
+ try:
105
+ b64_creds = base64.b64encode(f"{TIDAL_CLIENT_ID}:{TIDAL_CLIENT_SECRET}".encode()).decode()
106
+ async with httpx.AsyncClient() as client:
107
+ response = await client.post(
108
+ "https://auth.tidal.com/v1/oauth2/token",
109
+ headers={"Authorization": f"Basic {b64_creds}"},
110
+ data={"grant_type": "client_credentials"}
111
+ )
112
+ if response.status_code == 200:
113
+ data = response.json()
114
+ cached_tidal_token = data.get("access_token")
115
+ token_expiry_time = time.time() + data.get("expires_in", 3600)
116
+ return cached_tidal_token
117
+ except Exception as e:
118
+ print(f"Tidal Auth Error: {e}")
119
+ return None
120
+
121
+ def search_youtube_sync(search_query: str, target_title: str, target_artist: str):
122
+ """Searches YouTube and returns the BEST match and the SECOND BEST (alternative) match."""
123
+ try:
124
+ yt_results = ytmusic.search(search_query, filter="songs")
125
+ if not yt_results:
126
+ return None, None
127
+
128
+ scored_results = []
129
+ for result in yt_results[:5]:
130
+ yt_title = result.get('title', '')
131
+ yt_artists = " ".join([a.get('name', '') for a in result.get('artists', [])])
132
+ title_score = similar(clean_title(target_title), clean_title(yt_title))
133
+ artist_score = similar(target_artist, yt_artists)
134
+ total_score = (title_score * 0.6) + (artist_score * 0.4)
135
+ if title_score > 0.8:
136
+ total_score += 0.2
137
+ scored_results.append((total_score, result['videoId']))
138
+ scored_results.sort(key=lambda x: x[0], reverse=True)
139
+ best_id = scored_results[0][1] if scored_results else None
140
+ alt_id = scored_results[1][1] if len(scored_results) > 1 else None
141
+ return best_id, alt_id
142
+ except Exception as e:
143
+ print(f"YouTube search error: {e}")
144
+ return None, None
145
+
146
+ async def search_tidal_async(search_query: str, target_title: str, target_artist: str):
147
+ access_token = await get_tidal_access_token()
148
+ if not access_token:
149
+ return None
150
+ async with httpx.AsyncClient() as client:
151
+ try:
152
+ search_response = await client.get(
153
+ "https://api.tidal.com/v1/search",
154
+ headers={"Authorization": f"Bearer {access_token}"},
155
+ params={"query": search_query, "types": "TRACKS", "countryCode": "US", "limit": 5}
156
+ )
157
+ if search_response.status_code == 200:
158
+ tidal_data = search_response.json()
159
+ if "tracks" in tidal_data and tidal_data["tracks"]["items"]:
160
+ best_match_id = None
161
+ highest_score = 0.0
162
+ for track in tidal_data["tracks"]["items"]:
163
+ t_title = track.get("title", "")
164
+ t_artists = " ".join([a.get("name", "") for a in track.get("artists", [])])
165
+ title_score = similar(clean_title(target_title), clean_title(t_title))
166
+ artist_score = similar(target_artist, t_artists)
167
+ total_score = (title_score * 0.6) + (artist_score * 0.4)
168
+ if title_score > 0.8: total_score += 0.2
169
+ if total_score > highest_score:
170
+ highest_score = total_score
171
+ best_match_id = track["id"]
172
+ if highest_score < 0.4: return None
173
+ if best_match_id: return f"https://tidal.com/browse/track/{best_match_id}"
174
+ except Exception as e:
175
+ print(f"Tidal search error: {e}")
176
+ return None
177
+
178
+ # --- Match Helper Functions ---
179
+
180
+ def extract_amazon_track_id(url: str) -> Optional[str]:
181
+ if "music.amazon.com" not in url: return None
182
+ parsed_url = urlparse(url)
183
+ query_params = parse_qs(parsed_url.query)
184
+ if "trackAsin" in query_params: return query_params["trackAsin"][0]
185
+ path_parts = parsed_url.path.split('/')
186
+ if "tracks" in path_parts:
187
+ try:
188
+ idx = path_parts.index("tracks") + 1
189
+ if idx < len(path_parts): return path_parts[idx]
190
+ except (ValueError, IndexError): pass
191
+ return None
192
+
193
+ def get_song_link_info(url: str) -> Optional[Dict[str, Any]]:
194
+ api_base_url = "https://api.song.link/v1-alpha.1/links"
195
+ params = {"userCountry": "US"}
196
+ if "music.amazon.com" in url:
197
+ track_id = extract_amazon_track_id(url)
198
+ if track_id:
199
+ params["platform"] = "amazonMusic"
200
+ params["id"] = track_id
201
+ params["type"] = "song"
202
+ else: params["url"] = url
203
+ else: params["url"] = url
204
+ try:
205
+ response = requests.get(api_base_url, params=params, timeout=10)
206
+ response.raise_for_status()
207
+ return response.json()
208
+ except Exception as e:
209
+ logger.error(f"Error fetching from Song.link API: {e}")
210
+ return None
211
+
212
+ def extract_url_from_songlink(links_by_platform: dict, platform: str) -> Optional[str]:
213
+ if platform in links_by_platform and links_by_platform[platform].get("url"):
214
+ return links_by_platform[platform]["url"]
215
+ return None
216
+
217
+ # --- Endpoints ---
218
+
219
+ @app.get("/")
220
+ async def root():
221
+ return {"message": "Combined Music API is running. Use /convert, /match, or /searcht."}
222
+
223
+ @app.post("/searcht")
224
+ async def searcht(request: SearchRequest):
225
+ logger.info(f"search query: {request.query}")
226
+ search_results = ytmusic.search(request.query, filter="songs")
227
+ first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {}
228
+ return first_song
229
+
230
+ @app.post("/match", response_model=MatchResponse)
231
+ async def match(request: MatchRequest):
232
+ track_url = request.url
233
+ logger.info(f"Match endpoint: Processing URL: {track_url}")
234
+ track_info = get_song_link_info(track_url)
235
+ if not track_info:
236
+ raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.")
237
+
238
+ entity_unique_id = track_info.get("entityUniqueId")
239
+ title, artist = None, None
240
+ if entity_unique_id and entity_unique_id in track_info.get("entitiesByUniqueId", {}):
241
+ ent = track_info["entitiesByUniqueId"][entity_unique_id]
242
+ title, artist = ent.get("title"), ent.get("artistName")
243
+ else:
244
+ for _, edata in track_info.get("entitiesByUniqueId", {}).items():
245
+ if edata.get("title") and edata.get("artistName"):
246
+ title, artist = edata.get("title"), edata.get("artistName")
247
+ break
248
+
249
+ if not title or not artist:
250
+ raise HTTPException(status_code=404, detail="Could not determine title and artist.")
251
+
252
+ youtube_url = extract_url_from_songlink(track_info.get("linksByPlatform", {}), "youtube")
253
+ if youtube_url:
254
+ video_id = None
255
+ if "v=" in youtube_url: video_id = youtube_url.split("v=")[1].split("&")[0]
256
+ elif "youtu.be/" in youtube_url: video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
257
+ return MatchResponse(url=youtube_url, filename=f"{title} - {artist}", track_id=video_id)
258
+ else:
259
+ search_query = f'{title} {artist}'
260
+ search_results = ytmusic.search(search_query, filter="songs")
261
+ if search_results:
262
+ first = next((song for song in search_results if song.get('videoId')), None)
263
+ if first:
264
+ v_id = first["videoId"]
265
+ ym_url = f'https://music.youtube.com/watch?v={v_id}'
266
+ a_name = first['artists'][0]['name'] if first.get('artists') else artist
267
+ return MatchResponse(filename=f"{first.get('title', title)} - {a_name}", url=ym_url, track_id=v_id)
268
+ raise HTTPException(status_code=404, detail="No matching video found on YouTube Music.")
269
+
270
+ @app.get("/convert")
271
+ async def convert_url_to_all(url: str):
272
+ track_title, artist_clean = "", ""
273
+ if "music.apple.com" in url:
274
+ parsed_url = urlparse(url)
275
+ path_parts = [p for p in parsed_url.path.split('/') if p]
276
+ if not path_parts: raise HTTPException(status_code=400, detail="Invalid Apple Music URL")
277
+ country = path_parts[0] if len(path_parts[0]) == 2 else "us"
278
+ query_params = parse_qs(parsed_url.query)
279
+ lookup_id = query_params.get('i', [path_parts[-1]])[0]
280
+ async with httpx.AsyncClient() as client:
281
+ response = await client.get(f"https://itunes.apple.com/lookup?id={lookup_id}&country={country}")
282
+ if response.status_code == 200 and response.json().get("resultCount", 0) > 0:
283
+ result = response.json()["results"][0]
284
+ track_title = result.get("trackName", result.get("collectionName", ""))
285
+ artist_clean = result.get("artistName", "")
286
+ else: raise HTTPException(status_code=404, detail="Track not found on Apple Music")
287
+ elif "deezer.com" in url:
288
+ match_obj = re.search(r'track/(\d+)', url)
289
+ if not match_obj: raise HTTPException(status_code=400, detail="Invalid Deezer URL")
290
+ async with httpx.AsyncClient() as client:
291
+ response = await client.get(f"https://api.deezer.com/track/{match_obj.group(1)}")
292
+ data = response.json()
293
+ if "error" in data: raise HTTPException(status_code=404, detail="Track not found on Deezer")
294
+ track_title, artist_clean = data.get("title", ""), data.get("artist", {}).get("name", "")
295
+ elif "http://googleusercontent.com/spotify.com" in url or "open.spotify.com" in url:
296
+ headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
297
+ async with httpx.AsyncClient(follow_redirects=True) as client:
298
+ response = await client.get(url, headers=headers)
299
+
300
+ if response.status_code != 200:
301
+ raise HTTPException(status_code=400, detail="Failed to fetch Spotify page")
302
+
303
+ soup = BeautifulSoup(response.text, 'html.parser')
304
+ og_title = soup.find("meta", property="og:title")
305
+ og_desc = soup.find("meta", property="og:description")
306
+
307
+ if not og_title or not og_desc:
308
+ raise HTTPException(status_code=404, detail="Could not extract Spotify metadata")
309
+
310
+ track_title = og_title["content"]
311
+ artist_raw = og_desc["content"].split("·")[0]
312
+ artist_clean = artist_raw.replace("Listen to ", "").replace(" on Spotify.", "").strip()
313
+ elif "qobuz.com" in url:
314
+ if not QOBUZ_APP_ID or not QOBUZ_TOKEN:
315
+ raise HTTPException(status_code=500, detail="Qobuz API credentials not configured.")
316
+ parsed_url = urlparse(url)
317
+ path_parts = [p for p in parsed_url.path.split('/') if p]
318
+ if not path_parts: raise HTTPException(status_code=400, detail="Invalid Qobuz URL")
319
+ q_id, is_track = path_parts[-1], "track/" in parsed_url.path
320
+ api_url = f"https://www.qobuz.com/api.json/0.2/{'track/get' if is_track else 'album/get'}"
321
+ headers = {"X-App-Id": QOBUZ_APP_ID, "X-User-Auth-Token": QOBUZ_TOKEN, "User-Agent": "Mozilla/5.0"}
322
+ async with httpx.AsyncClient(timeout=10.0, trust_env=False) as client:
323
+ response = await client.get(api_url, params={"track_id" if is_track else "album_id": q_id}, headers=headers)
324
+ data = response.json()
325
+ track_title = data.get("title", "")
326
+ artist_data = data.get("artist") or data.get("performer") or data.get("album", {}).get("artist")
327
+ artist_clean = artist_data.get("name", "") if isinstance(artist_data, dict) else (artist_data if isinstance(artist_data, str) else "")
328
+ if not track_title: raise HTTPException(status_code=404, detail="Qobuz track not found")
329
+
330
+ clean_search_title = clean_title(track_title)
331
+ search_query = f"{clean_search_title} {artist_clean}".strip()
332
+ yt_task = asyncio.to_thread(search_youtube_sync, search_query, track_title, artist_clean)
333
+ tidal_task = search_tidal_async(search_query, track_title, artist_clean)
334
+ (best_yt_id, alt_yt_id), tidal_url = await asyncio.gather(yt_task, tidal_task)
335
+
336
+ return {
337
+ "track_title": track_title, "artist": artist_clean, "search_query_used": search_query,
338
+ "source_url": url, "youtube_music_url": f"https://music.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
339
+ "youtube_video_url": f"https://www.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
340
+ "alternative_youtube_url": f"https://www.youtube.com/watch?v={alt_yt_id}" if alt_yt_id else None,
341
+ "tidal_url": tidal_url
342
+ }