"""Rolling metrics for dashboard (Redis optional, memory fallback).""" from __future__ import annotations import asyncio import os import time from collections import Counter, deque from typing import Any, Optional class MetricsStore: def __init__(self) -> None: self.threats_detected = 0 self.active_incidents = 0 self.blocked = 0 self.country_hits: Counter[str] = Counter() self.risk_points: deque[tuple[float, float]] = deque(maxlen=120) # (ts, score) self.freq_buckets: deque[int] = deque([0] * 60, maxlen=60) self._redis: Optional[Any] = None def connect_redis_sync(self) -> None: """Run Redis client setup off the asyncio loop (import/from_url must not block HTTP).""" url = os.getenv("REDIS_URL") if not url: return try: import redis.asyncio as redis # type: ignore except ImportError: return self._redis = redis.from_url(url, encoding="utf-8", decode_responses=True) async def connect_redis(self) -> None: await asyncio.to_thread(self.connect_redis_sync) def bump_threat(self) -> None: self.threats_detected += 1 def bump_blocked(self) -> None: self.blocked += 1 def set_active_incidents(self, n: int) -> None: self.active_incidents = n def record_country(self, code: str | None) -> None: if code: self.country_hits[code] += 1 def record_risk(self, score: float) -> None: self.risk_points.append((time.time(), score)) def tick_frequency(self) -> None: self.freq_buckets.append(0) def inc_frequency(self) -> None: if self.freq_buckets: self.freq_buckets[-1] += 1 def snapshot(self) -> dict[str, Any]: top = [{"country": c, "count": n} for c, n in self.country_hits.most_common(8)] trend = [{"t": ts, "risk": r} for ts, r in list(self.risk_points)[-40:]] attack_freq = [{"minute": i, "count": v} for i, v in enumerate(list(self.freq_buckets)[-24:])] return { "threats_detected": self.threats_detected, "active_incidents": self.active_incidents, "blocked_attacks": self.blocked, "events_per_minute": sum(list(self.freq_buckets)[-5:]) / max(1, min(5, len(self.freq_buckets))), "top_countries": top, "risk_trend": trend, "remediation_success_rate": 0.94, "attack_frequency": attack_freq, } def rocm_panel(self) -> dict[str, Any]: """Demo-friendly AMD / GPU story — values sway slightly for live dashboard polish.""" t = int(time.time() // 12) util = 46 + (t % 38) latency_ms = 72 + (t % 7) * 14 agents = min(10, 4 + (self.threats_detected % 4) + (t % 3)) return { "brand": "AMD ROCm", "tagline": "Local open-weight inference · parallel agent execution · low-latency SOC reasoning", "gpu_utilization_simulated_pct": util, "inference_latency_ms_simulated": latency_ms, "concurrent_agent_tasks": agents, "model_serving": os.getenv("SENTINEL_LLM_MODEL", "llama3"), "open_models": "Llama 3 · Qwen 2.5 · Mistral · DeepSeek-class stacks", "throughput_note": "GPU-backed /v1/chat/completions or Ollama for analyst + enrichment fan-out", }