Legal-i commited on
Commit
c9ceb89
Β·
verified Β·
1 Parent(s): 17bcdd5

Stage 211: forecast.threshold_crossed (infra/service.py)

Browse files
Files changed (1) hide show
  1. infra/service.py +212 -1
infra/service.py CHANGED
@@ -1538,6 +1538,208 @@ class OrgStateService:
1538
  self._forecast_summary_cache[cache_key] = (now, payload)
1539
  return payload
1540
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1541
  # --- Stage 192 β€” peer benchmarking ---------------------------------
1542
 
1543
  def get_entity_peer_comparison(
@@ -3191,6 +3393,12 @@ class OrgStateService:
3191
  # Subscribers can pick this INSTEAD of decision.status_changed
3192
  # to cut Slack/Pagerduty noise on large sweeps.
3193
  "decisions.bulk_changed",
 
 
 
 
 
 
3194
  )
3195
 
3196
  @staticmethod
@@ -3222,7 +3430,10 @@ class OrgStateService:
3222
  # piggy-backs on the same bulk endpoint that already fires N
3223
  # individual decision.status_changed events; a NULL subscription
3224
  # that received both would double-count every bulk sweep.
3225
- _OPT_IN_ONLY_EVENTS = frozenset({"decisions.bulk_changed"})
 
 
 
3226
 
3227
  def _webhook_skips_event(self, webhook_id: str,
3228
  event: Optional[str]) -> bool:
 
1538
  self._forecast_summary_cache[cache_key] = (now, payload)
1539
  return payload
1540
 
