| import base64 |
| import json |
| import re |
| import xml.etree.ElementTree as ET |
| import asyncio |
| from typing import Optional, List, Any, Dict |
| from fastapi import FastAPI, HTTPException, Query, Response, Request |
| from fastapi.responses import JSONResponse, StreamingResponse |
| import httpx |
| import os |
|
|
| from contextlib import asynccontextmanager |
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the shared ``httpx.AsyncClient`` for the application's lifetime.

    The client is created on startup and stored on ``app.state.client`` so
    that request handlers can reuse it. It follows redirects and uses a
    30 second default timeout.

    Args:
        app: The FastAPI application whose ``state`` receives the client.
    """
    # ``async with`` guarantees the client is closed even when shutdown is
    # triggered by an exception or cancellation delivered at the ``yield``.
    async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
        app.state.client = client
        yield
|
|
# ASGI application object; startup/shutdown work is delegated to ``lifespan``.
app = FastAPI(
    title="Tidal Stream Processor",
    lifespan=lifespan,
)
|
|
| |
|
|
| |
# Base URLs of the upstream mirrors, in the order they are tried.
# This must stay a mutable list: mirrors that fail are moved to the end of it
# at runtime.
AVAILABLE_APIS = [
    "https://tidal-api.binimum.org",
    "https://tidal.kinoplus.online",
    "https://triton.squid.wtf",
    "https://vogel.qqdl.site",
    "https://maus.qqdl.site",
    "https://hund.qqdl.site",
    "https://katze.qqdl.site",
    "https://wolf.qqdl.site",
    "https://hifi-one.spotisaver.net",
    "https://hifi-two.spotisaver.net",
    "https://eu-central.monochrome.tf",
    "https://us-west.monochrome.tf",
    "https://api.monochrome.tf",
    "https://monochrome-api.samidy.com",
    "https://api.zarz.moe/v1/tidal",
    "https://tidal.squid.wtf",
    "https://tidal.afkarxyz.qzz.io",
]
|
|
# Per-mirror request policy.
MAX_RETRIES = 2  # extra attempts after the first request
RETRY_DELAY = 0.5  # seconds slept before the first retry; doubled on each further retry
TIMEOUT = 15.0  # per-request timeout, in seconds

# Headers sent with every mirror request.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
}
|
|
async def fetch_api_with_retry(client: httpx.AsyncClient, api: str, track_id: int, quality: str) -> Dict[str, Any]:
    """Fetch track data from one mirror, retrying with exponential backoff.

    Up to ``MAX_RETRIES + 1`` requests are sent to ``<api>/track/``. Before
    each retry the coroutine sleeps, starting at ``RETRY_DELAY`` seconds and
    doubling every time. An attempt counts as failed when the request raises,
    the status is not 200, the body is not valid JSON, or the JSON carries no
    usable payload.

    Args:
        client: Shared HTTP client used to send the request.
        api: Base URL of the mirror; a trailing slash is tolerated.
        track_id: Track identifier, sent as the ``id`` query parameter.
        quality: Quality label, sent as the ``quality`` query parameter.

    Returns:
        The decoded JSON body: either a non-empty list, or a dict with a
        truthy ``data`` or ``manifest`` entry. (The annotation says ``Dict``,
        but list payloads are returned as well.)

    Raises:
        Exception: When every attempt failed. The message names the mirror
            and the reason the last attempt failed.
    """
    url = f"{api.rstrip('/')}/track/"
    params = {"id": track_id, "quality": quality}
    delay = RETRY_DELAY
    last_failure = "no attempt made"

    for attempt in range(MAX_RETRIES + 1):
        if attempt > 0:
            await asyncio.sleep(delay)
            delay *= 2
        try:
            resp = await client.get(url, params=params, headers=HEADERS, timeout=TIMEOUT)
            if resp.status_code != 200:
                last_failure = f"HTTP {resp.status_code}"
                continue
            body = resp.json()
        except Exception as exc:
            # Deliberately broad: any transport or decoding problem simply
            # means "try again"; the reason is kept for the final message.
            last_failure = f"{type(exc).__name__}: {exc}"
            continue

        if isinstance(body, list) and body:
            return body
        if isinstance(body, dict) and (body.get("data") or body.get("manifest")):
            return body
        last_failure = "response contained no usable payload"

    raise Exception(f"API {api} failed: {last_failure}")
|
|
async def get_tidal_data_sequential(client: httpx.AsyncClient, track_id: int, quality: str) -> Dict[str, Any]:
    """Query the mirrors one after another and return the first usable payload.

    Mirrors are tried in the current order of ``AVAILABLE_APIS``. A mirror
    that fails is moved to the end of that shared list, so mirrors that keep
    working stay near the front for later calls.

    Raises:
        HTTPException: Status 500 when every mirror failed; the detail lists
            the first three error messages.
    """
    failures = []

    # Iterate over a snapshot because the shared list is reordered below.
    for mirror in tuple(AVAILABLE_APIS):
        try:
            return await fetch_api_with_retry(client, mirror, track_id, quality)
        except Exception as exc:
            failures.append(f"{mirror}: {exc}")

        # Only reached after a failure: demote this mirror to the back.
        if mirror in AVAILABLE_APIS:
            AVAILABLE_APIS.remove(mirror)
            AVAILABLE_APIS.append(mirror)

    summary = '; '.join(failures[:3])
    raise HTTPException(status_code=500, detail=f"All Tidal APIs failed: {summary}")
|
|
def parse_dash_manifest(xml_str: str):
    """Extract segment addressing information from a DASH MPD document.

    Args:
        xml_str: The MPD manifest as XML text.

    Returns:
        A tuple ``(init_url, media_template, total_segments)`` taken from the
        first ``SegmentTemplate`` element. ``total_segments`` is the sum of
        ``r + 1`` over the ``S`` entries of its ``SegmentTimeline`` and is 0
        when there is no timeline. ``(None, None, 0)`` is returned when the
        XML cannot be parsed, no ``SegmentTemplate`` exists, either URL
        attribute is missing or empty, or an ``r`` attribute is not an
        integer.
    """
    failure = (None, None, 0)

    # Drop the first default-namespace declaration so the un-prefixed element
    # lookups below match.
    xml_str = re.sub(r'\sxmlns="[^"]+"', '', xml_str, count=1)
    try:
        root = ET.fromstring(xml_str)
    except ET.ParseError:
        return failure

    seg_temp = root.find(".//SegmentTemplate")
    if seg_temp is None:
        return failure

    # ElementTree has already unescaped XML entities in attribute values, so
    # the URLs can be used as they are.
    init_url = seg_temp.get("initialization")
    media_template = seg_temp.get("media")
    if not init_url or not media_template:
        return failure

    total_segments = 0
    segment_timeline = seg_temp.find("SegmentTimeline")
    if segment_timeline is not None:
        for s in segment_timeline.findall("S"):
            try:
                repeat = int(s.get("r", 0))
            except ValueError:
                # A malformed repeat count is reported like any other
                # unparseable manifest instead of escaping as ValueError.
                return failure
            total_segments += repeat + 1
    return init_url, media_template, total_segments
|
|
def generate_m3u8(init_url: str, media_template: str, count: int) -> str:
    """Render an HLS media playlist for a numbered run of segments.

    Args:
        init_url: URL of the initialization segment, written into
            ``#EXT-X-MAP``.
        media_template: Segment URL template; every ``$Number$`` placeholder
            is replaced by the segment number.
        count: Number of segments; they are numbered from 1 to ``count``.

    Returns:
        The playlist text, lines joined with ``\\n`` and no trailing newline.
        Every segment is listed with a fixed ``#EXTINF:4.0,`` entry.
    """
    lines = [
        "#EXTM3U",
        "#EXT-X-VERSION:7",
        "#EXT-X-TARGETDURATION:5",
        "#EXT-X-MEDIA-SEQUENCE:1",
        f'#EXT-X-MAP:URI="{init_url}"',
    ]
    for number in range(1, count + 1):
        lines.extend(("#EXTINF:4.0,", media_template.replace("$Number$", str(number))))
    lines.append("#EXT-X-ENDLIST")
    return "\n".join(lines)
|
|
@app.get("/")
async def root():
    """Report that the service is running and list its endpoint URL templates."""
    endpoints = {
        "fetch": "/fetch?id={track_id}&quality={quality}",
        "playlist": "/playlist.m3u8?id={track_id}&quality={quality}",
    }
    return {"message": "Tidal Stream Processor is running", "endpoints": endpoints}
|
|
@app.get("/fetch")
async def fetch_tidal_media(
    request: Request,
    track_id: int = Query(..., alias="id"),
    quality: str = "LOSSLESS",
):
    """Resolve a track to a direct URL or to this service's playlist endpoint.

    Args:
        request: Incoming request; gives access to the shared HTTP client.
        track_id: Track identifier, read from the ``id`` query parameter.
        quality: Quality label passed through to the mirrors.

    Returns:
        One of:
        ``{"type": "direct", "url": ...}`` when the payload carries an
        ``OriginalTrackUrl`` or an unencrypted JSON manifest with URLs;
        ``{"type": "error", "detail": ...}`` for any other JSON manifest;
        ``{"type": "m3u8", "manifest_url": ...}`` otherwise.

    Raises:
        HTTPException: 404 when no manifest is present; 400 when the manifest
            cannot be decoded; whatever ``get_tidal_data_sequential`` raises
            when all mirrors fail.
    """
    body = await get_tidal_data_sequential(request.app.state.client, track_id, quality)

    if isinstance(body, list):
        # List payloads: look at every dict entry rather than only the first,
        # and never fall through to dict-only access on the list itself.
        # NOTE(review): which entry carries the URL/manifest is assumed to
        # vary between mirrors - confirm against real responses.
        candidates = [entry for entry in body if isinstance(entry, dict)]
        for entry in candidates:
            direct_url = entry.get("OriginalTrackUrl")
            if direct_url:
                return {"type": "direct", "url": direct_url}
    elif isinstance(body, dict):
        # Prefer the nested "data" object, then the top level. "data" may be
        # null or a non-dict, in which case only the top level is consulted.
        nested = body.get("data")
        candidates = [nested, body] if isinstance(nested, dict) else [body]
    else:
        candidates = []

    manifest_b64 = next((c["manifest"] for c in candidates if c.get("manifest")), None)
    if not manifest_b64:
        raise HTTPException(status_code=404, detail="Manifest not found")

    try:
        manifest_str = base64.b64decode(manifest_b64).decode("utf-8")
    except (TypeError, ValueError) as exc:
        # Covers bad base64 (binascii.Error), non-UTF-8 bytes and non-string
        # manifests, which previously escaped as unhandled errors.
        raise HTTPException(status_code=400, detail="Could not decode manifest") from exc

    # A manifest that starts with "{" is treated as JSON rather than DASH XML.
    if manifest_str.strip().startswith("{"):
        try:
            bts_data = json.loads(manifest_str)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="Could not parse JSON manifest") from exc
        if bts_data.get("encryptionType") == "NONE" and bts_data.get("urls"):
            return {"type": "direct", "url": bts_data["urls"][0]}
        return {"type": "error", "detail": "Encrypted BTS not supported"}

    return {"type": "m3u8", "manifest_url": f"/playlist.m3u8?id={track_id}&quality={quality}"}
|
|
@app.get("/playlist.m3u8")
async def get_m3u8_playlist(
    request: Request,
    track_id: int = Query(..., alias="id"),
    quality: str = "LOSSLESS",
):
    """Serve an HLS playlist generated from the track's DASH manifest.

    Args:
        request: Incoming request; gives access to the shared HTTP client.
        track_id: Track identifier, read from the ``id`` query parameter.
        quality: Quality label passed through to the mirrors.

    Returns:
        A response with media type ``application/vnd.apple.mpegurl`` whose
        body is the generated playlist.

    Raises:
        HTTPException: 404 when no manifest is present; 400 when the manifest
            cannot be decoded or parsed as DASH; whatever
            ``get_tidal_data_sequential`` raises when all mirrors fail.
    """
    body = await get_tidal_data_sequential(request.app.state.client, track_id, quality)

    # The payload may be a list of objects or a single object, so collect the
    # dicts that could hold a manifest instead of assuming a dict.
    if isinstance(body, list):
        candidates = [entry for entry in body if isinstance(entry, dict)]
    elif isinstance(body, dict):
        # Prefer the nested "data" object, then the top level. "data" may be
        # null or a non-dict, in which case only the top level is consulted.
        nested = body.get("data")
        candidates = [nested, body] if isinstance(nested, dict) else [body]
    else:
        candidates = []

    manifest_b64 = next((c["manifest"] for c in candidates if c.get("manifest")), None)
    if not manifest_b64:
        raise HTTPException(status_code=404, detail="Manifest not found")

    try:
        manifest_str = base64.b64decode(manifest_b64).decode("utf-8")
    except (TypeError, ValueError) as exc:
        # Covers bad base64 (binascii.Error), non-UTF-8 bytes and non-string
        # manifests, which previously escaped as unhandled errors.
        raise HTTPException(status_code=400, detail="Could not decode manifest") from exc

    init_url, media_template, count = parse_dash_manifest(manifest_str)
    if not init_url:
        raise HTTPException(status_code=400, detail="Could not parse DASH manifest")

    playlist = generate_m3u8(init_url, media_template, count)
    return Response(content=playlist, media_type="application/vnd.apple.mpegurl")
|
|
@app.api_route("/proxy", methods=["GET", "HEAD", "OPTIONS"])
async def proxy(request: Request):
    """Stream the resource named by the ``url`` query parameter to the caller.

    ``OPTIONS`` requests are answered directly with permissive CORS headers.
    For ``GET`` and ``HEAD`` the upstream response is relayed with its status
    code, its raw (still content-encoded) body and its headers, except that:

    * hop-by-hop headers are dropped, since they describe the upstream
      connection rather than this one;
    * ``X-Frame-Options`` is dropped;
    * ``Access-Control-Allow-Origin`` is replaced by ``*``.

    Returns:
        400 when ``url`` is missing or is not an http(s) URL, 502 when the
        upstream request fails, otherwise the streamed upstream response.

    NOTE(review): this endpoint fetches any caller-supplied http(s) URL, so it
    can be used to reach internal hosts (SSRF). Consider a host allow-list.
    """
    if request.method == "OPTIONS":
        return Response(headers={"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS", "Access-Control-Allow-Headers": "*"})

    url = request.query_params.get("url")
    if not url:
        return Response("Missing url", status_code=400)
    # Reject other schemes up front rather than failing inside the client.
    if not url.lower().startswith(("http://", "https://")):
        return Response("Invalid url", status_code=400)

    client = request.app.state.client

    try:
        req = client.build_request(
            request.method,
            url,
            headers={
                "User-Agent": request.headers.get("User-Agent", "Mozilla/5.0"),
                "Accept": request.headers.get("Accept", "*/*"),
                "Referer": request.headers.get("Referer", ""),
            },
        )
        resp = await client.send(req, stream=True)
    except (httpx.HTTPError, httpx.InvalidURL):
        # Connection failures, timeouts, redirect loops and malformed URLs
        # become a gateway error instead of an unhandled exception.
        return Response("Upstream request failed", status_code=502)

    # httpx exposes header names in lower case, so match in lower case.
    dropped = {
        # Hop-by-hop headers.
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
        # Removed so the resource can be embedded.
        "x-frame-options",
        # Replaced below; dropping it avoids sending the header twice.
        "access-control-allow-origin",
    }
    response_headers = {
        name: value
        for name, value in resp.headers.items()
        if name.lower() not in dropped
    }
    response_headers["Access-Control-Allow-Origin"] = "*"

    async def iterate_and_close():
        # Relay undecoded bytes so Content-Encoding and Content-Length stay
        # valid, and release the upstream connection when streaming ends.
        try:
            async for chunk in resp.aiter_raw():
                yield chunk
        finally:
            await resp.aclose()

    return StreamingResponse(
        iterate_and_close(),
        status_code=resp.status_code,
        headers=response_headers,
    )
|
|
if __name__ == "__main__":
    # Run the app directly; imported here so uvicorn is only needed for this path.
    import uvicorn

    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)
|
|