Spaces:
Sleeping
Sleeping
File size: 4,311 Bytes
-- ============================================================
-- Anomaly Detection Migration
-- Safe to run multiple times (all IF NOT EXISTS / ON CONFLICT).
-- Run AFTER rbac_migration.sql.
-- ============================================================
-- ── query_events: raw event persistence (90-day rolling window) ──────────────
-- One row per raw query event. Creation is guarded by IF NOT EXISTS so the
-- statement is a no-op when the table is already present.
-- NOTE(review): nothing in this block enforces a retention window; any pruning
-- of old rows must happen elsewhere -- confirm.
CREATE TABLE IF NOT EXISTS query_events (
    -- Surrogate key, generated by the database.
    id              uuid              PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Same UUID as Redis event["id"]; UNIQUE rejects a second row for one event.
    event_id        text              NOT NULL UNIQUE,
    team_id         text              NOT NULL,
    session_id      text,
    success         boolean           NOT NULL DEFAULT TRUE,
    -- Duration in milliseconds (per the column name); NULL when not recorded.
    duration_ms     integer,
    escalated       boolean           NOT NULL DEFAULT FALSE,
    guardrail_score double precision,
    -- Free-form JSON document; defaults to an empty object.
    agent_metrics   jsonb             NOT NULL DEFAULT '{}',
    created_at      timestamptz       NOT NULL DEFAULT now()
);

-- Per-team access, newest first.
CREATE INDEX IF NOT EXISTS qe_team_time_idx
    ON query_events (team_id, created_at DESC);

-- Time-ordered access across all teams.
CREATE INDEX IF NOT EXISTS qe_created_at_idx
    ON query_events (created_at DESC);

-- Per-team access narrowed by the escalated flag, newest first.
CREATE INDEX IF NOT EXISTS qe_escalated_idx
    ON query_events (team_id, escalated, created_at DESC);
-- ── query_events_hourly: pre-aggregated hourly buckets ────────────────────────
-- One row per (team, hour) holding pre-computed counters. The composite
-- primary key is the conflict target used by aggregate_hourly_bucket's upsert.
CREATE TABLE IF NOT EXISTS query_events_hourly (
    team_id          text        NOT NULL,
    -- Instant identifying the bucket.
    hour_bucket      timestamptz NOT NULL,
    query_count      integer     NOT NULL DEFAULT 0,
    escalation_count integer     NOT NULL DEFAULT 0,
    -- NULL when no average is available for the bucket.
    avg_duration_ms  integer,
    PRIMARY KEY (team_id, hour_bucket)
);

-- Per-team access, newest bucket first.
-- NOTE(review): the primary key already provides a btree on
-- (team_id, hour_bucket); this index may overlap with it -- confirm against
-- real query plans before relying on or removing it.
CREATE INDEX IF NOT EXISTS qeh_team_hour_idx
    ON query_events_hourly (team_id, hour_bucket DESC);
-- ── anomaly_signals: detected anomaly records ─────────────────────────────────
-- One row per detected anomaly. Requires the users table to exist already,
-- because resolved_by references users(id).
CREATE TABLE IF NOT EXISTS anomaly_signals (
    id          uuid             PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Nullable. NOTE(review): presumably NULL marks a signal that is not tied
    -- to a single team -- confirm with the detector code.
    team_id     text,
    -- One of: query_spike | query_drop | escalation_trend | staleness | dependency_risk
    -- (documented values only; no constraint enforces them).
    signal_type text             NOT NULL,
    -- Optional pointer to the thing the signal is about.
    entity_type text,
    entity_id   text,
    -- One of: critical | high | medium | low
    -- (documented values only; no constraint enforces them).
    severity    text             NOT NULL DEFAULT 'medium',
    score       double precision NOT NULL DEFAULT 0.0,
    -- Free-form JSON document; defaults to an empty object.
    details     jsonb            NOT NULL DEFAULT '{}',
    resolved    boolean          NOT NULL DEFAULT FALSE,
    -- User who resolved the signal; NULL while unresolved.
    resolved_by uuid             REFERENCES users (id),
    resolved_at timestamptz,
    detected_at timestamptz      NOT NULL DEFAULT now()
);

