Spaces:
Running
Running
| import os | |
| import json | |
| import uuid | |
| import asyncio | |
| import threading | |
| import copy | |
| from datetime import datetime | |
| from pydantic import BaseModel | |
| from huggingface_hub import HfApi, hf_hub_download, upload_file | |
| TOKEN = os.getenv("HF_TOKEN") | |
| REPO_ID = "ZHIWEI666/Flying-Music" | |
| api = HfApi(token=TOKEN) | |
| # ========================================== | |
| # 🌟 请求体数据模型 (Pydantic Models) | |
| # ========================================== | |
| class AuthReq(BaseModel): username: str; password: str; source: str = "" | |
| class LikeReq(BaseModel): song_hash: str; username: str | |
| class LikeReplyReq(BaseModel): song_hash: str; comment_id: str; reply_id: str; username: str | |
| class CommentReq(BaseModel): song_hash: str; username: str; content: str; parent_id: str = None; parent_user: str = None; parent_text: str = None | |
| class BasicReq(BaseModel): username: str = "guest" | |
| class PlaylistAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filename: str | |
| class PlaylistBatchAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filenames: list | |
| class PlaylistDeleteReq(BaseModel): username: str; type: str; name: str | |
| class PlaylistLikeReq(BaseModel): username: str; type: str; author: str; name: str | |
| class PlaylistCommentReq(BaseModel): username: str; type: str; author: str; name: str; content: str; parent_id: str = None; parent_user: str = None | |
| class ShareLikeReq(BaseModel): post_id: str; username: str | |
| class ShareCommentReq(BaseModel): post_id: str; username: str; content: str; parent_id: str = None; parent_user: str = None | |
| class ShareCommentActionReq(BaseModel): post_id: str; comment_id: str; username: str; action: str | |
| class ShareEditReq(BaseModel): post_id: str; username: str; title: str; content: str | |
| class ShareDeleteReq(BaseModel): post_id: str; username: str | |
| # ========================================== | |
| # 🌟 全局内存单例数据库与锁机制 | |
| # ========================================== | |
| DB_CACHE = { | |
| "users.json": {}, | |
| "interaction.json": {}, | |
| "cloud_playlists.json": {"admin": {}, "users": {}}, | |
| "community_posts.json": [], | |
| "co_listen_rooms.json": {} | |
| } | |
| DB_MODIFIED = set() | |
| # 同步锁:用于保护 get_db 和 save_db 的内存读写安全 | |
| db_lock = threading.Lock() | |
| # 异步锁:用于保护后台定时上传任务,防止协程并发撞车 | |
| upload_lock = asyncio.Lock() | |
| async def _download_db(filename, default_val): | |
| """底层辅助方法:从 HF 下载单个数据文件""" | |
| try: | |
| path = await asyncio.to_thread(hf_hub_download, repo_id=REPO_ID, filename=f"database/{filename}", repo_type="dataset", token=TOKEN) | |
| with open(path, 'r', encoding='utf-8') as f: return json.load(f) | |
| except: return default_val | |
| async def init_db(): | |
| """在 FastAPI 启动时调用,一次性将 5 个核心 JSON 载入内存字典""" | |
| global DB_CACHE | |
| print("⏳ 飞行音乐:正在拉取云端数据库到内存...") | |
| DB_CACHE["users.json"] = await _download_db("users.json", {}) | |
| DB_CACHE["interaction.json"] = await _download_db("interaction.json", {}) | |
| DB_CACHE["cloud_playlists.json"] = await _download_db("cloud_playlists.json", {"admin": {}, "users": {}}) | |
| DB_CACHE["community_posts.json"] = await _download_db("community_posts.json", []) | |
| # 共听房间:使用 clear + update 保持外部引用有效 | |
| rooms_data = await _download_db("co_listen_rooms.json", {}) | |
| for room in rooms_data.values(): | |
| room["online_users"] = [] | |
| DB_CACHE["co_listen_rooms.json"].clear() | |
| DB_CACHE["co_listen_rooms.json"].update(rooms_data) | |
| print("✅ 飞行音乐:高速内存单例数据库加载完毕!") | |
| def get_db(filename): | |
| """获取字典型数据(带线程锁与深拷贝防护)""" | |
| with db_lock: | |
| return copy.deepcopy(DB_CACHE.get(filename, {})) | |
| def get_db_ref(filename): | |
| """获取数据的原始引用(用于需要原地修改并同步的场景,需谨慎使用)""" | |
| with db_lock: | |
| return DB_CACHE[filename] | |
| def get_db_list(filename): | |
| """获取列表型数据(带线程锁与深拷贝防护)""" | |
| with db_lock: | |
| return copy.deepcopy(DB_CACHE.get(filename, [])) | |
| def save_db(data, filename): | |
| """将数据保存至内存并标记为脏数据,瞬间返回(不阻塞原路)""" | |
| with db_lock: | |
| DB_CACHE[filename] = copy.deepcopy(data) | |
| DB_MODIFIED.add(filename) | |
| async def periodic_sync(): | |
| """后台常驻协程,每隔 3 分钟将有变动的内存数据打包上传至 HF""" | |
| while True: | |
| await asyncio.sleep(180) # 每 180 秒执行一次检测 | |
| with db_lock: | |
| if not DB_MODIFIED: | |
| continue | |
| files_to_sync = list(DB_MODIFIED) | |
| DB_MODIFIED.clear() | |
| # 抓取当前内存状态的快照,抓完立刻释放 db_lock,不影响前端请求 | |
| snapshot = {f: copy.deepcopy(DB_CACHE[f]) for f in files_to_sync} | |
| # 获取异步上传锁,防止多个协程队列同时操作同一个文件 | |
| async with upload_lock: | |
| for filename in files_to_sync: | |
| try: | |
| temp_name = f"temp_db_{uuid.uuid4().hex}.json" | |
| with open(temp_name, 'w', encoding='utf-8') as f: | |
| json.dump(snapshot[filename], f, ensure_ascii=False) | |
| # 使用线程池进行上传,坚决不阻塞主事件循环 | |
| await asyncio.to_thread( | |
| upload_file, | |
| path_or_fileobj=temp_name, | |
| path_in_repo=f"database/{filename}", | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| token=TOKEN | |
| ) | |
| if os.path.exists(temp_name): os.remove(temp_name) | |
| print(f"🔄 [{filename}] 数据已静默打包同步至云端") | |
| except Exception as e: | |
| print(f"❌ [{filename}] 同步云端失败,将在下一周期重试: {e}") | |
| # 如果网络崩溃上传失败,将其重新标记为脏数据等待下一次拯救 | |
| with db_lock: | |
| DB_MODIFIED.add(filename) | |
| # ========================================== | |
| # 🌟 辅助业务逻辑工具库 | |
| # ========================================== | |
| def _get_today_comment_count(db_data, username): | |
| today_str = datetime.now().strftime("%Y-%m-%d") | |
| count = 0 | |
| for song in db_data.values(): | |
| if isinstance(song, dict): | |
| for c in song.get("comments", []): | |
| if isinstance(c, dict) and c.get("user") == username and c.get("time", "").startswith(today_str): count += 1 | |
| return count |