# vsqx / main.py
# atonyxu's picture
# Update main.py
# 1d8d0de verified
from fastapi import FastAPI, Request, Query, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import requests
import json
import os
import tempfile
import shutil
from datetime import datetime
from typing import Optional
import uvicorn
import asyncio
import aiohttp
import aiofiles
# Configuration.
# The Cloudflare credentials may be supplied through environment variables of
# the same name; the literals below are kept only as backward-compatible
# defaults (an empty environment value is treated as "not set").
# NOTE(security): a token committed to source control should be treated as
# leaked -- rotate it and drop the embedded default once the deployment
# provides CLOUDFLARE_API_TOKEN through the environment.
CLOUDFLARE_API_TOKEN = (
    os.environ.get("CLOUDFLARE_API_TOKEN")
    or "kX5qf8z570tAZ5UywNuzEVUfRhPt8Ay2QW8ga2hY"
)
CLOUDFLARE_ACCOUNT_ID = (
    os.environ.get("CLOUDFLARE_ACCOUNT_ID")
    or "85542ea7b47219a8449eef1fa8052563"
)
# Despite its name this is the KV namespace *title*; it is matched against the
# "title" field of the namespace listing to find the real id.
KV_NAMESPACE_ID = "vsqx-updater-VSQX_KV"
DATA_FILE = "vsqx_data.json"
META_FILE = "vsqx_meta.json"
# Temporary download directory (created as a side effect of importing this module).
TEMP_DIR = tempfile.mkdtemp(prefix="vsqx_downloads_")
# Language code -> display label.
LANGUAGE_MAP = {
    0: "全部",
    1: "中文",
    2: "日文",
    3: "英文",
    4: "其他",
}
# ASGI application object on which the route and lifecycle handlers are registered.
app = FastAPI(title="VSQX.TOP 数据浏览器")
# Jinja2 template loader rooted at the relative "templates" directory.
templates = Jinja2Templates(directory="templates")
def load_meta():
    """Load the metadata dictionary stored in ``META_FILE``.

    Returns:
        dict: The parsed JSON object, or ``{"last_update": None}`` when the
        file is missing, cannot be read, is not valid JSON, or does not
        contain a JSON object.
    """
    default = {"last_update": None}
    try:
        with open(META_FILE, 'r', encoding='utf-8') as f:
            meta = json.load(f)
    except FileNotFoundError:
        # No metadata written yet: this is the normal first-run case.
        return default
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        # A damaged metadata file must not take the whole application down.
        print(f"读取元数据失败:{e}")
        return default
    # Callers use meta.get(...), so anything but a JSON object is unusable.
    return meta if isinstance(meta, dict) else default
def save_meta(meta):
    """Write *meta* to ``META_FILE`` as UTF-8 JSON, keeping non-ASCII text unescaped."""
    with open(META_FILE, mode='w', encoding='utf-8') as out:
        out.write(json.dumps(meta, ensure_ascii=False))
def get_kv_namespace_id():
    """Resolve the real id of the KV namespace titled ``KV_NAMESPACE_ID``.

    Lists the account's KV namespaces through the Cloudflare API and picks
    the first entry whose ``title`` equals ``KV_NAMESPACE_ID``.

    Returns:
        The ``id`` value of the matching namespace, or ``None`` when the API
        reports no success, no title matches, or any exception is raised
        (the exception is printed, not propagated).
    """
    endpoint = (
        "https://api.cloudflare.com/client/v4/accounts/"
        f"{CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces"
    )
    auth_headers = {"Authorization": f"Bearer {CLOUDFLARE_API_TOKEN}"}
    try:
        reply = requests.get(endpoint, headers=auth_headers, timeout=30)
        reply.raise_for_status()
        payload = reply.json()
        if not payload.get('success'):
            return None
        matching_ids = (
            entry.get('id')
            for entry in payload.get('result', [])
            if entry.get('title') == KV_NAMESPACE_ID
        )
        return next(matching_ids, None)
    except Exception as e:
        print(f"获取 KV namespace ID 失败:{e}")
        return None
