GodSpeed / src /anomaly /db.py
Ananth Shyam
Implement anomaly detection and forecasting features
451d52a
"""Supabase persistence layer for anomaly detection.
Every public function is wrapped in try/except so callers never crash
if Supabase is unavailable or misconfigured.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
def _sb():
"""Return the Supabase sync client (service role key, bypasses RLS)."""
from src.auth.db import _client
return _client()
# ── query_events ──────────────────────────────────────────────────────────────
def upsert_query_event(event: dict) -> None:
"""Write one query event row, idempotent on event_id."""
try:
_sb().table("query_events").upsert(
{
"event_id": event["id"],
"team_id": event.get("team_id", "unknown"),
"session_id": event.get("session_id"),
"success": event.get("success", True),
"duration_ms": event.get("duration_ms"),
"escalated": event.get("escalated", False),
"guardrail_score": event.get("guardrail_score"),
"agent_metrics": event.get("agents", {}),
"created_at": event.get("created_at", datetime.utcnow().isoformat()),
},
on_conflict="event_id",
ignore_duplicates=True,
).execute()
except Exception:
logger.warning("anomaly_db: upsert_query_event failed", exc_info=True)
async def async_upsert_query_event(event: dict) -> None:
"""Async shim — runs the sync upsert in the default executor."""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, upsert_query_event, event)
# ── query_events_hourly ───────────────────────────────────────────────────────
def aggregate_hourly(team_id: str, hour_bucket: str) -> None:
"""Recompute one (team_id, hour_bucket) row via the Supabase RPC."""
try:
_sb().rpc("aggregate_hourly_bucket", {
"p_team_id": team_id,
"p_hour": hour_bucket,
}).execute()
except Exception:
logger.warning(
"anomaly_db: aggregate_hourly failed team=%s hour=%s",
team_id, hour_bucket, exc_info=True,
)
def get_hourly_counts(team_id: str, days: int = 14) -> list[dict]:
"""Return hourly aggregate rows for the last N days for a team."""
try:
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
result = (
_sb()
.table("query_events_hourly")
.select("hour_bucket,query_count,escalation_count")
.eq("team_id", team_id)
.gte("hour_bucket", cutoff)
.order("hour_bucket", desc=False)
.execute()
)
return result.data or []
except Exception:
logger.warning("anomaly_db: get_hourly_counts failed", exc_info=True)
return []
def get_all_team_ids() -> list[str]:
"""Return distinct team_ids with events in the last 90 days."""
try:
cutoff = (datetime.utcnow() - timedelta(days=90)).isoformat()
result = (
_sb()
.table("query_events")
.select("team_id")
.gte("created_at", cutoff)
.execute()
)
return list({r["team_id"] for r in (result.data or [])})
except Exception:
logger.warning("anomaly_db: get_all_team_ids failed", exc_info=True)
return []
# ── anomaly_signals ───────────────────────────────────────────────────────────
def insert_signal(
signal_type: str,
severity: str,
score: float,
details: dict,
team_id: str | None = None,
entity_type: str | None = None,
entity_id: str | None = None,
) -> dict | None:
"""Insert one anomaly signal with 2-hour dedup suppression.
If an identical unresolved signal already exists for the same
(signal_type, team_id, entity_id) within the last 2 hours, the
insert is skipped and None is returned.
"""
try:
two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat()
dupe_q = (
_sb()
.table("anomaly_signals")
.select("id")
.eq("signal_type", signal_type)
.eq("resolved", False)
.gte("detected_at", two_hours_ago)
)
if team_id:
dupe_q = dupe_q.eq("team_id", team_id)
if entity_id:
dupe_q = dupe_q.eq("entity_id", entity_id)
if dupe_q.execute().data:
return None # suppress duplicate
row: dict = {
"signal_type": signal_type,
"severity": severity,
"score": score,
"details": details,
}
if team_id: row["team_id"] = team_id
if entity_type: row["entity_type"] = entity_type
if entity_id: row["entity_id"] = entity_id
result = _sb().table("anomaly_signals").insert(row).execute()
return result.data[0] if result.data else None
except Exception:
logger.warning("anomaly_db: insert_signal failed type=%s", signal_type, exc_info=True)
return None
def get_signals(
team_id: str | None,
severity: str | None,
signal_type: str | None,
resolved: bool,
limit: int,
) -> list[dict]:
try:
q = (
_sb()
.table("anomaly_signals")
.select("*")
.eq("resolved", resolved)
.order("detected_at", desc=True)
.limit(limit)
)
if team_id: q = q.eq("team_id", team_id)
if severity: q = q.eq("severity", severity)
if signal_type: q = q.eq("signal_type", signal_type)
return q.execute().data or []
except Exception:
logger.warning("anomaly_db: get_signals failed", exc_info=True)
return []
def resolve_signal(signal_id: str, resolver_user_id: str) -> bool:
try:
_sb().table("anomaly_signals").update({
"resolved": True,
"resolved_by": resolver_user_id,
"resolved_at": datetime.utcnow().isoformat(),
}).eq("id", signal_id).eq("resolved", False).execute()
return True
except Exception:
logger.warning("anomaly_db: resolve_signal failed id=%s", signal_id, exc_info=True)
return False
def get_signals_summary() -> dict:
"""Return unresolved signal counts grouped by type and severity."""
try:
result = (
_sb()
.table("anomaly_signals")
.select("signal_type,severity")
.eq("resolved", False)
.execute()
)
rows = result.data or []
by_type: dict[str, int] = {}
by_severity: dict[str, int] = {}
for r in rows:
by_type[r["signal_type"]] = by_type.get(r["signal_type"], 0) + 1
by_severity[r["severity"]] = by_severity.get(r["severity"], 0) + 1
return {"total": len(rows), "by_type": by_type, "by_severity": by_severity}
except Exception:
logger.warning("anomaly_db: get_signals_summary failed", exc_info=True)
return {"total": 0, "by_type": {}, "by_severity": {}}
def get_staleness_top(limit: int = 30) -> list[dict]:
try:
result = (
_sb()
.table("anomaly_signals")
.select("entity_id,score,details,detected_at")
.eq("signal_type", "staleness")
.eq("resolved", False)
.order("score", desc=True)
.limit(limit)
.execute()
)
return result.data or []
except Exception:
logger.warning("anomaly_db: get_staleness_top failed", exc_info=True)
return []
def get_dependency_risk(limit: int = 50) -> list[dict]:
try:
result = (
_sb()
.table("anomaly_signals")
.select("entity_id,score,details,detected_at")
.eq("signal_type", "dependency_risk")
.eq("resolved", False)
.order("score", desc=True)
.limit(limit)
.execute()
)
return result.data or []
except Exception:
logger.warning("anomaly_db: get_dependency_risk failed", exc_info=True)
return []
def purge_old_events() -> int:
"""Delete query_events older than 90 days. Returns approximate deleted count."""
try:
cutoff = (datetime.utcnow() - timedelta(days=90)).isoformat()
result = _sb().table("query_events").delete().lt("created_at", cutoff).execute()
deleted = len(result.data or [])
if deleted:
logger.info("anomaly_db: purged %d old query_events", deleted)
return deleted
except Exception:
logger.warning("anomaly_db: purge_old_events failed", exc_info=True)
return 0