Spaces:
Running
Running
| # 云端Space代码/router_posts.py | |
| # ========================================== | |
| # 💬 讨论区API路由(小红书风格图文社区) | |
| # ========================================== | |
| # 功能:帖子发布、列表、详情、互动(点赞/收藏/评论/打赏) | |
| # ========================================== | |
| from fastapi import APIRouter, HTTPException, Depends | |
| from models import PostCreate, PostUpdate | |
| import 数据库连接 as db | |
| from 安全认证 import require_auth | |
| from db_utils import record_view, sort_cache | |
| import time | |
| import uuid | |
| router = APIRouter() | |
| # ========================================== | |
| # 📝 帖子CRUD接口 | |
| # ========================================== | |
| async def get_posts(page: int = 1, limit: int = 20, sort: str = "latest"): | |
| """ | |
| 获取帖子列表(分页,支持多种排序方式) | |
| - sort=latest: 按创建时间降序(默认) | |
| - sort=likes: 按点赞数降序 | |
| - sort=favorites: 按收藏数降序 | |
| - sort=views: 按总访问量降序 | |
| - sort=daily_views: 按日访问量降序 | |
| """ | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| # 🗂️ 使用排序缓存优化排序性能 | |
| cache_key = f"posts:{sort}" | |
| def sort_fn(data): | |
| if sort == "likes": | |
| data.sort(key=lambda x: x.get("likes", 0), reverse=True) | |
| elif sort == "favorites": | |
| data.sort(key=lambda x: x.get("favorites", 0), reverse=True) | |
| elif sort == "views": | |
| data.sort(key=lambda x: x.get("views", 0), reverse=True) | |
| elif sort == "daily_views": | |
| data.sort(key=lambda x: x.get("daily_views", 0), reverse=True) | |
| else: # latest 或其他默认 | |
| data.sort(key=lambda x: x.get("created_at", 0), reverse=True) | |
| sorted_posts = sort_cache.get_sorted(cache_key, posts_db, sort_fn) | |
| # 分页 | |
| start = (page - 1) * limit | |
| end = start + limit | |
| paged_posts = sorted_posts[start:end] | |
| # 附加作者信息,并过滤敏感字段 | |
| result = [] | |
| for post in paged_posts: | |
| author_info = user_map.get(post.get("author"), {}) | |
| post_data = { | |
| **post, | |
| "author_name": author_info.get("name", post.get("author")), | |
| "author_avatar": author_info.get("avatarDataUrl", "") | |
| } | |
| # 过滤敏感字段(列表接口过滤 viewed_by、liked_by、favorited_by) | |
| post_data.pop("viewed_by", None) | |
| post_data.pop("liked_by", None) | |
| post_data.pop("favorited_by", None) | |
| result.append(post_data) | |
| return { | |
| "status": "success", | |
| "data": result, | |
| "total": len(posts_db), | |
| "page": page, | |
| "limit": limit | |
| } | |
| async def get_my_posts(current_user: str = Depends(require_auth)): | |
| """ | |
| 获取我的帖子列表 | |
| """ | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| # 筛选当前用户的帖子 | |
| my_posts = [p for p in posts_db if p.get("author") == current_user] | |
| # 按创建时间倒序 | |
| my_posts = sorted(my_posts, key=lambda x: x.get("created_at", 0), reverse=True) | |
| # 过滤敏感字段(列表接口过滤 viewed_by、liked_by、favorited_by) | |
| result = [] | |
| for post in my_posts: | |
| post_data = dict(post) | |
| post_data.pop("viewed_by", None) | |
| post_data.pop("liked_by", None) | |
| post_data.pop("favorited_by", None) | |
| result.append(post_data) | |
| return { | |
| "status": "success", | |
| "data": result | |
| } | |
| async def get_post_detail(post_id: str): | |
| """ | |
| 获取帖子详情 | |
| """ | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| for post in posts_db: | |
| if post["id"] == post_id: | |
| author_info = user_map.get(post.get("author"), {}) | |
| post_data = { | |
| **post, | |
| "author_name": author_info.get("name", post.get("author")), | |
| "author_avatar": author_info.get("avatarDataUrl", "") | |
| } | |
| # 过滤敏感字段 | |
| post_data.pop("viewed_by", None) | |
| return { | |
| "status": "success", | |
| "data": post_data | |
| } | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| async def create_post(post: PostCreate, current_user: str = Depends(require_auth)): | |
| """ | |
| 发布帖子 | |
| """ | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| # 限制图片数量 | |
| images = (post.images or [])[:9] | |
| new_post = { | |
| "id": f"post_{int(time.time())}_{uuid.uuid4().hex[:6]}", | |
| "title": post.title, | |
| "content": post.content, | |
| "cover_image": post.cover_image, | |
| "images": images, | |
| "author": current_user, | |
| "created_at": int(time.time()), | |
| "is_original": post.is_original if post.is_original is not None else False, # 🎨 原创作品标记 | |
| # 互动数据 | |
| "likes": 0, | |
| "favorites": 0, | |
| "comments": 0, | |
| "liked_by": [], | |
| "favorited_by": [], | |
| "tip_board": [] # 打赏榜单 | |
| } | |
| posts_db.insert(0, new_post) | |
| db.save_data("posts.json", posts_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("posts:") | |
| return {"status": "success", "data": new_post} | |
| async def update_post(post_id: str, update_data: PostUpdate, current_user: str = Depends(require_auth)): | |
| """ | |
| 更新帖子(仅作者可操作) | |
| """ | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| for post in posts_db: | |
| if post["id"] == post_id: | |
| if post.get("author") != current_user: | |
| raise HTTPException(status_code=403, detail="无权修改他人帖子") | |
| if update_data.title is not None: | |
| post["title"] = update_data.title | |
| if update_data.content is not None: | |
| post["content"] = update_data.content | |
| if update_data.cover_image is not None: | |
| post["cover_image"] = update_data.cover_image | |
| if update_data.images is not None: | |
| post["images"] = update_data.images[:9] | |
| if update_data.is_original is not None: | |
| post["is_original"] = update_data.is_original # 🎨 更新原创作品标记 | |
| db.save_data("posts.json", posts_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("posts:") | |
| return {"status": "success"} | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| async def delete_post(post_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 删除帖子(仅作者可操作) | |
| """ | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| for i, post in enumerate(posts_db): | |
| if post["id"] == post_id: | |
| if post.get("author") != current_user: | |
| raise HTTPException(status_code=403, detail="无权删除他人帖子") | |
| posts_db.pop(i) | |
| db.save_data("posts.json", posts_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("posts:") | |
| return {"status": "success"} | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| # ========================================== | |
| # ❤️ 互动接口(点赞/收藏) | |
| # ========================================== | |
| async def toggle_like(post_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 点赞/取消点赞(原子操作,并发安全) | |
| """ | |
| result_container = [None] | |
| def updater(data): | |
| for post in data: | |
| if post["id"] == post_id: | |
| liked_by = post.get("liked_by", []) | |
| if current_user in liked_by: | |
| liked_by.remove(current_user) | |
| post["likes"] = max(0, post.get("likes", 0) - 1) | |
| action = "unliked" | |
| else: | |
| liked_by.append(current_user) | |
| post["likes"] = post.get("likes", 0) + 1 | |
| action = "liked" | |
| post["liked_by"] = liked_by | |
| result_container[0] = {"status": "success", "action": action, "likes": post["likes"]} | |
| return | |
| result_container[0] = None # 未找到帖子 | |
| db.atomic_update("posts.json", updater, default_data=[]) | |
| if result_container[0] is None: | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| # 🗂️ 清除排序缓存(点赞数变化可能影响排序) | |
| sort_cache.invalidate("posts:") | |
| return result_container[0] | |
| async def toggle_favorite(post_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 收藏/取消收藏(原子操作,并发安全) | |
| """ | |
| result_container = [None] | |
| def updater(data): | |
| for post in data: | |
| if post["id"] == post_id: | |
| favorited_by = post.get("favorited_by", []) | |
| if current_user in favorited_by: | |
| favorited_by.remove(current_user) | |
| post["favorites"] = max(0, post.get("favorites", 0) - 1) | |
| action = "unfavorited" | |
| else: | |
| favorited_by.append(current_user) | |
| post["favorites"] = post.get("favorites", 0) + 1 | |
| action = "favorited" | |
| post["favorited_by"] = favorited_by | |
| result_container[0] = {"status": "success", "action": action, "favorites": post["favorites"]} | |
| return | |
| result_container[0] = None # 未找到帖子 | |
| db.atomic_update("posts.json", updater, default_data=[]) | |
| if result_container[0] is None: | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| # 🗂️ 清除排序缓存(收藏数变化可能影响排序) | |
| sort_cache.invalidate("posts:") | |
| return result_container[0] | |
| # ========================================== | |
| # 🎁 打赏接口 | |
| # ========================================== | |
| async def tip_post(post_id: str, amount: int, is_anon: bool = False, current_user: str = Depends(require_auth)): | |
| """ | |
| 打赏帖子(原子操作,并发安全) | |
| """ | |
| if amount <= 0: | |
| raise HTTPException(status_code=400, detail="打赏金额必须大于0") | |
| result_container = [None] | |
| def updater(data): | |
| # 在锁内查找帖子 | |
| target_post = None | |
| for post in data: | |
| if post["id"] == post_id: | |
| target_post = post | |
| break | |
| if not target_post: | |
| result_container[0] = {"error": "not_found"} | |
| return | |
| # 不能打赏自己 | |
| if target_post.get("author") == current_user: | |
| result_container[0] = {"error": "self_tip"} | |
| return | |
| # 在锁内操作用户余额 | |
| users_db = db.load_data("users.json", default_data={}) | |
| tipper = users_db.get(current_user) | |
| if not tipper or tipper.get("balance", 0) < amount: | |
| result_container[0] = {"error": "insufficient_balance"} | |
| return | |
| author_account = target_post.get("author") | |
| author = users_db.get(author_account) | |
| if not author: | |
| result_container[0] = {"error": "author_not_found"} | |
| return | |
| # 扣款加款 | |
| tipper["balance"] -= amount | |
| author["balance"] += amount | |
| # 更新打赏榜单 | |
| tip_board = target_post.get("tip_board", []) | |
| existing = next((t for t in tip_board if t["account"] == current_user), None) | |
| if existing: | |
| existing["amount"] += amount | |
| else: | |
| tip_board.append({"account": current_user, "amount": amount, "is_anon": is_anon}) | |
| tip_board.sort(key=lambda x: x["amount"], reverse=True) | |
| target_post["tip_board"] = tip_board | |
| # 保存用户数据 | |
| db.save_data("users.json", users_db) | |
| result_container[0] = {"status": "success", "message": f"成功打赏 {amount} 积分"} | |
| db.atomic_update("posts.json", updater, default_data=[]) | |
| # 🗂️ 清除排序缓存(打赏可能影响排序) | |
| sort_cache.invalidate("posts:") | |
| result = result_container[0] | |
| if result is None or result.get("error") == "not_found": | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| if result.get("error") == "self_tip": | |
| raise HTTPException(status_code=400, detail="不能打赏自己的帖子") | |
| if result.get("error") == "insufficient_balance": | |
| raise HTTPException(status_code=400, detail="余额不足") | |
| if result.get("error") == "author_not_found": | |
| raise HTTPException(status_code=404, detail="作者账户不存在") | |
| return result | |
| # ========================================== | |
| # 💬 评论接口(复用通用评论系统) | |
| # ========================================== | |
| async def get_post_comments(post_id: str): | |
| """ | |
| 获取帖子评论 | |
| """ | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| # comments_db 是 {item_id: [comments]} 格式 | |
| post_comments = comments_db.get(post_id, []) | |
| # 附加用户信息 | |
| result = [] | |
| for c in post_comments: | |
| author_info = user_map.get(c.get("author"), {}) | |
| result.append({ | |
| **c, | |
| "author_name": author_info.get("name", c.get("author")), | |
| "author_avatar": author_info.get("avatarDataUrl", "") | |
| }) | |
| return {"status": "success", "data": result} | |
| async def add_post_comment(post_id: str, content: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 添加帖子评论 | |
| """ | |
| if not content or not content.strip(): | |
| raise HTTPException(status_code=400, detail="评论内容不能为空") | |
| posts_db = db.load_data("posts.json", default_data=[]) | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| # 检查帖子是否存在 | |
| post_exists = any(p["id"] == post_id for p in posts_db) | |
| if not post_exists: | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| new_comment = { | |
| "id": f"comment_{int(time.time())}_{uuid.uuid4().hex[:6]}", | |
| "author": current_user, | |
| "content": content.strip(), | |
| "created_at": int(time.time()) | |
| } | |
| # comments_db 是 {item_id: [comments]} 格式 | |
| post_comments = comments_db.get(post_id, []) | |
| post_comments.insert(0, new_comment) | |
| comments_db[post_id] = post_comments | |
| db.save_data("comments.json", comments_db) | |
| # 🗂️ 清除排序缓存(评论数变化可能影响排序) | |
| sort_cache.invalidate("posts:") | |
| # 更新帖子评论数 | |
| for post in posts_db: | |
| if post["id"] == post_id: | |
| post["comments"] = post.get("comments", 0) + 1 | |
| break | |
| db.save_data("posts.json", posts_db) | |
| return {"status": "success", "data": new_comment} | |
| async def record_post_view(post_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 记录帖子访问量 | |
| 👁️ 需要用户认证,每个用户只计算一次总访问量,日访问量每次调用都增加 | |
| """ | |
| result = record_view("posts.json", post_id, current_user) | |
| if result is None: | |
| raise HTTPException(status_code=404, detail="帖子不存在") | |
| return {"status": "success", "views": result["views"], "daily_views": result["daily_views"]} | |