Spaces:
Sleeping
Sleeping
File size: 6,565 Bytes
4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 a572854 4c2a557 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | """
title: Usage Monitor
author: VariantConst & OVINC CN
git_url: https://github.com/VariantConst/OpenWebUI-Monitor.git
version: 0.3.6
requirements: httpx
license: MIT
"""
import json
import logging
import time
from typing import Awaitable, Callable, Dict, Optional

from httpx import AsyncClient
from pydantic import BaseModel, Field
# Module-level logger, explicitly set to INFO.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Message templates looked up by Filter.get_text: language code -> message
# key -> str.format template. "en" is the fallback language.
TRANSLATIONS = dict(
    en=dict(
        request_failed="Request failed: {error_msg}",
        insufficient_balance="Insufficient balance: Current balance `{balance:.4f}`",
        cost="Cost: ${cost:.4f}",
        balance="Balance: ${balance:.4f}",
        tokens="Tokens: {input}+{output}",
        time_spent="Time: {time:.2f}s",
        tokens_per_sec="{tokens_per_sec:.2f} T/s",
    ),
    zh=dict(
        request_failed="请求失败: {error_msg}",
        insufficient_balance="余额不足: 当前余额 `{balance:.4f}`",
        cost="费用: ¥{cost:.4f}",
        balance="余额: ¥{balance:.4f}",
        tokens="Token: {input}+{output}",
        time_spent="耗时: {time:.2f}s",
        tokens_per_sec="{tokens_per_sec:.2f} T/s",
    ),
)
class CustomException(Exception):
    """Error raised when the monitor rejects a call or the user's balance is not positive."""
