Flying-Music-API / api_playlist.py
ZHIWEI666's picture
Upload 9 files
b7b932b verified
import os
import uuid
import time
import asyncio
from fastapi import APIRouter, UploadFile, File, Form
from huggingface_hub import upload_file
from core import get_db, save_db, api, REPO_ID, TOKEN, PlaylistAddReq, PlaylistBatchAddReq, PlaylistDeleteReq, PlaylistLikeReq, PlaylistCommentReq
router = APIRouter()
@router.get("/api/cloud_pl/get")
async def get_cloud_playlists():
return {"status": "success", "data": get_db("cloud_playlists.json")}
@router.post("/api/cloud_pl/create")
async def create_cloud_playlist(username: str = Form(...), type: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)):
try:
db = get_db("cloud_playlists.json")
if "admin" not in db: db["admin"] = {}
if "users" not in db: db["users"] = {}
target_dict = db["admin"] if type == "admin" else db["users"].setdefault(username, {})
if type == "admin" and username != "666666": return {"status": "error", "message": "权限不足"}
if type == "user" and len(target_dict) >= 3: return {"status": "error", "message": "上限3个"}
if name in target_dict: return {"status": "error", "message": "歌单已存在"}
cover_name = ""
if image and image.filename:
ext = os.path.splitext(image.filename)[1].lower()
cover_name = f"pl_{uuid.uuid4().hex}{ext}"
temp_path = f"temp_{cover_name}"
with open(temp_path, "wb") as f: f.write(await image.read())
await asyncio.to_thread(upload_file, path_or_fileobj=temp_path, path_in_repo=f"community_images/{cover_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN)
if os.path.exists(temp_path): os.remove(temp_path)
target_dict[name] = {
"description": description[:100], "cover": cover_name, "songs": [], "likes": 0, "liked_by": [], "comments": []
}
save_db(db, "cloud_playlists.json")
return {"status": "success"}
except Exception as e: return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/edit")
async def edit_cloud_playlist(username: str = Form(...), type: str = Form(...), old_name: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)):
try:
db = get_db("cloud_playlists.json")
target_dict = db.get("admin", {}) if type == "admin" else db.get("users", {}).get(username, {})
if old_name not in target_dict: return {"status": "error", "message": "歌单不存在"}
pl_data = target_dict.pop(old_name)
if image and image.filename:
ext = os.path.splitext(image.filename)[1].lower()
cover_name = f"pl_{uuid.uuid4().hex}{ext}"
temp_path = f"temp_{cover_name}"
with open(temp_path, "wb") as f: f.write(await image.read())
await asyncio.to_thread(upload_file, path_or_fileobj=temp_path, path_in_repo=f"community_images/{cover_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN)
if os.path.exists(temp_path): os.remove(temp_path)
if pl_data.get("cover"):
try: asyncio.create_task(asyncio.to_thread(api.delete_file, path_in_repo=f"community_images/{pl_data['cover']}", repo_id=REPO_ID, token=TOKEN))
except: pass
pl_data["cover"] = cover_name
pl_data["description"] = description[:100]
target_dict[name] = pl_data
save_db(db, "cloud_playlists.json")
return {"status": "success"}
except Exception as e: return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/delete")
async def delete_cloud_playlist(req: PlaylistDeleteReq):
try:
db = get_db("cloud_playlists.json")
target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {})
if req.name not in target_dict: return {"status": "error", "message": "不存在或无权限"}
pl_data = target_dict.pop(req.name)
if pl_data.get("cover"):
try: asyncio.create_task(asyncio.to_thread(api.delete_file, path_in_repo=f"community_images/{pl_data['cover']}", repo_id=REPO_ID, token=TOKEN))
except: pass
save_db(db, "cloud_playlists.json")
return {"status": "success"}
except Exception as e: return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/add")
async def add_to_cloud_playlist(req: PlaylistAddReq):
try:
db = get_db("cloud_playlists.json")
if req.type == "admin":
target_dict = db.get("admin", {})
else:
# 使用 author 定位歌单所有者,支持所有已登录用户往任意用户歌单添加歌曲
author = req.author if req.author else req.username
target_dict = db.get("users", {}).get(author, {})
if req.playlist_name not in target_dict: return {"status": "error", "message": "歌单不存在"}
songs = target_dict[req.playlist_name].get("songs", [])
if req.filename in songs: return {"status": "error", "message": "歌曲已存在于歌单中"}
songs.append(req.filename)
target_dict[req.playlist_name]["songs"] = songs
save_db(db, "cloud_playlists.json")
return {"status": "success"}
except Exception as e: return {"status": "error", "message": str(e)}
# 🌟 新增:批量添加歌曲到歌单接口
@router.post("/api/cloud_pl/batch_add")
async def batch_add_to_cloud_playlist(req: PlaylistBatchAddReq):
"""批量添加歌曲到歌单(一次请求处理多首)"""
try:
db = get_db("cloud_playlists.json")
if req.type == "admin":
target_dict = db.get("admin", {})
else:
author = req.author if req.author else req.username
target_dict = db.get("users", {}).get(author, {})
if req.playlist_name not in target_dict:
return {"status": "error", "message": "歌单不存在"}
songs = target_dict[req.playlist_name].get("songs", [])
existing = set(songs)
added_count = 0
for filename in req.filenames:
if filename not in existing:
songs.append(filename)
existing.add(filename)
added_count += 1
target_dict[req.playlist_name]["songs"] = songs
save_db(db, "cloud_playlists.json")
return {"status": "success", "added_count": added_count, "total": len(songs)}
except Exception as e:
return {"status": "error", "message": str(e)}
# 🌟 新增:云端歌单移除单曲接口
@router.post("/api/cloud_pl/remove_song")
async def remove_from_cloud_playlist(req: PlaylistAddReq):
try:
db = get_db("cloud_playlists.json")
target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {})
if req.playlist_name not in target_dict: return {"status": "error", "message": "歌单不存在"}
songs = target_dict[req.playlist_name].get("songs", [])
if req.filename in songs:
songs.remove(req.filename)
target_dict[req.playlist_name]["songs"] = songs
save_db(db, "cloud_playlists.json")
return {"status": "success"}
return {"status": "error", "message": "未在歌单中找到此歌曲"}
except Exception as e: return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/like")
async def like_cloud_playlist(req: PlaylistLikeReq):
try:
db = get_db("cloud_playlists.json")
target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.author, {})
pl = target_dict.get(req.name)
if not pl or isinstance(pl, list): return {"status": "error", "message": "格式异常或不存在"}
liked_by = pl.get("liked_by", [])
if req.username in liked_by:
liked_by.remove(req.username)
pl["likes"] = max(0, pl.get("likes", 1) - 1)
else:
liked_by.append(req.username)
pl["likes"] = pl.get("likes", 0) + 1
pl["liked_by"] = liked_by
save_db(db, "cloud_playlists.json")
return {"status": "success"}
except Exception as e: return {"status": "error", "message": str(e)}
@router.post("/api/cloud_pl/comment")
async def comment_cloud_playlist(req: PlaylistCommentReq):
try:
if len(req.content) > 99: return {"status": "error", "message": "评论过长"}
db = get_db("cloud_playlists.json")
target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.author, {})
pl = target_dict.get(req.name)
if not pl or isinstance(pl, list): return {"status": "error", "message": "格式异常或不存在"}
if "comments" not in pl: pl["comments"] = []
new_comment = {"id": uuid.uuid4().hex, "user": req.username, "text": req.content, "time": int(time.time() * 1000)}
if req.parent_id:
parent_found = False
for c in pl["comments"]:
if c.get("id") == req.parent_id:
if "replies" not in c: c["replies"] = []
c["replies"].append(new_comment)
parent_found = True; break
if not parent_found: return {"status": "error", "message": "原评论不存在"}
else:
pl["comments"].append(new_comment)
save_db(db, "cloud_playlists.json")
return {"status": "success"}
except Exception as e: return {"status": "error", "message": str(e)}