Spaces:
Running
Running
| # router_items.py | |
| from fastapi import APIRouter, HTTPException | |
| import time | |
| import uuid | |
| import datetime | |
| import 数据库连接 as db | |
| from models import ItemCreate, ItemUpdate | |
| router = APIRouter() | |
| def get_last_6_months(): | |
| res = [] | |
| today = datetime.date.today() | |
| for i in range(5, -1, -1): | |
| m = today.month - i | |
| y = today.year | |
| while m <= 0: | |
| m += 12 | |
| y -= 1 | |
| res.append(f"{y}-{m:02d}") | |
| return res | |
| async def get_items(type: str = "tool", sort: str = "time", limit: int = 50): # 优化:默认限制调大至 50,提升前端列表体验 | |
| items_db = db.load_data("items.json", default_data=[]) | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| # 如果是推荐榜,匹配所有 recommend 开头的子类型 | |
| if type == "recommend": | |
| filtered_items = [item for item in items_db if item.get("type", "").startswith("recommend")] | |
| else: | |
| filtered_items = [item for item in items_db if item.get("type") == type] | |
| for item in filtered_items: | |
| item["commentsData"] = comments_db.get(item["id"], []) | |
| item["comments"] = len(item["commentsData"]) | |
| # 🔴 【绝对核心防线】:在下发给前端前,强行在内存中抹除创作者的 Token! | |
| # 这样即使资源是公开展示的,普通用户也绝对抓不到源仓库的密钥。 | |
| item.pop("github_token", None) | |
| if sort == "likes": filtered_items.sort(key=lambda x: x.get("likes", 0), reverse=True) | |
| elif sort == "favorites": filtered_items.sort(key=lambda x: x.get("favorites", 0), reverse=True) | |
| elif sort == "downloads": filtered_items.sort(key=lambda x: x.get("uses", 0), reverse=True) | |
| else: filtered_items.sort(key=lambda x: x.get("created_at", 0), reverse=True) | |
| return {"status": "success", "data": filtered_items[:limit]} | |
| async def get_creators(sort: str = "downloads", limit: int = 20): | |
| users_db = db.load_data("users.json", default_data={}) | |
| items_db = db.load_data("items.json", default_data=[]) | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| creators = [] | |
| months = get_last_6_months() | |
| for account, u in users_db.items(): | |
| u_items = [i for i in items_db if i.get("author") == account] | |
| trend_tools = {m: 0 for m in months} | |
| trend_apps = {m: 0 for m in months} | |
| trend_recommends = {m: 0 for m in months} | |
| tools_count = 0 | |
| apps_count = 0 | |
| for i in u_items: | |
| itype = i.get("type", "") | |
| history = i.get("use_history", {}) | |
| if itype == "tool" or itype == "recommend_tool": | |
| if itype == "tool": tools_count += 1 | |
| for m in months: trend_tools[m] += history.get(m, 0) | |
| elif itype == "app" or itype == "recommend_app": | |
| if itype == "app": apps_count += 1 | |
| for m in months: trend_apps[m] += history.get(m, 0) | |
| elif itype.startswith("recommend"): | |
| for m in months: trend_recommends[m] += history.get(m, 0) | |
| creators.append({ | |
| "account": account, "name": u.get("name", account), "avatar": u.get("avatarDataUrl", "https://via.placeholder.com/150"), | |
| "shortDesc": u.get("intro", "这个人很懒,什么都没写..."), "fullDesc": u.get("intro", "这个人很懒,什么都没写..."), | |
| "likes": sum(i.get("likes", 0) for i in u_items), "favorites": sum(i.get("favorites", 0) for i in u_items), | |
| "downloads": sum(i.get("uses", 0) for i in u_items), | |
| "toolsCount": tools_count, "appsCount": apps_count, "followers": len(u.get("followers", [])), "created_at": u.get("created_at", 0), | |
| "commentsData": comments_db.get(account, []), | |
| "trendData": { | |
| "months": months, | |
| "tools": [trend_tools[m] for m in months], | |
| "apps": [trend_apps[m] for m in months], | |
| "recommends": [trend_recommends[m] for m in months] | |
| } | |
| }) | |
| if sort == "likes": creators.sort(key=lambda x: x.get("likes", 0), reverse=True) | |
| elif sort == "favorites": creators.sort(key=lambda x: x.get("favorites", 0), reverse=True) | |
| elif sort == "downloads": creators.sort(key=lambda x: x.get("downloads", 0), reverse=True) | |
| else: creators.sort(key=lambda x: x.get("created_at", 0), reverse=True) | |
| return {"status": "success", "data": creators[:limit]} | |
| async def create_item(item: ItemCreate): | |
| # 【安全加固】:强制转换为整数,并拦截负数 (防浮点漏洞与洗钱) | |
| item.price = int(item.price) | |
| if item.price < 0: | |
| raise HTTPException(status_code=400, detail="🚨 安全拦截:商品价格不能为负数") | |
| items_db = db.load_data("items.json", default_data=[]) | |
| new_item = { | |
| "id": f"{item.type}_{int(time.time())}_{uuid.uuid4().hex[:6]}", "type": item.type, "title": item.title, "author": item.author, | |
| "shortDesc": item.shortDesc, "fullDesc": item.fullDesc, "link": item.link, "coverUrl": item.coverUrl, "price": item.price, | |
| "github_token": item.github_token, # 【新增】保存密钥到云端 JSON | |
| "likes": 0, "favorites": 0, "comments": 0, "uses": 0, "use_history": {}, "created_at": int(time.time()), "liked_by": [], "favorited_by": [] | |
| } | |
| items_db.insert(0, new_item) | |
| db.save_data("items.json", items_db) | |
| return {"status": "success", "data": new_item} | |
| async def update_item(item_id: str, update_data: ItemUpdate, author: str): | |
| # 【安全加固】:更新时同样强制转换为整数并拦截负数 | |
| if update_data.price is not None: | |
| update_data.price = int(update_data.price) | |
| if update_data.price < 0: | |
| raise HTTPException(status_code=400, detail="🚨 安全拦截:商品价格不能为负数") | |
| items_db = db.load_data("items.json", default_data=[]) | |
| for item in items_db: | |
| if item["id"] == item_id: | |
| if item.get("author") != author: raise HTTPException(status_code=403, detail="无权修改他人发布的内容") | |
| if update_data.title is not None: item["title"] = update_data.title | |
| if update_data.shortDesc is not None: item["shortDesc"] = update_data.shortDesc | |
| if update_data.fullDesc is not None: item["fullDesc"] = update_data.fullDesc | |
| if update_data.link is not None: item["link"] = update_data.link | |
| if update_data.coverUrl is not None: item["coverUrl"] = update_data.coverUrl | |
| if update_data.price is not None: item["price"] = update_data.price | |
| if update_data.github_token is not None: item["github_token"] = update_data.github_token # 【新增】允许更新密钥 | |
| db.save_data("items.json", items_db) | |
| return {"status": "success"} | |
| raise HTTPException(status_code=404, detail="找不到该内容记录") | |
| async def delete_item(item_id: str, author: str): | |
| items_db = db.load_data("items.json", default_data=[]) | |
| target_idx = next((i for i, item in enumerate(items_db) if item["id"] == item_id), None) | |
| if target_idx is None: raise HTTPException(status_code=404, detail="找不到该内容记录") | |
| if items_db[target_idx].get("author") != author: raise HTTPException(status_code=403, detail="无权删除他人发布的内容") | |
| items_db.pop(target_idx) | |
| db.save_data("items.json", items_db) | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| if item_id in comments_db: | |
| del comments_db[item_id] | |
| db.save_data("comments.json", comments_db) | |
| return {"status": "success"} | |
| async def record_item_use(item_id: str): | |
| items_db = db.load_data("items.json", default_data=[]) | |
| current_month = datetime.date.today().strftime("%Y-%m") | |
| for item in items_db: | |
| if item["id"] == item_id: | |
| item["uses"] = item.get("uses", 0) + 1 | |
| if "use_history" not in item: | |
| item["use_history"] = {} | |
| item["use_history"][current_month] = item["use_history"].get(current_month, 0) + 1 | |
| db.save_data("items.json", items_db) | |
| return {"status": "success", "uses": item["uses"]} | |
| raise HTTPException(status_code=404, detail="找不到该内容记录") |