Spaces:
Running
Running
| # router_wallet.py | |
| # ========================================== | |
| # 💰 钱包与交易路由模块 | |
| # ========================================== | |
| # 作用:处理充值、提现、购买、打赏等资金操作 | |
| # 关联文件: | |
| # - verify_code_engine.py (提现验证码缓存) | |
| # - database_sql.py (SQL数据库连接) | |
| # - models_sql.py (Wallet, Transaction, Ownership 模型) | |
| # 🔒 P0安全优化:API限流 | |
| # ========================================== | |
| from fastapi import APIRouter, Depends, HTTPException, Request | |
| from fastapi.responses import Response | |
| from sqlalchemy.orm import Session | |
| from pydantic import BaseModel | |
| import time | |
| import uuid | |
| import hashlib | |
| import os | |
| import datetime | |
| import logging | |
| from database_sql import get_db | |
| from models_sql import Wallet, Transaction, Ownership, Refund | |
| from models import RechargeRequest, WithdrawRequest, PurchaseRequest, TipRequest | |
| import 数据库连接 as json_db | |
| from notifications import add_notification | |
| from 安全认证 import require_auth, is_admin | |
| # 🔒 P0安全优化:API限流 | |
| from slowapi import Limiter | |
| from slowapi.util import get_remote_address | |
# Rate limiter keyed by the caller's remote address (slowapi).
limiter = Limiter(key_func=get_remote_address)
# P7 "regret mode": refund window, in hours.
REFUND_WINDOW_HOURS = 24
# P7 "regret mode": repurchase ban after a refund, in days.
REFUND_BAN_DAYS = 30
# Wallet account that receives withdrawal fees.
SYSTEM_FEE_ACCOUNT = "system_fee_account"
# P2: audit logger for wallet operations.
logger = logging.getLogger("ComfyUI-Ranking.Wallet")
# Verification-code cache (checked when a withdrawal is requested).
from verify_code_engine import VERIFY_CODES
router = APIRouter()
| # ========================================== | |
| # 脱敏工具函数 | |
| # ========================================== | |
| def _mask_alipay(account): | |
| """脱敏支付宝账号""" | |
| if account and len(account) >= 7: | |
| return account[:3] + "****" + account[-4:] | |
| return account or "" | |
| def _mask_name(name): | |
| """脱敏姓名""" | |
| if name and len(name) > 0: | |
| return name[0] + "*" * (len(name) - 1) | |
| return name or "" | |
| # ========================================== | |
| # 🚨 替换这里的支付宝初始化逻辑 🚨 | |
| # ========================================== | |
def format_pem_key(key_str, key_type="PRIVATE"):
    """Return ``key_str`` as a PEM-armored key.

    A key that already contains ``BEGIN`` is assumed to be armored: only
    literal ``\\n`` escape sequences are turned into real newlines and the
    result is stripped. A bare key body has its whitespace removed, is
    re-wrapped at 64 characters per line and gets a header/footer chosen
    by ``key_type`` (``"PRIVATE"`` -> RSA private key, anything else ->
    public key).
    """
    if "BEGIN" in key_str:
        return key_str.replace("\\n", "\n").strip()

    # Remove spaces, newlines, literal "\n" escapes and carriage returns,
    # in this order.
    body = key_str
    for noise in (" ", "\n", "\\n", "\r"):
        body = body.replace(noise, "")

    # Re-wrap the body at 64 characters per line.
    rows = []
    for start in range(0, len(body), 64):
        rows.append(body[start:start + 64])
    formatted_body = "\n".join(rows)

    if key_type != "PRIVATE":
        return f"-----BEGIN PUBLIC KEY-----\n{formatted_body}\n-----END PUBLIC KEY-----"
    return f"-----BEGIN RSA PRIVATE KEY-----\n{formatted_body}\n-----END RSA PRIVATE KEY-----"
# Module-level Alipay client setup. On success `alipay` is an AliPay client;
# on any failure it is None and `alipay_error_msg` holds the reason, which
# create_recharge_order reports back to the caller.
alipay_error_msg = "未知错误"
try:
    from alipay import AliPay
    from alipay.utils import AliPayConfig
    # 1. Read the raw environment variables.
    raw_appid = os.environ.get("ALIPAY_APPID", "").strip()
    raw_priv_key = os.environ.get("ALIPAY_PRIVATE_KEY", "").strip()
    raw_pub_key = os.environ.get("ALIPAY_PUBLIC_KEY", "").strip()
    if not raw_appid or not raw_priv_key or not raw_pub_key:
        # Only report which variables are present, never their values.
        alipay_error_msg = f"缺少环境变量。当前读取到: APPID={bool(raw_appid)}, PRIV_KEY={bool(raw_priv_key)}, PUB_KEY={bool(raw_pub_key)}"
        alipay = None
    else:
        # 2. Run both keys through format_pem_key so bare key bodies get
        #    PEM armor.
        priv_key_formatted = format_pem_key(raw_priv_key, "PRIVATE")
        pub_key_formatted = format_pem_key(raw_pub_key, "PUBLIC")
        # 3. Build the SDK client.
        alipay = AliPay(
            appid=raw_appid,
            app_notify_url="https://zhiwei666-comfyui-ranking-api.hf.space/api/wallet/alipay_notify",
            app_private_key_string=priv_key_formatted,
            alipay_public_key_string=pub_key_formatted,
            sign_type="RSA2",
            debug=False,
            config=AliPayConfig(timeout=15)
        )
except Exception as e:
    # Covers a missing SDK package as well as rejected keys: the module
    # still imports, with payments disabled.
    alipay = None
    alipay_error_msg = f"支付宝 SDK 崩溃: {str(e)}"
    print(f"🚨 支付宝初始化异常: {alipay_error_msg}")
def calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash):
    """Return the SHA-256 hex digest linking a transaction to its predecessor.

    The digest covers the five fields concatenated, with no separator, in
    the order: tx_id, account, tx_type, amount, prev_hash.
    """
    payload = "{}{}{}{}{}".format(tx_id, account, tx_type, amount, prev_hash)
    hasher = hashlib.sha256()
    hasher.update(payload.encode())
    return hasher.hexdigest()
async def create_recharge_order(req: RechargeRequest):
    """Create an Alipay pre-created (QR code) order for a recharge.

    The account is embedded in the order subject after ``" - "``;
    alipay_notify parses it back out of the subject.

    Returns:
        dict with ``status``, ``order_id`` and ``qr_code``.

    Raises:
        HTTPException: 500 when the gateway is not configured (the detail
            carries the initialisation error) or no QR code came back.
    """
    if not alipay:
        # Surface the real initialisation error to the frontend.
        raise HTTPException(status_code=500, detail=f"支付网关配置错误: {alipay_error_msg}")

    issued_at = int(time.time())
    nonce = uuid.uuid4().hex[:6]
    order_id = f"PAY_{issued_at}_{nonce}"

    precreate_result = alipay.api_alipay_trade_precreate(
        out_trade_no=order_id,
        total_amount=str(req.amount),
        subject=f"ComfyUI Community Points - {req.account}",
    )

    qr_code_url = precreate_result.get("qr_code")
    if qr_code_url:
        return {"status": "success", "order_id": order_id, "qr_code": qr_code_url}
    raise HTTPException(status_code=500, detail="生成支付二维码失败")
