Spaces:
Sleeping
Sleeping
feat: implement smart intelligence layer with hardened auth, telegram alerts, and dashboard v3
"""
Telegram Alert System — delivers structured real-time alerts to a Telegram chat.

Alert levels:
  CRITICAL — all providers down, kill switch triggered
  HIGH     — error rate > 30%, cascading failures
  WARNING  — provider on cooldown, latency spike
  BUDGET   — daily token budget > BUDGET_WARN_PERCENT
  RECOVERY — provider came back online
  INFO     — test ping, system events
"""
import asyncio
import logging
import time
from collections import deque
from typing import Optional

from app.core.redis_client import redis_client
| logger = logging.getLogger(__name__) | |
| # In-memory alert history (last 50 alerts for dashboard feed) | |
| _alert_history: list = [] | |
| _MAX_HISTORY = 50 | |
| def _append_history(entry: dict) -> None: | |
| _alert_history.append(entry) | |
| if len(_alert_history) > _MAX_HISTORY: | |
| _alert_history.pop(0) | |
| def get_alert_history() -> list: | |
| """Returns last N alerts in reverse chronological order.""" | |
| return list(reversed(_alert_history)) | |
class TelegramAlerter:
    """
    Sends formatted Markdown alerts via the Telegram Bot API.

    - A per-alert-key cooldown suppresses repeats of the same alert type
      (default 15 minutes) to prevent notification spam.
    - Every alert is also recorded in the in-memory dashboard feed via
      _append_history(), whether or not Telegram delivery succeeds.
    - Falls back to structured logging if Telegram is not configured.
    """

    def __init__(self):
        # {alert_key: unix timestamp of the most recent allowed alert}
        self._last_alert_time: dict = {}
        self._cooldown_seconds = 900  # 15 min between same-type alerts

    @staticmethod
    def _utc_timestamp() -> str:
        """Current UTC time formatted for alert message bodies."""
        return time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())

    def _can_alert(self, key: str, cooldown: Optional[int] = None) -> bool:
        """
        Returns True and stamps `key` if it is outside its cooldown window.

        NOTE: the timestamp is recorded at check time, so a failed dispatch
        still consumes the cooldown window for that key.
        """
        cd = self._cooldown_seconds if cooldown is None else cooldown
        now = time.time()
        if now - self._last_alert_time.get(key, 0) >= cd:
            self._last_alert_time[key] = now
            return True
        return False

    async def _dispatch(self, text: str) -> bool:
        """
        Sends a message to Telegram. Returns True on success.
        Uses httpx for async HTTP — non-blocking.
        """
        from app.core.config import settings
        if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID:
            logger.debug("Telegram not configured β alert logged only")
            return False
        if not settings.TELEGRAM_ALERT_ENABLED:
            logger.debug("Telegram alerts disabled via config")
            return False
        try:
            import httpx
            url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage"
            payload = {
                "chat_id": settings.TELEGRAM_CHAT_ID,
                "text": text,
                "parse_mode": "Markdown",
                "disable_web_page_preview": True,
            }
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.post(url, json=payload)
            if resp.status_code == 200:
                logger.info("β Telegram alert delivered")
                return True
            # Lazy %-args: the message is only formatted when actually emitted.
            logger.warning("Telegram API returned %s: %s", resp.status_code, resp.text[:200])
            return False
        except Exception as e:
            logger.error("Telegram dispatch error: %s", e)
            return False

    async def send_provider_alert(
        self,
        provider: str,
        model: str,
        failure_rate: float,
        total_calls: int,
    ) -> None:
        """Called when error rate > 30% for a provider/model pair.

        Severity escalates from HIGH to CRITICAL at a 60% failure rate.
        """
        key = f"provider:{provider}/{model}"
        if not self._can_alert(key):
            return
        pct = failure_rate * 100
        failed_calls = int(total_calls * failure_rate)
        level = "π¨ *CRITICAL*" if pct >= 60 else "π΄ *HIGH*"
        text = (
            f"{level} β Awn AI Provider Alert\n\n"
            f"π‘ *Provider:* `{provider}`\n"
            f"π€ *Model:* `{model}`\n"
            f"β *Error Rate:* `{pct:.1f}%` ({failed_calls}/{total_calls} calls)\n"
            f"β±οΈ *Time:* `{self._utc_timestamp()}`\n\n"
            f"_Router will attempt failover automatically._"
        )
        logger.critical("[ALERT] %s/%s error rate: %.1f%%", provider, model, pct)
        _append_history({
            "ts": time.time(),
            "level": "critical" if pct >= 60 else "high",
            "type": "provider_error",
            "title": f"{provider} Error Rate {pct:.0f}%",
            "body": f"Model: {model} | {failed_calls}/{total_calls} failed",
        })
        await self._dispatch(text)

    async def send_all_providers_down(self) -> None:
        """Called when every single provider has failed.

        Uses a shorter 5-minute cooldown since this is the most severe state.
        """
        if not self._can_alert("all_down", cooldown=300):
            return
        text = (
            "π¨ *CRITICAL* β Awn AI: ALL Providers Down\n\n"
            "β No LLM provider is currently available.\n"
            "Users are receiving degradation messages.\n"
            f"β±οΈ `{self._utc_timestamp()}`\n\n"
            "_Immediate action required!_"
        )
        logger.critical("[ALERT] ALL LLM providers are down!")
        _append_history({
            "ts": time.time(),
            "level": "critical",
            "type": "all_down",
            "title": "ALL Providers Down",
            "body": "No LLM provider is available. Users are degraded.",
        })
        await self._dispatch(text)

    async def send_budget_alert(self, used: int, budget: int, pct: float) -> None:
        """Called when daily token budget exceeds BUDGET_WARN_PERCENT.

        Hourly cooldown: budget usage only grows during the day, so more
        frequent reminders would just repeat the same information.
        """
        if not self._can_alert("budget", cooldown=3600):
            return
        text = (
            f"π° *Budget Warning* β Awn AI\n\n"
            f"π *Daily Token Usage:* `{used:,}` / `{budget:,}`\n"
            f"π *Used:* `{pct:.1f}%`\n"
            f"β±οΈ `{self._utc_timestamp()}`\n\n"
            f"_Consider switching to cheaper models or reducing load._"
        )
        _append_history({
            "ts": time.time(),
            "level": "warning",
            "type": "budget",
            "title": f"Budget {pct:.0f}% Used",
            "body": f"{used:,} / {budget:,} tokens today",
        })
        await self._dispatch(text)

    async def send_recovery_alert(self, provider: str) -> None:
        """Called when a previously down provider responds successfully."""
        if not self._can_alert(f"recovery:{provider}", cooldown=300):
            return
        text = (
            f"β *Recovery* β Awn AI\n\n"
            f"π‘ *Provider:* `{provider}` is back online\n"
            f"β±οΈ `{self._utc_timestamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "ok",
            "type": "recovery",
            "title": f"{provider} Recovered",
            "body": "Provider is back online",
        })
        await self._dispatch(text)

    async def send_high_latency_alert(self, provider: str, latency_ms: float) -> None:
        """Called when P50 latency exceeds 5000ms for a provider."""
        if not self._can_alert(f"latency:{provider}", cooldown=600):
            return
        text = (
            f"β οΈ *High Latency* β Awn AI\n\n"
            f"π‘ *Provider:* `{provider}`\n"
            f"β±οΈ *P50 Latency:* `{latency_ms:.0f}ms`\n"
            f"π `{self._utc_timestamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "warning",
            "type": "latency",
            "title": f"{provider} High Latency",
            "body": f"P50: {latency_ms:.0f}ms",
        })
        await self._dispatch(text)

    async def send_kill_switch_alert(self, active: bool, by_ip: str = "unknown") -> None:
        """Called when admin activates or deactivates the global kill switch.

        Deliberately has no cooldown: every toggle is an audit-worthy event.
        """
        action = "π« ACTIVATED" if active else "π’ DEACTIVATED"
        text = (
            f"{'π¨' if active else 'β '} *Kill Switch {action}*\n\n"
            f"π€ *By IP:* `{by_ip}`\n"
            f"β±οΈ `{self._utc_timestamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "critical" if active else "ok",
            "type": "killswitch",
            "title": f"Kill Switch {action}",
            "body": f"By IP: {by_ip}",
        })
        await self._dispatch(text)

    async def send_test_ping(self) -> bool:
        """Sends a test message to verify Telegram is configured correctly.

        Returns True only if the Telegram API accepted the message.
        """
        text = (
            "π *Awn AI β Test Alert*\n\n"
            "β Telegram alerts are configured correctly!\n"
            f"β±οΈ `{self._utc_timestamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "info",
            "type": "test",
            "title": "Test Alert",
            "body": "Telegram connection verified",
        })
        return await self._dispatch(text)
