import os import json import uuid import asyncio import threading import copy from datetime import datetime from pydantic import BaseModel from huggingface_hub import HfApi, hf_hub_download, upload_file TOKEN = os.getenv("HF_TOKEN") REPO_ID = "ZHIWEI666/Flying-Music" api = HfApi(token=TOKEN) # ========================================== # 🌟 请求体数据模型 (Pydantic Models) # ========================================== class AuthReq(BaseModel): username: str; password: str; source: str = "" class LikeReq(BaseModel): song_hash: str; username: str class LikeReplyReq(BaseModel): song_hash: str; comment_id: str; reply_id: str; username: str class CommentReq(BaseModel): song_hash: str; username: str; content: str; parent_id: str = None; parent_user: str = None; parent_text: str = None class BasicReq(BaseModel): username: str = "guest" class PlaylistAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filename: str class PlaylistBatchAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filenames: list class PlaylistDeleteReq(BaseModel): username: str; type: str; name: str class PlaylistLikeReq(BaseModel): username: str; type: str; author: str; name: str class PlaylistCommentReq(BaseModel): username: str; type: str; author: str; name: str; content: str; parent_id: str = None; parent_user: str = None class ShareLikeReq(BaseModel): post_id: str; username: str class ShareCommentReq(BaseModel): post_id: str; username: str; content: str; parent_id: str = None; parent_user: str = None class ShareCommentActionReq(BaseModel): post_id: str; comment_id: str; username: str; action: str class ShareEditReq(BaseModel): post_id: str; username: str; title: str; content: str class ShareDeleteReq(BaseModel): post_id: str; username: str # ========================================== # 🌟 全局内存单例数据库与锁机制 # ========================================== DB_CACHE = { "users.json": {}, "interaction.json": {}, "cloud_playlists.json": {"admin": {}, "users": {}}, "community_posts.json": [], "co_listen_rooms.json": {} } DB_MODIFIED = set() # 同步锁:用于保护 get_db 和 save_db 的内存读写安全 db_lock = threading.Lock() # 异步锁:用于保护后台定时上传任务,防止协程并发撞车 upload_lock = asyncio.Lock() async def _download_db(filename, default_val): """底层辅助方法:从 HF 下载单个数据文件""" try: path = await asyncio.to_thread(hf_hub_download, repo_id=REPO_ID, filename=f"database/{filename}", repo_type="dataset", token=TOKEN) with open(path, 'r', encoding='utf-8') as f: return json.load(f) except: return default_val async def init_db(): """在 FastAPI 启动时调用,一次性将 5 个核心 JSON 载入内存字典""" global DB_CACHE print("⏳ 飞行音乐:正在拉取云端数据库到内存...") DB_CACHE["users.json"] = await _download_db("users.json", {}) DB_CACHE["interaction.json"] = await _download_db("interaction.json", {}) DB_CACHE["cloud_playlists.json"] = await _download_db("cloud_playlists.json", {"admin": {}, "users": {}}) DB_CACHE["community_posts.json"] = await _download_db("community_posts.json", []) # 共听房间:使用 clear + update 保持外部引用有效 rooms_data = await _download_db("co_listen_rooms.json", {}) for room in rooms_data.values(): room["online_users"] = [] DB_CACHE["co_listen_rooms.json"].clear() DB_CACHE["co_listen_rooms.json"].update(rooms_data) print("✅ 飞行音乐:高速内存单例数据库加载完毕!") def get_db(filename): """获取字典型数据(带线程锁与深拷贝防护)""" with db_lock: return copy.deepcopy(DB_CACHE.get(filename, {})) def get_db_ref(filename): """获取数据的原始引用(用于需要原地修改并同步的场景,需谨慎使用)""" with db_lock: return DB_CACHE[filename] def get_db_list(filename): """获取列表型数据(带线程锁与深拷贝防护)""" with db_lock: return copy.deepcopy(DB_CACHE.get(filename, [])) def save_db(data, filename): """将数据保存至内存并标记为脏数据,瞬间返回(不阻塞原路)""" with db_lock: DB_CACHE[filename] = copy.deepcopy(data) DB_MODIFIED.add(filename) async def periodic_sync(): """后台常驻协程,每隔 3 分钟将有变动的内存数据打包上传至 HF""" while True: await asyncio.sleep(180) # 每 180 秒执行一次检测 with db_lock: if not DB_MODIFIED: continue files_to_sync = list(DB_MODIFIED) DB_MODIFIED.clear() # 抓取当前内存状态的快照,抓完立刻释放 db_lock,不影响前端请求 snapshot = {f: copy.deepcopy(DB_CACHE[f]) for f in files_to_sync} # 获取异步上传锁,防止多个协程队列同时操作同一个文件 async with upload_lock: for filename in files_to_sync: try: temp_name = f"temp_db_{uuid.uuid4().hex}.json" with open(temp_name, 'w', encoding='utf-8') as f: json.dump(snapshot[filename], f, ensure_ascii=False) # 使用线程池进行上传,坚决不阻塞主事件循环 await asyncio.to_thread( upload_file, path_or_fileobj=temp_name, path_in_repo=f"database/{filename}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN ) if os.path.exists(temp_name): os.remove(temp_name) print(f"🔄 [{filename}] 数据已静默打包同步至云端") except Exception as e: print(f"❌ [{filename}] 同步云端失败,将在下一周期重试: {e}") # 如果网络崩溃上传失败,将其重新标记为脏数据等待下一次拯救 with db_lock: DB_MODIFIED.add(filename) # ========================================== # 🌟 辅助业务逻辑工具库 # ========================================== def _get_today_comment_count(db_data, username): today_str = datetime.now().strftime("%Y-%m-%d") count = 0 for song in db_data.values(): if isinstance(song, dict): for c in song.get("comments", []): if isinstance(c, dict) and c.get("user") == username and c.get("time", "").startswith(today_str): count += 1 return count