# 云端Space代码/router_posts.py
# ==========================================
# Discussion board API routes (Xiaohongshu-style image/text community)
# ==========================================
# Features: publishing posts, listing, detail view and interactions
# (like / favorite / comment / tip)
# ==========================================

from fastapi import APIRouter, HTTPException, Depends
from models import PostCreate, PostUpdate
import 数据库连接 as db
from 安全认证 import require_auth
from db_utils import record_view, sort_cache
import time
import uuid

router = APIRouter()

# Maximum number of images stored on a single post.
_MAX_IMAGES = 9

# Per-user tracking lists that list-style endpoints strip from their output.
_LIST_HIDDEN_FIELDS = ("viewed_by", "liked_by", "favorited_by")

# Maps a recognised `sort` query value to the post field used as the
# descending sort key. Anything else falls back to "latest" (created_at).
_SORT_FIELDS = {
    "latest": "created_at",
    "likes": "likes",
    "favorites": "favorites",
    "views": "views",
    "daily_views": "daily_views",
}

# Maps the error codes produced inside tip_post's updater to
# (HTTP status, detail message).
_TIP_ERRORS = {
    "not_found": (404, "帖子不存在"),
    "self_tip": (400, "不能打赏自己的帖子"),
    "insufficient_balance": (400, "余额不足"),
    "author_not_found": (404, "作者账户不存在"),
}


# ==========================================
# Private helpers
# ==========================================

def _find_post(posts, post_id):
    """Return the first post dict in `posts` whose "id" equals `post_id`, else None.

    Uses ``.get("id")`` so that a record without an id is skipped instead of
    raising KeyError and failing the whole request.
    """
    return next((p for p in posts if p.get("id") == post_id), None)


def _with_author_info(item, user_map):
    """Return a shallow copy of `item` with author_name / author_avatar added.

    `user_map` is the users.json mapping of account -> user info. When the
    author is unknown, the raw "author" value is used as the display name and
    the avatar is an empty string.
    """
    author_info = user_map.get(item.get("author"), {})
    return {
        **item,
        "author_name": author_info.get("name", item.get("author")),
        "author_avatar": author_info.get("avatarDataUrl", ""),
    }


def _without_fields(item, fields):
    """Return a shallow copy of `item` without the keys listed in `fields`."""
    return {key: value for key, value in item.items() if key not in fields}


def _toggle_reaction(post_id, user, members_key, counter_key,
                     added_action, removed_action):
    """Toggle `user` in ``post[members_key]`` and adjust ``post[counter_key]``.

    The change is applied through ``db.atomic_update`` on posts.json.

    Returns:
        The response dict ``{"status", "action", <counter_key>}``, or None
        when no post has the given id.
    """
    outcome = None

    def updater(data):
        nonlocal outcome
        # Reset first so a repeated invocation never reports a stale result.
        outcome = None
        post = _find_post(data, post_id)
        if post is None:
            return

        members = post.get(members_key, [])
        if user in members:
            members.remove(user)
            # Never let the counter drop below zero.
            post[counter_key] = max(0, post.get(counter_key, 0) - 1)
            action = removed_action
        else:
            members.append(user)
            post[counter_key] = post.get(counter_key, 0) + 1
            action = added_action

        post[members_key] = members
        outcome = {
            "status": "success",
            "action": action,
            counter_key: post[counter_key],
        }

    db.atomic_update("posts.json", updater, default_data=[])
    return outcome


# ==========================================
# Post CRUD endpoints
# ==========================================