class AnomalyMonitor:
    """
    Evaluates provider telemetry on every error and triggers alerts if
    thresholds are crossed. Called automatically from
    TelemetryRegistry.record_error().
    """

    # Minimum recorded calls before any alert fires (avoids startup noise).
    MIN_SAMPLE_SIZE = 10
    # Failure-rate ceiling; anything above this triggers a provider alert.
    FAILURE_RATE_THRESHOLD = 0.30

    def __init__(self):
        # NOTE(review): never read or written here — actual cooldown
        # bookkeeping lives in TelegramAlerter._last_alert_time. Kept for
        # backward compatibility in case external code inspects it.
        self.last_alert_time: dict = {}

    async def evaluate_provider(self, provider: str, model: str) -> None:
        """Checks Redis telemetry and fires alerts if anomaly thresholds are crossed.

        Best-effort: any Redis/parsing failure is logged and swallowed so
        that telemetry problems never break the request path.
        """
        if not redis_client.is_connected or not redis_client.redis:
            return
        key = f"awn:telemetry:{provider}:{model}"
        try:
            metrics = await redis_client.redis.hgetall(key)
            if not metrics:
                return
            # NOTE(review): assumes the Redis client decodes responses to str;
            # with bytes keys these .get() lookups would silently return 0.
            successes = int(metrics.get("successes", 0))
            failures = int(metrics.get("failures", 0))
            total = successes + failures
            # Only alert after minimum sample size.
            if total < self.MIN_SAMPLE_SIZE:
                return
            failure_rate = failures / total
            if failure_rate > self.FAILURE_RATE_THRESHOLD:
                await telegram_alerter.send_provider_alert(provider, model, failure_rate, total)
        except Exception as e:
            logger.error("AnomalyMonitor evaluation failed: %s", e)
# Module-level singletons: import these shared instances instead of
# constructing new ones, so cooldown state is process-wide.
telegram_alerter = TelegramAlerter()
anomaly_monitor = AnomalyMonitor()