"""Combined music-link API.

Exposes three FastAPI endpoints:

* ``POST /searcht`` - first YouTube Music song hit for a free-text query.
* ``POST /match``   - resolve a streaming URL to a YouTube link through the
  Song.link API, falling back to a YouTube Music search.
* ``GET /convert``  - read title/artist from an Apple Music, Deezer, Spotify
  or Qobuz URL and look the track up on YouTube Music and Tidal.

Importing this module has side effects: it loads ``.env``, monkey-patches
``socket.getaddrinfo`` (when dnspython is installed) and creates the
``YTMusic`` client.
"""

import asyncio
import base64
import logging
import os
import re
import socket
import sys
import time
import traceback
from difflib import SequenceMatcher
from typing import Any, Dict, Optional
from urllib.parse import parse_qs, urlparse

import httpx
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from ytmusicapi import YTMusic

# Load environment variables
load_dotenv()

# --- DNS BYPASS PATCH START ---
BYPASS_DOMAINS = [
    "youtube.com",
    "music.youtube.com",
    "googlevideo.com",
    "youtu.be"
]


def _is_bypass_host(host) -> bool:
    """Return True when *host* is one of BYPASS_DOMAINS or a subdomain of one.

    Non-``str`` hosts (``None``, ``bytes``) never match, so they are always
    handed to the original resolver.
    """
    if not isinstance(host, str) or not host:
        return False
    name = host.rstrip(".").lower()
    return any(
        name == domain or name.endswith("." + domain)
        for domain in BYPASS_DOMAINS
    )


try:
    import dns.resolver

    _original_getaddrinfo = socket.getaddrinfo

    # Parameter names (including ``type``) mirror socket.getaddrinfo so that
    # callers passing them by keyword keep working.
    def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        """Resolve bypass-listed hosts through public DNS servers.

        Hosts matching BYPASS_DOMAINS are resolved with an ``A`` query against
        8.8.8.8 / 1.1.1.1 and the first answer is returned as a single
        ``AF_INET`` entry. Everything else, any resolver failure, and any
        request for a specific non-IPv4 address family is delegated to the
        original ``socket.getaddrinfo``.
        """
        # Only an unspecified (0) or IPv4 family can be answered by an A record.
        if family in (0, socket.AF_INET) and _is_bypass_host(host):
            try:
                res = dns.resolver.Resolver()
                res.nameservers = ['8.8.8.8', '1.1.1.1']
                answers = res.resolve(host, 'A')
                ip_address = answers[0].to_text()
                return [(socket.AF_INET, type, proto, '', (ip_address, port))]
            except Exception:
                # Best effort: fall back to the system resolver below.
                pass
        return _original_getaddrinfo(host, port, family, type, proto, flags)

    socket.getaddrinfo = patched_getaddrinfo
    print(f"[INIT] DNS Bypass installed for {len(BYPASS_DOMAINS)} domains.", file=sys.stderr)
except ImportError:
    print("❌ CRITICAL: dnspython not installed. DNS Bypass failed.", file=sys.stderr)
# --- DNS BYPASS PATCH END ---

# Initialize FastAPI and YTMusic
app = FastAPI()
ytmusic = YTMusic()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# --- Pydantic Models ---
class SearchRequest(BaseModel):
    """Request body for ``POST /searcht``."""

    query: str


class MatchRequest(BaseModel):
    """Request body for ``POST /match``."""

    url: str


class MatchResponse(BaseModel):
    """Response body for ``POST /match``."""

    url: str
    filename: str
    track_id: str


# ==========================================
# TIDAL AUTHENTICATION
# ==========================================
TIDAL_CLIENT_ID = os.getenv('TIDAL_CLIENT_ID')
TIDAL_CLIENT_SECRET = os.getenv('TIDAL_CLIENT_SECRET')
QOBUZ_APP_ID = os.getenv('QOBUZ_APP_ID')
QOBUZ_TOKEN = os.getenv('QOBUZ_TOKEN')

if not all([TIDAL_CLIENT_ID, TIDAL_CLIENT_SECRET]):
    print("⚠️ Warning: TIDAL_CLIENT_ID or TIDAL_CLIENT_SECRET not found in environment. Tidal search will fail.", file=sys.stderr)
