# Aoun-Ai / app /core /alerts.py
# MuhammadMahmoud's picture
# feat: implement smart intelligence layer with hardened auth, telegram alerts, and dashboard v3
# 3226916
"""
Telegram Alert System — Delivers structured real-time alerts to a Telegram chat.
Alert levels:
🚨 CRITICAL — all providers down, kill switch triggered
🔴 HIGH — error rate > 30%, cascading failures
⚠️ WARNING — provider on cooldown, latency spike
💰 BUDGET — daily token budget > BUDGET_WARN_PERCENT
✅ RECOVERY — provider came back online
🔔 INFO — test ping, system events
"""
import logging
import time
import asyncio
from typing import Optional
from app.core.redis_client import redis_client
# Module logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Upper bound on how many alert entries are retained in memory.
_MAX_HISTORY = 50
# Process-local alert feed, oldest entry first (read by the dashboard helpers below).
_alert_history: list = []
def _append_history(entry: dict) -> None:
    """Record *entry* at the end of the in-memory alert feed.

    When the feed grows past ``_MAX_HISTORY`` entries after the append,
    the single oldest entry (index 0) is discarded.
    """
    _alert_history.append(entry)
    if len(_alert_history) <= _MAX_HISTORY:
        return
    # Drop the oldest entry so the feed stays bounded.
    del _alert_history[0]
def get_alert_history() -> list:
    """Return a new list of the retained alerts, newest first.

    The stored feed is kept oldest-first; the returned copy is reversed and
    can be mutated by the caller without affecting the stored feed.
    """
    newest_first = _alert_history[::-1]
    return newest_first