| # 🟢 业务流转细节修复:正确解析 application/x-www-form-urlencoded | |
async def alipay_notify(request: Request, db: Session = Depends(get_db)):
    """Handle Alipay's asynchronous payment notification.

    Replies with plain-text ``fail`` when the signature does not verify or
    crediting raises; otherwise replies ``success``. A successful trade
    whose order id has no transaction yet credits the wallet and appends
    a hash-chained ``RECHARGE`` transaction.
    """
    # Flatten the urlencoded form into a plain dict before verifying;
    # `sign` and `sign_type` are removed from the data that gets verified.
    form_data = await request.form()
    data = dict(form_data.items())
    signature = data.pop("sign", None)
    data.pop("sign_type", None)

    verified = bool(alipay) and bool(signature) and alipay.verify(data, signature)
    if not verified:
        return Response(content="fail", media_type="text/plain")

    # Other trade states are acknowledged without touching the wallet.
    if data.get("trade_status") not in ("TRADE_SUCCESS", "TRADE_FINISHED"):
        return Response(content="success", media_type="text/plain")

    order_id = data.get("out_trade_no")
    already_recorded = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
    if already_recorded:
        return Response(content="success", media_type="text/plain")

    # Fractional amounts are truncated to whole points; the account is the
    # text after the last " - " in the order subject.
    amount = int(float(data.get("total_amount", 0)))
    account = data.get("subject", "").split(" - ")[-1]

    # P1: row lock + rollback on any failure.
    try:
        wallet = db.query(Wallet).filter(Wallet.account == account).with_for_update().first()
        if not wallet:
            wallet = Wallet(account=account, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0)
            db.add(wallet)
        wallet.balance = (wallet.balance or 0) + amount

        # Chain onto the account's most recent transaction.
        latest = (
            db.query(Transaction)
            .filter(Transaction.account == account)
            .order_by(Transaction.created_at.desc())
            .first()
        )
        prev_hash = latest.tx_hash if latest else "GENESIS_HASH"
        tx_hash = calculate_tx_hash(order_id, account, "RECHARGE", amount, prev_hash)
        db.add(Transaction(
            tx_id=order_id, account=account, tx_type="RECHARGE", amount=amount,
            prev_hash=prev_hash, tx_hash=tx_hash,
            description="支付宝充值",
        ))
        db.commit()
        # P2: recharge audit log.
        logger.info(f"RECHARGE | account={account} | amount={amount} | order={order_id}")
    except Exception as e:
        db.rollback()
        logger.error(f"RECHARGE_ERROR | account={account} | amount={amount} | order={order_id} | error={str(e)}")
        return Response(content="fail", media_type="text/plain")

    return Response(content="success", media_type="text/plain")
async def check_order(order_id: str, account: str = None, db: Session = Depends(get_db)):
    """Report whether a recharge order has been paid, crediting it if needed.

    1. If a transaction with this order id exists, the order is done.
    2. Otherwise, when the gateway is configured and ``account`` is given,
       query Alipay directly. A successful trade is credited exactly as in
       alipay_notify.

    Any exception during step 2 is logged, the session is rolled back so
    no half-applied credit survives, and ``PENDING`` is returned so the
    frontend keeps polling.

    NOTE(review): ``account`` is supplied by the caller and is not checked
    against the order in this function — confirm the route authenticates
    it, otherwise a paid order could be credited to another account.

    Returns:
        ``{"status": "SUCCESS"}`` or ``{"status": "PENDING"}``.
    """
    # Line of defence 1: the local ledger (filled in by the webhook).
    tx = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
    if tx:
        return {"status": "SUCCESS"}

    # Line of defence 2: ask Alipay directly, for when the callback never
    # arrives.
    if alipay and account:
        try:
            result = alipay.api_alipay_trade_query(out_trade_no=order_id)
            if result.get("code") == "10000" and result.get("trade_status") in ("TRADE_SUCCESS", "TRADE_FINISHED"):
                # Re-check after the remote call: the webhook may have
                # credited the order in the meantime.
                existing_check = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
                if existing_check:
                    return {"status": "SUCCESS"}

                amount = int(float(result.get("total_amount", 0)))
                wallet = db.query(Wallet).filter(Wallet.account == account).with_for_update().first()
                if not wallet:
                    wallet = Wallet(account=account, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0)
                    db.add(wallet)
                wallet.balance = (wallet.balance or 0) + amount

                last_tx = db.query(Transaction).filter(Transaction.account == account).order_by(Transaction.created_at.desc()).first()
                prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
                tx_hash = calculate_tx_hash(order_id, account, "RECHARGE", amount, prev_hash)
                new_tx = Transaction(
                    tx_id=order_id, account=account, tx_type="RECHARGE", amount=amount,
                    prev_hash=prev_hash, tx_hash=tx_hash,
                    description="支付宝充值"
                )
                db.add(new_tx)
                db.commit()
                # Same audit line as the webhook path.
                logger.info(f"RECHARGE | account={account} | amount={amount} | order={order_id}")
                return {"status": "SUCCESS"}
        except Exception as e:
            # Discard any partially applied credit and release row locks.
            db.rollback()
            logger.error(f"RECHARGE_QUERY_ERROR | account={account} | order={order_id} | error={str(e)}")

    # No confirmed payment yet: the frontend keeps waiting.
    return {"status": "PENDING"}
async def get_wallet(account: str, db: Session = Depends(get_db)):
    """Return an account's wallet balances plus its total withdrawn amount.

    An account without a wallet row gets all-zero balances. WITHDRAW
    transactions store negative amounts, so the summed value is returned
    as an absolute number.
    """
    from sqlalchemy import func

    wallet = db.query(Wallet).filter(Wallet.account == account).first()

    # Aggregate in the database rather than loading every row.
    withdrawn_sum = (
        db.query(func.coalesce(func.sum(Transaction.amount), 0))
        .filter(
            Transaction.account == account,
            Transaction.tx_type == 'WITHDRAW',
        )
        .scalar()
    )
    total_withdrawn = abs(withdrawn_sum or 0)

    if wallet:
        return {
            "status": "success",
            "balance": wallet.balance,
            "earn_balance": wallet.earn_balance,
            "tip_balance": wallet.tip_balance,
            "task_balance": wallet.task_balance or 0,
            "frozen_balance": wallet.frozen_balance,
            "total_withdrawn": total_withdrawn,
        }
    return {"status": "success", "balance": 0, "earn_balance": 0, "tip_balance": 0, "task_balance": 0, "frozen_balance": 0, "total_withdrawn": total_withdrawn}
