Spaces:
Running
Running
File size: 7,504 Bytes
# router_comments.py
from fastapi import APIRouter, HTTPException, Depends
import time
import uuid
import 数据库连接 as db
from notifications import add_notification
from models import InteractionToggle, CommentCreate
from 安全认证 import require_auth, is_admin
router = APIRouter()
@router.post("/api/interactions/toggle")
async def toggle_interaction(interaction: InteractionToggle):
    """Switch a like/favorite on or off for one item and persist the result.

    The item is looked up in ``items.json`` by ``interaction.item_id``. The
    user id is added to, or removed from, the ``<action_type>d_by`` list and
    the matching counter (``likes`` when the action type is ``"like"``,
    ``favorites`` for any other value) is adjusted. A notification is sent to
    the item's author only when the user is newly added to the list.

    Returns:
        ``{"status": "success", "new_count": <counter after the change>}``.

    Raises:
        HTTPException: 404 when no item has the requested id.
    """
    # NOTE(review): the acting user id comes from the request body and this
    # route declares no auth dependency, unlike the comment-delete route in
    # this file - confirm that is intended.
    items_db = db.load_data("items.json", default_data=[])
    target_item = next(
        (item for item in items_db if item["id"] == interaction.item_id),
        None,
    )
    if not target_item:
        raise HTTPException(status_code=404, detail="目标存在问题")

    list_key = f"{interaction.action_type}d_by"
    count_key = "likes" if interaction.action_type == "like" else "favorites"

    # First interaction of this kind on the item: start list and counter fresh.
    if list_key not in target_item:
        target_item[list_key] = []
        target_item[count_key] = 0
    user_list = target_item[list_key]
    # A record may carry the user list without its counter; rebuild the
    # counter from the list instead of failing with KeyError further down.
    target_item.setdefault(count_key, len(user_list))

    if interaction.is_active:
        if interaction.user_id not in user_list:
            user_list.append(interaction.user_id)
            target_item[count_key] += 1
            add_notification(
                target_item.get("author", ""),
                {
                    "type": interaction.action_type,
                    "from_user": interaction.user_id,
                    "target_item_id": target_item["id"],
                    # .get: an untitled item must not abort the toggle.
                    "target_item_title": target_item.get("title", ""),
                },
            )
    elif interaction.user_id in user_list:
        user_list.remove(interaction.user_id)
        # Clamp at zero so a drifted counter never goes negative.
        target_item[count_key] = max(0, target_item[count_key] - 1)

    db.save_data("items.json", items_db)
    return {"status": "success", "new_count": target_item[count_key]}