@router.get("/api/posts")
async def get_posts(page: int = 1, limit: int = 20, sort: str = "latest"):
    """
    Return one page of posts, sorted in descending order.

    - sort=latest: by creation time (default, also used for unknown values)
    - sort=likes: by like count
    - sort=favorites: by favorite count
    - sort=views: by total view count
    - sort=daily_views: by daily view count

    `page` is clamped to at least 1 and `limit` to at least 0, so negative
    values can no longer produce negative slice indices (which would return
    posts counted from the end of the list). The clamped values are echoed
    back in the response.
    """
    posts_db = db.load_data("posts.json", default_data=[])
    # users.json is already a {account: user_info} mapping.
    users_db = db.load_data("users.json", default_data={})

    page = max(page, 1)
    limit = max(limit, 0)

    # Normalise the sort name so the cache key space stays bounded no matter
    # what the client sends; unknown names sort exactly like "latest".
    sort_name = sort if sort in _SORT_FIELDS else "latest"
    sort_field = _SORT_FIELDS[sort_name]

    def sort_fn(data):
        # In-place sort, returns None (same contract as before).
        data.sort(key=lambda x: x.get(sort_field, 0), reverse=True)

    sorted_posts = sort_cache.get_sorted(f"posts:{sort_name}", posts_db, sort_fn)

    # Pagination.
    start = (page - 1) * limit
    paged_posts = sorted_posts[start:start + limit]

    # Attach author info, then drop the per-user tracking lists.
    result = [
        _without_fields(_with_author_info(post, users_db), _LIST_HIDDEN_FIELDS)
        for post in paged_posts
    ]

    return {
        "status": "success",
        "data": result,
        "total": len(posts_db),
        "page": page,
        "limit": limit,
    }


@router.get("/api/my-posts")
async def get_my_posts(current_user: str = Depends(require_auth)):
    """
    Return the authenticated user's own posts, newest first.

    The viewed_by / liked_by / favorited_by lists are removed from each post.
    """
    posts_db = db.load_data("posts.json", default_data=[])

    my_posts = sorted(
        (p for p in posts_db if p.get("author") == current_user),
        key=lambda x: x.get("created_at", 0),
        reverse=True,
    )

    return {
        "status": "success",
        "data": [_without_fields(p, _LIST_HIDDEN_FIELDS) for p in my_posts],
    }


@router.get("/api/posts/{post_id}")
async def get_post_detail(post_id: str):
    """
    Return a single post with author name and avatar attached.

    Only viewed_by is stripped here; liked_by and favorited_by stay in the
    detail response.

    Raises:
        HTTPException 404: no post has the given id.
    """
    posts_db = db.load_data("posts.json", default_data=[])
    # users.json is already a {account: user_info} mapping.
    users_db = db.load_data("users.json", default_data={})

    post = _find_post(posts_db, post_id)
    if post is None:
        raise HTTPException(status_code=404, detail="帖子不存在")

    post_data = _without_fields(_with_author_info(post, users_db), ("viewed_by",))
    return {"status": "success", "data": post_data}


@router.post("/api/posts")
async def create_post(post: PostCreate, current_user: str = Depends(require_auth)):
    """
    Publish a new post authored by the authenticated user.

    At most `_MAX_IMAGES` images are kept. The new post is inserted at the
    front of posts.json and returned in the response.
    """
    posts_db = db.load_data("posts.json", default_data=[])

    # Limit the number of images.
    images = (post.images or [])[:_MAX_IMAGES]

    # One timestamp for both the id and created_at so they always agree.
    now = int(time.time())

    new_post = {
        "id": f"post_{now}_{uuid.uuid4().hex[:6]}",
        "title": post.title,
        "content": post.content,
        "cover_image": post.cover_image,
        "images": images,
        "author": current_user,
        "created_at": now,
        # Original-work flag; an unset value is stored as False.
        "is_original": post.is_original if post.is_original is not None else False,
        # Interaction data.
        "likes": 0,
        "favorites": 0,
        "comments": 0,
        "liked_by": [],
        "favorited_by": [],
        "tip_board": [],  # tipping leaderboard
    }

    posts_db.insert(0, new_post)
    db.save_data("posts.json", posts_db)

    # Cached sort orders no longer include every post.
    sort_cache.invalidate("posts:")

    return {"status": "success", "data": new_post}


@router.put("/api/posts/{post_id}")
async def update_post(post_id: str, update_data: PostUpdate, current_user: str = Depends(require_auth)):
    """
    Update a post; only its author may do so.

    Only fields of `update_data` that are not None are written.

    Raises:
        HTTPException 403: the post belongs to another user.
        HTTPException 404: no post has the given id.
    """
    posts_db = db.load_data("posts.json", default_data=[])

    post = _find_post(posts_db, post_id)
    if post is None:
        raise HTTPException(status_code=404, detail="帖子不存在")
    if post.get("author") != current_user:
        raise HTTPException(status_code=403, detail="无权修改他人帖子")

    if update_data.title is not None:
        post["title"] = update_data.title
    if update_data.content is not None:
        post["content"] = update_data.content
    if update_data.cover_image is not None:
        post["cover_image"] = update_data.cover_image
    if update_data.images is not None:
        post["images"] = update_data.images[:_MAX_IMAGES]
    if update_data.is_original is not None:
        post["is_original"] = update_data.is_original  # original-work flag

    db.save_data("posts.json", posts_db)
    sort_cache.invalidate("posts:")
    return {"status": "success"}


