Spaces:
Sleeping
Sleeping
| """Supabase persistence layer for anomaly detection. | |
| Every public function is wrapped in try/except so callers never crash | |
| if Supabase is unavailable or misconfigured. | |
| """ | |
from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timedelta, timezone
# Module-level logger. The persistence helpers below report failures through
# it (as warnings with tracebacks) instead of raising to their callers.
logger = logging.getLogger(__name__)
def _sb():
    """Build and return the Supabase sync client.

    Uses the service-role key, so row-level security is bypassed. The
    project helper is imported at call time rather than module import time
    (presumably to avoid an import cycle or import-time configuration —
    confirm).
    """
    from src.auth.db import _client as make_client

    client = make_client()
    return client
| # ── query_events ────────────────────────────────────────────────────────────── | |
def upsert_query_event(event: dict) -> None:
    """Write one query event row, idempotent on ``event_id``.

    ``event["id"]`` is required. The other keys are optional and fall back to
    defaults, both when the key is missing and when it is present but
    ``None``. An existing row with the same ``event_id`` is left untouched
    (``ignore_duplicates=True``). Any failure, including a missing
    ``"id"``, is logged and swallowed.
    """
    try:
        row = {
            "event_id": event["id"],
            # `or` rather than a .get() default so an explicit None also
            # falls back instead of being written as NULL.
            "team_id": event.get("team_id") or "unknown",
            "session_id": event.get("session_id"),
            "success": event.get("success", True),
            "duration_ms": event.get("duration_ms"),
            "escalated": event.get("escalated", False),
            "guardrail_score": event.get("guardrail_score"),
            "agent_metrics": event.get("agents") or {},
            # Timezone-aware UTC: isoformat() carries an explicit +00:00
            # offset, and datetime.utcnow() is deprecated since Python 3.12.
            "created_at": event.get("created_at")
            or datetime.now(timezone.utc).isoformat(),
        }
        _sb().table("query_events").upsert(
            row,
            on_conflict="event_id",
            ignore_duplicates=True,
        ).execute()
    except Exception:
        logger.warning("anomaly_db: upsert_query_event failed", exc_info=True)
