# 云端Space代码/router_tasks.py # ========================================== # 📝 任务榜API路由 # ========================================== # 功能:任务发布、接单、提交、验收、申诉全流程 # 状态:open → in_progress → submitted → completed/disputed # 💳 P6支付增强: # - 发布时冻结全额,避免支付不足 # - 记录每笔交易流水 # - 超时自动退款 # - 支付通知推送 # ========================================== from fastapi import APIRouter, HTTPException, Depends, Query from sqlalchemy.orm import Session from models import TaskCreate, TaskUpdate, TaskApply, TaskAssign, TaskSubmit, TaskAccept import 数据库连接 as db from 安全认证 import require_auth, is_admin from notifications import add_notification from database_sql import get_db from models_sql import Wallet, Transaction from db_utils import record_view, sort_cache import time import uuid import hashlib import logging router = APIRouter() # 📝 审计日志 logger = logging.getLogger("ComfyUI-Ranking.Tasks") # ========================================== # 💳 交易记录工具函数 # ========================================== def calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash): data = f"{tx_id}{account}{tx_type}{amount}{prev_hash}" return hashlib.sha256(data.encode()).hexdigest() def create_task_transaction(db_session: Session, account: str, tx_type: str, amount: int, related_account: str = None, task_id: str = None, task_title: str = None, related_user_name: str = None): """ 创建任务相关交易记录 tx_type: TASK_FREEZE, TASK_DEPOSIT, TASK_PAYMENT, TASK_INCOME, TASK_REFUND """ tx_id = f"TASK_{tx_type}_{int(time.time())}_{uuid.uuid4().hex[:6]}" last_tx = db_session.query(Transaction).filter(Transaction.account == account).order_by(Transaction.created_at.desc()).first() prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH" tx_hash = calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash) # 根据 tx_type 构造 description type_labels = { "TASK_FREEZE": "任务冻结", "TASK_DEPOSIT": "任务订金", "TASK_PAYMENT": "任务尾款", "TASK_INCOME": "任务收入", "TASK_REFUND": "任务退款", "TASK_CANCEL_REFUND": "任务取消退款", } label = type_labels.get(tx_type, tx_type) desc = f"{label}: {task_title}" if task_title else label new_tx = Transaction( tx_id=tx_id, account=account, tx_type=tx_type, amount=amount, related_account=related_account, item_id=task_id, # 复用 item_id 字段存储任务ID prev_hash=prev_hash, tx_hash=tx_hash, description=desc, item_title=task_title, item_type="task", related_user_name=related_user_name ) db_session.add(new_tx) logger.info(f"TASK_TX | type={tx_type} | account={account} | amount={amount} | task={task_id} | tx={tx_id}") return tx_id # ========================================== # 📋 任务状态常量 # ========================================== TASK_STATUS = { "open": "开放接单", "in_progress": "进行中", "submitted": "待验收", "completed": "已完成", "disputed": "争议中", "cancelled": "已取消", "expired": "已过期" } # ========================================== # 📝 任务CRUD接口 # ========================================== # ========================================== # 🔄 过期任务自动处理 + 自动退款 # ========================================== import datetime def check_and_update_expired_tasks(tasks_db, db_session=None): """ 检查并更新过期任务状态 - open 状态且超过截止日期:自动取消,退还冻结金额 - in_progress 状态且超过截止日期:标记为过期(不自动取消,需双方处理) 💳 P6支付增强:过期时自动退款 💳 P0修复:两阶段提交,确保SQL事务与JSON修改原子性 """ today = datetime.date.today().isoformat() # "2026-03-30" updated = False expired_task_indices = [] # 记录过期任务的索引 # ========== 第一阶段:预检查(只读,收集需要退款的任务列表) ========== for idx, task in enumerate(tasks_db): deadline = task.get("deadline", "") status = task.get("status", "") if not deadline: continue # 检查是否过期 if deadline < today: if status == "open": # 开放接单状态且过期:记录需要处理 frozen_amount = task.get("frozen_amount", task.get("total_price", 0)) expired_task_indices.append({ "index": idx, "task_id": task["id"], "publisher": task.get("publisher"), "amount": frozen_amount, "title": task.get("title", "") }) updated = True elif status == "in_progress": # 进行中且过期:直接标记(不涉及资金) task["is_overdue"] = True updated = True # 如果没有过期任务,直接返回 if not expired_task_indices: return updated # 如果没有 db_session,只标记状态,不处理退款 if not db_session: for item in expired_task_indices: task = tasks_db[item["index"]] task["status"] = "expired" task["expired_at"] = int(time.time()) # 不标记退款,等待下次带 db_session 的调用 return updated # ========== 第二阶段:执行SQL事务(钱包退款+交易记录) ========== refund_results = [] # 记录退款成功的任务 sql_committed = False try: for item in expired_task_indices: if item["amount"] > 0: wallet = db_session.query(Wallet).filter(Wallet.account == item["publisher"]).with_for_update().first() if wallet: wallet.frozen_balance = max(0, wallet.frozen_balance - item["amount"]) wallet.balance += item["amount"] # 记录退款交易 create_task_transaction( db_session, item["publisher"], "TASK_REFUND", item["amount"], task_id=item["task_id"], task_title=item["title"] ) refund_results.append(item) # 💳 P0修复:先 commit SQL 事务 db_session.commit() sql_committed = True logger.info(f"TASK_EXPIRED_SQL_COMMIT | refunded_tasks={len(refund_results)}") except Exception as e: # 💳 P0修复:SQL事务失败,回滚所有操作,不修改JSON db_session.rollback() logger.error(f"TASK_EXPIRED_SQL_ROLLBACK | error={str(e)}") # 不修改 JSON,下次再试 return False # ========== 第三阶段:SQL成功后修改JSON任务状态 ========== if sql_committed: try: for item in expired_task_indices: task = tasks_db[item["index"]] task["status"] = "expired" task["expired_at"] = int(time.time()) if item["amount"] > 0: task["refunded"] = True task["refund_amount"] = item["amount"] # 💳 P0修复:保存JSON数据 db.save_data("tasks.json", tasks_db) logger.info(f"TASK_EXPIRED_JSON_SAVED | tasks={len(expired_task_indices)}") except Exception as e: # 💳 P0修复:JSON保存失败,记录warning但不回滚SQL(资金已转移,可后续手动恢复) logger.warning(f"TASK_EXPIRED_JSON_SAVE_FAILED | error={str(e)} | 资金已退款但任务状态未更新,需手动修复") # 不抛出异常,因为SQL事务已经成功,资金已经转移 # 发送退款通知(在事务外执行,失败不影响主流程) for item in refund_results: try: add_notification(item["publisher"], { "type": "task_refund", "from_user": "system", "target_item_id": item["task_id"], "target_item_title": item["title"], "content": f"💰 任务《{item['title']}》已过期自动取消,{item['amount']}积分已退还" }) logger.info(f"TASK_REFUND | publisher={item['publisher']} | task={item['task_id']} | amount={item['amount']}") except Exception as e: logger.warning(f"TASK_REFUND_NOTIFY_ERROR | task={item['task_id']} | error={str(e)}") return updated @router.get("/api/tasks") async def get_tasks( page: int = 1, limit: int = 20, status: str = None, sort: str = "latest", db_session: Session = Depends(get_db) ): """ 获取任务列表(分页) - status: 筛选状态(open/in_progress/completed/expired等) - sort: latest(最新)/price(价格高)/deadline(截止日期近) """ tasks_db = db.load_data("tasks.json", default_data=[]) users_db = db.load_data("users.json", default_data={}) # 自动检查并更新过期任务(带自动退款) if check_and_update_expired_tasks(tasks_db, db_session): db.save_data("tasks.json", tasks_db) # users_db 已经是 {account: user_info} 格式,直接使用 user_map = users_db # 状态筛选(默认排除已取消和过期的) filtered = tasks_db if status: filtered = [t for t in filtered if t.get("status") == status] else: # 默认不显示已取消和过期的任务 filtered = [t for t in filtered if t.get("status") not in ["cancelled", "expired"]] # 🗂️ 使用排序缓存优化排序性能 cache_key = f"tasks:{status or 'all'}:{sort}" def sort_fn(data): if sort == "price": data.sort(key=lambda x: x.get("total_price", 0), reverse=True) elif sort == "deadline": data.sort(key=lambda x: x.get("deadline", "9999")) elif sort == "views": # 👁️ 按总访问量排序 data.sort(key=lambda x: x.get("views", 0), reverse=True) elif sort == "daily_views": # 👁️ 按日访问量排序 data.sort(key=lambda x: x.get("daily_views", 0), reverse=True) elif sort == "likes": # ❤️ 按点赞数排序 data.sort(key=lambda x: x.get("likes", 0), reverse=True) elif sort == "favorites": # ❤️ 按收藏数排序 data.sort(key=lambda x: x.get("favorites", 0), reverse=True) else: # latest data.sort(key=lambda x: x.get("created_at", 0), reverse=True) filtered = sort_cache.get_sorted(cache_key, filtered, sort_fn) # 分页 start = (page - 1) * limit end = start + limit paged = filtered[start:end] # 附加发布者信息 result = [] for task in paged: publisher_info = user_map.get(task.get("publisher"), {}) # 👁️ 过滤敏感字段(列表接口过滤 liked_by, favorited_by, viewed_by) task_data = {k: v for k, v in task.items() if k not in ["viewed_by", "liked_by", "favorited_by"]} result.append({ **task_data, "publisher_name": publisher_info.get("name", task.get("publisher")), "publisher_avatar": publisher_info.get("avatarDataUrl", ""), "status_text": TASK_STATUS.get(task.get("status"), "未知") }) return { "status": "success", "data": result, "total": len(filtered), "page": page, "limit": limit } @router.get("/api/tasks/{task_id}") async def get_task_detail(task_id: str, current_user: str = None): """ 获取任务详情 """ tasks_db = db.load_data("tasks.json", default_data=[]) users_db = db.load_data("users.json", default_data={}) # users_db 已经是 {account: user_info} 格式,直接使用 user_map = users_db for task in tasks_db: if task["id"] == task_id: publisher_info = user_map.get(task.get("publisher"), {}) assignee_info = user_map.get(task.get("assignee"), {}) if task.get("assignee") else None # 构建申请者列表(带用户信息) applicants_with_info = [] for app in task.get("applicants", []): app_user = user_map.get(app.get("account"), {}) applicants_with_info.append({ **app, "name": app_user.get("name", app.get("account")), "avatar": app_user.get("avatarDataUrl", "") }) # 👁️ 过滤敏感字段(详情接口保留 liked_by 和 favorited_by,过滤 viewed_by) task_data = {k: v for k, v in task.items() if k != "viewed_by"} return { "status": "success", "data": { **task_data, "publisher_name": publisher_info.get("name", task.get("publisher")), "publisher_avatar": publisher_info.get("avatarDataUrl", ""), "assignee_name": assignee_info.get("name") if assignee_info else None, "assignee_avatar": assignee_info.get("avatarDataUrl") if assignee_info else None, "applicants": applicants_with_info, "status_text": TASK_STATUS.get(task.get("status"), "未知") } } raise HTTPException(status_code=404, detail="任务不存在") @router.post("/api/tasks") async def create_task(task: TaskCreate, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): """ 发布新任务 💳 P6支付增强:发布时冻结全额,避免支付时余额不足 """ tasks_db = db.load_data("tasks.json", default_data=[]) # 验证订金比例 if task.depositRatio not in [10, 20, 30, 50]: raise HTTPException(status_code=400, detail="订金比例必须是 10/20/30/50 之一") # 验证价格 if task.totalPrice < 0: raise HTTPException(status_code=400, detail="任务价格不能低于0积分") # 💳 使用SQL钱包检查余额并冻结 wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() if not wallet: wallet = Wallet(account=current_user, balance=0) db_session.add(wallet) db_session.flush() if wallet.balance < task.totalPrice: raise HTTPException(status_code=400, detail=f"余额不足,需要{task.totalPrice}积分,当前{wallet.balance}积分") # 💳 冻结全额(从余额转移到冻结余额) wallet.balance -= task.totalPrice wallet.frozen_balance += task.totalPrice # 计算订金金额 deposit_amount = int(task.totalPrice * task.depositRatio / 100) task_id = f"task_{int(time.time())}_{uuid.uuid4().hex[:6]}" new_task = { "id": task_id, "title": task.title, "description": task.description, "reference_images": (task.referenceImages or [])[:6], "reference_link": task.referenceLink, "total_price": task.totalPrice, "deposit_ratio": task.depositRatio, "deposit_amount": deposit_amount, "deadline": task.deadline, "publisher": current_user, "status": "open", "created_at": int(time.time()), # 接单相关 "applicants": [], # 申请接单的用户列表 "assignee": None, # 指派的接单者 "assigned_at": None, # 指派时间 # 交付相关 "deliverables": [], # 提交的成果 "submit_note": None, # 提交备注 "submitted_at": None, # 提交时间 # 验收相关 "completed_at": None, # 完成时间 "feedback": None, # 验收反馈 # 申诉相关 "dispute_id": None, # 关联的申诉ID # 💳 支付状态 "frozen_amount": task.totalPrice, # 已冻结金额 # 互动数据 "likes": 0, "favorites": 0, "comments": 0, "liked_by": [], "favorited_by": [], "tip_board": [], # 打赏榜单 "views": 0, "daily_views": 0, "viewed_by": [], "daily_views_date": "" } tasks_db.insert(0, new_task) db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") # 💳 记录冻结交易 create_task_transaction( db_session, current_user, "TASK_FREEZE", -task.totalPrice, task_id=task_id, task_title=task.title ) db_session.commit() logger.info(f"TASK_CREATE | publisher={current_user} | task={task_id} | price={task.totalPrice} | frozen={task.totalPrice}") return {"status": "success", "data": new_task, "frozen_amount": task.totalPrice} @router.put("/api/tasks/{task_id}") async def update_task(task_id: str, update_data: TaskUpdate, current_user: str = Depends(require_auth)): """ 更新任务(仅发布者可操作) - open状态:可修改所有字段 - in_progress/submitted状态:只能修改标题/描述/参考图/链接,不能改价格,并通知接单人 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("publisher") != current_user: raise HTTPException(status_code=403, detail="无权修改他人任务") status = task.get("status") if status not in ["open", "in_progress", "submitted"]: raise HTTPException(status_code=400, detail="当前状态不允许修改") # 记录修改内容(用于通知) changes = [] if update_data.title is not None and update_data.title != task.get("title"): task["title"] = update_data.title changes.append("标题") if update_data.description is not None and update_data.description != task.get("description"): task["description"] = update_data.description changes.append("描述") if update_data.referenceImages is not None: task["reference_images"] = update_data.referenceImages[:6] changes.append("参考图") if update_data.referenceLink is not None and update_data.referenceLink != task.get("reference_link"): task["reference_link"] = update_data.referenceLink changes.append("参考链接") # 价格和订金只能在open状态修改 if status == "open": if update_data.totalPrice is not None: # 验证价格 if update_data.totalPrice < 0: raise HTTPException(status_code=400, detail="任务价格不能低于0积分") task["total_price"] = update_data.totalPrice task["deposit_amount"] = int(update_data.totalPrice * task["deposit_ratio"] / 100) changes.append("价格") if update_data.depositRatio is not None and update_data.depositRatio in [10, 20, 30, 50]: task["deposit_ratio"] = update_data.depositRatio task["deposit_amount"] = int(task["total_price"] * update_data.depositRatio / 100) changes.append("订金比例") if update_data.deadline is not None: task["deadline"] = update_data.deadline changes.append("截止日期") db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") # 📢 如果有接单人,发送通知 if task.get("assignee") and changes: add_notification(task.get("assignee"), { "type": "task_updated", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"任务《{task.get('title', '')}》已更新:{'、'.join(changes)}" }) return {"status": "success"} raise HTTPException(status_code=404, detail="任务不存在") @router.delete("/api/tasks/{task_id}") async def cancel_task(task_id: str, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): """ 取消任务(仅发布者可操作,且仅在 open 状态时可取消) 💳 P0修复:取消时退还冻结资金 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("publisher") != current_user: raise HTTPException(status_code=403, detail="无权取消他人任务") if task.get("status") != "open": raise HTTPException(status_code=400, detail="只能取消开放状态的任务") # 💳 P0修复:退还冻结资金 frozen_amount = task.get("frozen_amount", task.get("total_price", 0)) refund_success = False if frozen_amount > 0: try: wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() if wallet: wallet.frozen_balance = max(0, wallet.frozen_balance - frozen_amount) wallet.balance += frozen_amount # 记录退款交易 create_task_transaction( db_session, current_user, "TASK_CANCEL_REFUND", frozen_amount, task_id=task_id, task_title=task.get("title") ) db_session.commit() refund_success = True logger.info(f"TASK_CANCEL_REFUND | user={current_user} | task={task_id} | amount={frozen_amount}") except Exception as e: db_session.rollback() logger.error(f"TASK_CANCEL_REFUND_ERROR | task={task_id} | error={str(e)}") raise HTTPException(status_code=500, detail="退款失败,请稍后重试") else: refund_success = True # 无需退款 # 只有退款成功才更新状态 if refund_success: task["status"] = "cancelled" task["cancelled_at"] = int(time.time()) task["refunded"] = frozen_amount > 0 task["refund_amount"] = frozen_amount db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") return {"status": "success", "refunded_amount": frozen_amount} raise HTTPException(status_code=404, detail="任务不存在") # ========================================== # 🙋 申请接单 # ========================================== @router.post("/api/tasks/{task_id}/apply") async def apply_task(task_id: str, message: str = None, current_user: str = Depends(require_auth)): """ 申请接单 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("status") != "open": raise HTTPException(status_code=400, detail="该任务不在开放接单状态") if task.get("publisher") == current_user: raise HTTPException(status_code=400, detail="不能申请自己发布的任务") # 检查是否已申请 applicants = task.get("applicants", []) if any(a["account"] == current_user for a in applicants): raise HTTPException(status_code=400, detail="您已申请过该任务") # 添加申请 applicants.append({ "account": current_user, "message": message or "", "applied_at": int(time.time()) }) task["applicants"] = applicants db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") # 🔔 通知发布者:有人申请接单 add_notification(task.get("publisher"), { "type": "task_apply", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"申请了您的任务《{task.get('title', '')}》" }) return {"status": "success", "message": "申请成功,等待发布者选择"} raise HTTPException(status_code=404, detail="任务不存在") @router.delete("/api/tasks/{task_id}/apply") async def cancel_apply(task_id: str, current_user: str = Depends(require_auth)): """ 取消申请接单 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("status") != "open": raise HTTPException(status_code=400, detail="任务已开始,无法取消申请") applicants = task.get("applicants", []) new_applicants = [a for a in applicants if a["account"] != current_user] if len(new_applicants) == len(applicants): raise HTTPException(status_code=400, detail="您未申请该任务") task["applicants"] = new_applicants db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") return {"status": "success"} raise HTTPException(status_code=404, detail="任务不存在") # ========================================== # 🎯 指派接单者 # ========================================== @router.post("/api/tasks/{task_id}/assign") async def assign_task(task_id: str, assignee: str, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): """ 发布者选择接单者 💳 P6支付增强:从冻结金额中扣除订金,记录交易 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("publisher") != current_user: raise HTTPException(status_code=403, detail="只有发布者可以指派接单者") if task.get("status") != "open": raise HTTPException(status_code=400, detail="该任务不在开放状态") # 验证接单者是否在申请列表中 applicants = task.get("applicants", []) if not any(a["account"] == assignee for a in applicants): raise HTTPException(status_code=400, detail="该用户未申请此任务") deposit = task.get("deposit_amount", 0) # 💳 从冻结金额中扣除订金(订金暂时不给接单者,待任务完成后一起给) wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() if not wallet or wallet.frozen_balance < deposit: raise HTTPException(status_code=400, detail="冻结余额异常") # 订金从冻结金额中扣除(但还不给接单者,待任务完成后一起支付) wallet.frozen_balance -= deposit # 获取接单者用户名 users_db = db.load_data("users.json", default_data={}) assignee_info = users_db.get(assignee, {}) assignee_name = assignee_info.get("name", assignee) # 💳 记录订金支付交易 create_task_transaction( db_session, current_user, "TASK_DEPOSIT", -deposit, related_account=assignee, task_id=task_id, task_title=task.get("title"), related_user_name=assignee_name ) # 更新任务状态 task["assignee"] = assignee task["assigned_at"] = int(time.time()) task["status"] = "in_progress" task["deposit_paid"] = deposit # 💳 记录已支付订金 db.save_data("tasks.json", tasks_db) db_session.commit() # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") # 🔔 通知接单者:被指派接单 add_notification(assignee, { "type": "task_assigned", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"您已被选为任务《{task.get('title', '')}》的接单者,订金{deposit}积分已缓冲" }) logger.info(f"TASK_ASSIGN | publisher={current_user} | assignee={assignee} | task={task_id} | deposit={deposit}") return {"status": "success", "message": f"已指派 {assignee} 接单,订金 {deposit} 积分已开始缓冲"} raise HTTPException(status_code=404, detail="任务不存在") # ========================================== # 📤 提交成果 # ========================================== @router.post("/api/tasks/{task_id}/submit") async def submit_task(task_id: str, deliverables: list, note: str = None, current_user: str = Depends(require_auth)): """ 接单者提交成果 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("assignee") != current_user: raise HTTPException(status_code=403, detail="只有接单者可以提交成果") if task.get("status") != "in_progress": raise HTTPException(status_code=400, detail="任务状态不允许提交") if not deliverables: raise HTTPException(status_code=400, detail="请上传交付成果") task["deliverables"] = deliverables task["submit_note"] = note task["submitted_at"] = int(time.time()) task["status"] = "submitted" db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存 sort_cache.invalidate("tasks:") # 🔔 通知发布者:接单者已提交成果 add_notification(task.get("publisher"), { "type": "task_submitted", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"已提交任务《{task.get('title', '')}》的成果,请验收" }) return {"status": "success", "message": "成果已提交,等待发布者验收"} raise HTTPException(status_code=404, detail="任务不存在") # ========================================== # ✅ 验收成果 # ========================================== @router.post("/api/tasks/{task_id}/accept") async def accept_task(task_id: str, is_accepted: bool, feedback: str = None, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): """ 发布者验收成果 - is_accepted=True: 验收通过,支付尾款给接单者 - is_accepted=False: 验收不通过,可以要求修改或发起申诉 💳 P6支付增强:使用SQL钱包支付,记录交易流水,发送支付通知 💳 P0修复:事务完整性保护,失败时回滚 """ tasks_db = db.load_data("tasks.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: if task.get("publisher") != current_user: raise HTTPException(status_code=403, detail="只有发布者可以验收") if task.get("status") != "submitted": raise HTTPException(status_code=400, detail="任务状态不允许验收") task["feedback"] = feedback assignee_account = task.get("assignee") total_price = task.get("total_price", 0) deposit = task.get("deposit_paid", task.get("deposit_amount", 0)) # 优先使用已记录的支付订金 remaining = total_price - deposit # 尾款 if is_accepted: # 💳 验收通过:支付尾款给接单者 # 💳 P0修复:使用事务保护,失败则回滚 try: publisher_wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() if not publisher_wallet or publisher_wallet.frozen_balance < remaining: raise HTTPException(status_code=400, detail="冻结余额不足支付尾款") # 扣除发布者尾款(从冻结余额) publisher_wallet.frozen_balance -= remaining # 给接单者全款(订金+尾款) assignee_wallet = db_session.query(Wallet).filter(Wallet.account == assignee_account).with_for_update().first() if not assignee_wallet: assignee_wallet = Wallet(account=assignee_account, balance=0) db_session.add(assignee_wallet) db_session.flush() assignee_wallet.balance += total_price # 全款进入可用余额 assignee_wallet.task_balance += total_price # 累计任务收益统计(只增不减) # 获取双方用户名 users_db = db.load_data("users.json", default_data={}) publisher_info = users_db.get(current_user, {}) assignee_info = users_db.get(assignee_account, {}) publisher_name = publisher_info.get("name", current_user) assignee_name = assignee_info.get("name", assignee_account) # 💳 记录交易流水 # 1. 发布者支付尾款 create_task_transaction( db_session, current_user, "TASK_PAYMENT", -remaining, related_account=assignee_account, task_id=task_id, task_title=task.get("title"), related_user_name=assignee_name ) # 2. 接单者收入 create_task_transaction( db_session, assignee_account, "TASK_INCOME", total_price, related_account=current_user, task_id=task_id, task_title=task.get("title"), related_user_name=publisher_name ) task["status"] = "completed" task["completed_at"] = int(time.time()) # 💳 P0修复:先commit SQL事务(资金为真),再最佳努力写入JSON db_session.commit() # 再最佳努力写入JSON try: db.save_data("tasks.json", tasks_db) except Exception as e: logger.warning(f"TASK_COMPLETE_JSON_SAVE_FAILED | 资金已结算但任务状态未更新,需手动修复: {str(e)}") logger.info(f"TASK_COMPLETE | publisher={current_user} | assignee={assignee_account} | task={task_id} | total={total_price}") # 🗂️ 清除排序缓存(任务状态变化) sort_cache.invalidate("tasks:") message = f"验收通过,已支付 {total_price} 积分给接单者" # 🔔 支付通知:接单者收到款项 add_notification(assignee_account, { "type": "task_payment", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"💰 任务《{task.get('title', '')}》验收通过,{total_price}积分已到账" }) except HTTPException: raise # 重新抛出 HTTP 异常 except Exception as e: db_session.rollback() logger.error(f"TASK_ACCEPT_ERROR | task={task_id} | error={str(e)}") raise HTTPException(status_code=500, detail="验收处理失败,请稍后重试") else: # 验收不通过:回到进行中状态,允许修改后重新提交 task["status"] = "in_progress" task["deliverables"] = [] task["submitted_at"] = None message = "验收不通过,接单者可以修改后重新提交" db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存(任务状态变化) sort_cache.invalidate("tasks:") # 🔔 通知接单者:验收未通过 add_notification(assignee_account, { "type": "task_rejected", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"任务《{task.get('title', '')}》验收未通过,请修改后重新提交" }) return {"status": "success", "message": message} raise HTTPException(status_code=404, detail="任务不存在") # ========================================== # ⚖️ 申诉仲裁系统 # ========================================== @router.post("/api/tasks/{task_id}/dispute") async def create_dispute(task_id: str, reason: str, evidence: list = None, current_user: str = Depends(require_auth)): """ 发起申诉(发布者或接单者均可) - evidence: 证据图片URL列表(可选) """ tasks_db = db.load_data("tasks.json", default_data=[]) disputes_db = db.load_data("disputes.json", default_data=[]) for task in tasks_db: if task["id"] == task_id: # 验证权限 if current_user not in [task.get("publisher"), task.get("assignee")]: raise HTTPException(status_code=403, detail="只有发布者或接单者可以发起申诉") # 验证状态 if task.get("status") not in ["in_progress", "submitted"]: raise HTTPException(status_code=400, detail="当前状态不允许申诉") # 检查是否已有申诉 if task.get("dispute_id"): raise HTTPException(status_code=400, detail="该任务已有进行中的申诉") # 确定角色 is_publisher = current_user == task.get("publisher") role = "publisher" if is_publisher else "assignee" # 创建申诉 dispute = { "id": f"dispute_{int(time.time())}_{uuid.uuid4().hex[:6]}", "task_id": task_id, "task_title": task.get("title", ""), "publisher": task.get("publisher"), "assignee": task.get("assignee"), "initiator": current_user, "initiator_role": role, "reason": reason, "evidence": (evidence or [])[:6], "status": "pending", # pending/responded/resolved "created_at": int(time.time()), # 被申诉方回应 "response": None, "response_evidence": [], "responded_at": None, # 仲裁结果 "resolution": None, # favor_initiator/favor_respondent/split "resolution_ratio": None, # 分成比例(如果是split) "resolution_note": None, "resolved_by": None, "resolved_at": None } disputes_db.append(dispute) task["status"] = "disputed" task["dispute_id"] = dispute["id"] db.save_data("tasks.json", tasks_db) db.save_data("disputes.json", disputes_db) # 🗂️ 清除排序缓存(任务状态变为争议中) sort_cache.invalidate("tasks:") # 🔔 通知对方:已发起申诉 other_party = task.get("assignee") if is_publisher else task.get("publisher") add_notification(other_party, { "type": "task_disputed", "from_user": current_user, "target_item_id": task_id, "target_item_title": task.get("title", ""), "content": f"对任务《{task.get('title', '')}》发起了申诉,请回应" }) return {"status": "success", "data": dispute, "message": "申诉已提交,等待对方回应"} raise HTTPException(status_code=404, detail="任务不存在") @router.get("/api/disputes/{dispute_id}") async def get_dispute_detail(dispute_id: str): """ 获取申诉详情 """ disputes_db = db.load_data("disputes.json", default_data=[]) users_db = db.load_data("users.json", default_data={}) # users_db 已经是 {account: user_info} 格式,直接使用 user_map = users_db for dispute in disputes_db: if dispute["id"] == dispute_id: publisher_info = user_map.get(dispute.get("publisher"), {}) assignee_info = user_map.get(dispute.get("assignee"), {}) return { "status": "success", "data": { **dispute, "publisher_name": publisher_info.get("name", dispute.get("publisher")), "publisher_avatar": publisher_info.get("avatarDataUrl", ""), "assignee_name": assignee_info.get("name", dispute.get("assignee")), "assignee_avatar": assignee_info.get("avatarDataUrl", "") } } raise HTTPException(status_code=404, detail="申诉不存在") @router.post("/api/disputes/{dispute_id}/respond") async def respond_dispute(dispute_id: str, response: str, evidence: list = None, current_user: str = Depends(require_auth)): """ 被申诉方回应申诉 """ disputes_db = db.load_data("disputes.json", default_data=[]) for dispute in disputes_db: if dispute["id"] == dispute_id: # 验证权限:必须是被申诉方 is_publisher = dispute.get("initiator_role") == "publisher" respondent = dispute.get("assignee") if is_publisher else dispute.get("publisher") if current_user != respondent: raise HTTPException(status_code=403, detail="只有被申诉方可以回应") if dispute.get("status") != "pending": raise HTTPException(status_code=400, detail="该申诉已回应或已解决") dispute["response"] = response dispute["response_evidence"] = (evidence or [])[:6] dispute["responded_at"] = int(time.time()) dispute["status"] = "responded" db.save_data("disputes.json", disputes_db) # 🗂️ 清除排序缓存(争议状态变化可能影响排序) sort_cache.invalidate("tasks:") # 🔔 通知申诉方:对方已回应 add_notification(dispute.get("initiator"), { "type": "dispute_responded", "from_user": current_user, "target_item_id": dispute.get("task_id"), "target_item_title": dispute.get("task_title", ""), "content": f"对方已回应您关于任务《{dispute.get('task_title', '')}》的申诉" }) return {"status": "success", "message": "回应已提交,等待管理员仲裁"} raise HTTPException(status_code=404, detail="申诉不存在") @router.get("/api/admin/disputes") async def get_admin_disputes(status: str = None, current_user: str = Depends(require_auth)): """ 管理员获取申诉列表 - status: pending/responded/resolved """ if not is_admin(current_user): raise HTTPException(status_code=403, detail="需要管理员权限") disputes_db = db.load_data("disputes.json", default_data=[]) users_db = db.load_data("users.json", default_data={}) # users_db 已经是 {account: user_info} 格式,直接使用 user_map = users_db # 筛选 filtered = disputes_db if status: filtered = [d for d in filtered if d.get("status") == status] # 按时间倒序 filtered = sorted(filtered, key=lambda x: x.get("created_at", 0), reverse=True) # 附加用户信息 result = [] for dispute in filtered: publisher_info = user_map.get(dispute.get("publisher"), {}) assignee_info = user_map.get(dispute.get("assignee"), {}) result.append({ **dispute, "publisher_name": publisher_info.get("name", dispute.get("publisher")), "assignee_name": assignee_info.get("name", dispute.get("assignee")) }) return {"status": "success", "data": result} @router.post("/api/admin/disputes/{dispute_id}/resolve") async def resolve_dispute(dispute_id: str, resolution: str, ratio: int = None, note: str = None, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): """ 管理员裁决申诉 - resolution: favor_initiator(支持申诉方) / favor_respondent(支持被申诉方) / split(按比例分成) - ratio: 分成比例(0-100),表示申诉方获得的比例。仅split时有效 - note: 裁决说明 💳 P0改造:使用 SQL Wallet 系统处理资金,替代 JSON balance 操作 """ if not is_admin(current_user): raise HTTPException(status_code=403, detail="需要管理员权限") if resolution not in ["favor_initiator", "favor_respondent", "split"]: raise HTTPException(status_code=400, detail="无效的裁决结果") if resolution == "split" and (ratio is None or ratio < 0 or ratio > 100): raise HTTPException(status_code=400, detail="分成比例必须在0-100之间") disputes_db = db.load_data("disputes.json", default_data=[]) tasks_db = db.load_data("tasks.json", default_data=[]) users_db = db.load_data("users.json", default_data={}) for dispute in disputes_db: if dispute["id"] == dispute_id: if dispute.get("status") == "resolved": raise HTTPException(status_code=400, detail="该申诉已裁决") # 查找关联任务 task = next((t for t in tasks_db if t["id"] == dispute.get("task_id")), None) if not task: raise HTTPException(status_code=404, detail="关联任务不存在") total_price = task.get("total_price", 0) deposit = task.get("deposit_amount", 0) publisher_account = dispute.get("publisher") assignee_account = dispute.get("assignee") is_publisher_initiator = dispute.get("initiator_role") == "publisher" # 💳 P0改造:使用 SQL Wallet 处理资金 try: # 获取双方钱包(带悲观锁) publisher_wallet = db_session.query(Wallet).filter(Wallet.account == publisher_account).with_for_update().first() assignee_wallet = db_session.query(Wallet).filter(Wallet.account == assignee_account).with_for_update().first() # 如果钱包不存在,创建新钱包 if not publisher_wallet: publisher_wallet = Wallet(account=publisher_account, balance=0) db_session.add(publisher_wallet) db_session.flush() if not assignee_wallet: assignee_wallet = Wallet(account=assignee_account, balance=0) db_session.add(assignee_wallet) db_session.flush() # 获取双方用户名 publisher_info = users_db.get(publisher_account, {}) assignee_info = users_db.get(assignee_account, {}) publisher_name = publisher_info.get("name", publisher_account) assignee_name = assignee_info.get("name", assignee_account) # 计算资金分配 if resolution == "favor_initiator": # 支持申诉方 if is_publisher_initiator: # 发布者胜诉:退还订金(从冻结余额释放到可用余额) publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - deposit) publisher_wallet.balance += deposit # 记录交易 create_task_transaction( db_session, publisher_account, "TASK_REFUND", deposit, task_id=task.get("id"), task_title=task.get("title"), related_user_name=assignee_name ) initiator_amount = deposit respondent_amount = 0 else: # 接单者胜诉:获得全款 remaining = total_price - deposit # 发布者支付尾款(从冻结余额扣除) publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining) # 接单者获得全款 assignee_wallet.balance += total_price assignee_wallet.task_balance += total_price # 记录交易 create_task_transaction( db_session, publisher_account, "TASK_PAYMENT", -remaining, related_account=assignee_account, task_id=task.get("id"), task_title=task.get("title"), related_user_name=assignee_name ) create_task_transaction( db_session, assignee_account, "TASK_INCOME", total_price, related_account=publisher_account, task_id=task.get("id"), task_title=task.get("title"), related_user_name=publisher_name ) initiator_amount = total_price respondent_amount = 0 elif resolution == "favor_respondent": # 支持被申诉方 if is_publisher_initiator: # 接单者胜诉:获得全款 remaining = total_price - deposit # 发布者支付尾款(从冻结余额扣除) publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining) # 接单者获得全款 assignee_wallet.balance += total_price assignee_wallet.task_balance += total_price # 记录交易 create_task_transaction( db_session, publisher_account, "TASK_PAYMENT", -remaining, related_account=assignee_account, task_id=task.get("id"), task_title=task.get("title"), related_user_name=assignee_name ) create_task_transaction( db_session, assignee_account, "TASK_INCOME", total_price, related_account=publisher_account, task_id=task.get("id"), task_title=task.get("title"), related_user_name=publisher_name ) initiator_amount = 0 respondent_amount = total_price else: # 发布者胜诉:退还订金(从冻结余额释放到可用余额) publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - deposit) publisher_wallet.balance += deposit # 记录交易 create_task_transaction( db_session, publisher_account, "TASK_REFUND", deposit, task_id=task.get("id"), task_title=task.get("title"), related_user_name=assignee_name ) initiator_amount = 0 respondent_amount = deposit else: # 按比例分成 initiator_share = int(total_price * ratio / 100) respondent_share = total_price - initiator_share remaining = total_price - deposit # 发布者支付尾款(从冻结余额扣除) publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining) # 发布者获得退款部分,接单者获得报酬部分 if is_publisher_initiator: # 申诉方是发布者 pub_refund = initiator_share assignee_earn = respondent_share else: # 申诉方是接单者 assignee_earn = initiator_share pub_refund = respondent_share # 分配资金 publisher_wallet.balance += pub_refund assignee_wallet.balance += assignee_earn assignee_wallet.task_balance += assignee_earn # 记录交易 create_task_transaction( db_session, publisher_account, "TASK_PAYMENT", -remaining, related_account=assignee_account, task_id=task.get("id"), task_title=task.get("title"), related_user_name=assignee_name ) create_task_transaction( db_session, assignee_account, "TASK_INCOME", assignee_earn, related_account=publisher_account, task_id=task.get("id"), task_title=task.get("title"), related_user_name=publisher_name ) initiator_amount = initiator_share respondent_amount = respondent_share # 提交 SQL 事务 db_session.commit() except Exception as e: db_session.rollback() logger.error(f"DISPUTE_RESOLVE_ERROR | dispute={dispute_id} | error={str(e)}") raise HTTPException(status_code=500, detail="仲裁资金处理失败,请稍后重试") # 更新申诉状态 dispute["resolution"] = resolution dispute["resolution_ratio"] = ratio dispute["resolution_note"] = note dispute["resolved_by"] = current_user dispute["resolved_at"] = int(time.time()) dispute["status"] = "resolved" # 更新任务状态 task["status"] = "completed" task["completed_at"] = int(time.time()) task["feedback"] = f"申诉裁决:{note or '无'}(结果:{resolution})" db.save_data("disputes.json", disputes_db) db.save_data("tasks.json", tasks_db) # 🗂️ 清除排序缓存(任务状态变为已完成) sort_cache.invalidate("tasks:") # 🔔 通知双方:仲裁结果 resolution_text = { "favor_initiator": "支持申诉方", "favor_respondent": "支持被申诉方", "split": f"双方协商分成({ratio}%:{100-ratio}%)" }.get(resolution, "已裁决") # 通知申诉方 add_notification(dispute.get("initiator"), { "type": "dispute_resolved", "from_user": current_user, "target_item_id": dispute.get("task_id"), "target_item_title": dispute.get("task_title", ""), "content": f"您的申诉已裁决:{resolution_text},您获得{initiator_amount}积分" }) # 通知被申诉方 respondent_account = dispute.get("assignee") if is_publisher_initiator else dispute.get("publisher") add_notification(respondent_account, { "type": "dispute_resolved", "from_user": current_user, "target_item_id": dispute.get("task_id"), "target_item_title": dispute.get("task_title", ""), "content": f"任务申诉已裁决:{resolution_text},您获得{respondent_amount}积分" }) logger.info(f"DISPUTE_RESOLVED | dispute={dispute_id} | resolution={resolution} | initiator_amount={initiator_amount} | respondent_amount={respondent_amount}") return {"status": "success", "message": f"裁决完成:{resolution_text}"} raise HTTPException(status_code=404, detail="申诉不存在") # ========================================== # 📊 我的任务 # ========================================== @router.get("/api/my-tasks") async def get_my_tasks(role: str = "publisher", current_user: str = Depends(require_auth)): """ 获取我的任务 - role=publisher: 我发布的任务 - role=assignee: 我接的任务 """ tasks_db = db.load_data("tasks.json", default_data=[]) if role == "publisher": my_tasks = [t for t in tasks_db if t.get("publisher") == current_user] else: my_tasks = [t for t in tasks_db if t.get("assignee") == current_user] # 按创建时间倒序 my_tasks = sorted(my_tasks, key=lambda x: x.get("created_at", 0), reverse=True) # 过滤敏感字段(列表接口过滤 viewed_by、liked_by、favorited_by) result = [] for task in my_tasks: task_data = {k: v for k, v in task.items() if k not in ["viewed_by", "liked_by", "favorited_by"]} result.append(task_data) return {"status": "success", "data": result} @router.post("/api/tasks/{task_id}/view") async def record_task_view(task_id: str, current_user: str = Depends(require_auth)): """ 记录任务访问量 👁️ 需要用户认证,每个用户只计算一次总访问量,日访问量每次调用都增加 """ result = record_view("tasks.json", task_id, current_user) if result is None: raise HTTPException(status_code=404, detail="任务不存在") # 🗂️ 清除排序缓存(浏览量变化可能影响排序) sort_cache.invalidate("tasks:") return {"status": "success", "views": result["views"], "daily_views": result["daily_views"]} # ========================================== # ❤️ 互动接口(点赞/收藏) # ========================================== @router.post("/api/tasks/{task_id}/like") async def toggle_like(task_id: str, current_user: str = Depends(require_auth)): """ 点赞/取消点赞(原子操作,并发安全) """ result_container = [None] def updater(data): for task in data: if task["id"] == task_id: liked_by = task.get("liked_by", []) if current_user in liked_by: liked_by.remove(current_user) task["likes"] = max(0, task.get("likes", 0) - 1) action = "unliked" else: liked_by.append(current_user) task["likes"] = task.get("likes", 0) + 1 action = "liked" task["liked_by"] = liked_by result_container[0] = {"status": "success", "action": action, "likes": task["likes"]} return result_container[0] = None # 未找到任务 db.atomic_update("tasks.json", updater, default_data=[]) if result_container[0] is None: raise HTTPException(status_code=404, detail="任务不存在") # 🗂️ 清除排序缓存(点赞数变化可能影响排序) sort_cache.invalidate("tasks:") return result_container[0] @router.post("/api/tasks/{task_id}/favorite") async def toggle_favorite(task_id: str, current_user: str = Depends(require_auth)): """ 收藏/取消收藏(原子操作,并发安全) """ result_container = [None] def updater(data): for task in data: if task["id"] == task_id: favorited_by = task.get("favorited_by", []) if current_user in favorited_by: favorited_by.remove(current_user) task["favorites"] = max(0, task.get("favorites", 0) - 1) action = "unfavorited" else: favorited_by.append(current_user) task["favorites"] = task.get("favorites", 0) + 1 action = "favorited" task["favorited_by"] = favorited_by result_container[0] = {"status": "success", "action": action, "favorites": task["favorites"]} return result_container[0] = None # 未找到任务 db.atomic_update("tasks.json", updater, default_data=[]) if result_container[0] is None: raise HTTPException(status_code=404, detail="任务不存在") # 🗂️ 清除排序缓存(收藏数变化可能影响排序) sort_cache.invalidate("tasks:") return result_container[0] # ========================================== # 🎁 打赏接口 # ========================================== @router.post("/api/tasks/{task_id}/tip") async def tip_task(task_id: str, amount: int, is_anon: bool = False, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): """ 打赏任务(原子操作,并发安全)- 使用 SQL Wallet 系统 """ if amount <= 0: raise HTTPException(status_code=400, detail="打赏金额必须大于0") result_container = [None] publisher_account = [None] # 用于在原子操作外获取发布者账号 def updater(data): # 在锁内查找任务 target_task = None for task in data: if task["id"] == task_id: target_task = task break if not target_task: result_container[0] = {"error": "not_found"} return # 不能打赏自己 if target_task.get("publisher") == current_user: result_container[0] = {"error": "self_tip"} return publisher_account[0] = target_task.get("publisher") # 更新打赏榜单 tip_board = target_task.get("tip_board", []) existing = next((t for t in tip_board if t["account"] == current_user), None) if existing: existing["amount"] += amount else: tip_board.append({"account": current_user, "amount": amount, "is_anon": is_anon}) tip_board.sort(key=lambda x: x["amount"], reverse=True) target_task["tip_board"] = tip_board result_container[0] = {"status": "success", "message": f"成功打赏 {amount} 积分"} db.atomic_update("tasks.json", updater, default_data=[]) result = result_container[0] if result is None or result.get("error") == "not_found": raise HTTPException(status_code=404, detail="任务不存在") if result.get("error") == "self_tip": raise HTTPException(status_code=400, detail="不能打赏自己的任务") # 💳 使用 SQL Wallet 系统处理余额转账 try: publisher = publisher_account[0] if not publisher: raise HTTPException(status_code=404, detail="作者账户不存在") # 🔒 P1幂等性防护:检查最近5秒内是否存在相同交易 recent_cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=5) duplicate_tx = db_session.query(Transaction).filter( Transaction.account == current_user, Transaction.tx_type == "TIP_OUT", Transaction.amount == -amount, Transaction.related_account == publisher, Transaction.created_at >= recent_cutoff ).first() if duplicate_tx: return {"status": "success", "message": "打赏已处理(重复请求)"} # 🔒 并发安全:使用悲观锁获取双方钱包 tipper_wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() author_wallet = db_session.query(Wallet).filter(Wallet.account == publisher).with_for_update().first() if not tipper_wallet or tipper_wallet.balance < amount: raise HTTPException(status_code=400, detail="余额不足") if not author_wallet: # 如果作者钱包不存在,创建一个 author_wallet = Wallet(account=publisher, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0) db_session.add(author_wallet) # 执行转账 tipper_wallet.balance -= amount author_wallet.balance += amount # 实际收入进统一余额 author_wallet.tip_balance += amount # 累计打赏收益统计(只增不减) # 创建交易记录 tx_id_tipper = f"TIP_OUT_{int(time.time())}_{uuid.uuid4().hex[:6]}" tx_id_author = f"TIP_IN_{int(time.time())}_{uuid.uuid4().hex[:6]}" # 获取最后交易记录的哈希 last_tx_tipper = db_session.query(Transaction).filter(Transaction.account == current_user).order_by(Transaction.created_at.desc()).first() last_tx_author = db_session.query(Transaction).filter(Transaction.account == publisher).order_by(Transaction.created_at.desc()).first() prev_hash_tipper = last_tx_tipper.tx_hash if last_tx_tipper else "GENESIS_HASH" prev_hash_author = last_tx_author.tx_hash if last_tx_author else "GENESIS_HASH" # 获取用户信息和任务标题 users_db = db.load_data("users.json", default_data={}) tasks_db = db.load_data("tasks.json", default_data=[]) author_info = users_db.get(publisher, {}) tipper_info = users_db.get(current_user, {}) author_name = author_info.get("name", publisher) tipper_name = tipper_info.get("name", current_user) task_title = None for task in tasks_db: if task["id"] == task_id: task_title = task.get("title") break # 打赏方交易记录 (TIP_OUT) tx_tipper = Transaction( tx_id=tx_id_tipper, account=current_user, tx_type="TIP_OUT", amount=-amount, related_account=publisher, item_id=task_id, prev_hash=prev_hash_tipper, tx_hash=calculate_tx_hash(tx_id_tipper, current_user, "TIP_OUT", -amount, prev_hash_tipper), description=f"打赏给 {author_name}" + (f" 的任务《{task_title}》" if task_title else ""), item_title=task_title, item_type="task", related_user_name=author_name ) # 接收方交易记录 (TIP_IN) tx_author = Transaction( tx_id=tx_id_author, account=publisher, tx_type="TIP_IN", amount=amount, related_account=current_user, item_id=task_id, prev_hash=prev_hash_author, tx_hash=calculate_tx_hash(tx_id_author, publisher, "TIP_IN", amount, prev_hash_author), description=f"收到 {tipper_name} 的任务打赏" + (f" ({task_title})" if task_title else ""), item_title=task_title, item_type="task", related_user_name=tipper_name if not is_anon else "匿名用户" ) db_session.add(tx_tipper) db_session.add(tx_author) db_session.commit() # 📝 审计日志 logger.info(f"TASK_TIP | from={current_user} | to={publisher} | amount={amount} | task={task_id} | anon={is_anon}") # 🔔 打赏通知(考虑匿名) if not is_anon: add_notification(publisher, { "type": "tip", "from_user": current_user, "target_item_id": task_id, "target_item_title": task_title or "", "content": f"您收到来自 {tipper_name} 的 {amount} 积分任务打赏" }) else: add_notification(publisher, { "type": "tip", "from_user": "anonymous", "target_item_id": task_id, "target_item_title": "", "content": f"您收到了一份 {amount} 积分的匿名任务打赏" }) # 🗂️ 清除排序缓存(打赏可能影响排序) sort_cache.invalidate("tasks:") except HTTPException: db_session.rollback() raise except Exception as e: db_session.rollback() logger.error(f"TASK_TIP_ERROR | from={current_user} | task={task_id} | amount={amount} | error={str(e)}") raise HTTPException(status_code=500, detail="打赏处理失败,请稍后重试") return result # ========================================== # 💬 评论接口(复用通用评论系统) # ========================================== @router.get("/api/tasks/{task_id}/comments") async def get_task_comments(task_id: str): """ 获取任务评论 """ comments_db = db.load_data("comments.json", default_data={}) users_db = db.load_data("users.json", default_data={}) # users_db 已经是 {account: user_info} 格式,直接使用 user_map = users_db # comments_db 是 {item_id: [comments]} 格式 task_comments = comments_db.get(task_id, []) # 附加用户信息 result = [] for c in task_comments: author_info = user_map.get(c.get("author"), {}) result.append({ **c, "author_name": author_info.get("name", c.get("author")), "author_avatar": author_info.get("avatarDataUrl", "") }) return {"status": "success", "data": result} @router.post("/api/tasks/{task_id}/comments") async def add_task_comment(task_id: str, content: str, current_user: str = Depends(require_auth)): """ 添加任务评论 """ if not content or not content.strip(): raise HTTPException(status_code=400, detail="评论内容不能为空") tasks_db = db.load_data("tasks.json", default_data=[]) comments_db = db.load_data("comments.json", default_data={}) # 检查任务是否存在 task_exists = any(t["id"] == task_id for t in tasks_db) if not task_exists: raise HTTPException(status_code=404, detail="任务不存在") new_comment = { "id": f"comment_{int(time.time())}_{uuid.uuid4().hex[:6]}", "author": current_user, "content": content.strip(), "created_at": int(time.time()) } # comments_db 是 {item_id: [comments]} 格式 task_comments = comments_db.get(task_id, []) task_comments.insert(0, new_comment) comments_db[task_id] = task_comments db.save_data("comments.json", comments_db) # 🗂️ 清除排序缓存(评论数变化可能影响排序) sort_cache.invalidate("tasks:") # 更新任务评论数 for task in tasks_db: if task["id"] == task_id: task["comments"] = task.get("comments", 0) + 1 break db.save_data("tasks.json", tasks_db) return {"status": "success", "data": new_comment}