@router.delete("/api/posts/{post_id}")
async def delete_post(post_id: str, current_user: str = Depends(require_auth)):
    """
    Delete a post; only its author may do so.

    Raises:
        HTTPException 403: the post belongs to another user.
        HTTPException 404: no post has the given id.
    """
    posts_db = db.load_data("posts.json", default_data=[])

    for index, post in enumerate(posts_db):
        if post.get("id") != post_id:
            continue
        if post.get("author") != current_user:
            raise HTTPException(status_code=403, detail="无权删除他人帖子")

        posts_db.pop(index)
        db.save_data("posts.json", posts_db)
        sort_cache.invalidate("posts:")
        return {"status": "success"}

    raise HTTPException(status_code=404, detail="帖子不存在")


# ==========================================
# Interaction endpoints (like / favorite)
# ==========================================

@router.post("/api/posts/{post_id}/like")
async def toggle_like(post_id: str, current_user: str = Depends(require_auth)):
    """
    Like the post, or remove the like if the user already liked it.

    The update goes through db.atomic_update on posts.json.

    Returns:
        {"status": "success", "action": "liked" | "unliked", "likes": <count>}

    Raises:
        HTTPException 404: no post has the given id.
    """
    result = _toggle_reaction(
        post_id, current_user, "liked_by", "likes", "liked", "unliked"
    )
    if result is None:
        raise HTTPException(status_code=404, detail="帖子不存在")

    # The like count changed, which may affect cached sort orders.
    sort_cache.invalidate("posts:")
    return result


@router.post("/api/posts/{post_id}/favorite")
async def toggle_favorite(post_id: str, current_user: str = Depends(require_auth)):
    """
    Favorite the post, or remove the favorite if the user already has one.

    The update goes through db.atomic_update on posts.json.

    Returns:
        {"status": "success", "action": "favorited" | "unfavorited",
         "favorites": <count>}

    Raises:
        HTTPException 404: no post has the given id.
    """
    result = _toggle_reaction(
        post_id, current_user, "favorited_by", "favorites", "favorited", "unfavorited"
    )
    if result is None:
        raise HTTPException(status_code=404, detail="帖子不存在")

    # The favorite count changed, which may affect cached sort orders.
    sort_cache.invalidate("posts:")
    return result


# ==========================================
# Tipping endpoint
# ==========================================

@router.post("/api/posts/{post_id}/tip")
async def tip_post(post_id: str, amount: int, is_anon: bool = False, current_user: str = Depends(require_auth)):
    """
    Tip a post: move `amount` from the tipper's balance to the author's and
    record it on the post's tip board (kept sorted by amount, descending).

    The post lookup, balance transfer and tip-board update all run inside one
    db.atomic_update on posts.json.

    Raises:
        HTTPException 400: amount <= 0, tipping one's own post, or
            insufficient balance.
        HTTPException 404: post not found, or the author has no user record.
    """
    if amount <= 0:
        raise HTTPException(status_code=400, detail="打赏金额必须大于0")

    outcome = None

    def updater(data):
        nonlocal outcome

        target_post = _find_post(data, post_id)
        if not target_post:
            outcome = {"error": "not_found"}
            return

        # Tipping your own post is not allowed.
        if target_post.get("author") == current_user:
            outcome = {"error": "self_tip"}
            return

        # NOTE(review): users.json is read and written with plain load/save
        # while inside the posts.json update; whether that is safe against
        # other users.json writers depends on db internals not visible here.
        users_db = db.load_data("users.json", default_data={})
        tipper = users_db.get(current_user)
        if not tipper or tipper.get("balance", 0) < amount:
            outcome = {"error": "insufficient_balance"}
            return

        author = users_db.get(target_post.get("author"))
        if not author:
            outcome = {"error": "author_not_found"}
            return

        # Transfer. The tipper's "balance" key is known to exist here
        # (balance >= amount > 0); the author's may be missing, so default to
        # 0 instead of raising KeyError.
        tipper["balance"] -= amount
        author["balance"] = author.get("balance", 0) + amount

        # Update the tip board: accumulate per tipper, highest total first.
        tip_board = target_post.get("tip_board", [])
        existing = next((t for t in tip_board if t["account"] == current_user), None)
        if existing:
            existing["amount"] += amount
        else:
            tip_board.append({"account": current_user, "amount": amount, "is_anon": is_anon})
        tip_board.sort(key=lambda x: x["amount"], reverse=True)
        target_post["tip_board"] = tip_board

        # Persist the balances.
        db.save_data("users.json", users_db)

        outcome = {"status": "success", "message": f"成功打赏 {amount} 积分"}

    db.atomic_update("posts.json", updater, default_data=[])

    # A missing outcome is treated the same as "post not found".
    error_code = "not_found" if outcome is None else outcome.get("error")
    if error_code is not None:
        status_code, detail = _TIP_ERRORS[error_code]
        raise HTTPException(status_code=status_code, detail=detail)

    # Post data changed only on success, so only then drop cached orders.
    sort_cache.invalidate("posts:")
    return outcome


