Ananth Shyam committed on
Commit
451d52a
·
1 Parent(s): b391fb0

Implement anomaly detection and forecasting features

- Added new documentation for Area 4: Anomaly Detection & Forecasting, detailing its purpose, implementation, and design principles.
- Created the anomaly detection module with the following components:
- `src/anomaly/__init__.py`: Initialization file for the anomaly module.
- `src/anomaly/db.py`: Supabase persistence layer for anomaly signals and query events.
- `src/anomaly/notifier.py`: In-process WebSocket broadcast bridge for real-time notifications of critical/high anomaly signals.
- `src/anomaly/router.py`: FastAPI router for the anomaly detection API endpoints.
- `src/anomaly/tasks.py`: Celery tasks for running anomaly detection algorithms, including Z-score detection, staleness scoring, and dependency risk modeling.
- Developed SQL migration script (`supabase/anomaly_migration.sql`) to create necessary tables for storing query events and anomaly signals.

.claude/settings.json CHANGED
@@ -4,7 +4,27 @@
4
  "Bash(Get-ChildItem -Path \"d:\\\\Godspeed\" -Recurse -Include \"*.py\")",
5
  "Bash(Select-Object -First 20)",
6
  "Bash(dir /s /b d:\\\\Godspeed)",
7
- "Bash(xargs wc -l)"
8
  ]
9
  }
10
  }
 
4
  "Bash(Get-ChildItem -Path \"d:\\\\Godspeed\" -Recurse -Include \"*.py\")",
5
  "Bash(Select-Object -First 20)",
6
  "Bash(dir /s /b d:\\\\Godspeed)",
7
+ "Bash(xargs wc -l)",
8
+ "Bash(Get-ChildItem -Path \"d:\\\\Godspeed\" -Directory)",
9
+ "Bash(Select-Object Name)",
10
+ "Bash(xargs grep -l \"anomaly\\\\|forecast\\\\|spike\\\\|trend\")",
11
+ "Bash(xargs grep -l \"validation\\\\|critic\\\\|generator\")",
12
+ "Bash(grep -E \"\\\\.py$\")",
13
+ "Bash(xargs grep -l \"BM25\\\\|qdrant\\\\|dense\\\\|sparse\\\\|retrieval\")",
14
+ "Bash(Get-ChildItem -Path \"d:\\\\Godspeed\" -Force)",
15
+ "Bash(Select-Object Name, FullName)",
16
+ "Bash(Get-ChildItem -Path \"d:\\\\Godspeed\" -Recurse -Directory)",
17
+ "Bash(Select-Object -ExpandProperty FullName)",
18
+ "Bash(xargs grep -l \"FastAPI\\\\|APIRouter\")",
19
+ "Bash(xargs grep -l \"redis\\\\|Redis\")",
20
+ "Bash(xargs grep -l \"apiFetch\\\\|http\")",
21
+ "PowerShell(Get-ChildItem -Path \"d:\\\\Godspeed\" -Force | Select-Object Name, @{Name=\"Type\"; Expression={if\\($_.PSIsContainer\\) {\"Dir\"} else {\"File\"}}} | Format-Table -AutoSize)",
22
+ "PowerShell(Get-ChildItem -Path \"d:\\\\Godspeed\\\\agent\" -Recurse | Select-Object FullName | ForEach-Object { $_.FullName.Replace\\(\"d:\\\\Godspeed\\\\\", \"\"\\) })",
23
+ "PowerShell(Get-ChildItem -Path \"d:\\\\Godspeed\\\\src\" -Recurse | Select-Object FullName | ForEach-Object { $_.FullName.Replace\\(\"d:\\\\Godspeed\\\\\", \"\"\\) })",
24
+ "PowerShell(Get-ChildItem -Path \"d:\\\\Godspeed\\\\frontend\" -Recurse -Depth 2 | Where-Object { -not $_.PSIsContainer } | Select-Object FullName | ForEach-Object { $_.FullName.Replace\\(\"d:\\\\Godspeed\\\\\", \"\"\\) } | Sort-Object | Select-Object -First 40)",
25
+ "PowerShell(Get-ChildItem -Path \"d:\\\\Godspeed\\\\ingestion\" -Recurse | Where-Object { -not $_.PSIsContainer } | Select-Object FullName | ForEach-Object { $_.FullName.Replace\\(\"d:\\\\Godspeed\\\\\", \"\"\\) })",
26
+ "PowerShell(Get-ChildItem -Path \"d:\\\\Godspeed\\\\graph_store\" -Recurse | Where-Object { -not $_.PSIsContainer } | Select-Object FullName | ForEach-Object { $_.FullName.Replace\\(\"d:\\\\Godspeed\\\\\", \"\"\\) })",
27
+ "Bash(git checkout *)"
28
  ]
29
  }
30
  }
Docs/03_analytics_and_intelligence.md CHANGED
@@ -512,9 +512,10 @@ ANALYTICS_PERMISSIONS = {
512
 
513
  ---
514
 
515
- ## 9. Area 4 - Anomaly Detection & Forecasting (Planned)
516
 
517
- > **Status: Planned Extension.** All input streams already exist in Areas 1–3. No new data pipelines needed. Requires: time-series aggregation layer (InfluxDB or PostgreSQL time-series extension).
 
518
 
519
  ### Component 1 β€” Query Spike Detector
520
 
 
512
 
513
  ---
514
 
515
+ ## 9. Area 4 - Anomaly Detection & Forecasting
516
 
517
+ > **Status: Implemented** on branch `anomaly-and-forecasting`. See [`anomaly-and-forecasting/`](./anomaly-and-forecasting/README.md) for full implementation docs.
518
+ > The specs below describe the original design intent. The actual implementation uses plain PostgreSQL (no InfluxDB) and stdlib-only algorithms (no external ML dependencies).
519
 
520
  ### Component 1 β€” Query Spike Detector
521
 
Docs/README.md CHANGED
@@ -13,17 +13,18 @@
13
  | [`03_analytics_and_intelligence.md`](./03_analytics_and_intelligence.md) | Query classification, interaction log schema, retrieval feedback loop, NL analytics, health dashboard, proactive agent, silo detector, Areas 4 & 5 planned specs | Building analytics, dashboards, or Area 3 agents |
14
  | [`04_integrations_and_tech_stack.md`](./04_integrations_and_tech_stack.md) | Notion, Confluence, GitHub integration specs with full code; RBAC enforcement; change detection; tech stack; local dev setup; env vars | Building any integration or setting up development environment |
15
  | [`05_market_strategy_and_gtm.md`](./05_market_strategy_and_gtm.md) | Target customer, tool strategy, competitor analysis, USPs, country-by-country market analysis, GTM sequencing | Product, positioning, or expansion decisions |
 
16
 
17
  ---
18
 
19
  ## The Five Focus Areas at a Glance
20
 
21
  ```
22
- Area 1 - Hybrid RAG System [Core - In Scope]
23
- Area 2 - Data Pipelines & Validation [Core - In Scope]
24
- Area 3 - Analytics & NL Intelligence [Core - In Scope]
25
- Area 4 - Anomaly & Forecasting [Planned Extension]
26
- Area 5 - Knowledge Graph [Planned Extension]
27
  ```
28
 
29
  All five areas are one system, not five products. See `01_problem_and_architecture.md` for the interaction map.
 
13
  | [`03_analytics_and_intelligence.md`](./03_analytics_and_intelligence.md) | Query classification, interaction log schema, retrieval feedback loop, NL analytics, health dashboard, proactive agent, silo detector, Areas 4 & 5 planned specs | Building analytics, dashboards, or Area 3 agents |
14
  | [`04_integrations_and_tech_stack.md`](./04_integrations_and_tech_stack.md) | Notion, Confluence, GitHub integration specs with full code; RBAC enforcement; change detection; tech stack; local dev setup; env vars | Building any integration or setting up development environment |
15
  | [`05_market_strategy_and_gtm.md`](./05_market_strategy_and_gtm.md) | Target customer, tool strategy, competitor analysis, USPs, country-by-country market analysis, GTM sequencing | Product, positioning, or expansion decisions |
16
+ | [`anomaly-and-forecasting/`](./anomaly-and-forecasting/README.md) | **Area 4 implementation docs**: data layer, detection algorithms, API reference, Celery scheduling | Building or extending anomaly detection, forecasting, or the Anomalies frontend tab |
17
 
18
  ---
19
 
20
  ## The Five Focus Areas at a Glance
21
 
22
  ```
23
+ Area 1 - Hybrid RAG System [Core - Implemented]
24
+ Area 2 - Data Pipelines & Validation [Core - Implemented]
25
+ Area 3 - Analytics & NL Intelligence [Core - Implemented]
26
+ Area 4 - Anomaly & Forecasting [Implemented - branch: anomaly-and-forecasting]
27
+ Area 5 - Knowledge Graph [Implemented]
28
  ```
29
 
30
  All five areas are one system, not five products. See `01_problem_and_architecture.md` for the interaction map.
Docs/anomaly-and-forecasting/01_data_layer.md ADDED
@@ -0,0 +1,257 @@
1
+ # 01 · Data Layer: Time-Series Tables & Migration
2
+
3
+ ---
4
+
5
+ ## Why a New Data Layer?
6
+
7
+ Redis (`gs:queries`) caps history at 1,000 events. That is enough for a live dashboard but useless for anomaly detection, which requires:
8
+
9
+ - Hourly counts over a 14-day sliding window (≈ 336 rows per team)
10
+ - Per-day escalation rates over a 14-day comparison window
11
+ - Document age relative to query volume over 30 days
12
+ - Persistent anomaly signal records with resolution workflow
13
+
14
+ Three PostgreSQL tables are added in Supabase. No TimescaleDB extension is required: plain indexed `TIMESTAMPTZ` columns with a pre-aggregation function serve the access patterns needed.
15
+
16
+ ---
17
+
18
+ ## Migration File
19
+
20
+ **Path:** `supabase/anomaly_migration.sql`
21
+ **Run after:** `rbac_migration.sql`
22
+ **Idempotent:** Yes. All statements use `IF NOT EXISTS` / `ON CONFLICT DO UPDATE`
23
+
24
+ Apply via the Supabase dashboard SQL editor or:
25
+
26
+ ```bash
27
+ supabase db push
28
+ # or
29
+ psql $DATABASE_URL -f supabase/anomaly_migration.sql
30
+ ```
31
+
32
+ ---
33
+
34
+ ## Table 1: `query_events`
35
+
36
+ Persistent copy of every query event. Redis keeps the last 1,000; this table keeps the last 90 days (purged nightly by the staleness task).
37
+
38
+ ```sql
39
+ CREATE TABLE IF NOT EXISTS query_events (
40
+ id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
41
+ event_id text NOT NULL UNIQUE, -- same UUID as Redis event["id"]
42
+ team_id text NOT NULL,
43
+ session_id text,
44
+ success boolean NOT NULL DEFAULT true,
45
+ duration_ms integer,
46
+ escalated boolean NOT NULL DEFAULT false,
47
+ guardrail_score float,
48
+ agent_metrics jsonb NOT NULL DEFAULT '{}',
49
+ created_at timestamptz NOT NULL DEFAULT now()
50
+ );
51
+ ```
52
+
53
+ | Column | Source | Notes |
54
+ |---|---|---|
55
+ | `event_id` | `agent/api.py` event `"id"` field | Used for idempotent upsert, so Redis replays are safe |
56
+ | `team_id` | Server-enforced from session | Never trusted from client |
57
+ | `escalated` | `guardrail_score < 0.5` flag | Primary input for escalation trend detection |
58
+ | `agent_metrics` | `event["agents"]` dict | `{"doc_search": {"chunk_count": 5, "confidence": "high"}}` |
59
+
60
+ **Indexes:**
61
+ - `(team_id, created_at DESC)`: primary scan for per-team time ranges
62
+ - `(created_at DESC)`: global purge query
63
+ - `(team_id, escalated, created_at DESC)`: escalation rate aggregation
64
+
65
+ **Write path:** `agent/api.py` → `asyncio.ensure_future(async_upsert_query_event(event))`. Fire-and-forget; it never blocks the SSE stream.
66
+
67
+ ---
68
+
69
+ ## Table 2: `query_events_hourly`
70
+
71
+ Pre-aggregated hourly buckets. The Z-score detection task reads this table, not `query_events` directly, to avoid shipping thousands of raw rows to the Celery worker.
72
+
73
+ ```sql
74
+ CREATE TABLE IF NOT EXISTS query_events_hourly (
75
+ team_id text NOT NULL,
76
+ hour_bucket timestamptz NOT NULL, -- truncated to the hour, UTC
77
+ query_count integer NOT NULL DEFAULT 0,
78
+ escalation_count integer NOT NULL DEFAULT 0,
79
+ avg_duration_ms integer,
80
+ PRIMARY KEY (team_id, hour_bucket)
81
+ );
82
+ ```
83
+
84
+ **Index:** `(team_id, hour_bucket DESC)`: range scan for last N days.
85
+
86
+ **Write path:** The `aggregate_hourly_bucket` PostgreSQL function (see below) is called via Supabase RPC. The function does the aggregation inside the database, so no rows are shipped to Python.
87
+
88
+ ---
89
+
90
+ ## Table 3: `anomaly_signals`
91
+
92
+ Every detected anomaly is a row here. The frontend Anomalies tab reads from this table via the API.
93
+
94
+ ```sql
95
+ CREATE TABLE IF NOT EXISTS anomaly_signals (
96
+ id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
97
+ team_id text,
98
+ signal_type text NOT NULL,
99
+ entity_type text,
100
+ entity_id text,
101
+ severity text NOT NULL DEFAULT 'medium',
102
+ score float NOT NULL DEFAULT 0.0,
103
+ details jsonb NOT NULL DEFAULT '{}',
104
+ resolved boolean NOT NULL DEFAULT false,
105
+ resolved_by uuid REFERENCES users(id),
106
+ resolved_at timestamptz,
107
+ detected_at timestamptz NOT NULL DEFAULT now()
108
+ );
109
+ ```
110
+
111
+ ### `signal_type` Values
112
+
113
+ | Value | Produced By | `entity_type` | `score` Meaning |
114
+ |---|---|---|---|
115
+ | `query_spike` | Z-score task | `Team` | Z-score value (e.g. 4.2) |
116
+ | `query_drop` | Z-score task | `Team` | Z-score value (negative, e.g. −2.8) |
117
+ | `escalation_trend` | Z-score task | `Team` | Rate ratio (e.g. 1.8 = 80% worse) |
118
+ | `staleness` | Staleness task | `Document` | Staleness risk 0.0–1.0 |
119
+ | `dependency_risk` | Dep risk task | `Library` | Composite risk score 0.0–1.0 |
120
+
121
+ ### `severity` Values
122
+
123
+ | Value | Trigger |
124
+ |---|---|
125
+ | `critical` | query_spike: \|Z\| ≥ 5.0 · staleness: ≥ 0.8 · dep_risk: ≥ 0.7 |
126
+ | `high` | query_spike: \|Z\| ≥ 4.0 · staleness: ≥ 0.6 · dep_risk: ≥ 0.5 |
127
+ | `medium` | query_spike: \|Z\| ≥ 3.5 · staleness: ≥ 0.3 · dep_risk: ≥ 0.3 |
128
+ | `low` | Everything else above detection threshold |
129
+
130
+ ### `details` JSONB Shapes
131
+
132
+ **query_spike / query_drop:**
133
+ ```json
134
+ {
135
+ "z_score": 4.2,
136
+ "current_count": 87,
137
+ "baseline_mean": 23.4,
138
+ "baseline_stdev": 15.1,
139
+ "hour_bucket": "2026-05-17T14:00:00",
140
+ "window_hours": 335
141
+ }
142
+ ```
143
+
144
+ **escalation_trend:**
145
+ ```json
146
+ {
147
+ "ratio": 1.92,
148
+ "current_rate": 0.192,
149
+ "prior_rate": 0.1,
150
+ "current_total_queries": 52,
151
+ "prior_total_queries": 60
152
+ }
153
+ ```
154
+
155
+ **staleness:**
156
+ ```json
157
+ {
158
+ "title": "Kubernetes Ingress Setup Guide",
159
+ "age_days": 240,
160
+ "age_factor": 0.9305,
161
+ "query_pressure": 0.72,
162
+ "updated_at": "2025-09-20T10:00:00"
163
+ }
164
+ ```
165
+
166
+ **dependency_risk:**
167
+ ```json
168
+ {
169
+ "library_name": "fastapi",
170
+ "current_version": "0.95.0",
171
+ "latest_version": "0.115.0",
172
+ "version_lag": 1.0,
173
+ "downstream_count": 8,
174
+ "downstream_normalized": 0.62,
175
+ "incident_count": 2,
176
+ "incident_rate": 0.0055,
177
+ "poisson_30d": 0.152
178
+ }
179
+ ```
180
+
181
+ **Indexes:**
182
+ - `(team_id, signal_type, detected_at DESC)`: filtered list queries
183
+ - `(severity, resolved, detected_at DESC)`: severity-sorted alert feed
184
+ - `(resolved, team_id, detected_at DESC)`: active signals per team
185
+ - `(entity_type, entity_id)`: entity-specific lookups
186
+
187
+ **Deduplication:** `db.insert_signal()` checks for an identical unresolved signal for the same `(signal_type, team_id, entity_id)` in the last 2 hours before inserting, preventing the 15-minute task from creating redundant rows.
188
+
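The deduplication rule can be illustrated over an in-memory list of rows. This is a sketch under assumptions, not the body of `db.insert_signal()`: the `is_duplicate` name and the row dictionaries are hypothetical, and the real check presumably runs as a query against `anomaly_signals`.

```python
from datetime import datetime, timedelta, timezone

DEDUP_WINDOW = timedelta(hours=2)  # window stated in the text above


def is_duplicate(existing: list, candidate: dict, now: datetime) -> bool:
    """True if an unresolved signal with the same key was detected within the window."""
    cutoff = now - DEDUP_WINDOW
    key = (candidate["signal_type"], candidate["team_id"], candidate["entity_id"])
    for row in existing:
        row_key = (row["signal_type"], row["team_id"], row["entity_id"])
        if row_key == key and not row["resolved"] and row["detected_at"] >= cutoff:
            return True
    return False


now = datetime(2026, 5, 17, 14, 15, tzinfo=timezone.utc)
candidate = {"signal_type": "query_spike", "team_id": "team_infra", "entity_id": "team_infra"}
recent = dict(candidate, resolved=False, detected_at=now - timedelta(minutes=45))
stale = dict(candidate, resolved=False, detected_at=now - timedelta(hours=3))
closed = dict(candidate, resolved=True, detected_at=now - timedelta(minutes=45))
```

With a 15-minute schedule, a two-hour window suppresses up to seven repeat detections of the same ongoing condition.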
189
+ ---
190
+
191
+ ## PostgreSQL Aggregation Function
192
+
193
+ `aggregate_hourly_bucket` runs inside the database, keeping the aggregation logic server-side.
194
+
195
+ ```sql
196
+ CREATE OR REPLACE FUNCTION aggregate_hourly_bucket(p_team_id text, p_hour timestamptz)
197
+ RETURNS void LANGUAGE plpgsql AS $$
198
+ BEGIN
199
+ INSERT INTO query_events_hourly
200
+ (team_id, hour_bucket, query_count, escalation_count, avg_duration_ms)
201
+ SELECT p_team_id, p_hour,
202
+ count(*)::integer,
203
+ count(*) FILTER (WHERE escalated = true)::integer,
204
+ avg(duration_ms)::integer
205
+ FROM query_events
206
+ WHERE team_id = p_team_id
207
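+ AND created_at >= p_hour AND created_at < p_hour + interval '1 hour' -- range form: can use the (team_id, created_at) index and does not depend on the session time zone; assumes p_hour is hour-aligned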
208
+ ON CONFLICT (team_id, hour_bucket) DO UPDATE
209
+ SET query_count = EXCLUDED.query_count,
210
+ escalation_count = EXCLUDED.escalation_count,
211
+ avg_duration_ms = EXCLUDED.avg_duration_ms;
212
+ END;
213
+ $$;
214
+ ```
215
+
216
+ **Called via Supabase RPC:**
217
+ ```python
218
+ _sb().rpc("aggregate_hourly_bucket", {
219
+ "p_team_id": team_id,
220
+ "p_hour": hour_bucket,
221
+ }).execute()
222
+ ```
223
+
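The `p_hour` argument must already be truncated to the hour in UTC to match the `hour_bucket` column. A minimal sketch of that truncation follows; the helper name `to_hour_bucket` is hypothetical and not part of `src/anomaly/db.py`.

```python
from datetime import datetime, timedelta, timezone


def to_hour_bucket(ts: datetime) -> str:
    """Truncate a timezone-aware timestamp to the start of its UTC hour (ISO-8601)."""
    if ts.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    utc = ts.astimezone(timezone.utc)
    return utc.replace(minute=0, second=0, microsecond=0).isoformat()


# 20:07 at UTC+05:30 is 14:37 UTC, which falls in the 14:00 UTC bucket.
ist = timezone(timedelta(hours=5, minutes=30))
bucket = to_hour_bucket(datetime(2026, 5, 17, 20, 7, tzinfo=ist))
```

Rejecting naive datetimes avoids silently bucketing local time as if it were UTC.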
224
+ ---
225
+
226
+ ## Data Persistence Layer: `src/anomaly/db.py`
227
+
228
+ All Supabase reads and writes go through this module. Every function is wrapped in `try/except`, so callers never crash if Supabase is unavailable.
229
+
230
+ ### Key Functions
231
+
232
+ | Function | Direction | Called By |
233
+ |---|---|---|
234
+ | `upsert_query_event(event)` | Write | `agent/api.py` (sync, via executor) |
235
+ | `async_upsert_query_event(event)` | Write | `agent/api.py` (async shim) |
236
+ | `aggregate_hourly(team_id, hour)` | Write | Future: aggregation task |
237
+ | `get_hourly_counts(team_id, days)` | Read | `tasks.run_zscore_anomaly_detection` |
238
+ | `get_all_team_ids()` | Read | `tasks.run_zscore_anomaly_detection` |
239
+ | `insert_signal(...)` | Write | All three detection algorithms |
240
+ | `get_signals(team_id, ...)` | Read | `router.list_signals` |
241
+ | `resolve_signal(id, user_id)` | Write | `router.resolve_signal` |
242
+ | `get_signals_summary()` | Read | `router.signals_summary` |
243
+ | `get_staleness_top(limit)` | Read | `router.staleness_list` |
244
+ | `get_dependency_risk(limit)` | Read | `router.dependency_risk` |
245
+ | `purge_old_events()` | Write | `tasks.run_staleness_scoring` (nightly) |
246
+
247
+ ---
248
+
249
+ ## 90-Day Event Purge
250
+
251
+ `purge_old_events()` runs at the end of every `compute_staleness_scores` Celery task (daily 03:00 UTC):
252
+
253
+ ```python
254
+ _sb().table("query_events").delete().lt("created_at", cutoff).execute()
255
+ ```
256
+
257
+ This keeps the `query_events` table bounded. At 1 query/minute per team, 90 days is about 129,600 rows per team, which is manageable for PostgreSQL without partitioning.
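The `cutoff` value and the row estimate can be reproduced with a few lines of stdlib Python. The helper names below are hypothetical; the source only shows the final `delete()` call.

```python
from datetime import datetime, timedelta, timezone

RETENTION_DAYS = 90  # retention period stated in the text


def purge_cutoff(now: datetime) -> str:
    """ISO-8601 UTC timestamp; rows with created_at before it are purged."""
    return (now.astimezone(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat()


def rows_per_team(queries_per_minute: float, days: int = RETENTION_DAYS) -> int:
    """Upper bound on retained rows for one team at a steady query rate."""
    return round(queries_per_minute * 60 * 24 * days)
```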
Docs/anomaly-and-forecasting/02_detection_algorithms.md ADDED
@@ -0,0 +1,328 @@
1
+ # 02 · Detection Algorithms
2
+
3
+ All four algorithms live in `src/anomaly/tasks.py`. No external ML libraries are used, only Python stdlib (`statistics`, `math`).
4
+
5
+ ---
6
+
7
+ ## Algorithm 1: Z-Score Query Spike / Drop Detection
8
+
9
+ **Celery task:** `poll_metrics_anomalies`, every 15 minutes (queue: `polling`)
10
+ **Function:** `run_zscore_anomaly_detection()`
11
+ **Signal types produced:** `query_spike`, `query_drop`
12
+
13
+ ### How It Works
14
+
15
+ For every team with at least 24 hours of history in `query_events_hourly`:
16
+
17
+ 1. Fetch hourly query counts for the last 14 days (`get_hourly_counts(team_id, days=14)`)
18
+ 2. Exclude the current (partial) hour from the baseline, because it is still accumulating
19
+ 3. Compute baseline mean (μ) and standard deviation (σ) using `statistics.mean` / `statistics.stdev`
20
+ 4. Compute Z-score for the most recent complete hour:
21
+
22
+ ```
23
+ Z = (current_count − μ) / σ
24
+ ```
25
+
26
+ 5. Flag as **spike** if `Z > 3.0`, **drop** if `Z < −2.0`
27
+
28
+ ### Why Z > 3.0?
29
+
30
+ A Z-score above 3.0 means the observation is more than 3 standard deviations above the mean. Under a normal distribution, a value that far above the mean occurs by chance about 0.13% of the time (about 0.27% counting both tails), so a flagged hour usually reflects a real event: an incident, a deployment failure, or a sudden surge of new users.
31
+
32
+ The drop threshold is softer (`−2.0`) because a quiet hour is less urgent than a spike but still worth flagging: it may indicate the integration broke silently.
33
+
34
+ ### Severity Mapping
35
+
36
+ | Z-score magnitude | Severity |
37
+ |---|---|
38
+ | \|Z\| ≥ 5.0 | `critical` |
39
+ | \|Z\| ≥ 4.0 | `high` |
40
+ | \|Z\| ≥ 3.5 | `medium` |
41
+ | \|Z\| ≥ 3.0 | `low` |
42
+
43
+ ### Edge Cases Handled
44
+
45
+ | Condition | Handling |
46
+ |---|---|
47
+ | Fewer than 24 hourly rows | Skip team: insufficient baseline |
48
+ | `stdev == 0` (constant rate) | Skip team: Z-score undefined |
49
+ | `baseline < 2 rows` | Skip team: `statistics.stdev` requires ≥ 2 values |
50
+ | Duplicate signal within 2 hours | `insert_signal()` dedup suppresses it |
51
+
52
+ ### Example
53
+
54
+ A team normally sends ~25 queries/hour (μ = 25, σ = 8). At 14:00, they send 72 queries:
55
+
56
+ ```
57
+ Z = (72 − 25) / 8 = 5.875 → severity: critical
58
+ ```
59
+
60
+ The signal is inserted with `score = 5.875` and `details.hour_bucket = "2026-05-17T14:00:00"`.
61
+
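The steps and edge cases above can be sketched with the stdlib alone. This is a minimal illustration, not the code in `src/anomaly/tasks.py`: the function names and return shape are assumptions, and drops with |Z| below 3.0 are labelled `low`, following the "everything else above detection threshold" rule from the data-layer severity table.

```python
import statistics
from typing import Optional

SPIKE_Z = 3.0            # flag as spike when Z > 3.0
DROP_Z = -2.0            # flag as drop when Z < -2.0
MIN_BASELINE_HOURS = 24  # skip teams with less history


def severity_for(z: float) -> str:
    magnitude = abs(z)
    if magnitude >= 5.0:
        return "critical"
    if magnitude >= 4.0:
        return "high"
    if magnitude >= 3.5:
        return "medium"
    return "low"  # assumption: anything flagged below 3.5 is low


def detect_hourly_anomaly(baseline: list, current_count: int) -> Optional[dict]:
    """baseline: complete-hour counts, excluding the hour being scored."""
    if len(baseline) < MIN_BASELINE_HOURS:
        return None  # insufficient baseline
    mean = statistics.mean(baseline)
    stdev = statistics.stdev(baseline)
    if stdev == 0:
        return None  # constant rate, Z-score undefined
    z = (current_count - mean) / stdev
    if z > SPIKE_Z:
        signal_type = "query_spike"
    elif z < DROP_Z:
        signal_type = "query_drop"
    else:
        return None
    return {"signal_type": signal_type, "score": z, "severity": severity_for(z)}


baseline = [20, 30] * 12  # 24 hourly counts with mean 25
spike = detect_hourly_anomaly(baseline, 72)
drop = detect_hourly_anomaly(baseline, 10)
```

Note that `statistics.stdev` is the sample standard deviation, so short baselines produce slightly larger σ (and smaller Z) than the population formula would.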
62
+ ---
63
+
64
+ ## Algorithm 2: Escalation Rate Trend Detection
65
+
66
+ **Runs alongside:** Z-score task (same `poll_metrics_anomalies` Celery beat)
67
+ **Function:** `_check_escalation_trend(team_id, rows, now)`
68
+ **Signal type produced:** `escalation_trend`
69
+
70
+ ### How It Works
71
+
72
+ Using the same hourly rows fetched for Z-score:
73
+
74
+ 1. Split rows into two 7-day windows:
75
+ - **Current window:** `now − 7d` to `now`
76
+ - **Prior window:** `now − 14d` to `now − 7d`
77
+
78
+ 2. For each window, sum `query_count` and `escalation_count`
79
+
80
+ 3. Compute escalation rate per window:
81
+
82
+ ```
83
+ escalation_rate = escalation_count / query_count
84
+ ```
85
+
86
+ 4. Compute ratio:
87
+
88
+ ```
89
+ ratio = current_rate / prior_rate
90
+ ```
91
+
92
+ 5. If `ratio > 1.5` and `current_queries ≥ 10` (noise threshold), insert signal
93
+
94
+ ### Why 10-Query Minimum?
95
+
96
+ A team with 2 total queries and 1 escalation has a 50% escalation rate. That number is meaningless: a single query moves the rate by 50 percentage points. The 10-query floor keeps the rate from being dominated by one or two queries.
97
+
98
+ ### Severity
99
+
100
+ | Ratio | Severity |
101
+ |---|---|
102
+ | ratio > 2.5 | `high` |
103
+ | ratio > 1.5 | `medium` |
104
+
105
+ ### Example
106
+
107
+ - Prior 7 days: 60 queries, 6 escalations → rate = 10%
108
+ - Current 7 days: 52 queries, 10 escalations → rate = 19.2%
109
+ - Ratio: 1.92 → `medium` severity
110
+
111
+ The team's escalation rate is 92% worse than the prior week.
112
+
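The week-over-week comparison can be written as a small pure function. This is a sketch, not `_check_escalation_trend` itself: the signature is hypothetical, and returning `None` when the prior window has no queries or no escalations is an assumption made here to avoid dividing by zero (the source does not say how that case is handled).

```python
from typing import Optional

RATIO_THRESHOLD = 1.5     # insert a signal above this ratio
HIGH_RATIO = 2.5          # ratio above this is "high", otherwise "medium"
MIN_CURRENT_QUERIES = 10  # noise floor on the current window


def escalation_trend(prior_queries: int, prior_escalations: int,
                     current_queries: int, current_escalations: int) -> Optional[dict]:
    if current_queries < MIN_CURRENT_QUERIES or prior_queries == 0:
        return None
    prior_rate = prior_escalations / prior_queries
    if prior_rate == 0:
        return None  # assumption: ratio is undefined without prior escalations
    current_rate = current_escalations / current_queries
    ratio = current_rate / prior_rate
    if ratio <= RATIO_THRESHOLD:
        return None
    return {
        "ratio": round(ratio, 2),
        "current_rate": round(current_rate, 3),
        "prior_rate": round(prior_rate, 3),
        "severity": "high" if ratio > HIGH_RATIO else "medium",
    }


example = escalation_trend(60, 6, 52, 10)
```

The example call reproduces the figures above: 10% prior rate, 19.2% current rate, ratio 1.92.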
113
+ ---
114
+
115
+ ## Algorithm 3: Document Staleness Scoring
116
+
117
+ **Celery task:** `compute_staleness_scores`, daily at 03:00 UTC (queue: `low`)
118
+ **Function:** `run_staleness_scoring()`
119
+ **Signal type produced:** `staleness`
120
+
121
+ ### The Core Formula
122
+
123
+ ```
124
+ staleness_risk = age_factor × query_pressure
125
+ ```
126
+
127
+ Both factors are independent, each in [0.0, 1.0]. Their product gives a score in [0.0, 1.0].
128
+
129
+ ### Age Factor
130
+
131
+ Exponential saturation with a 90-day time constant (the factor reaches 50% at about 62 days and 63% at 90 days):
132
+
133
+ ```
134
+ age_factor = 1 − exp(−age_days / 90)
135
+ ```
136
+
137
+ | Document age | age_factor |
138
+ |---|---|
139
+ | 0 days (just updated) | 0.000 |
140
+ | 30 days | 0.283 |
141
+ | 60 days | 0.487 |
142
+ | 90 days | 0.632 |
143
+ | 180 days | 0.865 |
144
+ | 365 days | 0.983 |
145
+ | 2 years | 0.9997 |
146
+
147
+ The exponential shape reflects reality: most knowledge stales rapidly in the first 3 months, then slows. A 3-year-old document and a 2-year-old document are both "very stale."
148
+
149
+ ### Query Pressure
150
+
151
+ How heavily queried is this document's team in the last 30 days, relative to all teams?
152
+
153
+ ```
154
+ query_pressure = min(1.0, monthly_team_query_count / p95_team_query_count)
155
+ ```
156
+
157
+ The p95 value is the 95th percentile of monthly query counts across all teams. This means only the top 5% most-active teams get `query_pressure = 1.0`. A moderately active team might get 0.4–0.6.
158
+
159
+ **Why team-level, not document-level?**
160
+ The current `query_events` schema stores per-query agent metrics (chunk counts) but does not track *which document* each chunk came from. Team-level pressure is a conservative proxy: it correctly identifies high-risk documents in busy teams, and under-weights documents in quiet teams (which is the safe direction: a stale doc nobody queries is low priority).
161
+
162
+ ### Severity
163
+
164
+ | `staleness_risk` | Severity |
165
+ |---|---|
166
+ | ≥ 0.8 | `critical` |
167
+ | ≥ 0.6 | `high` |
168
+ | ≥ 0.3 | `medium` |
169
+ | < 0.3 (but ≥ 0.1) | `low` |
170
+ | < 0.1 | Not inserted (below noise floor) |
171
+
172
+ ### Example
173
+
174
+ A Kubernetes runbook last updated 8 months ago (240 days), in a team that sends 180 queries/month when p95 is 250:
175
+
176
+ ```
177
+ age_factor = 1 − exp(−240 / 90) = 0.931
178
+ query_pressure = min(1.0, 180 / 250) = 0.72
179
+ staleness_risk = 0.931 × 0.72 = 0.670 → high
180
+ ```
181
+
182
+ The same runbook in a team that sends 5 queries/month:
183
+
184
+ ```
185
+ query_pressure = min(1.0, 5 / 250) = 0.02
186
+ staleness_risk = 0.931 × 0.02 = 0.019 → below threshold, not inserted
187
+ ```
188
+
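Both worked examples can be reproduced with a short stdlib sketch. The function names are hypothetical and this is not the body of `run_staleness_scoring()`; in particular, computing p95 with `statistics.quantiles` is one reasonable choice that the source does not confirm, and it needs at least two teams' counts.

```python
import math
import statistics
from typing import Optional

TIME_CONSTANT_DAYS = 90.0
NOISE_FLOOR = 0.1  # scores below this are not inserted


def age_factor(age_days: float) -> float:
    return 1.0 - math.exp(-age_days / TIME_CONSTANT_DAYS)


def p95_of(monthly_counts: list) -> float:
    """95th percentile: the last of the 19 cut points that split the data into 20 groups."""
    return statistics.quantiles(monthly_counts, n=20)[-1]


def query_pressure(team_monthly_count: float, p95_count: float) -> float:
    if p95_count <= 0:
        return 0.0
    return min(1.0, team_monthly_count / p95_count)


def staleness(age_days: float, team_monthly_count: float, p95_count: float) -> Optional[dict]:
    risk = age_factor(age_days) * query_pressure(team_monthly_count, p95_count)
    if risk < NOISE_FLOOR:
        return None
    if risk >= 0.8:
        severity = "critical"
    elif risk >= 0.6:
        severity = "high"
    elif risk >= 0.3:
        severity = "medium"
    else:
        severity = "low"
    return {"risk": risk, "severity": severity}


busy_team = staleness(240, 180, 250)  # the runbook in an active team
quiet_team = staleness(240, 5, 250)   # the same runbook in a quiet team
```

Because the two factors are multiplied, either a fresh document or an unqueried team is enough to keep the score near zero.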
189
+ ### Cleanup
190
+
191
+ At the end of every staleness run, `purge_old_events()` deletes `query_events` rows older than 90 days, keeping the table bounded.
192
+
193
+ ---
194
+
195
+ ## Algorithm 4: Dependency Risk Modelling
196
+
197
+ **Celery task:** `compute_dependency_risk`, daily at 03:30 UTC (queue: `low`)
198
+ **Function:** `run_dependency_risk_modeling()`
199
+ **Signal type produced:** `dependency_risk`
200
+
201
+ ### The Risk Formula
202
+
203
+ Three independent sub-scores, weighted and summed:
204
+
205
+ ```
206
+ risk = 0.40 × version_lag
207
+ + 0.35 × downstream_normalized
208
+ + 0.25 × incident_rate
209
+ ```
210
+
211
+ All three sub-scores are normalised to [0.0, 1.0]. The final risk score is also [0.0, 1.0].
212
+
213
+ ### Sub-Score 1: Version Lag
214
+
215
+ Derived from semantic versioning distance between `lib.version` and `lib.latest_version` in Neo4j:
216
+
217
+ | Gap | Score |
218
+ |---|---|
219
+ | Different major version | 1.0 |
220
+ | Different minor version (same major) | 0.6 |
221
+ | Different patch (same major.minor) | 0.2 |
222
+ | Same version | 0.0 |
223
+
224
+ A library that is 2 major versions behind still scores 1.0: the version-lag score saturates once the library is behind a major boundary.
225
+
226
+ ### Sub-Score 2: Downstream Exposure
227
+
228
+ How many other services `DEPENDS_ON` this library in the Neo4j graph:
229
+
230
+ ```
231
+ downstream_normalized = min(1.0, downstream_count / max_downstream_across_all_libs)
232
+ ```
233
+
234
+ A library with 12 dependents when the most-connected library has 15 gets `downstream_normalized = 0.8`. This is an org-wide normalisation β€” the denominator is the most-exposed library in the entire graph.
235
+
236
+ ### Sub-Score 3: Historical Incident Rate
237
+
238
+ How many `Incident` nodes have a `CAUSED_BY` edge to this library:
239
+
240
+ ```
241
+ incident_rate = min(1.0, incident_count / 365)
242
+ ```
243
+
244
+ This caps at 1.0 at ≥ 365 incidents (one per day), which is theoretical. In practice the term stays small: 5 incidents gives `incident_rate = 0.0137`, which adds about 0.003 to the weighted risk score.
245
+
246
+ ### Poisson Forecast: Probability of Incident in Next 30 Days
247
+
248
+ The Poisson model treats historical incidents as a counting process with rate λ:
249
+
250
+ ```
251
+ λ = incident_count / 365 (incidents per day)
252
+
253
+ P(at least one incident in next 30 days) = 1 − exp(−λ × 30)
254
+ ```
255
+
256
+ This gives a concrete probability, not a vague "high risk" label, that can be shown in the UI:
257
+
258
+ | incident_count | λ | P(incident in 30d) |
259
+ |---|---|---|
260
+ | 0 | 0 | 0% |
261
+ | 1 | 0.00274 | 7.9% |
262
+ | 2 | 0.00548 | 15.2% |
263
+ | 5 | 0.01370 | 33.7% |
264
+ | 12 | 0.03288 | 62.7% |
265
+
266
+ ### Neo4j Query
267
+
268
+ ```cypher
269
+ MATCH (lib:Library)
270
+ OPTIONAL MATCH (lib)<-[:DEPENDS_ON]-(downstream)
271
+ OPTIONAL MATCH (lib)<-[:CAUSED_BY]-(inc:Incident)
272
+ RETURN lib.name AS name,
273
+ coalesce(lib.version, '0.0.0') AS current_version,
274
+ coalesce(lib.latest_version, '0.0.0') AS latest_version,
275
+ count(DISTINCT downstream) AS downstream_count,
276
+ count(DISTINCT inc) AS incident_count
277
+ ```
278
+
279
+ This is the only query to Neo4j in the algorithm. All scoring is done in Python after fetching.
280
+
281
+ ### Severity
282
+
283
+ | `risk` | Severity |
284
+ |---|---|
285
+ | ≥ 0.70 | `critical` |
286
+ | ≥ 0.50 | `high` |
287
+ | ≥ 0.30 | `medium` |
288
+ | < 0.30 | `low` |
289
+
290
+ ### Example
291
+
292
+ `fastapi` library: 2 major versions behind, 8 downstream services, 2 incidents in Neo4j. Max downstream across all libs is 15.
293
+
294
+ ```
295
+ version_lag = 1.0 (major version gap)
296
+ downstream_normalized = 8/15 = 0.533
297
+ incident_rate = 2/365 = 0.00548
298
+
299
+ risk = 0.40×1.0 + 0.35×0.533 + 0.25×0.00548
300
+ = 0.400 + 0.187 + 0.001
301
+ = 0.588 → high
302
+
303
+ λ = 2/365 = 0.00548
304
+ poisson_30d = 1 − exp(−0.00548 × 30) = 15.2%
305
+ ```
306
+
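The three sub-scores and the Poisson forecast can be combined in a short stdlib sketch. It is illustrative only: the function names are hypothetical, the version parser assumes plain numeric `MAJOR.MINOR.PATCH` strings, and the example versions `1.4.2` and `3.0.0` are made up to represent a two-major-version gap.

```python
import math


def _parse(version: str) -> tuple:
    """Assumes numeric dot-separated versions; missing parts count as 0."""
    parts = [int(p) for p in version.split(".")[:3]]
    parts += [0] * (3 - len(parts))
    return tuple(parts)


def version_lag(current: str, latest: str) -> float:
    cur, new = _parse(current), _parse(latest)
    if cur[0] != new[0]:
        return 1.0  # different major version
    if cur[1] != new[1]:
        return 0.6  # different minor version
    if cur[2] != new[2]:
        return 0.2  # different patch
    return 0.0


def poisson_30d(incident_count: int) -> float:
    """P(at least one incident in the next 30 days) with rate incident_count / 365."""
    return 1.0 - math.exp(-(incident_count / 365.0) * 30)


def dependency_risk(current: str, latest: str, downstream_count: int,
                    max_downstream: int, incident_count: int) -> dict:
    lag = version_lag(current, latest)
    downstream = min(1.0, downstream_count / max_downstream) if max_downstream else 0.0
    incident_rate = min(1.0, incident_count / 365.0)
    risk = 0.40 * lag + 0.35 * downstream + 0.25 * incident_rate
    if risk >= 0.70:
        severity = "critical"
    elif risk >= 0.50:
        severity = "high"
    elif risk >= 0.30:
        severity = "medium"
    else:
        severity = "low"
    return {"risk": risk, "severity": severity, "poisson_30d": poisson_30d(incident_count)}


example = dependency_risk("1.4.2", "3.0.0", downstream_count=8, max_downstream=15, incident_count=2)
```

With these inputs the sketch reproduces the worked example: risk of about 0.588 (`high`) and a 30-day incident probability of about 15.2%.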
307
+ ---
308
+
309
+ ## Algorithm Interaction
310
+
311
+ The four algorithms are independent and run on separate schedules, but they complement each other:
312
+
313
+ ```
314
+ A query spike on team_infra (Z=4.8, high)
315
+ ↓
316
+ Cross-reference gs:topics → "kubernetes" dominant this hour
317
+ ↓
318
+ Staleness algorithm (ran last night) already flagged:
319
+ "k8s-ingress-runbook.md" → staleness_risk=0.72 (high)
320
+ ↓
321
+ Dependency risk algorithm (ran last night) flagged:
322
+ "helm" library → risk=0.61 (high), poisson_30d=22%
323
+ ↓
324
+ Three active anomaly_signals pointing at same root cause
325
+ → Alert feed surfaces them together, sorted by severity
326
+ ```
327
+
328
+ No automated correlation is built yet. The signals exist separately in `anomaly_signals`; grouping them is a UI/UX concern (future Area 4 extension).
Docs/anomaly-and-forecasting/03_api_reference.md ADDED
@@ -0,0 +1,240 @@
1
+ # 03 · API Reference: `/api/anomaly`
2
+
3
+ **Router file:** `src/anomaly/router.py`
4
+ **Registered in:** `main.py`
5
+ **Auth:** All endpoints require a valid session cookie (`get_current_user`). Team isolation is enforced server-side for non-admin roles.
6
+
7
+ ---
8
+
9
+ ## `GET /api/anomaly/signals`
10
+
11
+ List anomaly signals with optional filters. Non-admin users are automatically scoped to their own `team_id`.
12
+
13
+ ### Query Parameters
14
+
15
+ | Param | Type | Default | Description |
16
+ |---|---|---|---|
17
+ | `type` | string | - | Filter by `signal_type` (`query_spike`, `query_drop`, `escalation_trend`, `staleness`, `dependency_risk`) |
18
+ | `severity` | string | - | Filter by `severity` (`critical`, `high`, `medium`, `low`) |
19
+ | `team_id` | string | - | Admin/org_admin only: query a specific team. Non-admins are always scoped to their own team. |
20
+ | `resolved` | boolean | `false` | Include resolved signals |
21
+ | `limit` | integer | `50` | Max rows returned (1–200) |
22
+
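A client-side helper for building the request URL might look like the sketch below. The host `https://example.test`, the function name, and the lowercase `true`/`false` encoding of `resolved` are assumptions; only the path and parameter names come from the table above.

```python
from typing import Optional
from urllib.parse import urlencode


def signals_url(base_url: str, *, signal_type: Optional[str] = None,
                severity: Optional[str] = None, resolved: bool = False,
                limit: int = 50) -> str:
    """Build a GET /api/anomaly/signals URL, clamping limit to the documented 1-200 range."""
    params = {"resolved": str(resolved).lower(), "limit": max(1, min(200, limit))}
    if signal_type:
        params["type"] = signal_type
    if severity:
        params["severity"] = severity
    return f"{base_url.rstrip('/')}/api/anomaly/signals?{urlencode(params)}"


url = signals_url("https://example.test", signal_type="query_spike",
                  severity="critical", limit=500)
```

The session cookie required by `get_current_user` still has to be sent with the request; this helper only assembles the query string.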
23
+ ### Response
24
+
25
+ ```json
26
+ {
27
+ "signals": [
28
+ {
29
+ "id": "3f2a1b4c-...",
30
+ "team_id": "team_infra",
31
+ "signal_type": "query_spike",
32
+ "entity_type": "Team",
33
+ "entity_id": "team_infra",
34
+ "severity": "critical",
35
+ "score": 5.875,
36
+ "details": {
37
+ "z_score": 5.875,
38
+ "current_count": 72,
39
+ "baseline_mean": 25.4,
40
+ "baseline_stdev": 7.9,
41
+ "hour_bucket": "2026-05-17T14:00:00",
42
+ "window_hours": 335
43
+ },
44
+ "resolved": false,
45
+ "resolved_by": null,
46
+ "resolved_at": null,
47
+ "detected_at": "2026-05-17T14:15:03.221Z"
48
+ }
49
+ ],
50
+ "total": 1
51
+ }
52
+ ```
53
+
54
+ ### Side Effect
55
+
56
+ Schedules `broadcast_new_critical_signals()` as a FastAPI `BackgroundTask`. The task is rate-limited to one run per 5 minutes (tracked per API server process). When it runs, it pushes `escalation_spike` or `knowledge_gap` WebSocket notifications to connected clients for unresolved critical/high signals detected in the last 10 minutes.
57
+
58
+ ---
59
+
60
+ ## `GET /api/anomaly/signals/summary`
61
+
62
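+ Returns unresolved signal counts grouped by type and severity. Used by the frontend to render the severity badge strip at the top of the Anomalies tab. As implemented, `get_signals_summary()` counts unresolved signals across all teams; the counts are not scoped to the caller's team.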
63
+
64
+ ### Response
65
+
66
+ ```json
67
+ {
68
+ "total": 7,
69
+ "by_type": {
70
+ "query_spike": 2,
71
+ "staleness": 3,
72
+ "dependency_risk": 2
73
+ },
74
+ "by_severity": {
75
+ "critical": 1,
76
+ "high": 3,
77
+ "medium": 2,
78
+ "low": 1
79
+ }
80
+ }
81
+ ```
82
+
83
+ ---
84
+
85
+ ## `PATCH /api/anomaly/signals/{signal_id}/resolve`
86
+
87
+ Mark a signal as resolved. **Admin / org_admin only**; other roles receive `403`.
88
+
89
+ ### Path Parameter
90
+
91
+ | Param | Description |
92
+ |---|---|
93
+ | `signal_id` | UUID of the `anomaly_signals` row |
94
+
95
+ ### Response
96
+
97
+ ```json
98
+ { "ok": true }
99
+ ```
100
+
101
+ Returns `404` if the signal does not exist or is already resolved.
102
+
103
+ **Effect:** Sets `resolved = true`, `resolved_by = user.id`, `resolved_at = now()` on the row. The signal is excluded from all `resolved=false` queries going forward.
104
+
105
+ ---
106
+
107
+ ## `GET /api/anomaly/query-patterns`
108
+
109
+ Returns hourly query counts for the last N days, with anomaly overlay markers. Powers `QuerySpikeChart.tsx`.
110
+
111
+ ### Query Parameters
112
+
113
+ | Param | Type | Default | Description |
+ |---|---|---|---|
+ | `team_id` | string | none | Admin only. Non-admins use their session team. |
+ | `days` | integer | `14` | Lookback window (1 to 90) |
117
+
118
+ ### Response
119
+
120
+ ```json
121
+ {
122
+ "team_id": "team_infra",
123
+ "hourly": [
124
+ {
125
+ "hour": "2026-05-03T09:00:00+00:00",
126
+ "count": 18,
127
+ "escalations": 2,
128
+ "anomaly_score": null,
129
+ "anomaly_type": null,
130
+ "anomaly_severity": null
131
+ },
132
+ {
133
+ "hour": "2026-05-17T14:00:00+00:00",
134
+ "count": 72,
135
+ "escalations": 4,
136
+ "anomaly_score": 5.875,
137
+ "anomaly_type": "query_spike",
138
+ "anomaly_severity": "critical"
139
+ }
140
+ ]
141
+ }
142
+ ```
143
+
144
+ `anomaly_score`, `anomaly_type`, `anomaly_severity` are `null` for normal hours and populated for hours that have a matching unresolved `query_spike` or `query_drop` signal. The frontend uses these to render `ReferenceArea` overlays on the chart.
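The overlay described above amounts to joining hourly rows with spike/drop signals on the hour bucket. The sketch below is illustrative and is not the router's code: `overlay_anomalies` is a hypothetical name, the row fields follow the columns selected by `get_hourly_counts()`, and matching on the first 13 characters of the timestamp (`YYYY-MM-DDTHH`) assumes both sides are in UTC.

```python
def overlay_anomalies(hourly_rows, signals):
    """Attach anomaly fields to hourly rows (illustrative sketch)."""
    # Index unresolved spike/drop signals by the hour recorded in their details.
    by_hour = {}
    for sig in signals:
        if sig.get("resolved"):
            continue
        if sig["signal_type"] not in ("query_spike", "query_drop"):
            continue
        by_hour[sig["details"]["hour_bucket"][:13]] = sig

    merged = []
    for row in hourly_rows:
        sig = by_hour.get(row["hour_bucket"][:13])
        merged.append({
            "hour": row["hour_bucket"],
            "count": row["query_count"],
            "escalations": row["escalation_count"],
            # Normal hours keep null markers, as in the response above.
            "anomaly_score": sig["score"] if sig else None,
            "anomaly_type": sig["signal_type"] if sig else None,
            "anomaly_severity": sig["severity"] if sig else None,
        })
    return merged
```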
145
+
146
+ ---
147
+
148
+ ## `GET /api/anomaly/staleness`
149
+
150
+ Top documents by staleness risk score. Powers `StalenessRiskList.tsx`.
151
+
152
+ ### Query Parameters
153
+
154
+ | Param | Type | Default | Description |
+ |---|---|---|---|
+ | `limit` | integer | `30` | Max rows (1 to 100) |
157
+
158
+ ### Response
159
+
160
+ ```json
161
+ {
162
+ "documents": [
163
+ {
164
+ "entity_id": "confluence:12345",
165
+ "score": 0.671,
166
+ "details": {
167
+ "title": "Kubernetes Ingress Setup Guide",
168
+ "age_days": 240,
169
+ "age_factor": 0.931,
170
+ "query_pressure": 0.72,
171
+ "updated_at": "2025-09-20T10:00:00"
172
+ },
173
+ "detected_at": "2026-05-17T03:04:22.100Z"
174
+ }
175
+ ],
176
+ "total": 1
177
+ }
178
+ ```
179
+
180
+ Results are sorted descending by `score`. The `details` object contains all the inputs to the staleness formula, making it possible to explain the score in a tooltip.
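As a sketch of the tooltip idea, the `details` fields can be formatted into a single explanatory string. `staleness_tooltip` is a hypothetical helper (the real frontend is TypeScript); it only relies on the field names shown in the response above.

```python
def staleness_tooltip(doc):
    """Build a one-line explanation of a staleness score (hypothetical helper)."""
    d = doc["details"]
    return (
        f"{d['title']}: score {doc['score']:.3f} "
        f"(age {d['age_days']} days, age factor {d['age_factor']:.3f}, "
        f"query pressure {d['query_pressure']:.2f}, "
        f"last updated {d['updated_at'][:10]})"
    )
```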
181
+
182
+ ---
183
+
184
+ ## `GET /api/anomaly/dependency-risk`
185
+
186
+ Libraries scored by dependency risk, including Poisson incident probability. Powers the risk column added to `DependencyTracker.tsx`.
187
+
188
+ ### Response
189
+
190
+ ```json
191
+ {
192
+ "libraries": [
193
+ {
194
+ "entity_id": "fastapi",
195
+ "score": 0.588,
196
+ "details": {
197
+ "library_name": "fastapi",
198
+ "current_version": "0.95.0",
199
+ "latest_version": "0.115.0",
200
+ "version_lag": 1.0,
201
+ "downstream_count": 8,
202
+ "downstream_normalized": 0.533,
203
+ "incident_count": 2,
204
+ "incident_rate": 0.0055,
205
+ "poisson_30d": 0.153
206
+ },
207
+ "detected_at": "2026-05-17T03:34:11.200Z"
208
+ }
209
+ ],
210
+ "total": 1
211
+ }
212
+ ```
213
+
214
+ `poisson_30d` is the probability (0.0 to 1.0) that this library causes at least one incident in the next 30 days, modelled as a Poisson process whose rate is estimated from historical `CAUSED_BY` edges in Neo4j. Rendered in the UI as a percentage (e.g. "15% in 30d").
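A minimal sketch of the calculation behind `poisson_30d`, assuming a homogeneous Poisson process with a constant daily rate. `incident_probability` and its parameter names are illustrative and are not taken from `src/anomaly/tasks.py`.

```python
import math


def incident_probability(rate_per_day: float, horizon_days: int = 30) -> float:
    """P(at least one incident within the horizon) for a Poisson process."""
    if rate_per_day < 0 or horizon_days < 0:
        raise ValueError("rate and horizon must be non-negative")
    expected_incidents = rate_per_day * horizon_days  # lambda * t
    # P(N >= 1) = 1 - P(N = 0) = 1 - exp(-lambda * t)
    return 1.0 - math.exp(-expected_incidents)
```

With the sample `incident_rate` of 0.0055 per day, this gives roughly 0.15, in line with the "15% in 30d" label.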
215
+
216
+ ---
217
+
218
+ ## Error Responses
219
+
220
+ All endpoints return standard HTTP error codes:
221
+
222
+ | Code | Cause |
223
+ |---|---|
224
+ | `401` | No valid session cookie |
225
+ | `403` | Insufficient role (e.g. non-admin calling PATCH /resolve) |
226
+ | `404` | Signal not found or already resolved |
227
+ | `500` | Unexpected internal error (Supabase/Neo4j unavailable) |
228
+
229
+ All 5xx errors are logged via `src/utils/logger.py` with request ID for tracing.
230
+
231
+ ---
232
+
233
+ ## Authentication Notes
234
+
235
+ The `/api/anomaly/signals` endpoint accepts an optional `team_id` query param:
236
+
237
+ - **admin / org_admin roles:** the `team_id` param is respected, so they can query any team's signals
+ - **All other roles:** `team_id` is overridden with `user.get("team_id")` from the session; the client cannot change this
239
+
240
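+ The `dependency_risk` signals have `team_id = null` (library risk is org-wide). `get_dependency_risk()` in `src/anomaly/db.py` applies no team filter, so these rows are visible to every role through the dependency-risk view. A team-scoped `/signals` query filters on `team_id` equality in `get_signals()`, so it does not return rows whose `team_id` is null.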
Docs/anomaly-and-forecasting/04_jobs_and_scheduling.md ADDED
@@ -0,0 +1,198 @@
1
+ # 04 · Celery Jobs & Scheduling
2
+
3
+ ---
4
+
5
+ ## Overview
6
+
7
+ Three Celery tasks handle all anomaly detection. They run in the background, writing to `anomaly_signals` in Supabase. The FastAPI app only reads from that table; no detection logic runs in the request path.
8
+
9
+ ```
10
+ Every 15 minutes    poll_metrics_anomalies     → Z-score + escalation trend
+ Daily 03:00 UTC     compute_staleness_scores   → Staleness risk for all documents
+ Daily 03:30 UTC     compute_dependency_risk    → Library risk from Neo4j
13
+ ```
14
+
15
+ ---
16
+
17
+ ## Task Definitions
18
+
19
+ All tasks are defined in `src/celery_app.py`.
20
+
21
+ ### `poll_metrics_anomalies`
22
+
23
+ ```python
24
+ @shared_task(queue="polling", bind=True, max_retries=3)
+ def poll_metrics_anomalies(self):
+     from src.anomaly.tasks import run_zscore_anomaly_detection
+     run_zscore_anomaly_detection()
28
+ ```
29
+
30
+ | Property | Value |
31
+ |---|---|
32
+ | Queue | `polling` (priority 3) |
33
+ | Schedule | Every `settings.sync.metrics_poll_interval` seconds (default 900s / 15 min) |
34
+ | Max retries | 3, with 120s countdown on failure |
35
+ | Runs | `run_zscore_anomaly_detection()` → `_check_escalation_trend()` |
36
+ | Writes | `query_spike`, `query_drop`, `escalation_trend` signals |
37
+
38
+ The interval is controlled by the `SYNC__METRICS_POLL_INTERVAL` env var (default 900). Lower it in staging to test faster:
39
+
40
+ ```env
41
+ SYNC__METRICS_POLL_INTERVAL=60
42
+ ```
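For illustration, the override can be read with the standard library alone, as sketched below. The project itself reads the value through `settings.sync.metrics_poll_interval`; `metrics_poll_interval` here is a hypothetical stand-in that only shows the fallback to the documented default of 900 seconds.

```python
import os

DEFAULT_POLL_INTERVAL = 900  # seconds, the documented default


def metrics_poll_interval(environ=None):
    """Return the poll interval in seconds, falling back to the default."""
    env = os.environ if environ is None else environ
    raw = env.get("SYNC__METRICS_POLL_INTERVAL", "")
    try:
        value = int(raw)
    except ValueError:
        # Missing or non-numeric value: keep the default.
        return DEFAULT_POLL_INTERVAL
    return value if value > 0 else DEFAULT_POLL_INTERVAL
```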
43
+
44
+ ### `compute_staleness_scores`
45
+
46
+ ```python
47
+ @shared_task(queue="low", bind=True, max_retries=2)
+ def compute_staleness_scores(self):
+     from src.anomaly.tasks import run_staleness_scoring
+     run_staleness_scoring()
51
+ ```
52
+
53
+ | Property | Value |
54
+ |---|---|
55
+ | Queue | `low` (priority 1) |
56
+ | Schedule | Daily at 03:00 UTC (`crontab(hour=3, minute=0)`) |
57
+ | Max retries | 2, with 300s countdown |
58
+ | Reads | `documents` table (Supabase), `query_events` table (Supabase) |
59
+ | Writes | `staleness` signals, purges `query_events` > 90 days |
60
+
61
+ ### `compute_dependency_risk`
62
+
63
+ ```python
64
+ @shared_task(queue="low", bind=True, max_retries=2)
+ def compute_dependency_risk(self):
+     from src.anomaly.tasks import run_dependency_risk_modeling
+     run_dependency_risk_modeling()
68
+ ```
69
+
70
+ | Property | Value |
71
+ |---|---|
72
+ | Queue | `low` (priority 1) |
73
+ | Schedule | Daily at 03:30 UTC (`crontab(hour=3, minute=30)`) |
74
+ | Max retries | 2, with 300s countdown |
75
+ | Reads | Neo4j: `Library`, `DEPENDS_ON`, `CAUSED_BY` edges |
76
+ | Writes | `dependency_risk` signals |
77
+
78
+ ---
79
+
80
+ ## Beat Schedule Registration
81
+
82
+ Beat entries are registered in `setup_periodic_tasks()` in `src/celery_app.py`:
83
+
84
+ ```python
85
+ @app.on_after_finalize.connect
+ def setup_periodic_tasks(sender, **kwargs):
+     # ... existing tasks ...
+
+     from celery.schedules import crontab
+
+     sender.add_periodic_task(
+         crontab(hour=3, minute=0),
+         compute_staleness_scores.s(),
+         name="compute-staleness-scores",
+     )
+     sender.add_periodic_task(
+         crontab(hour=3, minute=30),
+         compute_dependency_risk.s(),
+         name="compute-dependency-risk",
+     )
101
+ ```
102
+
103
+ The `crontab` import is inside the function to avoid a module-level circular import with `celery.schedules`. This mirrors the pattern already used in `ingestion/jobs/celery_app.py`.
104
+
105
+ ---
106
+
107
+ ## Running Workers
108
+
109
+ Start a worker that handles the `polling` and `low` queues (required for anomaly tasks):
110
+
111
+ ```bash
112
+ celery -A src.celery_app worker -Q polling,low --loglevel=info
113
+ ```
114
+
115
+ Start the Beat scheduler:
116
+
117
+ ```bash
118
+ celery -A src.celery_app beat --loglevel=info
119
+ ```
120
+
121
+ For local development, combine both in one process:
122
+
123
+ ```bash
124
+ celery -A src.celery_app worker --beat -Q polling,low,default --loglevel=info
125
+ ```
126
+
127
+ ---
128
+
129
+ ## Manually Triggering Tasks
130
+
131
+ Useful for testing without waiting for the schedule:
132
+
133
+ ```bash
134
+ # Trigger Z-score detection immediately
135
+ celery -A src.celery_app call src.celery_app.poll_metrics_anomalies
136
+
137
+ # Trigger staleness scoring
138
+ celery -A src.celery_app call src.celery_app.compute_staleness_scores
139
+
140
+ # Trigger dependency risk
141
+ celery -A src.celery_app call src.celery_app.compute_dependency_risk
142
+ ```
143
+
144
+ Or from a Python shell:
145
+
146
+ ```python
147
+ from src.celery_app import poll_metrics_anomalies, compute_staleness_scores
148
+ poll_metrics_anomalies.delay()
149
+ compute_staleness_scores.delay()
150
+ ```
151
+
152
+ ---
153
+
154
+ ## Monitoring Task Execution
155
+
156
+ Check the `anomaly_signals` table after a run:
157
+
158
+ ```sql
159
+ SELECT signal_type, severity, score, entity_id, detected_at
160
+ FROM anomaly_signals
161
+ WHERE detected_at > NOW() - INTERVAL '1 hour'
162
+ ORDER BY detected_at DESC;
163
+ ```
164
+
165
+ Check for failures in the Celery dead letter queue (`ingest:deadletter` Redis key) or via Flower:
166
+
167
+ ```bash
168
+ pip install flower
169
+ celery -A src.celery_app flower --port=5555
170
+ # Open http://localhost:5555
171
+ ```
172
+
173
+ ---
174
+
175
+ ## Failure Modes
176
+
177
+ | Task | Failure Mode | Effect |
+ |---|---|---|
+ | `poll_metrics_anomalies` | Supabase unavailable | `get_all_team_ids()` returns `[]` → task is a no-op, retried in 2 min |
+ | `poll_metrics_anomalies` | Fewer than 24h of data | All teams skipped: not enough baseline, no false alarms |
+ | `compute_staleness_scores` | `documents` table empty | Early return, no signals written |
+ | `compute_staleness_scores` | `query_events` table empty | All teams get `query_pressure = 0` → staleness_risk = 0 → no signals |
+ | `compute_dependency_risk` | Neo4j unavailable | `_fetch_library_risk_rows()` returns `[]` → early return |
+ | Any task | Unhandled exception | Celery retries up to max_retries, then marks task as FAILED |
185
+
186
+ All per-entity errors inside loops are caught individually (`except: logger.warning(...)`), so a single bad document or library never aborts the entire run.
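The per-entity isolation pattern can be sketched as follows. `score_each` and the logger name are hypothetical; the actual loops live in `src/anomaly/tasks.py`, which is not reproduced here. The sketch catches `Exception` rather than using a bare `except`, so that `KeyboardInterrupt` and `SystemExit` still propagate.

```python
import logging

logger = logging.getLogger("anomaly.sketch")


def score_each(entities, score_fn):
    """Score every entity, logging and skipping the ones that raise."""
    results = {}
    failures = 0
    for entity in entities:
        try:
            results[entity["id"]] = score_fn(entity)
        except Exception:
            # One bad record must not abort the whole run.
            failures += 1
            logger.warning("scoring failed for entity=%s", entity.get("id"), exc_info=True)
    return results, failures
```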
187
+
188
+ ---
189
+
190
+ ## Adding a New Detection Algorithm
191
+
192
+ 1. Add the algorithm function to `src/anomaly/tasks.py`
193
+ 2. Add a new `@shared_task` to `src/celery_app.py`
194
+ 3. Register with `sender.add_periodic_task(...)` in `setup_periodic_tasks()`
195
+ 4. Use `insert_signal()` from `src/anomaly/db.py` with an appropriate `signal_type` value
196
+ 5. Add a new `GET /api/anomaly/<endpoint>` if the frontend needs to query the results separately
197
+
198
+ The `signal_type` field in `anomaly_signals` is free-form text, so no migration is needed for a new type. Add it to the `signal_type` documentation in `01_data_layer.md`.
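The steps above can be sketched with a made-up detector. Everything specific here is hypothetical: the `latency_regression` type, the 2000 ms threshold, and the function name. Only the keyword arguments match the `insert_signal()` signature in `src/anomaly/db.py`. The writer is passed in as a parameter so the sketch runs without Supabase.

```python
import statistics


def detect_latency_regression(team_id, durations_ms, insert_signal, threshold_ms=2000.0):
    """Hypothetical detector: flag a team whose median query latency is too high."""
    if len(durations_ms) < 5:  # too little data to judge
        return None
    median = statistics.median(durations_ms)
    if median <= threshold_ms:
        return None
    severity = "high" if median > 2 * threshold_ms else "medium"
    return insert_signal(
        signal_type="latency_regression",  # new free-form type, no migration
        severity=severity,
        score=round(median / threshold_ms, 3),
        details={
            "median_ms": median,
            "threshold_ms": threshold_ms,
            "samples": len(durations_ms),
        },
        team_id=team_id,
        entity_type="Team",
        entity_id=team_id,
    )
```

In the real module the function would call `insert_signal` from `src/anomaly/db.py` directly and be wrapped in a `@shared_task` registered with `add_periodic_task`.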
Docs/anomaly-and-forecasting/README.md ADDED
@@ -0,0 +1,86 @@
1
+ # Area 4: Anomaly Detection & Forecasting
2
+
3
+ > **Status: Implemented** on branch `anomaly-and-forecasting`.
4
+ > Previously marked "Planned Extension" in `03_analytics_and_intelligence.md`.
5
+
6
+ ---
7
+
8
+ ## What This Area Does
9
+
10
+ Area 4 consumes the raw event streams produced by Areas 1 to 3 and adds a detection and forecasting layer on top. It answers questions that static dashboards cannot:
11
+
12
+ - Is this query spike normal, or did something break?
13
+ - Which documents are dangerously stale *and* heavily queried?
14
+ - Is our escalation rate trending worse or better?
15
+ - Which library is most likely to cause an incident in the next 30 days?
16
+
17
+ None of this required new data pipelines. All signals already existed in Redis, Supabase, and Neo4j; Area 4 adds the time-series persistence and mathematical models to make them actionable.
18
+
19
+ ---
20
+
21
+ ## Document Map
22
+
23
+ | File | Contents |
24
+ |---|---|
25
+ | [`01_data_layer.md`](./01_data_layer.md) | Three new Supabase tables, PostgreSQL function, migration guide, data flow |
26
+ | [`02_detection_algorithms.md`](./02_detection_algorithms.md) | Z-score spike detection, escalation trend, staleness scoring, dependency risk + Poisson model |
27
+ | [`03_api_reference.md`](./03_api_reference.md) | All six `/api/anomaly` endpoints with request/response shapes |
28
+ | [`04_jobs_and_scheduling.md`](./04_jobs_and_scheduling.md) | Celery Beat schedule, task configuration, manual trigger guide |
29
+
30
+ ---
31
+
32
+ ## File Map (Implementation)
33
+
34
+ ```
35
+ supabase/
+   anomaly_migration.sql     ← Run first: 3 tables + aggregate function
+
+ src/anomaly/
+   __init__.py
+   db.py                     ← All Supabase reads/writes
+   tasks.py                  ← Detection algorithms (stdlib only)
+   router.py                 ← FastAPI /api/anomaly endpoints
+   notifier.py               ← In-process WebSocket broadcast bridge
+
+ agent/api.py                ← +7 lines: fire-and-forget event persist
+ src/celery_app.py           ← poll_metrics_anomalies implemented;
+                               2 new daily tasks added
+ main.py                     ← anomaly_router registered
49
+ ```
50
+
51
+ ---
52
+
53
+ ## How It Fits Into the Five-Area System
54
+
55
+ ```
56
+ Area 1 (RAG)        → query retrieval confidence feeds staleness pressure signal
+ Area 2 (Pipelines)  → document updated_at feeds staleness age factor
+ Area 3 (Analytics)  → query events → query_events table → Z-score + trend models
+                       escalation events → escalation_trend detection
+                       Neo4j graph → dependency risk model
+
+ Area 4 (This)       → anomaly_signals table → API → frontend Anomalies tab
+                       WebSocket push on critical/high signals → NotificationCenter
+
+ Area 5 (Graph)      → Library DEPENDS_ON edges consumed by dependency risk model
+                       Incident CAUSED_BY edges feed Poisson λ estimate
67
+ ```
68
+
69
+ ---
70
+
71
+ ## Design Principles
72
+
73
+ **1. Never break the query path.**
74
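+ Every write from `agent/api.py` is fire-and-forget: it is scheduled with `asyncio.ensure_future` inside a `try` / `except Exception: pass` block, and `upsert_query_event()` logs and swallows its own errors. If Supabase is unavailable, the SSE stream is unaffected.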
75
+
76
+ **2. No new Python dependencies.**
77
+ Z-score uses `statistics.mean` / `statistics.stdev` (stdlib). Staleness and Poisson use `math.exp` (stdlib). No scikit-learn, no numpy.
78
+
79
+ **3. Deduplication at the signal level.**
80
+ `insert_signal()` checks for an identical unresolved signal for the same (type, team, entity) in the last 2 hours before inserting. A 15-minute Celery task cannot flood the table.
81
+
82
+ **4. Team isolation everywhere.**
83
+ Non-admin API callers are scoped to `user.get("team_id")` regardless of query params. Admin and org_admin can query across teams.
84
+
85
+ **5. Celery workers cannot push WebSockets directly.**
86
+ Workers run in a separate OS process; `_notification_clients` in `src/ws/router.py` is empty there. Real-time push uses an in-process FastAPI `BackgroundTask` via `src/anomaly/notifier.py`, rate-limited to once per 5 minutes.
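Principle 2 can be illustrated with a stdlib-only z-score, shown here as a minimal sketch. The function name `zscore` and the guard conditions are assumptions; the actual thresholds, the 24-hour minimum baseline, and the severity mapping live in `src/anomaly/tasks.py` and are not reproduced here.

```python
import statistics


def zscore(current_count, baseline_counts):
    """Return the z-score of the current hour against a baseline, or None."""
    if len(baseline_counts) < 2:  # statistics.stdev needs at least two points
        return None
    mean = statistics.mean(baseline_counts)
    stdev = statistics.stdev(baseline_counts)  # sample standard deviation
    if stdev == 0:  # flat baseline: the z-score is undefined
        return None
    return (current_count - mean) / stdev
```

A positive value far above zero points to a spike and a strongly negative one to a drop; which cut-offs map to which severity is a policy choice outside this sketch.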
agent/api.py CHANGED
@@ -84,6 +84,15 @@ async def _store_query_event(
84
  await r.lpush("gs:escalations", json.dumps(escalation))
85
  await r.ltrim("gs:escalations", 0, 499)
86
 
 
 
 
 
 
 
 
 
 
87
  finally:
88
  await r.aclose()
89
  except Exception as exc:
 
84
  await r.lpush("gs:escalations", json.dumps(escalation))
85
  await r.ltrim("gs:escalations", 0, 499)
86
 
87
+ # Persist to Supabase for time-series anomaly detection.
+ # Fire-and-forget: never allowed to fail the SSE stream.
+ try:
+     import asyncio as _asyncio
+     from src.anomaly.db import async_upsert_query_event
+     _asyncio.ensure_future(async_upsert_query_event(event))
+ except Exception:
+     pass
95
+
96
  finally:
97
  await r.aclose()
98
  except Exception as exc:
main.py CHANGED
@@ -17,6 +17,7 @@ from src.jira_agent.router import router as jira_router
17
  from src.utils.middleware import RequestLoggingMiddleware
18
  from src.auth.router import router as auth_router
19
  from src.analytics.router import router as analytics_router
 
20
  from src.admin.router import router as admin_router
21
  from src.admin.users_api import router as admin_users_router
22
  from src.admin.users_api import audit_router as admin_audit_router
@@ -59,6 +60,7 @@ app.add_middleware(RequestLoggingMiddleware)
59
  # ---------------------------------------------------------------------------
60
  app.include_router(auth_router)
61
  app.include_router(analytics_router)
 
62
  app.include_router(admin_router)
63
  app.include_router(admin_users_router)
64
  app.include_router(admin_audit_router)
 
17
  from src.utils.middleware import RequestLoggingMiddleware
18
  from src.auth.router import router as auth_router
19
  from src.analytics.router import router as analytics_router
20
+ from src.anomaly.router import router as anomaly_router
21
  from src.admin.router import router as admin_router
22
  from src.admin.users_api import router as admin_users_router
23
  from src.admin.users_api import audit_router as admin_audit_router
 
60
  # ---------------------------------------------------------------------------
61
  app.include_router(auth_router)
62
  app.include_router(analytics_router)
63
+ app.include_router(anomaly_router)
64
  app.include_router(admin_router)
65
  app.include_router(admin_users_router)
66
  app.include_router(admin_audit_router)
src/anomaly/__init__.py ADDED
File without changes
src/anomaly/db.py ADDED
@@ -0,0 +1,262 @@
+ """Supabase persistence layer for anomaly detection.
+
+ Every public function is wrapped in try/except so callers never crash
+ if Supabase is unavailable or misconfigured.
+ """
+
+ from __future__ import annotations
+
+ import asyncio
+ import logging
+ from datetime import datetime, timedelta
+
+ logger = logging.getLogger(__name__)
+
+
+ def _sb():
+     """Return the Supabase sync client (service role key, bypasses RLS)."""
+     from src.auth.db import _client
+     return _client()
+
+
+ # ── query_events ──────────────────────────────────────────────────────────────
+
+ def upsert_query_event(event: dict) -> None:
+     """Write one query event row, idempotent on event_id."""
+     try:
+         _sb().table("query_events").upsert(
+             {
+                 "event_id":        event["id"],
+                 "team_id":         event.get("team_id", "unknown"),
+                 "session_id":      event.get("session_id"),
+                 "success":         event.get("success", True),
+                 "duration_ms":     event.get("duration_ms"),
+                 "escalated":       event.get("escalated", False),
+                 "guardrail_score": event.get("guardrail_score"),
+                 "agent_metrics":   event.get("agents", {}),
+                 "created_at":      event.get("created_at", datetime.utcnow().isoformat()),
+             },
+             on_conflict="event_id",
+             ignore_duplicates=True,
+         ).execute()
+     except Exception:
+         logger.warning("anomaly_db: upsert_query_event failed", exc_info=True)
+
+
+ async def async_upsert_query_event(event: dict) -> None:
+     """Async shim: runs the sync upsert in the default executor."""
+     # get_running_loop() is the recommended call inside a coroutine.
+     loop = asyncio.get_running_loop()
+     await loop.run_in_executor(None, upsert_query_event, event)
+
+
+ # ── query_events_hourly ───────────────────────────────────────────────────────
+
+ def aggregate_hourly(team_id: str, hour_bucket: str) -> None:
+     """Recompute one (team_id, hour_bucket) row via the Supabase RPC."""
+     try:
+         _sb().rpc("aggregate_hourly_bucket", {
+             "p_team_id": team_id,
+             "p_hour": hour_bucket,
+         }).execute()
+     except Exception:
+         logger.warning(
+             "anomaly_db: aggregate_hourly failed team=%s hour=%s",
+             team_id, hour_bucket, exc_info=True,
+         )
+
+
+ def get_hourly_counts(team_id: str, days: int = 14) -> list[dict]:
+     """Return hourly aggregate rows for the last N days for a team."""
+     try:
+         cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
+         result = (
+             _sb()
+             .table("query_events_hourly")
+             .select("hour_bucket,query_count,escalation_count")
+             .eq("team_id", team_id)
+             .gte("hour_bucket", cutoff)
+             .order("hour_bucket", desc=False)
+             .execute()
+         )
+         return result.data or []
+     except Exception:
+         logger.warning("anomaly_db: get_hourly_counts failed", exc_info=True)
+         return []
+
+
+ def get_all_team_ids() -> list[str]:
+     """Return distinct team_ids with events in the last 90 days."""
+     try:
+         cutoff = (datetime.utcnow() - timedelta(days=90)).isoformat()
+         result = (
+             _sb()
+             .table("query_events")
+             .select("team_id")
+             .gte("created_at", cutoff)
+             .execute()
+         )
+         return list({r["team_id"] for r in (result.data or [])})
+     except Exception:
+         logger.warning("anomaly_db: get_all_team_ids failed", exc_info=True)
+         return []
+
+
+ # ── anomaly_signals ───────────────────────────────────────────────────────────
+
+ def insert_signal(
+     signal_type: str,
+     severity: str,
+     score: float,
+     details: dict,
+     team_id: str | None = None,
+     entity_type: str | None = None,
+     entity_id: str | None = None,
+ ) -> dict | None:
+     """Insert one anomaly signal with 2-hour dedup suppression.
+
+     If an identical unresolved signal already exists for the same
+     (signal_type, team_id, entity_id) within the last 2 hours, the
+     insert is skipped and None is returned.
+     """
+     try:
+         two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat()
+         dupe_q = (
+             _sb()
+             .table("anomaly_signals")
+             .select("id")
+             .eq("signal_type", signal_type)
+             .eq("resolved", False)
+             .gte("detected_at", two_hours_ago)
+         )
+         if team_id:
+             dupe_q = dupe_q.eq("team_id", team_id)
+         if entity_id:
+             dupe_q = dupe_q.eq("entity_id", entity_id)
+         if dupe_q.execute().data:
+             return None  # suppress duplicate
+
+         row: dict = {
+             "signal_type": signal_type,
+             "severity": severity,
+             "score": score,
+             "details": details,
+         }
+         if team_id: row["team_id"] = team_id
+         if entity_type: row["entity_type"] = entity_type
+         if entity_id: row["entity_id"] = entity_id
+
+         result = _sb().table("anomaly_signals").insert(row).execute()
+         return result.data[0] if result.data else None
+     except Exception:
+         logger.warning("anomaly_db: insert_signal failed type=%s", signal_type, exc_info=True)
+         return None
+
+
+ def get_signals(
+     team_id: str | None,
+     severity: str | None,
+     signal_type: str | None,
+     resolved: bool,
+     limit: int,
+ ) -> list[dict]:
+     try:
+         q = (
+             _sb()
+             .table("anomaly_signals")
+             .select("*")
+             .eq("resolved", resolved)
+             .order("detected_at", desc=True)
+             .limit(limit)
+         )
+         if team_id: q = q.eq("team_id", team_id)
+         if severity: q = q.eq("severity", severity)
+         if signal_type: q = q.eq("signal_type", signal_type)
+         return q.execute().data or []
+     except Exception:
+         logger.warning("anomaly_db: get_signals failed", exc_info=True)
+         return []
+
+
+ def resolve_signal(signal_id: str, resolver_user_id: str) -> bool:
+     """Mark one unresolved signal as resolved; False if no row matched."""
+     try:
+         result = _sb().table("anomaly_signals").update({
+             "resolved": True,
+             "resolved_by": resolver_user_id,
+             "resolved_at": datetime.utcnow().isoformat(),
+         }).eq("id", signal_id).eq("resolved", False).execute()
+         # An empty result means the id is unknown or the signal was already
+         # resolved, which the API documents as a 404.
+         return bool(result.data)
+     except Exception:
+         logger.warning("anomaly_db: resolve_signal failed id=%s", signal_id, exc_info=True)
+         return False
+
+
+ def get_signals_summary() -> dict:
+     """Return unresolved signal counts grouped by type and severity."""
+     try:
+         result = (
+             _sb()
+             .table("anomaly_signals")
+             .select("signal_type,severity")
+             .eq("resolved", False)
+             .execute()
+         )
+         rows = result.data or []
+         by_type: dict[str, int] = {}
+         by_severity: dict[str, int] = {}
+         for r in rows:
+             by_type[r["signal_type"]] = by_type.get(r["signal_type"], 0) + 1
+             by_severity[r["severity"]] = by_severity.get(r["severity"], 0) + 1
+         return {"total": len(rows), "by_type": by_type, "by_severity": by_severity}
+     except Exception:
+         logger.warning("anomaly_db: get_signals_summary failed", exc_info=True)
+         return {"total": 0, "by_type": {}, "by_severity": {}}
+
+
+ def get_staleness_top(limit: int = 30) -> list[dict]:
+     try:
+         result = (
+             _sb()
+             .table("anomaly_signals")
+             .select("entity_id,score,details,detected_at")
+             .eq("signal_type", "staleness")
+             .eq("resolved", False)
+             .order("score", desc=True)
+             .limit(limit)
+             .execute()
+         )
+         return result.data or []
+     except Exception:
+         logger.warning("anomaly_db: get_staleness_top failed", exc_info=True)
+         return []
+
+
+ def get_dependency_risk(limit: int = 50) -> list[dict]:
+     try:
+         result = (
+             _sb()
+             .table("anomaly_signals")
+             .select("entity_id,score,details,detected_at")
+             .eq("signal_type", "dependency_risk")
+             .eq("resolved", False)
+             .order("score", desc=True)
+             .limit(limit)
+             .execute()
+         )
+         return result.data or []
+     except Exception:
+         logger.warning("anomaly_db: get_dependency_risk failed", exc_info=True)
+         return []
+
+
+ def purge_old_events() -> int:
+     """Delete query_events older than 90 days. Returns approximate deleted count."""
+     try:
+         cutoff = (datetime.utcnow() - timedelta(days=90)).isoformat()
+         result = _sb().table("query_events").delete().lt("created_at", cutoff).execute()
+         deleted = len(result.data or [])
+         if deleted:
+             logger.info("anomaly_db: purged %d old query_events", deleted)
+         return deleted
+     except Exception:
+         logger.warning("anomaly_db: purge_old_events failed", exc_info=True)
+         return 0
src/anomaly/notifier.py ADDED
@@ -0,0 +1,74 @@
+ """In-process WebSocket broadcast bridge for anomaly signals.
+
+ Celery workers run in a separate OS process, so `_notification_clients`
+ in src/ws/router.py is always empty there. Real-time push is handled here:
+ a FastAPI BackgroundTask (running inside the API server process) polls for
+ recently-detected critical/high signals and calls broadcast_notification().
+
+ Rate-limited to one check per 5 minutes via a module-level timestamp.
+ """
+
+ from __future__ import annotations
+
+ import logging
+ from datetime import datetime, timedelta
+
+ logger = logging.getLogger(__name__)
+
+ _last_notified: datetime = datetime.utcnow() - timedelta(minutes=5)
+
+
+ async def broadcast_new_critical_signals() -> None:
+     """Check for new critical/high anomaly signals and push via WebSocket.
+
+     Called as a BackgroundTask from GET /api/anomaly/signals.
+     No-ops if fewer than 5 minutes have elapsed since the last dispatch.
+     """
+     global _last_notified
+     now = datetime.utcnow()
+
+     if (now - _last_notified).total_seconds() < 300:
+         return
+
+     _last_notified = now
+
+     try:
+         from src.anomaly.db import get_signals
+         from src.ws.router import broadcast_notification
+
+         recent_signals = get_signals(
+             team_id=None,
+             severity=None,
+             signal_type=None,
+             resolved=False,
+             limit=20,
+         )
+
+         ten_minutes_ago = now - timedelta(minutes=10)
+         for sig in recent_signals:
+             if sig.get("severity") not in ("critical", "high"):
+                 continue
+             try:
+                 detected = datetime.fromisoformat(
+                     str(sig["detected_at"]).replace("Z", "+00:00")
+                 ).replace(tzinfo=None)
+             except Exception:
+                 continue
+             if detected < ten_minutes_ago:
+                 continue
+
+             ws_type = (
+                 "escalation_spike"
+                 if sig["signal_type"] in ("query_spike", "query_drop", "escalation_trend")
+                 else "knowledge_gap"
+             )
+             entity_suffix = f" \u2014 {sig['entity_id']}" if sig.get("entity_id") else ""
+             await broadcast_notification({
+                 "type": ws_type,
+                 "message": sig["signal_type"].replace("_", " ").title() + entity_suffix,
+                 "severity": sig["severity"],
+                 "timestamp": sig["detected_at"],
+             })
+
+     except Exception:
+         logger.warning("notifier: broadcast_new_critical_signals failed", exc_info=True)
src/anomaly/router.py ADDED
@@ -0,0 +1,135 @@
1
+ """Anomaly detection API: /api/anomaly endpoints."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
6
+
7
+ from src.auth.deps import get_current_user, require_role
8
+ from src.utils.logger import get_logger
9
+
10
+ logger = get_logger(__name__)
11
+
12
+ router = APIRouter(prefix="/api/anomaly", tags=["anomaly"])
13
+
14
+
15
+ @router.get("/signals")
16
+ async def list_signals(
17
+ background_tasks: BackgroundTasks,
18
+ team_id: str | None = Query(default=None),
19
+ severity: str | None = Query(default=None),
20
+ signal_type: str | None = Query(default=None, alias="type"),
21
+ resolved: bool = Query(default=False),
22
+ limit: int = Query(default=50, ge=1, le=200),
23
+ user: dict = Depends(get_current_user),
24
+ ) -> dict:
25
+ # Non-admin users are scoped to their own team only
26
+ effective_team = team_id
27
+ if user.get("role") not in ("admin", "org_admin"):
28
+ effective_team = user.get("team_id")
29
+
30
+ from src.anomaly.db import get_signals
31
+ signals = get_signals(
32
+ team_id=effective_team,
33
+ severity=severity,
34
+ signal_type=signal_type,
35
+ resolved=resolved,
36
+ limit=limit,
37
+ )
38
+
39
+ # Dispatch WebSocket push for new critical/high signals (in-process, rate-limited)
40
+ from src.anomaly.notifier import broadcast_new_critical_signals
41
+ background_tasks.add_task(broadcast_new_critical_signals)
42
+
43
+ return {"signals": signals, "total": len(signals)}
44
+
45
+
46
+ @router.get("/signals/summary")
47
+ async def signals_summary(
48
+ user: dict = Depends(get_current_user),
49
+ ) -> dict:
50
+ from src.anomaly.db import get_signals_summary
51
+ return get_signals_summary()
52
+
53
+
54
+ @router.patch("/signals/{signal_id}/resolve")
55
+ async def resolve_signal(
56
+ signal_id: str,
57
+ user: dict = Depends(require_role("admin", "org_admin")),
58
+ ) -> dict:
59
+ from src.anomaly.db import resolve_signal
60
+ ok = resolve_signal(signal_id, resolver_user_id=user["id"])
61
+ if not ok:
62
+ raise HTTPException(status_code=404, detail="Signal not found or already resolved")
63
+ return {"ok": True}
64
+
65
+
66
+ @router.get("/query-patterns")
67
+ async def query_patterns(
68
+ team_id: str | None = Query(default=None),
69
+ days: int = Query(default=14, ge=1, le=90),
70
+ user: dict = Depends(get_current_user),
71
+ ) -> dict:
72
+ """Hourly query counts with anomaly overlay markers for QuerySpikeChart."""
73
+ from src.anomaly.db import get_hourly_counts, get_signals
74
+
75
+ effective_team = team_id
76
+ if user.get("role") not in ("admin", "org_admin"):
77
+ effective_team = user.get("team_id")
78
+
79
+ if not effective_team:
80
+ return {"hourly": [], "team_id": None}
81
+
82
+ hourly = get_hourly_counts(effective_team, days=days)
83
+ signals = get_signals(
84
+ team_id=effective_team,
85
+ severity=None,
86
+ signal_type=None,
87
+ resolved=False,
88
+ limit=200,
89
+ )
90
+
91
+ # Build a lookup of anomaly info keyed by "YYYY-MM-DDTHH" prefix
92
+ anomaly_map: dict[str, dict] = {}
93
+ for sig in signals:
94
+ if sig["signal_type"] in ("query_spike", "query_drop"):
95
+ hb = (sig.get("details") or {}).get("hour_bucket", "")
96
+ if hb:
97
+ anomaly_map[hb[:13]] = {
98
+ "score": sig["score"],
99
+ "type": sig["signal_type"],
100
+ "severity": sig["severity"],
101
+ }
102
+
103
+ result = []
104
+ for row in hourly:
105
+ hb_str = str(row["hour_bucket"])
106
+ anomaly = anomaly_map.get(hb_str[:13])
107
+ result.append({
108
+ "hour": hb_str,
109
+ "count": row["query_count"],
110
+ "escalations": row["escalation_count"],
111
+ "anomaly_score": anomaly["score"] if anomaly else None,
112
+ "anomaly_type": anomaly["type"] if anomaly else None,
113
+ "anomaly_severity": anomaly["severity"] if anomaly else None,
114
+ })
115
+
116
+ return {"hourly": result, "team_id": effective_team}
117
+
118
+
119
+ @router.get("/staleness")
120
+ async def staleness_list(
121
+ limit: int = Query(default=30, ge=1, le=100),
122
+ user: dict = Depends(get_current_user),
123
+ ) -> dict:
124
+ from src.anomaly.db import get_staleness_top
125
+ items = get_staleness_top(limit=limit)
126
+ return {"documents": items, "total": len(items)}
127
+
128
+
129
+ @router.get("/dependency-risk")
130
+ async def dependency_risk(
131
+ user: dict = Depends(get_current_user),
132
+ ) -> dict:
133
+ from src.anomaly.db import get_dependency_risk
134
+ items = get_dependency_risk()
135
+ return {"libraries": items, "total": len(items)}
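The `/query-patterns` endpoint above joins hourly rows to spike/drop signals by comparing the first 13 characters of each timestamp string (`YYYY-MM-DDTHH`). The sketch below isolates that join, assuming the row and signal shapes shown in the diff; `overlay_anomalies` is a hypothetical name, and the output is trimmed to a few fields.

```python
def overlay_anomalies(hourly: list[dict], signals: list[dict]) -> list[dict]:
    """Attach spike/drop markers to hourly rows by matching 'YYYY-MM-DDTHH'."""
    anomaly_map: dict[str, dict] = {}
    for sig in signals:
        if sig["signal_type"] not in ("query_spike", "query_drop"):
            continue  # only hourly signal types carry an hour_bucket
        bucket = (sig.get("details") or {}).get("hour_bucket", "")
        if bucket:
            anomaly_map[bucket[:13]] = sig

    result = []
    for row in hourly:
        hour = str(row["hour_bucket"])
        hit = anomaly_map.get(hour[:13])
        result.append({
            "hour": hour,
            "count": row["query_count"],
            "anomaly_type": hit["signal_type"] if hit else None,
            "anomaly_score": hit["score"] if hit else None,
        })
    return result
```

A string-prefix join is cheap, but it only matches when both sides use the same separator (`T` rather than a space) and the same time zone. If either assumption might not hold, parsing both values to `datetime` and comparing the truncated hour would be more robust.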
src/anomaly/tasks.py ADDED
@@ -0,0 +1,406 @@
1
+ """Anomaly detection algorithms, called from Celery tasks in src/celery_app.py.
2
+
3
+ Three top-level functions are exported:
4
+ run_zscore_anomaly_detection(): query spikes + escalation trend (every 15 min)
5
+ run_staleness_scoring(): document staleness risk (daily 03:00 UTC)
6
+ run_dependency_risk_modeling(): library risk + Poisson forecast (daily 03:30 UTC)
7
+
8
+ All use only stdlib (statistics, math); no new pip dependencies.
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ import logging
14
+ import math
15
+ import statistics
16
+ from datetime import datetime, timedelta
17
+
18
+ logger = logging.getLogger(__name__)
19
+
20
+
21
+ # ── Z-score helpers ───────────────────────────────────────────────────────────
22
+
23
+ def _zscore_severity(z: float) -> str:
24
+ abs_z = abs(z)
25
+ if abs_z >= 5.0:
26
+ return "critical"
27
+ if abs_z >= 4.0:
28
+ return "high"
29
+ if abs_z >= 3.5:
30
+ return "medium"
31
+ return "low"
32
+
33
+
34
+ # ── 1. Z-score spike detection + escalation trend ────────────────────────────
35
+
36
+ def run_zscore_anomaly_detection() -> None:
37
+ """For every active team: compute hourly Z-scores and escalation rate trends.
38
+
39
+ Reads from query_events_hourly (pre-aggregated).
40
+ Writes to anomaly_signals via insert_signal() with 2-hour dedup suppression.
41
+ Fires WebSocket broadcast for critical/high signals (best-effort).
42
+ """
43
+ from src.anomaly.db import get_all_team_ids, get_hourly_counts, insert_signal
44
+
45
+ team_ids = get_all_team_ids()
46
+ now = datetime.utcnow()
47
+ current_hour = now.replace(minute=0, second=0, microsecond=0).isoformat()
48
+
49
+ for team_id in team_ids:
50
+ try:
51
+ rows = get_hourly_counts(team_id, days=14)
52
+ if len(rows) < 24:
53
+ continue
54
+
55
+ counts = [r["query_count"] for r in rows]
56
+ # Exclude the current (partial) hour from the baseline
57
+ baseline = counts[:-1]
58
+
59
+ if len(baseline) < 2:
60
+ continue
61
+
62
+ mean = statistics.mean(baseline)
63
+ stdev = statistics.stdev(baseline)
64
+
65
+ if stdev == 0:
66
+ continue
67
+
68
+ current_count = counts[-1]
69
+ z = (current_count - mean) / stdev
70
+
71
+ if z > 3.0:
72
+ signal = insert_signal(
73
+ signal_type="query_spike",
74
+ team_id=team_id,
75
+ entity_type="Team",
76
+ entity_id=team_id,
77
+ severity=_zscore_severity(z),
78
+ score=round(z, 3),
79
+ details={
80
+ "z_score": round(z, 3),
81
+ "current_count": current_count,
82
+ "baseline_mean": round(mean, 2),
83
+ "baseline_stdev": round(stdev, 2),
84
+ "hour_bucket": current_hour,
85
+ "window_hours": len(baseline),
86
+ },
87
+ )
88
+ if signal and signal.get("severity") in ("critical", "high"):
89
+ _try_broadcast({
90
+ "type": "escalation_spike",
91
+ "message": f"Query spike for team {team_id}: Z={z:.1f}",
92
+ "timestamp": now.isoformat(),
93
+ })
94
+
95
+ elif z < -2.0:
96
+ insert_signal(
97
+ signal_type="query_drop",
98
+ team_id=team_id,
99
+ entity_type="Team",
100
+ entity_id=team_id,
101
+ severity=_zscore_severity(z),
102
+ score=round(z, 3),
103
+ details={
104
+ "z_score": round(z, 3),
105
+ "current_count": current_count,
106
+ "baseline_mean": round(mean, 2),
107
+ "hour_bucket": current_hour,
108
+ },
109
+ )
110
+
111
+ _check_escalation_trend(team_id, rows, now)
112
+
113
+ except Exception:
114
+ logger.warning("zscore: error for team %s", team_id, exc_info=True)
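The spike detector above scores the most recent hourly count against a baseline of all earlier hours, then applies asymmetric thresholds (Z > 3.0 for a spike, Z < -2.0 for a drop). A minimal sketch of just that arithmetic, using the same `statistics` calls; `zscore_of_latest` and `classify` are hypothetical names for illustration.

```python
from __future__ import annotations

import statistics


def zscore_of_latest(counts: list[int]) -> float | None:
    """Z-score of the last count against all earlier counts as baseline."""
    baseline = counts[:-1]
    if len(baseline) < 2:
        return None  # stdev needs at least two data points
    stdev = statistics.stdev(baseline)  # sample standard deviation
    if stdev == 0:
        return None  # flat baseline: the Z-score is undefined
    return (counts[-1] - statistics.mean(baseline)) / stdev


def classify(z: float) -> str | None:
    """Map a Z-score to a signal type using the thresholds from the diff."""
    if z > 3.0:
        return "query_spike"
    if z < -2.0:
        return "query_drop"
    return None
```

For example, `zscore_of_latest([10, 12, 11, 9, 10, 12, 11, 9, 40])` yields a large positive score that `classify` maps to `"query_spike"`. Because the scored hour may still be in progress, a partial-hour count is compared against full-hour baselines, which biases the score downward early in the hour.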
115
+
116
+
117
+ def _check_escalation_trend(team_id: str, rows: list[dict], now: datetime) -> None:
118
+ from src.anomaly.db import insert_signal
119
+
120
+ cutoff_7d = now - timedelta(days=7)
121
+ cutoff_14d = now - timedelta(days=14)
122
+
123
+ current_window: list[dict] = []
124
+ prior_window: list[dict] = []
125
+
126
+ for r in rows:
127
+ try:
128
+ hb = datetime.fromisoformat(
129
+ str(r["hour_bucket"]).replace("Z", "+00:00")
130
+ ).replace(tzinfo=None)
131
+ except Exception:
132
+ continue
133
+ if hb >= cutoff_7d:
134
+ current_window.append(r)
135
+ elif hb >= cutoff_14d:
136
+ prior_window.append(r)
137
+
138
+ current_queries = sum(r["query_count"] for r in current_window)
139
+ current_escalations = sum(r["escalation_count"] for r in current_window)
140
+ prior_queries = sum(r["query_count"] for r in prior_window)
141
+ prior_escalations = sum(r["escalation_count"] for r in prior_window)
142
+
143
+ if current_queries < 10:
144
+ return
145
+
146
+ current_rate = current_escalations / current_queries if current_queries else 0.0
147
+ prior_rate = prior_escalations / prior_queries if prior_queries else 0.0
148
+
149
+ if prior_rate == 0.0:
150
+ return
151
+
152
+ ratio = current_rate / prior_rate
153
+ if ratio > 1.5:
154
+ severity = "high" if ratio > 2.5 else "medium"
155
+ insert_signal(
156
+ signal_type="escalation_trend",
157
+ team_id=team_id,
158
+ entity_type="Team",
159
+ entity_id=team_id,
160
+ severity=severity,
161
+ score=round(ratio, 3),
162
+ details={
163
+ "ratio": round(ratio, 3),
164
+ "current_rate": round(current_rate, 4),
165
+ "prior_rate": round(prior_rate, 4),
166
+ "current_total_queries": current_queries,
167
+ "prior_total_queries": prior_queries,
168
+ },
169
+ )
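The escalation trend check above compares the escalation rate of the last 7 days with that of the 7 days before it and flags a ratio above 1.5. The sketch below reproduces that rule under the same guards (at least 10 current queries, non-zero prior rate); `escalation_ratio` and `trend_severity` are hypothetical names.

```python
from __future__ import annotations


def escalation_ratio(
    current_queries: int,
    current_escalations: int,
    prior_queries: int,
    prior_escalations: int,
    min_queries: int = 10,
) -> float | None:
    """Ratio of this week's escalation rate to last week's, or None if undefined."""
    if current_queries < min_queries or prior_queries == 0:
        return None
    prior_rate = prior_escalations / prior_queries
    if prior_rate == 0.0:
        return None  # no prior escalations: the ratio would divide by zero
    return (current_escalations / current_queries) / prior_rate


def trend_severity(ratio: float) -> str | None:
    """Severity bands from the diff: > 2.5 is high, > 1.5 is medium."""
    if ratio > 2.5:
        return "high"
    if ratio > 1.5:
        return "medium"
    return None
```

One consequence of the zero-prior-rate guard is that a team going from no escalations to many produces no trend signal at all. Whether that case deserves its own signal is a design decision the diff leaves open.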
170
+
171
+
172
+ # ── 2. Staleness scoring ──────────────────────────────────────────────────────
173
+
174
+ def run_staleness_scoring() -> None:
175
+ """Compute staleness_risk = age_factor × query_pressure for all documents.
176
+
177
+ age_factor = 1 − exp(−age_days / 90) (exponential decay, half-life ~62 days)
178
+ query_pressure = min(1.0, monthly_team_queries / p95_monthly_team_queries)
179
+
180
+ Documents with staleness_risk < 0.1 are skipped to avoid noise.
181
+ Cleans up query_events older than 90 days at the end of each run.
182
+ """
183
+ from src.anomaly.db import insert_signal, purge_old_events
184
+ from src.auth.db import _client as _sb_client
185
+
186
+ sb = _sb_client()
187
+ now = datetime.utcnow()
188
+
189
+ # Load all documents
190
+ try:
191
+ docs = sb.table("documents").select("id,doc_id,title,team_id,updated_at").execute().data or []
192
+ except Exception:
193
+ logger.warning("staleness: failed to load documents", exc_info=True)
194
+ return
195
+
196
+ if not docs:
197
+ return
198
+
199
+ # Monthly query count per team (proxy for query pressure at team level)
200
+ cutoff_30d = (now - timedelta(days=30)).isoformat()
201
+ team_query_counts: dict[str, int] = {}
202
+ try:
203
+ rows = (
204
+ sb.table("query_events")
205
+ .select("team_id")
206
+ .gte("created_at", cutoff_30d)
207
+ .execute()
208
+ .data or []
209
+ )
210
+ for r in rows:
211
+ tid = r.get("team_id", "unknown")
212
+ team_query_counts[tid] = team_query_counts.get(tid, 0) + 1
213
+ except Exception:
214
+ logger.warning("staleness: team query count failed", exc_info=True)
215
+
216
+ # p95 of team query counts
217
+ count_values = sorted(team_query_counts.values()) or [1]
218
+ p95_idx = max(0, int(len(count_values) * 0.95) - 1)
219
+ p95_count = count_values[p95_idx] or 1
220
+
221
+ for doc in docs:
222
+ try:
223
+ try:
224
+ updated = datetime.fromisoformat(
225
+ str(doc["updated_at"]).replace("Z", "+00:00")
226
+ ).replace(tzinfo=None)
227
+ except Exception:
228
+ updated = now - timedelta(days=180)
229
+
230
+ age_days = max(0, (now - updated).days)
231
+ age_factor = 1.0 - math.exp(-age_days / 90.0)
232
+ monthly_count = team_query_counts.get(doc.get("team_id", ""), 0)
233
+ query_pressure = min(1.0, monthly_count / p95_count)
234
+ staleness_risk = round(age_factor * query_pressure, 4)
235
+
236
+ if staleness_risk < 0.1:
237
+ continue
238
+
239
+ severity = (
240
+ "critical" if staleness_risk >= 0.8 else
241
+ "high" if staleness_risk >= 0.6 else
242
+ "medium" if staleness_risk >= 0.3 else
243
+ "low"
244
+ )
245
+
246
+ insert_signal(
247
+ signal_type="staleness",
248
+ team_id=doc.get("team_id"),
249
+ entity_type="Document",
250
+ entity_id=doc.get("doc_id"),
251
+ severity=severity,
252
+ score=staleness_risk,
253
+ details={
254
+ "title": doc.get("title", ""),
255
+ "age_days": age_days,
256
+ "age_factor": round(age_factor, 4),
257
+ "query_pressure": round(query_pressure, 4),
258
+ "updated_at": doc.get("updated_at"),
259
+ },
260
+ )
261
+ except Exception:
262
+ logger.warning("staleness: error on doc %s", doc.get("doc_id"), exc_info=True)
263
+
264
+ purge_old_events()
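The staleness score above is the product of an age term that saturates toward 1 and a query-pressure term capped at 1. A small sketch of the formula and its severity bands, assuming the constants in the diff (90-day scale; 0.8, 0.6, 0.3 thresholds); `staleness_risk` and `staleness_severity` are hypothetical names.

```python
import math


def staleness_risk(age_days: int, monthly_queries: int, p95_queries: int) -> float:
    """age_factor * query_pressure, rounded to 4 places as in the diff."""
    age_factor = 1.0 - math.exp(-age_days / 90.0)
    # Guard against a zero p95 so the division is always defined.
    query_pressure = min(1.0, monthly_queries / max(p95_queries, 1))
    return round(age_factor * query_pressure, 4)


def staleness_severity(risk: float) -> str:
    """Severity bands used by run_staleness_scoring."""
    if risk >= 0.8:
        return "critical"
    if risk >= 0.6:
        return "high"
    if risk >= 0.3:
        return "medium"
    return "low"
```

With these constants, the age term reaches 0.5 at 90·ln 2 ≈ 62 days, matching the half-life stated in the docstring. Even at full query pressure a document needs to be about 90·ln 5 ≈ 145 days old before it crosses the 0.8 critical threshold.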
265
+
266
+
267
+ # ── 3. Dependency risk modelling ──────────────────────────────────────────────
268
+
269
+ def run_dependency_risk_modeling() -> None:
270
+ """Score every Library node in Neo4j for dependency risk.
271
+
272
+ risk = 0.40 × version_lag + 0.35 × downstream_normalized + 0.25 × incident_rate
273
+ poisson_30d = 1 − exp(−(incident_count / 365) × 30)
274
+ """
275
+ from src.anomaly.db import insert_signal
276
+
277
+ rows = _fetch_library_risk_rows()
278
+ if not rows:
279
+ logger.info("dep_risk: no library rows from Neo4j")
280
+ return
281
+
282
+ max_downstream = max((r.get("downstream_count", 0) for r in rows), default=1) or 1
283
+
284
+ for r in rows:
285
+ try:
286
+ name = r.get("name", "")
287
+ current_ver = r.get("current_version", "0.0.0")
288
+ latest_ver = r.get("latest_version", "0.0.0")
289
+ downstream = int(r.get("downstream_count", 0))
290
+ incident_count = int(r.get("incident_count", 0))
291
+
292
+ version_lag = _version_lag_score(current_ver, latest_ver)
293
+ downstream_normalized = min(1.0, downstream / max_downstream)
294
+ incident_rate = min(1.0, incident_count / 365.0)
295
+
296
+ risk = round(
297
+ 0.40 * version_lag +
298
+ 0.35 * downstream_normalized +
299
+ 0.25 * incident_rate,
300
+ 4,
301
+ )
302
+
303
+ lam = incident_count / 365.0
304
+ poisson_30d = round(1.0 - math.exp(-lam * 30), 4)
305
+
306
+ severity = (
307
+ "critical" if risk >= 0.7 else
308
+ "high" if risk >= 0.5 else
309
+ "medium" if risk >= 0.3 else
310
+ "low"
311
+ )
312
+
313
+ insert_signal(
314
+ signal_type="dependency_risk",
315
+ team_id=None,
316
+ entity_type="Library",
317
+ entity_id=name,
318
+ severity=severity,
319
+ score=risk,
320
+ details={
321
+ "library_name": name,
322
+ "current_version": current_ver,
323
+ "latest_version": latest_ver,
324
+ "version_lag": round(version_lag, 4),
325
+ "downstream_count": downstream,
326
+ "downstream_normalized": round(downstream_normalized, 4),
327
+ "incident_count": incident_count,
328
+ "incident_rate": round(incident_rate, 4),
329
+ "poisson_30d": poisson_30d,
330
+ },
331
+ )
332
+ except Exception:
333
+ logger.warning("dep_risk: error on library %s", r.get("name"), exc_info=True)
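The dependency risk score above is a weighted sum of three components in [0, 1], and the 30-day forecast is the Poisson probability of at least one incident, 1 − exp(−λ·30) with λ in incidents per day. The sketch below restates both formulas; `dependency_risk` is a hypothetical name. Like the diff, it assumes `incident_count` represents roughly one year of incidents, which the Cypher query does not itself enforce.

```python
import math


def dependency_risk(
    version_lag: float,
    downstream: int,
    max_downstream: int,
    incident_count: int,
) -> tuple[float, float]:
    """Return (risk, poisson_30d) using the weights from the diff."""
    downstream_normalized = min(1.0, downstream / max(max_downstream, 1))
    incident_rate = min(1.0, incident_count / 365.0)
    # Weights sum to 1.0, so risk stays in [0, 1] when each input does.
    risk = round(
        0.40 * version_lag + 0.35 * downstream_normalized + 0.25 * incident_rate,
        4,
    )
    lam = incident_count / 365.0  # assumed incidents per day
    poisson_30d = round(1.0 - math.exp(-lam * 30), 4)
    return risk, poisson_30d
```

Because `incident_rate` divides by 365, a library with a dozen incidents contributes under 0.01 to the total through that term, so version lag and downstream count dominate the score in practice.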
334
+
335
+
336
+ def _fetch_library_risk_rows() -> list[dict]:
337
+ import asyncio
338
+
339
+ async def _query() -> list[dict]:
340
+ from graph_store.config import settings as neo4j_cfg
341
+ from neo4j import AsyncGraphDatabase
342
+
343
+ driver = AsyncGraphDatabase.driver(
344
+ neo4j_cfg.neo4j_uri,
345
+ auth=(neo4j_cfg.neo4j_username, neo4j_cfg.neo4j_password),
346
+ )
347
+ try:
348
+ async with driver.session() as session:
349
+ result = await session.run("""
350
+ MATCH (lib:Library)
351
+ OPTIONAL MATCH (lib)<-[:DEPENDS_ON]-(downstream)
352
+ OPTIONAL MATCH (lib)<-[:CAUSED_BY]-(inc:Incident)
353
+ RETURN lib.name AS name,
354
+ coalesce(lib.version, '0.0.0') AS current_version,
355
+ coalesce(lib.latest_version, '0.0.0') AS latest_version,
356
+ count(DISTINCT downstream) AS downstream_count,
357
+ count(DISTINCT inc) AS incident_count
358
+ """)
359
+ return await result.data()
360
+ finally:
361
+ await driver.close()
362
+
363
+ try:
364
+ return asyncio.run(_query())
365
+ except Exception:
366
+ logger.warning("dep_risk: neo4j fetch failed", exc_info=True)
367
+ return []
368
+
369
+
370
+ def _version_lag_score(current: str, latest: str) -> float:
371
+ """Return 0.0–1.0 based on semver distance. Falls back to 0.5 on parse error."""
372
+ try:
373
+ def _parts(v: str) -> tuple[int, int, int]:
374
+ parts = v.lstrip("v").split(".")[:3]
375
+ ints = [(int(p) if p.isdigit() else 0) for p in (parts + ["0", "0", "0"])[:3]]
376
+ return ints[0], ints[1], ints[2]
377
+
378
+ c = _parts(current)
379
+ l = _parts(latest)
380
+ if l[0] != c[0]:
381
+ return 1.0 if l[0] > c[0] else 0.0 # major version behind (0.0 if ahead)
382
+ if l[1] != c[1]:
383
+ return 0.6 if l[1] > c[1] else 0.0 # minor version behind (0.0 if ahead)
384
+ if l[2] > c[2]:
385
+ return 0.2 # patch behind
386
+ return 0.0
387
+ except Exception:
388
+ return 0.5
389
+
390
+
391
+ # ── WebSocket broadcast (best-effort, in-process only) ───────────────────────
392
+
393
+ def _try_broadcast(payload: dict) -> None:
394
+ """Best-effort WebSocket push from a Celery worker.
395
+
396
+ Celery workers run in a separate process so _notification_clients in
397
+ src/ws/router.py will be empty here. This is intentional: real-time
398
+ push is handled by src/anomaly/notifier.py (in-process BackgroundTask).
399
+ This call is a no-op in the worker context but kept for testability.
400
+ """
401
+ try:
402
+ import asyncio
403
+ from src.ws.router import broadcast_notification
404
+ asyncio.run(broadcast_notification(payload))
405
+ except Exception:
406
+ pass
src/celery_app.py CHANGED
@@ -1,9 +1,13 @@
1
  """Celery app configuration and task setup."""
2
 
 
 
3
  from celery import Celery
4
  from kombu import Exchange, Queue
5
  from src.config import settings
6
 
 
 
7
  # Create Celery app
8
  app = Celery(settings.app_name)
9
 
@@ -93,6 +97,21 @@ def setup_periodic_tasks(sender, **kwargs):
93
  name="poll-error-traces",
94
  )
95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
 
97
  # Task imports (to be implemented in tasks module)
98
  from celery import shared_task
@@ -124,8 +143,13 @@ def poll_server_logs(self):
124
 
125
  @shared_task(queue="polling", bind=True, max_retries=3)
126
  def poll_metrics_anomalies(self):
127
- """Poll metrics for anomalies."""
128
- pass
 
 
 
 
 
129
 
130
 
131
  @shared_task(queue="polling", bind=True, max_retries=3)
@@ -146,5 +170,27 @@ def enrich_and_index_document(self, document: dict):
146
  pass
147
 
148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
149
  if __name__ == "__main__":
150
  app.start()
 
1
  """Celery app configuration and task setup."""
2
 
3
+ import logging
4
+
5
  from celery import Celery
6
  from kombu import Exchange, Queue
7
  from src.config import settings
8
 
9
+ logger = logging.getLogger(__name__)
10
+
11
  # Create Celery app
12
  app = Celery(settings.app_name)
13
 
 
97
  name="poll-error-traces",
98
  )
99
 
100
+ # Staleness scoring daily at 03:00 UTC
101
+ from celery.schedules import crontab
102
+ sender.add_periodic_task(
103
+ crontab(hour=3, minute=0),
104
+ compute_staleness_scores.s(),
105
+ name="compute-staleness-scores",
106
+ )
107
+
108
+ # Dependency risk daily at 03:30 UTC
109
+ sender.add_periodic_task(
110
+ crontab(hour=3, minute=30),
111
+ compute_dependency_risk.s(),
112
+ name="compute-dependency-risk",
113
+ )
114
+
115
 
116
  # Task imports (to be implemented in tasks module)
117
  from celery import shared_task
 
143
 
144
  @shared_task(queue="polling", bind=True, max_retries=3)
145
  def poll_metrics_anomalies(self):
146
+ """Z-score spike detection and escalation trend detection; runs every 15 min."""
147
+ try:
148
+ from src.anomaly.tasks import run_zscore_anomaly_detection
149
+ run_zscore_anomaly_detection()
150
+ except Exception as exc:
151
+ logger.error("poll_metrics_anomalies failed: %s", exc)
152
+ raise self.retry(exc=exc, countdown=120)
153
 
154
 
155
  @shared_task(queue="polling", bind=True, max_retries=3)
 
170
  pass
171
 
172
 
173
+ @shared_task(queue="low", bind=True, max_retries=2)
174
+ def compute_staleness_scores(self):
175
+ """Daily staleness risk scoring for all documents (03:00 UTC)."""
176
+ try:
177
+ from src.anomaly.tasks import run_staleness_scoring
178
+ run_staleness_scoring()
179
+ except Exception as exc:
180
+ logger.error("compute_staleness_scores failed: %s", exc)
181
+ raise self.retry(exc=exc, countdown=300)
182
+
183
+
184
+ @shared_task(queue="low", bind=True, max_retries=2)
185
+ def compute_dependency_risk(self):
186
+ """Daily dependency risk modelling from Neo4j graph (03:30 UTC)."""
187
+ try:
188
+ from src.anomaly.tasks import run_dependency_risk_modeling
189
+ run_dependency_risk_modeling()
190
+ except Exception as exc:
191
+ logger.error("compute_dependency_risk failed: %s", exc)
192
+ raise self.retry(exc=exc, countdown=300)
193
+
194
+
195
  if __name__ == "__main__":
196
  app.start()
supabase/anomaly_migration.sql ADDED
@@ -0,0 +1,86 @@
1
+ -- ============================================================
2
+ -- Anomaly Detection Migration
3
+ -- Safe to run multiple times (all IF NOT EXISTS / ON CONFLICT).
4
+ -- Run AFTER rbac_migration.sql.
5
+ -- ============================================================
6
+
7
+ -- ── query_events: raw event persistence (90-day rolling window) ──────────────
8
+ CREATE TABLE IF NOT EXISTS query_events (
9
+ id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
10
+ event_id text NOT NULL UNIQUE, -- same UUID as Redis event["id"]
11
+ team_id text NOT NULL,
12
+ session_id text,
13
+ success boolean NOT NULL DEFAULT true,
14
+ duration_ms integer,
15
+ escalated boolean NOT NULL DEFAULT false,
16
+ guardrail_score float,
17
+ agent_metrics jsonb NOT NULL DEFAULT '{}',
18
+ created_at timestamptz NOT NULL DEFAULT now()
19
+ );
20
+
21
+ CREATE INDEX IF NOT EXISTS qe_team_time_idx ON query_events (team_id, created_at DESC);
22
+ CREATE INDEX IF NOT EXISTS qe_created_at_idx ON query_events (created_at DESC);
23
+ CREATE INDEX IF NOT EXISTS qe_escalated_idx ON query_events (team_id, escalated, created_at DESC);
24
+
25
+ -- ── query_events_hourly: pre-aggregated hourly buckets ────────────────────────
26
+ CREATE TABLE IF NOT EXISTS query_events_hourly (
27
+ team_id text NOT NULL,
28
+ hour_bucket timestamptz NOT NULL,
29
+ query_count integer NOT NULL DEFAULT 0,
30
+ escalation_count integer NOT NULL DEFAULT 0,
31
+ avg_duration_ms integer,
32
+ PRIMARY KEY (team_id, hour_bucket)
33
+ );
34
+
35
+ CREATE INDEX IF NOT EXISTS qeh_team_hour_idx ON query_events_hourly (team_id, hour_bucket DESC);
36
+
37
+ -- ── anomaly_signals: detected anomaly records ─────────────────────────────────
38
+ CREATE TABLE IF NOT EXISTS anomaly_signals (
39
+ id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
40
+ team_id text,
41
+ signal_type text NOT NULL,
42
+ -- query_spike | query_drop | escalation_trend | staleness | dependency_risk
43
+ entity_type text,
44
+ entity_id text,
45
+ severity text NOT NULL DEFAULT 'medium',
46
+ -- critical | high | medium | low
47
+ score float NOT NULL DEFAULT 0.0,
48
+ details jsonb NOT NULL DEFAULT '{}',
49
+ resolved boolean NOT NULL DEFAULT false,
50
+ resolved_by uuid REFERENCES users(id),
51
+ resolved_at timestamptz,
52
+ detected_at timestamptz NOT NULL DEFAULT now()
53
+ );
54
+
55
+ CREATE INDEX IF NOT EXISTS as_team_type_idx ON anomaly_signals (team_id, signal_type, detected_at DESC);
56
+ CREATE INDEX IF NOT EXISTS as_severity_idx ON anomaly_signals (severity, resolved, detected_at DESC);
57
+ CREATE INDEX IF NOT EXISTS as_resolved_idx ON anomaly_signals (resolved, team_id, detected_at DESC);
58
+ CREATE INDEX IF NOT EXISTS as_entity_idx ON anomaly_signals (entity_type, entity_id);
59
+
60
+ -- RLS: all three tables are service-role only; no anon/authenticated policies
61
+ -- needed because all reads go through the FastAPI backend using the service key.
62
+ ALTER TABLE query_events ENABLE ROW LEVEL SECURITY;
63
+ ALTER TABLE query_events_hourly ENABLE ROW LEVEL SECURITY;
64
+ ALTER TABLE anomaly_signals ENABLE ROW LEVEL SECURITY;
65
+
66
+ -- ── PostgreSQL function: in-DB hourly aggregation ────────────────────────────
67
+ -- Called via Supabase RPC so the worker never ships raw rows just to count them.
68
+ CREATE OR REPLACE FUNCTION aggregate_hourly_bucket(p_team_id text, p_hour timestamptz)
69
+ RETURNS void LANGUAGE plpgsql AS $$
70
+ BEGIN
71
+ INSERT INTO query_events_hourly (team_id, hour_bucket, query_count, escalation_count, avg_duration_ms)
72
+ SELECT
73
+ p_team_id,
74
+ p_hour,
75
+ count(*)::integer,
76
+ count(*) FILTER (WHERE escalated = true)::integer,
77
+ avg(duration_ms)::integer
78
+ FROM query_events
79
+ WHERE team_id = p_team_id
80
+ AND created_at >= p_hour AND created_at < p_hour + interval '1 hour'
81
+ ON CONFLICT (team_id, hour_bucket) DO UPDATE
82
+ SET query_count = EXCLUDED.query_count,
83
+ escalation_count = EXCLUDED.escalation_count,
84
+ avg_duration_ms = EXCLUDED.avg_duration_ms;
85
+ END;
86
+ $$;
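The `aggregate_hourly_bucket` function above recomputes one team's counts for a single hour and upserts them. For readers less familiar with SQL aggregates, here is a pure-Python mirror of the same computation. It is an illustration only, assuming events are dicts with timezone-aware `created_at` values; `aggregate_hour` is a hypothetical name, and `hour` is assumed to be an hour-aligned UTC datetime.

```python
from __future__ import annotations

from datetime import datetime, timezone


def aggregate_hour(events: list[dict], team_id: str, hour: datetime) -> dict:
    """Count queries and escalations for one team within one UTC hour."""
    rows = [
        e for e in events
        if e["team_id"] == team_id
        and e["created_at"].astimezone(timezone.utc).replace(
            minute=0, second=0, microsecond=0
        ) == hour
    ]
    # avg() in SQL ignores NULLs; do the same for missing durations.
    durations = [e["duration_ms"] for e in rows if e.get("duration_ms") is not None]
    return {
        "team_id": team_id,
        "hour_bucket": hour,
        "query_count": len(rows),
        "escalation_count": sum(1 for e in rows if e.get("escalated")),
        "avg_duration_ms": round(sum(durations) / len(durations)) if durations else None,
    }
```

Since the SQL version overwrites the row on conflict, calling it repeatedly for the same hour is idempotent, and the same holds for this sketch. Rounding of exact .5 averages may differ slightly between Python's `round` and a PostgreSQL integer cast.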