| # 🔒 P0安全优化:购买每分钟最多10次 | |
async def purchase_item(request: Request, req: PurchaseRequest, db: Session = Depends(get_db)):
    """Purchase an item with wallet balance, or grant it when nothing is due.

    Flow:
      1. Look the item up in ``items.json``; a pending price whose
         effective time has passed is applied and persisted.
      2. Free items and the author's own items only get an ``Ownership``
         row (``price_paid=0``); no money moves.
      3. Otherwise: reject buyers inside a refund ban, short-circuit
         buyers who already own the item, then move ``price`` from the
         buyer's wallet to the seller's under row locks and record the
         ownership plus a hash-chained ``PURCHASE`` transaction.
      4. After the commit, notify the seller (best effort).

    Returns:
        dict with ``status``, ``already_owned``, ``netdisk_password`` and
        ``is_netdisk``.

    Raises:
        HTTPException: 404 unknown item, 403 refund ban active,
            402 insufficient balance, 500 unexpected failure (rolled back).
    """
    items_db = json_db.load_data("items.json", default_data=[])
    item = next((i for i in items_db if i["id"] == req.item_id), None)
    if not item:
        raise HTTPException(status_code=404, detail="商品不存在")

    # P7 regret mode: a delayed price change takes effect once its
    # effective time has passed; it is then written back as the price.
    actual_price = item.get("price", 0)
    pending_price = item.get("pending_price")
    pending_price_effective = item.get("pending_price_effective_at")
    if pending_price is not None and pending_price_effective:
        effective_time = datetime.datetime.fromisoformat(pending_price_effective)
        if datetime.datetime.now() >= effective_time:
            actual_price = pending_price
            item["price"] = pending_price
            item["pending_price"] = None
            item["pending_price_effective_at"] = None
            json_db.save_data("items.json", items_db)

    # A stored price of None is treated as free instead of raising.
    price = int(actual_price or 0)
    seller_account = item.get("author")

    if price <= 0 or req.account == seller_account:
        # Free item or the author: just make sure an Ownership row exists.
        existing_ownership = db.query(Ownership).filter(
            Ownership.account == req.account,
            Ownership.item_id == req.item_id,
            Ownership.is_refunded == False
        ).first()
        if not existing_ownership:
            db.add(Ownership(account=req.account, item_id=req.item_id, price_paid=0))
            db.commit()
        return _purchase_result(item, already_owned=True)

    # P7 regret mode: repurchase ban after a refund.
    refund_ban = db.query(Refund).filter(
        Refund.account == req.account,
        Refund.item_id == req.item_id,
        Refund.ban_until > datetime.datetime.utcnow()
    ).first()
    if refund_ban:
        days_left = (refund_ban.ban_until - datetime.datetime.utcnow()).days + 1
        raise HTTPException(status_code=403, detail=f"您已退款过此商品,{days_left}天内禁止再次购买")

    # Fast path: already owned (refunded ownerships do not count).
    owned = db.query(Ownership).filter(
        Ownership.account == req.account,
        Ownership.item_id == req.item_id,
        Ownership.is_refunded == False
    ).first()
    if owned:
        return _purchase_result(item, already_owned=True)

    # P1: row locks + rollback on any failure.
    try:
        buyer_wallet = db.query(Wallet).filter(Wallet.account == req.account).with_for_update().first()
        if not buyer_wallet or (buyer_wallet.balance or 0) < price:
            raise HTTPException(status_code=402, detail="余额不足,请先充值")

        # Re-check ownership now that the buyer's wallet row is locked: a
        # concurrent request may have completed the same purchase between
        # the fast-path check above and acquiring the lock.
        owned_now = db.query(Ownership).filter(
            Ownership.account == req.account,
            Ownership.item_id == req.item_id,
            Ownership.is_refunded == False
        ).with_for_update().first()
        if owned_now:
            db.rollback()  # release the locks; nothing was changed
            return _purchase_result(item, already_owned=True)

        seller_wallet = db.query(Wallet).filter(Wallet.account == seller_account).with_for_update().first()
        if not seller_wallet:
            seller_wallet = Wallet(account=seller_account, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0)
            db.add(seller_wallet)

        buyer_wallet.balance = (buyer_wallet.balance or 0) - price
        # Income lands in the unified balance; earn_balance is a running
        # total of sales revenue.
        seller_wallet.balance = (seller_wallet.balance or 0) + price
        seller_wallet.earn_balance = (seller_wallet.earn_balance or 0) + price

        # P7 regret mode: remember the price that was paid.
        db.add(Ownership(account=req.account, item_id=req.item_id, price_paid=price))

        tx_id = f"BUY_{int(time.time())}_{uuid.uuid4().hex[:6]}"
        last_tx = db.query(Transaction).filter(Transaction.account == req.account).order_by(Transaction.created_at.desc()).first()
        prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
        tx_hash = calculate_tx_hash(tx_id, req.account, "PURCHASE", -price, prev_hash)

        # Seller display name; falls back to the account id.
        users_db = json_db.load_data("users.json", default_data={})
        seller_info = users_db.get(seller_account, {})
        seller_user_name = seller_info.get("name", seller_account)

        new_tx = Transaction(
            tx_id=tx_id, account=req.account, tx_type="PURCHASE", amount=-price,
            related_account=seller_account, item_id=req.item_id, prev_hash=prev_hash, tx_hash=tx_hash,
            description=f"购买 {item.get('title', '未知资源')}",
            item_title=item.get('title', ''),
            item_type=item.get('type', ''),
            related_user_name=seller_user_name
        )
        db.add(new_tx)
        db.commit()
        # P2: purchase audit log.
        logger.info(f"PURCHASE | buyer={req.account} | seller={seller_account} | item={req.item_id} | amount={price} | tx={tx_id}")
    except HTTPException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"PURCHASE_ERROR | buyer={req.account} | item={req.item_id} | error={str(e)}")
        raise HTTPException(status_code=500, detail="购买处理失败,请稍后重试")

    # The purchase is committed. Notifying the seller is best effort and
    # must not turn a completed purchase into an error response.
    if seller_account and seller_account != req.account:
        try:
            add_notification(seller_account, {
                "type": "purchase",
                "from_user": req.account,
                "target_item_id": req.item_id,
                "target_item_title": item.get("title", ""),
                "content": f"您的作品《{item.get('title', '')}》已被购买"
            })
        except Exception as e:
            logger.warning(f"PURCHASE_NOTIFY_ERROR | seller={seller_account} | item={req.item_id} | error={str(e)}")

    return _purchase_result(item, already_owned=False)


def _purchase_result(item, already_owned):
    """Build the purchase response, including the netdisk delivery fields."""
    return {
        "status": "success",
        "already_owned": already_owned,
        "netdisk_password": item.get("netdisk_password"),
        "is_netdisk": item.get("is_netdisk", False)
    }