def download_data():
    """Download the data set from Cloudflare Workers KV and cache it locally.

    The namespace id is resolved with ``get_kv_namespace_id()``; the value
    stored under ``vsqx.all`` is parsed as JSON and written to ``DATA_FILE``;
    the text stored under ``vsqx.update`` (when available) becomes the
    ``last_update`` entry saved through ``save_meta()``, otherwise the current
    local time is used.

    Returns:
        The downloaded data on success. ``[]`` when the namespace id cannot
        be resolved. When the download fails, the parsed content of the
        local ``DATA_FILE`` if it can be read, else ``[]``.
    """
    try:
        # Resolve the real id of the KV namespace.
        namespace_id = get_kv_namespace_id()
        if not namespace_id:
            print("无法获取 KV namespace ID")
            return []
        values_url = (
            "https://api.cloudflare.com/client/v4/accounts/"
            f"{CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces/{namespace_id}/values"
        )
        headers = {
            "Authorization": f"Bearer {CLOUDFLARE_API_TOKEN}"
        }
        # Fetch the data set itself.
        response = requests.get(f"{values_url}/vsqx.all", headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        # Fetch the update timestamp. This is best-effort: a failure here must
        # not discard the data that was already downloaded successfully.
        update_time = None
        try:
            meta_response = requests.get(f"{values_url}/vsqx.update", headers=headers, timeout=60)
            if meta_response.status_code == 200:
                update_time = meta_response.text.strip()
        except requests.RequestException as e:
            print(f"获取更新时间失败:{e}")
        # Persist the data.
        with open(DATA_FILE, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False)
        # Persist the metadata; an empty/missing timestamp falls back to "now".
        meta = {"last_update": update_time if update_time else datetime.now().isoformat()}
        save_meta(meta)
        return data
    except Exception as e:
        print(f"下载数据失败:{e}")
        # Download failed: fall back to the locally cached file when possible.
        if os.path.exists(DATA_FILE):
            try:
                with open(DATA_FILE, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (OSError, ValueError) as read_error:
                # ValueError covers json.JSONDecodeError / UnicodeDecodeError.
                print(f"读取本地数据失败:{read_error}")
        return []
def _last_update_is_stale(last_update):
    """Return True when *last_update* is not an ISO-8601 timestamp dated today."""
    if not isinstance(last_update, str):
        return True
    # Rewrite a trailing "Z" as an explicit UTC offset before parsing;
    # datetime.fromisoformat() only accepts "Z" natively from Python 3.11.
    if last_update.endswith('Z'):
        last_update = last_update[:-1] + '+00:00'
    try:
        last = datetime.fromisoformat(last_update)
    except ValueError:
        return True
    # Take "today" in the timestamp's own timezone so an aware (e.g. UTC)
    # timestamp is not compared against the local calendar date. For a naive
    # timestamp tzinfo is None and this is plain local time.
    return last.date() != datetime.now(last.tzinfo).date()


def _read_local_data():
    """Return the parsed content of ``DATA_FILE``, or ``[]`` if it cannot be read."""
    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError / UnicodeDecodeError.
        print(f"读取本地数据失败:{e}")
        return []


def load_data():
    """Load the data set, downloading a fresh copy first when needed.

    A download is attempted when ``DATA_FILE`` does not exist, or when the
    metadata carries a ``last_update`` value that is unparseable or not dated
    today. If no download is needed, or the download yields nothing, the
    local ``DATA_FILE`` is used.

    Returns:
        The loaded data, or ``[]`` when neither a download nor the local
        file provides any.
    """
    meta = load_meta()
    if not os.path.exists(DATA_FILE):
        need_update = True
    else:
        last_update = meta.get("last_update")
        # Without a recorded timestamp the existing local file is used as is.
        need_update = bool(last_update) and _last_update_is_stale(last_update)
    if need_update:
        print("正在更新数据...")
        data = download_data()
        if data:
            return data
    return _read_local_data()
def search_filter_sort(data, keyword="", language=None, author="", sort_by="music_id", sort_order="desc"):
    """Search, filter and sort a list of record dictionaries.

    Args:
        data: Iterable of dict records; it is not modified.
        keyword: Case-insensitive substring looked up in ``music_name``,
            ``music_author``, ``music_lyric``, ``author`` and ``p_name``.
            Empty means no keyword search.
        language: Keep only records whose ``language`` equals this value;
            ``None`` disables the filter.
        author: Case-sensitive substring looked up in ``p_name`` and
            ``author``. Empty means no author filter.
        sort_by: One of ``music_id``, ``music_name``, ``author``,
            ``download_count``, ``collect_count``, ``click_count``; any other
            value sorts by ``music_id``.
        sort_order: ``"desc"`` sorts descending, anything else ascending.

    Returns:
        list: A new list with the matching records in sorted order.
    """
    def text_of(item, key):
        # Records may carry an explicit null (or a non-string) for a text
        # field; treat null as empty text instead of crashing on .lower()/in.
        value = item.get(key)
        return "" if value is None else str(value)

    result = list(data)
    # Keyword search.
    if keyword:
        kw = keyword.lower()
        search_keys = ("music_name", "music_author", "music_lyric", "author", "p_name")
        result = [
            item for item in result
            if any(kw in text_of(item, key).lower() for key in search_keys)
        ]
    # Language filter.
    if language is not None:
        result = [item for item in result if item.get("language") == language]
    # Author filter.
    if author:
        result = [
            item for item in result
            if author in text_of(item, "p_name") or author in text_of(item, "author")
        ]
    # Sorting: map the public sort name to the record field it reads.
    sort_fields = {
        "music_id": "music_id",
        "music_name": "music_name",
        "author": "p_name",
        "download_count": "download_num",
        "collect_count": "comment_num",
        "click_count": "click_num",
    }
    field = sort_fields.get(sort_by, "music_id")
    # Missing values sort as 0 for these fields and as "" for the others.
    missing = 0 if field in ("music_id", "download_num", "comment_num", "click_num") else ""

    def get_sort_key(item):
        val = item.get(field)
        return val if val is not None else missing

    result.sort(key=get_sort_key, reverse=(sort_order == "desc"))
    return result
# Module-level record cache; replaced by the startup hook below.
all_data = []


@app.on_event("startup")
async def startup():
    """Fill ``all_data`` from ``load_data()`` and print a short summary."""
    global all_data
    all_data = load_data()
    record_count = len(all_data)
    print(f"已加载 {record_count} 条记录")
    print(f"临时下载目录: {TEMP_DIR}")
@app.on_event("shutdown")
async def shutdown():
    """Delete the temporary download directory if it still exists."""
    if not os.path.exists(TEMP_DIR):
        return
    # Best-effort removal: errors while deleting are ignored.
    shutil.rmtree(TEMP_DIR, ignore_errors=True)
    print(f"已清理临时目录: {TEMP_DIR}")
@app.get("/config")
async def config():
    """Gradio/ModelScope health-check endpoint; returns a fixed configuration payload."""
    return dict(
        version="3.50.2",
        mode="dev",
        dev_mode=True,
        analytics_enabled=False,
        components=[],
        theme="default",
        title="VSQX.TOP 数据浏览器",
        css="",
        js="",
        head="",
        api_prefix="",
        api_names=[],
        root="",
        space_id=None,
    )
@app.get("/health")
async def health():
    """Health check: report ``ok`` together with the number of cached records."""
    loaded = len(all_data)
    return {"status": "ok", "loaded_records": loaded}
def _meta_is_stale(meta):
    """Return True when ``meta["last_update"]`` is set but not dated today.

    A missing or empty ``last_update`` yields False; a value that is not
    parseable ISO-8601 text yields True.
    """
    last_update = meta.get("last_update")
    if not last_update:
        return False
    if not isinstance(last_update, str):
        return True
    # Rewrite a trailing "Z" as an explicit UTC offset before parsing;
    # datetime.fromisoformat() only accepts "Z" natively from Python 3.11.
    if last_update.endswith('Z'):
        last_update = last_update[:-1] + '+00:00'
    try:
        last = datetime.fromisoformat(last_update)
    except ValueError:
        return True
    # Take "today" in the timestamp's own timezone so an aware (e.g. UTC)
    # timestamp is not compared against the local calendar date.
    return last.date() != datetime.now(last.tzinfo).date()


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the home page, refreshing the data set first when it is stale.

    A download is attempted when ``DATA_FILE`` is missing or the stored
    ``last_update`` is not dated today. A successful download also replaces
    the module-level ``all_data`` so the displayed total matches the new data.
    """
    global all_data
    meta = load_meta()
    if not os.path.exists(DATA_FILE) or _meta_is_stale(meta):
        print("正在更新数据...")
        # NOTE(review): download_data() does blocking HTTP inside this async
        # handler, so other requests wait until it finishes. If the remote
        # timestamp is never dated today, every page load re-downloads.
        fresh = download_data()
        if fresh:
            all_data = fresh
        # Reload the metadata written by the download.
        meta = load_meta()
    # load_meta() stores an explicit None when nothing was recorded, so a
    # plain .get() default would never apply here.
    last_update = meta.get("last_update") or "从未更新"
    total = len(all_data)
    return templates.TemplateResponse("index.html", {
        "request": request,
        "total": total,
        "last_update": last_update,
        "language_map": LANGUAGE_MAP
    })
@app.get("/api/data")
async def get_data(
    keyword: str = Query(default=""),
    language: str = Query(default=""),
    author: str = Query(default=""),
    sort_by: str = Query(default="music_id"),
    sort_order: str = Query(default="desc"),
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=50, ge=1, le=200)
):
    """Data API: return one page of the searched, filtered and sorted records."""
    # An empty string, "0", or non-integer text all mean "no language filter".
    lang_value = None
    if language not in ("", "0"):
        try:
            lang_value = int(language)
        except ValueError:
            pass
    matches = search_filter_sort(all_data, keyword, lang_value, author, sort_by, sort_order)
    total = len(matches)
    # Ceiling division: a partially filled last page still counts as a page.
    full_pages, remainder = divmod(total, page_size)
    total_pages = full_pages + (1 if remainder else 0)
    offset = (page - 1) * page_size
    return {
        "items": matches[offset:offset + page_size],
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages
    }
def _parse_vn(raw):
    """Return *raw* as a positive int, or None when it is not a plain project number."""
    if isinstance(raw, bool):
        return None
    if isinstance(raw, str):
        # ASCII digits only; the length cap keeps the int() conversion bounded.
        if not (raw.isascii() and raw.isdigit() and len(raw) <= 18):
            return None
        raw = int(raw)
    if isinstance(raw, int) and raw > 0:
        return raw
    return None


def _discard_temp_file(path):
    """Delete *path* if it exists; deletion errors are ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


@app.post("/api/download")
async def proxy_download(request: Request, background_tasks: BackgroundTasks):
    """Proxy download API: fetch a project archive and send it to the client.

    The JSON request body must be an object with ``vn`` (positive project
    number, as an integer or digit string) and ``token`` (non-empty string).

    Returns:
        The downloaded file as ``vn<vn>.zip`` on success; a JSON error with
        status 400 for invalid input or a failed upstream download; a JSON
        error with status 500 for any unexpected exception.
    """
    temp_filepath = None
    try:
        try:
            body = await request.json()
        except ValueError:
            # Malformed JSON is a client error, not a server error.
            body = None
        if not isinstance(body, dict):
            return JSONResponse(
                status_code=400,
                content={"success": False, "message": "请求体必须是 JSON 对象"}
            )
        vn = body.get("vn")
        token = body.get("token")
        if not vn or not token:
            return JSONResponse(
                status_code=400,
                content={"success": False, "message": "缺少 vn 或 token 参数"}
            )
        # vn ends up in a file name, a request header and the upstream payload,
        # so only a plain positive number is accepted.
        vn = _parse_vn(vn)
        if vn is None:
            return JSONResponse(
                status_code=400,
                content={"success": False, "message": "vn 参数无效"}
            )
        if not isinstance(token, str):
            return JSONResponse(
                status_code=400,
                content={"success": False, "message": "token 参数无效"}
            )
        # Use a unique temporary file per request, so concurrent downloads of
        # the same project cannot overwrite or delete each other's file.
        fd, temp_filepath = tempfile.mkstemp(prefix=f"vn{vn}_", suffix=".zip", dir=TEMP_DIR)
        os.close(fd)
        download_result = await download_from_vsqx(vn, token, temp_filepath)
        if not download_result["success"]:
            _discard_temp_file(temp_filepath)
            return JSONResponse(
                status_code=400,
                content=download_result
            )
        # Schedule deletion of the temporary file 5 minutes after the response.
        background_tasks.add_task(cleanup_file, temp_filepath, delay=300)
        # The client always sees the simple name vn<vn>.zip.
        return FileResponse(
            path=temp_filepath,
            filename=f"vn{vn}.zip",
            media_type="application/zip",
            background=background_tasks
        )
    except Exception as e:
        import traceback
        print(f"下载错误: {traceback.format_exc()}")
        if temp_filepath:
            _discard_temp_file(temp_filepath)
        return JSONResponse(
            status_code=500,
            content={"success": False, "message": f"服务器错误: {str(e)}"}
        )
def _remove_partial_download(path):
    """Delete *path* if it exists; deletion errors are ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


async def download_from_vsqx(vn: int, token: str, save_path: str) -> dict:
    """Download a project archive from VSQX to a local file.

    First asks the ``download_project`` API for a download link, then streams
    that link into *save_path*.

    Args:
        vn: Project number; sent in the request body and used in the referer.
        token: Value sent as the ``token`` request header.
        save_path: Destination file path.

    Returns:
        ``{"success": True, "path": save_path}`` on success, otherwise
        ``{"success": False, "message": ...}``. Exceptions are reported in
        the result rather than raised, and a file left incomplete by a failed
        transfer is removed.
    """
    started_writing = False
    try:
        # 1. Obtain the download link.
        api_url = "https://www.vsqx.top/api/app/download_project"
        headers = {
            "Content-Type": "application/json",
            "token": token,
            "referer": f"https://www.vsqx.top/project/vn{vn}",
            "origin": "https://www.vsqx.top",
            "user-agent": "Mozilla/5.0 (X11; Linux aarch64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.200 Safari/537.36 Qaxbrowser",
            "accept": "application/json, text/plain, */*"
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                api_url,
                json={"vn": vn},
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as resp:
                if resp.status != 200:
                    return {
                        "success": False,
                        "message": f"获取下载链接失败: HTTP {resp.status}"
                    }
                data = await resp.json()
                if not data.get("success"):
                    return {
                        "success": False,
                        "message": data.get("message", "获取下载链接失败")
                    }
                download_url = data.get("data")
            # A "successful" reply without a usable link cannot be downloaded.
            if not download_url or not isinstance(download_url, str):
                return {
                    "success": False,
                    "message": "获取下载链接失败"
                }
            # 2. Download the file.
            file_headers = {
                "referer": f"https://www.vsqx.top/project/vn{vn}",
                "user-agent": headers["user-agent"]
            }
            async with session.get(
                download_url,
                headers=file_headers,
                timeout=aiohttp.ClientTimeout(total=60)
            ) as file_resp:
                if file_resp.status != 200:
                    return {
                        "success": False,
                        "message": f"下载文件失败: HTTP {file_resp.status}"
                    }
                # Stream the body to disk in chunks.
                started_writing = True
                async with aiofiles.open(save_path, 'wb') as f:
                    async for chunk in file_resp.content.iter_chunked(8192):
                        await f.write(chunk)
        return {
            "success": True,
            "path": save_path
        }
    except asyncio.TimeoutError:
        # Only remove the file when this call began writing it.
        if started_writing:
            _remove_partial_download(save_path)
        return {"success": False, "message": "请求超时"}
    except Exception as e:
        if started_writing:
            _remove_partial_download(save_path)
        return {"success": False, "message": f"下载异常: {str(e)}"}
async def cleanup_file(filepath: str, delay: int = 300):
    """Wait *delay* seconds, then delete *filepath* if it exists.

    A failed deletion is printed rather than raised.
    """
    await asyncio.sleep(delay)
    try:
        if not os.path.exists(filepath):
            return
        os.remove(filepath)
        print(f"已清理临时文件: {filepath}")
    except Exception as err:
        print(f"清理文件失败 {filepath}: {err}")
@app.get("/api/refresh")
async def refresh():
    """Reload the data set through ``load_data()`` and report the new record count."""
    global all_data
    reloaded = load_data()
    all_data = reloaded
    return {"success": True, "total": len(reloaded)}
@app.get("/api/status")
async def status():
    """Report the number of cached records and the stored ``last_update`` value."""
    last_update = load_meta().get("last_update")
    return {"total": len(all_data), "last_update": last_update}
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)