File size: 6,901 Bytes
db73228
 
 
a8e8490
6070d0c
 
db73228
 
 
 
 
 
 
 
6070d0c
 
 
34a9b56
db73228
 
 
 
81f2ffc
b7b932b
595e552
 
 
db73228
595e552
db73228
 
 
 
6070d0c
 
 
a8e8490
6070d0c
 
 
4910f9c
 
a8e8490
 
db73228
6070d0c
 
 
 
 
a8e8490
6070d0c
db73228
a8e8490
db73228
a8e8490
 
 
4910f9c
a8e8490
6070d0c
a8e8490
 
 
 
4910f9c
 
 
 
 
 
6070d0c
 
 
 
 
 
a8e8490
4910f9c
 
 
 
 
6070d0c
 
 
 
db73228
 
6070d0c
 
 
 
a8e8490
 
6070d0c
a8e8490
6070d0c
a8e8490
6070d0c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
db73228
6070d0c
 
 
db73228
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import json
import uuid
import asyncio
import threading
import copy
from datetime import datetime
from pydantic import BaseModel
from huggingface_hub import HfApi, hf_hub_download, upload_file

TOKEN = os.getenv("HF_TOKEN")
REPO_ID = "ZHIWEI666/Flying-Music"
api = HfApi(token=TOKEN)

# ==========================================
# 🌟 请求体数据模型 (Pydantic Models)
# ==========================================
class AuthReq(BaseModel): username: str; password: str; source: str = ""
class LikeReq(BaseModel): song_hash: str; username: str
class LikeReplyReq(BaseModel): song_hash: str; comment_id: str; reply_id: str; username: str
class CommentReq(BaseModel): song_hash: str; username: str; content: str; parent_id: str = None; parent_user: str = None; parent_text: str = None
class BasicReq(BaseModel): username: str = "guest"
class PlaylistAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filename: str
class PlaylistBatchAddReq(BaseModel): username: str; type: str; author: str = ""; playlist_name: str; filenames: list
class PlaylistDeleteReq(BaseModel): username: str; type: str; name: str
class PlaylistLikeReq(BaseModel): username: str; type: str; author: str; name: str
class PlaylistCommentReq(BaseModel): username: str; type: str; author: str; name: str; content: str; parent_id: str = None; parent_user: str = None
class ShareLikeReq(BaseModel): post_id: str; username: str
class ShareCommentReq(BaseModel): post_id: str; username: str; content: str; parent_id: str = None; parent_user: str = None
class ShareCommentActionReq(BaseModel): post_id: str; comment_id: str; username: str; action: str
class ShareEditReq(BaseModel): post_id: str; username: str; title: str; content: str
class ShareDeleteReq(BaseModel): post_id: str; username: str

# ==========================================
# 🌟 全局内存单例数据库与锁机制
# ==========================================
DB_CACHE = {
    "users.json": {}, 
    "interaction.json": {},
    "cloud_playlists.json": {"admin": {}, "users": {}}, 
    "community_posts.json": [],
    "co_listen_rooms.json": {}
}
DB_MODIFIED = set()

# 同步锁:用于保护 get_db 和 save_db 的内存读写安全
db_lock = threading.Lock()
# 异步锁:用于保护后台定时上传任务,防止协程并发撞车
upload_lock = asyncio.Lock()

async def _download_db(filename, default_val):
    """底层辅助方法:从 HF 下载单个数据文件"""
    try:
        path = await asyncio.to_thread(hf_hub_download, repo_id=REPO_ID, filename=f"database/{filename}", repo_type="dataset", token=TOKEN)
        with open(path, 'r', encoding='utf-8') as f: return json.load(f)
    except: return default_val

async def init_db():
    """在 FastAPI 启动时调用,一次性将 5 个核心 JSON 载入内存字典"""
    global DB_CACHE
    print("⏳ 飞行音乐:正在拉取云端数据库到内存...")
    DB_CACHE["users.json"] = await _download_db("users.json", {})
    DB_CACHE["interaction.json"] = await _download_db("interaction.json", {})
    DB_CACHE["cloud_playlists.json"] = await _download_db("cloud_playlists.json", {"admin": {}, "users": {}})
    DB_CACHE["community_posts.json"] = await _download_db("community_posts.json", [])
    # 共听房间:使用 clear + update 保持外部引用有效
    rooms_data = await _download_db("co_listen_rooms.json", {})
    for room in rooms_data.values():
        room["online_users"] = []
    DB_CACHE["co_listen_rooms.json"].clear()
    DB_CACHE["co_listen_rooms.json"].update(rooms_data)
    print("✅ 飞行音乐:高速内存单例数据库加载完毕!")

def get_db(filename):
    """获取字典型数据(带线程锁与深拷贝防护)"""
    with db_lock:
        return copy.deepcopy(DB_CACHE.get(filename, {}))

def get_db_ref(filename):
    """获取数据的原始引用(用于需要原地修改并同步的场景,需谨慎使用)"""
    with db_lock:
        return DB_CACHE[filename]

def get_db_list(filename):
    """获取列表型数据(带线程锁与深拷贝防护)"""
    with db_lock:
        return copy.deepcopy(DB_CACHE.get(filename, []))

def save_db(data, filename):
    """将数据保存至内存并标记为脏数据,瞬间返回(不阻塞原路)"""
    with db_lock:
        DB_CACHE[filename] = copy.deepcopy(data)
        DB_MODIFIED.add(filename)

async def periodic_sync():
    """后台常驻协程,每隔 3 分钟将有变动的内存数据打包上传至 HF"""
    while True:
        await asyncio.sleep(180) # 每 180 秒执行一次检测
        
        with db_lock:
            if not DB_MODIFIED:
                continue
            files_to_sync = list(DB_MODIFIED)
            DB_MODIFIED.clear()
            # 抓取当前内存状态的快照,抓完立刻释放 db_lock,不影响前端请求
            snapshot = {f: copy.deepcopy(DB_CACHE[f]) for f in files_to_sync}

        # 获取异步上传锁,防止多个协程队列同时操作同一个文件
        async with upload_lock:
            for filename in files_to_sync:
                try:
                    temp_name = f"temp_db_{uuid.uuid4().hex}.json"
                    with open(temp_name, 'w', encoding='utf-8') as f: 
                        json.dump(snapshot[filename], f, ensure_ascii=False)
                    
                    # 使用线程池进行上传,坚决不阻塞主事件循环
                    await asyncio.to_thread(
                        upload_file, 
                        path_or_fileobj=temp_name, 
                        path_in_repo=f"database/{filename}", 
                        repo_id=REPO_ID, 
                        repo_type="dataset", 
                        token=TOKEN
                    )
                    if os.path.exists(temp_name): os.remove(temp_name)
                    print(f"🔄 [{filename}] 数据已静默打包同步至云端")
                except Exception as e:
                    print(f"❌ [{filename}] 同步云端失败,将在下一周期重试: {e}")
                    # 如果网络崩溃上传失败,将其重新标记为脏数据等待下一次拯救
                    with db_lock:
                        DB_MODIFIED.add(filename)

# ==========================================
# 🌟 辅助业务逻辑工具库
# ==========================================
def _get_today_comment_count(db_data, username):
    today_str = datetime.now().strftime("%Y-%m-%d")
    count = 0
    for song in db_data.values():
        if isinstance(song, dict):
            for c in song.get("comments", []):
                if isinstance(c, dict) and c.get("user") == username and c.get("time", "").startswith(today_str): count += 1
    return count