ZHIWEI666's picture
Upload app.py
da1038d verified
raw
history blame
11.1 kB
# ⚙️ 后端逻辑/核心服务端.py (Hugging Face Spaces app.py)
import hashlib
import hmac
import secrets
import time
import uuid
from typing import Optional
from urllib.parse import quote

from fastapi import FastAPI, HTTPException, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# Database access module
import 数据库连接 as db
# Application instance that every route below is registered on.
app = FastAPI(title="ComfyUI Ranking Community API")
# Allow cross-origin requests; this is what lets ComfyUI reach the cloud API.
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True here. The FastAPI CORS documentation advises listing
# explicit values when credentials are allowed -- confirm whether credentialed
# requests are actually needed before tightening this.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Root probe so the Hugging Face health check does not fail ---
@app.get("/")
def read_root():
    """Return a small static payload signalling that the service is up."""
    health_payload = dict(
        status="ok",
        message="ComfyUI Ranking API is running!",
    )
    return health_payload
# --- Generic file upload endpoint: content-hashed names, per-type folders ---
# Maps the client-declared file type to the directory it is stored under.
_UPLOAD_DIRS = {
    "avatar": "avatars",
    "cover": "covers",
    "tool": "tools",
    "app": "apps",
}


def _sanitize_filename(raw_name):
    """Reduce a client-supplied filename to a single safe path component.

    Any directory part (forward- or back-slash separated) is discarded so the
    name cannot escape its target directory. Missing, empty or dot-only names
    fall back to ``"unnamed"``.
    """
    normalized = (raw_name or "").replace("\\", "/")
    base_name = normalized.rsplit("/", 1)[-1].strip()
    if base_name in ("", ".", ".."):
        return "unnamed"
    return base_name


@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...), file_type: str = Form(...)):
    """Store an uploaded file through the db module and return its URL.

    Args:
        file: The uploaded file (multipart form field).
        file_type: One of "avatar", "cover", "tool" or "app"; any other value
            is stored under "others".

    Returns:
        A dict with "status", the direct "url" of the stored file, the
        "display_name" for the UI and the "hashed_name" actually stored.
    """
    content = await file.read()

    # A short content hash prefix keeps same-named uploads with different
    # content from overwriting each other.
    file_hash = hashlib.md5(content).hexdigest()[:10]

    # The filename is untrusted input: strip directory components before it
    # becomes part of the storage path.
    safe_name = _sanitize_filename(file.filename)
    new_filename = f"{file_hash}_{safe_name}"

    target_dir = _UPLOAD_DIRS.get(file_type, "others")
    full_path_in_repo = f"{target_dir}/{new_filename}"

    # Persist the real file bytes via the database module.
    db.save_file(full_path_in_repo, content)

    # Percent-encode the path so spaces, '#', '?' or non-ASCII characters in
    # the filename still yield a valid link ('/' is left untouched by quote).
    url = (
        f"https://huggingface.co/datasets/{db.DATASET_REPO_ID}"
        f"/resolve/main/{quote(full_path_in_repo)}"
    )
    return {
        "status": "success",
        "url": url,  # real link for the front end's src / href
        "display_name": file.filename or safe_name,  # name shown in the UI
        "hashed_name": new_filename,  # stored name, including the hash prefix
    }
# --- Data models ---
# Request body accepted by POST /api/users/register.
class UserRegister(BaseModel):
    account: str  # unique account name; used as the key in users.json
    password: str  # password
    email: str  # e-mail address; registration rejects one already bound
    phone: str  # phone number; registration rejects one already bound
    name: str  # display name (nickname)
    gender: str
    avatarDataUrl: Optional[str] = None
    age: Optional[int] = None
    country: Optional[str] = None
    region: Optional[str] = None
    intro: Optional[str] = None
# Request body accepted by POST /api/users/login.
class UserLogin(BaseModel):
    account: str  # login is done by account name
    password: str
# Request body accepted by POST /api/interactions/toggle.
class InteractionToggle(BaseModel):
    item_id: str  # id of the item being liked / favorited
    user_id: str  # the user's account, used as the unique identifier
    action_type: str  # "like" or "favorite"
    is_active: bool  # True records the interaction, False removes it
