Spaces:
Running
Running
| # ⚙️ 后端逻辑/核心服务端.py (Hugging Face Spaces app.py) | |
| from fastapi import FastAPI, HTTPException, File, UploadFile, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import time | |
| import uuid | |
| import hashlib | |
| # 导入数据库操作模块 | |
| import 数据库连接 as db | |
# Application object for the ranking community API.
# NOTE(review): none of the handler functions in this file show an
# ``@app.<method>(...)`` route decorator in this view; unless they are
# registered some other way they are unreachable -- confirm against the
# deployed file.
app = FastAPI(title="ComfyUI Ranking Community API")
# Allow cross-origin requests; this is what lets ComfyUI reach the cloud API.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- check the CORS middleware documentation and confirm this is
# intended before relying on cookie/credentialed requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # --- 增加根目录探针,防止 Hugging Face 健康检查失败 --- | |
def read_root():
    """Return the fixed payload used as the root-path liveness probe.

    The surrounding file describes this as the probe that keeps the
    Hugging Face health check from failing.

    NOTE(review): no route decorator is visible on this function in this
    view; confirm it is registered (presumably at "/").

    Returns:
        dict: ``status`` set to ``"ok"`` plus a human-readable ``message``.
    """
    payload = {}
    payload["status"] = "ok"
    payload["message"] = "ComfyUI Ranking API is running!"
    return payload
| # --- 【新增】:通用文件上传与哈希分类接口 --- | |
async def upload_file(file: UploadFile = File(...), file_type: str = Form(...)):
    """Store an uploaded file in the dataset repo and return its direct URL.

    The stored name is the first 10 hex characters of the content's MD5
    digest, an underscore, then the client-supplied file name, so two
    different files with the same name do not overwrite each other.

    Args:
        file: The uploaded file; its whole content is read into memory.
        file_type: Category that selects the target directory ("avatar",
            "cover", "tool" or "app"). Any other value is stored under
            "others".

    Returns:
        dict: ``status``, the direct ``url``, the clean ``display_name``
        and the ``hashed_name`` actually used in the repo.
    """
    import os
    from urllib.parse import quote

    content = await file.read()

    # MD5 is used only as a short content fingerprint, not for security.
    file_hash = hashlib.md5(content).hexdigest()[:10]

    # The file name comes from the client: keep only its last path component
    # so "../" or absolute paths cannot escape the category directory, and
    # fall back to a fixed name when nothing usable is left.
    raw_name = file.filename or ""
    original_name = os.path.basename(raw_name.replace("\\", "/"))
    if original_name in ("", ".", ".."):
        original_name = "unnamed"

    new_filename = f"{file_hash}_{original_name}"

    # Each category is kept in its own directory inside the repo.
    dir_mapping = {
        "avatar": "avatars",
        "cover": "covers",
        "tool": "tools",
        "app": "apps",
    }
    target_dir = dir_mapping.get(file_type, "others")
    full_path_in_repo = f"{target_dir}/{new_filename}"

    db.save_file(full_path_in_repo, content)

    # Percent-encode the path so names containing spaces, "#", "?" or
    # non-ASCII characters still form a valid URL ("/" is left as is).
    url = (
        f"https://huggingface.co/datasets/{db.DATASET_REPO_ID}"
        f"/resolve/main/{quote(full_path_in_repo)}"
    )
    return {
        "status": "success",
        "url": url,
        "display_name": original_name,
        "hashed_name": new_filename,
    }
| # --- 数据模型 --- | |
# Request body for registering a new account.
class UserRegister(BaseModel):
    account: str  # unique account name (used as the primary key)
    password: str  # password
    email: str  # e-mail address
    phone: str  # phone number
    name: str  # display name (nickname)
    gender: str
    avatarDataUrl: Optional[str] = None
    age: Optional[int] = None
    country: Optional[str] = None
    region: Optional[str] = None
    intro: Optional[str] = None
