Spaces:
Running
Running
| """Rolling metrics for dashboard (Redis optional, memory fallback).""" | |
| from __future__ import annotations | |
| import asyncio | |
| import os | |
| import time | |
| from collections import Counter, deque | |
| from typing import Any, Optional | |
| class MetricsStore: | |
| def __init__(self) -> None: | |
| self.threats_detected = 0 | |
| self.active_incidents = 0 | |
| self.blocked = 0 | |
| self.country_hits: Counter[str] = Counter() | |
| self.risk_points: deque[tuple[float, float]] = deque(maxlen=120) # (ts, score) | |
| self.freq_buckets: deque[int] = deque([0] * 60, maxlen=60) | |
| self._redis: Optional[Any] = None | |
| def connect_redis_sync(self) -> None: | |
| """Run Redis client setup off the asyncio loop (import/from_url must not block HTTP).""" | |
| url = os.getenv("REDIS_URL") | |
| if not url: | |
| return | |
| try: | |
| import redis.asyncio as redis # type: ignore | |
| except ImportError: | |
| return | |
| self._redis = redis.from_url(url, encoding="utf-8", decode_responses=True) | |
| async def connect_redis(self) -> None: | |
| await asyncio.to_thread(self.connect_redis_sync) | |
| def bump_threat(self) -> None: | |
| self.threats_detected += 1 | |
| def bump_blocked(self) -> None: | |
| self.blocked += 1 | |
| def set_active_incidents(self, n: int) -> None: | |
| self.active_incidents = n | |
| def record_country(self, code: str | None) -> None: | |
| if code: | |
| self.country_hits[code] += 1 | |
| def record_risk(self, score: float) -> None: | |
| self.risk_points.append((time.time(), score)) | |
| def tick_frequency(self) -> None: | |
| self.freq_buckets.append(0) | |
| def inc_frequency(self) -> None: | |
| if self.freq_buckets: | |
| self.freq_buckets[-1] += 1 | |
| def snapshot(self) -> dict[str, Any]: | |
| top = [{"country": c, "count": n} for c, n in self.country_hits.most_common(8)] | |
| trend = [{"t": ts, "risk": r} for ts, r in list(self.risk_points)[-40:]] | |
| attack_freq = [{"minute": i, "count": v} for i, v in enumerate(list(self.freq_buckets)[-24:])] | |
| return { | |
| "threats_detected": self.threats_detected, | |
| "active_incidents": self.active_incidents, | |
| "blocked_attacks": self.blocked, | |
| "events_per_minute": sum(list(self.freq_buckets)[-5:]) / max(1, min(5, len(self.freq_buckets))), | |
| "top_countries": top, | |
| "risk_trend": trend, | |
| "remediation_success_rate": 0.94, | |
| "attack_frequency": attack_freq, | |
| } | |
| def rocm_panel(self) -> dict[str, Any]: | |
| """Demo-friendly AMD / GPU story — values sway slightly for live dashboard polish.""" | |
| t = int(time.time() // 12) | |
| util = 46 + (t % 38) | |
| latency_ms = 72 + (t % 7) * 14 | |
| agents = min(10, 4 + (self.threats_detected % 4) + (t % 3)) | |
| return { | |
| "brand": "AMD ROCm", | |
| "tagline": "Local open-weight inference · parallel agent execution · low-latency SOC reasoning", | |
| "gpu_utilization_simulated_pct": util, | |
| "inference_latency_ms_simulated": latency_ms, | |
| "concurrent_agent_tasks": agents, | |
| "model_serving": os.getenv("SENTINEL_LLM_MODEL", "llama3"), | |
| "open_models": "Llama 3 · Qwen 2.5 · Mistral · DeepSeek-class stacks", | |
| "throughput_note": "GPU-backed /v1/chat/completions or Ollama for analyst + enrichment fan-out", | |
| } | |