import os
import uuid
import time
import asyncio
from fastapi import APIRouter, UploadFile, File, Form
from huggingface_hub import upload_file
from core import get_db, save_db, api, REPO_ID, TOKEN, PlaylistAddReq, PlaylistBatchAddReq, PlaylistDeleteReq, PlaylistLikeReq, PlaylistCommentReq
router = APIRouter()
@router.get("/api/cloud_pl/get")
async def get_cloud_playlists():
    """Return the full contents of the cloud playlist store.

    Returns:
        A dict with ``"status": "success"`` and, under ``"data"``, whatever
        ``get_db("cloud_playlists.json")`` yields.
    """
    playlists = get_db("cloud_playlists.json")
    return {"status": "success", "data": playlists}
@router.post("/api/cloud_pl/create")
async def create_cloud_playlist(username: str = Form(...), type: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)):
    """Create a playlist, optionally uploading a cover image for it.

    Args:
        username: Name of the requesting user. Only "666666" may create
            playlists in the admin bucket.
        type: "admin" targets the shared admin bucket; any other value
            targets the requesting user's own bucket.
        name: Playlist name; must be unique inside the target bucket.
        description: Free text, truncated to 100 characters when stored.
        image: Optional cover image. When present it is uploaded to
            ``community_images/`` in the dataset repo under a random name.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        is_admin = type == "admin"
        if is_admin and username != "666666":
            return {"status": "error", "message": "权限不足"}

        db = get_db("cloud_playlists.json")
        db.setdefault("admin", {})
        db.setdefault("users", {})
        target_dict = db["admin"] if is_admin else db["users"].setdefault(username, {})

        # The per-user cap applies to every non-admin request; checking only
        # for type == "user" let any other type value bypass the limit.
        if not is_admin and len(target_dict) >= 3:
            return {"status": "error", "message": "上限3个"}
        if name in target_dict:
            return {"status": "error", "message": "歌单已存在"}

        cover_name = ""
        if image and image.filename:
            ext = os.path.splitext(image.filename)[1].lower()
            cover_name = f"pl_{uuid.uuid4().hex}{ext}"
            temp_path = f"temp_{cover_name}"
            try:
                with open(temp_path, "wb") as f:
                    f.write(await image.read())
                await asyncio.to_thread(
                    upload_file,
                    path_or_fileobj=temp_path,
                    path_in_repo=f"community_images/{cover_name}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            finally:
                # Remove the temp file even when the write or upload fails.
                if os.path.exists(temp_path):
                    os.remove(temp_path)

        target_dict[name] = {
            "description": description[:100],
            "cover": cover_name,
            "songs": [],
            "likes": 0,
            "liked_by": [],
            "comments": [],
        }
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/edit")
async def edit_cloud_playlist(username: str = Form(...), type: str = Form(...), old_name: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)):
    """Rename a playlist and update its description and, optionally, cover.

    Args:
        username: Name of the requesting user. Only "666666" may edit
            playlists in the admin bucket (same rule as creation).
        type: "admin" targets the admin bucket; any other value targets the
            requesting user's own bucket.
        old_name: Current playlist name.
        name: New playlist name (may equal ``old_name``).
        description: New description, truncated to 100 characters.
        image: Optional replacement cover. When given, the new file is
            uploaded first and the previous cover is deleted afterwards on a
            best-effort basis.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        if type == "admin" and username != "666666":
            return {"status": "error", "message": "权限不足"}

        db = get_db("cloud_playlists.json")
        target_dict = db.get("admin", {}) if type == "admin" else db.get("users", {}).get(username, {})
        if old_name not in target_dict:
            return {"status": "error", "message": "歌单不存在"}
        # Renaming onto another existing playlist would silently overwrite it.
        if name != old_name and name in target_dict:
            return {"status": "error", "message": "歌单已存在"}

        pl_data = target_dict[old_name]

        # Upload the new cover before touching the stored data, so a failed
        # upload leaves the playlist exactly as it was.
        new_cover = None
        if image and image.filename:
            ext = os.path.splitext(image.filename)[1].lower()
            new_cover = f"pl_{uuid.uuid4().hex}{ext}"
            temp_path = f"temp_{new_cover}"
            try:
                with open(temp_path, "wb") as f:
                    f.write(await image.read())
                await asyncio.to_thread(
                    upload_file,
                    path_or_fileobj=temp_path,
                    path_in_repo=f"community_images/{new_cover}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            finally:
                # Remove the temp file even when the write or upload fails.
                if os.path.exists(temp_path):
                    os.remove(temp_path)

        stale_cover = pl_data.get("cover") if new_cover is not None else ""

        # Pop and re-insert so the entry moves to the new key.
        target_dict.pop(old_name)
        if new_cover is not None:
            pl_data["cover"] = new_cover
        pl_data["description"] = description[:100]
        target_dict[name] = pl_data
        save_db(db, "cloud_playlists.json")

        if stale_cover:
            # Best-effort background cleanup of the replaced cover. Covers are
            # uploaded with repo_type="dataset", so the delete targets the
            # same repo type.
            cleanup = asyncio.create_task(
                asyncio.to_thread(
                    api.delete_file,
                    path_in_repo=f"community_images/{stale_cover}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            )
            # Retrieve the outcome so a failed delete is dropped quietly
            # instead of surfacing as an unretrieved task exception.
            cleanup.add_done_callback(lambda t: t.cancelled() or t.exception())
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/delete")
async def delete_cloud_playlist(req: PlaylistDeleteReq):
    """Delete a playlist and, best-effort, its cover image.

    Args:
        req: Request carrying ``type`` ("admin" selects the admin bucket,
            anything else the requester's own bucket), ``username`` and the
            playlist ``name``.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        # Admin playlists may only be removed by the admin account, matching
        # the rule enforced when they are created.
        if req.type == "admin" and req.username != "666666":
            return {"status": "error", "message": "不存在或无权限"}

        db = get_db("cloud_playlists.json")
        target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {})
        if req.name not in target_dict:
            return {"status": "error", "message": "不存在或无权限"}

        pl_data = target_dict.pop(req.name)
        save_db(db, "cloud_playlists.json")

        # Remove the cover only after the deletion has been saved, so a failed
        # save never leaves a playlist pointing at a missing image.
        cover = pl_data.get("cover")
        if cover:
            # Covers are uploaded with repo_type="dataset", so the delete
            # targets the same repo type.
            cleanup = asyncio.create_task(
                asyncio.to_thread(
                    api.delete_file,
                    path_in_repo=f"community_images/{cover}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            )
            # Retrieve the outcome so a failed delete is dropped quietly
            # instead of surfacing as an unretrieved task exception.
            cleanup.add_done_callback(lambda t: t.cancelled() or t.exception())
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/add")
async def add_to_cloud_playlist(req: PlaylistAddReq):
    """Append one song to a playlist unless it is already there.

    Args:
        req: Request carrying ``type``, ``username``, ``author``,
            ``playlist_name`` and ``filename``.

    Returns:
        ``{"status": "success"}`` when the song was added, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        db = get_db("cloud_playlists.json")
        # For user playlists the owner is located through ``author`` (falling
        # back to the requester), so any logged-in user can add songs to
        # another user's playlist.
        owner_playlists = (
            db.get("admin", {})
            if req.type == "admin"
            else db.get("users", {}).get(req.author or req.username, {})
        )
        if req.playlist_name not in owner_playlists:
            return {"status": "error", "message": "歌单不存在"}

        playlist = owner_playlists[req.playlist_name]
        tracks = playlist.get("songs", [])
        if req.filename in tracks:
            return {"status": "error", "message": "歌曲已存在于歌单中"}

        tracks.append(req.filename)
        # Re-assign so a playlist that had no "songs" key gains one.
        playlist["songs"] = tracks
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
# New: endpoint for adding several songs to a playlist in one request
@router.post("/api/cloud_pl/batch_add")
async def batch_add_to_cloud_playlist(req: PlaylistBatchAddReq):
    """Add multiple songs to a playlist in a single request.

    Songs already present, and repeats inside ``req.filenames``, are skipped.

    Returns:
        On success, ``status``, ``added_count`` (songs actually appended) and
        ``total`` (playlist length afterwards); otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        db = get_db("cloud_playlists.json")
        owner_playlists = (
            db.get("admin", {})
            if req.type == "admin"
            else db.get("users", {}).get(req.author or req.username, {})
        )
        if req.playlist_name not in owner_playlists:
            return {"status": "error", "message": "歌单不存在"}

        playlist = owner_playlists[req.playlist_name]
        tracks = playlist.get("songs", [])
        # Set mirror of the track list for constant-time duplicate checks.
        seen = set(tracks)
        appended = 0
        for candidate in req.filenames:
            if candidate in seen:
                continue
            tracks.append(candidate)
            seen.add(candidate)
            appended += 1

        playlist["songs"] = tracks
        save_db(db, "cloud_playlists.json")
        return {"status": "success", "added_count": appended, "total": len(tracks)}
    except Exception as e:
        return {"status": "error", "message": str(e)}
# New: endpoint for removing a single song from a cloud playlist
@router.post("/api/cloud_pl/remove_song")
async def remove_from_cloud_playlist(req: PlaylistAddReq):
    """Remove one song from a playlist.

    The playlist is looked up in the admin bucket when ``req.type`` is
    "admin", otherwise in the bucket of ``req.username``.

    Returns:
        ``{"status": "success"}`` when the song was removed, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            bucket = db.get("admin", {})
        else:
            bucket = db.get("users", {}).get(req.username, {})
        if req.playlist_name not in bucket:
            return {"status": "error", "message": "歌单不存在"}

        playlist = bucket[req.playlist_name]
        tracks = playlist.get("songs", [])
        if req.filename not in tracks:
            return {"status": "error", "message": "未在歌单中找到此歌曲"}

        tracks.remove(req.filename)
        playlist["songs"] = tracks
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/like")
async def like_cloud_playlist(req: PlaylistLikeReq):
    """Toggle the requesting user's like on a playlist.

    If ``req.username`` is already in the playlist's ``liked_by`` list the
    like is withdrawn (the counter never drops below zero); otherwise the
    like is added.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            bucket = db.get("admin", {})
        else:
            bucket = db.get("users", {}).get(req.author, {})

        pl = bucket.get(req.name)
        # Missing or empty entries, and list-shaped entries, are rejected.
        if isinstance(pl, list) or not pl:
            return {"status": "error", "message": "格式异常或不存在"}

        fans = pl.get("liked_by", [])
        if req.username not in fans:
            fans.append(req.username)
            pl["likes"] = pl.get("likes", 0) + 1
        else:
            fans.remove(req.username)
            pl["likes"] = max(0, pl.get("likes", 1) - 1)

        pl["liked_by"] = fans
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/comment")
async def comment_cloud_playlist(req: PlaylistCommentReq):
    """Post a comment on a playlist, or a reply to a top-level comment.

    Content longer than 99 characters is rejected. When ``req.parent_id`` is
    set, the new entry is appended to the ``replies`` of the top-level
    comment with that id; otherwise it becomes a new top-level comment.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        if len(req.content) > 99:
            return {"status": "error", "message": "评论过长"}

        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            bucket = db.get("admin", {})
        else:
            bucket = db.get("users", {}).get(req.author, {})

        pl = bucket.get(req.name)
        # Missing or empty entries, and list-shaped entries, are rejected.
        if isinstance(pl, list) or not pl:
            return {"status": "error", "message": "格式异常或不存在"}
        if "comments" not in pl:
            pl["comments"] = []

        entry = {
            "id": uuid.uuid4().hex,
            "user": req.username,
            "text": req.content,
            "time": int(time.time() * 1000),  # epoch milliseconds
        }

        if req.parent_id:
            # Only top-level comments are searched for the parent id.
            for existing in pl["comments"]:
                if existing.get("id") == req.parent_id:
                    existing.setdefault("replies", []).append(entry)
                    break
            else:
                return {"status": "error", "message": "原评论不存在"}
        else:
            pl["comments"].append(entry)

        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}