| import hashlib |
| import time |
| import re |
| import logging |
| import requests |
| import subprocess |
| from fastapi import FastAPI, Request, HTTPException |
| from fastapi.responses import FileResponse |
| from fastapi.staticfiles import StaticFiles |
| import tempfile |
| import os |
| import uuid |
| import gc |
| import json |
|
|
|
|
# Emit everything from DEBUG upwards on the root logger.
logging.basicConfig(level=logging.DEBUG)

# Fresh temporary directory created once at import time; converted audio
# files are written here.
global_download_dir = tempfile.mkdtemp()

# The ASGI application object that all routes below attach to.
app = FastAPI()
|
|
def md5(string):
    """Return the hexadecimal MD5 digest of ``string`` encoded as UTF-8."""
    hasher = hashlib.md5()
    hasher.update(string.encode('utf-8'))
    return hasher.hexdigest()
| |
# Qobuz application id, sent with every API request.
APP_ID = '579939560'

# User auth token, read from the environment; None when TOKEN is not set.
TOKEN = os.environ.get('TOKEN')

# Root of the Qobuz JSON API (endpoint paths are appended to it).
BASE = 'https://www.qobuz.com/api.json/0.2/'

# Public base URL of this service, used when building file links.
BASE_URL = 'https://tecuts-request.hf.space'

_USER_AGENT = (
    'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.21) '
    'Gecko/20100312 Firefox/3.6'
)

# Headers attached to Qobuz API requests.
HEADERS = {
    'X-App-Id': APP_ID,
    'X-User-Auth-Token': TOKEN,
    'User-Agent': _USER_AGENT,
}
|
|
@app.get('/')
def main():
    """Health-check endpoint: report that the application is running."""
    status = {"appStatus": "running"}
    return status
|
|
# Seconds to wait on any single network operation (connect or read) before
# giving up; without a timeout ``requests`` can block indefinitely.
_TRACK_REQUEST_TIMEOUT = 20


def _remove_file_quietly(path):
    """Delete ``path`` if present; log, but never raise, on other OS errors.

    Returns True when a file was actually removed.
    """
    try:
        os.remove(path)
    except FileNotFoundError:
        return False
    except OSError as e:
        logging.warning(f"Could not delete {path}: {e}")
        return False
    return True