# Request body for logging in.
class UserLogin(BaseModel):
    account: str  # login is done with the account name
    password: str
# Request body for switching a like/favorite on or off for one item.
class InteractionToggle(BaseModel):
    item_id: str
    user_id: str  # the account name is used as the unique identifier
    action_type: str  # "like", "favorite"
    is_active: bool
# Request body for posting a comment or a reply on an item.
class CommentCreate(BaseModel):
    item_id: str
    author: str  # stores the user's account or name
    content: str
    reply_to_user: Optional[str] = None
    parent_id: Optional[str] = None
| # 【核心修改】:新增发布项目的数据模型 | |
# Request body for publishing a new item.
class ItemCreate(BaseModel):
    type: str
    title: str
    shortDesc: str
    fullDesc: str
    link: str
    coverUrl: Optional[str] = None
    author: str
    price: float = 0.0  # listed price / points field
| # --- 1. 用户系统 --- | |
async def register_user(user: UserRegister):
    """Create a new account record in ``users.json``.

    The account name is the key of the stored record. Registration is
    refused when the account name is taken, or when any existing record
    already uses the same e-mail address or phone number.

    NOTE(review): the password is persisted exactly as received; hashing
    it would have to be introduced together with the login check.

    Args:
        user: Registration data.

    Returns:
        dict: ``status``, ``message`` and the stored profile without the
        ``password`` field.

    Raises:
        HTTPException: 400 on a duplicate account, e-mail or phone number.
    """
    users_db = db.load_data("users.json", default_data={})

    if user.account in users_db:
        raise HTTPException(status_code=400, detail="该账号已被注册,请尝试其他账号名")

    # E-mail and phone number may each be bound to one account only.
    for record in users_db.values():
        if record.get("email") == user.email:
            raise HTTPException(status_code=400, detail="该邮箱已被绑定")
        if record.get("phone") == user.phone:
            raise HTTPException(status_code=400, detail="该手机号已被绑定")

    # Submitted fields first, then the counters and privacy flags every
    # fresh profile starts with.
    profile = {
        **user.dict(),
        "receivedLikes": 0,
        "receivedFollows": 0,
        "privacy": {
            "follows": False,
            "likes": False,
            "favorites": False,
            "downloads": False,
        },
    }

    users_db[user.account] = profile
    db.save_data("users.json", users_db)

    # Never echo the password back to the client.
    public_profile = dict(profile)
    public_profile.pop("password", None)
    return {"status": "success", "message": "注册成功", "data": public_profile}
async def login_user(user: UserLogin):
    """Check account and password and return the session payload.

    NOTE(review): the stored password is compared as plain text and the
    token is a predictable placeholder built from the account name.

    Args:
        user: Account name and password.

    Returns:
        dict: ``status``, ``token``, ``account``, ``name`` and ``avatar``.

    Raises:
        HTTPException: 404 when the account is unknown, 401 when the
            password does not match.
    """
    users_db = db.load_data("users.json", default_data={})
    if user.account not in users_db:
        raise HTTPException(status_code=404, detail="账号不存在,请检查或先注册")

    user_data = users_db[user.account]
    if user_data.get("password") != user.password:
        raise HTTPException(status_code=401, detail="密码错误")

    # Registration always stores the "avatarDataUrl" key (as None when no
    # avatar was given), so a dict.get default would never apply; fall back
    # on any empty value instead.
    avatar = user_data.get("avatarDataUrl") or "https://via.placeholder.com/150"

    return {
        "status": "success",
        "token": f"mock_token_{user.account}",  # placeholder token
        "account": user.account,
        "name": user_data["name"],
        "avatar": avatar,
    }