# ==========================================
# Comment endpoints (reuse the shared comment store)
# ==========================================

@router.get("/api/posts/{post_id}/comments")
async def get_post_comments(post_id: str):
    """
    Return the comments stored for a post, each with author name and avatar.

    comments.json maps item id -> list of comments; an unknown id yields an
    empty list rather than an error.
    """
    comments_db = db.load_data("comments.json", default_data={})
    # users.json is already a {account: user_info} mapping.
    users_db = db.load_data("users.json", default_data={})

    post_comments = comments_db.get(post_id, [])
    result = [_with_author_info(comment, users_db) for comment in post_comments]

    return {"status": "success", "data": result}


@router.post("/api/posts/{post_id}/comments")
async def add_post_comment(post_id: str, content: str, current_user: str = Depends(require_auth)):
    """
    Add a comment to a post and bump the post's comment counter.

    The comment is inserted at the front of the post's list in comments.json.
    The counter is incremented through db.atomic_update, the same way like and
    favorite counters are changed, so this request does not write back a stale
    copy of posts.json.

    Raises:
        HTTPException 400: content is empty or whitespace only.
        HTTPException 404: no post has the given id.
    """
    if not content or not content.strip():
        raise HTTPException(status_code=400, detail="评论内容不能为空")

    # Check that the post exists (read-only lookup).
    posts_db = db.load_data("posts.json", default_data=[])
    if _find_post(posts_db, post_id) is None:
        raise HTTPException(status_code=404, detail="帖子不存在")

    # One timestamp for both the id and created_at so they always agree.
    now = int(time.time())
    new_comment = {
        "id": f"comment_{now}_{uuid.uuid4().hex[:6]}",
        "author": current_user,
        "content": content.strip(),
        "created_at": now,
    }

    # comments.json maps item id -> list of comments, newest first.
    comments_db = db.load_data("comments.json", default_data={})
    post_comments = comments_db.get(post_id, [])
    post_comments.insert(0, new_comment)
    comments_db[post_id] = post_comments
    db.save_data("comments.json", comments_db)

    def bump_comment_count(data):
        # No-op if the post disappeared since the existence check.
        post = _find_post(data, post_id)
        if post is not None:
            post["comments"] = post.get("comments", 0) + 1

    db.atomic_update("posts.json", bump_comment_count, default_data=[])

    # Invalidate only after the new count is stored, so a concurrent reader
    # cannot re-cache the old data.
    sort_cache.invalidate("posts:")

    return {"status": "success", "data": new_comment}


@router.post("/api/posts/{post_id}/view")
async def record_post_view(post_id: str, current_user: str = Depends(require_auth)):
    """
    Record a view of the post by the authenticated user.

    Counting is delegated to db_utils.record_view. According to the original
    note on this endpoint, a user counts once toward total views while daily
    views grow on every call; that logic lives in record_view and is not
    visible here.

    Raises:
        HTTPException 404: record_view returned None (post not found).
    """
    result = record_view("posts.json", post_id, current_user)

    if result is None:
        raise HTTPException(status_code=404, detail="帖子不存在")

    return {
        "status": "success",
        "views": result["views"],
        "daily_views": result["daily_views"],
    }