# Request body accepted by POST /api/comments.
class CommentCreate(BaseModel):
    item_id: str  # item the comment belongs to
    author: str  # the user's account or name
    content: str
    reply_to_user: Optional[str] = None  # stored on the comment as "replyToUser"
    parent_id: Optional[str] = None  # id of the comment being replied to, if any
# Request body accepted by POST /api/items (publishing a new item).
class ItemCreate(BaseModel):
    type: str  # item category; GET /api/items filters on this value
    title: str
    shortDesc: str
    fullDesc: str
    link: str
    coverUrl: Optional[str] = None
    author: str
    price: float = 0.0  # listed price / points
# --- 1. User system ---
# Marker that distinguishes hashed password records from legacy plaintext ones.
_PBKDF2_PREFIX = "pbkdf2_sha256"
# Work factor; it is stored inside every hash so it can be raised later
# without invalidating existing records.
_PBKDF2_ITERATIONS = 200_000
_DEFAULT_AVATAR_URL = "https://via.placeholder.com/150"


def _hash_password(password):
    """Return a salted PBKDF2-SHA256 record: ``prefix$iterations$salt$hex``."""
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt.encode("utf-8"),
        _PBKDF2_ITERATIONS,
    ).hex()
    return f"{_PBKDF2_PREFIX}${_PBKDF2_ITERATIONS}${salt}${digest}"


def _verify_password(stored, candidate):
    """Check ``candidate`` against a stored hashed or legacy plaintext value."""
    if not isinstance(stored, str):
        return False
    candidate_bytes = candidate.encode("utf-8")
    parts = stored.split("$")
    is_hashed = (
        len(parts) == 4
        and parts[0] == _PBKDF2_PREFIX
        and parts[1].isdigit()
        and int(parts[1]) > 0
    )
    if is_hashed:
        _, iterations, salt, expected = parts
        actual = hashlib.pbkdf2_hmac(
            "sha256", candidate_bytes, salt.encode("utf-8"), int(iterations)
        ).hex()
        return hmac.compare_digest(actual.encode("utf-8"), expected.encode("utf-8"))
    # Legacy record written before hashing was introduced: compare the
    # plaintext, still in constant time.
    return hmac.compare_digest(stored.encode("utf-8"), candidate_bytes)


@app.post("/api/users/register")
async def register_user(user: UserRegister):
    """Create a new user record keyed by account name.

    Args:
        user: Registration payload.

    Returns:
        A dict with "status", "message" and the stored profile under "data",
        without the password field.

    Raises:
        HTTPException: 400 if the account, e-mail or phone is already taken.
    """
    users_db = db.load_data("users.json", default_data={})

    # Reject duplicate account names.
    if user.account in users_db:
        raise HTTPException(status_code=400, detail="该账号已被注册,请尝试其他账号名")

    # An e-mail address or phone number may be bound to one account only.
    for existing_user in users_db.values():
        if existing_user.get("email") == user.email:
            raise HTTPException(status_code=400, detail="该邮箱已被绑定")
        if existing_user.get("phone") == user.phone:
            raise HTTPException(status_code=400, detail="该手机号已被绑定")

    # Build the profile; counters start at zero and privacy flags at False.
    new_user = user.dict()
    # Never persist the plaintext password.
    new_user["password"] = _hash_password(user.password)
    new_user.update({
        "receivedLikes": 0,
        "receivedFollows": 0,
        "privacy": {
            "follows": False,
            "likes": False,
            "favorites": False,
            "downloads": False,
        },
    })

    users_db[user.account] = new_user
    db.save_data("users.json", users_db)

    # The password field is never returned to the client.
    safe_user_data = {k: v for k, v in new_user.items() if k != "password"}
    return {"status": "success", "message": "注册成功", "data": safe_user_data}