async def get_user_profile(account: str):
    """Return the stored profile of one account without its password.

    Args:
        account: Account name, which is the key in ``users.json``.

    Returns:
        dict: ``status`` and the profile under ``data``.

    Raises:
        HTTPException: 404 when the account does not exist.
    """
    users_db = db.load_data("users.json", default_data={})
    try:
        record = users_db[account]
    except KeyError:
        raise HTTPException(status_code=404, detail="用户不存在") from None

    # Copy the record so the password can be dropped from the response only.
    public_profile = dict(record)
    public_profile.pop("password", None)
    return {"status": "success", "data": public_profile}
| # --- 2. 排行榜核心数据流 --- | |
async def get_items(type: str = "tool", sort: str = "time", limit: int = 20):
    """Return the items of one type, highest ranked first.

    Args:
        type: Item type to keep. The name shadows the builtin but is part
            of the public interface, so it is kept.
        sort: "likes", "favorites" or "downloads" (ranked by the ``uses``
            counter); any other value ranks by creation time.
        limit: Maximum number of items returned. Zero or a negative value
            returns no items.

    Returns:
        dict: ``status`` and the selected items under ``data``.
    """
    items_db = db.load_data("items.json", default_data=[])

    # Map the requested sort mode to the stored field it ranks by.
    sort_fields = {"likes": "likes", "favorites": "favorites", "downloads": "uses"}
    sort_field = sort_fields.get(sort, "created_at")

    matching = [item for item in items_db if item.get("type") == type]
    # "or 0" also covers a field stored as None, which would otherwise make
    # the comparison raise TypeError.
    matching.sort(key=lambda entry: entry.get(sort_field) or 0, reverse=True)

    # A negative limit must not turn into a "drop the last N items" slice.
    return {"status": "success", "data": matching[:max(limit, 0)]}
| # 【核心修改】:处理前端发布的新插件/应用请求 | |
async def create_item(item: ItemCreate):
    """Publish a new item and store it at the front of ``items.json``.

    The item id has the form ``<type>_<unix seconds>_<6 hex chars>``. All
    counters start at zero and the like/favorite user lists start empty.

    Args:
        item: Data of the item being published.

    Returns:
        dict: ``status`` and the stored item under ``data``.
    """
    items_db = db.load_data("items.json", default_data=[])

    # Read the clock once so the timestamp embedded in the id always equals
    # created_at, even when the request straddles a second boundary.
    created_at = int(time.time())
    new_id = f"{item.type}_{created_at}_{uuid.uuid4().hex[:6]}"

    new_item = {
        "id": new_id,
        "type": item.type,
        "title": item.title,
        "author": item.author,
        "shortDesc": item.shortDesc,
        "fullDesc": item.fullDesc,
        "link": item.link,
        "coverUrl": item.coverUrl,
        "price": item.price,
        "likes": 0,
        "favorites": 0,
        "comments": 0,
        "uses": 0,
        "created_at": created_at,
        "liked_by": [],
        "favorited_by": [],
    }

    # Newest items go to the front of the stored list.
    items_db.insert(0, new_item)
    db.save_data("items.json", items_db)
    return {"status": "success", "data": new_item}
