Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import time | |
| import asyncio | |
| from fastapi import APIRouter, UploadFile, File, Form | |
| from huggingface_hub import upload_file | |
| from core import get_db, save_db, api, REPO_ID, TOKEN, PlaylistAddReq, PlaylistBatchAddReq, PlaylistDeleteReq, PlaylistLikeReq, PlaylistCommentReq | |
| router = APIRouter() | |
async def get_cloud_playlists():
    """Return the whole cloud playlist store.

    Returns:
        A dict whose ``status`` is ``"success"`` and whose ``data`` is
        whatever ``get_db("cloud_playlists.json")`` yields.
    """
    playlists = get_db("cloud_playlists.json")
    return {"status": "success", "data": playlists}
async def create_cloud_playlist(
    username: str = Form(...),
    type: str = Form(...),
    name: str = Form(...),
    description: str = Form(""),
    image: UploadFile = File(None),
):
    """Create a playlist in the cloud store, optionally with a cover image.

    Args:
        username: Name of the requesting user. Admin playlists may only be
            created by the user ``"666666"``.
        type: ``"admin"`` targets the shared admin section; any other value
            targets the section owned by ``username``.
        name: Key of the new playlist; must not already exist in the target
            section.
        description: Free text, stored truncated to 100 characters.
        image: Optional cover upload. When present it is pushed to
            ``community_images/`` in the dataset repo ``REPO_ID``.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``. Any exception is converted
        into the error form instead of propagating.
    """
    try:
        is_admin = type == "admin"
        # Reject unauthorized admin requests before touching the store.
        if is_admin and username != "666666":
            return {"status": "error", "message": "权限不足"}

        db = get_db("cloud_playlists.json")
        db.setdefault("admin", {})
        db.setdefault("users", {})

        # Validate against a read-only view so a rejected request does not
        # leave an empty per-user entry behind in the loaded store.
        existing = db["admin"] if is_admin else db["users"].get(username, {})
        # The cap applies to every non-admin request; checking only
        # type == "user" let any other type value bypass it.
        if not is_admin and len(existing) >= 3:
            return {"status": "error", "message": "上限3个"}
        if name in existing:
            return {"status": "error", "message": "歌单已存在"}

        cover_name = ""
        if image and image.filename:
            ext = os.path.splitext(image.filename)[1].lower()
            cover_name = f"pl_{uuid.uuid4().hex}{ext}"
            temp_path = f"temp_{cover_name}"
            try:
                with open(temp_path, "wb") as f:
                    f.write(await image.read())
                await asyncio.to_thread(
                    upload_file,
                    path_or_fileobj=temp_path,
                    path_in_repo=f"community_images/{cover_name}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            finally:
                # Always remove the temp file, even when the upload fails.
                if os.path.exists(temp_path):
                    os.remove(temp_path)

        target_dict = db["admin"] if is_admin else db["users"].setdefault(username, {})
        target_dict[name] = {
            "description": description[:100],
            "cover": cover_name,
            "songs": [],
            "likes": 0,
            "liked_by": [],
            "comments": [],
        }
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
async def edit_cloud_playlist(
    username: str = Form(...),
    type: str = Form(...),
    old_name: str = Form(...),
    name: str = Form(...),
    description: str = Form(""),
    image: UploadFile = File(None),
):
    """Rename a playlist and/or update its description and cover image.

    Args:
        username: Name of the requesting user. Admin playlists may only be
            edited by the user ``"666666"`` (same rule as creation).
        type: ``"admin"`` targets the shared admin section; any other value
            targets the section owned by ``username``.
        old_name: Current key of the playlist.
        name: New key of the playlist (may equal ``old_name``).
        description: Free text, stored truncated to 100 characters.
        image: Optional replacement cover. When present it is uploaded to
            ``community_images/`` and the previous cover file, if any, is
            deleted in the background on a best-effort basis.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``. Any exception is converted
        into the error form instead of propagating.
    """
    try:
        if type == "admin" and username != "666666":
            return {"status": "error", "message": "权限不足"}

        db = get_db("cloud_playlists.json")
        target_dict = db.get("admin", {}) if type == "admin" else db.get("users", {}).get(username, {})
        if old_name not in target_dict:
            return {"status": "error", "message": "歌单不存在"}
        # Renaming onto an existing playlist would silently overwrite it.
        if name != old_name and name in target_dict:
            return {"status": "error", "message": "歌单已存在"}

        pl_data = target_dict[old_name]
        old_cover = ""

        # Upload first and only touch the store afterwards, so a failed
        # upload leaves the playlist exactly as it was.
        if image and image.filename:
            ext = os.path.splitext(image.filename)[1].lower()
            cover_name = f"pl_{uuid.uuid4().hex}{ext}"
            temp_path = f"temp_{cover_name}"
            try:
                with open(temp_path, "wb") as f:
                    f.write(await image.read())
                await asyncio.to_thread(
                    upload_file,
                    path_or_fileobj=temp_path,
                    path_in_repo=f"community_images/{cover_name}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            finally:
                # Always remove the temp file, even when the upload fails.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            old_cover = pl_data.get("cover")
            pl_data["cover"] = cover_name

        pl_data["description"] = description[:100]
        # Pop and re-insert (as before) so the entry moves to the new key.
        target_dict.pop(old_name)
        target_dict[name] = pl_data
        save_db(db, "cloud_playlists.json")

        if old_cover:
            # Fire-and-forget cleanup of the replaced cover, scheduled only
            # after the save succeeded. repo_type must match the upload
            # (dataset); presumably `api` is an HfApi instance, whose
            # delete_file otherwise targets a model repo.
            cleanup = asyncio.create_task(
                asyncio.to_thread(
                    api.delete_file,
                    path_in_repo=f"community_images/{old_cover}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            )
            # Retrieve the outcome so a failed deletion stays best-effort
            # instead of surfacing as "Task exception was never retrieved".
            cleanup.add_done_callback(lambda t: t.cancelled() or t.exception())

        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
async def delete_cloud_playlist(req: PlaylistDeleteReq):
    """Delete a playlist and schedule removal of its cover image.

    Args:
        req: Request carrying ``type``, ``username`` and ``name``. With
            ``type == "admin"`` the shared admin section is targeted and the
            caller must be the user ``"666666"`` (same rule as creation);
            otherwise the section owned by ``req.username`` is targeted.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``. Any exception is converted
        into the error form instead of propagating.
    """
    try:
        if req.type == "admin" and req.username != "666666":
            return {"status": "error", "message": "权限不足"}

        db = get_db("cloud_playlists.json")
        target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {})
        if req.name not in target_dict:
            return {"status": "error", "message": "不存在或无权限"}

        pl_data = target_dict.pop(req.name)
        # Read the cover before saving so malformed entries fail early,
        # exactly as they did before.
        cover = pl_data.get("cover")
        save_db(db, "cloud_playlists.json")

        if cover:
            # Fire-and-forget cleanup, scheduled only after the save
            # succeeded. repo_type must match the upload (dataset);
            # presumably `api` is an HfApi instance, whose delete_file
            # otherwise targets a model repo.
            cleanup = asyncio.create_task(
                asyncio.to_thread(
                    api.delete_file,
                    path_in_repo=f"community_images/{cover}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=TOKEN,
                )
            )
            # Retrieve the outcome so a failed deletion stays best-effort
            # instead of surfacing as "Task exception was never retrieved".
            cleanup.add_done_callback(lambda t: t.cancelled() or t.exception())

        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
async def add_to_cloud_playlist(req: PlaylistAddReq):
    """Append a single song to a cloud playlist.

    Args:
        req: Request carrying ``type``, ``username``, ``author``,
            ``playlist_name`` and ``filename``.

    Returns:
        ``{"status": "success"}`` when the song was appended, otherwise
        ``{"status": "error", "message": ...}`` (unknown playlist, duplicate
        song, or any exception raised along the way).
    """
    try:
        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            playlists = db.get("admin", {})
        else:
            # Locate the playlist by its author so that any logged-in user
            # can add songs to any user's playlist; fall back to the caller.
            owner = req.author or req.username
            playlists = db.get("users", {}).get(owner, {})

        if req.playlist_name not in playlists:
            return {"status": "error", "message": "歌单不存在"}

        playlist = playlists[req.playlist_name]
        track_list = playlist.get("songs", [])
        if req.filename in track_list:
            return {"status": "error", "message": "歌曲已存在于歌单中"}

        track_list.append(req.filename)
        # Re-assign in case "songs" was missing and a fresh list was used.
        playlist["songs"] = track_list
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
# New: endpoint for adding several songs to a playlist at once
async def batch_add_to_cloud_playlist(req: PlaylistBatchAddReq):
    """Add multiple songs to a playlist in a single request.

    Songs already present (or repeated within ``req.filenames``) are skipped.

    Args:
        req: Request carrying ``type``, ``username``, ``author``,
            ``playlist_name`` and ``filenames``.

    Returns:
        On success, ``status`` plus ``added_count`` (songs actually appended)
        and ``total`` (playlist length afterwards); otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            playlists = db.get("admin", {})
        else:
            owner = req.author or req.username
            playlists = db.get("users", {}).get(owner, {})

        if req.playlist_name not in playlists:
            return {"status": "error", "message": "歌单不存在"}

        playlist = playlists[req.playlist_name]
        track_list = playlist.get("songs", [])
        seen = set(track_list)
        size_before = len(track_list)
        for candidate in req.filenames:
            if candidate in seen:
                continue
            track_list.append(candidate)
            seen.add(candidate)

        playlist["songs"] = track_list
        save_db(db, "cloud_playlists.json")
        return {
            "status": "success",
            "added_count": len(track_list) - size_before,
            "total": len(track_list),
        }
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
# New: endpoint for removing a single song from a cloud playlist
async def remove_from_cloud_playlist(req: PlaylistAddReq):
    """Remove one song from a cloud playlist.

    The playlist is looked up in the admin section when ``req.type`` is
    ``"admin"``, otherwise in the section keyed by ``req.username``.

    Returns:
        ``{"status": "success"}`` when the song was removed, otherwise
        ``{"status": "error", "message": ...}`` (unknown playlist, song not
        in the playlist, or any exception raised along the way).
    """
    try:
        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            playlists = db.get("admin", {})
        else:
            playlists = db.get("users", {}).get(req.username, {})

        if req.playlist_name not in playlists:
            return {"status": "error", "message": "歌单不存在"}

        playlist = playlists[req.playlist_name]
        track_list = playlist.get("songs", [])
        if req.filename not in track_list:
            return {"status": "error", "message": "未在歌单中找到此歌曲"}

        track_list.remove(req.filename)
        playlist["songs"] = track_list
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
async def like_cloud_playlist(req: PlaylistLikeReq):
    """Toggle the caller's like on a playlist.

    If ``req.username`` is already in the playlist's ``liked_by`` list the
    like is withdrawn (counter decremented, never below zero); otherwise it
    is recorded and the counter incremented.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            playlists = db.get("admin", {})
        else:
            playlists = db.get("users", {}).get(req.author, {})

        entry = playlists.get(req.name)
        # Missing/empty entries and list-shaped entries are both rejected.
        if not entry or isinstance(entry, list):
            return {"status": "error", "message": "格式异常或不存在"}

        fans = entry.get("liked_by", [])
        already_liked = req.username in fans
        if already_liked:
            fans.remove(req.username)
            entry["likes"] = max(0, entry.get("likes", 1) - 1)
        else:
            fans.append(req.username)
            entry["likes"] = entry.get("likes", 0) + 1

        entry["liked_by"] = fans
        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
async def comment_cloud_playlist(req: PlaylistCommentReq):
    """Post a comment, or a reply to a top-level comment, on a playlist.

    Comments longer than 99 characters are rejected. When ``req.parent_id``
    is set, the new comment is appended to the ``replies`` of the top-level
    comment with that id; otherwise it is appended to the playlist's
    ``comments`` list.

    Returns:
        ``{"status": "success"}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    try:
        if len(req.content) > 99:
            return {"status": "error", "message": "评论过长"}

        db = get_db("cloud_playlists.json")
        if req.type == "admin":
            playlists = db.get("admin", {})
        else:
            playlists = db.get("users", {}).get(req.author, {})

        entry = playlists.get(req.name)
        # Missing/empty entries and list-shaped entries are both rejected.
        if not entry or isinstance(entry, list):
            return {"status": "error", "message": "格式异常或不存在"}

        thread = entry.setdefault("comments", [])
        new_comment = {
            "id": uuid.uuid4().hex,
            "user": req.username,
            "text": req.content,
            "time": int(time.time() * 1000),  # milliseconds since the epoch
        }

        if req.parent_id:
            # Only top-level comments are searched for the parent.
            parent = next((c for c in thread if c.get("id") == req.parent_id), None)
            if parent is None:
                return {"status": "error", "message": "原评论不存在"}
            parent.setdefault("replies", []).append(new_comment)
        else:
            thread.append(new_comment)

        save_db(db, "cloud_playlists.json")
        return {"status": "success"}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}