| # 🔒 P0安全优化:打赏每分钟最多20次 | |
async def tip_user(request: Request, req: TipRequest, db: Session = Depends(get_db)):
    """Transfer a tip from ``req.sender_account`` to ``req.target_account``.

    Flow:
      1. Validate the amount and reject self-tips.
      2. Idempotency guard: an identical TIP_OUT in the last 5 seconds is
         answered as already processed.
      3. Under row locks, move the amount between the two wallets and
         append a hash-chained TIP_OUT / TIP_IN transaction pair.
      4. After the commit, send the notification and update the JSON tip
         leaderboards. Both are best effort: a failure is logged and does
         not turn the committed transfer into an error response.

    For anonymous tips the recipient's transaction shows ``匿名用户`` in
    both ``related_user_name`` and ``description``.

    Returns:
        ``{"status": "success", "balance": <sender balance>}``, or a
        success message for a detected duplicate.

    Raises:
        HTTPException: 400 invalid amount / self-tip / insufficient
            balance, 500 unexpected failure (rolled back).
    """
    if req.amount <= 0:
        raise HTTPException(status_code=400, detail="打赏金额必须大于0")
    if req.sender_account == req.target_account:
        raise HTTPException(status_code=400, detail="不能打赏给自己")

    # P1 idempotency: same sender, target and amount within 5 seconds.
    recent_cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=5)
    duplicate_tx = db.query(Transaction).filter(
        Transaction.account == req.sender_account,
        Transaction.tx_type == "TIP_OUT",
        Transaction.amount == -req.amount,
        Transaction.related_account == req.target_account,
        Transaction.created_at >= recent_cutoff
    ).first()
    if duplicate_tx:
        return {"status": "success", "message": "打赏已处理(重复请求)"}

    item_title = None
    item_type = None
    # P1: row locks + rollback on any failure.
    try:
        sender_wallet = db.query(Wallet).filter(Wallet.account == req.sender_account).with_for_update().first()
        target_wallet = db.query(Wallet).filter(Wallet.account == req.target_account).with_for_update().first()
        if not sender_wallet or (sender_wallet.balance or 0) < req.amount:
            raise HTTPException(status_code=400, detail="余额不足")
        if not target_wallet:
            target_wallet = Wallet(account=req.target_account, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0)
            db.add(target_wallet)

        # None-safe arithmetic, matching purchase_item.
        sender_wallet.balance = (sender_wallet.balance or 0) - req.amount
        # Income lands in the unified balance; tip_balance is a running
        # total of tips received.
        target_wallet.balance = (target_wallet.balance or 0) + req.amount
        target_wallet.tip_balance = (target_wallet.tip_balance or 0) + req.amount

        tx_id_sender = f"TIP_OUT_{int(time.time())}_{uuid.uuid4().hex[:6]}"
        tx_id_target = f"TIP_IN_{int(time.time())}_{uuid.uuid4().hex[:6]}"

        # Each side chains onto its own most recent transaction.
        last_tx_sender = db.query(Transaction).filter(Transaction.account == req.sender_account).order_by(Transaction.created_at.desc()).first()
        last_tx_target = db.query(Transaction).filter(Transaction.account == req.target_account).order_by(Transaction.created_at.desc()).first()
        prev_hash_sender = last_tx_sender.tx_hash if last_tx_sender else "GENESIS_HASH"
        prev_hash_target = last_tx_target.tx_hash if last_tx_target else "GENESIS_HASH"

        # Display names fall back to the account id.
        users_db = json_db.load_data("users.json", default_data={})
        target_user_name = users_db.get(req.target_account, {}).get("name", req.target_account)
        sender_user_name = users_db.get(req.sender_account, {}).get("name", req.sender_account)
        # What the recipient is allowed to see of the sender.
        shown_sender_name = "匿名用户" if req.is_anonymous else sender_user_name

        # Optional linked item.
        if req.item_id:
            items_db = json_db.load_data("items.json", default_data=[])
            item = next((i for i in items_db if i["id"] == req.item_id), None)
            if item:
                item_title = item.get("title")
                item_type = item.get("type")

        # Sender-side record.
        tx_sender = Transaction(
            tx_id=tx_id_sender, account=req.sender_account, tx_type="TIP_OUT", amount=-req.amount,
            related_account=req.target_account, prev_hash=prev_hash_sender,
            tx_hash=calculate_tx_hash(tx_id_sender, req.sender_account, "TIP_OUT", -req.amount, prev_hash_sender),
            description=f"打赏给 {target_user_name}" + (f" 的 {item_title}" if item_title else ""),
            item_title=item_title,
            item_type=item_type,
            related_user_name=target_user_name
        )
        # Recipient-side record; the description honours anonymity too.
        tx_target = Transaction(
            tx_id=tx_id_target, account=req.target_account, tx_type="TIP_IN", amount=req.amount,
            related_account=req.sender_account, prev_hash=prev_hash_target,
            tx_hash=calculate_tx_hash(tx_id_target, req.target_account, "TIP_IN", req.amount, prev_hash_target),
            description=f"收到 {shown_sender_name} 的打赏" + (f" ({item_title})" if item_title else ""),
            item_title=item_title,
            item_type=item_type,
            related_user_name=shown_sender_name
        )
        db.add(tx_sender)
        db.add(tx_target)
        db.commit()
        # P2: tip audit log.
        logger.info(f"TIP | from={req.sender_account} | to={req.target_account} | amount={req.amount} | item={req.item_id or 'N/A'} | anon={req.is_anonymous}")
        sender_balance = sender_wallet.balance
    except HTTPException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"TIP_ERROR | from={req.sender_account} | to={req.target_account} | amount={req.amount} | error={str(e)}")
        raise HTTPException(status_code=500, detail="打赏处理失败,请稍后重试")

    # ---- The transfer is committed; everything below is best effort. ----

    # Notify the recipient (anonymous tips hide the sender).
    try:
        if not req.is_anonymous:
            sender_display = sender_user_name if sender_user_name else req.sender_account
            add_notification(req.target_account, {
                "type": "tip",
                "from_user": req.sender_account,
                "target_item_id": req.item_id or "",
                "target_item_title": item_title or "",
                "content": f"您收到来自 {sender_display} 的 {req.amount} 积分打赏"
            })
        else:
            add_notification(req.target_account, {
                "type": "tip",
                "from_user": "anonymous",
                "target_item_id": req.item_id or "",
                "target_item_title": "",
                "content": f"您收到了一份 {req.amount} 积分的匿名打赏"
            })
    except Exception as e:
        logger.warning(f"TIP_NOTIFY_ERROR | to={req.target_account} | error={str(e)}")

    # Tip leaderboards and monthly totals live in the JSON store.
    try:
        users_db = json_db.load_data("users.json", default_data={})
        items_db = json_db.load_data("items.json", default_data=[])
        current_month = datetime.date.today().strftime("%Y-%m")

        # 1. The creator's overall board and monthly total.
        if req.target_account in users_db:
            _add_tip_to_record(users_db[req.target_account], current_month,
                               req.sender_account, req.amount, req.is_anonymous)
            json_db.save_data("users.json", users_db)

        # 2. The linked item's own board and monthly total.
        if req.item_id:
            for item in items_db:
                if item["id"] == req.item_id:
                    _add_tip_to_record(item, current_month,
                                       req.sender_account, req.amount, req.is_anonymous)
                    json_db.save_data("items.json", items_db)
                    break
    except Exception as e:
        logger.warning(f"TIP_BOARD_ERROR | from={req.sender_account} | to={req.target_account} | error={str(e)}")

    return {"status": "success", "balance": sender_balance}


def _add_tip_to_record(record, month, sender_account, amount, is_anonymous):
    """Add a tip to a user/item record's monthly total and leaderboard."""
    history = record.setdefault("tip_history", {})
    history[month] = history.get(month, 0) + amount

    board = record.setdefault("tip_board", [])
    entry = next((x for x in board if x["account"] == sender_account), None)
    if entry:
        entry["amount"] += amount
    else:
        board.append({"account": sender_account, "amount": amount, "is_anon": is_anonymous})
    # Highest total first.
    record["tip_board"] = sorted(board, key=lambda x: x["amount"], reverse=True)