class Filter:
    """Filter that meters usage through an OpenWebUI-Monitor server.

    ``inlet`` asks the monitor (``{api_endpoint}/api/v1/inlet``) whether the
    user still has a positive balance; ``outlet`` reports the finished exchange
    (``{api_endpoint}/api/v1/outlet``) and optionally emits a status line with
    tokens, cost, balance, elapsed time and throughput.
    """

    class Valves(BaseModel):
        """Filter configuration: monitor endpoint, credentials and display toggles."""

        api_endpoint: str = Field(default="", description="openwebui-monitor's base url")
        api_key: str = Field(default="", description="openwebui-monitor's api key")
        priority: int = Field(default=5, description="filter priority")
        language: str = Field(default="zh", description="language (en/zh)")
        show_time_spent: bool = Field(default=True, description="show time spent")
        show_tokens_per_sec: bool = Field(default=True, description="show tokens per second")
        show_cost: bool = Field(default=True, description="show cost")
        show_balance: bool = Field(default=True, description="show balance")
        show_tokens: bool = Field(default=True, description="show tokens")

    def __init__(self):
        self.type = "filter"
        self.name = "OpenWebUI Monitor"
        self.valves = self.Valves()
        # user_id -> True when the latest inlet saw a non-positive balance;
        # outlet skips reporting for those users.
        self.outage_map: Dict[str, bool] = {}
        # user_id -> time.time() recorded by inlet and consumed by outlet.
        # This instance handles many users (see outage_map), so a single
        # shared timestamp would be overwritten by other users' requests.
        # NOTE: concurrent requests from the same user still share one slot.
        self.start_times: Dict[str, float] = {}
        # Most recent inlet timestamp for any user; kept only for backward
        # compatibility — outlet reads ``start_times`` instead.
        self.start_time: Optional[float] = None

    def get_text(self, key: str, **kwargs) -> str:
        """Return message ``key`` in the configured language.

        Unknown languages, and keys missing from the selected language, fall
        back to English. Keyword arguments, when given, are substituted with
        ``str.format``.

        Raises:
            KeyError: ``key`` exists neither in the selected language nor in English.
        """
        lang = self.valves.language if self.valves.language in TRANSLATIONS else "en"
        text = TRANSLATIONS[lang].get(key)
        if text is None:
            # Consult English lazily so a key defined only in the selected
            # language does not trigger a KeyError.
            text = TRANSLATIONS["en"][key]
        return text.format(**kwargs) if kwargs else text

    @staticmethod
    def _to_jsonable(obj):
        """``json.dumps`` fallback: dump pydantic models, stringify anything else."""
        # Prefer pydantic v2's model_dump; ``.dict()`` is its deprecated v1 name.
        dump = getattr(obj, "model_dump", None) or getattr(obj, "dict", None)
        return dump() if callable(dump) else str(obj)

    async def request(self, client: AsyncClient, url: str, headers: dict, json_data: dict):
        """POST ``json_data`` to ``url`` and return the decoded JSON reply.

        The payload is round-tripped through ``json`` first so that pydantic
        models and other non-JSON values nested in it are converted.

        Raises:
            httpx.HTTPStatusError: the server answered with an error status.
            CustomException: the reply's ``success`` field is falsy (also logged).
        """
        json_data = json.loads(json.dumps(json_data, default=self._to_jsonable))
        response = await client.post(url=url, headers=headers, json=json_data)
        response.raise_for_status()
        response_data = response.json()
        if not response_data.get("success"):
            message = self.get_text("request_failed", error_msg=response_data)
            logger.error(message)
            raise CustomException(message)
        return response_data

    async def _call_monitor(self, client: AsyncClient, path: str, user: dict, body: dict) -> dict:
        """POST ``{"user": user, "body": body}`` to ``{api_endpoint}{path}`` with the API key."""
        return await self.request(
            client=client,
            url=f"{self.valves.api_endpoint}{path}",
            headers={"Authorization": f"Bearer {self.valves.api_key}"},
            json_data={"user": user, "body": body},
        )

    async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict:
        """Check the user's balance before the request proceeds.

        Records the start time later used by ``outlet`` and remembers in
        ``outage_map`` whether the user is out of balance. ``__metadata__`` is
        currently unused.

        Returns:
            ``body`` unchanged.

        Raises:
            CustomException: the monitor rejected the call or the balance is
                not positive.
            Exception: any other failure (network, HTTP status, malformed
                reply), chained to the original error.
        """
        __user__ = __user__ or {}
        user_id = __user__.get("id", "default")
        now = time.time()
        self.start_time = now
        self.start_times[user_id] = now
        try:
            async with AsyncClient() as client:
                response_data = await self._call_monitor(client, "/api/v1/inlet", __user__, body)
            balance = response_data.get("balance", 0)
            self.outage_map[user_id] = balance <= 0
            if self.outage_map[user_id]:
                message = self.get_text("insufficient_balance", balance=balance)
                logger.info(message)
                raise CustomException(message)
            return body
        except CustomException:
            # Already logged (by ``request`` or just above); propagate as is.
            raise
        except Exception as err:
            logger.exception(self.get_text("request_failed", error_msg=err))
            raise Exception(f"error calculating usage, {err}") from err

    def _format_stats(self, response_data: dict, elapsed: Optional[float]) -> str:
        """Build the " | "-joined status line from the outlet reply and the valves.

        ``elapsed`` is ``None`` when no inlet start time is known for the user;
        time and throughput are then omitted.
        """
        stats_list = []
        if self.valves.show_tokens:
            stats_list.append(
                self.get_text("tokens", input=response_data["inputTokens"], output=response_data["outputTokens"])
            )
        if self.valves.show_cost:
            stats_list.append(self.get_text("cost", cost=response_data["totalCost"]))
        if self.valves.show_balance:
            stats_list.append(self.get_text("balance", balance=response_data["newBalance"]))
        if elapsed is not None:
            if self.valves.show_time_spent:
                stats_list.append(self.get_text("time_spent", time=elapsed))
            # Independent of show_time_spent: hiding the duration must not
            # also hide the throughput.
            if self.valves.show_tokens_per_sec:
                tokens_per_sec = response_data["outputTokens"] / elapsed if elapsed > 0 else 0
                stats_list.append(self.get_text("tokens_per_sec", tokens_per_sec=tokens_per_sec))
        return " | ".join(stats_list)

    async def outlet(
        self,
        body: dict,
        __metadata__: Optional[dict] = None,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> dict:
        """Report the finished exchange to the monitor and publish usage stats.

        Skipped (``body`` returned unchanged) when ``inlet`` flagged the user as
        out of balance. Otherwise the stats line is sent as a ``status`` event
        through ``__event_emitter__`` (if given) and logged. ``__metadata__`` is
        currently unused.

        Returns:
            ``body`` unchanged.

        Raises:
            CustomException: the monitor replied with a falsy ``success``.
            Exception: any other failure, with a "request failed" message and
                the original error chained.
        """
        __user__ = __user__ or {}
        user_id = __user__.get("id", "default")
        # Consume this user's start time so a later outlet cannot reuse a
        # stale value.
        start_time = self.start_times.pop(user_id, None)
        if self.outage_map.get(user_id, False):
            return body
        try:
            async with AsyncClient() as client:
                response_data = await self._call_monitor(client, "/api/v1/outlet", __user__, body)
            elapsed = time.time() - start_time if start_time else None
            stats = self._format_stats(response_data, elapsed)
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}})
            logger.info("usage_monitor: %s %s", user_id, stats)
            return body
        except CustomException:
            # Already logged by ``request``; re-wrapping would only duplicate
            # the "request failed" prefix.
            raise
        except Exception as err:
            logger.exception(self.get_text("request_failed", error_msg=err))
            raise Exception(self.get_text("request_failed", error_msg=err)) from err
|