Spaces:
Running
Running
# router_open_api.py
# ==========================================
# 🌐 General-purpose Open API module (called by third-party plugins)
# ==========================================
# Purpose: expose general endpoints protected by a Plugin Key so that
#   third-party plugins such as NodeCraft AI can reuse the ComfyUI-Ranking account system.
# Related files:
#   - 安全认证.py (verify_token_with_fallback, require_auth)
#   - database_sql.py (get_db, SessionLocal)
#   - models_sql.py (Wallet, Transaction, RegisteredPlugin)
#   - 数据库连接.py (users.json read/write)
# Security mechanisms:
#   - X-Plugin-Key header authentication
#   - In-memory rate limiting keyed by plugin id (counter resets every minute)
#   - Every deduct/refund operation is recorded in the Transaction ledger (with a hash chain)
# ==========================================
| import os | |
| import json | |
| import time | |
| import uuid | |
| import datetime | |
| import hashlib | |
| import logging | |
| import threading | |
| from datetime import date | |
| from typing import Optional | |
| from fastapi import APIRouter, Depends, Header, HTTPException, Query | |
| from fastapi.responses import JSONResponse | |
| from sqlalchemy.orm import Session | |
| from pydantic import BaseModel, Field | |
| from database_sql import get_db, SessionLocal | |
| from models_sql import Wallet, Transaction, RegisteredPlugin | |
| from 安全认证 import verify_token_with_fallback | |
| import 数据库连接 as json_db | |
# Module-wide logger for the open API.
logger = logging.getLogger("ComfyUI-Ranking.OpenAPI")

# Router object for the open-API endpoints; every path is prefixed with /api/open.
router = APIRouter(
    prefix="/api/open",
    tags=["open-api"],
)
# ═══════════════ Plugin registry (loaded dynamically from the database) ═══════════════
# Reverse index: plugin_key -> {id, key, name, rate_limit, permissions,
#                               max_deduct_per_tx, max_deduct_per_day}
_PLUGIN_KEY_INDEX: dict = {}


def _parse_plugin_permissions(raw, plugin_id):
    """Decode a plugin's ``permissions`` column into a list of permission names.

    Args:
        raw: Value of the ``permissions`` column (expected: JSON array text).
        plugin_id: Plugin identifier, used only in the warning log line.

    Returns:
        The decoded list. When the column is empty, is not valid JSON, or
        decodes to anything other than a list, the full default permission
        set is returned (the same fallback the loader has always used for
        empty/undecodable values).
    """
    default_permissions = ["verify", "balance", "deduct", "refund"]
    if not raw:
        return default_permissions
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError, RecursionError):
        decoded = None
    # Only a list is safe here: a JSON string would turn the later
    # ``permission not in perms`` check into a substring match, and a JSON
    # number would make that check raise TypeError on every request.
    if isinstance(decoded, list):
        return decoded
    logger.warning(
        f"OPEN_API_BAD_PERMISSIONS | plugin={plugin_id} | fallback=default"
    )
    return default_permissions