-- Per-team, per-signal-type access, newest first.
CREATE INDEX IF NOT EXISTS as_team_type_idx
    ON anomaly_signals (team_id, signal_type, detected_at DESC);

-- Access by severity and resolution state, newest first.
CREATE INDEX IF NOT EXISTS as_severity_idx
    ON anomaly_signals (severity, resolved, detected_at DESC);

-- Access by resolution state, then team, newest first.
CREATE INDEX IF NOT EXISTS as_resolved_idx
    ON anomaly_signals (resolved, team_id, detected_at DESC);

-- Lookup of every signal attached to a given entity.
CREATE INDEX IF NOT EXISTS as_entity_idx
    ON anomaly_signals (entity_type, entity_id);
-- RLS: all three tables are service-role only — no anon/authenticated policies
-- are needed because all reads go through the FastAPI backend using the service key.
-- Turn on row level security for each table created by this migration.
-- No policies are defined in this file.

-- Raw events.
ALTER TABLE query_events
    ENABLE ROW LEVEL SECURITY;

-- Hourly aggregates.
ALTER TABLE query_events_hourly
    ENABLE ROW LEVEL SECURITY;

-- Detected anomalies.
ALTER TABLE anomaly_signals
    ENABLE ROW LEVEL SECURITY;
-- ── PostgreSQL function: in-DB hourly aggregation ────────────────────────────
-- Called via Supabase RPC so the worker never ships raw rows just to count them.
-- aggregate_hourly_bucket(p_team_id, p_hour)
--
-- Recomputes one team's counters for one hour from query_events and upserts
-- them into query_events_hourly.
--
-- Parameters:
--   p_team_id  team whose events are aggregated.
--   p_hour     instant identifying the hour; it is normalised to the start of
--              its UTC hour, so any instant inside the hour selects the same
--              bucket.
--
-- Returns: void. Exactly one row is written per call, even when the hour has
-- no events (query_count = 0, escalation_count = 0, avg_duration_ms = NULL).
-- Calling it again for the same team and hour overwrites the counters.
CREATE OR REPLACE FUNCTION aggregate_hourly_bucket(p_team_id text, p_hour timestamptz)
RETURNS void LANGUAGE plpgsql AS $$
DECLARE
    -- Start of the UTC hour containing p_hour, kept as timestamptz. Converting
    -- to UTC wall-clock time before truncating and back afterwards makes the
    -- result independent of the session TimeZone setting.
    v_bucket_start timestamptz :=
        date_trunc('hour', p_hour AT TIME ZONE 'UTC') AT TIME ZONE 'UTC';
BEGIN
    INSERT INTO query_events_hourly (
        team_id,
        hour_bucket,
        query_count,
        escalation_count,
        avg_duration_ms
    )
    SELECT
        p_team_id,
        v_bucket_start,
        count(*)::integer,
        (count(*) FILTER (WHERE qe.escalated))::integer,
        -- avg() ignores NULL durations; the cast rounds to whole milliseconds.
        avg(qe.duration_ms)::integer
    FROM query_events AS qe
    WHERE qe.team_id = p_team_id
      -- Half-open range on the bare column: both sides are timestamptz, so no
      -- session-dependent cast takes place, and the (team_id, created_at)
      -- index stays usable.
      AND qe.created_at >= v_bucket_start
      AND qe.created_at <  v_bucket_start + interval '1 hour'
    ON CONFLICT (team_id, hour_bucket) DO UPDATE
        SET query_count      = EXCLUDED.query_count,
            escalation_count = EXCLUDED.escalation_count,
            avg_duration_ms  = EXCLUDED.avg_duration_ms;
END;
$$;
|