Spaces:
Sleeping
Sleeping
| """ | |
| title: 计费信息按钮 | |
| author: fl0w1nd | |
| author_url: https://github.com/fl0w1nd | |
| version: 0.1.1 | |
| icon_url: data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAADAAAAAwCAYAAABXAvmHAAAACXBIWXMAAAsTAAALEwEAmpwYAAACMElEQVR4nO2ZMWgUQRSGJ4mFgZBGQTAgCqawSbASg6YIFkEsPbCQHNzd/v/sHBe4IvUSUtqlThcCSZEiXUharWxURC20SDCQIGKMxjTBk4E7MhlO7naX3M7C/PCamZud9817M/NuVwgvLy8vZ0VylOQ6ySOSjR7bEYA1kiNpnP+egeMNy74lgmiufMMRW00CkEXaNP5jh0kAzj1E9FhMO78HSCn6CDBeDsbdmM5FgB7gvHwEmPE9kFoeIGvRp1B6AXgOAEqpoVxGAMCenhvAjdiDPUAKlcvlayQJ4FczArO1Wm04FxEAUCV53O6vJYBppwGklKUONdWJlPKukwBRFF0iuW/NuwLgpdkGYMtJAAD3rTnXW2AAvhjtp9Vq9YqLAAVrpecMXxatvknnAEg+tZycb/VJKZ+R3G4ZgAfOAYRhOGYBHJC8mfiBGZxCfSQ/WhBfpZRPEj0to2P0Ybs7AMCmUup2Li6yIAgmrFOnBfFH74Vc1ELFYvGyPoWaL3ZNP05JPs5TMXcVwIYViU96v3Qz+GfcVyVxLE40SL43x4ZheKcbgLVeApD8bazyG6tvwRrfOY30R4U2OXiRAJ+N/h9RFPUbi/nCHBsEwSPRjUql0nX9ceEi0qkNwLL1mxndXq/XBwF8MPsqlcot4ZqklFMWwF+SrwDsWu2vhasCsNQhcicA7glXVSgUBnQRZ25oY2O/1SW3yIOUUkMAKk3nd0iOi7wpPKtO32XtSyJ1C/APUWkkXC3hgzUAAAAASUVORK5CYII= | |
| required_open_webui_version: 0.4.0 | |
| """ | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, Union, Generator, Iterator | |
| import os | |
| import requests | |
| import asyncio | |
| import json | |
| class Action: | |
| class Valves(BaseModel): | |
| show_cost: bool = Field( | |
| default=True, | |
| description="是否显示费用", | |
| json_schema_extra={"ui:group": "显示设置"}, | |
| ) | |
| show_balance: bool = Field( | |
| default=True, | |
| description="是否显示余额", | |
| json_schema_extra={"ui:group": "显示设置"}, | |
| ) | |
| show_tokens: bool = Field( | |
| default=True, | |
| description="是否显示token数", | |
| json_schema_extra={"ui:group": "显示设置"}, | |
| ) | |
| show_tokens_per_sec: bool = Field( | |
| default=True, | |
| description="是否显示每秒输出token数", | |
| json_schema_extra={"ui:group": "显示设置"}, | |
| ) | |
| def __init__(self): | |
| self.valves = self.Valves() | |
| pass | |
| async def action( | |
| self, | |
| body: dict, | |
| __user__=None, | |
| __event_emitter__=None, | |
| __event_call__=None, | |
| ) -> Optional[dict]: | |
| print(f"action:{__name__}") | |
| # 查找最新一条assistant消息的索引和内容 | |
| messages = body.get("messages", []) | |
| assistant_indexes = [ | |
| i for i, msg in enumerate(messages) if msg.get("role") == "assistant" | |
| ] | |
| if not assistant_indexes: | |
| if __event_emitter__: | |
| await __event_emitter__( | |
| { | |
| "type": "status", | |
| "data": {"description": "没有找到assistant消息", "done": True}, | |
| } | |
| ) | |
| return None | |
| # 获取最后一条assistant消息的索引和内容 | |
| last_assistant_index = assistant_indexes[-1] | |
| last_assistant_message = messages[last_assistant_index] | |
| # 获取消息 ID | |
| message_id = last_assistant_message.get("id") | |
| if not message_id: | |
| if __event_emitter__: | |
| await __event_emitter__( | |
| { | |
| "type": "status", | |
| "data": {"description": "无法获取消息ID", "done": True}, | |
| } | |
| ) | |
| return None | |
| # 构建文件路径 | |
| file_path = os.path.join("/app/backend/data/record", f"{message_id}.json") | |
| # 检查文件是否存在 | |
| if not os.path.exists(file_path): | |
| if __event_emitter__: | |
| await __event_emitter__( | |
| { | |
| "type": "status", | |
| "data": { | |
| "description": f"未查找到该消息的计费记录,请联系管理员", | |
| "done": True, | |
| }, | |
| } | |
| ) | |
| return None | |
| # 读取统计信息 | |
| try: | |
| with open(file_path, "r") as f: | |
| stats_data = json.load(f) | |
| except Exception as e: | |
| if __event_emitter__: | |
| await __event_emitter__( | |
| { | |
| "type": "status", | |
| "data": { | |
| "description": f"读取统计文件失败: {str(e)}", | |
| "done": True, | |
| }, | |
| } | |
| ) | |
| return None | |
| # 构建状态栏显示的统计信息 | |
| stats_array = [] | |
| if self.valves.show_cost and "total_cost" in stats_data: | |
| stats_array.append(f"Cost: ${stats_data['total_cost']:.6f}") | |
| if self.valves.show_balance and "new_balance" in stats_data: | |
| stats_array.append(f"Balance: ${stats_data['new_balance']:.6f}") | |
| if ( | |
| self.valves.show_tokens | |
| and "input_tokens" in stats_data | |
| and "output_tokens" in stats_data | |
| ): | |
| stats_array.append( | |
| f"Token: {stats_data['input_tokens']}+{stats_data['output_tokens']}" | |
| ) | |
| # 计算耗时(如果有elapsed_time) | |
| if "elapsed_time" in stats_data: | |
| elapsed_time = stats_data["elapsed_time"] | |
| stats_array.append(f"Time: {elapsed_time:.2f}s") | |
| # 计算每秒输出速度 | |
| if ( | |
| self.valves.show_tokens_per_sec | |
| and "output_tokens" in stats_data | |
| and elapsed_time > 0 | |
| ): | |
| stats_array.append( | |
| f"{(stats_data['output_tokens']/elapsed_time):.2f} T/s" | |
| ) | |
| stats = " | ".join(stat for stat in stats_array) | |
| # 发送状态更新 | |
| if __event_emitter__: | |
| await __event_emitter__( | |
| { | |
| "type": "status", | |
| "data": { | |
| "description": stats, | |
| "done": True, | |
| }, | |
| } | |
| ) | |
| return None | |