ZHIWEI666 commited on
Commit
61e67a1
·
verified ·
1 Parent(s): db73228

Upload api_playlist.py

Browse files
Files changed (1) hide show
  1. api_playlist.py +84 -16
api_playlist.py CHANGED
@@ -1,40 +1,108 @@
1
- from fastapi import APIRouter
2
- from core import get_db, save_db, PlaylistCreateReq, PlaylistAddReq
 
 
 
 
3
 
4
  router = APIRouter()
5
 
 
 
 
 
 
 
 
 
 
 
 
6
  @router.get("/api/cloud_pl/get")
7
  def get_cloud_playlists():
8
  return {"status": "success", "data": get_db("cloud_playlists.json")}
9
 
10
  @router.post("/api/cloud_pl/create")
11
- def create_cloud_playlist(req: PlaylistCreateReq):
12
  try:
13
  db = get_db("cloud_playlists.json")
14
  if "admin" not in db: db["admin"] = {}
15
  if "users" not in db: db["users"] = {}
16
- if req.type == "admin":
17
- if req.username != "猪的飞行梦": return {"status": "error", "message": "权限不足"}
18
- if req.name in db["admin"]: return {"status": "error", "message": "已存在"}
19
- db["admin"][req.name] = []
20
- else:
21
- if req.username not in db["users"]: db["users"][req.username] = {}
22
- if len(db["users"][req.username]) >= 3: return {"status": "error", "message": "上限3个"}
23
- if req.name in db["users"][req.username]: return {"status": "error", "message": "已存在"}
24
- db["users"][req.username][req.name] = []
 
 
 
 
 
 
 
25
  save_db(db, "cloud_playlists.json")
26
  return {"status": "success"}
27
  except Exception as e: return {"status": "error", "message": str(e)}
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  @router.post("/api/cloud_pl/add")
30
  def add_cloud_playlist(req: PlaylistAddReq):
31
  try:
32
  db = get_db("cloud_playlists.json")
33
- if req.type == "admin":
34
- if req.username != "猪的飞行梦": return {"status": "error", "message": "权限不足"}
35
- if req.filename not in db.get("admin", {}).get(req.playlist_name, []): db["admin"][req.playlist_name].append(req.filename)
 
 
 
 
36
  else:
37
- if req.filename not in db.get("users", {}).get(req.username, {}).get(req.playlist_name, []): db["users"][req.username][req.playlist_name].append(req.filename)
 
38
  save_db(db, "cloud_playlists.json")
39
  return {"status": "success"}
40
  except Exception as e: return {"status": "error", "message": str(e)}
 
1
+ import os
2
+ import uuid
3
+ from fastapi import APIRouter, UploadFile, File, Form
4
+ from pydantic import BaseModel
5
+ from huggingface_hub import upload_file
6
+ from core import get_db, save_db, REPO_ID, TOKEN
7
 
8
  router = APIRouter()
9
 
10
+ class PlaylistAddReq(BaseModel):
11
+ username: str
12
+ type: str
13
+ playlist_name: str
14
+ filename: str
15
+
16
+ class PlaylistDeleteReq(BaseModel):
17
+ username: str
18
+ type: str
19
+ name: str
20
+
21
  @router.get("/api/cloud_pl/get")
22
  def get_cloud_playlists():
23
  return {"status": "success", "data": get_db("cloud_playlists.json")}
24
 
25
  @router.post("/api/cloud_pl/create")
26
+ async def create_cloud_playlist(username: str = Form(...), type: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)):
27
  try:
28
  db = get_db("cloud_playlists.json")
29
  if "admin" not in db: db["admin"] = {}
30
  if "users" not in db: db["users"] = {}