1541
+ # --- Stage 211 β€” forecast.threshold_crossed --------------------
1542
+
1543
+ # Default critical-score threshold matches
1544
+ # core.forecast.severity_from_score default. Same constant the
1545
+ # ForecastBadge / EntityDrilldown use client-side.
1546
+ _CRITICAL_SCORE_THRESHOLD = 0.75
1547
+ # Default early-warning window for forecast.threshold_crossed.
1548
+ # Operators wanting tighter/looser windows pass days_threshold.
1549
+ _DEFAULT_DAYS_THRESHOLD = 7
1550
+ # Audit dedupe window β€” entities already fired in the past
1551
+ # ``_THRESHOLD_DEDUPE_DAYS`` are skipped on the next check call
1552
+ # so a daily cron doesn't paste the same five entities into
1553
+ # Slack every morning. 7 days matches the default early-warning
1554
+ # window β€” once you've been warned about an entity, you have a
1555
+ # week before another reminder.
1556
+ _THRESHOLD_DEDUPE_DAYS = 7
1557
+
1558
+ @staticmethod
1559
+ def _compute_days_to_critical(
1560
+ current_score: float,
1561
+ slope_per_day: float,
1562
+ critical_threshold: float = 0.75,
1563
+ ) -> Optional[int]:
1564
+ """Return the projected days until the score crosses the
1565
+ critical threshold at the current slope. Returns None when
1566
+ the answer isn't meaningful (already past threshold, or
1567
+ not worsening).
1568
+
1569
+ Uses a small epsilon on the ceil() to absorb floating-
1570
+ point precision artifacts β€” e.g., (0.75 - 0.60) / 0.03
1571
+ evaluates to 5.000000000000001 in IEEE-754, which would
1572
+ round up to 6 days without the epsilon.
1573
+ """
1574
+ if current_score >= critical_threshold:
1575
+ return 0
1576
+ if slope_per_day <= 0:
1577
+ return None
1578
+ import math as _math
1579
+ raw = (critical_threshold - current_score) / slope_per_day
1580
+ return int(_math.ceil(raw - 1e-9))
1581
+
1582
+ def check_forecast_thresholds(
1583
+ self,
1584
+ tenant_id: str,
1585
+ *,
1586
+ days_threshold: int = _DEFAULT_DAYS_THRESHOLD,
1587
+ confidence_min: float = 0.2,
1588
+ entity_cap: int = 500,
1589
+ actor: str = "system",
1590
+ ) -> dict:
1591
+ """Stage 211 β€” scan every entity in the tenant for a forecast
1592
+ whose projected days_to_critical falls inside
1593
+ ``days_threshold``, fire ``forecast.threshold_crossed`` for
1594
+ any NEW entrants (per the audit-log dedupe window), and
1595
+ record an audit row per firing.
1596
+
1597
+ "New entrant" = not already fired-and-audited within the
1598
+ past _THRESHOLD_DEDUPE_DAYS. This is computed from the
1599
+ audit log so it survives process restarts.
1600
+
1601
+ Returns ``{"checked": N, "fired": [list of entity payloads],
1602
+ "skipped_recent": [entity_ids], "ts": iso}`` for the caller
1603
+ to log / display.
1604
+
1605
+ Intended to be invoked by a daily cron (POST endpoint exists
1606
+ for manual trigger; a cron stage can wrap it).
1607
+ """
1608
+ self._require_tenant(tenant_id)
1609
+ if days_threshold < 1 or days_threshold > 90:
1610
+ raise ValueError(
1611
+ f"days_threshold must be in [1, 90], got "
1612
+ f"{days_threshold!r}",
1613
+ )
1614
+ if not (0.0 <= confidence_min <= 1.0):
1615
+ raise ValueError(
1616
+ "confidence_min must be in [0.0, 1.0], "
1617
+ f"got {confidence_min!r}",
1618
+ )
1619
+
1620
+ # 1. Enumerate entities + their forecasts at the standard
1621
+ # 7-day horizon (the days_to_critical computation is
1622
+ # horizon-independent β€” it uses slope_per_day directly).
1623
+ entity_ids = self.runs.list_distinct_entity_ids(
1624
+ tenant_id, limit=entity_cap,
1625
+ )
1626
+ if not entity_ids:
1627
+ from datetime import datetime as _dt
1628
+ from datetime import timezone as _tz
1629
+ return {
1630
+ "checked": 0, "fired": [], "skipped_recent": [],
1631
+ "ts": _dt.now(_tz.utc).isoformat(),
1632
+ }
1633
+ forecasts = self.get_forecasts_batch(
1634
+ tenant_id, entity_ids, horizon_days=7,
1635
+ ) if len(entity_ids) <= 200 else [
1636
+ self.get_entity_forecast(tenant_id, eid, horizon_days=7)
1637
+ for eid in entity_ids
1638
+ ]
1639
+
1640
+ # 2. Build the dedupe set: entity_ids that already fired
1641
+ # forecast.threshold_crossed within the past window. Query
1642
+ # is per-tenant so cross-tenant noise can't leak in.
1643
+ from datetime import datetime as _dt
1644
+ from datetime import timedelta as _td
1645
+ from datetime import timezone as _tz
1646
+ cutoff = (
1647
+ _dt.now(_tz.utc) - _td(days=self._THRESHOLD_DEDUPE_DAYS)
1648
+ ).isoformat()
1649
+ recent_rows = self.audit.list(
1650
+ action="forecast.threshold_crossed",
1651
+ tenant_id=tenant_id,
1652
+ since=cutoff,
1653
+ limit=10000,
1654
+ )
1655
+ already_fired: set = set()
1656
+ for r in recent_rows:
1657
+ payload = r.get("payload") or {}
1658
+ eid = payload.get("entity_id") if isinstance(payload, dict) else None
1659
+ if isinstance(eid, str):
1660
+ already_fired.add(eid)
1661
+
1662
+ # 3. Walk forecasts, fire on qualifying NEW entrants.
1663
+ fired: List[dict] = []
1664
+ skipped_recent: List[str] = []
1665
+ ts_iso = _dt.now(_tz.utc).isoformat()
1666
+ for f in forecasts:
1667
+ conf = f.get("confidence") or 0.0
1668
+ if conf < confidence_min:
1669
+ continue
1670
+ current = f.get("current_score") or 0.0
1671
+ slope = f.get("slope_per_day") or 0.0
1672
+ dtc = self._compute_days_to_critical(
1673
+ current, slope, self._CRITICAL_SCORE_THRESHOLD,
1674
+ )
1675
+ if dtc is None or dtc > days_threshold:
1676
+ continue
1677
+ eid = f.get("entity_id")
1678
+ if not isinstance(eid, str):
1679
+ continue
1680
+ if eid in already_fired:
1681
+ skipped_recent.append(eid)
1682
+ continue
1683
+ entry = {
1684
+ "entity_id": eid,
1685
+ "days_to_critical": dtc,
1686
+ "current_score": current,
1687
+ "projected_score": f.get("projected_score"),
1688
+ "slope_per_day": slope,
1689
+ "confidence": conf,
1690
+ }
1691
+ self._fire_forecast_threshold_crossed_event(
1692
+ tenant_id=tenant_id, entry=entry, ts=ts_iso,
1693
+ )
1694
+ self.audit.log(
1695
+ actor, "forecast.threshold_crossed",
1696
+ tenant_id=tenant_id,
1697
+ payload=entry,
1698
+ )
1699
+ fired.append(entry)
1700
+ # Avoid double-firing within the same call if list
1701
+ # somehow re-encounters the entity.
1702
+ already_fired.add(eid)
1703
+
1704
+ return {
1705
+ "checked": len(forecasts),
1706
+ "fired": fired,
1707
+ "skipped_recent": skipped_recent,
1708
+ "ts": ts_iso,
1709
+ }
1710
+
1711
+ def _fire_forecast_threshold_crossed_event(
1712
+ self, *,
1713
+ tenant_id: str,
1714
+ entry: dict,
1715
+ ts: str,
1716
+ ) -> None:
1717
+ """Stage 211 β€” best-effort emission of
1718
+ forecast.threshold_crossed. Same swallow-and-log discipline
1719
+ as the other event fires; a delivery failure must never
1720
+ roll back the audit-log write (the audit row is the source
1721
+ of truth for the dedupe window)."""
1722
+ payload = {
1723
+ "event": "forecast.threshold_crossed",
1724
+ "tenant_id": tenant_id,
1725
+ "entity_id": entry["entity_id"],
1726
+ "days_to_critical": entry["days_to_critical"],
1727
+ "current_score": entry["current_score"],
1728
+ "projected_score": entry["projected_score"],
1729
+ "slope_per_day": entry["slope_per_day"],
1730
+ "confidence": entry["confidence"],
1731
+ "ts": ts,
1732
+ }
1733
+ try:
1734
+ self.deliver_webhooks(tenant_id, payload)
1735
+ except Exception as exc: # noqa: BLE001
1736
+ import logging as _log
1737
+ _log.getLogger(__name__).warning(
1738
+ "forecast.threshold_crossed webhook emission "
1739
+ "failed for tenant %r entity %r: %s",
1740
+ tenant_id, entry.get("entity_id"), exc,
1741
+ )
1742
+
1743
  # --- Stage 192 β€” peer benchmarking ---------------------------------
1744
 
1745
  def get_entity_peer_comparison(
 
3393
  # Subscribers can pick this INSTEAD of decision.status_changed
3394
  # to cut Slack/Pagerduty noise on large sweeps.
3395
  "decisions.bulk_changed",
3396
+ # Stage 211 β€” fired when an entity's projected days-to-critical
3397
+ # falls below the threshold (default 7). Operators subscribe
3398
+ # for early-warning Slack pages instead of reading the
3399
+ # trajectory chip manually each morning. Opt-in like the bulk
3400
+ # event β€” NULL subscriptions don't get it.
3401
+ "forecast.threshold_crossed",
3402
  )
3403
 
3404
  @staticmethod
 
3430
  # piggy-backs on the same bulk endpoint that already fires N
3431
  # individual decision.status_changed events; a NULL subscription
3432
  # that received both would double-count every bulk sweep.
3433
+ _OPT_IN_ONLY_EVENTS = frozenset({
3434
+ "decisions.bulk_changed",
3435
+ "forecast.threshold_crossed", # Stage 211
3436
+ })
3437
 
3438
  def _webhook_skips_event(self, webhook_id: str,
3439
  event: Optional[str]) -> bool: