import os import uuid import time import asyncio from fastapi import APIRouter, UploadFile, File, Form from huggingface_hub import upload_file from core import get_db, save_db, api, REPO_ID, TOKEN, PlaylistAddReq, PlaylistBatchAddReq, PlaylistDeleteReq, PlaylistLikeReq, PlaylistCommentReq router = APIRouter() @router.get("/api/cloud_pl/get") async def get_cloud_playlists(): return {"status": "success", "data": get_db("cloud_playlists.json")} @router.post("/api/cloud_pl/create") async def create_cloud_playlist(username: str = Form(...), type: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)): try: db = get_db("cloud_playlists.json") if "admin" not in db: db["admin"] = {} if "users" not in db: db["users"] = {} target_dict = db["admin"] if type == "admin" else db["users"].setdefault(username, {}) if type == "admin" and username != "666666": return {"status": "error", "message": "权限不足"} if type == "user" and len(target_dict) >= 3: return {"status": "error", "message": "上限3个"} if name in target_dict: return {"status": "error", "message": "歌单已存在"} cover_name = "" if image and image.filename: ext = os.path.splitext(image.filename)[1].lower() cover_name = f"pl_{uuid.uuid4().hex}{ext}" temp_path = f"temp_{cover_name}" with open(temp_path, "wb") as f: f.write(await image.read()) await asyncio.to_thread(upload_file, path_or_fileobj=temp_path, path_in_repo=f"community_images/{cover_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN) if os.path.exists(temp_path): os.remove(temp_path) target_dict[name] = { "description": description[:100], "cover": cover_name, "songs": [], "likes": 0, "liked_by": [], "comments": [] } save_db(db, "cloud_playlists.json") return {"status": "success"} except Exception as e: return {"status": "error", "message": str(e)} @router.post("/api/cloud_pl/edit") async def edit_cloud_playlist(username: str = Form(...), type: str = Form(...), old_name: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)): try: db = get_db("cloud_playlists.json") target_dict = db.get("admin", {}) if type == "admin" else db.get("users", {}).get(username, {}) if old_name not in target_dict: return {"status": "error", "message": "歌单不存在"} pl_data = target_dict.pop(old_name) if image and image.filename: ext = os.path.splitext(image.filename)[1].lower() cover_name = f"pl_{uuid.uuid4().hex}{ext}" temp_path = f"temp_{cover_name}" with open(temp_path, "wb") as f: f.write(await image.read()) await asyncio.to_thread(upload_file, path_or_fileobj=temp_path, path_in_repo=f"community_images/{cover_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN) if os.path.exists(temp_path): os.remove(temp_path) if pl_data.get("cover"): try: asyncio.create_task(asyncio.to_thread(api.delete_file, path_in_repo=f"community_images/{pl_data['cover']}", repo_id=REPO_ID, token=TOKEN)) except: pass pl_data["cover"] = cover_name pl_data["description"] = description[:100] target_dict[name] = pl_data save_db(db, "cloud_playlists.json") return {"status": "success"} except Exception as e: return {"status": "error", "message": str(e)} @router.post("/api/cloud_pl/delete") async def delete_cloud_playlist(req: PlaylistDeleteReq): try: db = get_db("cloud_playlists.json") target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {}) if req.name not in target_dict: return {"status": "error", "message": "不存在或无权限"} pl_data = target_dict.pop(req.name) if pl_data.get("cover"): try: asyncio.create_task(asyncio.to_thread(api.delete_file, path_in_repo=f"community_images/{pl_data['cover']}", repo_id=REPO_ID, token=TOKEN)) except: pass save_db(db, "cloud_playlists.json") return {"status": "success"} except Exception as e: return {"status": "error", "message": str(e)} @router.post("/api/cloud_pl/add") async def add_to_cloud_playlist(req: PlaylistAddReq): try: db = get_db("cloud_playlists.json") if req.type == "admin": target_dict = db.get("admin", {}) else: # 使用 author 定位歌单所有者,支持所有已登录用户往任意用户歌单添加歌曲 author = req.author if req.author else req.username target_dict = db.get("users", {}).get(author, {}) if req.playlist_name not in target_dict: return {"status": "error", "message": "歌单不存在"} songs = target_dict[req.playlist_name].get("songs", []) if req.filename in songs: return {"status": "error", "message": "歌曲已存在于歌单中"} songs.append(req.filename) target_dict[req.playlist_name]["songs"] = songs save_db(db, "cloud_playlists.json") return {"status": "success"} except Exception as e: return {"status": "error", "message": str(e)} # 🌟 新增:批量添加歌曲到歌单接口 @router.post("/api/cloud_pl/batch_add") async def batch_add_to_cloud_playlist(req: PlaylistBatchAddReq): """批量添加歌曲到歌单(一次请求处理多首)""" try: db = get_db("cloud_playlists.json") if req.type == "admin": target_dict = db.get("admin", {}) else: author = req.author if req.author else req.username target_dict = db.get("users", {}).get(author, {}) if req.playlist_name not in target_dict: return {"status": "error", "message": "歌单不存在"} songs = target_dict[req.playlist_name].get("songs", []) existing = set(songs) added_count = 0 for filename in req.filenames: if filename not in existing: songs.append(filename) existing.add(filename) added_count += 1 target_dict[req.playlist_name]["songs"] = songs save_db(db, "cloud_playlists.json") return {"status": "success", "added_count": added_count, "total": len(songs)} except Exception as e: return {"status": "error", "message": str(e)} # 🌟 新增:云端歌单移除单曲接口 @router.post("/api/cloud_pl/remove_song") async def remove_from_cloud_playlist(req: PlaylistAddReq): try: db = get_db("cloud_playlists.json") target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {}) if req.playlist_name not in target_dict: return {"status": "error", "message": "歌单不存在"} songs = target_dict[req.playlist_name].get("songs", []) if req.filename in songs: songs.remove(req.filename) target_dict[req.playlist_name]["songs"] = songs save_db(db, "cloud_playlists.json") return {"status": "success"} return {"status": "error", "message": "未在歌单中找到此歌曲"} except Exception as e: return {"status": "error", "message": str(e)} @router.post("/api/cloud_pl/like") async def like_cloud_playlist(req: PlaylistLikeReq): try: db = get_db("cloud_playlists.json") target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.author, {}) pl = target_dict.get(req.name) if not pl or isinstance(pl, list): return {"status": "error", "message": "格式异常或不存在"} liked_by = pl.get("liked_by", []) if req.username in liked_by: liked_by.remove(req.username) pl["likes"] = max(0, pl.get("likes", 1) - 1) else: liked_by.append(req.username) pl["likes"] = pl.get("likes", 0) + 1 pl["liked_by"] = liked_by save_db(db, "cloud_playlists.json") return {"status": "success"} except Exception as e: return {"status": "error", "message": str(e)} @router.post("/api/cloud_pl/comment") async def comment_cloud_playlist(req: PlaylistCommentReq): try: if len(req.content) > 99: return {"status": "error", "message": "评论过长"} db = get_db("cloud_playlists.json") target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.author, {}) pl = target_dict.get(req.name) if not pl or isinstance(pl, list): return {"status": "error", "message": "格式异常或不存在"} if "comments" not in pl: pl["comments"] = [] new_comment = {"id": uuid.uuid4().hex, "user": req.username, "text": req.content, "time": int(time.time() * 1000)} if req.parent_id: parent_found = False for c in pl["comments"]: if c.get("id") == req.parent_id: if "replies" not in c: c["replies"] = [] c["replies"].append(new_comment) parent_found = True; break if not parent_found: return {"status": "error", "message": "原评论不存在"} else: pl["comments"].append(new_comment) save_db(db, "cloud_playlists.json") return {"status": "success"} except Exception as e: return {"status": "error", "message": str(e)}