31
+
32
+ target_dict = db["admin"] if type == "admin" else db["users"].setdefault(username, {})
33
+ if type == "admin" and username != "猪的飞行梦": return {"status": "error", "message": "权限不足"}
34
+ if type == "user" and len(target_dict) >= 3: return {"status": "error", "message": "上限3个"}
35
+ if name in target_dict: return {"status": "error", "message": "歌单已存在"}
36
+
37
+ cover_name = ""
38
+ if image:
39
+ ext = os.path.splitext(image.filename)[1]
40
+ cover_name = f"pl_{uuid.uuid4().hex[:8]}{ext}"
41
+ temp_path = f"temp_{cover_name}"
42
+ with open(temp_path, "wb") as f: f.write(await image.read())
43
+ upload_file(path_or_fileobj=temp_path, path_in_repo=f"community_images/{cover_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN)
44
+ os.remove(temp_path)
45
+
46
+ target_dict[name] = {"description": description[:200], "cover": cover_name, "songs": []}
47
  save_db(db, "cloud_playlists.json")
48
  return {"status": "success"}
49
  except Exception as e: return {"status": "error", "message": str(e)}
50
 
51
+ @router.post("/api/cloud_pl/edit")
52
+ async def edit_cloud_playlist(username: str = Form(...), type: str = Form(...), old_name: str = Form(...), name: str = Form(...), description: str = Form(""), image: UploadFile = File(None)):
53
+ try:
54
+ db = get_db("cloud_playlists.json")
55
+ target_dict = db.get("admin", {}) if type == "admin" else db.get("users", {}).get(username, {})
56
+ if old_name not in target_dict: return {"status": "error", "message": "歌单不存在"}
57
+ if type == "admin" and username != "猪的飞行梦": return {"status": "error", "message": "权限不足"}
58
+ if old_name != name and name in target_dict: return {"status": "error", "message": "新名称已存在"}
59
+
60
+ pl_data = target_dict.pop(old_name)
61
+ # 兼容旧版本纯数组数据
62
+ if isinstance(pl_data, list): pl_data = {"description": "", "cover": "", "songs": pl_data}
63
+
64
+ pl_data["description"] = description[:200]
65
+ if image:
66
+ ext = os.path.splitext(image.filename)[1]
67
+ cover_name = f"pl_{uuid.uuid4().hex[:8]}{ext}"
68
+ temp_path = f"temp_{cover_name}"
69
+ with open(temp_path, "wb") as f: f.write(await image.read())
70
+ upload_file(path_or_fileobj=temp_path, path_in_repo=f"community_images/{cover_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN)
71
+ os.remove(temp_path)
72
+ pl_data["cover"] = cover_name
73
+
74
+ target_dict[name] = pl_data
75
+ save_db(db, "cloud_playlists.json")
76
+ return {"status": "success"}
77
+ except Exception as e: return {"status": "error", "message": str(e)}
78
+
79
+ @router.post("/api/cloud_pl/delete")
80
+ def delete_cloud_playlist(req: PlaylistDeleteReq):
81
+ try:
82
+ db = get_db("cloud_playlists.json")
83
+ target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {})
84
+ if req.type == "admin" and req.username != "猪的飞行梦": return {"status": "error", "message": "权限不足"}
85
+ if req.name in target_dict:
86
+ del target_dict[req.name]
87
+ save_db(db, "cloud_playlists.json")
88
+ return {"status": "success"}
89
+ return {"status": "error", "message": "歌单不存在"}
90
+ except Exception as e: return {"status": "error", "message": str(e)}
91
+
92
  @router.post("/api/cloud_pl/add")
93
  def add_cloud_playlist(req: PlaylistAddReq):
94
  try:
95
  db = get_db("cloud_playlists.json")
96
+ target_dict = db.get("admin", {}) if req.type == "admin" else db.get("users", {}).get(req.username, {})
97
+ pl = target_dict.get(req.playlist_name)
98
+ if pl is None: return {"status": "error", "message": "歌单不存在"}
99
+
100
+ # 兼容旧版本列表
101
+ if isinstance(pl, list):
102
+ if req.filename not in pl: pl.append(req.filename)
103
  else:
104
+ if req.filename not in pl["songs"]: pl["songs"].append(req.filename)
105
+
106
  save_db(db, "cloud_playlists.json")
107
  return {"status": "success"}
108
  except Exception as e: return {"status": "error", "message": str(e)}