if not all([QOBUZ_APP_ID, QOBUZ_TOKEN]):
    print("⚠️ Warning: QOBUZ_APP_ID or QOBUZ_TOKEN not found in environment. Qobuz lookup will fail.", file=sys.stderr)

# Module-level cache for the Tidal client-credentials token.
cached_tidal_token = None
token_expiry_time = 0


def similar(a: str, b: str) -> float:
    """Return the case-insensitive SequenceMatcher ratio of *a* and *b*."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def clean_title(title: str) -> str:
    """Strip a trailing ``- Single`` / ``- EP`` and any parenthesised parts."""
    title = re.sub(r'(?i)\s*-\s*(single|ep)$', '', title)
    title = re.sub(r'\s*\([^)]*\)', '', title)
    return title.strip()


def _match_score(target_title: str, target_artist: str,
                 candidate_title: str, candidate_artists: str) -> float:
    """Score a candidate track against the target title and artist.

    Title similarity weighs 0.6 and artist similarity 0.4; a title similarity
    above 0.8 earns a 0.2 bonus, so the result can exceed 1.0.
    """
    title_score = similar(clean_title(target_title), clean_title(candidate_title))
    artist_score = similar(target_artist, candidate_artists)
    total_score = (title_score * 0.6) + (artist_score * 0.4)
    if title_score > 0.8:
        total_score += 0.2
    return total_score


def _joined_artist_names(artists) -> str:
    """Join the ``name`` fields of an artist list, tolerating missing values."""
    return " ".join((artist.get("name") or "") for artist in (artists or []))


async def get_tidal_access_token():
    """Return a Tidal access token, or None when unavailable.

    The token is cached in module globals and reused until 60 seconds before
    its reported expiry. Returns None when credentials are not configured,
    when the token endpoint does not answer 200, or on any request error.
    """
    global cached_tidal_token, token_expiry_time
    if not TIDAL_CLIENT_ID or not TIDAL_CLIENT_SECRET:
        return None
    if cached_tidal_token and time.time() < (token_expiry_time - 60):
        return cached_tidal_token
    try:
        b64_creds = base64.b64encode(f"{TIDAL_CLIENT_ID}:{TIDAL_CLIENT_SECRET}".encode()).decode()
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://auth.tidal.com/v1/oauth2/token",
                headers={"Authorization": f"Basic {b64_creds}"},
                data={"grant_type": "client_credentials"}
            )
        if response.status_code == 200:
            data = response.json()
            cached_tidal_token = data.get("access_token")
            token_expiry_time = time.time() + data.get("expires_in", 3600)
            return cached_tidal_token
        logger.warning("Tidal Auth failed with status %s", response.status_code)
    except Exception as e:
        logger.error("Tidal Auth Error: %s", e)
    return None


def search_youtube_sync(search_query: str, target_title: str, target_artist: str):
    """Searches YouTube and returns the BEST match and the SECOND BEST (alternative) match.

    Only the first five song results are scored. Returns a
    ``(best_video_id, alternative_video_id)`` tuple; either element may be
    None. Any error is logged and yields ``(None, None)``.
    """
    try:
        yt_results = ytmusic.search(search_query, filter="songs")
        if not yt_results:
            return None, None

        scored_results = []
        for result in yt_results[:5]:
            video_id = result.get('videoId')
            if not video_id:
                # A hit without an id cannot be turned into a URL; skipping it
                # keeps one odd result from discarding the whole search.
                continue
            total_score = _match_score(
                target_title,
                target_artist,
                result.get('title') or '',
                _joined_artist_names(result.get('artists')),
            )
            scored_results.append((total_score, video_id))

        # Stable sort: ties keep the order returned by the search.
        scored_results.sort(key=lambda item: item[0], reverse=True)
        best_id = scored_results[0][1] if scored_results else None
        alt_id = scored_results[1][1] if len(scored_results) > 1 else None
        return best_id, alt_id
    except Exception as e:
        logger.error("YouTube search error: %s", e)
        return None, None


async def search_tidal_async(search_query: str, target_title: str, target_artist: str):
    """Return the Tidal browse URL of the best-scoring track, or None.

    Returns None when no access token is available, when the search does not
    answer 200 or has no tracks, when the best score is below 0.4, or on any
    error (which is logged).
    """
    access_token = await get_tidal_access_token()
    if not access_token:
        return None

    async with httpx.AsyncClient() as client:
        try:
            search_response = await client.get(
                "https://api.tidal.com/v1/search",
                headers={"Authorization": f"Bearer {access_token}"},
                params={"query": search_query, "types": "TRACKS", "countryCode": "US", "limit": 5}
            )
            if search_response.status_code != 200:
                return None

            tidal_data = search_response.json()
            items = (tidal_data.get("tracks") or {}).get("items") or []

            best_match_id = None
            highest_score = 0.0
            for track in items:
                total_score = _match_score(
                    target_title,
                    target_artist,
                    track.get("title") or "",
                    _joined_artist_names(track.get("artists")),
                )
                if total_score > highest_score:
                    highest_score = total_score
                    best_match_id = track.get("id")

            # Reject weak matches rather than returning a wrong track.
            if highest_score < 0.4:
                return None
            if best_match_id:
                return f"https://tidal.com/browse/track/{best_match_id}"
        except Exception as e:
            logger.error("Tidal search error: %s", e)
    return None


# --- Match Helper Functions ---
def extract_amazon_track_id(url: str) -> Optional[str]:
    """Extract the track id from an Amazon Music URL.

    Uses the ``trackAsin`` query parameter when present, otherwise the path
    segment following ``tracks``. Returns None for non-Amazon URLs or when
    neither form is found.
    """
    if "music.amazon.com" not in url:
        return None

    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if "trackAsin" in query_params:
        return query_params["trackAsin"][0]

    path_parts = parsed_url.path.split('/')
    if "tracks" in path_parts:
        idx = path_parts.index("tracks") + 1
        if idx < len(path_parts):
            return path_parts[idx]
    return None


def get_song_link_info(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the Song.link API response for *url*, or None on any error.

    This function blocks on ``requests.get`` (10 second timeout); async
    callers should run it in a worker thread.
    """
    api_base_url = "https://api.song.link/v1-alpha.1/links"
    params = {"userCountry": "US"}

    # Amazon links are sent as platform/id/type when an id can be extracted;
    # every other case is sent as a plain URL.
    track_id = extract_amazon_track_id(url) if "music.amazon.com" in url else None
    if track_id:
        params["platform"] = "amazonMusic"
        params["id"] = track_id
        params["type"] = "song"
    else:
        params["url"] = url

    try:
        response = requests.get(api_base_url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error("Error fetching from Song.link API: %s", e)
        return None


def extract_url_from_songlink(links_by_platform: dict, platform: str) -> Optional[str]:
    """Return the non-empty ``url`` for *platform* in a linksByPlatform map."""
    entry = links_by_platform.get(platform) or {}
    return entry.get("url") or None


def _extract_youtube_video_id(youtube_url: str) -> Optional[str]:
    """Return the video id from a ``watch?v=`` or ``youtu.be/`` URL, else None."""
    parsed = urlparse(youtube_url)
    host = (parsed.hostname or "").lower()
    if host == "youtu.be" or host.endswith(".youtu.be"):
        candidate = parsed.path.lstrip("/").split("/")[0]
        return candidate or None
    values = parse_qs(parsed.query).get("v")
    return values[0] if values else None


# --- Convert Helper Functions ---
async def _lookup_apple(url: str):
    """Return ``(title, artist)`` for an Apple Music URL via the iTunes lookup API."""
    parsed_url = urlparse(url)
    path_parts = [p for p in parsed_url.path.split('/') if p]
    if not path_parts:
        raise HTTPException(status_code=400, detail="Invalid Apple Music URL")

    # A two-character first path segment is taken as the storefront country.
    country = path_parts[0] if len(path_parts[0]) == 2 else "us"
    query_params = parse_qs(parsed_url.query)
    # ``?i=`` is preferred; otherwise the last path segment is used as the id.
    lookup_id = query_params.get('i', [path_parts[-1]])[0]

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://itunes.apple.com/lookup",
            params={"id": lookup_id, "country": country},
        )

    payload = response.json() if response.status_code == 200 else {}
    if payload.get("resultCount", 0) > 0:
        result = payload["results"][0]
        title = result.get("trackName", result.get("collectionName", ""))
        return title, result.get("artistName", "")
    raise HTTPException(status_code=404, detail="Track not found on Apple Music")


