ComfyUI-Ranking-API / router_posts.py
ZHIWEI666's picture
增加浏览量数据,与原创勾选
8f0ec20 verified
raw
history blame
16.9 kB
# 云端Space代码/router_posts.py
# ==========================================
# 💬 讨论区API路由(小红书风格图文社区)
# ==========================================
# 功能:帖子发布、列表、详情、互动(点赞/收藏/评论/打赏)
# ==========================================
from fastapi import APIRouter, HTTPException, Depends
from models import PostCreate, PostUpdate
import 数据库连接 as db
from 安全认证 import require_auth
from db_utils import record_view, sort_cache
import time
import uuid
router = APIRouter()
# ==========================================
# 📝 帖子CRUD接口
# ==========================================
@router.get("/api/posts")
async def get_posts(page: int = 1, limit: int = 20, sort: str = "latest"):
"""
获取帖子列表(分页,支持多种排序方式)
- sort=latest: 按创建时间降序(默认)
- sort=likes: 按点赞数降序
- sort=favorites: 按收藏数降序
- sort=views: 按总访问量降序
- sort=daily_views: 按日访问量降序
"""
posts_db = db.load_data("posts.json", default_data=[])
users_db = db.load_data("users.json", default_data={})
# users_db 已经是 {account: user_info} 格式,直接使用
user_map = users_db
# 🗂️ 使用排序缓存优化排序性能
cache_key = f"posts:{sort}"
def sort_fn(data):
if sort == "likes":
data.sort(key=lambda x: x.get("likes", 0), reverse=True)
elif sort == "favorites":
data.sort(key=lambda x: x.get("favorites", 0), reverse=True)
elif sort == "views":
data.sort(key=lambda x: x.get("views", 0), reverse=True)
elif sort == "daily_views":
data.sort(key=lambda x: x.get("daily_views", 0), reverse=True)
else: # latest 或其他默认
data.sort(key=lambda x: x.get("created_at", 0), reverse=True)
sorted_posts = sort_cache.get_sorted(cache_key, posts_db, sort_fn)
# 分页
start = (page - 1) * limit
end = start + limit
paged_posts = sorted_posts[start:end]
# 附加作者信息,并过滤敏感字段
result = []
for post in paged_posts:
author_info = user_map.get(post.get("author"), {})
post_data = {
**post,
"author_name": author_info.get("name", post.get("author")),
"author_avatar": author_info.get("avatarDataUrl", "")
}
# 过滤敏感字段(列表接口过滤 viewed_by、liked_by、favorited_by)
post_data.pop("viewed_by", None)
post_data.pop("liked_by", None)
post_data.pop("favorited_by", None)
result.append(post_data)
return {
"status": "success",
"data": result,
"total": len(posts_db),
"page": page,
"limit": limit
}
@router.get("/api/my-posts")
async def get_my_posts(current_user: str = Depends(require_auth)):
"""
获取我的帖子列表
"""
posts_db = db.load_data("posts.json", default_data=[])
# 筛选当前用户的帖子
my_posts = [p for p in posts_db if p.get("author") == current_user]
# 按创建时间倒序
my_posts = sorted(my_posts, key=lambda x: x.get("created_at", 0), reverse=True)
# 过滤敏感字段(列表接口过滤 viewed_by、liked_by、favorited_by)
result = []
for post in my_posts:
post_data = dict(post)
post_data.pop("viewed_by", None)
post_data.pop("liked_by", None)
post_data.pop("favorited_by", None)
result.append(post_data)
return {
"status": "success",
"data": result
}
@router.get("/api/posts/{post_id}")
async def get_post_detail(post_id: str):
"""
获取帖子详情
"""
posts_db = db.load_data("posts.json", default_data=[])
users_db = db.load_data("users.json", default_data={})
# users_db 已经是 {account: user_info} 格式,直接使用
user_map = users_db
for post in posts_db:
if post["id"] == post_id:
author_info = user_map.get(post.get("author"), {})
post_data = {
**post,
"author_name": author_info.get("name", post.get("author")),
"author_avatar": author_info.get("avatarDataUrl", "")
}
# 过滤敏感字段
post_data.pop("viewed_by", None)
return {
"status": "success",
"data": post_data
}
raise HTTPException(status_code=404, detail="帖子不存在")
@router.post("/api/posts")
async def create_post(post: PostCreate, current_user: str = Depends(require_auth)):
"""
发布帖子
"""
posts_db = db.load_data("posts.json", default_data=[])
# 限制图片数量
images = (post.images or [])[:9]
new_post = {
"id": f"post_{int(time.time())}_{uuid.uuid4().hex[:6]}",
"title": post.title,
"content": post.content,
"cover_image": post.cover_image,
"images": images,
"author": current_user,
"created_at": int(time.time()),
"is_original": post.is_original if post.is_original is not None else False, # 🎨 原创作品标记
# 互动数据
"likes": 0,
"favorites": 0,
"comments": 0,
"liked_by": [],
"favorited_by": [],
"tip_board": [] # 打赏榜单
}
posts_db.insert(0, new_post)
db.save_data("posts.json", posts_db)
# 🗂️ 清除排序缓存
sort_cache.invalidate("posts:")
return {"status": "success", "data": new_post}
@router.put("/api/posts/{post_id}")
async def update_post(post_id: str, update_data: PostUpdate, current_user: str = Depends(require_auth)):
"""
更新帖子(仅作者可操作)
"""
posts_db = db.load_data("posts.json", default_data=[])
for post in posts_db:
if post["id"] == post_id:
if post.get("author") != current_user:
raise HTTPException(status_code=403, detail="无权修改他人帖子")
if update_data.title is not None:
post["title"] = update_data.title
if update_data.content is not None:
post["content"] = update_data.content
if update_data.cover_image is not None:
post["cover_image"] = update_data.cover_image
if update_data.images is not None:
post["images"] = update_data.images[:9]
if update_data.is_original is not None:
post["is_original"] = update_data.is_original # 🎨 更新原创作品标记
db.save_data("posts.json", posts_db)
# 🗂️ 清除排序缓存
sort_cache.invalidate("posts:")
return {"status": "success"}
raise HTTPException(status_code=404, detail="帖子不存在")
@router.delete("/api/posts/{post_id}")
async def delete_post(post_id: str, current_user: str = Depends(require_auth)):
"""
删除帖子(仅作者可操作)
"""
posts_db = db.load_data("posts.json", default_data=[])
for i, post in enumerate(posts_db):
if post["id"] == post_id:
if post.get("author") != current_user:
raise HTTPException(status_code=403, detail="无权删除他人帖子")
posts_db.pop(i)
db.save_data("posts.json", posts_db)
# 🗂️ 清除排序缓存
sort_cache.invalidate("posts:")
return {"status": "success"}
raise HTTPException(status_code=404, detail="帖子不存在")
# ==========================================
# ❤️ 互动接口(点赞/收藏)
# ==========================================
@router.post("/api/posts/{post_id}/like")
async def toggle_like(post_id: str, current_user: str = Depends(require_auth)):
"""
点赞/取消点赞(原子操作,并发安全)
"""
result_container = [None]
def updater(data):
for post in data:
if post["id"] == post_id:
liked_by = post.get("liked_by", [])
if current_user in liked_by:
liked_by.remove(current_user)
post["likes"] = max(0, post.get("likes", 0) - 1)
action = "unliked"
else:
liked_by.append(current_user)
post["likes"] = post.get("likes", 0) + 1
action = "liked"
post["liked_by"] = liked_by
result_container[0] = {"status": "success", "action": action, "likes": post["likes"]}
return
result_container[0] = None # 未找到帖子
db.atomic_update("posts.json", updater, default_data=[])
if result_container[0] is None:
raise HTTPException(status_code=404, detail="帖子不存在")
# 🗂️ 清除排序缓存(点赞数变化可能影响排序)
sort_cache.invalidate("posts:")
return result_container[0]
@router.post("/api/posts/{post_id}/favorite")
async def toggle_favorite(post_id: str, current_user: str = Depends(require_auth)):
"""
收藏/取消收藏(原子操作,并发安全)
"""
result_container = [None]
def updater(data):
for post in data:
if post["id"] == post_id:
favorited_by = post.get("favorited_by", [])
if current_user in favorited_by:
favorited_by.remove(current_user)
post["favorites"] = max(0, post.get("favorites", 0) - 1)
action = "unfavorited"
else:
favorited_by.append(current_user)
post["favorites"] = post.get("favorites", 0) + 1
action = "favorited"
post["favorited_by"] = favorited_by
result_container[0] = {"status": "success", "action": action, "favorites": post["favorites"]}
return
result_container[0] = None # 未找到帖子
db.atomic_update("posts.json", updater, default_data=[])
if result_container[0] is None:
raise HTTPException(status_code=404, detail="帖子不存在")
# 🗂️ 清除排序缓存(收藏数变化可能影响排序)
sort_cache.invalidate("posts:")
return result_container[0]
# ==========================================
# 🎁 打赏接口
# ==========================================
@router.post("/api/posts/{post_id}/tip")
async def tip_post(post_id: str, amount: int, is_anon: bool = False, current_user: str = Depends(require_auth)):
"""
打赏帖子(原子操作,并发安全)
"""
if amount <= 0:
raise HTTPException(status_code=400, detail="打赏金额必须大于0")
result_container = [None]
def updater(data):
# 在锁内查找帖子
target_post = None
for post in data:
if post["id"] == post_id:
target_post = post
break
if not target_post:
result_container[0] = {"error": "not_found"}
return
# 不能打赏自己
if target_post.get("author") == current_user:
result_container[0] = {"error": "self_tip"}
return
# 在锁内操作用户余额
users_db = db.load_data("users.json", default_data={})
tipper = users_db.get(current_user)
if not tipper or tipper.get("balance", 0) < amount:
result_container[0] = {"error": "insufficient_balance"}
return
author_account = target_post.get("author")
author = users_db.get(author_account)
if not author:
result_container[0] = {"error": "author_not_found"}
return
# 扣款加款
tipper["balance"] -= amount
author["balance"] += amount
# 更新打赏榜单
tip_board = target_post.get("tip_board", [])
existing = next((t for t in tip_board if t["account"] == current_user), None)
if existing:
existing["amount"] += amount
else:
tip_board.append({"account": current_user, "amount": amount, "is_anon": is_anon})
tip_board.sort(key=lambda x: x["amount"], reverse=True)
target_post["tip_board"] = tip_board
# 保存用户数据
db.save_data("users.json", users_db)
result_container[0] = {"status": "success", "message": f"成功打赏 {amount} 积分"}
db.atomic_update("posts.json", updater, default_data=[])
# 🗂️ 清除排序缓存(打赏可能影响排序)
sort_cache.invalidate("posts:")
result = result_container[0]
if result is None or result.get("error") == "not_found":
raise HTTPException(status_code=404, detail="帖子不存在")
if result.get("error") == "self_tip":
raise HTTPException(status_code=400, detail="不能打赏自己的帖子")
if result.get("error") == "insufficient_balance":
raise HTTPException(status_code=400, detail="余额不足")
if result.get("error") == "author_not_found":
raise HTTPException(status_code=404, detail="作者账户不存在")
return result
# ==========================================
# 💬 评论接口(复用通用评论系统)
# ==========================================
@router.get("/api/posts/{post_id}/comments")
async def get_post_comments(post_id: str):
"""
获取帖子评论
"""
comments_db = db.load_data("comments.json", default_data={})
users_db = db.load_data("users.json", default_data={})
# users_db 已经是 {account: user_info} 格式,直接使用
user_map = users_db
# comments_db 是 {item_id: [comments]} 格式
post_comments = comments_db.get(post_id, [])
# 附加用户信息
result = []
for c in post_comments:
author_info = user_map.get(c.get("author"), {})
result.append({
**c,
"author_name": author_info.get("name", c.get("author")),
"author_avatar": author_info.get("avatarDataUrl", "")
})
return {"status": "success", "data": result}
@router.post("/api/posts/{post_id}/comments")
async def add_post_comment(post_id: str, content: str, current_user: str = Depends(require_auth)):
"""
添加帖子评论
"""
if not content or not content.strip():
raise HTTPException(status_code=400, detail="评论内容不能为空")
posts_db = db.load_data("posts.json", default_data=[])
comments_db = db.load_data("comments.json", default_data={})
# 检查帖子是否存在
post_exists = any(p["id"] == post_id for p in posts_db)
if not post_exists:
raise HTTPException(status_code=404, detail="帖子不存在")
new_comment = {
"id": f"comment_{int(time.time())}_{uuid.uuid4().hex[:6]}",
"author": current_user,
"content": content.strip(),
"created_at": int(time.time())
}
# comments_db 是 {item_id: [comments]} 格式
post_comments = comments_db.get(post_id, [])
post_comments.insert(0, new_comment)
comments_db[post_id] = post_comments
db.save_data("comments.json", comments_db)
# 🗂️ 清除排序缓存(评论数变化可能影响排序)
sort_cache.invalidate("posts:")
# 更新帖子评论数
for post in posts_db:
if post["id"] == post_id:
post["comments"] = post.get("comments", 0) + 1
break
db.save_data("posts.json", posts_db)
return {"status": "success", "data": new_comment}
@router.post("/api/posts/{post_id}/view")
async def record_post_view(post_id: str, current_user: str = Depends(require_auth)):
"""
记录帖子访问量
👁️ 需要用户认证,每个用户只计算一次总访问量,日访问量每次调用都增加
"""
result = record_view("posts.json", post_id, current_user)
if result is None:
raise HTTPException(status_code=404, detail="帖子不存在")
return {"status": "success", "views": result["views"], "daily_views": result["daily_views"]}