#!/usr/bin/env python3 """ TIDAL Download API - Hugging Face Space FastAPI endpoint to download TIDAL tracks in various qualities """ import os import sys import json import time import base64 import tempfile import requests from fastapi import FastAPI, HTTPException, Query from fastapi.responses import FileResponse from xml.etree import ElementTree as ET # TV session credentials (used by OrpheusDL for lossless access) TV_CLIENT_ID = "cgiF7TQuB97BUIu3" TV_CLIENT_SECRET = "1nqpgx8uvBdZigrx4hUPDV2hOwgYAAAG5DYXOr6uNf8=" # Mobile session credentials MOBILE_CLIENT_ID = "fX2JxdmntZWK0ixT" MOBILE_CLIENT_SECRET = "1Nm5AfDAjxrgJFJbKNWLeAyKGVGmINuXPPLHVXAvxAg=" # Read tokens from environment variables (Hugging Face Space secrets) ACCESS_TOKEN = os.environ.get("TIDAL_ACCESS_TOKEN", "") REFRESH_TOKEN = os.environ.get("TIDAL_REFRESH_TOKEN", "") CURRENT_CLIENT_ID = TV_CLIENT_ID # Use TV client by default app = FastAPI(title="TIDAL Download API", description="Download TIDAL tracks in LOSSLESS/HI_RES quality") def refresh_access_token(): """Refresh the access token using the refresh token""" global ACCESS_TOKEN, REFRESH_TOKEN, CURRENT_CLIENT_ID print("🔄 Refreshing access token with TV session credentials...") response = requests.post( "https://auth.tidal.com/v1/oauth2/token", data={ "grant_type": "refresh_token", "refresh_token": REFRESH_TOKEN, "client_id": TV_CLIENT_ID, "client_secret": TV_CLIENT_SECRET }, headers={ "Content-Type": "application/x-www-form-urlencoded", "User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9" }, timeout=15 ) if response.status_code == 200: data = response.json() ACCESS_TOKEN = data.get('access_token') new_refresh_token = data.get('refresh_token') if new_refresh_token: REFRESH_TOKEN = new_refresh_token CURRENT_CLIENT_ID = TV_CLIENT_ID print(f"✅ Access token refreshed") return True else: print(f"❌ Failed to refresh token: {response.status_code}") return False def get_headers(): """Get headers with current access token""" return { "X-Tidal-Token": CURRENT_CLIENT_ID, "Authorization": f"Bearer {ACCESS_TOKEN}", "User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9" } def decrypt_security_token(security_token): """Decrypt TIDAL security token to get AES key and nonce""" security_token = base64.b64decode(security_token) # Key derivation constants key = bytes([0x7A, 0x5C, 0x38, 0x5A, 0x7B, 0x25, 0x25, 0x49, 0x43, 0x4B, 0x31, 0x71, 0x5E, 0x24, 0x76, 0x5D]) # Extract nonce (first 12 bytes) and ciphertext nonce = security_token[:12] ciphertext = security_token[12:-16] # Remove auth tag from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend()).decryptor() decrypted = decryptor.update(ciphertext) # Decrypted data contains key and nonce for actual audio decryption audio_key = decrypted[:16] audio_nonce = decrypted[16:28] return audio_key, audio_nonce def decrypt_file(input_path, output_path, key, nonce): """Decrypt a file using AES-GCM""" from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend with open(input_path, 'rb') as infile: encrypted_data = infile.read() # Remove auth tag (last 16 bytes) encrypted_data = encrypted_data[:-16] decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend()).decryptor() decrypted_data = decryptor.update(encrypted_data) with open(output_path, 'wb') as outfile: outfile.write(decrypted_data) def parse_dash_manifest(manifest_data): """Parse DASH manifest XML and extract segment URLs and audio format""" manifest_str = manifest_data.decode('utf-8') base_url = "" segments = [] media_template = "" start_number = 1 audio_format = "flac" codecs_detected = "" try: root = ET.fromstring(manifest_str) for elem in root.iter(): tag = elem.tag if '}' in tag: tag = tag.split('}')[1] if tag == 'BaseURL' or tag.lower() == 'baseurl': base_url = elem.text if tag == 'Representation' or tag.lower() == 'representation': codec = elem.get('codecs', '').upper() codecs_detected = codec if codec: print(f" Found Representation with codecs: '{codec}'") if 'MP4A' in codec or 'AAC' in codec: audio_format = "m4a" elif 'FLAC' in codec: audio_format = "flac" elif 'EC3' in codec or 'EAC3' in codec: audio_format = "eac3" print(f" Detected audio format: {audio_format}") if tag == 'SegmentTemplate' or tag.lower() == 'segmenttemplate': media_template = elem.get('media', '') initialization = elem.get('initialization', '') start_number = int(elem.get('startNumber', '1')) if initialization: segments.append(initialization) print(f"✅ Added initialization segment: {initialization}") for timeline in elem.iter(): tl_tag = timeline.tag if '}' in tl_tag: tl_tag = tl_tag.split('}')[1] if tl_tag == 'SegmentTimeline' or tl_tag.lower() == 'segmenttimeline': total_segments = 0 for s_elem in timeline.iter(): s_tag = s_elem.tag if '}' in s_tag: s_tag = s_tag.split('}')[1] if s_tag == 'S' or s_tag.lower() == 's': repeat = int(s_elem.get('r', '0')) total_segments += repeat + 1 if media_template and total_segments > 0: for i in range(start_number, start_number + total_segments): segment_url = media_template.replace('$Number$', str(i)) segments.append(segment_url) print(f"✅ Generated {len(segments)} segment URLs from template") except Exception as e: print(f"⚠️ Error parsing DASH manifest: {e}") if not segments: print("⚠️ Trying regex parsing...") init_match = re.search(r'initialization="([^"]+)"', manifest_str) if init_match: initialization = init_match.group(1) segments.append(initialization) print(f"✅ Added initialization segment with regex: {initialization}") template_match = re.search(r'media="([^"]+\$Number\$[^"]+)"', manifest_str) if template_match: media_template = template_match.group(1) start_num_match = re.search(r'startNumber="(\d+)"', manifest_str) if start_num_match: start_number = int(start_num_match.group(1)) timeline_matches = re.findall(r']*d="(\d+)"[^>]*r="(\d+)"', manifest_str) total_segments = 0 for d, r in timeline_matches: total_segments += int(r) + 1 single_segments = re.findall(r']*d="(\d+)"[^>]*[^r][^>]*>', manifest_str) total_segments += len(single_segments) if total_segments > 0: for i in range(start_number, start_number + total_segments): segment_url = media_template.replace('$Number$', str(i)) segments.append(segment_url) print(f"✅ Generated {len(segments)} segment URLs with regex") codecs_match = re.search(r'codecs="([^"]+)"', manifest_str) if codecs_match: codec = codecs_match.group(1).upper() codecs_detected = codec if 'MP4A' in codec or 'AAC' in codec: audio_format = "m4a" elif 'FLAC' in codec: audio_format = "flac" elif 'EC3' in codec or 'EAC3' in codec: audio_format = "eac3" mime_match = re.search(r'mimeType="([^"]+)"', manifest_str) if mime_match: mime = mime_match.group(1).lower() if 'mp4' in mime or 'aac' in mime or 'm4a' in mime: audio_format = "m4a" elif 'flac' in mime: audio_format = "flac" base_match = re.search(r'<[^>]*BaseURL[^>]*>([^<]+)