@router.post("/api/comments")
async def post_comment(comment: CommentCreate):
    """Store a new comment or reply under ``comment.item_id`` and notify.

    When ``comment.parent_id`` is set, the new comment is appended to the
    ``replies`` of the matching top-level comment (only top-level comments
    are searched) and, if ``reply_to_user`` is given, that user receives a
    ``"reply"`` notification. Otherwise the comment is appended at the top
    level and a ``"comment"`` notification goes to the owner of the target:
    the author of a matching item, else the author of a matching post, else
    the user whose id equals ``item_id`` (their personal message board).

    Returns:
        ``{"status": "success", "data": <the stored comment dict>}``.

    Raises:
        HTTPException: 404 when ``parent_id`` matches no top-level comment.
    """
    # NOTE(review): the author id comes from the request body and this route
    # declares no auth dependency, unlike the comment-delete route in this
    # file - confirm that is intended.
    comments_db = db.load_data("comments.json", default_data={})
    users_db = db.load_data("users.json", default_data={})

    author_info = users_db.get(comment.author, {})
    reply_name = None
    if comment.reply_to_user:
        # Fall back to the raw account id when the user has no display name.
        reply_name = users_db.get(comment.reply_to_user, {}).get(
            "name", comment.reply_to_user
        )
    item_comments = comments_db.get(comment.item_id, [])

    # Read the clock once so the id prefix and created_at always agree.
    created_at = int(time.time())
    new_comment = {
        "id": f"c_{created_at}_{uuid.uuid4().hex[:6]}",
        "author": comment.author,
        "authorName": author_info.get("name", comment.author),
        "avatar": author_info.get("avatarDataUrl", ""),
        "content": comment.content,
        "replyToUser": comment.reply_to_user,
        "replyToUserName": reply_name,
        "isDeleted": False,
        "replies": [],
        "created_at": created_at,
    }

    if comment.parent_id:
        parent = next(
            (c for c in item_comments if c["id"] == comment.parent_id), None
        )
        if not parent:
            raise HTTPException(status_code=404, detail="找不到要回复的评论")
        # The delete route tolerates comments stored without "replies";
        # create the list on demand here rather than raising KeyError.
        parent.setdefault("replies", []).append(new_comment)
        if comment.reply_to_user:
            add_notification(
                comment.reply_to_user,
                {
                    "type": "reply",
                    "from_user": comment.author,
                    "target_item_id": comment.item_id,
                    "target_item_title": "收到新的回复",
                    "content": comment.content,
                },
            )
    else:
        item_comments.append(new_comment)
        items_db = db.load_data("items.json", default_data=[])
        posts_db = db.load_data("posts.json", default_data=[])
        target_item = next(
            (item for item in items_db if item["id"] == comment.item_id), None
        )
        target_post = next(
            (post for post in posts_db if post["id"] == comment.item_id), None
        )

        # Resolve (recipient, title) by precedence: item, post, user board.
        if target_item:
            # .get: an untitled item must not abort the comment.
            target = (target_item.get("author", ""), target_item.get("title", ""))
        elif target_post:
            target = (target_post.get("author", ""), target_post.get("title", "") or "")
        elif comment.item_id in users_db:
            target = (comment.item_id, "您的个人留言板")
        else:
            target = None

        if target is not None:
            recipient, title = target
            add_notification(
                recipient,
                {
                    "type": "comment",
                    "from_user": comment.author,
                    "target_item_id": comment.item_id,
                    "target_item_title": title,
                    "content": comment.content,
                },
            )

    comments_db[comment.item_id] = item_comments
    db.save_data("comments.json", comments_db)
    return {"status": "success", "data": new_comment}
@router.delete("/api/comments/{item_id}/{comment_id}")
async def soft_delete_comment(item_id: str, comment_id: str, account: str = Depends(require_auth)):
    """Soft-delete a comment: flag it as deleted and blank its content.

    Permission rules: a comment's author may delete their own comment; the
    author of the post/item (or publisher of the task, or owner of the
    personal board) may delete comments under it; an administrator may
    delete any comment.

    Raises:
        HTTPException: 404 when the comment is not found under ``item_id``,
            403 when ``account`` satisfies none of the permission rules.
    """
    comments_db = db.load_data("comments.json", default_data={})
    items_db = db.load_data("items.json", default_data=[])
    posts_db = db.load_data("posts.json", default_data=[])
    users_db = db.load_data("users.json", default_data={})
    tasks_db = db.load_data("tasks.json", default_data=[])

    thread = comments_db.get(item_id, [])

    def locate(nodes, depth=0):
        """Return the first comment, in pre-order, whose id matches; else None."""
        if depth > 20:  # stop descending so extreme nesting cannot exhaust the stack
            return None
        for node in nodes:
            if node["id"] == comment_id:
                return node
            if "replies" in node:
                hit = locate(node["replies"], depth + 1)
                if hit is not None:
                    return hit
        return None

    target = locate(thread)
    if target is None:
        raise HTTPException(status_code=404, detail="找不到该评论")

    # Permission check
    wrote_comment = target["author"] == account
    admin = is_admin(account)
    # Content ownership is tested in order: post author, item author,
    # task publisher, personal-board owner; the first hit short-circuits.
    owns_content = (
        any(p["id"] == item_id and p.get("author") == account for p in posts_db)
        or any(i["id"] == item_id and i.get("author") == account for i in items_db)
        or any(t["id"] == item_id and t.get("publisher") == account for t in tasks_db)
        or (item_id in users_db and item_id == account)
    )
    if not wrote_comment and not owns_content and not admin:
        raise HTTPException(status_code=403, detail="无权删除此评论")

    # Perform the soft delete on the located comment; it is the same dict
    # object that lives inside the thread, so the change is saved below.
    target["isDeleted"] = True
    target["content"] = ""

    comments_db[item_id] = thread
    db.save_data("comments.json", comments_db)
    return {"status": "success"}