@app.post('/track')
async def fetch_data_for_url(request: Request):
    """Resolve a track URL to a converted ALAC file and return its link.

    The request body must be a JSON object with a ``url`` key whose value
    ends in the numeric track id. The handler asks the Qobuz
    ``track/getFileUrl`` endpoint for a file URL, downloads that file to a
    temporary ``.flac`` path, converts it with ``ffmpeg`` to ALAC in an
    ``.m4a`` container, removes the intermediate FLAC file, and returns
    ``{"url": <link under BASE_URL/file/>}``.

    Raises:
        HTTPException: 400 for an invalid body, a missing/non-string URL or
            a URL without a trailing track id; 500 when the upstream
            request, the download or the conversion fails.

    NOTE(review): ``requests`` and ``subprocess`` calls here are synchronous
    inside an ``async def`` handler — consider moving them to a worker
    thread if concurrency matters.
    """
    try:
        data = await request.json()
    except Exception as e:
        logging.error(f"Error parsing JSON: {e}")
        raise HTTPException(status_code=400, detail="Invalid JSON") from e

    # Valid JSON is not necessarily an object; lists/strings have no ``.get``.
    url = data.get('url') if isinstance(data, dict) else None
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")
    if not isinstance(url, str):
        raise HTTPException(status_code=400, detail="URL must be a string")

    logging.info(f'Fetching data for: {url}')

    # The track id is the run of digits at the very end of the URL.
    track_id_match = re.search(r'\d+$', url.strip())
    if not track_id_match:
        logging.error('Track ID not found in your input.')
        raise HTTPException(status_code=400, detail="Track ID not found in URL")

    track_id = track_id_match.group(0)
    timestamp = int(time.time())
    # Request signature: MD5 over the fixed method/parameter string, the
    # track id, the request timestamp and the trailing secret.
    request_sig_raw = f'trackgetFileUrlformat_id27intentstreamtrack_id{track_id}{timestamp}fa31fc13e7a28e7d70bb61e91aa9e178'
    request_sig = md5(request_sig_raw)

    download_url = f'{BASE}track/getFileUrl?format_id=27&intent=stream&track_id={track_id}&request_ts={timestamp}&request_sig={request_sig}'

    try:
        response = requests.get(
            download_url, headers=HEADERS, timeout=_TRACK_REQUEST_TIMEOUT
        )
    except requests.RequestException as e:
        logging.error(f"Failed to fetch the track file URL: {e}")
        raise HTTPException(status_code=500, detail="Failed to fetch the track file URL") from e
    if response.status_code != 200:
        logging.error(f"Failed to fetch the track file URL: {response.status_code}")
        raise HTTPException(status_code=500, detail="Failed to fetch the track file URL")

    try:
        file_url = response.json().get('url')
    except (ValueError, AttributeError):
        # Body was not JSON, or was JSON but not an object.
        file_url = None
    if not file_url:
        logging.error("No file URL returned from Qobuz")
        raise HTTPException(status_code=500, detail="No file URL returned")

    flac_file_path = os.path.join(global_download_dir, f'{uuid.uuid4()}.flac')
    alac_file_path = os.path.join(global_download_dir, f'{uuid.uuid4()}.m4a')

    try:
        try:
            with requests.get(file_url, stream=True, timeout=_TRACK_REQUEST_TIMEOUT) as r:
                r.raise_for_status()
                with open(flac_file_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
        except (requests.RequestException, OSError) as e:
            logging.error(f"Failed to download the track file: {e}")
            raise HTTPException(status_code=500, detail="Failed to download the track file") from e
        logging.info(f"FLAC file downloaded at {flac_file_path}")

        try:
            conversion = subprocess.run(
                ['ffmpeg', '-i', flac_file_path, '-c:a', 'alac', alac_file_path],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except OSError as e:
            # Typically: the ffmpeg executable could not be started.
            logging.error(f"Failed to run ffmpeg: {e}")
            raise HTTPException(status_code=500, detail="Failed to convert the track") from e

        if conversion.returncode != 0:
            # Do not hand out a link to a missing or truncated output file.
            logging.error(
                f"ffmpeg exited with code {conversion.returncode}: "
                f"{conversion.stderr.decode('utf-8', errors='replace')[-2000:]}"
            )
            _remove_file_quietly(alac_file_path)
            raise HTTPException(status_code=500, detail="Failed to convert the track")
        logging.info(f"Converted to ALAC: {alac_file_path}")
    finally:
        # The intermediate FLAC is never served; drop it on success and
        # on every failure path alike.
        if _remove_file_quietly(flac_file_path):
            logging.info(f"Deleted FLAC file: {flac_file_path}")

    alac_file_url = f'{BASE_URL}/file/{os.path.basename(alac_file_path)}'
    logging.info(f"Returning ALAC file URL: {alac_file_url}")

    # Kept from the original implementation: force a collection pass
    # before responding.
    gc.collect()

    return {"url": alac_file_url}
|
|
| |
# Serve everything in the download directory under the /file URL prefix.
_downloads_app = StaticFiles(directory=global_download_dir)
app.mount("/file", _downloads_app, name="downloads")
|
|
|
|
# Recognised URL shapes, each followed by the 11-character video id:
#   youtube.com/watch?v=ID          youtube.com/watch?<other params>&v=ID
#   youtube.com/embed/ID            youtube.com/shorts/ID
#   youtube.com/live/ID             youtu.be/ID
# The optional "<other params>&" group is lazy (``??``) so that a ``v=``
# placed directly after ``watch?`` always wins, as it did before.
_VIDEO_ID_RE = re.compile(
    r'(?:youtube\.com/(?:watch\?(?:[^#\s]*?&)??v=|embed/|shorts/|live/)'
    r'|youtu\.be/)'
    r'([a-zA-Z0-9_-]{11})'
)


def extract_video_id(url):
    """Return the 11-character YouTube video id found in ``url``.

    Args:
        url: A YouTube URL in one of the watch / embed / shorts / live /
            youtu.be forms.

    Returns:
        The video id as a string, or None when ``url`` is not a string or
        contains no recognisable id.
    """
    if not isinstance(url, str):
        return None

    match = _VIDEO_ID_RE.search(url)
    return match.group(1) if match else None
|
|
|
|
@app.get("/yt-audio")
async def yt_audio(url: str):
    """Fetch the YouTube player response for the video referenced by ``url``.

    The video id is extracted from ``url`` and posted to the YouTube
    ``youtubei/v1/player`` endpoint with an ``IOS`` client context. On
    success the decoded JSON response is returned unchanged. Any failure is
    logged and reported as a ``{"error": "<kind>: <message>"}`` dictionary
    instead of being raised.
    """

    def _failure(kind, exc):
        # Log the problem and build the error payload from the same text.
        message = f"{kind}: {str(exc)}"
        logging.error(message)
        return {"error": message}

    try:
        video_id = extract_video_id(url)
        if video_id is None:
            raise ValueError("Video ID extraction failed")

        player_endpoint = "https://www.youtube.com/youtubei/v1/player?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8&prettyPrint=false"

        request_headers = {
            "X-Goog-Authuser": "0",
            "X-Origin": "https://www.youtube.com",
            "X-goog-Visitor-Id": "CgtMWlVXOTFCcE5Edyj9p4i9BjIKCgJVUxIEGgAgZA%3D%3D",
        }

        client_context = {
            "clientName": "IOS",
            "clientVersion": "19.14.3",
            "deviceModel": "iPhone15,4",
        }
        payload = {
            "context": {"client": client_context},
            "videoId": video_id,
        }

        logging.info(f"Sending request to URL: {url} with video ID: {video_id}")
        player_response = requests.post(
            player_endpoint,
            timeout=20,
            json=payload,
            headers=request_headers,
        )
        player_response.raise_for_status()

        logging.info(f"Received response from URL: {url}, status code: {player_response.status_code}")
        return player_response.json()

    # Handler order matters: the first matching clause decides the label.
    except requests.RequestException as exc:
        return _failure("Request error", exc)
    except json.JSONDecodeError as exc:
        return _failure("JSON decoding error", exc)
    except ValueError as exc:
        return _failure("Value error", exc)
    except Exception as exc:
        return _failure("Unexpected error", exc)
|
|
| |
@app.middleware("http")
async def set_mime_type_middleware(request: Request, call_next):
    """Label successfully served ``.m4a`` paths with an audio content type.

    Every request is passed through ``call_next``. When the request path
    ends in ``.m4a`` and the response is not an error, its ``Content-Type``
    header is overwritten with ``audio/m4a``.

    Error responses (status 400 and above) are left untouched: their body
    is an error message, not audio, so relabelling them would mislead
    clients, e.g. on a 404 for a file that does not exist.
    """
    response = await call_next(request)
    is_m4a_path = request.url.path.endswith(".m4a")
    if is_m4a_path and response.status_code < 400:
        response.headers["Content-Type"] = "audio/m4a"
    return response
|
|