@app.post("/api/users/login")
async def login_user(user: UserLogin):
    """Validate credentials and return the session payload.

    Args:
        user: Account name and password.

    Returns:
        A dict with "status", a placeholder "token", "account", "name" and
        "avatar".

    Raises:
        HTTPException: 404 if the account does not exist, 401 if the password
            does not match.
    """
    users_db = db.load_data("users.json", default_data={})
    if user.account not in users_db:
        raise HTTPException(status_code=404, detail="账号不存在,请检查或先注册")

    user_data = users_db[user.account]
    if not _verify_password(user_data.get("password"), user.password):
        raise HTTPException(status_code=401, detail="密码错误")

    return {
        "status": "success",
        "token": f"mock_token_{user.account}",  # mock token, not a real credential
        "account": user.account,
        "name": user_data["name"],
        # Registration stores avatarDataUrl=None when no avatar is supplied,
        # so fall back on any falsy value rather than only on a missing key.
        "avatar": user_data.get("avatarDataUrl") or _DEFAULT_AVATAR_URL,
    }
@app.get("/api/users/{account}")
async def get_user_profile(account: str):
    """Return the stored profile for ``account`` without its password field.

    Raises:
        HTTPException: 404 if no such account exists.
    """
    all_users = db.load_data("users.json", default_data={})
    if account in all_users:
        record = all_users[account]
        public_fields = {}
        for field, value in record.items():
            if field != "password":
                public_fields[field] = value
        return {"status": "success", "data": public_fields}
    raise HTTPException(status_code=404, detail="用户不存在")
# --- 2. Core leaderboard data flow ---
@app.get("/api/items")
async def get_items(type: str = "tool", sort: str = "time", limit: int = 20):
    """List items of one type, sorted descending and truncated to ``limit``.

    Args:
        type: Item type to keep. The name shadows the builtin but is the
            public query parameter, so it is left unchanged.
        sort: "likes", "favorites" or "downloads" (ranked by the "uses"
            field); any other value sorts by creation time.
        limit: Maximum number of items returned; zero or negative values
            yield an empty list.

    Returns:
        A dict with "status" and the selected items under "data".
    """
    # Sort mode -> item field it ranks by.
    sort_fields = {
        "likes": "likes",
        "favorites": "favorites",
        "downloads": "uses",
    }
    sort_field = sort_fields.get(sort, "created_at")

    items_db = db.load_data("items.json", default_data=[])
    filtered_items = [item for item in items_db if item.get("type") == type]
    filtered_items.sort(key=lambda entry: entry.get(sort_field, 0), reverse=True)

    # A negative limit would otherwise slice from the end and silently drop
    # the last entries instead of returning nothing.
    return {"status": "success", "data": filtered_items[:max(limit, 0)]}
# Handles the front end's request to publish a new plugin / app.
@app.post("/api/items")
async def create_item(item: ItemCreate):
    """Create a new item and store it at the front of items.json.

    Args:
        item: Publication payload.

    Returns:
        A dict with "status" and the stored item (including its generated id
        and zeroed counters) under "data".
    """
    items_db = db.load_data("items.json", default_data=[])

    # Sample the clock once so the timestamp embedded in the id always equals
    # created_at.
    created_at = int(time.time())
    # Id has the form <type>_<unix time>_<6 hex chars>.
    new_id = f"{item.type}_{created_at}_{uuid.uuid4().hex[:6]}"

    new_item = {
        "id": new_id,
        "type": item.type,
        "title": item.title,
        "author": item.author,
        "shortDesc": item.shortDesc,
        "fullDesc": item.fullDesc,
        "link": item.link,
        "coverUrl": item.coverUrl,
        "price": item.price,
        "likes": 0,
        "favorites": 0,
        "comments": 0,
        "uses": 0,
        "created_at": created_at,
        "liked_by": [],
        "favorited_by": [],
    }

    # Insert at index 0 so the newest publication comes first in the stored list.
    items_db.insert(0, new_item)
    db.save_data("items.json", items_db)
    return {"status": "success", "data": new_item}
# --- 3. Social interactions (like / favorite, de-duplicated per user) ---
# action_type -> (key of the per-item user list, key of the per-item counter)
_INTERACTION_KEYS = {
    "like": ("liked_by", "likes"),
    "favorite": ("favorited_by", "favorites"),
}