| # 🔒 P0安全优化:提现每分钟最多3次 | |
async def withdraw(request: Request, req: WithdrawRequest, db: Session = Depends(get_db)):
    """Submit a withdrawal request to Alipay, guarded by a verification code.

    Flow:
      1. Reject amounts below 1.
      2. Idempotency guard: an identical WITHDRAW in the last 10 seconds
         is answered as already submitted.
      3. Check the verification code cached under ``<email>_withdraw``
         (or ``<account>_withdraw`` when the user has no email).
      4. Under a row lock, compute the tiered fee, move the net amount
         from ``balance`` to ``frozen_balance``, credit the fee to the
         system fee account and append hash-chained WITHDRAW (and, if
         any, WITHDRAW_FEE) transactions with ``withdraw_status="pending"``.
      5. Consume the verification code.

    Fee rule: the first 100 points withdrawn over an account's lifetime
    are free; anything beyond that is charged 10%, truncated to an int.

    Returns:
        dict with ``status``, ``withdraw_amount``, ``fee_amount``,
        ``net_amount`` and ``free_quota_used`` (the part of this
        withdrawal covered by the remaining free quota).

    Raises:
        HTTPException: 400 invalid amount / bad or expired code / missing
            wallet / insufficient balance, 500 unexpected failure
            (rolled back).
    """
    # Minimum amount.
    if req.amount < 1:
        raise HTTPException(status_code=400, detail="最小提现金额为1积分")

    # P1 idempotency: same account and amount within 10 seconds.
    recent_cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
    duplicate_tx = db.query(Transaction).filter(
        Transaction.account == req.account,
        Transaction.tx_type == "WITHDRAW",
        Transaction.amount == -req.amount,
        Transaction.created_at >= recent_cutoff
    ).first()
    if duplicate_tx:
        return {"status": "success", "message": "提现申请已提交(重复请求)"}

    # Cache key format is {contact}_{action_type}: look up the user's
    # email first and fall back to the account id.
    users_db = json_db.load_data("users.json", default_data={})
    user_info = users_db.get(req.account, {})
    user_email = user_info.get("email", "")
    key = f"{user_email}_withdraw" if user_email else f"{req.account}_withdraw"
    code_data = VERIFY_CODES.get(key)
    # Prefer `expires_at`; `expires` is the legacy field name.
    expire_time = code_data.get("expires_at", code_data.get("expires", 0)) if code_data else 0
    if not code_data or code_data["code"] != req.code or time.time() > expire_time:
        raise HTTPException(status_code=400, detail="验证码无效或已过期")

    # P1: row lock + rollback on any failure.
    try:
        wallet = db.query(Wallet).filter(Wallet.account == req.account).with_for_update().first()
        if not wallet:
            raise HTTPException(status_code=400, detail="钱包不存在")

        # Lifetime withdrawn total (WITHDRAW amounts are stored negative),
        # aggregated in the database.
        from sqlalchemy import func as sql_func
        withdrawals_sum = db.query(sql_func.coalesce(sql_func.sum(Transaction.amount), 0)).filter(
            Transaction.account == req.account,
            Transaction.tx_type == 'WITHDRAW'
        ).scalar() or 0
        total_withdrawn = abs(withdrawals_sum)

        # Tiered fee: 100 points of free quota, 10% on the excess.
        free_quota = max(0, 100 - total_withdrawn)
        fee_amount = 0
        if req.amount > free_quota:
            fee_amount = int((req.amount - free_quota) * 0.10)
        actual_withdraw = req.amount          # debited from the wallet
        net_amount = req.amount - fee_amount  # what the user receives

        # Withdrawals come out of the unified balance.
        if actual_withdraw > (wallet.balance or 0):
            raise HTTPException(status_code=400, detail="可提现余额不足")
        wallet.balance = (wallet.balance or 0) - actual_withdraw
        # Only the payout is frozen; the fee goes to the system account.
        wallet.frozen_balance = (wallet.frozen_balance or 0) + net_amount

        if fee_amount > 0:
            sys_wallet = db.query(Wallet).filter(Wallet.account == SYSTEM_FEE_ACCOUNT).with_for_update().first()
            if not sys_wallet:
                sys_wallet = Wallet(account=SYSTEM_FEE_ACCOUNT, balance=0, earn_balance=0, tip_balance=0, task_balance=0, frozen_balance=0)
                db.add(sys_wallet)
                db.flush()
            sys_wallet.balance = (sys_wallet.balance or 0) + fee_amount

        tx_id = f"WD_{int(time.time())}_{uuid.uuid4().hex[:6]}"
        last_tx = db.query(Transaction).filter(Transaction.account == req.account).order_by(Transaction.created_at.desc()).first()
        prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
        tx_hash = calculate_tx_hash(tx_id, req.account, "WITHDRAW", -actual_withdraw, prev_hash)

        # Only the human-readable description is masked; the raw payout
        # details are stored in their own columns.
        masked_alipay = _mask_alipay(req.alipayAccount)
        masked_name = _mask_name(req.real_name)
        new_tx = Transaction(
            tx_id=tx_id, account=req.account, tx_type="WITHDRAW", amount=-actual_withdraw,
            alipay_account=req.alipayAccount,
            real_name=req.real_name,
            withdraw_status="pending",
            net_amount=net_amount,  # needed later to unfreeze the exact amount
            prev_hash=prev_hash, tx_hash=tx_hash,
            description=f"提现到支付宝 {masked_alipay} ({masked_name})"
        )
        db.add(new_tx)

        # The fee gets its own record, chained onto the withdrawal.
        if fee_amount > 0:
            fee_tx_id = f"FEE_{int(time.time())}_{uuid.uuid4().hex[:6]}"
            fee_tx_hash = calculate_tx_hash(fee_tx_id, req.account, "WITHDRAW_FEE", -fee_amount, tx_hash)
            fee_tx = Transaction(
                tx_id=fee_tx_id, account=req.account, tx_type="WITHDRAW_FEE", amount=-fee_amount,
                prev_hash=tx_hash, tx_hash=fee_tx_hash,
                description="提现手续费"
            )
            db.add(fee_tx)

        db.commit()
        # P2: withdrawal audit log.
        logger.info(f"WITHDRAW | account={req.account} | amount={actual_withdraw} | fee={fee_amount} | net={net_amount} | tx={tx_id}")
    except HTTPException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"WITHDRAW_ERROR | account={req.account} | amount={req.amount} | error={str(e)}")
        raise HTTPException(status_code=500, detail="提现处理失败,请稍后重试")

    # Consume the code. pop() tolerates it having been removed already, so
    # a committed withdrawal is never reported as a failure.
    VERIFY_CODES.pop(key, None)

    return {
        "status": "success",
        "withdraw_amount": actual_withdraw,
        "fee_amount": fee_amount,
        "net_amount": net_amount,
        # Part of this withdrawal covered by the remaining free quota.
        "free_quota_used": min(req.amount, free_quota)
    }
| # ========================================== | |
| # 💳 P6支付增强:交易明细查询API | |
| # ========================================== | |
async def get_transactions(
    account: str,
    page: int = 1,
    limit: int = 20,
    tx_type: str = None,
    db: Session = Depends(get_db)
):
    """Return one page of an account's transactions, newest first.

    Args:
        account: Account whose ledger is read.
        page: 1-based page number; values below 1 are treated as 1.
        limit: Page size; negative values are treated as 0.
        tx_type: Optional exact filter (RECHARGE/PURCHASE/TIP_OUT/TIP_IN/
            WITHDRAW/TASK_FREEZE/TASK_DEPOSIT/TASK_PAYMENT/TASK_INCOME/
            TASK_REFUND).

    Returns:
        dict with ``status``, ``data`` (the rows), ``total`` (row count
        before paging), ``page`` and ``limit`` (as actually applied).
        WITHDRAW rows additionally carry the masked payout details,
        ``withdraw_status`` and ``net_amount``.
    """
    # A non-positive page or negative limit would become a negative
    # OFFSET/LIMIT, which several databases reject.
    page = max(page, 1)
    limit = max(limit, 0)

    query = db.query(Transaction).filter(Transaction.account == account)
    if tx_type:
        query = query.filter(Transaction.tx_type == tx_type)
    total = query.count()
    transactions = query.order_by(Transaction.created_at.desc()).offset((page - 1) * limit).limit(limit).all()

    tx_list = []
    for tx in transactions:
        tx_data = {
            "tx_id": tx.tx_id,
            "tx_type": tx.tx_type,
            "amount": tx.amount,
            "related_account": tx.related_account,
            "item_id": tx.item_id,
            "created_at": tx.created_at.isoformat() if tx.created_at else None,
            "description": tx.description,
            "item_title": tx.item_title,
            "item_type": tx.item_type,
            "related_user_name": tx.related_user_name,
        }
        # Withdrawals expose payout details only in masked form.
        if tx.tx_type == "WITHDRAW":
            if tx.alipay_account:
                tx_data["alipay_account"] = _mask_alipay(tx.alipay_account)
            if tx.real_name:
                tx_data["real_name"] = _mask_name(tx.real_name)
            tx_data["withdraw_status"] = tx.withdraw_status
            tx_data["net_amount"] = tx.net_amount
        tx_list.append(tx_data)

    return {
        "status": "success",
        "data": tx_list,
        "total": total,
        "page": page,
        "limit": limit
    }