async def _lookup_deezer(url: str):
    """Return ``(title, artist)`` for a Deezer track URL via the Deezer API."""
    match_obj = re.search(r'track/(\d+)', url)
    if not match_obj:
        raise HTTPException(status_code=400, detail="Invalid Deezer URL")

    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.deezer.com/track/{match_obj.group(1)}")

    data = response.json()
    if "error" in data:
        raise HTTPException(status_code=404, detail="Track not found on Deezer")
    return data.get("title", ""), (data.get("artist") or {}).get("name", "")


async def _lookup_spotify(url: str):
    """Return ``(title, artist)`` scraped from a Spotify page's Open Graph tags."""
    # NOTE(review): the caller selects this branch by substring match and the
    # caller-supplied URL is fetched as-is (following redirects). Consider
    # validating the hostname to rule out server-side request forgery.
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(url, headers=headers)
    if response.status_code != 200:
        raise HTTPException(status_code=400, detail="Failed to fetch Spotify page")

    soup = BeautifulSoup(response.text, 'html.parser')
    og_title = soup.find("meta", property="og:title")
    og_desc = soup.find("meta", property="og:description")
    title = og_title.get("content") if og_title else None
    description = og_desc.get("content") if og_desc else None
    if not title or not description:
        raise HTTPException(status_code=404, detail="Could not extract Spotify metadata")

    # The artist is taken from the text before the first "·" in the description.
    artist_raw = description.split("·")[0]
    artist = artist_raw.replace("Listen to ", "").replace(" on Spotify.", "").strip()
    return title, artist