async def async_upsert_query_event(event: dict) -> None:
    """Async shim: run the blocking ``upsert_query_event`` in the default executor.

    ``asyncio.get_running_loop()`` is used because a loop is always running
    inside a coroutine. The asyncio docs recommend it over
    ``get_event_loop()`` in coroutines.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, upsert_query_event, event)
| # ── query_events_hourly ─────────────────────────────────────────────────────── | |
def aggregate_hourly(team_id: str, hour_bucket: str) -> None:
    """Ask the database to rebuild a single (team_id, hour_bucket) aggregate row.

    Invokes the ``aggregate_hourly_bucket`` RPC. Any error is logged with
    its traceback and swallowed.
    """
    params = {"p_team_id": team_id, "p_hour": hour_bucket}
    try:
        rpc_call = _sb().rpc("aggregate_hourly_bucket", params)
        rpc_call.execute()
    except Exception:
        logger.warning(
            "anomaly_db: aggregate_hourly failed team=%s hour=%s",
            team_id,
            hour_bucket,
            exc_info=True,
        )
def get_hourly_counts(team_id: str, days: int = 14) -> list[dict]:
    """Return hourly aggregate rows for *team_id* covering the last *days* days.

    Each row holds ``hour_bucket``, ``query_count`` and ``escalation_count``,
    ordered oldest first. Returns ``[]`` on any failure, which is logged.
    """
    try:
        # Aware UTC cutoff; datetime.utcnow() is deprecated since Python 3.12.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
        result = (
            _sb()
            .table("query_events_hourly")
            .select("hour_bucket,query_count,escalation_count")
            .eq("team_id", team_id)
            .gte("hour_bucket", cutoff)
            .order("hour_bucket", desc=False)
            .execute()
        )
        # NOTE(review): PostgREST caps each response at the project's
        # max-rows setting (1000 by default on Supabase), which is about 41
        # days of hourly rows. Larger `days` values would be truncated.
        return result.data or []
    except Exception:
        logger.warning(
            "anomaly_db: get_hourly_counts failed team=%s", team_id, exc_info=True
        )
        return []
def get_all_team_ids() -> list[str]:
    """Return distinct team_ids with events in the last 90 days, sorted ascending.

    PostgREST truncates every response at the server's max-rows limit, so
    selecting all event rows and de-duplicating client-side silently drops
    teams once the window holds more events than that limit. Instead, this
    walks the distinct values with a keyset loop. Each request asks for the
    smallest team_id greater than the last one seen, so it costs one small
    request per team and is unaffected by the row cap. NULL team_ids are
    not returned.

    Returns ``[]`` on any failure, which is logged.
    """
    try:
        cutoff = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
        client = _sb()
        team_ids: list[str] = []
        last_seen: str | None = None
        while True:
            query = (
                client.table("query_events")
                .select("team_id")
                .gte("created_at", cutoff)
            )
            if last_seen is not None:
                query = query.gt("team_id", last_seen)
            rows = query.order("team_id").limit(1).execute().data or []
            # Ascending order puts NULLs last in Postgres, so a NULL here
            # means no further non-NULL team_ids remain.
            if not rows or rows[0]["team_id"] is None:
                break
            last_seen = rows[0]["team_id"]
            team_ids.append(last_seen)
        return team_ids
    except Exception:
        logger.warning("anomaly_db: get_all_team_ids failed", exc_info=True)
        return []
| # ── anomaly_signals ─────────────────────────────────────────────────────────── | |
def insert_signal(
    signal_type: str,
    severity: str,
    score: float,
    details: dict,
    team_id: str | None = None,
    entity_type: str | None = None,
    entity_id: str | None = None,
) -> dict | None:
    """Insert one anomaly signal with 2-hour dedup suppression.

    If an unresolved signal with the same (signal_type, team_id, entity_id)
    was detected within the last 2 hours, the insert is skipped and None is
    returned. A falsy team_id / entity_id matches only rows where that column
    IS NULL. Without this, a team-less signal would be suppressed by an
    unrelated team's signal of the same type.

    Returns the inserted row, or None when suppressed or on failure (logged).
    """
    try:
        client = _sb()
        two_hours_ago = (
            datetime.now(timezone.utc) - timedelta(hours=2)
        ).isoformat()
        dupe_q = (
            client.table("anomaly_signals")
            .select("id")
            .eq("signal_type", signal_type)
            .eq("resolved", False)
            .gte("detected_at", two_hours_ago)
            .limit(1)  # existence check only
        )
        # Falsy keys are omitted from the insert below (so presumably stored
        # as NULL), so match them as NULL here too.
        if team_id:
            dupe_q = dupe_q.eq("team_id", team_id)
        else:
            dupe_q = dupe_q.is_("team_id", "null")
        if entity_id:
            dupe_q = dupe_q.eq("entity_id", entity_id)
        else:
            dupe_q = dupe_q.is_("entity_id", "null")
        if dupe_q.execute().data:
            return None  # suppress duplicate
        # NOTE: check-then-insert is not atomic. Concurrent callers can both
        # pass the check; only a DB-side unique constraint would prevent that.
        row: dict = {
            "signal_type": signal_type,
            "severity": severity,
            "score": score,
            "details": details,
        }
        if team_id:
            row["team_id"] = team_id
        if entity_type:
            row["entity_type"] = entity_type
        if entity_id:
            row["entity_id"] = entity_id
        result = client.table("anomaly_signals").insert(row).execute()
        return result.data[0] if result.data else None
    except Exception:
        logger.warning("anomaly_db: insert_signal failed type=%s", signal_type, exc_info=True)
        return None
def get_signals(
    team_id: str | None,
    severity: str | None,
    signal_type: str | None,
    resolved: bool,
    limit: int,
) -> list[dict]:
    """List anomaly signals, newest ``detected_at`` first.

    Always filters on ``resolved``. ``team_id``, ``severity`` and
    ``signal_type`` are applied as equality filters only when truthy. At
    most ``limit`` rows are returned; ``[]`` on any failure (logged).
    """
    try:
        query = _sb().table("anomaly_signals").select("*")
        query = query.eq("resolved", resolved)
        query = query.order("detected_at", desc=True).limit(limit)
        optional_filters = (
            ("team_id", team_id),
            ("severity", severity),
            ("signal_type", signal_type),
        )
        for column, value in optional_filters:
            if value:
                query = query.eq(column, value)
        rows = query.execute().data
        return rows or []
    except Exception:
        logger.warning("anomaly_db: get_signals failed", exc_info=True)
        return []
def resolve_signal(signal_id: str, resolver_user_id: str) -> bool:
    """Mark the anomaly signal ``signal_id`` as resolved, if it is still unresolved.

    Sets ``resolved``, ``resolved_by`` and ``resolved_at``. Returns True when
    the update request completed without error. This includes the case where
    no row matched because the id does not exist or is already resolved.
    Returns False if the request raised; the error is logged.
    """
    try:
        # Aware UTC; datetime.utcnow() is deprecated since Python 3.12.
        resolved_at = datetime.now(timezone.utc).isoformat()
        _sb().table("anomaly_signals").update({
            "resolved": True,
            "resolved_by": resolver_user_id,
            "resolved_at": resolved_at,
        }).eq("id", signal_id).eq("resolved", False).execute()
        return True
    except Exception:
        logger.warning("anomaly_db: resolve_signal failed id=%s", signal_id, exc_info=True)
        return False
def get_signals_summary() -> dict:
    """Return unresolved signal counts grouped by type and severity.

    Result shape: ``{"total": int, "by_type": {type: n},
    "by_severity": {severity: n}}``.

    Rows are fetched page by page with ``.range()``, because a single select
    is truncated at the server's max-rows cap, which would silently
    undercount. Paging stops at the first empty page and advances by the
    rows actually received, so a server cap below the page size is still
    handled. Signals resolved while paging may shift offsets, so counts are
    a best-effort snapshot.

    Returns zero counts on any failure, which is logged.
    """
    page_size = 1000
    try:
        client = _sb()
        by_type: dict[str, int] = {}
        by_severity: dict[str, int] = {}
        total = 0
        offset = 0
        while True:
            batch = (
                client.table("anomaly_signals")
                .select("signal_type,severity")
                .eq("resolved", False)
                .order("id")  # stable order so pages don't overlap
                .range(offset, offset + page_size - 1)
                .execute()
                .data
            ) or []
            if not batch:
                break
            for r in batch:
                by_type[r["signal_type"]] = by_type.get(r["signal_type"], 0) + 1
                by_severity[r["severity"]] = by_severity.get(r["severity"], 0) + 1
            total += len(batch)
            offset += len(batch)
        return {"total": total, "by_type": by_type, "by_severity": by_severity}
    except Exception:
        logger.warning("anomaly_db: get_signals_summary failed", exc_info=True)
        return {"total": 0, "by_type": {}, "by_severity": {}}
def get_staleness_top(limit: int = 30) -> list[dict]:
    """Return the highest-scoring unresolved ``staleness`` signals.

    Each row carries ``entity_id``, ``score``, ``details`` and
    ``detected_at``. Rows are sorted by score descending and capped at
    *limit*. Returns ``[]`` on any failure (logged).
    """
    columns = "entity_id,score,details,detected_at"
    try:
        query = _sb().table("anomaly_signals").select(columns)
        query = query.eq("signal_type", "staleness").eq("resolved", False)
        query = query.order("score", desc=True).limit(limit)
        return query.execute().data or []
    except Exception:
        logger.warning("anomaly_db: get_staleness_top failed", exc_info=True)
        return []
def get_dependency_risk(limit: int = 50) -> list[dict]:
    """Return the highest-scoring unresolved ``dependency_risk`` signals.

    Each row carries ``entity_id``, ``score``, ``details`` and
    ``detected_at``. Rows are sorted by score descending and capped at
    *limit*. Returns ``[]`` on any failure (logged).
    """
    columns = "entity_id,score,details,detected_at"
    try:
        query = _sb().table("anomaly_signals").select(columns)
        query = query.eq("signal_type", "dependency_risk").eq("resolved", False)
        query = query.order("score", desc=True).limit(limit)
        return query.execute().data or []
    except Exception:
        logger.warning("anomaly_db: get_dependency_risk failed", exc_info=True)
        return []
def purge_old_events() -> int:
    """Delete query_events older than 90 days and return how many were removed.

    Requests ``count="exact"`` with ``returning="minimal"``, so PostgREST
    reports the deleted-row count instead of sending every deleted row back
    over the wire just to be counted. If no count comes back, falls back to
    the number of returned rows. Returns 0 on failure, which is logged.
    """
    try:
        # Aware UTC cutoff; datetime.utcnow() is deprecated since Python 3.12.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
        result = (
            _sb()
            .table("query_events")
            .delete(count="exact", returning="minimal")
            .lt("created_at", cutoff)
            .execute()
        )
        count = getattr(result, "count", None)
        deleted = count if count is not None else len(result.data or [])
        if deleted:
            logger.info("anomaly_db: purged %d old query_events", deleted)
        return deleted
    except Exception:
        logger.warning("anomaly_db: purge_old_events failed", exc_info=True)
        return 0