class TelegramAlerter:
    """
    Sends formatted Markdown alerts via the Telegram Bot API.

    Every ``send_*`` method builds a message, records an entry in the
    in-memory alert history and then hands the text to ``_dispatch``.
    Most alert types are rate-limited per key through ``_can_alert`` to
    prevent notification spam; the kill-switch alert and the test ping are
    never rate-limited.

    When Telegram is not configured or is disabled, ``_dispatch`` only emits
    a debug log line and returns False; the history entry is still recorded.

    NOTE(review): several emoji/dash sequences inside the message literals
    below look mis-encoded (UTF-8 bytes decoded with a legacy code page).
    They are reproduced unchanged here - confirm against the file on disk.
    """

    def __init__(self):
        self._last_alert_time: dict = {}  # {alert_key: timestamp}
        self._cooldown_seconds = 900  # 15 min between same-type alerts

    @staticmethod
    def _utc_stamp() -> str:
        """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS UTC``."""
        return time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())

    @staticmethod
    def _failed_calls(total_calls: int, failure_rate: float) -> int:
        """Return the number of failed calls implied by *failure_rate*.

        Rounds to the nearest integer instead of truncating: a rate such as
        29/100 is stored as a float slightly below 0.29, so truncation would
        report 28 failures instead of 29.
        """
        return round(total_calls * failure_rate)

    @staticmethod
    def _code(value) -> str:
        """Return *value* as text that is safe inside a Markdown code span.

        A backtick inside the value would close the span early and leave the
        message with unbalanced markup, so backticks are replaced with
        apostrophes.
        """
        return str(value).replace("`", "'")

    def _can_alert(self, key: str, cooldown: Optional[int] = None) -> bool:
        """Return True if an alert for *key* may be sent now.

        Args:
            key: Identifier of the alert stream being rate-limited.
            cooldown: Minimum seconds between alerts for *key*; defaults to
                the instance-wide cooldown when None.

        Returns:
            True when at least *cooldown* seconds have passed since the last
            permitted alert for *key*. In that case the current time is
            recorded as the new "last alert" time as a side effect.
        """
        cd = cooldown if cooldown is not None else self._cooldown_seconds
        now = time.time()
        last = self._last_alert_time.get(key, 0)
        if now - last < cd:
            return False
        self._last_alert_time[key] = now
        return True

    async def _dispatch(self, text: str) -> bool:
        """
        Send *text* to the configured Telegram chat.

        Returns True only when the Bot API answers with HTTP 200. Returns
        False when Telegram is not configured, alerts are disabled, the API
        answers with another status, or any exception is raised while
        sending (the exception is logged, never propagated).
        """
        # Imported at call time, as in the original code.
        from app.core.config import settings

        if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID:
            logger.debug("Telegram not configured β€” alert logged only")
            return False
        if not settings.TELEGRAM_ALERT_ENABLED:
            logger.debug("Telegram alerts disabled via config")
            return False
        try:
            # Imported inside the try so a missing httpx is logged, not raised.
            import httpx

            url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage"
            payload = {
                "chat_id": settings.TELEGRAM_CHAT_ID,
                "text": text,
                "parse_mode": "Markdown",
                "disable_web_page_preview": True,
            }
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.post(url, json=payload)
            if resp.status_code == 200:
                logger.info("βœ… Telegram alert delivered")
                return True
            logger.warning(
                "Telegram API returned %s: %s", resp.status_code, resp.text[:200]
            )
            return False
        except Exception as exc:
            # Best-effort delivery: alerting must never break the caller.
            logger.error("Telegram dispatch error: %s", exc)
            return False

    async def send_provider_alert(
        self,
        provider: str,
        model: str,
        failure_rate: float,
        total_calls: int,
    ) -> None:
        """Send a provider error-rate alert for a provider/model pair.

        Args:
            provider: Provider name shown in the alert.
            model: Model name shown in the alert.
            failure_rate: Fraction of failed calls (0.0 - 1.0).
            total_calls: Number of calls the rate was computed over.

        The alert is labelled CRITICAL at 60% or more, HIGH otherwise, and
        is rate-limited per provider/model pair with the default cooldown.
        """
        key = f"provider:{provider}/{model}"
        if not self._can_alert(key):
            return
        pct = failure_rate * 100
        is_critical = pct >= 60
        failed = self._failed_calls(total_calls, failure_rate)
        level = "🚨 *CRITICAL*" if is_critical else "πŸ”΄ *HIGH*"
        text = (
            f"{level} β€” Awn AI Provider Alert\n\n"
            f"πŸ“‘ *Provider:* `{self._code(provider)}`\n"
            f"πŸ€– *Model:* `{self._code(model)}`\n"
            f"❌ *Error Rate:* `{pct:.1f}%` ({failed}/{total_calls} calls)\n"
            f"⏱️ *Time:* `{self._utc_stamp()}`\n\n"
            f"_Router will attempt failover automatically._"
        )
        logger.critical("[ALERT] %s/%s error rate: %.1f%%", provider, model, pct)
        entry = {
            "ts": time.time(),
            "level": "critical" if is_critical else "high",
            "type": "provider_error",
            "title": f"{provider} Error Rate {pct:.0f}%",
            "body": f"Model: {model} | {failed}/{total_calls} failed",
        }
        _append_history(entry)
        await self._dispatch(text)

    async def send_all_providers_down(self) -> None:
        """Send a CRITICAL alert that no provider is available (5 min cooldown)."""
        if not self._can_alert("all_down", cooldown=300):
            return
        text = (
            "🚨 *CRITICAL* β€” Awn AI: ALL Providers Down\n\n"
            "β›” No LLM provider is currently available.\n"
            "Users are receiving degradation messages.\n"
            f"⏱️ `{self._utc_stamp()}`\n\n"
            "_Immediate action required!_"
        )
        logger.critical("[ALERT] ALL LLM providers are down!")
        _append_history({
            "ts": time.time(),
            "level": "critical",
            "type": "all_down",
            "title": "ALL Providers Down",
            "body": "No LLM provider is available. Users are degraded.",
        })
        await self._dispatch(text)

    async def send_budget_alert(self, used: int, budget: int, pct: float) -> None:
        """Send a daily token budget warning (1 hour cooldown).

        Args:
            used: Tokens consumed so far.
            budget: Configured token budget.
            pct: Percentage of the budget used, as shown in the alert.
        """
        if not self._can_alert("budget", cooldown=3600):
            return
        text = (
            f"πŸ’° *Budget Warning* β€” Awn AI\n\n"
            f"πŸ“Š *Daily Token Usage:* `{used:,}` / `{budget:,}`\n"
            f"πŸ“ˆ *Used:* `{pct:.1f}%`\n"
            f"⏱️ `{self._utc_stamp()}`\n\n"
            f"_Consider switching to cheaper models or reducing load._"
        )
        _append_history({
            "ts": time.time(),
            "level": "warning",
            "type": "budget",
            "title": f"Budget {pct:.0f}% Used",
            "body": f"{used:,} / {budget:,} tokens today",
        })
        await self._dispatch(text)

    async def send_recovery_alert(self, provider: str) -> None:
        """Send a recovery notice for *provider* (5 min cooldown per provider)."""
        if not self._can_alert(f"recovery:{provider}", cooldown=300):
            return
        text = (
            f"βœ… *Recovery* β€” Awn AI\n\n"
            f"πŸ“‘ *Provider:* `{self._code(provider)}` is back online\n"
            f"⏱️ `{self._utc_stamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "ok",
            "type": "recovery",
            "title": f"{provider} Recovered",
            "body": "Provider is back online",
        })
        await self._dispatch(text)

    async def send_high_latency_alert(self, provider: str, latency_ms: float) -> None:
        """Send a high-latency warning for *provider* (10 min cooldown per provider).

        Args:
            provider: Provider name shown in the alert.
            latency_ms: Latency in milliseconds, reported as P50 in the alert.
        """
        if not self._can_alert(f"latency:{provider}", cooldown=600):
            return
        text = (
            f"⚠️ *High Latency* β€” Awn AI\n\n"
            f"πŸ“‘ *Provider:* `{self._code(provider)}`\n"
            f"⏱️ *P50 Latency:* `{latency_ms:.0f}ms`\n"
            f"πŸ• `{self._utc_stamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "warning",
            "type": "latency",
            "title": f"{provider} High Latency",
            "body": f"P50: {latency_ms:.0f}ms",
        })
        await self._dispatch(text)

    async def send_kill_switch_alert(self, active: bool, by_ip: str = "unknown") -> None:
        """Send a notice that the kill switch was activated or deactivated.

        Args:
            active: True when the switch was turned on, False when turned off.
            by_ip: Address of whoever toggled the switch, shown in the alert.

        This alert is not rate-limited: every toggle is reported.
        """
        action = "🚫 ACTIVATED" if active else "🟒 DEACTIVATED"
        icon = '🚨' if active else 'βœ…'
        text = (
            f"{icon} *Kill Switch {action}*\n\n"
            f"πŸ‘€ *By IP:* `{self._code(by_ip)}`\n"
            f"⏱️ `{self._utc_stamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "critical" if active else "ok",
            "type": "killswitch",
            "title": f"Kill Switch {action}",
            "body": f"By IP: {by_ip}",
        })
        await self._dispatch(text)

    async def send_test_ping(self) -> bool:
        """Send a test message and return whether Telegram accepted it.

        Not rate-limited. The history entry is recorded before sending,
        regardless of the delivery result.
        """
        text = (
            "πŸ”” *Awn AI β€” Test Alert*\n\n"
            "βœ… Telegram alerts are configured correctly!\n"
            f"⏱️ `{self._utc_stamp()}`"
        )
        _append_history({
            "ts": time.time(),
            "level": "info",
            "type": "test",
            "title": "Test Alert",
            "body": "Telegram connection verified",
        })
        return await self._dispatch(text)