async def get_task_stats(account: str, db: Session = Depends(get_db)):
    """Return task-related income/expense statistics for an account.

    One grouped aggregate query yields count and sum per task transaction
    type; a second query fetches the 10 most recent task transactions.

    Returns:
        dict with ``status`` and ``data`` holding ``total_income``,
        ``income_count``, ``total_payment`` (absolute sum of TASK_FREEZE,
        TASK_DEPOSIT and TASK_PAYMENT), ``payment_count``,
        ``total_refund``, ``net_earnings`` and ``recent_transactions``.
    """
    from sqlalchemy import func as sql_func

    # One list of task types shared by both queries so they cannot drift.
    task_types = ("TASK_INCOME", "TASK_FREEZE", "TASK_DEPOSIT", "TASK_PAYMENT", "TASK_REFUND")
    payment_types = ("TASK_FREEZE", "TASK_DEPOSIT", "TASK_PAYMENT")

    # P1: a single grouped aggregate instead of one query per type.
    stats = db.query(
        Transaction.tx_type,
        sql_func.count(Transaction.tx_id).label('count'),
        sql_func.coalesce(sql_func.sum(Transaction.amount), 0).label('total')
    ).filter(
        Transaction.account == account,
        Transaction.tx_type.in_(task_types)
    ).group_by(Transaction.tx_type).all()
    stats_map = {s.tx_type: {'count': s.count, 'total': s.total} for s in stats}

    def _stat(tx_type, field):
        """Read one aggregate, treating a missing type or NULL as 0."""
        return stats_map.get(tx_type, {}).get(field, 0) or 0

    total_income = _stat('TASK_INCOME', 'total')
    income_count = _stat('TASK_INCOME', 'count')
    # Spending on published tasks (freeze + deposit + payment).
    total_payment = abs(sum(_stat(t, 'total') for t in payment_types))
    payment_count = sum(_stat(t, 'count') for t in payment_types)
    total_refund = _stat('TASK_REFUND', 'total')

    # Ten most recent task-related transactions.
    recent_txs = db.query(Transaction).filter(
        Transaction.account == account,
        Transaction.tx_type.in_(task_types)
    ).order_by(Transaction.created_at.desc()).limit(10).all()
    recent_list = [{
        "tx_id": tx.tx_id,
        "tx_type": tx.tx_type,
        "amount": tx.amount,
        "item_id": tx.item_id,
        "created_at": tx.created_at.isoformat() if tx.created_at else None
    } for tx in recent_txs]

    return {
        "status": "success",
        "data": {
            "total_income": total_income,
            "income_count": income_count,
            "total_payment": total_payment,
            "payment_count": payment_count,
            "total_refund": total_refund,
            "net_earnings": total_income - total_payment + total_refund,
            "recent_transactions": recent_list
        }
    }
async def get_tip_stats(account: str, db: Session = Depends(get_db)):
    """Return tip totals and counts (received and sent) for an account.

    TIP_OUT amounts are stored negative, so the sent total is reported as
    an absolute value.
    """
    from sqlalchemy import func as sql_func

    grouped = (
        db.query(
            Transaction.tx_type,
            sql_func.count(Transaction.tx_id).label('count'),
            sql_func.coalesce(sql_func.sum(Transaction.amount), 0).label('total'),
        )
        .filter(
            Transaction.account == account,
            Transaction.tx_type.in_(["TIP_OUT", "TIP_IN"]),
        )
        .group_by(Transaction.tx_type)
        .all()
    )

    stats_map = {}
    for row in grouped:
        stats_map[row.tx_type] = {'count': row.count, 'total': int(row.total)}

    received = stats_map.get('TIP_IN', {})
    sent = stats_map.get('TIP_OUT', {})

    total_tip_in = received.get('total', 0) or 0
    tip_in_count = received.get('count', 0) or 0
    total_tip_out = abs(sent.get('total', 0) or 0)
    tip_out_count = sent.get('count', 0) or 0

    return {
        "status": "success",
        "data": {
            "total_tip_in": total_tip_in,
            "tip_in_count": tip_in_count,
            "total_tip_out": total_tip_out,
            "tip_out_count": tip_out_count,
            "net_tips": total_tip_in - total_tip_out,
        },
    }
async def get_sales_stats(account: str, db: Session = Depends(get_db)):
    """Return sales/purchase statistics for one account.

    - Sales income is read from the wallet's ``earn_balance``.
    - Purchase spending is aggregated from ``PURCHASE`` transactions and
      reported as an absolute value.

    Args:
        account: Account whose statistics are requested.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status`` and a ``data`` payload holding
        ``total_sales``, ``sales_count`` (always 0, see below),
        ``total_purchase``, ``purchase_count`` and ``net_sales``.
    """
    from sqlalchemy import func as sql_func

    # Wallet row is only needed for its earn_balance.
    wallet = db.query(Wallet).filter(Wallet.account == account).first()

    # Count and sum of PURCHASE transactions for this account.
    purchase_stats = db.query(
        sql_func.count(Transaction.tx_id).label('count'),
        sql_func.coalesce(sql_func.sum(Transaction.amount), 0).label('total')
    ).filter(
        Transaction.account == account,
        Transaction.tx_type == "PURCHASE"
    ).first()

    # NOTE(review): an aggregate over REFUND transactions used to be queried
    # here but its result was never read, so it only cost a DB round trip and
    # has been dropped. Refunds are therefore NOT netted against purchases;
    # confirm whether that is the intended reporting.

    # Parenthesised to make the precedence explicit: no wallet -> 0 sales.
    total_sales = (wallet.earn_balance or 0) if wallet else 0
    total_purchase = abs(purchase_stats.total or 0)
    purchase_count = purchase_stats.count or 0

    # Net sales = sales income - purchase spending.
    net_sales = total_sales - total_purchase

    return {
        "status": "success",
        "data": {
            "total_sales": total_sales,
            # Sale count cannot be derived here (no seller-side transaction rows).
            "sales_count": 0,
            "total_purchase": total_purchase,
            "purchase_count": purchase_count,
            "net_sales": net_sales
        }
    }
