OpenWebUI-Monitor / resources /functions /openwebui_monitor.py
luowuyin's picture
25:05:05 10:41:39 v0.3.7
a572854
"""
title: Usage Monitor
author: VariantConst & OVINC CN
git_url: https://github.com/VariantConst/OpenWebUI-Monitor.git
version: 0.3.6
requirements: httpx
license: MIT
"""
import logging
import time
from typing import Dict, Optional
from httpx import AsyncClient
from pydantic import BaseModel, Field
import json
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
TRANSLATIONS = {
"en": {
"request_failed": "Request failed: {error_msg}",
"insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`",
"cost": "Cost: ${cost:.4f}",
"balance": "Balance: ${balance:.4f}",
"tokens": "Tokens: {input}+{output}",
"time_spent": "Time: {time:.2f}s",
"tokens_per_sec": "{tokens_per_sec:.2f} T/s",
},
"zh": {
"request_failed": "请求失败: {error_msg}",
"insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`",
"cost": "费用: ¥{cost:.4f}",
"balance": "余额: ¥{balance:.4f}",
"tokens": "Token: {input}+{output}",
"time_spent": "耗时: {time:.2f}s",
"tokens_per_sec": "{tokens_per_sec:.2f} T/s",
},
}
class CustomException(Exception):
pass
class Filter:
class Valves(BaseModel):
api_endpoint: str = Field(default="", description="openwebui-monitor's base url")
api_key: str = Field(default="", description="openwebui-monitor's api key")
priority: int = Field(default=5, description="filter priority")
language: str = Field(default="zh", description="language (en/zh)")
show_time_spent: bool = Field(default=True, description="show time spent")
show_tokens_per_sec: bool = Field(default=True, description="show tokens per second")
show_cost: bool = Field(default=True, description="show cost")
show_balance: bool = Field(default=True, description="show balance")
show_tokens: bool = Field(default=True, description="show tokens")
def __init__(self):
self.type = "filter"
self.name = "OpenWebUI Monitor"
self.valves = self.Valves()
self.outage_map: Dict[str, bool] = {}
self.start_time: Optional[float] = None
def get_text(self, key: str, **kwargs) -> str:
lang = self.valves.language if self.valves.language in TRANSLATIONS else "en"
text = TRANSLATIONS[lang].get(key, TRANSLATIONS["en"][key])
return text.format(**kwargs) if kwargs else text
async def request(self, client: AsyncClient, url: str, headers: dict, json_data: dict):
json_data = json.loads(json.dumps(json_data, default=lambda o: o.dict() if hasattr(o, "dict") else str(o)))
response = await client.post(url=url, headers=headers, json=json_data)
response.raise_for_status()
response_data = response.json()
if not response_data.get("success"):
logger.error(self.get_text("request_failed", error_msg=response_data))
raise CustomException(self.get_text("request_failed", error_msg=response_data))
return response_data
async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict:
__user__ = __user__ or {}
__metadata__ = __metadata__ or {}
self.start_time = time.time()
user_id = __user__.get("id", "default")
client = AsyncClient()
try:
response_data = await self.request(
client=client,
url=f"{self.valves.api_endpoint}/api/v1/inlet",
headers={"Authorization": f"Bearer {self.valves.api_key}"},
json_data={"user": __user__, "body": body},
)
self.outage_map[user_id] = response_data.get("balance", 0) <= 0
if self.outage_map[user_id]:
logger.info(self.get_text("insufficient_balance", balance=response_data.get("balance", 0)))
raise CustomException(self.get_text("insufficient_balance", balance=response_data.get("balance", 0)))
return body
except Exception as err:
logger.exception(self.get_text("request_failed", error_msg=err))
if isinstance(err, CustomException):
raise err
raise Exception(f"error calculating usage, {err}") from err
finally:
await client.aclose()
async def outlet(
self,
body: dict,
__metadata__: Optional[dict] = None,
__user__: Optional[dict] = None,
__event_emitter__: Optional[callable] = None,
) -> dict:
__user__ = __user__ or {}
__metadata__ = __metadata__ or {}
user_id = __user__.get("id", "default")
if self.outage_map.get(user_id, False):
return body
client = AsyncClient()
try:
response_data = await self.request(
client=client,
url=f"{self.valves.api_endpoint}/api/v1/outlet",
headers={"Authorization": f"Bearer {self.valves.api_key}"},
json_data={"user": __user__, "body": body},
)
stats_list = []
if self.valves.show_tokens:
stats_list.append(self.get_text("tokens", input=response_data["inputTokens"], output=response_data["outputTokens"]))
if self.valves.show_cost:
stats_list.append(self.get_text("cost", cost=response_data["totalCost"]))
if self.valves.show_balance:
stats_list.append(self.get_text("balance", balance=response_data["newBalance"]))
if self.start_time and self.valves.show_time_spent:
elapsed = time.time() - self.start_time
stats_list.append(self.get_text("time_spent", time=elapsed))
if self.valves.show_tokens_per_sec:
tokens_per_sec = (response_data["outputTokens"] / elapsed if elapsed > 0 else 0)
stats_list.append(self.get_text("tokens_per_sec", tokens_per_sec=tokens_per_sec))
stats = " | ".join(stats_list)
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}})
logger.info("usage_monitor: %s %s", user_id, stats)
return body
except Exception as err:
logger.exception(self.get_text("request_failed", error_msg=err))
raise Exception(self.get_text("request_failed", error_msg=err))
finally:
await client.aclose()