ComfyUI-Ranking-API / router_open_api.py
ZHIWEI666's picture
支持第三方扣费
ee88a99 verified
Raw
History Blame Contribute Delete
27.6 kB
# router_open_api.py
# ==========================================
# 🌐 通用开放 API 模块(供第三方插件调用)
# ==========================================
# 作用:对外提供受 Plugin Key 保护的通用接口,
# 供 NodeCraft AI 等第三方插件复用 ComfyUI-Ranking 账号体系。
# 关联文件:
# - 安全认证.py (verify_token_with_fallback, require_auth)
# - database_sql.py (get_db, SessionLocal)
# - models_sql.py (Wallet, Transaction)
# - 数据库连接.py (users.json 读写)
# 安全机制:
# - X-Plugin-Key Header 鉴权
# - 基于 plugin_key 的内存限流(每分钟重置)
# - 所有扣款/退款操作记录到 Transaction 流水(含哈希链)
# ==========================================
import os
import json
import time
import uuid
import datetime
import hashlib
import logging
import threading
from datetime import date
from typing import Optional
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from database_sql import get_db, SessionLocal
from models_sql import Wallet, Transaction, RegisteredPlugin
from 安全认证 import verify_token_with_fallback
import 数据库连接 as json_db
logger = logging.getLogger("ComfyUI-Ranking.OpenAPI")
router = APIRouter(prefix="/api/open", tags=["open-api"])
# ═══════════════ 插件注册表(从数据库动态加载) ═══════════════
# 反向索引:plugin_key -> {id, key, name, rate_limit, permissions, max_deduct_per_tx, max_deduct_per_day}
_PLUGIN_KEY_INDEX: dict = {}
def load_plugins_from_db():
"""启动时从数据库加载所有启用的插件到内存索引"""
global _PLUGIN_KEY_INDEX
db = SessionLocal()
try:
# 首次启动时 seed 默认插件
if not db.query(RegisteredPlugin).first():
default_key = os.environ.get("NCA_PLUGIN_KEY", "nca_2026_nodecraft_ai_plugin")
default_plugin = RegisteredPlugin(
plugin_id="nodecraft_ai",
plugin_key=default_key,
name="NodeCraft AI",
enabled=True,
rate_limit=100,
max_deduct_per_tx=100,
max_deduct_per_day=1000,
permissions='["verify","balance","deduct","refund"]',
created_by="system",
)
db.add(default_plugin)
db.commit()
logger.info("✅ 已 seed 默认插件: NodeCraft AI")
plugins = db.query(RegisteredPlugin).filter(RegisteredPlugin.enabled == True).all()
# 构建内存索引
new_index = {}
for p in plugins:
try:
perms = json.loads(p.permissions) if p.permissions else ["verify", "balance", "deduct", "refund"]
except Exception:
perms = ["verify", "balance", "deduct", "refund"]
new_index[p.plugin_key] = {
"id": p.plugin_id,
"key": p.plugin_key,
"name": p.name,
"rate_limit": p.rate_limit or 100,
"permissions": perms,
"max_deduct_per_tx": p.max_deduct_per_tx or 100,
"max_deduct_per_day": p.max_deduct_per_day or 1000,
}
_PLUGIN_KEY_INDEX = new_index
logger.info(f"✅ 已加载 {len(new_index)} 个插件到注册表")
except Exception as e:
logger.error(f"❌ 加载插件注册表失败: {e}")
# 如果加载失败且索引为空,使用环境变量 fallback
if not _PLUGIN_KEY_INDEX:
fallback_key = os.environ.get("NCA_PLUGIN_KEY", "nca_2026_nodecraft_ai_plugin")
_PLUGIN_KEY_INDEX = {
fallback_key: {
"id": "nodecraft_ai",
"key": fallback_key,
"name": "NodeCraft AI",
"rate_limit": 100,
"permissions": ["verify", "balance", "deduct", "refund"],
"max_deduct_per_tx": 100,
"max_deduct_per_day": 1000,
}
}
logger.warning("⚠️ 使用 fallback 插件配置")
finally:
db.close()
def reload_plugin_registry():
"""热更新插件注册表(管理操作后调用)"""
load_plugins_from_db()
# ==========================================
# 💰 日扣款总额限制:内存计数器(每自然日重置)
# ==========================================
_daily_deduct_lock = threading.Lock()
_daily_deduct_counters: dict = {} # {plugin_id: {"date": "2026-06-17", "total": 500}}
def _peek_daily_used(plugin_id: str) -> int:
"""读取插件当日已使用的扣款总额(不修改计数)。跨日自动归零。"""
today = str(date.today())
with _daily_deduct_lock:
counter = _daily_deduct_counters.get(plugin_id)
if not counter or counter.get("date") != today:
return 0
return int(counter.get("total", 0))
def _commit_daily_used(plugin_id: str, amount: int) -> int:
"""扣款成功后将本次金额累加到当日计数,返回累加后的总额。"""
today = str(date.today())
with _daily_deduct_lock:
counter = _daily_deduct_counters.get(plugin_id)
if not counter or counter.get("date") != today:
counter = {"date": today, "total": 0}
counter["total"] = int(counter.get("total", 0)) + int(amount)
_daily_deduct_counters[plugin_id] = counter
return counter["total"]
def verify_plugin_key(plugin_key: str) -> dict:
"""
校验 Plugin Key 并返回插件配置。
参数:
plugin_key: 请求头 X-Plugin-Key 的值
返回:
匹配到的插件配置字典(包含 id 字段)
异常:
401: 未提供 Plugin Key
403: Plugin Key 无效或未注册
"""
if not plugin_key:
raise HTTPException(status_code=401, detail="缺少 X-Plugin-Key Header")
info = _PLUGIN_KEY_INDEX.get(plugin_key)
if not info:
logger.warning(f"OPEN_API_INVALID_KEY | key_prefix={plugin_key[:8]}")
raise HTTPException(status_code=403, detail="Plugin Key 无效或未注册")
return info
# ==========================================
# 🚦 限流:基于 plugin_id 的内存计数器(每分钟自动重置)
# ==========================================
_rate_lock = threading.Lock()
_rate_buckets: dict = {} # {plugin_id: {"window_start": int, "count": int}}
def _check_rate_limit(plugin_info: dict, permission: str) -> None:
"""
校验插件权限并执行限流。
- 权限不足 -> 403
- 触发限流 -> 429
"""
plugin_id = plugin_info["id"]
rate_limit = int(plugin_info.get("rate_limit", 60))
perms = plugin_info.get("permissions", [])
if permission not in perms:
raise HTTPException(
status_code=403,
detail=f"插件 {plugin_id} 无权限调用 {permission} 接口",
)
now = int(time.time())
window_start = now - (now % 60)
with _rate_lock:
bucket = _rate_buckets.get(plugin_id)
if not bucket or bucket["window_start"] != window_start:
bucket = {"window_start": window_start, "count": 0}
_rate_buckets[plugin_id] = bucket
if bucket["count"] >= rate_limit:
raise HTTPException(
status_code=429,
detail=f"请求过于频繁,限流 {rate_limit}/分钟,请稍后再试",
)
bucket["count"] += 1
def _require_plugin(plugin_key: Optional[str], permission: str) -> dict:
"""统一入口:校验 Plugin Key + 权限 + 限流"""
info = verify_plugin_key(plugin_key or "")
_check_rate_limit(info, permission)
return info
def _resolve_user_from_token(token: str) -> str:
"""从 Token 中解析出账号;失败时抛 401"""
if not token:
raise HTTPException(status_code=401, detail="缺少用户 Token")
is_valid, account, error_msg = verify_token_with_fallback(token)
if not is_valid or not account:
raise HTTPException(status_code=401, detail=error_msg or "Token 无效")
return account
def _calc_tx_hash(tx_id: str, account: str, tx_type: str, amount: int, prev_hash: str) -> str:
"""计算交易哈希(与 router_wallet.calculate_tx_hash 算法一致)"""
data = f"{tx_id}{account}{tx_type}{amount}{prev_hash}"
return hashlib.sha256(data.encode()).hexdigest()
# ==========================================
# 📦 请求模型
# ==========================================
class VerifyTokenRequest(BaseModel):
token: str = Field(..., description="待验证的用户 JWT Token")
class DeductRequest(BaseModel):
token: str = Field(..., description="用户 JWT Token")
amount: int = Field(..., gt=0, description="扣款金额(正整数)")
reason: str = Field(..., description="业务标识/扣款原因")
reference_id: Optional[str] = Field(None, description="业务侧参考 ID")
class RefundRequest(BaseModel):
token: str = Field(..., description="用户 JWT Token")
transaction_id: str = Field(..., description="原扣款交易 ID")
amount: int = Field(..., gt=0, description="退款金额(正整数)")
# ==========================================
# 1️⃣ Token 验证:POST /api/open/verify-token
# ==========================================
@router.post("/verify-token")
async def open_verify_token(
req: VerifyTokenRequest,
x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
):
"""
供第三方插件验证用户 Token 有效性。
成功返回账号与基础资料(昵称、头像)。
"""
plugin_info = _require_plugin(x_plugin_key, "verify")
is_valid, account, error_msg = verify_token_with_fallback(req.token)
if not is_valid or not account:
logger.info(
f"OPEN_VERIFY_TOKEN_FAIL | plugin={plugin_info['id']} | reason={error_msg}"
)
return {"valid": False, "error": error_msg or "Token 无效"}
users_db = json_db.load_data("users.json", default_data={})
user = users_db.get(account, {}) or {}
logger.info(f"OPEN_VERIFY_TOKEN | plugin={plugin_info['id']} | account={account}")
return {
"valid": True,
"account": account,
"name": user.get("name", account),
"avatar": user.get("avatarDataUrl", "") or user.get("avatar", ""),
}
# ==========================================
# 2️⃣ 余额查询:GET /api/open/balance
# ==========================================
@router.get("/balance")
async def open_get_balance(
authorization: Optional[str] = Header(None, alias="Authorization"),
x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
db: Session = Depends(get_db),
):
"""
查询用户钱包余额。
认证:Authorization: Bearer <user_token> + X-Plugin-Key
"""
plugin_info = _require_plugin(x_plugin_key, "balance")
if not authorization:
raise HTTPException(status_code=401, detail="缺少 Authorization Header")
parts = authorization.split(" ", 1)
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(status_code=401, detail="Authorization 格式错误,请使用 Bearer Token")
account = _resolve_user_from_token(parts[1])
wallet = db.query(Wallet).filter(Wallet.account == account).first()
if not wallet:
result = {
"account": account,
"balance": 0,
"earn_balance": 0,
"tip_balance": 0,
"task_balance": 0,
"frozen_balance": 0,
}
else:
result = {
"account": account,
"balance": int(wallet.balance or 0),
"earn_balance": int(wallet.earn_balance or 0),
"tip_balance": int(wallet.tip_balance or 0),
"task_balance": int(wallet.task_balance or 0),
"frozen_balance": int(wallet.frozen_balance or 0),
}
logger.info(
f"OPEN_BALANCE | plugin={plugin_info['id']} | account={account} | balance={result['balance']}"
)
return result
# ==========================================
# 3️⃣ 通用扣款:POST /api/open/deduct
# ==========================================
@router.post("/deduct")
async def open_deduct(
req: DeductRequest,
x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
db: Session = Depends(get_db),
):
"""
通用扣款接口(从 wallet.balance 扣减)。
流程:
1. 校验 Plugin Key + 权限 + 限流
2. 解析 Token -> account
3. 悲观锁锁定钱包,检查余额
4. 扣款 + 写入 Transaction(含哈希链)
"""
plugin_info = _require_plugin(x_plugin_key, "deduct")
account = _resolve_user_from_token(req.token)
amount = int(req.amount)
# 🛡️ 单笔扣款上限校验(在余额检查之前快速拒绝)
max_per_tx = int(plugin_info.get("max_deduct_per_tx", 100))
if amount > max_per_tx:
logger.warning(
f"OPEN_DEDUCT_TX_LIMIT | plugin={plugin_info['id']} | account={account} | "
f"amount={amount} | max={max_per_tx}"
)
return JSONResponse(status_code=403, content={
"error": "AMOUNT_EXCEEDS_LIMIT",
"message": f"单笔扣款不能超过{max_per_tx}积分",
"max": max_per_tx,
})
# 🛡️ 日扣款总额校验(仅检查不累加,扣款成功后再累加)
max_daily = int(plugin_info.get("max_deduct_per_day", 1000))
used_today = _peek_daily_used(plugin_info["id"])
if used_today + amount > max_daily:
logger.warning(
f"OPEN_DEDUCT_DAILY_LIMIT | plugin={plugin_info['id']} | account={account} | "
f"amount={amount} | used={used_today} | limit={max_daily}"
)
return JSONResponse(status_code=403, content={
"error": "DAILY_LIMIT_EXCEEDED",
"message": "今日扣款已达上限",
"limit": max_daily,
"used": used_today,
})
try:
wallet = (
db.query(Wallet)
.filter(Wallet.account == account)
.with_for_update()
.first()
)
if not wallet or int(wallet.balance or 0) < amount:
raise HTTPException(status_code=402, detail="余额不足")
wallet.balance = int(wallet.balance or 0) - amount
tx_id = f"OPEN_DEDUCT_{int(time.time())}_{uuid.uuid4().hex[:8]}"
last_tx = (
db.query(Transaction)
.filter(Transaction.account == account)
.order_by(Transaction.created_at.desc())
.first()
)
prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
tx_hash = _calc_tx_hash(tx_id, account, "OPEN_DEDUCT", -amount, prev_hash)
new_tx = Transaction(
tx_id=tx_id,
account=account,
tx_type="OPEN_DEDUCT",
amount=-amount,
related_account=plugin_info["id"],
item_id=req.reference_id,
prev_hash=prev_hash,
tx_hash=tx_hash,
description=f"[{plugin_info['name']}] {req.reason}",
source_plugin=plugin_info["id"], # 🌐 来源插件标识,用于按工具结构化追踪
)
db.add(new_tx)
db.commit()
db.refresh(wallet)
except HTTPException:
db.rollback()
raise
except Exception as e:
db.rollback()
logger.error(
f"OPEN_DEDUCT_FAIL | plugin={plugin_info['id']} | account={account} | err={e}"
)
raise HTTPException(status_code=500, detail=f"扣款失败: {e}")
# ✅ 扣款成功后再累加日计数器,避免余额不足等失败场景占用额度
daily_total = _commit_daily_used(plugin_info["id"], amount)
logger.info(
f"OPEN_DEDUCT | plugin={plugin_info['id']} | account={account} | "
f"amount={amount} | tx={tx_id} | reason={req.reason} | ref={req.reference_id} | "
f"daily_used={daily_total}/{max_daily}"
)
return {
"success": True,
"remaining_balance": int(wallet.balance or 0),
"transaction_id": tx_id,
}
# ==========================================
# 4️⃣ 退款:POST /api/open/refund
# ==========================================
@router.post("/refund")
async def open_refund(
req: RefundRequest,
x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
db: Session = Depends(get_db),
):
"""
退款接口:将之前 OPEN_DEDUCT 的金额(部分或全额)退还到用户钱包。
校验:
- 原交易必须存在且 tx_type == OPEN_DEDUCT
- 原交易归属当前 token 用户
- 原交易由当前插件发起(related_account == plugin_id)
- 累计退款金额 ≤ 原扣款金额
"""
plugin_info = _require_plugin(x_plugin_key, "refund")
account = _resolve_user_from_token(req.token)
refund_amount = int(req.amount)
original_tx = (
db.query(Transaction).filter(Transaction.tx_id == req.transaction_id).first()
)
if not original_tx:
raise HTTPException(status_code=404, detail="原交易不存在")
if original_tx.account != account:
raise HTTPException(status_code=403, detail="无权对该交易执行退款")
if original_tx.tx_type != "OPEN_DEDUCT":
raise HTTPException(status_code=400, detail="该交易类型不支持开放退款")
if (original_tx.related_account or "") != plugin_info["id"]:
raise HTTPException(status_code=403, detail="不允许跨插件退款")
original_amount = abs(int(original_tx.amount or 0))
if refund_amount > original_amount:
raise HTTPException(status_code=400, detail="退款金额超过原扣款金额")
# 累计已退金额校验(防止重复退款超额)
refunded_records = (
db.query(Transaction)
.filter(
Transaction.tx_type == "OPEN_REFUND",
Transaction.related_account == plugin_info["id"],
Transaction.item_id == req.transaction_id,
)
.all()
)
already_refunded = sum(abs(int(t.amount or 0)) for t in refunded_records)
if already_refunded + refund_amount > original_amount:
raise HTTPException(
status_code=400,
detail=f"退款金额超过可退余额(已退 {already_refunded}/{original_amount})",
)
try:
wallet = (
db.query(Wallet)
.filter(Wallet.account == account)
.with_for_update()
.first()
)
if not wallet:
wallet = Wallet(
account=account,
balance=0,
earn_balance=0,
tip_balance=0,
frozen_balance=0,
)
db.add(wallet)
db.flush()
wallet.balance = int(wallet.balance or 0) + refund_amount
tx_id = f"OPEN_REFUND_{int(time.time())}_{uuid.uuid4().hex[:8]}"
last_tx = (
db.query(Transaction)
.filter(Transaction.account == account)
.order_by(Transaction.created_at.desc())
.first()
)
prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
tx_hash = _calc_tx_hash(tx_id, account, "OPEN_REFUND", refund_amount, prev_hash)
new_tx = Transaction(
tx_id=tx_id,
account=account,
tx_type="OPEN_REFUND",
amount=refund_amount,
related_account=plugin_info["id"],
item_id=req.transaction_id, # 关联原扣款交易
prev_hash=prev_hash,
tx_hash=tx_hash,
description=f"[{plugin_info['name']}] 退款 {req.transaction_id}",
source_plugin=plugin_info["id"], # 🌐 来源插件标识
)
db.add(new_tx)
db.commit()
db.refresh(wallet)
except HTTPException:
db.rollback()
raise
except Exception as e:
db.rollback()
logger.error(
f"OPEN_REFUND_FAIL | plugin={plugin_info['id']} | account={account} | err={e}"
)
raise HTTPException(status_code=500, detail=f"退款失败: {e}")
logger.info(
f"OPEN_REFUND | plugin={plugin_info['id']} | account={account} | "
f"amount={refund_amount} | tx={tx_id} | original={req.transaction_id}"
)
return {
"success": True,
"remaining_balance": int(wallet.balance or 0),
"transaction_id": tx_id,
}
# ==========================================
# 5️⃣ 消费记录查询:GET /api/open/transactions
# ==========================================
@router.get("/transactions")
async def open_list_transactions(
authorization: Optional[str] = Header(None, alias="Authorization"),
x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
page: int = Query(1, ge=1, description="页码(从 1 开始)"),
page_size: int = Query(20, ge=1, le=100, description="每页条数(1-100)"),
start_time: Optional[int] = Query(None, description="起始时间(unix 秒)"),
end_time: Optional[int] = Query(None, description="结束时间(unix 秒)"),
tx_type: Optional[str] = Query(None, description="交易类型,如 OPEN_DEDUCT / OPEN_REFUND"),
db: Session = Depends(get_db),
):
"""
查询当前用户由当前插件产生的消费/退款记录。
认证:Authorization: Bearer <user_token> + X-Plugin-Key
安全:仅返回 source_plugin == 当前 plugin_id 的记录,防止跨插件读取。
"""
plugin_info = _require_plugin(x_plugin_key, "verify")
if not authorization:
raise HTTPException(status_code=401, detail="缺少 Authorization Header")
parts = authorization.split(" ", 1)
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(status_code=401, detail="Authorization 格式错误,请使用 Bearer Token")
account = _resolve_user_from_token(parts[1])
plugin_id = plugin_info["id"]
# 构造查询:按 source_plugin 过滤;兼容历史 NULL 数据走 related_account 兜底
q = db.query(Transaction).filter(
Transaction.account == account,
((Transaction.source_plugin == plugin_id)
| ((Transaction.source_plugin.is_(None)) & (Transaction.related_account == plugin_id))),
)
if tx_type:
q = q.filter(Transaction.tx_type == tx_type)
if start_time is not None:
q = q.filter(Transaction.created_at >= datetime.datetime.utcfromtimestamp(int(start_time)))
if end_time is not None:
q = q.filter(Transaction.created_at <= datetime.datetime.utcfromtimestamp(int(end_time)))
total = q.count()
rows = (
q.order_by(Transaction.created_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
items = []
for t in rows:
created_ts = int(t.created_at.timestamp()) if t.created_at else 0
# reason 从 description 中解析(去掉 [插件名] 前缀),兼容旧记录
reason = (t.description or "").replace(f"[{plugin_info['name']}] ", "").strip() or None
items.append({
"id": t.tx_id,
"type": t.tx_type,
"amount": int(t.amount or 0),
"reason": reason,
"reference_id": t.item_id,
"created_at": created_ts,
"description": t.description or "",
})
logger.info(
f"OPEN_LIST_TX | plugin={plugin_id} | account={account} | "
f"page={page}/{page_size} | total={total}"
)
return {
"success": True,
"transactions": items,
"total": total,
"page": page,
"page_size": page_size,
}
# ==========================================
# 6️⃣ 消费汇总:GET /api/open/usage-summary
# ==========================================
@router.get("/usage-summary")
async def open_usage_summary(
x_plugin_key: Optional[str] = Header(None, alias="X-Plugin-Key"),
account: Optional[str] = Query(None, description="指定用户账号,不传则统计所有用户"),
period: str = Query("daily", description="聚合粒度:daily / monthly / total"),
start_time: Optional[int] = Query(None, description="起始时间(unix 秒)"),
end_time: Optional[int] = Query(None, description="结束时间(unix 秒)"),
db: Session = Depends(get_db),
):
"""
本插件维度的消费汇总统计。
返回:总扣款 / 总退款 / 总笔数 / 净消费 + 按 period 分桶的明细。
安全:仅统计 source_plugin == 当前 plugin_id 的记录。
"""
plugin_info = _require_plugin(x_plugin_key, "verify")
plugin_id = plugin_info["id"]
period = (period or "daily").lower()
if period not in ("daily", "monthly", "total"):
raise HTTPException(status_code=400, detail="period 仅支持 daily / monthly / total")
base = db.query(Transaction).filter(
((Transaction.source_plugin == plugin_id)
| ((Transaction.source_plugin.is_(None)) & (Transaction.related_account == plugin_id))),
Transaction.tx_type.in_(["OPEN_DEDUCT", "OPEN_REFUND"]),
)
if account:
base = base.filter(Transaction.account == account)
if start_time is not None:
base = base.filter(Transaction.created_at >= datetime.datetime.utcfromtimestamp(int(start_time)))
if end_time is not None:
base = base.filter(Transaction.created_at <= datetime.datetime.utcfromtimestamp(int(end_time)))
rows = base.all()
total_deduct = 0
total_refund = 0
total_count = len(rows)
bucket: dict = {} # key -> {"deduct":int,"refund":int,"count":int}
for t in rows:
amt = int(t.amount or 0)
if t.tx_type == "OPEN_DEDUCT":
total_deduct += abs(amt)
elif t.tx_type == "OPEN_REFUND":
total_refund += abs(amt)
if period == "total":
continue
if not t.created_at:
continue
if period == "daily":
key = t.created_at.strftime("%Y-%m-%d")
else: # monthly
key = t.created_at.strftime("%Y-%m")
b = bucket.setdefault(key, {"deduct": 0, "refund": 0, "count": 0})
if t.tx_type == "OPEN_DEDUCT":
b["deduct"] += abs(amt)
elif t.tx_type == "OPEN_REFUND":
b["refund"] += abs(amt)
b["count"] += 1
breakdown = []
if period != "total":
for key in sorted(bucket.keys(), reverse=True):
v = bucket[key]
label = "date" if period == "daily" else "month"
breakdown.append({
label: key,
"deduct": v["deduct"],
"refund": v["refund"],
"count": v["count"],
})
logger.info(
f"OPEN_USAGE_SUMMARY | plugin={plugin_id} | account={account or '*'} | "
f"period={period} | tx_count={total_count}"
)
return {
"success": True,
"plugin_id": plugin_id,
"plugin_name": plugin_info.get("name", plugin_id),
"summary": {
"total_deduct": total_deduct,
"total_refund": total_refund,
"total_transactions": total_count,
"net_consumption": total_deduct - total_refund,
},
"breakdown": breakdown,
}