SentinelAI / services /metrics_store.py
iitian's picture
Sync SentinelAI project and add Hugging Face Docker Space layout.
8b3905d
"""Rolling metrics for dashboard (Redis optional, memory fallback)."""
from __future__ import annotations
import asyncio
import os
import time
from collections import Counter, deque
from typing import Any, Optional
class MetricsStore:
def __init__(self) -> None:
self.threats_detected = 0
self.active_incidents = 0
self.blocked = 0
self.country_hits: Counter[str] = Counter()
self.risk_points: deque[tuple[float, float]] = deque(maxlen=120) # (ts, score)
self.freq_buckets: deque[int] = deque([0] * 60, maxlen=60)
self._redis: Optional[Any] = None
def connect_redis_sync(self) -> None:
"""Run Redis client setup off the asyncio loop (import/from_url must not block HTTP)."""
url = os.getenv("REDIS_URL")
if not url:
return
try:
import redis.asyncio as redis # type: ignore
except ImportError:
return
self._redis = redis.from_url(url, encoding="utf-8", decode_responses=True)
async def connect_redis(self) -> None:
await asyncio.to_thread(self.connect_redis_sync)
def bump_threat(self) -> None:
self.threats_detected += 1
def bump_blocked(self) -> None:
self.blocked += 1
def set_active_incidents(self, n: int) -> None:
self.active_incidents = n
def record_country(self, code: str | None) -> None:
if code:
self.country_hits[code] += 1
def record_risk(self, score: float) -> None:
self.risk_points.append((time.time(), score))
def tick_frequency(self) -> None:
self.freq_buckets.append(0)
def inc_frequency(self) -> None:
if self.freq_buckets:
self.freq_buckets[-1] += 1
def snapshot(self) -> dict[str, Any]:
top = [{"country": c, "count": n} for c, n in self.country_hits.most_common(8)]
trend = [{"t": ts, "risk": r} for ts, r in list(self.risk_points)[-40:]]
attack_freq = [{"minute": i, "count": v} for i, v in enumerate(list(self.freq_buckets)[-24:])]
return {
"threats_detected": self.threats_detected,
"active_incidents": self.active_incidents,
"blocked_attacks": self.blocked,
"events_per_minute": sum(list(self.freq_buckets)[-5:]) / max(1, min(5, len(self.freq_buckets))),
"top_countries": top,
"risk_trend": trend,
"remediation_success_rate": 0.94,
"attack_frequency": attack_freq,
}
def rocm_panel(self) -> dict[str, Any]:
"""Demo-friendly AMD / GPU story — values sway slightly for live dashboard polish."""
t = int(time.time() // 12)
util = 46 + (t % 38)
latency_ms = 72 + (t % 7) * 14
agents = min(10, 4 + (self.threats_detected % 4) + (t % 3))
return {
"brand": "AMD ROCm",
"tagline": "Local open-weight inference · parallel agent execution · low-latency SOC reasoning",
"gpu_utilization_simulated_pct": util,
"inference_latency_ms_simulated": latency_ms,
"concurrent_agent_tasks": agents,
"model_serving": os.getenv("SENTINEL_LLM_MODEL", "llama3"),
"open_models": "Llama 3 · Qwen 2.5 · Mistral · DeepSeek-class stacks",
"throughput_note": "GPU-backed /v1/chat/completions or Ollama for analyst + enrichment fan-out",
}