GodSpeed / supabase /anomaly_migration.sql
Ananth Shyam
Implement anomaly detection and forecasting features
451d52a
-- ============================================================
-- Anomaly Detection Migration
-- Safe to run multiple times (all IF NOT EXISTS / ON CONFLICT).
-- Run AFTER rbac_migration.sql.
-- ============================================================
-- ── query_events: raw event persistence (90-day rolling window) ──────────────
-- One row per raw query event. Constraint names below are spelled out but are
-- the same names PostgreSQL would generate for the inline column constraints.
CREATE TABLE IF NOT EXISTS query_events (
    id              uuid             NOT NULL DEFAULT gen_random_uuid(),
    event_id        text             NOT NULL,               -- same UUID as Redis event["id"]
    team_id         text             NOT NULL,
    session_id      text,
    success         boolean          NOT NULL DEFAULT true,
    duration_ms     integer,
    escalated       boolean          NOT NULL DEFAULT false,
    guardrail_score double precision,
    agent_metrics   jsonb            NOT NULL DEFAULT '{}',
    created_at      timestamptz      NOT NULL DEFAULT now(),
    CONSTRAINT query_events_pkey         PRIMARY KEY (id),
    CONSTRAINT query_events_event_id_key UNIQUE (event_id)
);

-- Per-team lookups ordered newest first.
CREATE INDEX IF NOT EXISTS qe_team_time_idx
    ON query_events (team_id, created_at DESC);

-- Time-ordered access across all teams.
CREATE INDEX IF NOT EXISTS qe_created_at_idx
    ON query_events (created_at DESC);

-- Per-team lookups filtered on the escalated flag, newest first.
CREATE INDEX IF NOT EXISTS qe_escalated_idx
    ON query_events (team_id, escalated, created_at DESC);
-- ── query_events_hourly: pre-aggregated hourly buckets ────────────────────────
-- One row per (team, hour). The composite primary key is what the
-- ON CONFLICT (team_id, hour_bucket) upsert in this file targets.
CREATE TABLE IF NOT EXISTS query_events_hourly (
    team_id          text        NOT NULL,
    hour_bucket      timestamptz NOT NULL,
    query_count      integer     NOT NULL DEFAULT 0,
    escalation_count integer     NOT NULL DEFAULT 0,
    avg_duration_ms  integer,
    CONSTRAINT query_events_hourly_pkey PRIMARY KEY (team_id, hour_bucket)
);

-- NOTE(review): the primary key already provides a btree on
-- (team_id, hour_bucket), which PostgreSQL can scan backwards, so this index
-- may be redundant for per-team "latest first" reads. Confirm with EXPLAIN
-- before dropping it.
CREATE INDEX IF NOT EXISTS qeh_team_hour_idx
    ON query_events_hourly (team_id, hour_bucket DESC);
-- ── anomaly_signals: detected anomaly records ─────────────────────────────────
-- One row per detected anomaly. Constraint names below are spelled out but are
-- the same names PostgreSQL would generate for the inline column constraints.
--
-- NOTE(review): the value lists documented for signal_type and severity are
-- comments only; no CHECK constraint enforces them.
CREATE TABLE IF NOT EXISTS anomaly_signals (
    id          uuid             NOT NULL DEFAULT gen_random_uuid(),
    team_id     text,            -- nullable here, unlike query_events.team_id
    signal_type text             NOT NULL,
        -- query_spike | query_drop | escalation_trend | staleness | dependency_risk
    entity_type text,
    entity_id   text,
    severity    text             NOT NULL DEFAULT 'medium',
        -- critical | high | medium | low
    score       double precision NOT NULL DEFAULT 0.0,
    details     jsonb            NOT NULL DEFAULT '{}',
    resolved    boolean          NOT NULL DEFAULT false,
    resolved_by uuid,
    resolved_at timestamptz,
    detected_at timestamptz      NOT NULL DEFAULT now(),
    CONSTRAINT anomaly_signals_pkey PRIMARY KEY (id),
    -- No ON DELETE clause, so the default (NO ACTION) applies: a users row
    -- that is still referenced from resolved_by cannot be deleted.
    CONSTRAINT anomaly_signals_resolved_by_fkey
        FOREIGN KEY (resolved_by) REFERENCES users (id)
);