async def get_purchases(
    account: str,
    current_user: str = Depends(require_auth),
    db: Session = Depends(get_db)
):
    """List the non-refunded purchases of the authenticated user.

    The caller must be authenticated and may only query their own account.
    Records are returned newest first (ordered by ``purchased_at``).

    Args:
        account: Account whose purchases are listed.
        current_user: Authenticated account resolved by ``require_auth``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status``, ``data`` (one entry per ownership record)
        and ``total`` (number of entries).

    Raises:
        HTTPException: 403 when ``account`` is not the authenticated user.
    """
    # Users may only read their own purchase history.
    if current_user != account:
        raise HTTPException(status_code=403, detail="只能查询自己的购买记录")

    active_purchases = (
        db.query(Ownership)
        .filter(
            Ownership.account == account,
            Ownership.is_refunded == False,  # noqa: E712 - SQL expression, not a Python bool test
        )
        .order_by(Ownership.purchased_at.desc())
        .all()
    )

    entries = [
        {
            "item_id": record.item_id,
            "purchased_at": record.purchased_at.isoformat() if record.purchased_at else None,
            "price_paid": record.price_paid or 0,
            "is_refunded": record.is_refunded,
        }
        for record in active_purchases
    ]

    return {"status": "success", "data": entries, "total": len(entries)}
| # ========================================== | |
| # 🔄 P7后悔模式:退款API | |
| # ========================================== | |
async def get_purchase_status(account: str, item_id: str, db: Session = Depends(get_db)):
    """Report whether an account owns an item and whether it can still be refunded.

    Args:
        account: Buyer account.
        item_id: Identifier of the purchased item.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        ``{"status": "success", "owned": False}`` when there is no
        non-refunded ownership record. Otherwise a dict with ``owned``,
        ``purchased_at`` (ISO string, or None when the record has no
        timestamp), ``price_paid``, ``can_refund`` (still inside the
        ``REFUND_WINDOW_HOURS`` window), ``refund_hours_left`` and
        ``allow_refund`` (the item's own flag, defaulting to True).
    """
    ownership = db.query(Ownership).filter(
        Ownership.account == account,
        Ownership.item_id == item_id,
        Ownership.is_refunded == False  # noqa: E712 - SQL expression
    ).first()

    if not ownership:
        return {"status": "success", "owned": False}

    # Item metadata lives in the JSON store; items without the flag (older
    # data) and unknown items default to refundable.
    items_db = json_db.load_data("items.json", default_data=[])
    item = next((i for i in items_db if i.get("id") == item_id), None)
    allow_refund = item.get("allow_refund", True) if item else True

    # Work out whether the purchase is still inside the refund window.
    # A record without a purchase timestamp cannot be placed in the window,
    # so it is reported as not refundable instead of crashing on None.
    # NOTE(review): assumes purchased_at is stored as naive UTC - confirm.
    purchased_at = ownership.purchased_at
    can_refund = False
    hours_left = 0
    if purchased_at is not None:
        now = datetime.datetime.utcnow()
        refund_deadline = purchased_at + datetime.timedelta(hours=REFUND_WINDOW_HOURS)
        if now < refund_deadline:
            can_refund = True
            hours_left = (refund_deadline - now).total_seconds() / 3600

    return {
        "status": "success",
        "owned": True,
        "purchased_at": purchased_at.isoformat() if purchased_at is not None else None,
        "price_paid": ownership.price_paid,
        "can_refund": can_refund,
        "refund_hours_left": round(hours_left, 1),
        "allow_refund": allow_refund
    }
| # 🔒 P0安全优化:退款每分钟最多3次 | |
async def refund_purchase(request: Request, account: str, item_id: str, db: Session = Depends(get_db)):
    """Refund a purchase ("regret mode").

    Rules enforced here:
    - the item must exist and must not have ``allow_refund`` set to False;
    - a non-refunded ownership record must exist for (account, item_id);
    - the purchase must be younger than ``REFUND_WINDOW_HOURS``;
    - the price paid must be positive;
    - if the seller has a wallet, its balance must cover the refund.

    On success the seller wallet is debited, the buyer wallet is credited
    (created if missing), the ownership is marked refunded, a ``Refund`` row
    carrying a ``REFUND_BAN_DAYS`` re-purchase ban is stored and a hash-chained
    ``REFUND`` transaction is recorded, all in a single commit. The seller is
    then notified on a best-effort basis.

    Args:
        request: Incoming request; not read in this body.
        account: Buyer account requesting the refund.
        item_id: Identifier of the item being refunded.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status``, ``message``, ``refund_amount`` and ``ban_days``.

    Raises:
        HTTPException: 404 (unknown item / no purchase record), 400 (refund
            not allowed, window expired, free item, seller balance too low)
            or 500 (unexpected failure; the transaction is rolled back).
    """
    items_db = json_db.load_data("items.json", default_data=[])
    # .get("id") so a catalogue entry without an id cannot raise KeyError.
    item = next((i for i in items_db if i.get("id") == item_id), None)
    if not item:
        raise HTTPException(status_code=404, detail="商品不存在")

    # Items default to refundable for backward compatibility with old data.
    if not item.get("allow_refund", True):
        raise HTTPException(status_code=400, detail="该资源不支持退款")

    seller_account = item.get("author")
    # Bound before the try so the error log below can always format it.
    refund_amount = 0

    try:
        # Lock the ownership row as well as the wallets: without the lock two
        # concurrent requests could both see is_refunded == False and each
        # pay the refund out. Running this inside the try guarantees the lock
        # is released by the rollback on every early exit.
        ownership = db.query(Ownership).filter(
            Ownership.account == account,
            Ownership.item_id == item_id,
            Ownership.is_refunded == False  # noqa: E712 - SQL expression
        ).with_for_update().first()
        if not ownership:
            raise HTTPException(status_code=404, detail="未找到购买记录")

        # Refund window check.
        purchased_at = ownership.purchased_at
        now = datetime.datetime.utcnow()
        refund_deadline = purchased_at + datetime.timedelta(hours=REFUND_WINDOW_HOURS)
        if now >= refund_deadline:
            hours_passed = (now - purchased_at).total_seconds() / 3600
            raise HTTPException(status_code=400, detail=f"已超过24小时退款窗口(已购买{hours_passed:.1f}小时)")

        refund_amount = ownership.price_paid or 0
        if refund_amount <= 0:
            raise HTTPException(status_code=400, detail="该商品为免费资源,无需退款")

        buyer_wallet = db.query(Wallet).filter(Wallet.account == account).with_for_update().first()
        seller_wallet = db.query(Wallet).filter(Wallet.account == seller_account).with_for_update().first()

        if seller_wallet:
            # Verify the seller can cover the refund before debiting.
            if seller_wallet.balance < refund_amount:
                raise HTTPException(status_code=400, detail="卖家余额不足,无法处理退款")
            seller_wallet.balance -= refund_amount
            # earn_balance is deliberately left untouched: it is a cumulative
            # "total sales income" figure that only ever grows.

        if buyer_wallet:
            buyer_wallet.balance += refund_amount
        else:
            buyer_wallet = Wallet(account=account, balance=refund_amount)
            db.add(buyer_wallet)

        # Revoke ownership.
        ownership.is_refunded = True
        ownership.refunded_at = now

        # Refund record carrying the re-purchase ban.
        ban_until = now + datetime.timedelta(days=REFUND_BAN_DAYS)
        new_refund = Refund(
            account=account,
            item_id=item_id,
            amount=refund_amount,
            ban_until=ban_until
        )
        db.add(new_refund)

        # Seller display name for the transaction record.
        users_db = json_db.load_data("users.json", default_data={})
        seller_info = users_db.get(seller_account, {})
        seller_user_name = seller_info.get("name", seller_account)

        # Append the REFUND transaction to the buyer's hash chain.
        tx_id = f"REFUND_{int(time.time())}_{uuid.uuid4().hex[:6]}"
        last_tx = db.query(Transaction).filter(Transaction.account == account).order_by(Transaction.created_at.desc()).first()
        prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
        tx_hash = calculate_tx_hash(tx_id, account, "REFUND", refund_amount, prev_hash)

        new_tx = Transaction(
            tx_id=tx_id, account=account, tx_type="REFUND", amount=refund_amount,
            related_account=seller_account, item_id=item_id, prev_hash=prev_hash, tx_hash=tx_hash,
            description=f"退款: {item.get('title', '未知资源')}",
            item_title=item.get('title', ''),
            item_type=item.get('type', ''),
            related_user_name=seller_user_name
        )
        db.add(new_tx)

        db.commit()
    except HTTPException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"REFUND_ERROR | buyer={account} | item={item_id} | amount={refund_amount} | error={str(e)}")
        raise HTTPException(status_code=500, detail="退款处理失败,请稍后重试") from e

    logger.info(f"REFUND | buyer={account} | seller={seller_account} | item={item_id} | amount={refund_amount} | ban_until={ban_until.isoformat()}")

    # Notify the seller. This runs after the commit and outside the
    # transaction's error handling: a notification failure must not be
    # reported to the buyer as a failed refund, because the money has
    # already moved.
    if seller_account:
        try:
            add_notification(seller_account, {
                "type": "refund",
                "from_user": account,
                "target_item_id": item_id,
                "target_item_title": item.get("title", ""),
                "content": f"您的作品《{item.get('title', '')}》已被退款"
            })
        except Exception as e:
            logger.warning(f"REFUND_NOTIFY_ERROR | seller={seller_account} | item={item_id} | error={str(e)}")

    return {
        "status": "success",
        "message": f"退款成功,{refund_amount}积分已退还",
        "refund_amount": refund_amount,
        "ban_days": REFUND_BAN_DAYS
    }