def load_plugins_from_db():
    """Load every enabled plugin from the database into the in-memory index.

    On a database with no ``RegisteredPlugin`` rows a default "NodeCraft AI"
    plugin is seeded first. The module-level ``_PLUGIN_KEY_INDEX`` is replaced
    in one assignment once the new index is fully built.

    If anything fails and the index is still empty, a single fallback entry
    built from the ``NCA_PLUGIN_KEY`` environment variable is installed; an
    already populated index is left untouched. The session is always closed.
    """
    global _PLUGIN_KEY_INDEX

    # NOTE(review): when NCA_PLUGIN_KEY is unset, this hard-coded literal
    # becomes a valid plugin key. Confirm the variable is always set in
    # production deployments.
    default_key = os.environ.get("NCA_PLUGIN_KEY", "nca_2026_nodecraft_ai_plugin")

    db = SessionLocal()
    try:
        # Seed the default plugin on first start (empty table).
        if not db.query(RegisteredPlugin).first():
            db.add(
                RegisteredPlugin(
                    plugin_id="nodecraft_ai",
                    plugin_key=default_key,
                    name="NodeCraft AI",
                    enabled=True,
                    rate_limit=100,
                    max_deduct_per_tx=100,
                    max_deduct_per_day=1000,
                    permissions='["verify","balance","deduct","refund"]',
                    created_by="system",
                )
            )
            db.commit()
            logger.info("✅ 已 seed 默认插件: NodeCraft AI")

        plugins = (
            db.query(RegisteredPlugin)
            .filter(RegisteredPlugin.enabled == True)  # noqa: E712 (SQL expression)
            .all()
        )

        # Build the new index completely before publishing it.
        new_index = {}
        for p in plugins:
            new_index[p.plugin_key] = {
                "id": p.plugin_id,
                "key": p.plugin_key,
                "name": p.name,
                "rate_limit": p.rate_limit or 100,
                "permissions": _parse_plugin_permissions(p.permissions, p.plugin_id),
                "max_deduct_per_tx": p.max_deduct_per_tx or 100,
                "max_deduct_per_day": p.max_deduct_per_day or 1000,
            }
        _PLUGIN_KEY_INDEX = new_index
        logger.info(f"✅ 已加载 {len(new_index)} 个插件到注册表")
    except Exception as e:
        logger.error(f"❌ 加载插件注册表失败: {e}")
        # Only fall back when nothing usable is loaded; keep a previously
        # loaded registry if a hot reload fails.
        if not _PLUGIN_KEY_INDEX:
            _PLUGIN_KEY_INDEX = {
                default_key: {
                    "id": "nodecraft_ai",
                    "key": default_key,
                    "name": "NodeCraft AI",
                    "rate_limit": 100,
                    "permissions": ["verify", "balance", "deduct", "refund"],
                    "max_deduct_per_tx": 100,
                    "max_deduct_per_day": 1000,
                }
            }
            logger.warning("⚠️ 使用 fallback 插件配置")
    finally:
        db.close()
def reload_plugin_registry():
    """Hot-reload the plugin registry; intended to be called after admin changes.

    Simply re-runs the database load, which rebuilds the in-memory index.
    """
    load_plugins_from_db()
# ==========================================
# 💰 Daily deduction cap: in-memory counter (reset every calendar day)
# ==========================================
# Guards every access to _daily_deduct_counters.
_daily_deduct_lock = threading.Lock()
# {plugin_id: {"date": "2026-06-17", "total": 500}}
_daily_deduct_counters: dict = {}


def _peek_daily_used(plugin_id: str) -> int:
    """Return how much the plugin has deducted today without changing the counter.

    An entry recorded under a different date counts as zero.
    """
    today_key = date.today().isoformat()
    with _daily_deduct_lock:
        entry = _daily_deduct_counters.get(plugin_id)
        if entry and entry.get("date") == today_key:
            return int(entry.get("total", 0))
        return 0


def _commit_daily_used(plugin_id: str, amount: int) -> int:
    """Add ``amount`` to the plugin's total for today and return the new total.

    Meant to be called after a deduction succeeded. A missing entry, or one
    recorded under a different date, is replaced by a fresh entry first.
    """
    today_key = date.today().isoformat()
    with _daily_deduct_lock:
        entry = _daily_deduct_counters.get(plugin_id)
        if entry and entry.get("date") == today_key:
            running_total = int(entry.get("total", 0))
        else:
            entry = {"date": today_key, "total": 0}
            running_total = 0
        entry["total"] = running_total + int(amount)
        _daily_deduct_counters[plugin_id] = entry
        return entry["total"]
def verify_plugin_key(plugin_key: str) -> dict:
    """Validate a Plugin Key and return the matching plugin configuration.

    Args:
        plugin_key: Value of the ``X-Plugin-Key`` request header.

    Returns:
        The plugin configuration dict found in the in-memory registry.

    Raises:
        HTTPException: 401 when no key was supplied; 403 when the key is not
            in the registry (only the first 8 characters are logged).
    """
    if plugin_key:
        plugin_cfg = _PLUGIN_KEY_INDEX.get(plugin_key)
        if plugin_cfg:
            return plugin_cfg
        logger.warning(f"OPEN_API_INVALID_KEY | key_prefix={plugin_key[:8]}")
        raise HTTPException(status_code=403, detail="Plugin Key 无效或未注册")
    raise HTTPException(status_code=401, detail="缺少 X-Plugin-Key Header")