async def _lookup_qobuz(url: str):
    """Return ``(title, artist)`` for a Qobuz track or album URL via the Qobuz API."""
    if not QOBUZ_APP_ID or not QOBUZ_TOKEN:
        raise HTTPException(status_code=500, detail="Qobuz API credentials not configured.")

    parsed_url = urlparse(url)
    path_parts = [p for p in parsed_url.path.split('/') if p]
    if not path_parts:
        raise HTTPException(status_code=400, detail="Invalid Qobuz URL")

    q_id = path_parts[-1]
    is_track = "track/" in parsed_url.path
    api_url = f"https://www.qobuz.com/api.json/0.2/{'track/get' if is_track else 'album/get'}"
    headers = {"X-App-Id": QOBUZ_APP_ID, "X-User-Auth-Token": QOBUZ_TOKEN, "User-Agent": "Mozilla/5.0"}

    async with httpx.AsyncClient(timeout=10.0, trust_env=False) as client:
        response = await client.get(
            api_url,
            params={"track_id" if is_track else "album_id": q_id},
            headers=headers,
        )

    data = response.json()
    title = data.get("title", "")
    # ``album`` may be missing or null, so guard before reading its artist.
    artist_data = (
        data.get("artist")
        or data.get("performer")
        or (data.get("album") or {}).get("artist")
    )
    if isinstance(artist_data, dict):
        artist = artist_data.get("name", "")
    elif isinstance(artist_data, str):
        artist = artist_data
    else:
        artist = ""

    if not title:
        raise HTTPException(status_code=404, detail="Qobuz track not found")
    return title, artist


# --- Endpoints ---
@app.get("/")
async def root():
    """Health/info endpoint."""
    return {"message": "Combined Music API is running. Use /convert, /match, or /searcht."}


@app.post("/searcht")
async def searcht(request: SearchRequest):
    """Return the first YouTube Music song result that has a ``videoId``.

    Returns an empty dict when nothing matches.
    """
    logger.info("search query: %s", request.query)
    # ytmusic.search is blocking; run it off the event loop.
    search_results = await asyncio.to_thread(ytmusic.search, request.query, filter="songs")
    return next((song for song in (search_results or []) if song.get('videoId')), {})