| # ========================================== | |
| # 💰 管理员提现管理 API | |
| # ========================================== | |
async def get_admin_withdrawals(
    status: str = None,
    current_user: str = Depends(require_auth),
    db: Session = Depends(get_db)
):
    """List withdrawal transactions for administrators, newest first.

    Args:
        status: Optional ``withdraw_status`` filter. ``"completed"`` also
            matches rows whose status is NULL (legacy data).
        current_user: Authenticated account resolved by ``require_auth``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status`` and ``data``, one entry per ``WITHDRAW``
        transaction. Missing payout details are reported with a placeholder
        string and a missing status is reported as ``"completed"``.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not is_admin(current_user):
        raise HTTPException(status_code=403, detail="需要管理员权限")

    withdrawals = db.query(Transaction).filter(Transaction.tx_type == "WITHDRAW")

    if status == "completed":
        # Legacy rows have no status and count as completed.
        from sqlalchemy import or_
        withdrawals = withdrawals.filter(
            or_(Transaction.withdraw_status == status, Transaction.withdraw_status.is_(None))
        )
    elif status:
        withdrawals = withdrawals.filter(Transaction.withdraw_status == status)

    def _to_entry(record):
        # Flatten one transaction row into the response shape.
        return {
            "tx_id": record.tx_id,
            "account": record.account,
            "amount": record.amount,
            "alipay_account": record.alipay_account or "未记录",
            "real_name": record.real_name or "未记录",
            "withdraw_status": record.withdraw_status or "completed",
            "payment_order_id": record.payment_order_id,
            "created_at": record.created_at.isoformat() if record.created_at else None,
        }

    ordered = withdrawals.order_by(Transaction.created_at.desc()).all()
    return {"status": "success", "data": [_to_entry(row) for row in ordered]}
class CompleteWithdrawRequest(BaseModel):
    """Request body for confirming that a withdrawal has been paid out."""

    # Order id of the payout; stored on the withdrawal transaction.
    payment_order_id: str
async def complete_withdrawal(
    tx_id: str,
    req: CompleteWithdrawRequest,
    current_user: str = Depends(require_auth),
    db: Session = Depends(get_db)
):
    """Mark a withdrawal as paid and release the matching frozen balance.

    Args:
        tx_id: Identifier of the ``WITHDRAW`` transaction to complete.
        req: Request body carrying the payout order id.
        current_user: Authenticated account resolved by ``require_auth``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status`` and a confirmation ``message``.

    Raises:
        HTTPException: 403 when the caller is not an administrator, 404 when
            the withdrawal does not exist, 400 when it is already completed.
    """
    if not is_admin(current_user):
        raise HTTPException(status_code=403, detail="需要管理员权限")

    try:
        # Row lock: two concurrent confirmations of the same withdrawal must
        # not both pass the status check and unfreeze the balance twice.
        tx = db.query(Transaction).filter(
            Transaction.tx_id == tx_id,
            Transaction.tx_type == "WITHDRAW"
        ).with_for_update().first()
        if not tx:
            raise HTTPException(status_code=404, detail="提现记录不存在")

        # A NULL status is legacy data that the admin listing reports as
        # "completed"; treat it the same way here so such a row cannot be
        # completed again and release frozen funds a second time.
        if tx.withdraw_status is None or tx.withdraw_status == "completed":
            raise HTTPException(status_code=400, detail="该提现已完成打款")

        tx.withdraw_status = "completed"
        tx.payment_order_id = req.payment_order_id

        # Captured before commit for the audit log below.
        tx_account = tx.account
        tx_amount = tx.amount
        net_amount = tx.net_amount

        # Release the frozen balance. net_amount is the preferred source so
        # the released amount matches the stored net figure.
        wallet = db.query(Wallet).filter(Wallet.account == tx_account).with_for_update().first()
        if wallet:
            if net_amount is None:
                # Legacy rows have no net_amount: rebuild it from the fee
                # transaction recorded within 5 seconds of the withdrawal.
                fee_tx = db.query(Transaction).filter(
                    Transaction.account == tx_account,
                    Transaction.tx_type == "WITHDRAW_FEE",
                    Transaction.created_at >= tx.created_at - datetime.timedelta(seconds=5),
                    Transaction.created_at <= tx.created_at + datetime.timedelta(seconds=5)
                ).first()
                fee_amount = abs(fee_tx.amount) if fee_tx else 0
                net_amount = abs(tx_amount) - fee_amount
                logger.warning(f"WITHDRAW net_amount为NULL,已计算恢复: tx_id={tx.tx_id}, net={net_amount}, fee={fee_amount}")
            # Clamp at 0, and tolerate a NULL frozen_balance.
            wallet.frozen_balance = max(0, (wallet.frozen_balance or 0) - net_amount)

        db.commit()
    except Exception:
        # Covers HTTPException too: release the row locks, then propagate
        # the original error unchanged.
        db.rollback()
        raise

    # Audit log. Reports the amount actually released (the recovered value
    # for legacy rows) rather than the possibly-NULL stored column.
    logger.info(f"WITHDRAW_COMPLETED | admin={current_user} | tx_id={tx_id} | account={tx_account} | amount={tx_amount} | net_amount={net_amount} | order_id={req.payment_order_id}")

    return {"status": "success", "message": f"提现 {tx_id} 已确认打款完成"}