| from fastapi import FastAPI, Request, Query, BackgroundTasks |
| from fastapi.responses import HTMLResponse, JSONResponse, FileResponse |
| from fastapi.templating import Jinja2Templates |
| from fastapi.staticfiles import StaticFiles |
| import requests |
| import json |
| import os |
| import tempfile |
| import shutil |
| from datetime import datetime |
| from typing import Optional |
| import uvicorn |
| import asyncio |
| import aiohttp |
| import aiofiles |
|
|
| |
| CLOUDFLARE_API_TOKEN = "kX5qf8z570tAZ5UywNuzEVUfRhPt8Ay2QW8ga2hY" |
| CLOUDFLARE_ACCOUNT_ID = "85542ea7b47219a8449eef1fa8052563" |
| KV_NAMESPACE_ID = "vsqx-updater-VSQX_KV" |
| DATA_FILE = "vsqx_data.json" |
| META_FILE = "vsqx_meta.json" |
|
|
| |
| TEMP_DIR = tempfile.mkdtemp(prefix="vsqx_downloads_") |
|
|
| |
| LANGUAGE_MAP = { |
| 0: "全部", |
| 1: "中文", |
| 2: "日文", |
| 3: "英文", |
| 4: "其他" |
| } |
|
|
| app = FastAPI(title="VSQX.TOP 数据浏览器") |
| templates = Jinja2Templates(directory="templates") |
|
|
| def load_meta(): |
| """加载元数据""" |
| if os.path.exists(META_FILE): |
| with open(META_FILE, 'r', encoding='utf-8') as f: |
| return json.load(f) |
| return {"last_update": None} |
|
|
| def save_meta(meta): |
| """保存元数据""" |
| with open(META_FILE, 'w', encoding='utf-8') as f: |
| json.dump(meta, f, ensure_ascii=False) |
|
|
| def get_kv_namespace_id(): |
| """获取 KV namespace 的真实 ID""" |
| try: |
| url = f"https://api.cloudflare.com/client/v4/accounts/{CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces" |
| headers = { |
| "Authorization": f"Bearer {CLOUDFLARE_API_TOKEN}" |
| } |
| |
| response = requests.get(url, headers=headers, timeout=30) |
| response.raise_for_status() |
| result = response.json() |
| |
| if result.get('success'): |
| for ns in result.get('result', []): |
| if ns.get('title') == KV_NAMESPACE_ID: |
| return ns.get('id') |
| |
| return None |
| except Exception as e: |
| print(f"获取 KV namespace ID 失败:{e}") |
| return None |
|
|
| def download_data(): |
| """从 Cloudflare Workers KV 下载数据""" |
| try: |
| |
| namespace_id = get_kv_namespace_id() |
| if not namespace_id: |
| print("无法获取 KV namespace ID") |
| return [] |
| |
| |
| data_url = f"https://api.cloudflare.com/client/v4/accounts/{CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/{namespace_id}/values/vsqx.all" |
| headers = { |
| "Authorization": f"Bearer {CLOUDFLARE_API_TOKEN}" |
| } |
| |
| response = requests.get(data_url, headers=headers, timeout=60) |
| response.raise_for_status() |
| data = response.json() |
| |
| |
| meta_url = f"https://api.cloudflare.com/client/v4/accounts/{CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/{namespace_id}/values/vsqx.update" |
| meta_response = requests.get(meta_url, headers=headers, timeout=60) |
| update_time = None |
| if meta_response.status_code == 200: |
| try: |
| update_time = meta_response.text.strip() |
| except: |
| pass |
| |
| |
| with open(DATA_FILE, 'w', encoding='utf-8') as f: |
| json.dump(data, f, ensure_ascii=False) |
| |
| |
| meta = {"last_update": update_time if update_time else datetime.now().isoformat()} |
| save_meta(meta) |
| return data |
| except Exception as e: |
| print(f"下载数据失败:{e}") |
| |
| if os.path.exists(DATA_FILE): |
| try: |
| with open(DATA_FILE, 'r', encoding='utf-8') as f: |
| return json.load(f) |
| except: |
| pass |
| return [] |
|
|
| def load_data(): |
| """加载数据,检查是否需要更新""" |
| meta = load_meta() |
| need_update = False |
| |
| if not os.path.exists(DATA_FILE): |
| need_update = True |
| elif meta.get("last_update"): |
| last_update_str = meta["last_update"] |
| |
| if last_update_str.endswith('Z'): |
| last_update_str = last_update_str[:-1] + '+00:00' |
| try: |
| last = datetime.fromisoformat(last_update_str) |
| now = datetime.now() |
| if last.date() != now.date(): |
| need_update = True |
| except: |
| need_update = True |
| |
| if need_update: |
| print("正在更新数据...") |
| data = download_data() |
| if not data and os.path.exists(DATA_FILE): |
| with open(DATA_FILE, 'r', encoding='utf-8') as f: |
| data = json.load(f) |
| else: |
| with open(DATA_FILE, 'r', encoding='utf-8') as f: |
| data = json.load(f) |
| |
| return data |
|
|
| def search_filter_sort(data, keyword="", language=None, author="", sort_by="music_id", sort_order="desc"): |
| """搜索、筛选、排序""" |
| result = data.copy() |
| |
| |
| if keyword: |
| kw = keyword.lower() |
| result = [ |
| item for item in result |
| if kw in item.get("music_name", "").lower() |
| or kw in item.get("music_author", "").lower() |
| or kw in item.get("music_lyric", "").lower() |
| or kw in item.get("author", "").lower() |
| or kw in item.get("p_name", "").lower() |
| ] |
| |
| |
| if language is not None: |
| result = [item for item in result if item.get("language") == language] |
| |
| |
| if author: |
| result = [item for item in result if author in item.get("p_name", "") or author in item.get("author", "")] |
| |
| |
| reverse = sort_order == "desc" |
| sort_fields = { |
| "music_id": "music_id", |
| "music_name": "music_name", |
| "author": "p_name", |
| "download_count": "download_num", |
| "collect_count": "comment_num", |
| "click_count": "click_num" |
| } |
| |
| field = sort_fields.get(sort_by, "music_id") |
| |
| def get_sort_key(item): |
| val = item.get(field) |
| if field in ["music_id", "download_num", "comment_num", "click_num"]: |
| return val if val is not None else 0 |
| return val if val is not None else "" |
| |
| result.sort(key=get_sort_key, reverse=reverse) |
| return result |
|
|
| |
| all_data = [] |
|
|
| @app.on_event("startup") |
| async def startup(): |
| global all_data |
| all_data = load_data() |
| print(f"已加载 {len(all_data)} 条记录") |
| print(f"临时下载目录: {TEMP_DIR}") |
|
|
| @app.on_event("shutdown") |
| async def shutdown(): |
| """清理临时目录""" |
| if os.path.exists(TEMP_DIR): |
| shutil.rmtree(TEMP_DIR, ignore_errors=True) |
| print(f"已清理临时目录: {TEMP_DIR}") |
|
|
| @app.get("/config") |
| async def config(): |
| """Gradio/ModelScope 健康检查端点""" |
| return { |
| "version": "3.50.2", |
| "mode": "dev", |
| "dev_mode": True, |
| "analytics_enabled": False, |
| "components": [], |
| "theme": "default", |
| "title": "VSQX.TOP 数据浏览器", |
| "css": "", |
| "js": "", |
| "head": "", |
| "api_prefix": "", |
| "api_names": [], |
| "root": "", |
| "space_id": None |
| } |
|
|
| @app.get("/health") |
| async def health(): |
| """健康检查""" |
| return {"status": "ok", "loaded_records": len(all_data)} |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def index(request: Request): |
| """主页""" |
| |
| meta = load_meta() |
| need_update = False |
| |
| if not os.path.exists(DATA_FILE): |
| need_update = True |
| elif meta.get("last_update"): |
| last_update_str = meta["last_update"] |
| |
| if last_update_str.endswith('Z'): |
| last_update_str = last_update_str[:-1] + '+00:00' |
| try: |
| last = datetime.fromisoformat(last_update_str) |
| now = datetime.now() |
| if last.date() != now.date(): |
| need_update = True |
| except: |
| need_update = True |
| |
| if need_update: |
| print("正在更新数据...") |
| download_data() |
| |
| |
| meta = load_meta() |
| last_update = meta.get("last_update", "从未更新") |
| total = len(all_data) |
| |
| return templates.TemplateResponse("index.html", { |
| "request": request, |
| "total": total, |
| "last_update": last_update, |
| "language_map": LANGUAGE_MAP |
| }) |
|
|
| @app.get("/api/data") |
| async def get_data( |
| keyword: str = Query(default=""), |
| language: str = Query(default=""), |
| author: str = Query(default=""), |
| sort_by: str = Query(default="music_id"), |
| sort_order: str = Query(default="desc"), |
| page: int = Query(default=1, ge=1), |
| page_size: int = Query(default=50, ge=1, le=200) |
| ): |
| """获取数据 API""" |
| global all_data |
| |
| |
| lang_value = None |
| if language and language != "0": |
| try: |
| lang_value = int(language) |
| except ValueError: |
| lang_value = None |
| |
| result = search_filter_sort(all_data, keyword, lang_value, author, sort_by, sort_order) |
| total = len(result) |
| total_pages = (total + page_size - 1) // page_size |
| |
| start = (page - 1) * page_size |
| end = start + page_size |
| items = result[start:end] |
| |
| return { |
| "items": items, |
| "total": total, |
| "page": page, |
| "page_size": page_size, |
| "total_pages": total_pages |
| } |
|
|
| @app.post("/api/download") |
| async def proxy_download(request: Request, background_tasks: BackgroundTasks): |
| """代理下载 API - 后台下载后转发""" |
| try: |
| body = await request.json() |
| vn = body.get("vn") |
| token = body.get("token") |
| |
| if not vn or not token: |
| return JSONResponse( |
| status_code=400, |
| content={"success": False, "message": "缺少 vn 或 token 参数"} |
| ) |
| |
| |
| temp_filename = f"vn{vn}.zip" |
| temp_filepath = os.path.join(TEMP_DIR, temp_filename) |
| |
| |
| if os.path.exists(temp_filepath): |
| os.remove(temp_filepath) |
| |
| |
| download_result = await download_from_vsqx(vn, token, temp_filepath) |
| |
| if not download_result["success"]: |
| return JSONResponse( |
| status_code=400, |
| content=download_result |
| ) |
| |
| |
| background_tasks.add_task(cleanup_file, temp_filepath, delay=300) |
| |
| |
| return FileResponse( |
| path=temp_filepath, |
| filename=f"vn{vn}.zip", |
| media_type="application/zip", |
| background=background_tasks |
| ) |
| |
| except Exception as e: |
| import traceback |
| print(f"下载错误: {traceback.format_exc()}") |
| return JSONResponse( |
| status_code=500, |
| content={"success": False, "message": f"服务器错误: {str(e)}"} |
| ) |
|
|
| async def download_from_vsqx(vn: int, token: str, save_path: str) -> dict: |
| """从 VSQX 下载文件到本地""" |
| try: |
| |
| api_url = "https://www.vsqx.top/api/app/download_project" |
| headers = { |
| "Content-Type": "application/json", |
| "token": token, |
| "referer": f"https://www.vsqx.top/project/vn{vn}", |
| "origin": "https://www.vsqx.top", |
| "user-agent": "Mozilla/5.0 (X11; Linux aarch64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.200 Safari/537.36 Qaxbrowser", |
| "accept": "application/json, text/plain, */*" |
| } |
| |
| async with aiohttp.ClientSession() as session: |
| async with session.post( |
| api_url, |
| json={"vn": vn}, |
| headers=headers, |
| timeout=aiohttp.ClientTimeout(total=10) |
| ) as resp: |
| if resp.status != 200: |
| return { |
| "success": False, |
| "message": f"获取下载链接失败: HTTP {resp.status}" |
| } |
| |
| data = await resp.json() |
| |
| if not data.get("success"): |
| return { |
| "success": False, |
| "message": data.get("message", "获取下载链接失败") |
| } |
| |
| download_url = data.get("data") |
| |
| |
| file_headers = { |
| "referer": f"https://www.vsqx.top/project/vn{vn}", |
| "user-agent": headers["user-agent"] |
| } |
| |
| async with session.get( |
| download_url, |
| headers=file_headers, |
| timeout=aiohttp.ClientTimeout(total=60) |
| ) as file_resp: |
| if file_resp.status != 200: |
| return { |
| "success": False, |
| "message": f"下载文件失败: HTTP {file_resp.status}" |
| } |
| |
| |
| async with aiofiles.open(save_path, 'wb') as f: |
| async for chunk in file_resp.content.iter_chunked(8192): |
| await f.write(chunk) |
| |
| return { |
| "success": True, |
| "path": save_path |
| } |
| |
| except asyncio.TimeoutError: |
| return {"success": False, "message": "请求超时"} |
| except Exception as e: |
| return {"success": False, "message": f"下载异常: {str(e)}"} |
|
|
| async def cleanup_file(filepath: str, delay: int = 300): |
| """延迟删除临时文件""" |
| await asyncio.sleep(delay) |
| try: |
| if os.path.exists(filepath): |
| os.remove(filepath) |
| print(f"已清理临时文件: {filepath}") |
| except Exception as e: |
| print(f"清理文件失败 {filepath}: {e}") |
|
|
| @app.get("/api/refresh") |
| async def refresh(): |
| """刷新数据""" |
| global all_data |
| all_data = load_data() |
| return {"success": True, "total": len(all_data)} |
|
|
| @app.get("/api/status") |
| async def status(): |
| """获取状态""" |
| meta = load_meta() |
| return { |
| "total": len(all_data), |
| "last_update": meta.get("last_update") |
| } |
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=7860) |