# ==========================================
# 🚦 Rate limiting: in-memory counter per plugin_id (resets every minute)
# ==========================================
# Guards every access to _rate_buckets.
_rate_lock = threading.Lock()
# {plugin_id: {"window_start": int, "count": int}}
_rate_buckets: dict = {}


def _check_rate_limit(plugin_info: dict, permission: str) -> None:
    """Check the plugin's permission, then count the request against its limit.

    Requests are counted in one-minute windows aligned to multiples of 60
    seconds of Unix time; a request rejected for lack of permission is not
    counted.

    Raises:
        HTTPException: 403 when ``permission`` is not granted to the plugin;
            429 when the current window has already reached the limit.
    """
    plugin_id = plugin_info["id"]
    per_minute = int(plugin_info.get("rate_limit", 60))

    if permission not in plugin_info.get("permissions", []):
        raise HTTPException(
            status_code=403,
            detail=f"插件 {plugin_id} 无权限调用 {permission} 接口",
        )

    # Start of the current minute, in Unix seconds.
    window_start = (int(time.time()) // 60) * 60

    with _rate_lock:
        slot = _rate_buckets.get(plugin_id)
        if not slot or slot["window_start"] != window_start:
            # First request in a new window: start counting from zero.
            slot = {"window_start": window_start, "count": 0}
            _rate_buckets[plugin_id] = slot
        if slot["count"] >= per_minute:
            raise HTTPException(
                status_code=429,
                detail=f"请求过于频繁,限流 {per_minute}/分钟,请稍后再试",
            )
        slot["count"] += 1
def _require_plugin(plugin_key: Optional[str], permission: str) -> dict:
    """Single entry point: validate the Plugin Key, the permission and the rate limit.

    Returns the plugin configuration dict; the called helpers raise the
    HTTP errors.
    """
    plugin_cfg = verify_plugin_key(plugin_key if plugin_key else "")
    _check_rate_limit(plugin_cfg, permission)
    return plugin_cfg
def _resolve_user_from_token(token: str) -> str:
    """Resolve a user token to its account name.

    Raises:
        HTTPException: 401 when the token is empty, or when verification
            fails or yields no account.
    """
    if not token:
        raise HTTPException(status_code=401, detail="缺少用户 Token")

    token_ok, account_name, failure_reason = verify_token_with_fallback(token)
    if token_ok and account_name:
        return account_name
    raise HTTPException(status_code=401, detail=failure_reason or "Token 无效")
def _calc_tx_hash(tx_id: str, account: str, tx_type: str, amount: int, prev_hash: str) -> str:
    """Return the SHA-256 hex digest that links a transaction into the hash chain.

    The digest covers the five fields concatenated in order with no
    separator. Per the original note, this must stay identical to
    ``router_wallet.calculate_tx_hash``.
    """
    fields = (tx_id, account, tx_type, amount, prev_hash)
    payload = "".join(format(part) for part in fields)
    hasher = hashlib.sha256()
    hasher.update(payload.encode())
    return hasher.hexdigest()
# ==========================================
# 📦 Request models
# ==========================================
class VerifyTokenRequest(BaseModel):
    # Request body of the token verification endpoint.
    token: str = Field(
        default=...,
        description="待验证的用户 JWT Token",
    )
class DeductRequest(BaseModel):
    # Request body of the deduction endpoint.
    # The user token identifies the wallet to charge.
    token: str = Field(default=..., description="用户 JWT Token")
    # Must be a strictly positive integer (gt=0).
    amount: int = Field(default=..., gt=0, description="扣款金额(正整数)")
    # Business identifier / reason; stored in the transaction description.
    reason: str = Field(default=..., description="业务标识/扣款原因")
    # Optional caller-side reference id.
    reference_id: Optional[str] = Field(default=None, description="业务侧参考 ID")
class RefundRequest(BaseModel):
    # Request body of the refund endpoint.
    token: str = Field(default=..., description="用户 JWT Token")
    # tx_id of the original deduction being refunded.
    transaction_id: str = Field(default=..., description="原扣款交易 ID")
    # Must be a strictly positive integer (gt=0).
    amount: int = Field(default=..., gt=0, description="退款金额(正整数)")
# ==========================================
# 1️⃣ Token verification: POST /api/open/verify-token
# ==========================================
# NOTE(review): no route decorator is visible in this view of the file —
# confirm how this handler is registered on `router`.
async def open_verify_token(
    req: VerifyTokenRequest,
    x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
):
    """
    Let a third-party plugin check whether a user token is valid.

    Requires the "verify" permission. An invalid token is reported in the
    response body ({"valid": False, "error": ...}) rather than as an HTTP
    error. A valid token returns the account plus basic profile data
    (display name and avatar) read from users.json.
    """
    plugin_info = _require_plugin(x_plugin_key, "verify")
    plugin_id = plugin_info["id"]

    is_valid, account, error_msg = verify_token_with_fallback(req.token)
    if is_valid and account:
        all_users = json_db.load_data("users.json", default_data={})
        profile = all_users.get(account, {}) or {}
        logger.info(f"OPEN_VERIFY_TOKEN | plugin={plugin_id} | account={account}")
        return {
            "valid": True,
            "account": account,
            # Fall back to the account name when no display name is stored.
            "name": profile.get("name", account),
            "avatar": profile.get("avatarDataUrl", "") or profile.get("avatar", ""),
        }

    logger.info(
        f"OPEN_VERIFY_TOKEN_FAIL | plugin={plugin_id} | reason={error_msg}"
    )
    return {"valid": False, "error": error_msg or "Token 无效"}
# ==========================================
# 2️⃣ Balance query: GET /api/open/balance
# ==========================================
# NOTE(review): no route decorator is visible in this view of the file —
# confirm how this handler is registered on `router`.
async def open_get_balance(
    authorization: Optional[str] = Header(None, alias="Authorization"),
    x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
    db: Session = Depends(get_db),
):
    """
    Return the wallet balances of the user identified by the bearer token.

    Authentication: Authorization: Bearer <user_token> + X-Plugin-Key
    (the plugin needs the "balance" permission). A user without a wallet
    row gets all balances reported as 0.
    """
    plugin_info = _require_plugin(x_plugin_key, "balance")

    if not authorization:
        raise HTTPException(status_code=401, detail="缺少 Authorization Header")
    scheme, separator, credential = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Authorization 格式错误,请使用 Bearer Token")
    account = _resolve_user_from_token(credential)

    wallet = db.query(Wallet).filter(Wallet.account == account).first()

    balance_fields = (
        "balance",
        "earn_balance",
        "tip_balance",
        "task_balance",
        "frozen_balance",
    )
    result = {"account": account}
    for field_name in balance_fields:
        # Missing wallet or NULL column both report 0.
        result[field_name] = int(getattr(wallet, field_name) or 0) if wallet else 0

    logger.info(
        f"OPEN_BALANCE | plugin={plugin_info['id']} | account={account} | balance={result['balance']}"
    )
    return result
# ==========================================
# 3️⃣ General deduction: POST /api/open/deduct
# ==========================================
# NOTE(review): no route decorator is visible in this view of the file —
# confirm how this handler is registered on `router`.
async def open_deduct(
    req: DeductRequest,
    x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
    db: Session = Depends(get_db),
):
    """
    General deduction endpoint (debits ``wallet.balance``).

    Flow:
      1. Validate Plugin Key + "deduct" permission + rate limit.
      2. Resolve the token to an account.
      3. Reject amounts above the per-transaction cap or the remaining daily
         cap of the plugin (403 JSON body with an ``error`` code).
      4. Lock the wallet row (SELECT ... FOR UPDATE) and check the balance.
      5. Debit and append a Transaction linked into the account's hash chain.
      6. Only after a successful commit, add the amount to the daily counter.

    Returns:
        {"success": True, "remaining_balance": int, "transaction_id": str}

    Raises:
        HTTPException: 401/403/429 from authentication; 402 when the wallet
            is missing or the balance is insufficient; 500 on any unexpected
            failure (the transaction is rolled back).
    """
    plugin_info = _require_plugin(x_plugin_key, "deduct")
    plugin_id = plugin_info["id"]
    account = _resolve_user_from_token(req.token)
    amount = int(req.amount)

    # 🛡️ Per-transaction cap: fast rejection before touching the database.
    max_per_tx = int(plugin_info.get("max_deduct_per_tx", 100))
    if amount > max_per_tx:
        logger.warning(
            f"OPEN_DEDUCT_TX_LIMIT | plugin={plugin_id} | account={account} | "
            f"amount={amount} | max={max_per_tx}"
        )
        return JSONResponse(status_code=403, content={
            "error": "AMOUNT_EXCEEDS_LIMIT",
            "message": f"单笔扣款不能超过{max_per_tx}积分",
            "max": max_per_tx,
        })

    # 🛡️ Daily cap: check only; the counter is increased after success.
    max_daily = int(plugin_info.get("max_deduct_per_day", 1000))
    used_today = _peek_daily_used(plugin_id)
    if used_today + amount > max_daily:
        logger.warning(
            f"OPEN_DEDUCT_DAILY_LIMIT | plugin={plugin_id} | account={account} | "
            f"amount={amount} | used={used_today} | limit={max_daily}"
        )
        return JSONResponse(status_code=403, content={
            "error": "DAILY_LIMIT_EXCEEDED",
            "message": "今日扣款已达上限",
            "limit": max_daily,
            "used": used_today,
        })

    try:
        # Pessimistic lock so concurrent deductions on this wallet serialize.
        wallet = (
            db.query(Wallet)
            .filter(Wallet.account == account)
            .with_for_update()
            .first()
        )
        if not wallet or int(wallet.balance or 0) < amount:
            raise HTTPException(status_code=402, detail="余额不足")

        wallet.balance = int(wallet.balance or 0) - amount

        tx_id = f"OPEN_DEDUCT_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        # Chain onto the account's most recent transaction.
        last_tx = (
            db.query(Transaction)
            .filter(Transaction.account == account)
            .order_by(Transaction.created_at.desc())
            .first()
        )
        prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
        tx_hash = _calc_tx_hash(tx_id, account, "OPEN_DEDUCT", -amount, prev_hash)

        new_tx = Transaction(
            tx_id=tx_id,
            account=account,
            tx_type="OPEN_DEDUCT",
            amount=-amount,  # deductions are stored as negative amounts
            related_account=plugin_id,
            item_id=req.reference_id,
            prev_hash=prev_hash,
            tx_hash=tx_hash,
            description=f"[{plugin_info['name']}] {req.reason}",
            source_plugin=plugin_id,  # 🌐 originating plugin, for per-tool tracking
        )
        db.add(new_tx)
        db.commit()
        db.refresh(wallet)
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        # Full details (with traceback) stay in the server log; the caller is
        # a third party and must not receive internal error text.
        logger.exception(
            f"OPEN_DEDUCT_FAIL | plugin={plugin_id} | account={account} | err={exc}"
        )
        raise HTTPException(status_code=500, detail="扣款失败") from exc

    # ✅ Count against the daily cap only after success, so failed attempts
    # (e.g. insufficient balance) do not consume the quota.
    daily_total = _commit_daily_used(plugin_id, amount)
    logger.info(
        f"OPEN_DEDUCT | plugin={plugin_id} | account={account} | "
        f"amount={amount} | tx={tx_id} | reason={req.reason} | ref={req.reference_id} | "
        f"daily_used={daily_total}/{max_daily}"
    )
    return {
        "success": True,
        "remaining_balance": int(wallet.balance or 0),
        "transaction_id": tx_id,
    }
# ==========================================
# 4️⃣ Refund: POST /api/open/refund
# ==========================================
# NOTE(review): no route decorator is visible in this view of the file —
# confirm how this handler is registered on `router`.
async def open_refund(
    req: RefundRequest,
    x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
    db: Session = Depends(get_db),
):
    """
    Refund (partially or fully) an earlier OPEN_DEDUCT back to the user's wallet.

    Checks:
      - the original transaction exists and has tx_type == OPEN_DEDUCT
      - it belongs to the user identified by the token
      - it was created by the calling plugin (related_account == plugin id)
      - refunds already recorded for it plus this one do not exceed the
        original amount

    The wallet row is locked before any of these checks, so concurrent
    refunds for the same account are evaluated one after another and cannot
    both pass the cumulative check.

    Returns:
        {"success": True, "remaining_balance": int, "transaction_id": str}

    Raises:
        HTTPException: 401/403/429 from authentication; 404 unknown
            transaction; 403 foreign user or plugin; 400 unsupported type or
            amount above what is still refundable; 500 on unexpected failure
            (the transaction is rolled back).
    """
    plugin_info = _require_plugin(x_plugin_key, "refund")
    plugin_id = plugin_info["id"]
    account = _resolve_user_from_token(req.token)
    refund_amount = int(req.amount)

    try:
        # Take the row lock first: everything read below is then read while
        # holding it. (If the account has no wallet row there is nothing to
        # lock; that case is not expected after a successful deduction.)
        wallet = (
            db.query(Wallet)
            .filter(Wallet.account == account)
            .with_for_update()
            .first()
        )

        original_tx = (
            db.query(Transaction).filter(Transaction.tx_id == req.transaction_id).first()
        )
        if not original_tx:
            raise HTTPException(status_code=404, detail="原交易不存在")
        if original_tx.account != account:
            raise HTTPException(status_code=403, detail="无权对该交易执行退款")
        if original_tx.tx_type != "OPEN_DEDUCT":
            raise HTTPException(status_code=400, detail="该交易类型不支持开放退款")
        if (original_tx.related_account or "") != plugin_id:
            raise HTTPException(status_code=403, detail="不允许跨插件退款")

        original_amount = abs(int(original_tx.amount or 0))
        if refund_amount > original_amount:
            raise HTTPException(status_code=400, detail="退款金额超过原扣款金额")

        # Cumulative refund check (prevents repeated refunds from exceeding
        # the original deduction).
        refunded_records = (
            db.query(Transaction)
            .filter(
                Transaction.tx_type == "OPEN_REFUND",
                Transaction.related_account == plugin_id,
                Transaction.item_id == req.transaction_id,
            )
            .all()
        )
        already_refunded = sum(abs(int(t.amount or 0)) for t in refunded_records)
        if already_refunded + refund_amount > original_amount:
            raise HTTPException(
                status_code=400,
                detail=f"退款金额超过可退余额(已退 {already_refunded}/{original_amount})",
            )

        if not wallet:
            wallet = Wallet(
                account=account,
                balance=0,
                earn_balance=0,
                tip_balance=0,
                frozen_balance=0,
            )
            db.add(wallet)
            db.flush()

        wallet.balance = int(wallet.balance or 0) + refund_amount

        tx_id = f"OPEN_REFUND_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        # Chain onto the account's most recent transaction.
        last_tx = (
            db.query(Transaction)
            .filter(Transaction.account == account)
            .order_by(Transaction.created_at.desc())
            .first()
        )
        prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
        tx_hash = _calc_tx_hash(tx_id, account, "OPEN_REFUND", refund_amount, prev_hash)

        new_tx = Transaction(
            tx_id=tx_id,
            account=account,
            tx_type="OPEN_REFUND",
            amount=refund_amount,
            related_account=plugin_id,
            item_id=req.transaction_id,  # links the refund to the original deduction
            prev_hash=prev_hash,
            tx_hash=tx_hash,
            description=f"[{plugin_info['name']}] 退款 {req.transaction_id}",
            source_plugin=plugin_id,  # 🌐 originating plugin
        )
        db.add(new_tx)
        db.commit()
        db.refresh(wallet)
    except HTTPException:
        # Validation failures also end the transaction and release the lock.
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        # Full details (with traceback) stay in the server log; the caller is
        # a third party and must not receive internal error text.
        logger.exception(
            f"OPEN_REFUND_FAIL | plugin={plugin_id} | account={account} | err={exc}"
        )
        raise HTTPException(status_code=500, detail="退款失败") from exc

    logger.info(
        f"OPEN_REFUND | plugin={plugin_id} | account={account} | "
        f"amount={refund_amount} | tx={tx_id} | original={req.transaction_id}"
    )
    return {
        "success": True,
        "remaining_balance": int(wallet.balance or 0),
        "transaction_id": tx_id,
    }
# ==========================================
# 5️⃣ Transaction history: GET /api/open/transactions
# ==========================================
# NOTE(review): no route decorator is visible in this view of the file —
# confirm how this handler is registered on `router`.
async def open_list_transactions(
    authorization: Optional[str] = Header(None, alias="Authorization"),
    x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
    page: int = Query(1, ge=1, description="页码(从 1 开始)"),
    page_size: int = Query(20, ge=1, le=100, description="每页条数(1-100)"),
    start_time: Optional[int] = Query(None, description="起始时间(unix 秒)"),
    end_time: Optional[int] = Query(None, description="结束时间(unix 秒)"),
    tx_type: Optional[str] = Query(None, description="交易类型,如 OPEN_DEDUCT / OPEN_REFUND"),
    db: Session = Depends(get_db),
):
    """
    List the current user's deduction/refund records created by the calling plugin.

    Authentication: Authorization: Bearer <user_token> + X-Plugin-Key
    (the plugin needs the "verify" permission).
    Security: only rows whose source_plugin equals the calling plugin id are
    returned; legacy rows with a NULL source_plugin are matched through
    related_account instead.

    Returns:
        {"success": True, "transactions": [...], "total": int,
         "page": int, "page_size": int}, newest first.

    Raises:
        HTTPException: 401/403/429 from authentication; 400 when start_time
            or end_time cannot be represented as a datetime.
    """
    plugin_info = _require_plugin(x_plugin_key, "verify")

    if not authorization:
        raise HTTPException(status_code=401, detail="缺少 Authorization Header")
    parts = authorization.split(" ", 1)
    if len(parts) != 2 or parts[0].lower() != "bearer":
        raise HTTPException(status_code=401, detail="Authorization 格式错误,请使用 Bearer Token")
    account = _resolve_user_from_token(parts[1])
    plugin_id = plugin_info["id"]

    # Convert the unix-second bounds to naive UTC datetimes (the form the
    # filters compare created_at against). Out-of-range values are a client
    # error, not a server failure.
    utc = datetime.timezone.utc
    bounds = {}
    for field_name, raw_value in (("start_time", start_time), ("end_time", end_time)):
        if raw_value is None:
            continue
        try:
            moment = datetime.datetime.fromtimestamp(int(raw_value), utc)
        except (OverflowError, OSError, ValueError) as exc:
            raise HTTPException(
                status_code=400, detail=f"{field_name} 超出有效时间范围"
            ) from exc
        bounds[field_name] = moment.replace(tzinfo=None)

    # Filter by source_plugin; legacy NULL rows fall back to related_account.
    q = db.query(Transaction).filter(
        Transaction.account == account,
        ((Transaction.source_plugin == plugin_id)
         | ((Transaction.source_plugin.is_(None)) & (Transaction.related_account == plugin_id))),
    )
    if tx_type:
        q = q.filter(Transaction.tx_type == tx_type)
    if "start_time" in bounds:
        q = q.filter(Transaction.created_at >= bounds["start_time"])
    if "end_time" in bounds:
        q = q.filter(Transaction.created_at <= bounds["end_time"])

    total = q.count()
    rows = (
        q.order_by(Transaction.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    tag_prefix = f"[{plugin_info['name']}] "
    items = []
    for t in rows:
        created = t.created_at
        if created and created.tzinfo is None:
            # The filters above treat created_at as naive UTC, so convert it
            # the same way instead of letting timestamp() assume local time.
            # NOTE(review): assumes the column is stored in UTC — confirm
            # against the Transaction model.
            created = created.replace(tzinfo=utc)
        created_ts = int(created.timestamp()) if created else 0

        # reason = description without the leading "[plugin name] " tag;
        # only a real prefix is removed, text inside the reason is kept.
        description = t.description or ""
        if description.startswith(tag_prefix):
            reason_text = description[len(tag_prefix):]
        else:
            reason_text = description
        reason = reason_text.strip() or None

        items.append({
            "id": t.tx_id,
            "type": t.tx_type,
            "amount": int(t.amount or 0),
            "reason": reason,
            "reference_id": t.item_id,
            "created_at": created_ts,
            "description": description,
        })

    logger.info(
        f"OPEN_LIST_TX | plugin={plugin_id} | account={account} | "
        f"page={page}/{page_size} | total={total}"
    )
    return {
        "success": True,
        "transactions": items,
        "total": total,
        "page": page,
        "page_size": page_size,
    }
# ==========================================
# 6️⃣ Usage summary: GET /api/open/usage-summary
# ==========================================
# NOTE(review): no route decorator is visible in this view of the file —
# confirm how this handler is registered on `router`.
async def open_usage_summary(
    x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
    account: Optional[str] = Query(None, description="指定用户账号,不传则统计所有用户"),
    period: str = Query("daily", description="聚合粒度:daily / monthly / total"),
    start_time: Optional[int] = Query(None, description="起始时间(unix 秒)"),
    end_time: Optional[int] = Query(None, description="结束时间(unix 秒)"),
    db: Session = Depends(get_db),
):
    """
    Usage statistics for the calling plugin.

    Returns total deducted / total refunded / number of transactions / net
    consumption, plus a per-day or per-month breakdown (newest first) unless
    period is "total". Requires the "verify" permission.
    Security: only rows whose source_plugin equals the calling plugin id are
    counted; legacy rows with a NULL source_plugin are matched through
    related_account instead.

    Raises:
        HTTPException: 401/403/429 from authentication; 400 for an unknown
            period or a start_time/end_time that cannot be represented as a
            datetime.
    """
    plugin_info = _require_plugin(x_plugin_key, "verify")
    plugin_id = plugin_info["id"]

    period = (period or "daily").lower()
    if period not in ("daily", "monthly", "total"):
        raise HTTPException(status_code=400, detail="period 仅支持 daily / monthly / total")

    # Convert the unix-second bounds to naive UTC datetimes (the form the
    # filters compare created_at against). Out-of-range values are a client
    # error, not a server failure.
    bounds = {}
    for field_name, raw_value in (("start_time", start_time), ("end_time", end_time)):
        if raw_value is None:
            continue
        try:
            moment = datetime.datetime.fromtimestamp(int(raw_value), datetime.timezone.utc)
        except (OverflowError, OSError, ValueError) as exc:
            raise HTTPException(
                status_code=400, detail=f"{field_name} 超出有效时间范围"
            ) from exc
        bounds[field_name] = moment.replace(tzinfo=None)

    base = db.query(Transaction).filter(
        ((Transaction.source_plugin == plugin_id)
         | ((Transaction.source_plugin.is_(None)) & (Transaction.related_account == plugin_id))),
        Transaction.tx_type.in_(["OPEN_DEDUCT", "OPEN_REFUND"]),
    )
    if account:
        base = base.filter(Transaction.account == account)
    if "start_time" in bounds:
        base = base.filter(Transaction.created_at >= bounds["start_time"])
    if "end_time" in bounds:
        base = base.filter(Transaction.created_at <= bounds["end_time"])

    rows = base.all()

    total_deduct = 0
    total_refund = 0
    total_count = len(rows)

    # None for "total": no per-period breakdown is built in that mode.
    bucket_format = {"daily": "%Y-%m-%d", "monthly": "%Y-%m"}.get(period)
    buckets: dict = {}  # key -> {"deduct": int, "refund": int, "count": int}
    for t in rows:
        magnitude = abs(int(t.amount or 0))
        is_deduct = t.tx_type == "OPEN_DEDUCT"
        is_refund = t.tx_type == "OPEN_REFUND"
        if is_deduct:
            total_deduct += magnitude
        elif is_refund:
            total_refund += magnitude

        # Rows without a timestamp count towards the totals only.
        if bucket_format is None or not t.created_at:
            continue
        slot = buckets.setdefault(
            t.created_at.strftime(bucket_format),
            {"deduct": 0, "refund": 0, "count": 0},
        )
        if is_deduct:
            slot["deduct"] += magnitude
        elif is_refund:
            slot["refund"] += magnitude
        slot["count"] += 1

    label = "date" if period == "daily" else "month"
    breakdown = [
        {
            label: key,
            "deduct": buckets[key]["deduct"],
            "refund": buckets[key]["refund"],
            "count": buckets[key]["count"],
        }
        for key in sorted(buckets, reverse=True)
    ]

    logger.info(
        f"OPEN_USAGE_SUMMARY | plugin={plugin_id} | account={account or '*'} | "
        f"period={period} | tx_count={total_count}"
    )
    return {
        "success": True,
        "plugin_id": plugin_id,
        "plugin_name": plugin_info.get("name", plugin_id),
        "summary": {
            "total_deduct": total_deduct,
            "total_refund": total_refund,
            "total_transactions": total_count,
            "net_consumption": total_deduct - total_refund,
        },
        "breakdown": breakdown,
    }