Flying-Music-API / core.py
ZHIWEI666's picture
Upload 9 files
b7b932b verified
import os
import json
import uuid
import asyncio
import threading
import copy
from datetime import datetime
from pydantic import BaseModel
from huggingface_hub import HfApi, hf_hub_download, upload_file
TOKEN = os.getenv("HF_TOKEN")
REPO_ID = "ZHIWEI666/Flying-Music"
api = HfApi(token=TOKEN)
# ==========================================
# 🌟 请求体数据模型 (Pydantic Models)
# ==========================================
class AuthReq(BaseModel): username: str; password: str; source: str = ""
class LikeReq(BaseModel): song_hash: str; username: str
class LikeReplyReq(BaseModel): song_hash: str; comment_id: str; reply_id: str; username: str
class CommentReq(BaseModel): song_hash: str; username: str; content: str; parent_id: str = None; parent_user: str = None; parent_text: str = None
class BasicReq(BaseModel): username: str = "guest"
class PlaylistAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filename: str
class PlaylistBatchAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filenames: list
class PlaylistDeleteReq(BaseModel): username: str; type: str; name: str
class PlaylistLikeReq(BaseModel): username: str; type: str; author: str; name: str
class PlaylistCommentReq(BaseModel): username: str; type: str; author: str; name: str; content: str; parent_id: str = None; parent_user: str = None
class ShareLikeReq(BaseModel): post_id: str; username: str
class ShareCommentReq(BaseModel): post_id: str; username: str; content: str; parent_id: str = None; parent_user: str = None
class ShareCommentActionReq(BaseModel): post_id: str; comment_id: str; username: str; action: str
class ShareEditReq(BaseModel): post_id: str; username: str; title: str; content: str
class ShareDeleteReq(BaseModel): post_id: str; username: str
# ==========================================
# 🌟 全局内存单例数据库与锁机制
# ==========================================
DB_CACHE = {
"users.json": {},
"interaction.json": {},
"cloud_playlists.json": {"admin": {}, "users": {}},
"community_posts.json": [],
"co_listen_rooms.json": {}
}
DB_MODIFIED = set()
# 同步锁:用于保护 get_db 和 save_db 的内存读写安全
db_lock = threading.Lock()
# 异步锁:用于保护后台定时上传任务,防止协程并发撞车
upload_lock = asyncio.Lock()
async def _download_db(filename, default_val):
"""底层辅助方法:从 HF 下载单个数据文件"""
try:
path = await asyncio.to_thread(hf_hub_download, repo_id=REPO_ID, filename=f"database/{filename}", repo_type="dataset", token=TOKEN)
with open(path, 'r', encoding='utf-8') as f: return json.load(f)
except: return default_val
async def init_db():
"""在 FastAPI 启动时调用,一次性将 5 个核心 JSON 载入内存字典"""
global DB_CACHE
print("⏳ 飞行音乐:正在拉取云端数据库到内存...")
DB_CACHE["users.json"] = await _download_db("users.json", {})
DB_CACHE["interaction.json"] = await _download_db("interaction.json", {})
DB_CACHE["cloud_playlists.json"] = await _download_db("cloud_playlists.json", {"admin": {}, "users": {}})
DB_CACHE["community_posts.json"] = await _download_db("community_posts.json", [])
# 共听房间:使用 clear + update 保持外部引用有效
rooms_data = await _download_db("co_listen_rooms.json", {})
for room in rooms_data.values():
room["online_users"] = []
DB_CACHE["co_listen_rooms.json"].clear()
DB_CACHE["co_listen_rooms.json"].update(rooms_data)
print("✅ 飞行音乐:高速内存单例数据库加载完毕!")
def get_db(filename):
"""获取字典型数据(带线程锁与深拷贝防护)"""
with db_lock:
return copy.deepcopy(DB_CACHE.get(filename, {}))
def get_db_ref(filename):
"""获取数据的原始引用(用于需要原地修改并同步的场景,需谨慎使用)"""
with db_lock:
return DB_CACHE[filename]
def get_db_list(filename):
"""获取列表型数据(带线程锁与深拷贝防护)"""
with db_lock:
return copy.deepcopy(DB_CACHE.get(filename, []))
def save_db(data, filename):
"""将数据保存至内存并标记为脏数据,瞬间返回(不阻塞原路)"""
with db_lock:
DB_CACHE[filename] = copy.deepcopy(data)
DB_MODIFIED.add(filename)
async def periodic_sync():
"""后台常驻协程,每隔 3 分钟将有变动的内存数据打包上传至 HF"""
while True:
await asyncio.sleep(180) # 每 180 秒执行一次检测
with db_lock:
if not DB_MODIFIED:
continue
files_to_sync = list(DB_MODIFIED)
DB_MODIFIED.clear()
# 抓取当前内存状态的快照,抓完立刻释放 db_lock,不影响前端请求
snapshot = {f: copy.deepcopy(DB_CACHE[f]) for f in files_to_sync}
# 获取异步上传锁,防止多个协程队列同时操作同一个文件
async with upload_lock:
for filename in files_to_sync:
try:
temp_name = f"temp_db_{uuid.uuid4().hex}.json"
with open(temp_name, 'w', encoding='utf-8') as f:
json.dump(snapshot[filename], f, ensure_ascii=False)
# 使用线程池进行上传,坚决不阻塞主事件循环
await asyncio.to_thread(
upload_file,
path_or_fileobj=temp_name,
path_in_repo=f"database/{filename}",
repo_id=REPO_ID,
repo_type="dataset",
token=TOKEN
)
if os.path.exists(temp_name): os.remove(temp_name)
print(f"🔄 [{filename}] 数据已静默打包同步至云端")
except Exception as e:
print(f"❌ [{filename}] 同步云端失败,将在下一周期重试: {e}")
# 如果网络崩溃上传失败,将其重新标记为脏数据等待下一次拯救
with db_lock:
DB_MODIFIED.add(filename)
# ==========================================
# 🌟 辅助业务逻辑工具库
# ==========================================
def _get_today_comment_count(db_data, username):
today_str = datetime.now().strftime("%Y-%m-%d")
count = 0
for song in db_data.values():
if isinstance(song, dict):
for c in song.get("comments", []):
if isinstance(c, dict) and c.get("user") == username and c.get("time", "").startswith(today_str): count += 1
return count