@app.post("/api/interactions/toggle")
async def toggle_interaction(interaction: InteractionToggle):
    """Add or remove a user's like / favorite on an item.

    Args:
        interaction: Target item, acting user, action type and desired state.

    Returns:
        A dict with "status", "message" and the counter value after the
        operation under "new_count".

    Raises:
        HTTPException: 400 for an unsupported action type, 404 if the item
            does not exist.
    """
    # Reject unknown actions up front: they used to fall through to the
    # "favorites" counter and reset it to zero while creating a junk list key.
    keys = _INTERACTION_KEYS.get(interaction.action_type)
    if keys is None:
        raise HTTPException(status_code=400, detail="不支持的操作类型")
    list_key, count_key = keys

    items_db = db.load_data("items.json", default_data=[])
    target_item = next(
        (item for item in items_db if item.get("id") == interaction.item_id),
        None,
    )
    if not target_item:
        raise HTTPException(status_code=404, detail="目标工具/应用不存在")

    # Items without the user list start from a clean list and a zero counter.
    if list_key not in target_item:
        target_item[list_key] = []
        target_item[count_key] = 0
    user_list = target_item[list_key]
    current_count = target_item.get(count_key, 0)

    if interaction.is_active:
        # Count each user at most once.
        if interaction.user_id not in user_list:
            user_list.append(interaction.user_id)
            current_count += 1
    elif interaction.user_id in user_list:
        user_list.remove(interaction.user_id)
        current_count = max(0, current_count - 1)
    target_item[count_key] = current_count

    db.save_data("items.json", items_db)
    return {
        "status": "success",
        "message": "操作成功",
        "new_count": target_item[count_key],
    }
# --- 4. Comment system ---
@app.post("/api/comments")
async def post_comment(comment: CommentCreate):
    """Store a new comment, or a reply under an existing comment.

    Args:
        comment: Comment payload. When ``parent_id`` is set, the comment is
            appended to that parent's "replies" list.

    Returns:
        A dict with "status" and the stored comment under "data".

    Raises:
        HTTPException: 404 if ``parent_id`` is given but no comment with that
            id exists for the item.
    """
    comments_db = db.load_data("comments.json", default_data={})
    item_comments = comments_db.get(comment.item_id, [])

    # Sample the clock once so the id timestamp and created_at always agree.
    created_at = int(time.time())
    new_comment = {
        "id": f"c_{created_at}_{uuid.uuid4().hex[:6]}",
        "author": comment.author,
        "content": comment.content,
        "replyToUser": comment.reply_to_user,
        "isDeleted": False,
        "replies": [],
        "created_at": created_at,
    }

    if comment.parent_id:
        # Only the item's top-level comments are searched for the parent.
        # NOTE(review): deletion searches nested replies too -- confirm that
        # replies to replies are meant to be rejected here.
        parent = next(
            (c for c in item_comments if c.get("id") == comment.parent_id),
            None,
        )
        if parent is None:
            raise HTTPException(status_code=404, detail="找不到要回复的父级评论")
        # Tolerate stored comments that lack a "replies" list.
        parent.setdefault("replies", []).append(new_comment)
    else:
        item_comments.append(new_comment)

    comments_db[comment.item_id] = item_comments
    db.save_data("comments.json", comments_db)
    return {"status": "success", "data": new_comment}
@app.delete("/api/comments/{item_id}/{comment_id}")
async def soft_delete_comment(item_id: str, comment_id: str, author: str):
    """Soft-delete a comment: flag it as deleted and blank its content.

    The comment is searched depth-first through the item's comments and their
    nested replies.

    Raises:
        HTTPException: 403 if ``author`` is not the comment's author, 404 if
            no comment with ``comment_id`` exists for the item.
    """
    comments_db = db.load_data("comments.json", default_data={})
    thread = comments_db.get(item_id, [])

    def locate(nodes):
        # Pre-order search: a comment is checked before its replies.
        for node in nodes:
            if node["id"] == comment_id:
                return node
            if "replies" in node:
                nested_hit = locate(node["replies"])
                if nested_hit is not None:
                    return nested_hit
        return None

    target = locate(thread)
    if target is None:
        raise HTTPException(status_code=404, detail="找不到该评论")
    if target["author"] != author:
        raise HTTPException(status_code=403, detail="无权删除他人的评论")

    target["isDeleted"] = True
    target["content"] = ""
    db.save_data("comments.json", comments_db)
    return {"status": "success", "message": "评论已软删除"}