Legal-i commited on
Commit
80839fe
·
verified ·
1 Parent(s): fa79a2d

Stage 193: drift forecast — projected severity

Browse files
Files changed (3) hide show
  1. core/forecast.py +184 -0
  2. infra/api/app.py +23 -0
  3. infra/service.py +39 -0
core/forecast.py ADDED
@@ -0,0 +1,184 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ core.forecast — Stage 193 drift trajectory projection.
3
+
4
+ The existing engine answers "what is the drift score TODAY?". This
5
+ module answers "what will it likely be NEXT WEEK if the recent
6
+ trajectory continues?" — useful for operators to triage the
7
+ "still-medium-but-trending-critical" entities before they cross
8
+ the threshold.
9
+
10
+ Math (intentionally simple — a transparent linear projection beats
11
+ an opaque ML model for an operator who needs to act on the number):
12
+
13
+ 1. Pull the entity's recent drift_score history (most-recent first).
14
+ 2. Fit a least-squares linear trend on the last N points.
15
+ 3. Project the score `horizon_days` ahead at the same slope.
16
+ 4. Clamp to [0, 1].
17
+ 5. Map to severity using the entity_type's thresholds.
18
+
19
+ Confidence is data-driven: with only 2 points (n=2) the fit is
20
+ brittle; with 10+ points the trend is meaningful. We use a soft
21
+ saturation: confidence = min(1.0, n_points / 8.0), so 8 history
22
+ points yields full confidence and fewer yields proportionally less.
23
+
24
+ The trend label is "worsening" / "stable" / "improving" based on
25
+ the slope's magnitude:
26
+ |slope| < 0.005/day → stable
27
+ slope > 0 → worsening (drift score rising)
28
+ slope < 0 → improving (drift score falling)
29
+ """
30
+ from __future__ import annotations
31
+
32
+ from dataclasses import dataclass
33
+ from typing import Dict, List, Optional
34
+
35
+ from .signals import _linear_slope
36
+
37
+
38
+ _DEFAULT_HORIZON_DAYS = 7
39
+ _STABLE_SLOPE_THRESHOLD = 0.005 # |slope| below this is "stable"
40
+
41
+
42
+ @dataclass
43
+ class ForecastResult:
44
+ entity_id: str
45
+ n_history_points: int
46
+ current_score: float
47
+ current_severity: str
48
+ horizon_days: int
49
+ projected_score: float
50
+ projected_severity: str
51
+ trend: str # worsening | stable | improving
52
+ slope_per_day: float # raw slope, for explanation
53
+ confidence: float # 0.0 - 1.0
54
+
55
+ def to_dict(self) -> Dict:
56
+ return {
57
+ "entity_id": self.entity_id,
58
+ "n_history_points": self.n_history_points,
59
+ "current_score": round(self.current_score, 4),
60
+ "current_severity": self.current_severity,
61
+ "horizon_days": self.horizon_days,
62
+ "projected_score": round(self.projected_score, 4),
63
+ "projected_severity": self.projected_severity,
64
+ "trend": self.trend,
65
+ "slope_per_day": round(self.slope_per_day, 6),
66
+ "confidence": round(self.confidence, 3),
67
+ }
68
+
69
+
70
+ def severity_from_score(score: float,
71
+ thresholds: Dict[str, float]) -> str:
72
+ """Tiny mirror of core.drift.severity_from_score that takes a
73
+ plain dict — kept inline so the forecast module doesn't drag
74
+ in the full drift module just for one lookup."""
75
+ if score >= thresholds.get("critical", 0.75):
76
+ return "critical"
77
+ if score >= thresholds.get("high", 0.55):
78
+ return "high"
79
+ if score >= thresholds.get("medium", 0.35):
80
+ return "medium"
81
+ return "low"
82
+
83
+
84
+ def forecast(
85
+ entity_id: str,
86
+ history: List[Dict],
87
+ *,
88
+ horizon_days: int = _DEFAULT_HORIZON_DAYS,
89
+ severity_thresholds: Optional[Dict[str, float]] = None,
90
+ ) -> ForecastResult:
91
+ """Project the entity's drift score ``horizon_days`` ahead.
92
+
93
+ ``history`` is a list of dicts with at least ``score`` and
94
+ ``severity`` keys, ordered MOST-RECENT FIRST (the same shape
95
+ returned by ``RunRepository.entity_score_history``). The first
96
+ element is treated as the current state. The slope is fit on
97
+ the chronological order (newest last in math, even though the
98
+ input is reversed).
99
+
100
+ Empty history → returns a "no data" forecast with confidence 0
101
+ and trend "stable" so the caller doesn't crash.
102
+ """
103
+ if severity_thresholds is None:
104
+ # Reasonable defaults matching the engine's defaults.
105
+ severity_thresholds = {
106
+ "critical": 0.75, "high": 0.55, "medium": 0.35,
107
+ }
108
+ if not history:
109
+ return ForecastResult(
110
+ entity_id=entity_id,
111
+ n_history_points=0,
112
+ current_score=0.0,
113
+ current_severity="low",
114
+ horizon_days=horizon_days,
115
+ projected_score=0.0,
116
+ projected_severity="low",
117
+ trend="stable",
118
+ slope_per_day=0.0,
119
+ confidence=0.0,
120
+ )
121
+ # Reverse so we work in chronological order (oldest → newest).
122
+ chrono = list(reversed(history))
123
+ scores = [float(h["score"]) for h in chrono
124
+ if h.get("score") is not None]
125
+ if not scores:
126
+ return ForecastResult(
127
+ entity_id=entity_id,
128
+ n_history_points=0,
129
+ current_score=0.0,
130
+ current_severity="low",
131
+ horizon_days=horizon_days,
132
+ projected_score=0.0,
133
+ projected_severity="low",
134
+ trend="stable",
135
+ slope_per_day=0.0,
136
+ confidence=0.0,
137
+ )
138
+ current_score = scores[-1]
139
+ current_severity = (history[0].get("severity")
140
+ or severity_from_score(current_score,
141
+ severity_thresholds))
142
+ if len(scores) < 2:
143
+ # One point — no trend to extract. Project flat with low
144
+ # confidence so the dashboard shows "(not enough data)".
145
+ return ForecastResult(
146
+ entity_id=entity_id,
147
+ n_history_points=len(scores),
148
+ current_score=current_score,
149
+ current_severity=current_severity,
150
+ horizon_days=horizon_days,
151
+ projected_score=current_score,
152
+ projected_severity=current_severity,
153
+ trend="stable",
154
+ slope_per_day=0.0,
155
+ confidence=0.1,
156
+ )
157
+ # Slope is "score units per index step". The history points are
158
+ # successive runs which are typically daily; we treat one step
159
+ # as one day. A future stage could pull actual wall-clock deltas
160
+ # if customers run hourly or weekly schedules.
161
+ slope = _linear_slope(scores)
162
+ projected_raw = current_score + slope * horizon_days
163
+ projected_clamped = max(0.0, min(1.0, projected_raw))
164
+ if abs(slope) < _STABLE_SLOPE_THRESHOLD:
165
+ trend = "stable"
166
+ elif slope > 0:
167
+ trend = "worsening"
168
+ else:
169
+ trend = "improving"
170
+ # Soft saturation on history depth. 8 points = full confidence.
171
+ confidence = min(1.0, len(scores) / 8.0)
172
+ return ForecastResult(
173
+ entity_id=entity_id,
174
+ n_history_points=len(scores),
175
+ current_score=current_score,
176
+ current_severity=current_severity,
177
+ horizon_days=horizon_days,
178
+ projected_score=projected_clamped,
179
+ projected_severity=severity_from_score(
180
+ projected_clamped, severity_thresholds),
181
+ trend=trend,
182
+ slope_per_day=slope,
183
+ confidence=confidence,
184
+ )
infra/api/app.py CHANGED
@@ -2025,6 +2025,29 @@ def create_app(db_path: Optional[str] = None,
2025
  decision_limit=decision_limit,
2026
  )
2027
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2028
  # Stage 192 — peer benchmarking. How does this entity rank
2029
  # against other entities of the same entity_type within the
2030
  # tenant? Per-metric percentile + median + IQR + range. Empty
 
2025
  decision_limit=decision_limit,
2026
  )
2027
 
2028
+ # Stage 193 — drift forecast. "If the recent trajectory keeps
2029
+ # going, what severity will this entity be at in N days?"
2030
+ # Readonly tier — pure projection from existing run history.
2031
+ @app.get("/tenants/{tenant_id}/entities/{entity_id}/forecast",
2032
+ tags=["runs"])
2033
+ async def get_entity_forecast_route(
2034
+ tenant_id: str,
2035
+ entity_id: str,
2036
+ horizon_days: int = Query(default=7, ge=1, le=90),
2037
+ history_limit: int = Query(default=14, ge=2, le=100),
2038
+ key: ApiKey = Depends(auth_dep),
2039
+ ):
2040
+ require_tenant_access(key, tenant_id)
2041
+ require_role(key, ROLE_READONLY)
2042
+ try:
2043
+ return svc.get_entity_forecast(
2044
+ tenant_id, entity_id,
2045
+ horizon_days=horizon_days,
2046
+ history_limit=history_limit,
2047
+ )
2048
+ except ValueError as e:
2049
+ raise ApiError("bad_request", str(e), status=400) from e
2050
+
2051
  # Stage 192 — peer benchmarking. How does this entity rank
2052
  # against other entities of the same entity_type within the
2053
  # tenant? Per-metric percentile + median + IQR + range. Empty
infra/service.py CHANGED
@@ -1075,6 +1075,45 @@ class OrgStateService:
1075
  )
1076
  return row
1077
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1078
  # --- Stage 192 — peer benchmarking ---------------------------------
1079
 
1080
  def get_entity_peer_comparison(
 
1075
  )
1076
  return row
1077
 
1078
+ # --- Stage 193 — drift forecast ------------------------------------
1079
+
1080
+ def get_entity_forecast(
1081
+ self,
1082
+ tenant_id: str,
1083
+ entity_id: str,
1084
+ *,
1085
+ horizon_days: int = 7,
1086
+ history_limit: int = 14,
1087
+ ) -> dict:
1088
+ """Stage 193 — project the entity's drift score `horizon_days`
1089
+ ahead based on the recent trajectory. Returns the
1090
+ ForecastResult.to_dict() shape.
1091
+
1092
+ Defaults: 7-day horizon, 14-point history window. The 14
1093
+ matches the engine's default baseline_window so the forecast
1094
+ sees roughly the same recent context the next run would.
1095
+
1096
+ Read-only — derives entirely from existing run history. Cheap,
1097
+ polite to call on every drilldown render.
1098
+ """
1099
+ from core.forecast import forecast as _forecast
1100
+
1101
+ self._require_tenant(tenant_id)
1102
+ if horizon_days < 1 or horizon_days > 90:
1103
+ raise ValueError(
1104
+ f"horizon_days must be in [1, 90], got {horizon_days!r}",
1105
+ )
1106
+ history = self.runs.entity_score_history(
1107
+ tenant_id, entity_id, limit=history_limit,
1108
+ )
1109
+ # entity_score_history returns most-recent first; forecast()
1110
+ # expects that shape.
1111
+ result = _forecast(
1112
+ entity_id, history,
1113
+ horizon_days=horizon_days,
1114
+ )
1115
+ return result.to_dict()
1116
+
1117
  # --- Stage 192 — peer benchmarking ---------------------------------
1118
 
1119
  def get_entity_peer_comparison(