class AnomalyMonitor:
    """
    Reads a provider/model's success and failure counters from Redis and
    raises a provider alert when the failure rate is above 30%.

    The original author's note says this is invoked from
    TelemetryRegistry.record_error(); that caller is not visible here.
    """

    def __init__(self):
        # Not read anywhere in this class; cooldowns are enforced by the alerter.
        self.last_alert_time: dict = {}

    async def evaluate_provider(self, provider: str, model: str) -> None:
        """Check the telemetry hash for *provider*/*model* and alert if needed.

        Does nothing when Redis is unavailable, when no metrics exist, or
        when fewer than 10 calls have been recorded. Any exception raised
        during evaluation is logged and swallowed.
        """
        if not (redis_client.is_connected and redis_client.redis):
            return
        telemetry_key = f"awn:telemetry:{provider}:{model}"
        try:
            conn = redis_client.redis
            counters = await conn.hgetall(telemetry_key)
            if not counters:
                return
            # presumably the client decodes responses to str keys - verify
            ok_count = int(counters.get("successes", 0))
            failed_count = int(counters.get("failures", 0))
            sample_size = ok_count + failed_count
            # Too few calls to judge the error rate reliably.
            if sample_size < 10:
                return
            ratio = failed_count / sample_size
            if ratio <= 0.30:
                return
            await telegram_alerter.send_provider_alert(provider, model, ratio, sample_size)
        except Exception as e:
            logger.error(f"AnomalyMonitor evaluation failed: {e}")
# Shared module-level instances. Cooldown state lives on the alerter instance,
# and AnomalyMonitor.evaluate_provider sends through `telegram_alerter`.
anomaly_monitor = AnomalyMonitor()
telegram_alerter = TelegramAlerter()