# Source: ComfyUI-Ranking-API / router_tasks.py
# Uploaded by ZHIWEI666 ("Upload 4 files", commit 3c40bd1, verified)
# 云端Space代码/router_tasks.py
# ==========================================
# 📝 任务榜API路由
# ==========================================
# 功能:任务发布、接单、提交、验收、申诉全流程
# 状态:open → in_progress → submitted → completed/disputed
# 💳 P6支付增强:
# - 发布时冻结全额,避免支付不足
# - 记录每笔交易流水
# - 超时自动退款
# - 支付通知推送
# ==========================================
from fastapi import APIRouter, HTTPException, Depends, Query
from sqlalchemy.orm import Session
from models import TaskCreate, TaskUpdate, TaskApply, TaskAssign, TaskSubmit, TaskAccept
import 数据库连接 as db
from 安全认证 import require_auth, is_admin
from notifications import add_notification
from database_sql import get_db
from models_sql import Wallet, Transaction
from db_utils import record_view, sort_cache
import time
import uuid
import hashlib
import logging
# Router object that the endpoint decorators in this module register on.
router = APIRouter()
# 📝 Audit log
logger = logging.getLogger("ComfyUI-Ranking.Tasks")
# ==========================================
# 💳 交易记录工具函数
# ==========================================
def calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash):
    """Return the SHA-256 hex digest that links a transaction to its predecessor.

    The five fields are formatted as text and concatenated in argument order,
    without any separator, before being hashed.
    """
    fields = (tx_id, account, tx_type, amount, prev_hash)
    payload = "".join(f"{field}" for field in fields)
    digest = hashlib.sha256(payload.encode())
    return digest.hexdigest()
def create_task_transaction(db_session: Session, account: str, tx_type: str, amount: int,
                            related_account: str = None, task_id: str = None,
                            task_title: str = None, related_user_name: str = None):
    """
    Stage a task-related ledger entry on ``db_session`` and return its id.

    tx_type: TASK_FREEZE, TASK_DEPOSIT, TASK_PAYMENT, TASK_INCOME, TASK_REFUND
    (TASK_CANCEL_REFUND has a label as well; any other value is used verbatim
    as the description label).

    The entry is hash-chained to the newest existing entry of ``account``
    (newest by ``created_at``), or to ``"GENESIS_HASH"`` when there is none.
    Nothing is committed here; the caller owns the transaction boundary.
    """
    tx_id = f"TASK_{tx_type}_{int(time.time())}_{uuid.uuid4().hex[:6]}"

    # Look up the account's latest entry to continue its hash chain.
    newest_first = (
        db_session.query(Transaction)
        .filter(Transaction.account == account)
        .order_by(Transaction.created_at.desc())
    )
    previous = newest_first.first()
    if previous:
        prev_hash = previous.tx_hash
    else:
        prev_hash = "GENESIS_HASH"
    tx_hash = calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash)

    # Human-readable description derived from the transaction type.
    labels_by_type = {
        "TASK_FREEZE": "任务冻结",
        "TASK_DEPOSIT": "任务订金",
        "TASK_PAYMENT": "任务尾款",
        "TASK_INCOME": "任务收入",
        "TASK_REFUND": "任务退款",
        "TASK_CANCEL_REFUND": "任务取消退款",
    }
    label = labels_by_type.get(tx_type, tx_type)
    desc = label
    if task_title:
        desc = f"{label}: {task_title}"

    entry = Transaction(
        tx_id=tx_id,
        account=account,
        tx_type=tx_type,
        amount=amount,
        related_account=related_account,
        item_id=task_id,  # the item_id column is reused to hold the task id
        prev_hash=prev_hash,
        tx_hash=tx_hash,
        description=desc,
        item_title=task_title,
        item_type="task",
        related_user_name=related_user_name
    )
    db_session.add(entry)
    logger.info(f"TASK_TX | type={tx_type} | account={account} | amount={amount} | task={task_id} | tx={tx_id}")
    return tx_id
# ==========================================
# 📋 任务状态常量
# ==========================================
# Task status code -> label shown to users.
TASK_STATUS = dict(
    open="开放接单",
    in_progress="进行中",
    submitted="待验收",
    completed="已完成",
    disputed="争议中",
    cancelled="已取消",
    expired="已过期",
)
# ==========================================
# 📝 任务CRUD接口
# ==========================================
# ==========================================
# 🔄 过期任务自动处理 + 自动退款
# ==========================================
import datetime
def check_and_update_expired_tasks(tasks_db, db_session=None):
    """
    Expire tasks whose deadline has passed and refund frozen funds of open ones.

    - ``open`` and past the deadline: the task becomes ``expired`` and its
      frozen amount is moved back to the publisher's spendable balance.
    - ``in_progress`` and past the deadline: only flagged with ``is_overdue``;
      the two parties have to settle it themselves.

    Deadlines are compared as strings against today's ISO date, so they are
    assumed to be stored as ``YYYY-MM-DD`` -- TODO confirm against the writers.

    The work is ordered so that money stays authoritative: the SQL refund is
    committed first, and only afterwards is the JSON task list changed and
    saved. A JSON failure is logged, not raised, because the funds have
    already moved.

    Args:
        tasks_db: Mutable list of task dicts loaded from ``tasks.json``; it is
            modified in place.
        db_session: SQLAlchemy session used for the refund. Without it, tasks
            are expired and flagged ``refund_pending`` so that a later call
            that does have a session still pays the refund.

    Returns:
        True when ``tasks_db`` was changed (callers use this to decide whether
        to persist it); False when nothing changed or the SQL transaction was
        rolled back.
    """
    today = datetime.date.today().isoformat()  # e.g. "2026-03-30"
    updated = False
    pending = []  # expired tasks that need a status change and/or a refund

    # ---------- Phase 1: read-only scan ----------
    for idx, task in enumerate(tasks_db):
        deadline = task.get("deadline", "")
        if not deadline or not deadline < today:
            continue

        status = task.get("status", "")
        # A task expired earlier without a session still owes its refund.
        retry_refund = status == "expired" and bool(task.get("refund_pending"))

        if status == "open" or (retry_refund and db_session):
            pending.append({
                "index": idx,
                "task_id": task["id"],
                "publisher": task.get("publisher"),
                "amount": task.get("frozen_amount", task.get("total_price", 0)) or 0,
                "title": task.get("title", ""),
                "retry": retry_refund,
            })
        elif status == "in_progress" and not task.get("is_overdue"):
            # Flag only once; re-flagging on every call would report a change
            # (and trigger a JSON rewrite) on every request.
            task["is_overdue"] = True
            updated = True

    if not pending:
        return updated

    # ---------- No session: expire now, remember that the refund is owed ----------
    if not db_session:
        for item in pending:
            task = tasks_db[item["index"]]
            task["status"] = "expired"
            task["expired_at"] = int(time.time())
            if item["amount"] > 0:
                task["refund_pending"] = True
        return True

    # ---------- Phase 2: SQL transaction (wallet refund + ledger entry) ----------
    refunded = []  # items whose refund really reached a wallet
    try:
        for item in pending:
            if item["amount"] <= 0:
                continue
            wallet = (
                db_session.query(Wallet)
                .filter(Wallet.account == item["publisher"])
                .with_for_update()
                .first()
            )
            if not wallet:
                # Nothing to move; do not claim a refund that never happened.
                logger.warning(
                    f"TASK_EXPIRED_NO_WALLET | publisher={item['publisher']} | "
                    f"task={item['task_id']} | amount={item['amount']}"
                )
                continue
            wallet.frozen_balance = max(0, wallet.frozen_balance - item["amount"])
            wallet.balance += item["amount"]
            create_task_transaction(
                db_session, item["publisher"], "TASK_REFUND",
                item["amount"], task_id=item["task_id"],
                task_title=item["title"]
            )
            item["refunded"] = True
            refunded.append(item)
        db_session.commit()
        logger.info(f"TASK_EXPIRED_SQL_COMMIT | refunded_tasks={len(refunded)}")
    except Exception as e:
        # SQL failed: undo everything and leave the JSON untouched for a retry.
        db_session.rollback()
        logger.error(f"TASK_EXPIRED_SQL_ROLLBACK | error={str(e)}")
        return False

    # ---------- Phase 3: SQL succeeded, now update and save the JSON ----------
    try:
        for item in pending:
            task = tasks_db[item["index"]]
            task["status"] = "expired"
            if not item["retry"]:
                task["expired_at"] = int(time.time())
            if item.get("refunded"):
                task["refunded"] = True
                task["refund_amount"] = item["amount"]
            task.pop("refund_pending", None)
        db.save_data("tasks.json", tasks_db)
        logger.info(f"TASK_EXPIRED_JSON_SAVED | tasks={len(pending)}")
    except Exception as e:
        # The money has already moved, so do not raise; flag for manual repair.
        logger.warning(f"TASK_EXPIRED_JSON_SAVE_FAILED | error={str(e)} | 资金已退款但任务状态未更新,需手动修复")

    # Refund notifications run outside the transaction; a failure is only logged.
    for item in refunded:
        try:
            add_notification(item["publisher"], {
                "type": "task_refund",
                "from_user": "system",
                "target_item_id": item["task_id"],
                "target_item_title": item["title"],
                "content": f"💰 任务《{item['title']}》已过期自动取消,{item['amount']}积分已退还"
            })
            logger.info(f"TASK_REFUND | publisher={item['publisher']} | task={item['task_id']} | amount={item['amount']}")
        except Exception as e:
            logger.warning(f"TASK_REFUND_NOTIFY_ERROR | task={item['task_id']} | error={str(e)}")

    return True
