Spaces:
Sleeping
Sleeping
| """Anomaly detection algorithms β called from Celery tasks in src/celery_app.py. | |
| Three top-level functions are exported: | |
| run_zscore_anomaly_detection() β query spikes + escalation trend (every 15 min) | |
| run_staleness_scoring() β document staleness risk (daily 03:00 UTC) | |
| run_dependency_risk_modeling() β library risk + Poisson forecast (daily 03:30 UTC) | |
| All use only stdlib (statistics, math) β no new pip dependencies. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import math | |
| import statistics | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| # ββ Z-score helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _zscore_severity(z: float) -> str: | |
| abs_z = abs(z) | |
| if abs_z >= 5.0: | |
| return "critical" | |
| if abs_z >= 4.0: | |
| return "high" | |
| if abs_z >= 3.5: | |
| return "medium" | |
| return "low" | |
| # ββ 1. Z-score spike detection + escalation trend ββββββββββββββββββββββββββββ | |
| def run_zscore_anomaly_detection() -> None: | |
| """For every active team: compute hourly Z-scores and escalation rate trends. | |
| Reads from query_events_hourly (pre-aggregated). | |
| Writes to anomaly_signals via insert_signal() with 2-hour dedup suppression. | |
| Fires WebSocket broadcast for critical/high signals (best-effort). | |
| """ | |
| from src.anomaly.db import get_all_team_ids, get_hourly_counts, insert_signal | |
| team_ids = get_all_team_ids() | |
| now = datetime.utcnow() | |
| current_hour = now.replace(minute=0, second=0, microsecond=0).isoformat() | |
| for team_id in team_ids: | |
| try: | |
| rows = get_hourly_counts(team_id, days=14) | |
| if len(rows) < 24: | |
| continue | |
| counts = [r["query_count"] for r in rows] | |
| # Exclude the current (partial) hour from the baseline | |
| baseline = counts[:-1] | |
| if len(baseline) < 2: | |
| continue | |
| mean = statistics.mean(baseline) | |
| stdev = statistics.stdev(baseline) | |
| if stdev == 0: | |
| continue | |
| current_count = counts[-1] | |
| z = (current_count - mean) / stdev | |
| if z > 3.0: | |
| signal = insert_signal( | |
| signal_type="query_spike", | |
| team_id=team_id, | |
| entity_type="Team", | |
| entity_id=team_id, | |
| severity=_zscore_severity(z), | |
| score=round(z, 3), | |
| details={ | |
| "z_score": round(z, 3), | |
| "current_count": current_count, | |
| "baseline_mean": round(mean, 2), | |
| "baseline_stdev": round(stdev, 2), | |
| "hour_bucket": current_hour, | |
| "window_hours": len(baseline), | |
| }, | |
| ) | |
| if signal and signal.get("severity") in ("critical", "high"): | |
| _try_broadcast({ | |
| "type": "escalation_spike", | |
| "message": f"Query spike for team {team_id}: Z={z:.1f}", | |
| "timestamp": now.isoformat(), | |
| }) | |
| elif z < -2.0: | |
| insert_signal( | |
| signal_type="query_drop", | |
| team_id=team_id, | |
| entity_type="Team", | |
| entity_id=team_id, | |
| severity=_zscore_severity(z), | |
| score=round(z, 3), | |
| details={ | |
| "z_score": round(z, 3), | |
| "current_count": current_count, | |
| "baseline_mean": round(mean, 2), | |
| "hour_bucket": current_hour, | |
| }, | |
| ) | |
| _check_escalation_trend(team_id, rows, now) | |
| except Exception: | |
| logger.warning("zscore: error for team %s", team_id, exc_info=True) | |
| def _check_escalation_trend(team_id: str, rows: list[dict], now: datetime) -> None: | |
| from src.anomaly.db import insert_signal | |
| cutoff_7d = now - timedelta(days=7) | |
| cutoff_14d = now - timedelta(days=14) | |
| current_window: list[dict] = [] | |
| prior_window: list[dict] = [] | |
| for r in rows: | |
| try: | |
| hb = datetime.fromisoformat( | |
| str(r["hour_bucket"]).replace("Z", "+00:00") | |
| ).replace(tzinfo=None) | |
| except Exception: | |
| continue | |
| if hb >= cutoff_7d: | |
| current_window.append(r) | |
| elif hb >= cutoff_14d: | |
| prior_window.append(r) | |
| current_queries = sum(r["query_count"] for r in current_window) | |
| current_escalations = sum(r["escalation_count"] for r in current_window) | |
| prior_queries = sum(r["query_count"] for r in prior_window) | |
| prior_escalations = sum(r["escalation_count"] for r in prior_window) | |
| if current_queries < 10: | |
| return | |
| current_rate = current_escalations / current_queries if current_queries else 0.0 | |
| prior_rate = prior_escalations / prior_queries if prior_queries else 0.0 | |
| if prior_rate == 0.0: | |
| return | |
| ratio = current_rate / prior_rate | |
| if ratio > 1.5: | |
| severity = "high" if ratio > 2.5 else "medium" | |
| insert_signal( | |
| signal_type="escalation_trend", | |
| team_id=team_id, | |
| entity_type="Team", | |
| entity_id=team_id, | |
| severity=severity, | |
| score=round(ratio, 3), | |
| details={ | |
| "ratio": round(ratio, 3), | |
| "current_rate": round(current_rate, 4), | |
| "prior_rate": round(prior_rate, 4), | |
| "current_total_queries": current_queries, | |
| "prior_total_queries": prior_queries, | |
| }, | |
| ) | |
| # ββ 2. Staleness scoring ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def run_staleness_scoring() -> None: | |
| """Compute staleness_risk = age_factor Γ query_pressure for all documents. | |
| age_factor = 1 β exp(βage_days / 90) (exponential decay, half-life ~62 days) | |
| query_pressure = min(1.0, monthly_team_queries / p95_monthly_team_queries) | |
| Documents with staleness_risk < 0.1 are skipped to avoid noise. | |
| Cleans up query_events older than 90 days at the end of each run. | |
| """ | |
| from src.anomaly.db import insert_signal, purge_old_events | |
| from src.auth.db import _client as _sb_client | |
| sb = _sb_client() | |
| now = datetime.utcnow() | |
| # Load all documents | |
| try: | |
| docs = sb.table("documents").select("id,doc_id,title,team_id,updated_at").execute().data or [] | |
| except Exception: | |
| logger.warning("staleness: failed to load documents", exc_info=True) | |
| return | |
| if not docs: | |
| return | |
| # Monthly query count per team (proxy for query pressure at team level) | |
| cutoff_30d = (now - timedelta(days=30)).isoformat() | |
| team_query_counts: dict[str, int] = {} | |
| try: | |
| rows = ( | |
| sb.table("query_events") | |
| .select("team_id") | |
| .gte("created_at", cutoff_30d) | |
| .execute() | |
| .data or [] | |
| ) | |
| for r in rows: | |
| tid = r.get("team_id", "unknown") | |
| team_query_counts[tid] = team_query_counts.get(tid, 0) + 1 | |
| except Exception: | |
| logger.warning("staleness: team query count failed", exc_info=True) | |
| # p95 of team query counts | |
| count_values = sorted(team_query_counts.values()) or [1] | |
| p95_idx = max(0, int(len(count_values) * 0.95) - 1) | |
| p95_count = count_values[p95_idx] or 1 | |
| for doc in docs: | |
| try: | |
| try: | |
| updated = datetime.fromisoformat( | |
| str(doc["updated_at"]).replace("Z", "+00:00") | |
| ).replace(tzinfo=None) | |
| except Exception: | |
| updated = now - timedelta(days=180) | |
| age_days = max(0, (now - updated).days) | |
| age_factor = 1.0 - math.exp(-age_days / 90.0) | |
| monthly_count = team_query_counts.get(doc.get("team_id", ""), 0) | |
| query_pressure = min(1.0, monthly_count / p95_count) | |
| staleness_risk = round(age_factor * query_pressure, 4) | |
| if staleness_risk < 0.1: | |
| continue | |
| severity = ( | |
| "critical" if staleness_risk >= 0.8 else | |
| "high" if staleness_risk >= 0.6 else | |
| "medium" if staleness_risk >= 0.3 else | |
| "low" | |
| ) | |
| insert_signal( | |
| signal_type="staleness", | |
| team_id=doc.get("team_id"), | |
| entity_type="Document", | |
| entity_id=doc.get("doc_id"), | |
| severity=severity, | |
| score=staleness_risk, | |
| details={ | |
| "title": doc.get("title", ""), | |
| "age_days": age_days, | |
| "age_factor": round(age_factor, 4), | |
| "query_pressure": round(query_pressure, 4), | |
| "updated_at": doc.get("updated_at"), | |
| }, | |
| ) | |
| except Exception: | |
| logger.warning("staleness: error on doc %s", doc.get("doc_id"), exc_info=True) | |
| purge_old_events() | |
| # ββ 3. Dependency risk modelling ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def run_dependency_risk_modeling() -> None: | |
| """Score every Library node in Neo4j for dependency risk. | |
| risk = 0.40 Γ version_lag + 0.35 Γ downstream_normalized + 0.25 Γ incident_rate | |
| poisson_30d = 1 β exp(β(incident_count / 365) Γ 30) | |
| """ | |
| from src.anomaly.db import insert_signal | |
| rows = _fetch_library_risk_rows() | |
| if not rows: | |
| logger.info("dep_risk: no library rows from Neo4j") | |
| return | |
| max_downstream = max((r.get("downstream_count", 0) for r in rows), default=1) or 1 | |
| for r in rows: | |
| try: | |
| name = r.get("name", "") | |
| current_ver = r.get("current_version", "0.0.0") | |
| latest_ver = r.get("latest_version", "0.0.0") | |
| downstream = int(r.get("downstream_count", 0)) | |
| incident_count = int(r.get("incident_count", 0)) | |
| version_lag = _version_lag_score(current_ver, latest_ver) | |
| downstream_normalized = min(1.0, downstream / max_downstream) | |
| incident_rate = min(1.0, incident_count / 365.0) | |
| risk = round( | |
| 0.40 * version_lag + | |
| 0.35 * downstream_normalized + | |
| 0.25 * incident_rate, | |
| 4, | |
| ) | |
| lam = incident_count / 365.0 | |
| poisson_30d = round(1.0 - math.exp(-lam * 30), 4) | |
| severity = ( | |
| "critical" if risk >= 0.7 else | |
| "high" if risk >= 0.5 else | |
| "medium" if risk >= 0.3 else | |
| "low" | |
| ) | |
| insert_signal( | |
| signal_type="dependency_risk", | |
| team_id=None, | |
| entity_type="Library", | |
| entity_id=name, | |
| severity=severity, | |
| score=risk, | |
| details={ | |
| "library_name": name, | |
| "current_version": current_ver, | |
| "latest_version": latest_ver, | |
| "version_lag": round(version_lag, 4), | |
| "downstream_count": downstream, | |
| "downstream_normalized": round(downstream_normalized, 4), | |
| "incident_count": incident_count, | |
| "incident_rate": round(incident_rate, 4), | |
| "poisson_30d": poisson_30d, | |
| }, | |
| ) | |
| except Exception: | |
| logger.warning("dep_risk: error on library %s", r.get("name"), exc_info=True) | |
| def _fetch_library_risk_rows() -> list[dict]: | |
| import asyncio | |
| async def _query() -> list[dict]: | |
| from graph_store.config import settings as neo4j_cfg | |
| from neo4j import AsyncGraphDatabase | |
| driver = AsyncGraphDatabase.driver( | |
| neo4j_cfg.neo4j_uri, | |
| auth=(neo4j_cfg.neo4j_username, neo4j_cfg.neo4j_password), | |
| ) | |
| try: | |
| async with driver.session() as session: | |
| result = await session.run(""" | |
| MATCH (lib:Library) | |
| OPTIONAL MATCH (lib)<-[:DEPENDS_ON]-(downstream) | |
| OPTIONAL MATCH (lib)<-[:CAUSED_BY]-(inc:Incident) | |
| RETURN lib.name AS name, | |
| coalesce(lib.version, '0.0.0') AS current_version, | |
| coalesce(lib.latest_version, '0.0.0') AS latest_version, | |
| count(DISTINCT downstream) AS downstream_count, | |
| count(DISTINCT inc) AS incident_count | |
| """) | |
| return await result.data() | |
| finally: | |
| await driver.close() | |
| try: | |
| return asyncio.run(_query()) | |
| except Exception: | |
| logger.warning("dep_risk: neo4j fetch failed", exc_info=True) | |
| return [] | |
| def _version_lag_score(current: str, latest: str) -> float: | |
| """Return 0.0β1.0 based on semver distance. Falls back to 0.5 on parse error.""" | |
| try: | |
| def _parts(v: str) -> tuple[int, int, int]: | |
| parts = v.lstrip("v").split(".")[:3] | |
| ints = [(int(p) if p.isdigit() else 0) for p in (parts + ["0", "0", "0"])[:3]] | |
| return ints[0], ints[1], ints[2] | |
| c = _parts(current) | |
| l = _parts(latest) | |
| if l[0] > c[0]: | |
| return 1.0 # major version behind | |
| if l[1] > c[1]: | |
| return 0.6 # minor version behind | |
| if l[2] > c[2]: | |
| return 0.2 # patch behind | |
| return 0.0 | |
| except Exception: | |
| return 0.5 | |
| # ββ WebSocket broadcast (best-effort, in-process only) βββββββββββββββββββββββ | |
| def _try_broadcast(payload: dict) -> None: | |
| """Best-effort WebSocket push from a Celery worker. | |
| Celery workers run in a separate process so _notification_clients in | |
| src/ws/router.py will be empty here. This is intentional β real-time | |
| push is handled by src/anomaly/notifier.py (in-process BackgroundTask). | |
| This call is a no-op in the worker context but kept for testability. | |
| """ | |
| try: | |
| import asyncio | |
| from src.ws.router import broadcast_notification | |
| asyncio.run(broadcast_notification(payload)) | |
| except Exception: | |
| pass | |