@app.post("/match", response_model=MatchResponse)
async def match(request: MatchRequest):
    """Resolve a streaming URL to a YouTube link.

    Uses the YouTube link reported by Song.link when its video id can be
    parsed; otherwise searches YouTube Music for ``"<title> <artist>"``.

    Raises:
        HTTPException: 404 when Song.link returns nothing, when title/artist
            cannot be determined, or when no YouTube Music match is found.
    """
    track_url = request.url
    logger.info("Match endpoint: Processing URL: %s", track_url)

    # get_song_link_info uses blocking ``requests``; run it off the event loop.
    track_info = await asyncio.to_thread(get_song_link_info, track_url)
    if not track_info:
        raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.")

    entities = track_info.get("entitiesByUniqueId", {})
    entity_unique_id = track_info.get("entityUniqueId")
    title, artist = None, None
    if entity_unique_id and entity_unique_id in entities:
        ent = entities[entity_unique_id]
        title, artist = ent.get("title"), ent.get("artistName")
    else:
        # No primary entity: take the first one carrying both fields.
        for edata in entities.values():
            if edata.get("title") and edata.get("artistName"):
                title, artist = edata.get("title"), edata.get("artistName")
                break

    if not title or not artist:
        raise HTTPException(status_code=404, detail="Could not determine title and artist.")

    youtube_url = extract_url_from_songlink(track_info.get("linksByPlatform", {}), "youtube")
    if youtube_url:
        video_id = _extract_youtube_video_id(youtube_url)
        if video_id:
            return MatchResponse(url=youtube_url, filename=f"{title} - {artist}", track_id=video_id)
        # An id-less link cannot satisfy MatchResponse.track_id; fall through
        # to the search below instead of failing response validation.

    search_query = f'{title} {artist}'
    search_results = await asyncio.to_thread(ytmusic.search, search_query, filter="songs")
    first = next((song for song in (search_results or []) if song.get('videoId')), None)
    if first:
        v_id = first["videoId"]
        ym_url = f'https://music.youtube.com/watch?v={v_id}'
        artists = first.get('artists')
        a_name = (artists[0].get('name') or artist) if artists else artist
        return MatchResponse(filename=f"{first.get('title', title)} - {a_name}", url=ym_url, track_id=v_id)

    raise HTTPException(status_code=404, detail="No matching video found on YouTube Music.")


@app.get("/convert")
async def convert_url_to_all(url: str):
    """Look a track up on YouTube Music and Tidal from another service's URL.

    Supported sources are Apple Music, Deezer, Spotify and Qobuz. The YouTube
    and Tidal searches run concurrently.

    Raises:
        HTTPException: 400 for an unsupported or malformed URL; 404 / 500 as
            raised by the per-provider lookups.
    """
    if "music.apple.com" in url:
        track_title, artist_clean = await _lookup_apple(url)
    elif "deezer.com" in url:
        track_title, artist_clean = await _lookup_deezer(url)
    elif "http://googleusercontent.com/spotify.com" in url or "open.spotify.com" in url:
        track_title, artist_clean = await _lookup_spotify(url)
    elif "qobuz.com" in url:
        track_title, artist_clean = await _lookup_qobuz(url)
    else:
        # Without a recognised provider there is no metadata to search with.
        raise HTTPException(status_code=400, detail="Unsupported URL")

    clean_search_title = clean_title(track_title)
    search_query = f"{clean_search_title} {artist_clean}".strip()

    # The YouTube search is blocking, so it runs in a worker thread while the
    # Tidal search runs on the event loop.
    yt_task = asyncio.to_thread(search_youtube_sync, search_query, track_title, artist_clean)
    tidal_task = search_tidal_async(search_query, track_title, artist_clean)
    (best_yt_id, alt_yt_id), tidal_url = await asyncio.gather(yt_task, tidal_task)

    return {
        "track_title": track_title,
        "artist": artist_clean,
        "search_query_used": search_query,
        "source_url": url,
        "youtube_music_url": f"https://music.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
        "youtube_video_url": f"https://www.youtube.com/watch?v={best_yt_id}" if best_yt_id else None,
        "alternative_youtube_url": f"https://www.youtube.com/watch?v={alt_yt_id}" if alt_yt_id else None,
        "tidal_url": tidal_url
    }