| # --- 3. 社交与互动 (点赞/收藏防抖与排重) --- | |
async def toggle_interaction(interaction: InteractionToggle):
    """Switch one user's like or favorite on an item on or off.

    Each item keeps a list of users per action next to a counter. The
    counter only changes when the user's membership in the list changes,
    so repeating the same request does not count twice.

    Args:
        interaction: Target item, acting user, action type ("like" or
            "favorite") and the desired state.

    Returns:
        dict: ``status``, ``message`` and the counter value ``new_count``.

    Raises:
        HTTPException: 400 for an unsupported action type, 404 when the
            item does not exist.
    """
    # Only these two actions have a matching list/counter pair. Anything
    # else used to be counted as a favorite under a made-up list key.
    action_keys = {
        "like": ("liked_by", "likes"),
        "favorite": ("favorited_by", "favorites"),
    }
    if interaction.action_type not in action_keys:
        raise HTTPException(status_code=400, detail="不支持的互动类型")
    list_key, count_key = action_keys[interaction.action_type]

    items_db = db.load_data("items.json", default_data=[])
    target_item = next(
        (item for item in items_db if item.get("id") == interaction.item_id),
        None,
    )
    if not target_item:
        raise HTTPException(status_code=404, detail="目标工具/应用不存在")

    # Records without a user list start from an empty list and a zero count.
    if list_key not in target_item:
        target_item[list_key] = []
        target_item[count_key] = 0
    user_list = target_item[list_key]
    # Tolerate a record that has the list but no (or an empty) counter.
    current_count = target_item.get(count_key) or 0

    if interaction.is_active:
        if interaction.user_id not in user_list:
            user_list.append(interaction.user_id)
            current_count += 1
    elif interaction.user_id in user_list:
        user_list.remove(interaction.user_id)
        current_count = max(0, current_count - 1)
    target_item[count_key] = current_count

    db.save_data("items.json", items_db)
    return {
        "status": "success",
        "message": "操作成功",
        "new_count": target_item[count_key],
    }
| # --- 4. 评论系统 --- | |
async def post_comment(comment: CommentCreate):
    """Add a comment, or a reply to a top-level comment, on an item.

    Comments are stored per item id in ``comments.json``. With a
    ``parent_id`` the new comment is appended to that parent's replies;
    only the item's top-level comments are searched for the parent.

    Args:
        comment: Item id, author, content and optional reply target.

    Returns:
        dict: ``status`` and the stored comment under ``data``.

    Raises:
        HTTPException: 404 when ``parent_id`` matches no top-level comment.
    """
    comments_db = db.load_data("comments.json", default_data={})
    item_comments = comments_db.get(comment.item_id, [])

    # Read the clock once so the id timestamp and created_at always agree.
    created_at = int(time.time())
    new_comment = {
        "id": f"c_{created_at}_{uuid.uuid4().hex[:6]}",
        "author": comment.author,
        "content": comment.content,
        "replyToUser": comment.reply_to_user,
        "isDeleted": False,
        "replies": [],
        "created_at": created_at,
    }

    if comment.parent_id:
        parent = next(
            (c for c in item_comments if c.get("id") == comment.parent_id),
            None,
        )
        if parent is None:
            raise HTTPException(status_code=404, detail="找不到要回复的父级评论")
        # setdefault covers a stored parent that has no "replies" list yet.
        parent.setdefault("replies", []).append(new_comment)
    else:
        item_comments.append(new_comment)

    comments_db[comment.item_id] = item_comments
    db.save_data("comments.json", comments_db)
    return {"status": "success", "data": new_comment}
async def soft_delete_comment(item_id: str, comment_id: str, author: str):
    """Mark a comment as deleted and blank its content.

    The item's comments and their nested replies are searched depth-first
    in stored order, and the first comment with the given id is used. The
    record stays in place; only ``isDeleted`` and ``content`` change.

    Args:
        item_id: Id of the item the comment belongs to.
        comment_id: Id of the comment to delete.
        author: Author claimed by the caller; it must equal the comment's
            stored author.

    Returns:
        dict: ``status`` and ``message`` on success.

    Raises:
        HTTPException: 403 when the comment has a different author, 404
            when no comment with that id exists.
    """
    comments_db = db.load_data("comments.json", default_data={})

    # Explicit stack for the depth-first walk; children are pushed reversed
    # so they are visited in their stored order.
    pending = list(reversed(comments_db.get(item_id, [])))
    while pending:
        node = pending.pop()
        if node["id"] == comment_id:
            if node["author"] != author:
                raise HTTPException(status_code=403, detail="无权删除他人的评论")
            node["isDeleted"] = True
            node["content"] = ""
            db.save_data("comments.json", comments_db)
            return {"status": "success", "message": "评论已软删除"}
        if "replies" in node:
            pending.extend(reversed(node["replies"]))

    raise HTTPException(status_code=404, detail="找不到该评论")