Spaces:
Sleeping
Sleeping
| """ | |
| title: Usage Monitor | |
| author: VariantConst & OVINC CN | |
| git_url: https://github.com/VariantConst/OpenWebUI-Monitor.git | |
| version: 0.3.6 | |
| requirements: httpx | |
| license: MIT | |
| """ | |
| import logging | |
| import time | |
| from typing import Dict, Optional | |
| from httpx import AsyncClient | |
| from pydantic import BaseModel, Field | |
| import json | |
| logger = logging.getLogger(__name__) | |
| logger.setLevel(logging.INFO) | |
| TRANSLATIONS = { | |
| "en": { | |
| "request_failed": "Request failed: {error_msg}", | |
| "insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`", | |
| "cost": "Cost: ${cost:.4f}", | |
| "balance": "Balance: ${balance:.4f}", | |
| "tokens": "Tokens: {input}+{output}", | |
| "time_spent": "Time: {time:.2f}s", | |
| "tokens_per_sec": "{tokens_per_sec:.2f} T/s", | |
| }, | |
| "zh": { | |
| "request_failed": "请求失败: {error_msg}", | |
| "insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`", | |
| "cost": "费用: ¥{cost:.4f}", | |
| "balance": "余额: ¥{balance:.4f}", | |
| "tokens": "Token: {input}+{output}", | |
| "time_spent": "耗时: {time:.2f}s", | |
| "tokens_per_sec": "{tokens_per_sec:.2f} T/s", | |
| }, | |
| } | |
| class CustomException(Exception): | |
| pass | |
| class Filter: | |
| class Valves(BaseModel): | |
| api_endpoint: str = Field(default="", description="openwebui-monitor's base url") | |
| api_key: str = Field(default="", description="openwebui-monitor's api key") | |
| priority: int = Field(default=5, description="filter priority") | |
| language: str = Field(default="zh", description="language (en/zh)") | |
| show_time_spent: bool = Field(default=True, description="show time spent") | |
| show_tokens_per_sec: bool = Field(default=True, description="show tokens per second") | |
| show_cost: bool = Field(default=True, description="show cost") | |
| show_balance: bool = Field(default=True, description="show balance") | |
| show_tokens: bool = Field(default=True, description="show tokens") | |
| def __init__(self): | |
| self.type = "filter" | |
| self.name = "OpenWebUI Monitor" | |
| self.valves = self.Valves() | |
| self.outage_map: Dict[str, bool] = {} | |
| self.start_time: Optional[float] = None | |
| def get_text(self, key: str, **kwargs) -> str: | |
| lang = self.valves.language if self.valves.language in TRANSLATIONS else "en" | |
| text = TRANSLATIONS[lang].get(key, TRANSLATIONS["en"][key]) | |
| return text.format(**kwargs) if kwargs else text | |
| async def request(self, client: AsyncClient, url: str, headers: dict, json_data: dict): | |
| json_data = json.loads(json.dumps(json_data, default=lambda o: o.dict() if hasattr(o, "dict") else str(o))) | |
| response = await client.post(url=url, headers=headers, json=json_data) | |
| response.raise_for_status() | |
| response_data = response.json() | |
| if not response_data.get("success"): | |
| logger.error(self.get_text("request_failed", error_msg=response_data)) | |
| raise CustomException(self.get_text("request_failed", error_msg=response_data)) | |
| return response_data | |
| async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict: | |
| __user__ = __user__ or {} | |
| __metadata__ = __metadata__ or {} | |
| self.start_time = time.time() | |
| user_id = __user__.get("id", "default") | |
| client = AsyncClient() | |
| try: | |
| response_data = await self.request( | |
| client=client, | |
| url=f"{self.valves.api_endpoint}/api/v1/inlet", | |
| headers={"Authorization": f"Bearer {self.valves.api_key}"}, | |
| json_data={"user": __user__, "body": body}, | |
| ) | |
| self.outage_map[user_id] = response_data.get("balance", 0) <= 0 | |
| if self.outage_map[user_id]: | |
| logger.info(self.get_text("insufficient_balance", balance=response_data.get("balance", 0))) | |
| raise CustomException(self.get_text("insufficient_balance", balance=response_data.get("balance", 0))) | |
| return body | |
| except Exception as err: | |
| logger.exception(self.get_text("request_failed", error_msg=err)) | |
| if isinstance(err, CustomException): | |
| raise err | |
| raise Exception(f"error calculating usage, {err}") from err | |
| finally: | |
| await client.aclose() | |
| async def outlet( | |
| self, | |
| body: dict, | |
| __metadata__: Optional[dict] = None, | |
| __user__: Optional[dict] = None, | |
| __event_emitter__: Optional[callable] = None, | |
| ) -> dict: | |
| __user__ = __user__ or {} | |
| __metadata__ = __metadata__ or {} | |
| user_id = __user__.get("id", "default") | |
| if self.outage_map.get(user_id, False): | |
| return body | |
| client = AsyncClient() | |
| try: | |
| response_data = await self.request( | |
| client=client, | |
| url=f"{self.valves.api_endpoint}/api/v1/outlet", | |
| headers={"Authorization": f"Bearer {self.valves.api_key}"}, | |
| json_data={"user": __user__, "body": body}, | |
| ) | |
| stats_list = [] | |
| if self.valves.show_tokens: | |
| stats_list.append(self.get_text("tokens", input=response_data["inputTokens"], output=response_data["outputTokens"])) | |
| if self.valves.show_cost: | |
| stats_list.append(self.get_text("cost", cost=response_data["totalCost"])) | |
| if self.valves.show_balance: | |
| stats_list.append(self.get_text("balance", balance=response_data["newBalance"])) | |
| if self.start_time and self.valves.show_time_spent: | |
| elapsed = time.time() - self.start_time | |
| stats_list.append(self.get_text("time_spent", time=elapsed)) | |
| if self.valves.show_tokens_per_sec: | |
| tokens_per_sec = (response_data["outputTokens"] / elapsed if elapsed > 0 else 0) | |
| stats_list.append(self.get_text("tokens_per_sec", tokens_per_sec=tokens_per_sec)) | |
| stats = " | ".join(stats_list) | |
| if __event_emitter__: | |
| await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}}) | |
| logger.info("usage_monitor: %s %s", user_id, stats) | |
| return body | |
| except Exception as err: | |
| logger.exception(self.get_text("request_failed", error_msg=err)) | |
| raise Exception(self.get_text("request_failed", error_msg=err)) | |
| finally: | |
| await client.aclose() | |