| |
| """ |
| TIDAL Download API - Hugging Face Space |
| FastAPI endpoint to download TIDAL tracks in various qualities |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import base64 |
| import tempfile |
| import requests |
| from fastapi import FastAPI, HTTPException, Query |
| from fastapi.responses import FileResponse |
| from xml.etree import ElementTree as ET |
|
|
| |
# OAuth client credentials for the "TV" session. refresh_access_token() uses
# this pair for the refresh-token exchange. Each value can be overridden from
# the environment so credentials can be rotated without editing the code; the
# defaults are the values this file has always shipped with.
TV_CLIENT_ID = os.environ.get("TIDAL_TV_CLIENT_ID", "cgiF7TQuB97BUIu3")
TV_CLIENT_SECRET = os.environ.get(
    "TIDAL_TV_CLIENT_SECRET",
    "1nqpgx8uvBdZigrx4hUPDV2hOwgYAAAG5DYXOr6uNf8=",
)

# Mobile client credentials. Nothing in this file references them; they are
# kept so that external importers of this module keep working.
MOBILE_CLIENT_ID = os.environ.get("TIDAL_MOBILE_CLIENT_ID", "fX2JxdmntZWK0ixT")
MOBILE_CLIENT_SECRET = os.environ.get(
    "TIDAL_MOBILE_CLIENT_SECRET",
    "1Nm5AfDAjxrgJFJbKNWLeAyKGVGmINuXPPLHVXAvxAg=",
)

# Session tokens come from the environment. ACCESS_TOKEN / REFRESH_TOKEN are
# rewritten at runtime by refresh_access_token(); CURRENT_CLIENT_ID is the
# value get_headers() sends as "X-Tidal-Token".
ACCESS_TOKEN = os.environ.get("TIDAL_ACCESS_TOKEN", "")
REFRESH_TOKEN = os.environ.get("TIDAL_REFRESH_TOKEN", "")
CURRENT_CLIENT_ID = TV_CLIENT_ID


app = FastAPI(
    title="TIDAL Download API",
    description="Download TIDAL tracks in LOSSLESS/HI_RES quality",
)
|
|
def refresh_access_token():
    """Exchange the stored refresh token for a new access token.

    On success the module-level ``ACCESS_TOKEN`` is replaced, ``REFRESH_TOKEN``
    is replaced when the server returns a new one, and ``CURRENT_CLIENT_ID``
    is set to the TV client id used for the exchange.

    Returns:
        bool: True when a new access token was stored. False when no refresh
        token is configured, the request fails, the server answers with a
        non-200 status, or the reply carries no usable ``access_token``.
        The existing tokens are left untouched in every False case.
    """
    global ACCESS_TOKEN, REFRESH_TOKEN, CURRENT_CLIENT_ID

    # Without a refresh token the exchange cannot succeed; skip the round trip.
    if not REFRESH_TOKEN:
        print("❌ Failed to refresh token: no refresh token configured")
        return False

    print("🔄 Refreshing access token with TV session credentials...")

    try:
        response = requests.post(
            "https://auth.tidal.com/v1/oauth2/token",
            data={
                "grant_type": "refresh_token",
                "refresh_token": REFRESH_TOKEN,
                "client_id": TV_CLIENT_ID,
                "client_secret": TV_CLIENT_SECRET,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9",
            },
            timeout=15,
        )
    except requests.RequestException as e:
        # Callers only test the boolean result, so report and signal failure
        # instead of letting a transport error escape.
        print(f"❌ Failed to refresh token: {e}")
        return False

    if response.status_code != 200:
        print(f"❌ Failed to refresh token: {response.status_code}")
        return False

    try:
        data = response.json()
    except ValueError as e:
        print(f"❌ Failed to refresh token: invalid JSON reply ({e})")
        return False

    new_access_token = data.get('access_token') if isinstance(data, dict) else None
    if not new_access_token:
        # Never overwrite a token with None: that would make every later
        # request fail the "access token not set" check.
        print("❌ Failed to refresh token: reply contained no access_token")
        return False

    ACCESS_TOKEN = new_access_token
    new_refresh_token = data.get('refresh_token')
    if new_refresh_token:
        REFRESH_TOKEN = new_refresh_token
    CURRENT_CLIENT_ID = TV_CLIENT_ID
    print("✅ Access token refreshed")
    return True
|
|
def get_headers():
    """Build the HTTP headers for an authenticated TIDAL API request.

    Reads the module-level ``CURRENT_CLIENT_ID`` and ``ACCESS_TOKEN`` at call
    time, so a token refresh is picked up by the next call.
    """
    bearer = f"Bearer {ACCESS_TOKEN}"
    request_headers = {}
    request_headers["X-Tidal-Token"] = CURRENT_CLIENT_ID
    request_headers["Authorization"] = bearer
    request_headers["User-Agent"] = "TIDAL_ANDROID/1039 okhttp/3.14.9"
    return request_headers
|
|
def decrypt_security_token(security_token):
    """Derive the audio key and nonce from a base64 security token.

    The decoded token is read as ``12-byte nonce | ciphertext | 16 bytes``.
    The last 16 bytes are dropped (presumably the GCM authentication tag --
    it is NOT verified here) and the ciphertext is run through AES-GCM with a
    fixed 16-byte key. The first 16 decrypted bytes are returned as the audio
    key and the next 12 as the audio nonce.

    Args:
        security_token: Base64 text (str or bytes) of the token.

    Returns:
        tuple[bytes, bytes]: ``(audio_key, audio_nonce)`` of 16 and 12 bytes.

    Raises:
        ValueError: If the token is not valid base64 or is too short to hold
            a nonce, a 28-byte payload and the 16-byte trailer.
    """
    nonce_len, trailer_len = 12, 16
    key_len, audio_nonce_len = 16, 12

    raw_token = base64.b64decode(security_token)

    # A shorter token would silently produce a truncated (or empty) key and
    # nonce; fail loudly instead, before touching the cipher.
    min_len = nonce_len + key_len + audio_nonce_len + trailer_len
    if len(raw_token) < min_len:
        raise ValueError(
            f"security token too short: {len(raw_token)} bytes, need at least {min_len}"
        )

    # Fixed key used to unwrap the token.
    key = bytes([0x7A, 0x5C, 0x38, 0x5A, 0x7B, 0x25, 0x25, 0x49,
                 0x43, 0x4B, 0x31, 0x71, 0x5E, 0x24, 0x76, 0x5D])

    nonce = raw_token[:nonce_len]
    ciphertext = raw_token[nonce_len:-trailer_len]

    # Imported lazily so the module loads without `cryptography` installed.
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend()).decryptor()
    # update() only: finalize() is deliberately not called because no tag was
    # supplied to GCM, so integrity is not checked.
    decrypted = decryptor.update(ciphertext)

    audio_key = decrypted[:key_len]
    audio_nonce = decrypted[key_len:key_len + audio_nonce_len]

    return audio_key, audio_nonce
|
|
def decrypt_file(input_path, output_path, key, nonce):
    """Decrypt ``input_path`` with AES-GCM and write the result to ``output_path``.

    The whole input is read into memory, its final 16 bytes are discarded,
    and the remainder is decrypted in a single pass. The decryptor is never
    finalized, so no authentication tag is checked.
    """
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    with open(input_path, 'rb') as source:
        payload = source.read()

    # Drop the trailing 16 bytes before decrypting.
    payload = payload[:-16]

    cipher = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend())
    plaintext = cipher.decryptor().update(payload)

    with open(output_path, 'wb') as target:
        target.write(plaintext)
|
|
def parse_dash_manifest(manifest_data):
    """Extract the base URL, segment URLs and audio format from a DASH manifest.

    The manifest is first parsed as XML. If that yields no segments (for
    example because the document is not well-formed), a regex-based fallback
    scans the raw text for the same attributes.

    Args:
        manifest_data: The manifest as UTF-8 bytes (a ``str`` is accepted too).

    Returns:
        tuple[str, list[str], str]: ``(base_url, segments, audio_format)``.
        ``segments`` starts with the initialization segment when one is
        declared, followed by one URL per media segment obtained by replacing
        ``$Number$`` in the media template. ``audio_format`` is ``"m4a"``,
        ``"flac"`` or ``"eac3"`` and defaults to ``"flac"``.
    """
    # `re` is not imported at module level in this file; importing it here
    # keeps the regex fallback from failing with NameError.
    import re
    from xml.sax.saxutils import unescape

    if isinstance(manifest_data, (bytes, bytearray)):
        manifest_str = bytes(manifest_data).decode('utf-8')
    else:
        manifest_str = str(manifest_data)

    def local_name(tag):
        """Lower-cased element name with any ``{namespace}`` prefix removed."""
        return tag.rsplit('}', 1)[-1].lower()

    def format_for_codec(codec):
        """Map an upper-cased codecs string to a file format, or None if unknown."""
        if 'MP4A' in codec or 'AAC' in codec:
            return "m4a"
        if 'FLAC' in codec:
            return "flac"
        if 'EC3' in codec or 'EAC3' in codec:
            return "eac3"
        return None

    def expand_template(template, first, count):
        """Yield ``count`` segment URLs numbered from ``first``."""
        return [template.replace('$Number$', str(n)) for n in range(first, first + count)]

    base_url = ""
    segments = []
    audio_format = "flac"
    codecs_detected = ""

    # ---- Pass 1: proper XML parsing -------------------------------------
    try:
        root = ET.fromstring(manifest_str)

        for elem in root.iter():
            tag = local_name(elem.tag)

            if tag == 'baseurl':
                # elem.text is None for an empty element.
                base_url = (elem.text or "").strip()

            elif tag == 'representation':
                codec = elem.get('codecs', '').upper()
                if codec:
                    codecs_detected = codec
                    print(f" Found Representation with codecs: '{codec}'")
                    audio_format = format_for_codec(codec) or audio_format
                    print(f" Detected audio format: {audio_format}")

            elif tag == 'segmenttemplate':
                media_template = elem.get('media', '')
                initialization = elem.get('initialization', '')
                start_number = int(elem.get('startNumber', '1'))

                if initialization:
                    segments.append(initialization)
                    print(f"✅ Added initialization segment: {initialization}")

                for timeline in elem.iter():
                    if local_name(timeline.tag) != 'segmenttimeline':
                        continue
                    # Each <S> contributes itself plus `r` repeats.
                    total_segments = sum(
                        int(s_elem.get('r', '0')) + 1
                        for s_elem in timeline.iter()
                        if local_name(s_elem.tag) == 's'
                    )
                    if media_template and total_segments > 0:
                        segments.extend(expand_template(media_template, start_number, total_segments))
                        print(f"✅ Generated {len(segments)} segment URLs from template")
    except (ET.ParseError, ValueError) as e:
        print(f"⚠️ Error parsing DASH manifest: {e}")
        # Discard anything gathered before the failure so the fallback
        # starts from a clean slate instead of returning a partial list.
        segments = []

    # ---- Pass 2: regex fallback on the raw text --------------------------
    if not segments:
        print("⚠️ Trying regex parsing...")

        init_match = re.search(r'initialization="([^"]+)"', manifest_str)
        if init_match:
            # Attribute values are still XML-escaped in the raw text.
            initialization = unescape(init_match.group(1))
            segments.append(initialization)
            print(f"✅ Added initialization segment with regex: {initialization}")

        media_template = ""
        template_match = re.search(r'media="([^"]*\$Number\$[^"]*)"', manifest_str)
        if template_match:
            media_template = unescape(template_match.group(1))

        start_number = 1
        start_num_match = re.search(r'startNumber="(\d+)"', manifest_str)
        if start_num_match:
            start_number = int(start_num_match.group(1))

        # Count every <S ...> tag exactly once: 1 + its repeat count (if any).
        # `<S\b` does not match <SegmentTemplate>/<SegmentTimeline>.
        total_segments = 0
        for s_tag in re.findall(r'<S\b[^>]*>', manifest_str):
            repeat_match = re.search(r'\br="(-?\d+)"', s_tag)
            repeat = int(repeat_match.group(1)) if repeat_match else 0
            total_segments += repeat + 1

        # Without a template there is nothing to expand.
        if media_template and total_segments > 0:
            segments.extend(expand_template(media_template, start_number, total_segments))
            print(f"✅ Generated {len(segments)} segment URLs with regex")

        codec_format = None
        codecs_match = re.search(r'codecs="([^"]+)"', manifest_str)
        if codecs_match:
            codecs_detected = codecs_match.group(1).upper()
            codec_format = format_for_codec(codecs_detected)
            if codec_format:
                audio_format = codec_format

        # mimeType is only a hint: FLAC is delivered inside "audio/mp4", so
        # it must not override a codec that was already recognised.
        if codec_format is None:
            mime_match = re.search(r'mimeType="([^"]+)"', manifest_str)
            if mime_match:
                mime = mime_match.group(1).lower()
                if 'mp4' in mime or 'aac' in mime or 'm4a' in mime:
                    audio_format = "m4a"
                elif 'flac' in mime:
                    audio_format = "flac"

        base_match = re.search(r'<[^>]*BaseURL[^>]*>([^<]+)</', manifest_str)
        if base_match:
            base_url = unescape(base_match.group(1)).strip()

    print(f" Final detected codec: {codecs_detected}, format: {audio_format}")
    return base_url, segments, audio_format
|
|
def parse_bt_manifest(manifest_data):
    """Extract the URL list from a BT manifest (a JSON object with a ``urls`` key).

    Args:
        manifest_data: The manifest as UTF-8 encoded bytes.

    Returns:
        tuple[str, list]: ``("", urls)``. The first element is always an
        empty string (there is no base URL in this format). ``urls`` is empty
        when the payload is not valid UTF-8/JSON, is not a JSON object, or
        has no non-empty ``urls`` entry.
    """
    import json

    try:
        manifest = json.loads(manifest_data.decode('utf-8'))
    except ValueError as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ Failed to parse BT manifest: {e}")
        return "", []

    urls = manifest.get('urls') if isinstance(manifest, dict) else None
    if isinstance(urls, str) and urls:
        # Tolerate a single URL given as a bare string so callers can index
        # the result as a list.
        urls = [urls]

    if isinstance(urls, list) and urls:
        return "", urls

    print("⚠️ BT manifest has no URLs")
    return "", []
|
|
def _safe_filename_part(text):
    """Return ``text`` made safe for use inside a single file name."""
    hostile = '\\/:*?"<>|'
    cleaned = "".join(
        "_" if ch in hostile or ord(ch) < 32 else ch for ch in str(text)
    ).strip(" .")
    return cleaned or "unknown"


def _fetch_playback_info(track_id, quality):
    """Request playback info, retrying once after a token refresh on HTTP 401.

    Returns ``(response, headers)`` where ``headers`` are the ones used for
    the final attempt.
    """
    def do_request(request_headers):
        return requests.get(
            f"https://api.tidal.com/v1/tracks/{track_id}/playbackinfopostpaywall/v4",
            params={
                "countryCode": "HK",
                "playbackmode": "STREAM",
                "assetpresentation": "FULL",
                "audioquality": quality,
                "prefetch": "false",
            },
            headers=request_headers,
            timeout=15,
        )

    headers = get_headers()
    response = do_request(headers)
    if response.status_code == 401:
        print("🔄 Token expired, refreshing...")
        if refresh_access_token():
            headers = get_headers()
            response = do_request(headers)
    return response, headers


def _fetch_track_metadata(track_id, headers):
    """Fetch track metadata and return it as a flat dict with defaults applied."""
    info = requests.get(
        f"https://api.tidal.com/v1/tracks/{track_id}",
        params={"countryCode": "HK"},
        headers=headers,
        timeout=10,
    ).json()
    if not isinstance(info, dict):
        info = {}

    # `or {}` guards against JSON null, which .get(key, {}) does not.
    artist_obj = info.get('artist') or {}
    album_obj = info.get('album') or {}
    album_artist_obj = album_obj.get('artist') or {}
    return {
        "title": info.get('title') or f"track_{track_id}",
        "artist": artist_obj.get('name') or 'Unknown',
        "album": album_obj.get('title', ''),
        "albumArtist": album_artist_obj.get('name', ''),
        "trackNumber": info.get('trackNumber', 1),
        "duration": info.get('duration', 0),
        "isrc": info.get('isrc', ''),
    }


def _track_payload(track_id, meta, audio_quality, audio_format, file_size=None):
    """Build the ``track`` object of the JSON response."""
    payload = {
        "id": track_id,
        "title": meta["title"],
        "artist": meta["artist"],
        "album": meta["album"],
        "albumArtist": meta["albumArtist"],
        "trackNumber": meta["trackNumber"],
        "duration": meta["duration"],
        "isrc": meta["isrc"],
        "quality": audio_quality,
    }
    if file_size is not None:
        payload["fileSize"] = file_size
    payload["format"] = audio_format
    return payload


def _absolute_segment_url(base_url, segment):
    """Prefix ``segment`` with ``base_url`` unless it is already absolute."""
    if segment.startswith(('http://', 'https://')) or not base_url:
        return segment
    return base_url + segment


def _download_segments(segments, base_url, headers, tmpdir):
    """Download every segment into ``tmpdir`` (3 attempts each); return the paths in order."""
    max_retries = 3
    temp_files = []
    for i, seg in enumerate(segments):
        seg_url = _absolute_segment_url(base_url, seg)
        seg_path = os.path.join(tmpdir, f"segment_{i:04d}.mp4")

        for attempt in range(max_retries):
            try:
                resp = requests.get(seg_url, headers=headers, timeout=60)
            except requests.RequestException as e:
                print(f"⚠️ Segment {i+1} attempt {attempt+1} failed: {e}")
                continue
            if resp.status_code == 200:
                with open(seg_path, 'wb') as f:
                    f.write(resp.content)
                break
            print(f"⚠️ Segment {i+1} attempt {attempt+1} failed: {resp.status_code}")
        else:
            # Loop finished without `break`: every attempt failed.
            raise HTTPException(status_code=500, detail=f"Failed to download segment {i+1}")

        temp_files.append(seg_path)
        print(f" Segment {i+1}/{len(segments)} downloaded")
    return temp_files


def _concat_files(paths, dest_path):
    """Write the byte-wise concatenation of ``paths`` to ``dest_path``."""
    import shutil

    with open(dest_path, 'wb') as out:
        for path in paths:
            with open(path, 'rb') as f:
                shutil.copyfileobj(f, out)


def _remux_to_flac(merged_path, output_path, temp_files):
    """Stream-copy ``merged_path`` into ``output_path`` with FFmpeg.

    Falls back to a raw concatenation of ``temp_files`` when the ffprobe or
    ffmpeg executable is not installed.
    """
    import subprocess

    try:
        # Diagnostic only: the probe result is logged, not acted upon.
        probe_result = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_streams', '-of', 'json', merged_path],
            capture_output=True, text=True, timeout=30,
        )
        print(f"ffprobe return code: {probe_result.returncode}")
        if probe_result.returncode == 0:
            print("✅ Merged file is valid MP4")
            print(f"ffprobe output: {probe_result.stdout[:500]}")
        else:
            print(f"❌ Merged file analysis failed: {probe_result.stderr}")

        result = subprocess.run(
            ['ffmpeg', '-y', '-i', merged_path, '-c', 'copy', '-loglevel', 'error', output_path],
            capture_output=True, text=True, timeout=120,
        )
        print(f"FFmpeg return code: {result.returncode}")
        print(f"FFmpeg stdout: {result.stdout[:2000] if result.stdout else 'None'}")
        print(f"FFmpeg stderr: {result.stderr[:2000] if result.stderr else 'None'}")
    except FileNotFoundError:
        print("⚠️ FFmpeg not found, using raw merge")
        _concat_files(temp_files, output_path)
        return
    except subprocess.TimeoutExpired:
        print("⚠️ FFmpeg timeout")
        raise HTTPException(status_code=500, detail="FFmpeg timeout")

    if result.returncode != 0:
        print(f"⚠️ FFmpeg conversion failed with code {result.returncode}: {result.stderr}")
        raise HTTPException(status_code=500, detail=f"FFmpeg conversion failed: {result.stderr}")
    print(f"✅ Converted to FLAC: {output_path}")


@app.get("/download/{track_id}")
async def download_track(track_id: int, quality: str = "LOSSLESS"):
    """Download a TIDAL track in specified quality.

    Returns JSON with track info and download URL.
    - HIGH/LOW: Returns direct TIDAL CDN URL (time-limited)
    - LOSSLESS/HI_RES_LOSSLESS: Downloads, merges, returns mounted file URL

    Raises HTTPException: 401 when no access token is configured, the
    upstream status when playback info cannot be fetched, and 500 for
    manifest, segment-download or FFmpeg failures.
    """
    # NOTE(review): the body performs blocking HTTP and subprocess calls
    # inside a coroutine; the event loop is occupied for the whole download.
    import shutil

    try:
        print(f"🔍 Download request: track_id={track_id}, quality={quality}")

        if not ACCESS_TOKEN:
            raise HTTPException(status_code=401, detail="Access token not set")

        response, headers = _fetch_playback_info(track_id, quality)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to get playback info")

        data = response.json()
        audio_quality = data.get('audioQuality')
        manifest_mime = data.get('manifestMimeType') or ''
        manifest_data = base64.b64decode(data.get('manifest') or '')

        print(f"✅ Got playback data: audioQuality={audio_quality}, manifestType={manifest_mime}")

        if audio_quality != quality:
            print(f"⚠️ Requested {quality} but got {audio_quality}")

        meta = _fetch_track_metadata(track_id, headers)
        manifest_is_dash = 'dash+xml' in manifest_mime

        # HIGH/LOW: hand back a CDN URL without downloading. The DASH parser
        # is only used when the payload is XML; a JSON (BT) manifest falls
        # through to the BT branch at the bottom instead of failing here.
        if quality in ('HIGH', 'LOW') and (
            manifest_is_dash or manifest_data.lstrip().startswith(b'<')
        ):
            print("📦 Parsing manifest for direct URL...")
            base_url, segments, audio_format = parse_dash_manifest(manifest_data)
            if not segments:
                raise HTTPException(status_code=500, detail="No segments found in manifest")

            direct_url = _absolute_segment_url(base_url, segments[0])
            print(f"✅ Direct URL found for {quality}: {direct_url[:100]}...")
            return {
                "success": True,
                "track": _track_payload(track_id, meta, audio_quality, audio_format),
                "downloadUrl": direct_url,
                "message": f"Direct TIDAL CDN URL for {quality} quality (time-limited)",
            }

        if manifest_is_dash:
            security_token = data.get('securityToken')
            print(f" securityToken present: {bool(security_token)}, length: {len(security_token) if security_token else 0}")
            key = None
            nonce = None
            if security_token:
                print("🔐 Decrypting security token...")
                # NOTE(review): key/nonce are derived but never applied to the
                # segments below -- confirm whether decrypt_file() is needed.
                key, nonce = decrypt_security_token(security_token)

            download_dir = "./downloads"
            os.makedirs(download_dir, exist_ok=True)
            tmpdir = tempfile.mkdtemp(prefix="tidal-segments-")

            try:
                print("📦 Parsing DASH manifest...")
                base_url, segments, audio_format = parse_dash_manifest(manifest_data)
                print(f"✅ Found {len(segments)} segments, format: {audio_format}")
                if not segments:
                    # Nothing to merge; do not produce an empty output file.
                    raise HTTPException(status_code=500, detail="No segments found in manifest")

                print("⬇️ Downloading segments...")
                temp_files = _download_segments(segments, base_url, headers, tmpdir)

                print(f"🔄 Merging {len(segments)} segments into {audio_format}...")
                print(f" Temp files count: {len(temp_files)}")
                print(f" Temp directory: {tmpdir}")

                # Diagnostic: log what kind of box the first segment starts with.
                with open(temp_files[0], 'rb') as f:
                    header = f.read(16).hex()
                print(f" First segment header (hex): {header}")
                if header.startswith('0000001866747970'):
                    print(" ✅ First segment is MP4 (ftyp header)")
                elif header.startswith('0000010c6d6f6f66'):
                    print(" ⚠️ First segment is fragmented MP4 (moof header)")
                else:
                    print(f" ❓ Unknown header: {header[:20]}...")

                # Artist/title come from the API and may contain "/" (e.g.
                # "AC/DC") or other characters that would break or escape the
                # downloads directory, so each part is sanitised.
                output_filename = (
                    f"{_safe_filename_part(meta['artist'])} - "
                    f"{_safe_filename_part(meta['title'])}.{audio_format}"
                )
                output_path = os.path.join(download_dir, output_filename)
                print(f" Output path: {output_path}")

                if audio_format == "flac":
                    merged_path = os.path.join(tmpdir, "merged.mp4")
                    print(f" Merged path: {merged_path}")
                    print(" Starting merge...")
                    _concat_files(temp_files, merged_path)
                    print(" Merge complete")
                    print(" Starting FFmpeg conversion...")
                    _remux_to_flac(merged_path, output_path, temp_files)
                else:
                    _concat_files(temp_files, output_path)

                print(f"✅ Download complete: {output_path}")
                file_size = os.path.getsize(output_path)

                return {
                    "success": True,
                    "track": _track_payload(
                        track_id, meta, audio_quality, audio_format, file_size=file_size
                    ),
                    "downloadUrl": f"/download_file/{output_filename}",
                    "message": "File downloaded and ready for download",
                }
            finally:
                # Single cleanup point for both the success and failure paths.
                print(" Cleaning up temp files...")
                shutil.rmtree(tmpdir, ignore_errors=True)
                print(" Cleanup complete")

        print("📦 Parsing BT manifest...")
        base_url, segments = parse_bt_manifest(manifest_data)
        print(f"✅ Found {len(segments)} URLs")

        if not segments:
            raise HTTPException(status_code=500, detail="No URLs found in BT manifest")

        return {
            "success": True,
            "track": _track_payload(track_id, meta, audio_quality, "m4a"),
            "downloadUrl": segments[0],
            "message": "Direct TIDAL CDN URL (time-limited)",
        }

    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Unexpected error: {type(e).__name__}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
@app.get("/download_file/{filename}")
async def download_file(filename: str):
    """Serve downloaded file for LOSSLESS/HI_RES tracks.

    ``filename`` must be a plain file name inside ``./downloads``. Anything
    that names a directory, contains a path separator, or is ``.``/``..`` is
    answered with 404 so the endpoint cannot be used to reach other paths.

    Raises HTTPException: 404 when the file does not exist or the name is not
    acceptable, 500 on unexpected errors.
    """
    try:
        # basename() differs from the input as soon as a separator is present.
        if filename in ("", ".", "..") or os.path.basename(filename) != filename:
            raise HTTPException(status_code=404, detail="File not found")

        file_path = os.path.join("./downloads", filename)

        # isfile() rather than exists(): a directory is not downloadable.
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        ext = os.path.splitext(filename)[1].lower()
        media_type = "audio/flac" if ext == ".flac" else "audio/mp4"

        return FileResponse(
            file_path,
            media_type=media_type,
            filename=filename,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
@app.get("/search")
async def search_tracks(query: str = Query(..., description="Search query"), limit: int = 5):
    """Search for TIDAL tracks.

    Sends the query to the TIDAL search endpoint (retrying once after a token
    refresh on HTTP 401) and returns ``{"results": [...]}`` where each entry
    has ``id``, ``title``, ``artist``, ``quality`` and ``duration``.

    Raises HTTPException: the upstream status when the search fails, 500 on
    unexpected errors.
    """
    def run_search():
        """Issue one search request with the current headers."""
        return requests.get(
            "https://api.tidal.com/v1/search",
            params={
                "query": query,
                "countryCode": "HK",
                "limit": limit,
                "types": "TRACKS",
            },
            headers=get_headers(),
            timeout=15,
        )

    try:
        response = run_search()

        if response.status_code == 401 and refresh_access_token():
            response = run_search()

        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to search")

        data = response.json()
        # `or` guards against JSON null, which .get(key, default) passes through.
        track_section = (data.get('tracks') if isinstance(data, dict) else None) or {}
        tracks = track_section.get('items') or []

        results = [
            {
                "id": track.get('id'),
                "title": track.get('title'),
                "artist": (track.get('artist') or {}).get('name', 'Unknown'),
                "quality": track.get('audioQuality', 'Unknown'),
                "duration": track.get('duration'),
            }
            for track in tracks
        ]

        return {"results": results}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
@app.get("/track/{track_id}")
async def get_track_info(track_id: int):
    """Return the raw TIDAL metadata document for one track.

    A 401 from the API triggers one token refresh followed by one retry.
    Any non-200 final status is forwarded as an HTTPException with that
    status; other failures become a 500.
    """
    track_url = f"https://api.tidal.com/v1/tracks/{track_id}"

    def fetch(request_headers):
        # One metadata request using the supplied headers.
        return requests.get(
            track_url,
            params={"countryCode": "HK"},
            headers=request_headers,
            timeout=15,
        )

    try:
        reply = fetch(get_headers())

        if reply.status_code == 401:
            refreshed = refresh_access_token()
            if refreshed:
                reply = fetch(get_headers())

        if reply.status_code == 200:
            return reply.json()

        raise HTTPException(status_code=reply.status_code, detail="Failed to get track info")

    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
|
|
@app.get("/")
async def root():
    """Describe the service: a banner, the endpoint list and a usage example."""
    endpoint_help = dict([
        ("/download/{track_id}", "Download track (query: quality=LOSSLESS/HIGH/HI_RES_LOSSLESS)"),
        ("/search", "Search tracks (query: query, limit)"),
        ("/track/{track_id}", "Get track information"),
    ])
    listing = {"message": "TIDAL Download API"}
    listing["endpoints"] = endpoint_help
    listing["usage"] = "GET /download/5267743?quality=LOSSLESS"
    return listing
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, on $PORT when set
    # and on 7860 otherwise.
    from uvicorn import run as serve

    listen_port = int(os.environ.get("PORT", 7860))
    serve(app, host="0.0.0.0", port=listen_port)
|
|