Spaces:
Sleeping
Sleeping
File size: 6,100 Bytes
4c2a557 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | """
title: 计费信息按钮
author: fl0w1nd
author_url: https://github.com/fl0w1nd
version: 0.1.1
icon_url: data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAADAAAAAwCAYAAABXAvmHAAAACXBIWXMAAAsTAAALEwEAmpwYAAACMElEQVR4nO2ZMWgUQRSGJ4mFgZBGQTAgCqawSbASg6YIFkEsPbCQHNzd/v/sHBe4IvUSUtqlThcCSZEiXUharWxURC20SDCQIGKMxjTBk4E7MhlO7naX3M7C/PCamZud9817M/NuVwgvLy8vZ0VylOQ6ySOSjR7bEYA1kiNpnP+egeMNy74lgmiufMMRW00CkEXaNP5jh0kAzj1E9FhMO78HSCn6CDBeDsbdmM5FgB7gvHwEmPE9kFoeIGvRp1B6AXgOAEqpoVxGAMCenhvAjdiDPUAKlcvlayQJ4FczArO1Wm04FxEAUCV53O6vJYBppwGklKUONdWJlPKukwBRFF0iuW/NuwLgpdkGYMtJAAD3rTnXW2AAvhjtp9Vq9YqLAAVrpecMXxatvknnAEg+tZycb/VJKZ+R3G4ZgAfOAYRhOGYBHJC8mfiBGZxCfSQ/WhBfpZRPEj0to2P0Ybs7AMCmUup2Li6yIAgmrFOnBfFH74Vc1ELFYvGyPoWaL3ZNP05JPs5TMXcVwIYViU96v3Qz+GfcVyVxLE40SL43x4ZheKcbgLVeApD8bazyG6tvwRrfOY30R4U2OXiRAJ+N/h9RFPUbi/nCHBsEwSPRjUql0nX9ceEi0qkNwLL1mxndXq/XBwF8MPsqlcot4ZqklFMWwF+SrwDsWu2vhasCsNQhcicA7glXVSgUBnQRZ25oY2O/1SW3yIOUUkMAKk3nd0iOi7wpPKtO32XtSyJ1C/APUWkkXC3hgzUAAAAASUVORK5CYII=
required_open_webui_version: 0.4.0
"""
from pydantic import BaseModel, Field
from typing import Optional, Union, Generator, Iterator
import os
import requests
import asyncio
import json
class Action:
    """Open WebUI action button that shows billing stats for the latest reply.

    When triggered, the action locates the most recent assistant message in
    the request body, loads ``<message_id>.json`` from ``RECORD_DIR`` and
    emits a one-line summary (cost, balance, tokens, time, tokens/sec) as a
    ``status`` event. The record files are presumably written by a companion
    billing component -- verify against the deployment.
    """

    # Directory that holds one JSON billing record per message id.
    RECORD_DIR = "/app/backend/data/record"

    class Valves(BaseModel):
        """User-configurable switches controlling which stats are displayed."""

        # Show the cost of the request.
        show_cost: bool = Field(
            default=True,
            description="是否显示费用",
            json_schema_extra={"ui:group": "显示设置"},
        )
        # Show the remaining balance after the request.
        show_balance: bool = Field(
            default=True,
            description="是否显示余额",
            json_schema_extra={"ui:group": "显示设置"},
        )
        # Show input/output token counts.
        show_tokens: bool = Field(
            default=True,
            description="是否显示token数",
            json_schema_extra={"ui:group": "显示设置"},
        )
        # Show output tokens per second (needs ``elapsed_time`` in the record).
        show_tokens_per_sec: bool = Field(
            default=True,
            description="是否显示每秒输出token数",
            json_schema_extra={"ui:group": "显示设置"},
        )

    def __init__(self):
        self.valves = self.Valves()

    @staticmethod
    async def _emit_status(emitter, description: str) -> None:
        """Send a finished ``status`` event if an emitter was provided."""
        if emitter:
            await emitter(
                {
                    "type": "status",
                    "data": {"description": description, "done": True},
                }
            )

    def _format_stats(self, stats_data: dict) -> str:
        """Build the ``" | "``-separated summary line from a billing record.

        Raises:
            TypeError: if the record is not a dict or a field has a type that
                cannot be formatted as a number.
            ValueError: if a field cannot be formatted with a float spec.
        """
        if not isinstance(stats_data, dict):
            raise TypeError("billing record must be a JSON object")

        valves = self.valves
        parts = []
        if valves.show_cost and "total_cost" in stats_data:
            parts.append(f"Cost: ${stats_data['total_cost']:.6f}")
        if valves.show_balance and "new_balance" in stats_data:
            parts.append(f"Balance: ${stats_data['new_balance']:.6f}")
        if (
            valves.show_tokens
            and "input_tokens" in stats_data
            and "output_tokens" in stats_data
        ):
            parts.append(
                f"Token: {stats_data['input_tokens']}+{stats_data['output_tokens']}"
            )
        # Elapsed time is shown whenever the record has it; the throughput
        # figure depends on it, so it is computed inside the same branch to
        # guarantee ``elapsed_time`` is always bound when used.
        if "elapsed_time" in stats_data:
            elapsed_time = stats_data["elapsed_time"]
            parts.append(f"Time: {elapsed_time:.2f}s")
            if (
                valves.show_tokens_per_sec
                and "output_tokens" in stats_data
                and elapsed_time > 0  # avoid division by zero
            ):
                parts.append(
                    f"{(stats_data['output_tokens'] / elapsed_time):.2f} T/s"
                )
        return " | ".join(parts)

    async def action(
        self,
        body: dict,
        __user__=None,
        __event_emitter__=None,
        __event_call__=None,
    ) -> Optional[dict]:
        """Emit the billing summary of the latest assistant message.

        Args:
            body: Request payload; its ``messages`` list is searched for the
                last entry whose ``role`` is ``"assistant"``.
            __user__: Unused.
            __event_emitter__: Optional async callable that receives the
                ``status`` event dict. When falsy, nothing is emitted.
            __event_call__: Unused.

        Returns:
            Always ``None``; the result is delivered via the event emitter.
        """
        print(f"action:{__name__}")

        # Find the most recent assistant message.
        messages = body.get("messages", [])
        last_assistant_message = next(
            (msg for msg in reversed(messages) if msg.get("role") == "assistant"),
            None,
        )
        if last_assistant_message is None:
            await self._emit_status(__event_emitter__, "没有找到assistant消息")
            return None

        # The message id doubles as the record file name.
        message_id = last_assistant_message.get("id")
        record_name = f"{message_id}.json"
        # The id comes from the request body: refuse anything that is not a
        # plain file name so it cannot escape the record directory.
        if not message_id or os.path.basename(record_name) != record_name:
            await self._emit_status(__event_emitter__, "无法获取消息ID")
            return None

        file_path = os.path.join(self.RECORD_DIR, record_name)

        # Load the billing record (EAFP: no separate existence check).
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                stats_data = json.load(f)
        except FileNotFoundError:
            await self._emit_status(
                __event_emitter__, "未查找到该消息的计费记录,请联系管理员"
            )
            return None
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            await self._emit_status(__event_emitter__, f"读取统计文件失败: {exc}")
            return None

        # Build the status-bar text; a malformed record is reported rather
        # than allowed to crash the action.
        try:
            stats = self._format_stats(stats_data)
        except (TypeError, ValueError) as exc:
            await self._emit_status(__event_emitter__, f"读取统计文件失败: {exc}")
            return None

        await self._emit_status(__event_emitter__, stats)
        return None
|