@router.get("/api/tasks")
async def get_tasks(
    page: int = 1,
    limit: int = 20,
    status: str = None,
    sort: str = "latest",
    db_session: Session = Depends(get_db)
):
    """
    Return one page of the task list.

    - status: keep only tasks with this status (open/in_progress/completed/
      expired ...). When omitted, cancelled and expired tasks are hidden.
    - sort: latest (default, newest first) / price / deadline / views /
      daily_views / likes / favorites. Unknown values fall back to latest.
    - page / limit: 1-based page number and page size. ``page`` below 1 is
      treated as 1 and a negative ``limit`` as 0, because negative slice
      bounds would otherwise silently return the wrong rows.

    Before listing, expired tasks are swept (with automatic refund) through
    ``check_and_update_expired_tasks``.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])
    users_db = db.load_data("users.json", default_data={})

    # Sweep expired tasks first (includes automatic refunds).
    if check_and_update_expired_tasks(tasks_db, db_session):
        db.save_data("tasks.json", tasks_db)
        # Statuses just changed, so previously cached orderings may be stale;
        # every other status-changing handler invalidates the cache as well.
        sort_cache.invalidate("tasks:")

    page = max(page, 1)
    limit = max(limit, 0)

    # users_db is already shaped {account: user_info}.
    user_map = users_db

    # Status filter; cancelled and expired tasks are hidden by default.
    if status:
        filtered = [t for t in tasks_db if t.get("status") == status]
    else:
        filtered = [t for t in tasks_db if t.get("status") not in ["cancelled", "expired"]]

    # sort option -> (task field, fallback for missing values, descending?)
    sort_specs = {
        "price": ("total_price", 0, True),
        "deadline": ("deadline", "9999", False),
        "views": ("views", 0, True),              # total view count
        "daily_views": ("daily_views", 0, True),  # views counted for the day
        "likes": ("likes", 0, True),
        "favorites": ("favorites", 0, True),
    }
    field, fallback, descending = sort_specs.get(sort, ("created_at", 0, True))

    def sort_fn(data):
        data.sort(key=lambda x: x.get(field, fallback), reverse=descending)

    # 🗂️ Sorting goes through the shared sort cache.
    cache_key = f"tasks:{status or 'all'}:{sort}"
    filtered = sort_cache.get_sorted(cache_key, filtered, sort_fn)

    # Pagination
    start = (page - 1) * limit
    paged = filtered[start:start + limit]

    # Attach publisher info; the list view hides the per-user tracking lists.
    hidden_fields = ("viewed_by", "liked_by", "favorited_by")
    result = []
    for task in paged:
        publisher_info = user_map.get(task.get("publisher"), {})
        task_data = {k: v for k, v in task.items() if k not in hidden_fields}
        result.append({
            **task_data,
            "publisher_name": publisher_info.get("name", task.get("publisher")),
            "publisher_avatar": publisher_info.get("avatarDataUrl", ""),
            "status_text": TASK_STATUS.get(task.get("status"), "未知")
        })

    return {
        "status": "success",
        "data": result,
        "total": len(filtered),
        "page": page,
        "limit": limit
    }
@router.get("/api/tasks/{task_id}")
async def get_task_detail(task_id: str, current_user: str = None):
    """
    Return one task enriched with publisher, assignee and applicant profile data.

    Raises 404 when no task has the given id. ``current_user`` is accepted but
    not used by this handler.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])
    users_db = db.load_data("users.json", default_data={})
    # users_db is already shaped {account: user_info}.
    user_map = users_db

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    publisher_info = user_map.get(task.get("publisher"), {})
    assignee_info = None
    if task.get("assignee"):
        assignee_info = user_map.get(task.get("assignee"), {})

    # Applicant entries with display name and avatar attached.
    applicants_with_info = []
    for applicant in task.get("applicants", []):
        profile = user_map.get(applicant.get("account"), {})
        enriched = dict(applicant)
        enriched["name"] = profile.get("name", applicant.get("account"))
        enriched["avatar"] = profile.get("avatarDataUrl", "")
        applicants_with_info.append(enriched)

    # The detail view keeps liked_by / favorited_by and only drops viewed_by.
    payload = {k: v for k, v in task.items() if k != "viewed_by"}
    payload["publisher_name"] = publisher_info.get("name", task.get("publisher"))
    payload["publisher_avatar"] = publisher_info.get("avatarDataUrl", "")
    # An assignee without a stored profile yields None for both fields.
    payload["assignee_name"] = assignee_info.get("name") if assignee_info else None
    payload["assignee_avatar"] = assignee_info.get("avatarDataUrl") if assignee_info else None
    payload["applicants"] = applicants_with_info
    payload["status_text"] = TASK_STATUS.get(task.get("status"), "未知")

    return {"status": "success", "data": payload}