-- Signals of one type for one team, newest first.
CREATE INDEX IF NOT EXISTS as_team_type_idx
    ON anomaly_signals (team_id, signal_type, detected_at DESC);

-- Signals by severity and resolution state, newest first.
CREATE INDEX IF NOT EXISTS as_severity_idx
    ON anomaly_signals (severity, resolved, detected_at DESC);

-- Signals by resolution state, then team, newest first.
CREATE INDEX IF NOT EXISTS as_resolved_idx
    ON anomaly_signals (resolved, team_id, detected_at DESC);

-- Lookup of signals attached to a specific entity.
CREATE INDEX IF NOT EXISTS as_entity_idx
    ON anomaly_signals (entity_type, entity_id);
-- RLS: all three tables are service-role only — no anon/authenticated policies
-- needed because all reads go through the FastAPI backend using the service key.
-- With row level security enabled and no policy defined, PostgreSQL denies
-- access by default to every role that is subject to RLS; table owners and
-- roles with BYPASSRLS are not affected. Re-running these statements on a
-- table that already has RLS enabled is harmless.
ALTER TABLE query_events
    ENABLE ROW LEVEL SECURITY;

ALTER TABLE query_events_hourly
    ENABLE ROW LEVEL SECURITY;

ALTER TABLE anomaly_signals
    ENABLE ROW LEVEL SECURITY;
-- ── PostgreSQL function: in-DB hourly aggregation ────────────────────────────
-- Called via Supabase RPC so the worker never ships raw rows just to count them.
-- aggregate_hourly_bucket(p_team_id, p_hour)
--
-- Recomputes the query_events_hourly row for one team and one UTC hour from
-- the raw rows in query_events, then upserts it.
--
-- Parameters:
--   p_team_id  team whose events are aggregated.
--   p_hour     any instant inside the target hour; it is snapped down to the
--              start of its UTC hour, so callers that already pass an
--              hour-aligned value get the same bucket key as before.
--
-- Returns: nothing. Side effect: exactly one row is inserted or overwritten
-- in query_events_hourly. Because an aggregate without GROUP BY always yields
-- one row, an hour with no events is stored as query_count = 0,
-- escalation_count = 0, avg_duration_ms = NULL.
CREATE OR REPLACE FUNCTION aggregate_hourly_bucket(p_team_id text, p_hour timestamptz)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    -- Truncate in UTC explicitly and convert back to timestamptz. Doing the
    -- round trip through AT TIME ZONE 'UTC' keeps the result independent of
    -- the session TimeZone setting (date_trunc on a bare timestamptz, or a
    -- timestamp-vs-timestamptz comparison, would silently use it).
    v_bucket_start timestamptz :=
        date_trunc('hour', p_hour AT TIME ZONE 'UTC') AT TIME ZONE 'UTC';
BEGIN
    INSERT INTO query_events_hourly
        (team_id, hour_bucket, query_count, escalation_count, avg_duration_ms)
    SELECT
        p_team_id,
        v_bucket_start,
        count(*)::integer,
        (count(*) FILTER (WHERE escalated = true))::integer,
        avg(duration_ms)::integer          -- numeric -> integer cast rounds
    FROM query_events
    WHERE team_id = p_team_id
      -- Half-open range on the bare column: selects exactly one hour and
      -- lets the planner use an index on (team_id, created_at).
      AND created_at >= v_bucket_start
      AND created_at <  v_bucket_start + interval '1 hour'
    ON CONFLICT (team_id, hour_bucket) DO UPDATE
        SET query_count      = EXCLUDED.query_count,
            escalation_count = EXCLUDED.escalation_count,
            avg_duration_ms  = EXCLUDED.avg_duration_ms;
END;
$$;