@router.post("/api/tasks")
async def create_task(task: TaskCreate, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Publish a new task and freeze its full price in the publisher's wallet.

    Ordering: the freeze (wallet change + TASK_FREEZE ledger entry) is
    committed to SQL first and only then is the task written to
    ``tasks.json``. Writing the JSON first would leave a task without frozen
    funds if the commit failed, and cancelling that task would later credit
    money that was never frozen. If the JSON write fails after the commit,
    the freeze is released again and the request fails.

    Raises:
        HTTPException 400: invalid deposit ratio, negative price, or
            insufficient balance.
        HTTPException 500: the freeze could not be committed or the task could
            not be stored.
    """
    # Validate the deposit ratio
    if task.depositRatio not in [10, 20, 30, 50]:
        raise HTTPException(status_code=400, detail="订金比例必须是 10/20/30/50 之一")
    # Validate the price
    if task.totalPrice < 0:
        raise HTTPException(status_code=400, detail="任务价格不能低于0积分")

    # 💳 Lock the SQL wallet row, creating the wallet on first use.
    wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first()
    if not wallet:
        wallet = Wallet(account=current_user, balance=0)
        db_session.add(wallet)
        db_session.flush()
    if wallet.balance < task.totalPrice:
        raise HTTPException(status_code=400, detail=f"余额不足,需要{task.totalPrice}积分,当前{wallet.balance}积分")

    deposit_amount = int(task.totalPrice * task.depositRatio / 100)
    task_id = f"task_{int(time.time())}_{uuid.uuid4().hex[:6]}"
    new_task = {
        "id": task_id,
        "title": task.title,
        "description": task.description,
        "reference_images": (task.referenceImages or [])[:6],
        "reference_link": task.referenceLink,
        "total_price": task.totalPrice,
        "deposit_ratio": task.depositRatio,
        "deposit_amount": deposit_amount,
        "deadline": task.deadline,
        "publisher": current_user,
        "status": "open",
        "created_at": int(time.time()),
        # Applications / assignment
        "applicants": [],      # users who applied
        "assignee": None,      # chosen worker
        "assigned_at": None,   # assignment time
        # Delivery
        "deliverables": [],    # submitted results
        "submit_note": None,   # note attached to the submission
        "submitted_at": None,  # submission time
        # Acceptance
        "completed_at": None,  # completion time
        "feedback": None,      # acceptance feedback
        # Dispute
        "dispute_id": None,    # id of the linked dispute
        # 💳 Payment state
        "frozen_amount": task.totalPrice,  # amount currently frozen
        # Engagement data
        "likes": 0,
        "favorites": 0,
        "comments": 0,
        "liked_by": [],
        "favorited_by": [],
        "tip_board": [],       # tipping leaderboard
        "views": 0,
        "daily_views": 0,
        "viewed_by": [],
        "daily_views_date": ""
    }

    # 💳 Step 1: freeze the full price (balance -> frozen balance) and commit.
    try:
        wallet.balance -= task.totalPrice
        wallet.frozen_balance += task.totalPrice
        create_task_transaction(
            db_session, current_user, "TASK_FREEZE",
            -task.totalPrice, task_id=task_id,
            task_title=task.title
        )
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        logger.error(f"TASK_CREATE_FREEZE_ERROR | publisher={current_user} | task={task_id} | error={str(e)}")
        raise HTTPException(status_code=500, detail="任务发布失败,请稍后重试")

    # Step 2: store the task. Loaded right before saving to keep the
    # read-modify-write window on tasks.json short.
    try:
        tasks_db = db.load_data("tasks.json", default_data=[])
        tasks_db.insert(0, new_task)
        db.save_data("tasks.json", tasks_db)
    except Exception as e:
        logger.error(f"TASK_CREATE_JSON_SAVE_FAILED | publisher={current_user} | task={task_id} | error={str(e)}")
        # The task does not exist, so give the frozen funds back.
        if task.totalPrice > 0:
            try:
                wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first()
                if wallet:
                    wallet.frozen_balance = max(0, wallet.frozen_balance - task.totalPrice)
                    wallet.balance += task.totalPrice
                    create_task_transaction(
                        db_session, current_user, "TASK_REFUND",
                        task.totalPrice, task_id=task_id,
                        task_title=task.title
                    )
                db_session.commit()
            except Exception as refund_error:
                db_session.rollback()
                logger.error(
                    f"TASK_CREATE_UNFREEZE_FAILED | publisher={current_user} | task={task_id} | "
                    f"amount={task.totalPrice} | error={str(refund_error)} | 需手动修复"
                )
        raise HTTPException(status_code=500, detail="任务发布失败,请稍后重试")

    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")
    logger.info(f"TASK_CREATE | publisher={current_user} | task={task_id} | price={task.totalPrice} | frozen={task.totalPrice}")
    return {"status": "success", "data": new_task, "frozen_amount": task.totalPrice}
@router.put("/api/tasks/{task_id}")
async def update_task(task_id: str, update_data: TaskUpdate, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Update a task (publisher only).

    - open: every field may change.
    - in_progress / submitted: only title, description, reference images and
      reference link may change; the price may not. The assignee is notified.

    A price change on an open task also adjusts the frozen funds: a higher
    price freezes the difference (and requires enough spendable balance), a
    lower price releases the difference. Without this the task's
    ``total_price`` drifts away from what was frozen at publication, and later
    deposit/final payments would draw on funds frozen for other tasks.

    Raises:
        HTTPException 400: state does not allow edits, negative price, or not
            enough balance to freeze the price increase.
        HTTPException 403: caller is not the publisher.
        HTTPException 404: unknown task id.
        HTTPException 500: the freeze adjustment could not be committed.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("publisher") != current_user:
        raise HTTPException(status_code=403, detail="无权修改他人任务")
    status = task.get("status")
    if status not in ["open", "in_progress", "submitted"]:
        raise HTTPException(status_code=400, detail="当前状态不允许修改")

    # Names of changed fields, used in the notification text.
    changes = []
    funds_changed = False

    if update_data.title is not None and update_data.title != task.get("title"):
        task["title"] = update_data.title
        changes.append("标题")
    if update_data.description is not None and update_data.description != task.get("description"):
        task["description"] = update_data.description
        changes.append("描述")
    if update_data.referenceImages is not None:
        task["reference_images"] = update_data.referenceImages[:6]
        changes.append("参考图")
    if update_data.referenceLink is not None and update_data.referenceLink != task.get("reference_link"):
        task["reference_link"] = update_data.referenceLink
        changes.append("参考链接")

    # Price, deposit ratio and deadline may only change while the task is open.
    if status == "open":
        if update_data.totalPrice is not None:
            new_price = update_data.totalPrice
            if new_price < 0:
                raise HTTPException(status_code=400, detail="任务价格不能低于0积分")

            # 💳 Keep the frozen funds equal to the (new) task price.
            currently_frozen = task.get("frozen_amount", task.get("total_price", 0))
            delta = new_price - currently_frozen
            if delta != 0:
                wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first()
                if delta > 0:
                    if not wallet or wallet.balance < delta:
                        raise HTTPException(status_code=400, detail=f"余额不足,需要追加冻结{delta}积分")
                    wallet.balance -= delta
                    wallet.frozen_balance += delta
                    create_task_transaction(
                        db_session, current_user, "TASK_FREEZE",
                        -delta, task_id=task_id,
                        task_title=task.get("title")
                    )
                elif wallet:
                    released = -delta
                    wallet.frozen_balance = max(0, wallet.frozen_balance - released)
                    wallet.balance += released
                    create_task_transaction(
                        db_session, current_user, "TASK_REFUND",
                        released, task_id=task_id,
                        task_title=task.get("title")
                    )
                funds_changed = True

            task["total_price"] = new_price
            task["frozen_amount"] = new_price
            task["deposit_amount"] = int(new_price * task["deposit_ratio"] / 100)
            changes.append("价格")
        # An unsupported ratio is ignored rather than rejected.
        if update_data.depositRatio is not None and update_data.depositRatio in [10, 20, 30, 50]:
            task["deposit_ratio"] = update_data.depositRatio
            task["deposit_amount"] = int(task["total_price"] * update_data.depositRatio / 100)
            changes.append("订金比例")
        if update_data.deadline is not None:
            task["deadline"] = update_data.deadline
            changes.append("截止日期")

    # 💳 Commit the wallet change before persisting the JSON (money is authoritative).
    if funds_changed:
        try:
            db_session.commit()
        except Exception as e:
            db_session.rollback()
            logger.error(f"TASK_UPDATE_FREEZE_ERROR | task={task_id} | error={str(e)}")
            raise HTTPException(status_code=500, detail="任务更新失败,请稍后重试")

    try:
        db.save_data("tasks.json", tasks_db)
    except Exception as e:
        if funds_changed:
            logger.error(f"TASK_UPDATE_JSON_SAVE_FAILED | task={task_id} | error={str(e)} | 冻结金额已调整但任务未更新,需手动修复")
        raise

    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")

    # 📢 Tell the assignee, if there is one, what changed.
    if task.get("assignee") and changes:
        add_notification(task.get("assignee"), {
            "type": "task_updated",
            "from_user": current_user,
            "target_item_id": task_id,
            "target_item_title": task.get("title", ""),
            "content": f"任务《{task.get('title', '')}》已更新:{'、'.join(changes)}"
        })
    return {"status": "success"}
@router.delete("/api/tasks/{task_id}")
async def cancel_task(task_id: str, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Cancel a task (publisher only, and only while it is still open).

    The frozen funds are released to the publisher first; the task is marked
    cancelled only after that refund has been committed.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("publisher") != current_user:
        raise HTTPException(status_code=403, detail="无权取消他人任务")
    if task.get("status") != "open":
        raise HTTPException(status_code=400, detail="只能取消开放状态的任务")

    # 💳 Release the frozen funds; a zero amount needs no wallet work.
    frozen_amount = task.get("frozen_amount", task.get("total_price", 0))
    if frozen_amount > 0:
        try:
            wallet = (
                db_session.query(Wallet)
                .filter(Wallet.account == current_user)
                .with_for_update()
                .first()
            )
            if wallet:
                wallet.frozen_balance = max(0, wallet.frozen_balance - frozen_amount)
                wallet.balance += frozen_amount
            # Ledger entry for the refund
            create_task_transaction(
                db_session, current_user, "TASK_CANCEL_REFUND",
                frozen_amount, task_id=task_id,
                task_title=task.get("title")
            )
            db_session.commit()
            logger.info(f"TASK_CANCEL_REFUND | user={current_user} | task={task_id} | amount={frozen_amount}")
        except Exception as e:
            db_session.rollback()
            logger.error(f"TASK_CANCEL_REFUND_ERROR | task={task_id} | error={str(e)}")
            raise HTTPException(status_code=500, detail="退款失败,请稍后重试")

    # Reached only when the refund succeeded or none was needed.
    task.update({
        "status": "cancelled",
        "cancelled_at": int(time.time()),
        "refunded": frozen_amount > 0,
        "refund_amount": frozen_amount,
    })
    db.save_data("tasks.json", tasks_db)
    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")
    return {"status": "success", "refunded_amount": frozen_amount}
# ==========================================
# 🙋 申请接单
# ==========================================
@router.post("/api/tasks/{task_id}/apply")
async def apply_task(task_id: str, message: str = None, current_user: str = Depends(require_auth)):
    """
    Apply to take on an open task.

    Publishers cannot apply to their own task and a user can apply only once.
    The publisher is notified about the new application.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("status") != "open":
        raise HTTPException(status_code=400, detail="该任务不在开放接单状态")
    if task.get("publisher") == current_user:
        raise HTTPException(status_code=400, detail="不能申请自己发布的任务")

    # Reject a second application from the same account.
    applicants = task.get("applicants", [])
    for existing in applicants:
        if existing["account"] == current_user:
            raise HTTPException(status_code=400, detail="您已申请过该任务")

    # Record the application
    application = {
        "account": current_user,
        "message": message or "",
        "applied_at": int(time.time())
    }
    applicants.append(application)
    task["applicants"] = applicants
    db.save_data("tasks.json", tasks_db)

    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")

    # 🔔 Tell the publisher that someone applied
    title = task.get("title", "")
    add_notification(task.get("publisher"), {
        "type": "task_apply",
        "from_user": current_user,
        "target_item_id": task_id,
        "target_item_title": title,
        "content": f"申请了您的任务《{title}》"
    })
    return {"status": "success", "message": "申请成功,等待发布者选择"}
@router.delete("/api/tasks/{task_id}/apply")
async def cancel_apply(task_id: str, current_user: str = Depends(require_auth)):
    """
    Withdraw the caller's application from a task that is still open.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("status") != "open":
        raise HTTPException(status_code=400, detail="任务已开始,无法取消申请")

    applicants = task.get("applicants", [])
    others = [entry for entry in applicants if entry["account"] != current_user]
    # Nothing was filtered out, so the caller never applied.
    if len(others) == len(applicants):
        raise HTTPException(status_code=400, detail="您未申请该任务")

    task["applicants"] = others
    db.save_data("tasks.json", tasks_db)
    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")
    return {"status": "success"}
# ==========================================
# 🎯 指派接单者
# ==========================================
@router.post("/api/tasks/{task_id}/assign")
async def assign_task(task_id: str, assignee: str, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Publisher picks one of the applicants as the worker.

    The deposit is taken out of the publisher's frozen balance and a
    TASK_DEPOSIT ledger entry is recorded. It is not credited to the assignee
    here; the acceptance handler pays the full price on completion.

    Ordering follows the acceptance handler: the SQL transaction is committed
    first, then ``tasks.json`` is written on a best-effort basis. Writing the
    JSON first could leave a task ``in_progress`` with ``deposit_paid`` set
    although the deposit was never deducted (failed commit). A failure after
    the commit (JSON write, notification) is logged, not raised, because the
    money has already moved.

    Raises:
        HTTPException 400: task not open, user did not apply, or the frozen
            balance does not cover the deposit.
        HTTPException 403: caller is not the publisher.
        HTTPException 404: unknown task id.
        HTTPException 500: the deposit transaction could not be committed.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("publisher") != current_user:
        raise HTTPException(status_code=403, detail="只有发布者可以指派接单者")
    if task.get("status") != "open":
        raise HTTPException(status_code=400, detail="该任务不在开放状态")

    # The assignee must be one of the applicants.
    applicants = task.get("applicants", [])
    if not any(a["account"] == assignee for a in applicants):
        raise HTTPException(status_code=400, detail="该用户未申请此任务")

    deposit = task.get("deposit_amount", 0)

    # 💳 Step 1: SQL transaction (deduct the deposit from the frozen balance).
    try:
        wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first()
        if not wallet or wallet.frozen_balance < deposit:
            raise HTTPException(status_code=400, detail="冻结余额异常")
        wallet.frozen_balance -= deposit

        # Display name of the assignee for the ledger entry
        users_db = db.load_data("users.json", default_data={})
        assignee_name = users_db.get(assignee, {}).get("name", assignee)

        create_task_transaction(
            db_session, current_user, "TASK_DEPOSIT",
            -deposit, related_account=assignee, task_id=task_id,
            task_title=task.get("title"),
            related_user_name=assignee_name
        )
        db_session.commit()
    except HTTPException:
        raise
    except Exception as e:
        db_session.rollback()
        logger.error(f"TASK_ASSIGN_ERROR | task={task_id} | error={str(e)}")
        raise HTTPException(status_code=500, detail="指派处理失败,请稍后重试")

    # Step 2: update the task and persist it (best effort, funds already moved).
    task["assignee"] = assignee
    task["assigned_at"] = int(time.time())
    task["status"] = "in_progress"
    task["deposit_paid"] = deposit  # 💳 deposit actually deducted
    try:
        db.save_data("tasks.json", tasks_db)
    except Exception as e:
        logger.warning(f"TASK_ASSIGN_JSON_SAVE_FAILED | task={task_id} | 订金已扣除但任务状态未更新,需手动修复: {str(e)}")

    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")

    # 🔔 Tell the assignee; a notification failure must not fail the request.
    try:
        add_notification(assignee, {
            "type": "task_assigned",
            "from_user": current_user,
            "target_item_id": task_id,
            "target_item_title": task.get("title", ""),
            "content": f"您已被选为任务《{task.get('title', '')}》的接单者,订金{deposit}积分已缓冲"
        })
    except Exception as e:
        logger.warning(f"TASK_ASSIGN_NOTIFY_ERROR | task={task_id} | error={str(e)}")

    logger.info(f"TASK_ASSIGN | publisher={current_user} | assignee={assignee} | task={task_id} | deposit={deposit}")
    return {"status": "success", "message": f"已指派 {assignee} 接单,订金 {deposit} 积分已开始缓冲"}
# ==========================================
# 📤 提交成果
# ==========================================
@router.post("/api/tasks/{task_id}/submit")
async def submit_task(task_id: str, deliverables: list, note: str = None, current_user: str = Depends(require_auth)):
    """
    Assignee hands in the deliverables; the task moves to ``submitted``.

    The publisher is notified that the work is ready for acceptance.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("assignee") != current_user:
        raise HTTPException(status_code=403, detail="只有接单者可以提交成果")
    if task.get("status") != "in_progress":
        raise HTTPException(status_code=400, detail="任务状态不允许提交")
    if not deliverables:
        raise HTTPException(status_code=400, detail="请上传交付成果")

    task.update({
        "deliverables": deliverables,
        "submit_note": note,
        "submitted_at": int(time.time()),
        "status": "submitted",
    })
    db.save_data("tasks.json", tasks_db)

    # 🗂️ Drop cached orderings
    sort_cache.invalidate("tasks:")

    # 🔔 Tell the publisher that results were submitted
    title = task.get("title", "")
    add_notification(task.get("publisher"), {
        "type": "task_submitted",
        "from_user": current_user,
        "target_item_id": task_id,
        "target_item_title": title,
        "content": f"已提交任务《{title}》的成果,请验收"
    })
    return {"status": "success", "message": "成果已提交,等待发布者验收"}
# ==========================================
# ✅ 验收成果
# ==========================================
@router.post("/api/tasks/{task_id}/accept")
async def accept_task(task_id: str, is_accepted: bool, feedback: str = None, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Publisher reviews the submitted work.

    - is_accepted=True: the remaining amount (price minus deposit) is taken
      from the publisher's frozen balance and the full price is credited to
      the assignee; both sides get a ledger entry.
    - is_accepted=False: the task returns to ``in_progress`` so the assignee
      can rework and resubmit (or a dispute can be opened).

    Only the money movement sits inside the rollback-protected block. Once it
    is committed, the follow-up steps (JSON save, cache invalidation,
    notification) are best effort: a failure there is logged, never turned
    into a 500, because the settlement has already happened and the caller
    must not be told it failed.

    Raises:
        HTTPException 400: task is not in ``submitted`` state, or the frozen
            balance does not cover the remaining amount.
        HTTPException 403: caller is not the publisher.
        HTTPException 404: unknown task id.
        HTTPException 500: the settlement transaction failed and was rolled back.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")
    if task.get("publisher") != current_user:
        raise HTTPException(status_code=403, detail="只有发布者可以验收")
    if task.get("status") != "submitted":
        raise HTTPException(status_code=400, detail="任务状态不允许验收")

    task["feedback"] = feedback
    assignee_account = task.get("assignee")
    total_price = task.get("total_price", 0)
    # Prefer the deposit actually deducted at assignment time.
    deposit = task.get("deposit_paid", task.get("deposit_amount", 0))
    remaining = total_price - deposit  # final payment

    if is_accepted:
        # 💳 Settlement: all-or-nothing inside one SQL transaction.
        try:
            publisher_wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first()
            if not publisher_wallet or publisher_wallet.frozen_balance < remaining:
                raise HTTPException(status_code=400, detail="冻结余额不足支付尾款")
            # Final payment leaves the publisher's frozen balance.
            publisher_wallet.frozen_balance -= remaining

            # The assignee receives the full price (deposit + final payment).
            assignee_wallet = db_session.query(Wallet).filter(Wallet.account == assignee_account).with_for_update().first()
            if not assignee_wallet:
                assignee_wallet = Wallet(account=assignee_account, balance=0)
                db_session.add(assignee_wallet)
                db_session.flush()
            assignee_wallet.balance += total_price       # spendable balance
            assignee_wallet.task_balance += total_price  # running total of task income

            # Display names of both parties for the ledger entries
            users_db = db.load_data("users.json", default_data={})
            publisher_name = users_db.get(current_user, {}).get("name", current_user)
            assignee_name = users_db.get(assignee_account, {}).get("name", assignee_account)

            # 1. Publisher pays the final payment
            create_task_transaction(
                db_session, current_user, "TASK_PAYMENT",
                -remaining, related_account=assignee_account, task_id=task_id,
                task_title=task.get("title"),
                related_user_name=assignee_name
            )
            # 2. Assignee income
            create_task_transaction(
                db_session, assignee_account, "TASK_INCOME",
                total_price, related_account=current_user, task_id=task_id,
                task_title=task.get("title"),
                related_user_name=publisher_name
            )
            db_session.commit()
        except HTTPException:
            raise  # pass HTTP errors through unchanged
        except Exception as e:
            db_session.rollback()
            logger.error(f"TASK_ACCEPT_ERROR | task={task_id} | error={str(e)}")
            raise HTTPException(status_code=500, detail="验收处理失败,请稍后重试")

        # ----- Funds are settled; everything below is best effort -----
        task["status"] = "completed"
        task["completed_at"] = int(time.time())
        try:
            db.save_data("tasks.json", tasks_db)
        except Exception as e:
            logger.warning(f"TASK_COMPLETE_JSON_SAVE_FAILED | 资金已结算但任务状态未更新,需手动修复: {str(e)}")
        logger.info(f"TASK_COMPLETE | publisher={current_user} | assignee={assignee_account} | task={task_id} | total={total_price}")

        # 🗂️ Task status changed, drop cached orderings
        sort_cache.invalidate("tasks:")
        message = f"验收通过,已支付 {total_price} 积分给接单者"

        # 🔔 Payment notification for the assignee
        try:
            add_notification(assignee_account, {
                "type": "task_payment",
                "from_user": current_user,
                "target_item_id": task_id,
                "target_item_title": task.get("title", ""),
                "content": f"💰 任务《{task.get('title', '')}》验收通过,{total_price}积分已到账"
            })
        except Exception as e:
            logger.warning(f"TASK_PAYMENT_NOTIFY_ERROR | task={task_id} | error={str(e)}")
    else:
        # Rejected: back to in_progress so the work can be revised and resubmitted.
        task["status"] = "in_progress"
        task["deliverables"] = []
        task["submitted_at"] = None
        message = "验收不通过,接单者可以修改后重新提交"
        db.save_data("tasks.json", tasks_db)
        # 🗂️ Task status changed, drop cached orderings
        sort_cache.invalidate("tasks:")
        # 🔔 Tell the assignee the work was rejected
        add_notification(assignee_account, {
            "type": "task_rejected",
            "from_user": current_user,
            "target_item_id": task_id,
            "target_item_title": task.get("title", ""),
            "content": f"任务《{task.get('title', '')}》验收未通过,请修改后重新提交"
        })

    return {"status": "success", "message": message}
# ==========================================
# ⚖️ 申诉仲裁系统
# ==========================================
@router.post("/api/tasks/{task_id}/dispute")
async def create_dispute(task_id: str, reason: str, evidence: list = None, current_user: str = Depends(require_auth)):
    """
    Open a dispute on a task (publisher or assignee).

    - evidence: optional list of evidence image URLs; at most six are kept.

    The task moves to ``disputed`` and the other party is notified.
    """
    tasks_db = db.load_data("tasks.json", default_data=[])
    disputes_db = db.load_data("disputes.json", default_data=[])

    task = next((t for t in tasks_db if t["id"] == task_id), None)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    publisher = task.get("publisher")
    assignee = task.get("assignee")

    # Permission check
    if current_user not in [publisher, assignee]:
        raise HTTPException(status_code=403, detail="只有发布者或接单者可以发起申诉")
    # State check
    if task.get("status") not in ["in_progress", "submitted"]:
        raise HTTPException(status_code=400, detail="当前状态不允许申诉")
    # A task that already references a dispute cannot get another one.
    if task.get("dispute_id"):
        raise HTTPException(status_code=400, detail="该任务已有进行中的申诉")

    # Which side is filing
    is_publisher = current_user == publisher
    if is_publisher:
        role = "publisher"
    else:
        role = "assignee"

    dispute_id = f"dispute_{int(time.time())}_{uuid.uuid4().hex[:6]}"
    dispute = {
        "id": dispute_id,
        "task_id": task_id,
        "task_title": task.get("title", ""),
        "publisher": publisher,
        "assignee": assignee,
        "initiator": current_user,
        "initiator_role": role,
        "reason": reason,
        "evidence": (evidence or [])[:6],
        "status": "pending",  # pending/responded/resolved
        "created_at": int(time.time()),
        # Response of the other party
        "response": None,
        "response_evidence": [],
        "responded_at": None,
        # Arbitration outcome
        "resolution": None,        # favor_initiator/favor_respondent/split
        "resolution_ratio": None,  # share ratio (for split)
        "resolution_note": None,
        "resolved_by": None,
        "resolved_at": None
    }
    disputes_db.append(dispute)

    task["status"] = "disputed"
    task["dispute_id"] = dispute_id
    db.save_data("tasks.json", tasks_db)
    db.save_data("disputes.json", disputes_db)

    # 🗂️ Task is now disputed, drop cached orderings
    sort_cache.invalidate("tasks:")

    # 🔔 Tell the other party that a dispute was opened
    other_party = assignee if is_publisher else publisher
    add_notification(other_party, {
        "type": "task_disputed",
        "from_user": current_user,
        "target_item_id": task_id,
        "target_item_title": task.get("title", ""),
        "content": f"对任务《{task.get('title', '')}》发起了申诉,请回应"
    })
    return {"status": "success", "data": dispute, "message": "申诉已提交,等待对方回应"}
@router.get("/api/disputes/{dispute_id}")
async def get_dispute_detail(dispute_id: str):
    """
    Return one dispute together with the display names and avatars of both parties.

    Raises 404 when no dispute has the given id.
    """
    disputes_db = db.load_data("disputes.json", default_data=[])
    users_db = db.load_data("users.json", default_data={})
    # users_db is already shaped {account: user_info}.
    user_map = users_db

    dispute = next((d for d in disputes_db if d["id"] == dispute_id), None)
    if dispute is None:
        raise HTTPException(status_code=404, detail="申诉不存在")

    publisher = dispute.get("publisher")
    assignee = dispute.get("assignee")
    publisher_info = user_map.get(publisher, {})
    assignee_info = user_map.get(assignee, {})

    payload = dict(dispute)
    payload["publisher_name"] = publisher_info.get("name", publisher)
    payload["publisher_avatar"] = publisher_info.get("avatarDataUrl", "")
    payload["assignee_name"] = assignee_info.get("name", assignee)
    payload["assignee_avatar"] = assignee_info.get("avatarDataUrl", "")
    return {"status": "success", "data": payload}
@router.post("/api/disputes/{dispute_id}/respond")
async def respond_dispute(dispute_id: str, response: str, evidence: list = None, current_user: str = Depends(require_auth)):
    """
    Let the party a dispute was filed against answer it.

    Only a ``pending`` dispute can be answered; at most six evidence entries
    are stored. The initiator is notified about the response.
    """
    disputes_db = db.load_data("disputes.json", default_data=[])

    dispute = next((d for d in disputes_db if d["id"] == dispute_id), None)
    if dispute is None:
        raise HTTPException(status_code=404, detail="申诉不存在")

    # The respondent is whichever side did not open the dispute.
    if dispute.get("initiator_role") == "publisher":
        respondent = dispute.get("assignee")
    else:
        respondent = dispute.get("publisher")
    if current_user != respondent:
        raise HTTPException(status_code=403, detail="只有被申诉方可以回应")
    if dispute.get("status") != "pending":
        raise HTTPException(status_code=400, detail="该申诉已回应或已解决")

    dispute.update({
        "response": response,
        "response_evidence": (evidence or [])[:6],
        "responded_at": int(time.time()),
        "status": "responded",
    })
    db.save_data("disputes.json", disputes_db)

    # 🗂️ Dispute state changed, drop cached task orderings
    sort_cache.invalidate("tasks:")

    # 🔔 Tell the initiator that the other side answered
    task_title = dispute.get("task_title", "")
    add_notification(dispute.get("initiator"), {
        "type": "dispute_responded",
        "from_user": current_user,
        "target_item_id": dispute.get("task_id"),
        "target_item_title": task_title,
        "content": f"对方已回应您关于任务《{task_title}》的申诉"
    })
    return {"status": "success", "message": "回应已提交,等待管理员仲裁"}
@router.get("/api/admin/disputes")
async def get_admin_disputes(status: str = None, current_user: str = Depends(require_auth)):
    """
    List disputes for administrators, newest first.

    - status: optional filter, one of pending/responded/resolved
    """
    if not is_admin(current_user):
        raise HTTPException(status_code=403, detail="需要管理员权限")

    disputes_db = db.load_data("disputes.json", default_data=[])
    users_db = db.load_data("users.json", default_data={})
    # users_db is already shaped {account: user_info}.
    user_map = users_db

    # Optional status filter, then newest first.
    if status:
        selected = [d for d in disputes_db if d.get("status") == status]
    else:
        selected = list(disputes_db)
    selected.sort(key=lambda d: d.get("created_at", 0), reverse=True)

    def with_names(dispute):
        # Attach display names of both parties, falling back to the account id.
        publisher = dispute.get("publisher")
        assignee = dispute.get("assignee")
        return {
            **dispute,
            "publisher_name": user_map.get(publisher, {}).get("name", publisher),
            "assignee_name": user_map.get(assignee, {}).get("name", assignee)
        }

    return {"status": "success", "data": [with_names(d) for d in selected]}
@router.post("/api/admin/disputes/{dispute_id}/resolve")
async def resolve_dispute(dispute_id: str, resolution: str, ratio: int = None, note: str = None, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Administrator ruling on a dispute: settle funds, close dispute and task.

    Parameters
    - dispute_id: id of the dispute record in disputes.json.
    - resolution: "favor_initiator" (side with the party who filed),
      "favor_respondent" (side with the other party) or "split".
    - ratio: 0-100, percentage of the task's total_price awarded to the
      initiator. Validated (and meaningful) only for "split".
    - note: free-text explanation stored on the dispute and the task.

    Funds are moved through the SQL Wallet rows (both fetched with
    ``with_for_update()``) and recorded via ``create_task_transaction``;
    the JSON dispute/task records are updated only after the SQL commit.

    Raises
    - HTTPException(403): caller is not an administrator.
    - HTTPException(400): unknown resolution, bad split ratio, or the
      dispute is already resolved.
    - HTTPException(404): dispute or its task does not exist.
    - HTTPException(500): any failure while settling funds (the SQL
      session is rolled back first).
    """
    if not is_admin(current_user):
        raise HTTPException(status_code=403, detail="需要管理员权限")
    if resolution not in ["favor_initiator", "favor_respondent", "split"]:
        raise HTTPException(status_code=400, detail="无效的裁决结果")
    if resolution == "split" and (ratio is None or ratio < 0 or ratio > 100):
        raise HTTPException(status_code=400, detail="分成比例必须在0-100之间")

    disputes_db = db.load_data("disputes.json", default_data=[])
    tasks_db = db.load_data("tasks.json", default_data=[])
    users_db = db.load_data("users.json", default_data={})

    dispute = next((d for d in disputes_db if d["id"] == dispute_id), None)
    if dispute is None:
        raise HTTPException(status_code=404, detail="申诉不存在")
    if dispute.get("status") == "resolved":
        raise HTTPException(status_code=400, detail="该申诉已裁决")

    # Locate the task the dispute refers to.
    task = next((t for t in tasks_db if t["id"] == dispute.get("task_id")), None)
    if not task:
        raise HTTPException(status_code=404, detail="关联任务不存在")

    total_price = task.get("total_price", 0)
    deposit = task.get("deposit_amount", 0)
    publisher_account = dispute.get("publisher")
    assignee_account = dispute.get("assignee")
    is_publisher_initiator = dispute.get("initiator_role") == "publisher"

    try:
        # Fetch both wallets with SELECT ... FOR UPDATE.
        publisher_wallet = db_session.query(Wallet).filter(Wallet.account == publisher_account).with_for_update().first()
        assignee_wallet = db_session.query(Wallet).filter(Wallet.account == assignee_account).with_for_update().first()

        # Create missing wallets on the fly so the settlement can proceed.
        if not publisher_wallet:
            publisher_wallet = Wallet(account=publisher_account, balance=0)
            db_session.add(publisher_wallet)
            db_session.flush()
        if not assignee_wallet:
            assignee_wallet = Wallet(account=assignee_account, balance=0)
            db_session.add(assignee_wallet)
            db_session.flush()

        # Display names for the transaction records.
        publisher_name = users_db.get(publisher_account, {}).get("name", publisher_account)
        assignee_name = users_db.get(assignee_account, {}).get("name", assignee_account)

        def pay_assignee_in_full():
            # Assignee wins: release the balance due from the publisher's
            # frozen funds and credit the whole price to the assignee.
            remaining = total_price - deposit
            publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining)
            assignee_wallet.balance += total_price
            assignee_wallet.task_balance += total_price
            create_task_transaction(
                db_session, publisher_account, "TASK_PAYMENT",
                -remaining, related_account=assignee_account, task_id=task.get("id"),
                task_title=task.get("title"),
                related_user_name=assignee_name
            )
            create_task_transaction(
                db_session, assignee_account, "TASK_INCOME",
                total_price, related_account=publisher_account, task_id=task.get("id"),
                task_title=task.get("title"),
                related_user_name=publisher_name
            )
            return total_price

        def refund_deposit_to_publisher():
            # Publisher wins: move the deposit from frozen to available balance.
            publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - deposit)
            publisher_wallet.balance += deposit
            create_task_transaction(
                db_session, publisher_account, "TASK_REFUND",
                deposit, task_id=task.get("id"),
                task_title=task.get("title"),
                related_user_name=assignee_name
            )
            return deposit

        if resolution == "split":
            # Split total_price by ratio; the initiator receives ratio percent.
            initiator_share = int(total_price * ratio / 100)
            respondent_share = total_price - initiator_share
            remaining = total_price - deposit
            publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining)
            if is_publisher_initiator:
                pub_refund = initiator_share
                assignee_earn = respondent_share
            else:
                assignee_earn = initiator_share
                pub_refund = respondent_share
            publisher_wallet.balance += pub_refund
            assignee_wallet.balance += assignee_earn
            assignee_wallet.task_balance += assignee_earn
            create_task_transaction(
                db_session, publisher_account, "TASK_PAYMENT",
                -remaining, related_account=assignee_account, task_id=task.get("id"),
                task_title=task.get("title"),
                related_user_name=assignee_name
            )
            create_task_transaction(
                db_session, assignee_account, "TASK_INCOME",
                assignee_earn, related_account=publisher_account, task_id=task.get("id"),
                task_title=task.get("title"),
                related_user_name=publisher_name
            )
            initiator_amount = initiator_share
            respondent_amount = respondent_share
        else:
            # Winner-takes-all: the publisher wins exactly when the favoured
            # side (initiator or respondent) is the publisher.
            initiator_wins = resolution == "favor_initiator"
            publisher_wins = initiator_wins == is_publisher_initiator
            if publisher_wins:
                winner_amount = refund_deposit_to_publisher()
            else:
                winner_amount = pay_assignee_in_full()
            if initiator_wins:
                initiator_amount, respondent_amount = winner_amount, 0
            else:
                initiator_amount, respondent_amount = 0, winner_amount

        db_session.commit()
    except Exception as e:
        db_session.rollback()
        logger.error(f"DISPUTE_RESOLVE_ERROR | dispute={dispute_id} | error={str(e)}")
        raise HTTPException(status_code=500, detail="仲裁资金处理失败,请稍后重试")

    # Persist the ruling on the dispute record.
    resolved_at = int(time.time())
    dispute["resolution"] = resolution
    dispute["resolution_ratio"] = ratio
    dispute["resolution_note"] = note
    dispute["resolved_by"] = current_user
    dispute["resolved_at"] = resolved_at
    dispute["status"] = "resolved"

    # A resolved dispute always closes the task.
    task["status"] = "completed"
    task["completed_at"] = resolved_at
    task["feedback"] = f"申诉裁决:{note or '无'}(结果:{resolution})"

    db.save_data("disputes.json", disputes_db)
    db.save_data("tasks.json", tasks_db)
    # Task status changed, so cached task orderings are stale.
    sort_cache.invalidate("tasks:")

    # Build the human-readable ruling. The split text is formatted only for
    # split rulings: ratio is None for the other two outcomes, and
    # evaluating 100 - ratio eagerly would raise TypeError after the funds
    # were already committed.
    if resolution == "split":
        resolution_text = f"双方协商分成({ratio}%:{100-ratio}%)"
    else:
        resolution_text = {
            "favor_initiator": "支持申诉方",
            "favor_respondent": "支持被申诉方",
        }.get(resolution, "已裁决")

    # Notify the party that filed the dispute.
    add_notification(dispute.get("initiator"), {
        "type": "dispute_resolved",
        "from_user": current_user,
        "target_item_id": dispute.get("task_id"),
        "target_item_title": dispute.get("task_title", ""),
        "content": f"您的申诉已裁决:{resolution_text},您获得{initiator_amount}积分"
    })
    # Notify the other party.
    respondent_account = dispute.get("assignee") if is_publisher_initiator else dispute.get("publisher")
    add_notification(respondent_account, {
        "type": "dispute_resolved",
        "from_user": current_user,
        "target_item_id": dispute.get("task_id"),
        "target_item_title": dispute.get("task_title", ""),
        "content": f"任务申诉已裁决:{resolution_text},您获得{respondent_amount}积分"
    })

    logger.info(f"DISPUTE_RESOLVED | dispute={dispute_id} | resolution={resolution} | initiator_amount={initiator_amount} | respondent_amount={respondent_amount}")
    return {"status": "success", "message": f"裁决完成:{resolution_text}"}
# ==========================================
# 📊 我的任务
# ==========================================
@router.get("/api/my-tasks")
async def get_my_tasks(role: str = "publisher", current_user: str = Depends(require_auth)):
    """
    Return the caller's tasks, newest first.

    - role="publisher": tasks the caller published.
    - any other value: tasks assigned to the caller (documented as
      role="assignee").

    The per-user interaction lists (viewed_by, liked_by, favorited_by) are
    stripped from every task in the response.
    """
    # Anything other than "publisher" selects by the assignee field.
    owner_field = "publisher" if role == "publisher" else "assignee"
    hidden_fields = ["viewed_by", "liked_by", "favorited_by"]

    mine = [
        task
        for task in db.load_data("tasks.json", default_data=[])
        if task.get(owner_field) == current_user
    ]
    # Newest first; tasks without created_at sort as timestamp 0.
    mine.sort(key=lambda task: task.get("created_at", 0), reverse=True)

    trimmed = [
        {field: value for field, value in task.items() if field not in hidden_fields}
        for task in mine
    ]
    return {"status": "success", "data": trimmed}
@router.post("/api/tasks/{task_id}/view")
async def record_task_view(task_id: str, current_user: str = Depends(require_auth)):
    """
    Record a view of a task on behalf of the authenticated caller.

    Counting is delegated to ``record_view``; per the original note, a user
    counts once toward total views while daily views grow on every call.

    Returns the updated ``views`` and ``daily_views`` counters.
    Raises HTTPException(404) when ``record_view`` returns None.
    """
    view_stats = record_view("tasks.json", task_id, current_user)
    if view_stats is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    # View counts may affect ordering, so drop cached task sorts.
    sort_cache.invalidate("tasks:")

    response = {"status": "success"}
    response["views"] = view_stats["views"]
    response["daily_views"] = view_stats["daily_views"]
    return response
# ==========================================
# ❤️ 互动接口(点赞/收藏)
# ==========================================
@router.post("/api/tasks/{task_id}/like")
async def toggle_like(task_id: str, current_user: str = Depends(require_auth)):
    """
    Toggle the caller's like on a task.

    The mutation runs inside ``db.atomic_update`` (described by the original
    author as atomic and concurrency-safe). Returns the action taken
    ("liked" / "unliked") and the new like count.

    Raises HTTPException(404) when no task has the given id.
    """
    outcome = None

    def updater(data):
        nonlocal outcome
        task = next((candidate for candidate in data if candidate["id"] == task_id), None)
        if task is None:
            outcome = None  # task not found
            return

        fans = task.get("liked_by", [])
        already_liked = current_user in fans
        if already_liked:
            fans.remove(current_user)
            # Never let the counter go negative.
            task["likes"] = max(0, task.get("likes", 0) - 1)
        else:
            fans.append(current_user)
            task["likes"] = task.get("likes", 0) + 1
        task["liked_by"] = fans

        outcome = {
            "status": "success",
            "action": "unliked" if already_liked else "liked",
            "likes": task["likes"],
        }

    db.atomic_update("tasks.json", updater, default_data=[])

    if outcome is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    # Like counts may affect ordering, so drop cached task sorts.
    sort_cache.invalidate("tasks:")
    return outcome
@router.post("/api/tasks/{task_id}/favorite")
async def toggle_favorite(task_id: str, current_user: str = Depends(require_auth)):
    """
    Toggle the caller's favorite mark on a task.

    The mutation runs inside ``db.atomic_update`` (described by the original
    author as atomic and concurrency-safe). Returns the action taken
    ("favorited" / "unfavorited") and the new favorite count.

    Raises HTTPException(404) when no task has the given id.
    """
    outcome = None

    def updater(data):
        nonlocal outcome
        for entry in data:
            if entry["id"] != task_id:
                continue

            collectors = entry.get("favorited_by", [])
            current_count = entry.get("favorites", 0)
            if current_user in collectors:
                collectors.remove(current_user)
                # Never let the counter go negative.
                entry["favorites"] = max(0, current_count - 1)
                action = "unfavorited"
            else:
                collectors.append(current_user)
                entry["favorites"] = current_count + 1
                action = "favorited"
            entry["favorited_by"] = collectors

            outcome = {"status": "success", "action": action, "favorites": entry["favorites"]}
            return

        outcome = None  # task not found

    db.atomic_update("tasks.json", updater, default_data=[])

    if outcome is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    # Favorite counts may affect ordering, so drop cached task sorts.
    sort_cache.invalidate("tasks:")
    return outcome
# ==========================================
# 🎁 打赏接口
# ==========================================
@router.post("/api/tasks/{task_id}/tip")
async def tip_task(task_id: str, amount: int, is_anon: bool = False, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)):
    """
    Tip a task's publisher from the caller's SQL wallet.

    Parameters
    - task_id: id of the task being tipped.
    - amount: positive number of points to transfer.
    - is_anon: when True the publisher is not told who tipped.

    Flow: validate the task with a read-only lookup, transfer the points
    between wallets and record TIP_OUT / TIP_IN transactions, and only
    after the SQL commit add the tip to the task's ``tip_board``. A tip
    that is rejected (insufficient balance, missing publisher) or detected
    as a duplicate therefore never reaches the board.

    Raises
    - HTTPException(400): non-positive amount, tipping one's own task, or
      insufficient balance.
    - HTTPException(404): task not found or task has no publisher.
    - HTTPException(500): any other failure (SQL session rolled back).
    """
    # Function-scope import: the duplicate-request check below needs
    # `datetime`, which the module's leading import block does not bring in.
    import datetime

    if amount <= 0:
        raise HTTPException(status_code=400, detail="打赏金额必须大于0")

    # Read-only validation; nothing is written until the transfer commits.
    tasks_db = db.load_data("tasks.json", default_data=[])
    target_task = next((t for t in tasks_db if t["id"] == task_id), None)
    if not target_task:
        raise HTTPException(status_code=404, detail="任务不存在")
    publisher = target_task.get("publisher")
    if publisher == current_user:
        raise HTTPException(status_code=400, detail="不能打赏自己的任务")
    if not publisher:
        raise HTTPException(status_code=404, detail="作者账户不存在")
    task_title = target_task.get("title")

    result = {"status": "success", "message": f"成功打赏 {amount} 积分"}

    def add_to_tip_board(data):
        # Runs inside db.atomic_update: merge this tip into the task's board.
        board_task = next((t for t in data if t["id"] == task_id), None)
        if board_task is None:
            # Task vanished after the transfer; the money has already moved,
            # so just record the anomaly instead of failing the request.
            logger.warning(f"TASK_TIP_BOARD_SKIPPED | task={task_id} | from={current_user}")
            return
        tip_board = board_task.get("tip_board", [])
        existing = next((t for t in tip_board if t["account"] == current_user), None)
        if existing:
            existing["amount"] += amount
        else:
            tip_board.append({"account": current_user, "amount": amount, "is_anon": is_anon})
        tip_board.sort(key=lambda x: x["amount"], reverse=True)
        board_task["tip_board"] = tip_board

    try:
        # Idempotency guard: an identical tip within the last 5 seconds is
        # treated as a repeated request. Assumes Transaction.created_at is
        # stored as naive UTC (TODO confirm against the model).
        recent_cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=5)
        duplicate_tx = db_session.query(Transaction).filter(
            Transaction.account == current_user,
            Transaction.tx_type == "TIP_OUT",
            Transaction.amount == -amount,
            Transaction.related_account == publisher,
            Transaction.created_at >= recent_cutoff
        ).first()
        if duplicate_tx:
            return {"status": "success", "message": "打赏已处理(重复请求)"}

        # Fetch both wallets with SELECT ... FOR UPDATE.
        tipper_wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first()
        author_wallet = db_session.query(Wallet).filter(Wallet.account == publisher).with_for_update().first()
        if not tipper_wallet or tipper_wallet.balance < amount:
            raise HTTPException(status_code=400, detail="余额不足")
        if not author_wallet:
            # The publisher has never had a wallet; create an empty one.
            author_wallet = Wallet(account=publisher, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0)
            db_session.add(author_wallet)

        # Transfer the points.
        tipper_wallet.balance -= amount
        author_wallet.balance += amount      # spendable balance
        author_wallet.tip_balance += amount  # cumulative tip-income statistic

        tx_id_tipper = f"TIP_OUT_{int(time.time())}_{uuid.uuid4().hex[:6]}"
        tx_id_author = f"TIP_IN_{int(time.time())}_{uuid.uuid4().hex[:6]}"

        # Each account's new record chains onto its latest transaction hash.
        last_tx_tipper = db_session.query(Transaction).filter(Transaction.account == current_user).order_by(Transaction.created_at.desc()).first()
        last_tx_author = db_session.query(Transaction).filter(Transaction.account == publisher).order_by(Transaction.created_at.desc()).first()
        prev_hash_tipper = last_tx_tipper.tx_hash if last_tx_tipper else "GENESIS_HASH"
        prev_hash_author = last_tx_author.tx_hash if last_tx_author else "GENESIS_HASH"

        # Display names for the transaction descriptions and notification.
        users_db = db.load_data("users.json", default_data={})
        author_name = users_db.get(publisher, {}).get("name", publisher)
        tipper_name = users_db.get(current_user, {}).get("name", current_user)

        # Tipper side (TIP_OUT).
        tx_tipper = Transaction(
            tx_id=tx_id_tipper,
            account=current_user,
            tx_type="TIP_OUT",
            amount=-amount,
            related_account=publisher,
            item_id=task_id,
            prev_hash=prev_hash_tipper,
            tx_hash=calculate_tx_hash(tx_id_tipper, current_user, "TIP_OUT", -amount, prev_hash_tipper),
            description=f"打赏给 {author_name}" + (f" 的任务《{task_title}》" if task_title else ""),
            item_title=task_title,
            item_type="task",
            related_user_name=author_name
        )
        # Receiver side (TIP_IN); the tipper's name is masked when anonymous.
        tx_author = Transaction(
            tx_id=tx_id_author,
            account=publisher,
            tx_type="TIP_IN",
            amount=amount,
            related_account=current_user,
            item_id=task_id,
            prev_hash=prev_hash_author,
            tx_hash=calculate_tx_hash(tx_id_author, publisher, "TIP_IN", amount, prev_hash_author),
            description=f"收到 {tipper_name} 的任务打赏" + (f" ({task_title})" if task_title else ""),
            item_title=task_title,
            item_type="task",
            related_user_name=tipper_name if not is_anon else "匿名用户"
        )
        db_session.add(tx_tipper)
        db_session.add(tx_author)
        db_session.commit()

        # The transfer is final; now reflect it on the task's tip board.
        # NOTE(review): a failure from here on still yields a 500 although
        # the points have moved, as was already the case for notifications.
        db.atomic_update("tasks.json", add_to_tip_board, default_data=[])

        logger.info(f"TASK_TIP | from={current_user} | to={publisher} | amount={amount} | task={task_id} | anon={is_anon}")

        # Notify the publisher, hiding the tipper's identity when anonymous.
        if not is_anon:
            add_notification(publisher, {
                "type": "tip",
                "from_user": current_user,
                "target_item_id": task_id,
                "target_item_title": task_title or "",
                "content": f"您收到来自 {tipper_name} 的 {amount} 积分任务打赏"
            })
        else:
            add_notification(publisher, {
                "type": "tip",
                "from_user": "anonymous",
                "target_item_id": task_id,
                "target_item_title": "",
                "content": f"您收到了一份 {amount} 积分的匿名任务打赏"
            })

        # Tips may affect ordering, so drop cached task sorts.
        sort_cache.invalidate("tasks:")
    except HTTPException:
        db_session.rollback()
        raise
    except Exception as e:
        db_session.rollback()
        logger.error(f"TASK_TIP_ERROR | from={current_user} | task={task_id} | amount={amount} | error={str(e)}")
        raise HTTPException(status_code=500, detail="打赏处理失败,请稍后重试")

    return result
# ==========================================
# 💬 评论接口(复用通用评论系统)
# ==========================================
@router.get("/api/tasks/{task_id}/comments")
async def get_task_comments(task_id: str):
    """
    Return the comments stored for a task, each extended with the author's
    display name and avatar.

    ``author_name`` falls back to the author account id and
    ``author_avatar`` to an empty string when the user record lacks them.
    An unknown task id yields an empty list.
    """
    # comments.json maps item id -> list of comments.
    comments_by_item = db.load_data("comments.json", default_data={})
    # users.json is keyed by account, so it is used directly for lookups.
    profiles = db.load_data("users.json", default_data={})

    def with_author(comment):
        author = comment.get("author")
        profile = profiles.get(author, {})
        return {
            **comment,
            "author_name": profile.get("name", author),
            "author_avatar": profile.get("avatarDataUrl", ""),
        }

    enriched = [with_author(comment) for comment in comments_by_item.get(task_id, [])]
    return {"status": "success", "data": enriched}
@router.post("/api/tasks/{task_id}/comments")
async def add_task_comment(task_id: str, content: str, current_user: str = Depends(require_auth)):
    """
    Add a comment to a task and bump the task's comment counter.

    Parameters
    - task_id: id of the task being commented on.
    - content: comment text; surrounding whitespace is stripped.

    Both files are modified through ``db.atomic_update`` rather than a
    load/save round trip, so a snapshot read at the start of the request
    cannot overwrite likes, favorites or tips written to tasks.json in the
    meantime by the handlers that already use ``atomic_update``.

    Returns the newly created comment.
    Raises HTTPException(400) for empty content and HTTPException(404)
    when the task does not exist.
    """
    if not content or not content.strip():
        raise HTTPException(status_code=400, detail="评论内容不能为空")

    # Read-only existence check.
    tasks_db = db.load_data("tasks.json", default_data=[])
    if not any(t["id"] == task_id for t in tasks_db):
        raise HTTPException(status_code=404, detail="任务不存在")

    now = int(time.time())
    new_comment = {
        "id": f"comment_{now}_{uuid.uuid4().hex[:6]}",
        "author": current_user,
        "content": content.strip(),
        "created_at": now
    }

    def prepend_comment(comments_db):
        # comments.json maps item id -> list of comments, newest first.
        comments_db.setdefault(task_id, []).insert(0, new_comment)

    def bump_comment_count(tasks):
        for task in tasks:
            if task["id"] == task_id:
                task["comments"] = task.get("comments", 0) + 1
                break

    db.atomic_update("comments.json", prepend_comment, default_data={})
    db.atomic_update("tasks.json", bump_comment_count, default_data=[])

    # Invalidate only after the counter is persisted, so a cache rebuilt
    # right after invalidation cannot capture the old comment count.
    sort_cache.invalidate("tasks:")

    return {"status": "success", "data": new_comment}