| """ |
| delivery.backtest — "what would we have caught had we been running?" |
| (Stage 16). |
| |
| Stage 15 ran the engine once on a Salesforce export and surfaced zero |
| current issues, because the LAST 14 days of that file were stable. But |
| the per-state scores hinted at historical drift events. This module |
| turns that hint into a measurable timeline: it walks a cursor forward |
| day-by-day across the dataset, runs the engine on the window ending at |
| each cursor, and records what the engine would have alerted on. |
| |
| This is operationally useful for two reasons: |
| |
| 1. **Pitch**: a customer who hands us an export can see the engine |
| replayed against their own history — "if you'd been running this |
| since day 1, here are the events you would have been told about." |
| 2. **Threshold tuning**: backtest counts per severity per month |
| show whether the defaults are too noisy or too quiet on this |
| customer's data, before flipping the switch on live alerting. |
| |
| stdlib only. Calibration is re-derived per cursor from the data up to |
| that cursor (the engine never gets to see the future). |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from dataclasses import dataclass, field |
| from datetime import date, timedelta |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
| from core import calibrate_from_observations, run_pipeline |
| from core.config import EntityTypeConfig |
| from core.pipeline import Observation |
| from verticals import get_vertical_config |
|
|
|
|
@dataclass
class BacktestPoint:
    """Outcome of a single step of the rolling backtest.

    ``cursor`` is the ISO day the engine was evaluated "as of". The issues
    are kept as plain dicts rather than engine objects, so renderers and
    the JSON dump need nothing from ``core``.
    """

    # ISO-8601 day string this step was evaluated at.
    cursor: str
    # Number of state rows the pipeline returned at this cursor.
    n_states: int
    # Number of issues the pipeline returned at this cursor.
    n_issues: int
    # Issue count keyed by severity label.
    severity_counts: Dict[str, int] = field(default_factory=dict)
    # One plain dict per issue raised at this cursor.
    issues: List[dict] = field(default_factory=list)
|
|
|
|
@dataclass
class BacktestResult:
    """Full output of one backtest run: the per-step points plus rollups."""

    # Tenant label the run was executed under.
    tenant_id: str
    # Entity type the engine configuration describes.
    entity_type: str
    # Stride, in days, between consecutive cursors.
    step_days: int
    # Number of steps recorded in ``points``.
    n_steps: int
    # One entry per recorded step, in cursor order.
    points: List[BacktestPoint] = field(default_factory=list)

    # Rollups across all points.
    n_issues_total: int = 0
    n_unique_entities_ever_flagged: int = 0
    per_severity_total: Dict[str, int] = field(default_factory=dict)
|
|
|
|
@dataclass
class RecurringEntity:
    """An entity that was flagged in several distinct backtest windows
    (Stage 17). An entity that keeps drifting points at a systemic problem
    rather than a one-off."""

    entity_id: str
    # Number of distinct cursors at which the entity was flagged.
    n_events: int
    # Earliest and latest of those cursors (ISO day strings).
    first_cursor: str
    last_cursor: str
    # Worst severity label seen across all of the entity's issues.
    top_severity: str
    # The distinct cursors, ascending.
    cursors: List[str] = field(default_factory=list)


# Lower rank = worse. Labels missing from this table are ranked 99 by the
# lookups below, i.e. treated as milder than "low".
_SEV_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3}


def find_recurring_entities(
    result: BacktestResult,
    *,
    min_events: int = 3,
) -> List[RecurringEntity]:
    """Return entities flagged in at least ``min_events`` distinct backtest
    windows.

    Several issues raised for the same entity at the same cursor count as
    one event: what is being measured is how many *windows* the entity
    drifted in, not how many issue rows it produced.

    Args:
        result: a backtest result; only ``points[*].cursor`` and the
            ``entity_id`` / ``severity`` keys of ``points[*].issues`` are read.
        min_events: minimum number of distinct cursors for an entity to
            qualify.

    Returns:
        Qualifying entities ordered by event count (descending), then by
        most recent last event, then by ``entity_id`` for a stable order.
    """
    cursors_by_entity: Dict[str, set] = {}
    worst_by_entity: Dict[str, str] = {}
    for point in result.points:
        for issue in point.issues:
            eid = issue["entity_id"]
            # A set collapses repeated issues within the same window.
            cursors_by_entity.setdefault(eid, set()).add(point.cursor)
            sev = issue["severity"]
            known = worst_by_entity.get(eid)
            if known is None or _SEV_RANK.get(sev, 99) < _SEV_RANK.get(known, 99):
                worst_by_entity[eid] = sev

    recurring: List[RecurringEntity] = []
    for eid, cursor_set in cursors_by_entity.items():
        if len(cursor_set) < min_events:
            continue
        ordered = sorted(cursor_set)
        recurring.append(RecurringEntity(
            entity_id=eid,
            n_events=len(ordered),
            first_cursor=ordered[0],
            last_cursor=ordered[-1],
            top_severity=worst_by_entity[eid],
            cursors=ordered,
        ))
    recurring.sort(key=lambda r: (-r.n_events, _negate_iso(r.last_cursor),
                                  r.entity_id))
    return recurring


def _negate_iso(iso_day: str) -> str:
    """Return a sort key that orders ISO day strings newest-first.

    Strings cannot be negated, so each ASCII digit is replaced by its
    nines' complement (0<->9, 1<->8, ...). Separators are left untouched,
    so for equal-length ISO-8601 days the lexicographic order of the
    result is the exact reverse of the order of the inputs.
    """
    flip = str.maketrans("0123456789", "9876543210")
    return iso_day.translate(flip)
|
|
|
|
def _compute_event_details(
    observations: List[Observation],
    entity_id: str,
    cursor_iso: str,
    config: EntityTypeConfig,
) -> Dict[str, dict]:
    """Explain one ``(entity, cursor)`` event metric by metric (Stage 19).

    For every metric named in ``config.metrics`` the result maps the metric
    name to ``{"baseline_mean", "recent_mean", "delta_pct"}``. Both windows
    are inclusive day ranges derived from the config:

        recent   = [cursor - recent_window + 1, cursor]
        baseline = [cursor - recent_window - baseline_lag - baseline_window + 1,
                    cursor - recent_window - baseline_lag]

    A mean is ``None`` when its window holds no value for that metric.
    ``delta_pct`` is ``None`` when either mean is missing or the baseline
    mean is zero. Days are compared as strings, so ``o.day`` is expected
    to be an ISO-8601 day like ``cursor_iso``.
    """
    as_of = date.fromisoformat(cursor_iso)
    recent_first = as_of - timedelta(days=config.recent_window - 1)
    baseline_last = recent_first - timedelta(days=config.baseline_lag + 1)
    baseline_first = baseline_last - timedelta(days=config.baseline_window - 1)
    recent_lo, recent_hi = recent_first.isoformat(), cursor_iso
    base_lo, base_hi = baseline_first.isoformat(), baseline_last.isoformat()

    names = [metric.name for metric in config.metrics]
    recent_samples: Dict[str, list] = {name: [] for name in names}
    baseline_samples: Dict[str, list] = {name: [] for name in names}

    for obs in observations:
        if obs.entity_id != entity_id:
            continue
        # The recent window is tested first; days in neither window
        # (including the lag gap) are ignored.
        if recent_lo <= obs.day <= recent_hi:
            bucket = recent_samples
        elif base_lo <= obs.day <= base_hi:
            bucket = baseline_samples
        else:
            continue
        for name in names:
            raw = obs.values.get(name)
            if raw is not None:
                bucket[name].append(float(raw))

    def _mean(samples):
        return (sum(samples) / len(samples)) if samples else None

    details: Dict[str, dict] = {}
    for name in names:
        base_mean = _mean(baseline_samples[name])
        recent_mean = _mean(recent_samples[name])
        pct = None
        if base_mean is not None and recent_mean is not None and base_mean != 0:
            pct = (recent_mean - base_mean) / abs(base_mean) * 100.0
        details[name] = {
            "baseline_mean": base_mean,
            "recent_mean": recent_mean,
            "delta_pct": pct,
        }
    return details
|
|
|
|
def run_backtest(
    observations: List[Observation],
    config: EntityTypeConfig,
    *,
    step_days: int = 7,
    tenant_id: str = "backtest",
) -> BacktestResult:
    """Replay the engine over ``observations`` on a forward-moving cursor.

    The first cursor is the earliest day with enough history for the
    baseline window, the lag and the recent window. It then advances in
    ``step_days`` strides while it is not past the last observed day. At
    each cursor the engine is calibrated on, and run over, only the
    observations dated on or before that cursor, so no step sees the
    future.

    Every issue dict carries a ``details`` entry (Stage 19) with the
    per-metric baseline-vs-recent comparison, and a ``signals`` entry taken
    from the entity's most recent state row.

    Args:
        observations: observations whose ``day`` is an ISO-8601 day string.
        config: engine configuration for the entity type.
        step_days: cursor stride in days; must be at least 1.
        tenant_id: tenant label passed to calibration and the pipeline.

    Returns:
        A ``BacktestResult``. It has zero steps when there are no
        observations or not enough history for a single window.

    Raises:
        ValueError: if ``step_days`` is less than 1. A non-positive stride
            would never move the cursor past the end of the data.
    """
    import logging  # function-local: this block adds no file-level import

    if step_days < 1:
        raise ValueError(f"step_days must be >= 1, got {step_days!r}")
    log = logging.getLogger(__name__)

    def _empty() -> BacktestResult:
        return BacktestResult(tenant_id=tenant_id,
                              entity_type=config.entity_type,
                              step_days=step_days, n_steps=0)

    if not observations:
        return _empty()

    start = date.fromisoformat(min(o.day for o in observations))
    end = date.fromisoformat(max(o.day for o in observations))

    # The engine needs a full baseline + lag + recent span before it can
    # score anything, so the first usable cursor sits that far in.
    min_history = (config.baseline_window + config.baseline_lag
                   + config.recent_window)
    first_cursor = start + timedelta(days=min_history - 1)
    if first_cursor > end:
        return _empty()

    stride = timedelta(days=step_days)
    points: List[BacktestPoint] = []
    cursor = first_cursor
    while cursor <= end:
        cursor_iso = cursor.isoformat()
        # Causal snapshot: nothing dated after the cursor is visible.
        window = [o for o in observations if o.day <= cursor_iso]
        try:
            calibration = calibrate_from_observations(window, config,
                                                      tenant_id=tenant_id)
            result = run_pipeline(window, config, calibration,
                                  tenant_id=tenant_id, top_n=50)
        except Exception:
            # Best effort: one failing step must not abort the whole replay,
            # but the skip is recorded instead of vanishing silently.
            log.warning("backtest step %s skipped: calibration or pipeline "
                        "failed", cursor_iso, exc_info=True)
            cursor += stride
            continue

        sev_counts = {s: 0 for s in ("critical", "high", "medium", "low")}

        # Keep only the most recent state row per entity; its signals are
        # attached to that entity's issues.
        latest_state_by_entity: Dict[str, dict] = {}
        for s in result.states:
            cur = latest_state_by_entity.get(s["entity_id"])
            if cur is None or s["day"] > cur["day"]:
                latest_state_by_entity[s["entity_id"]] = s

        issues_dicts = []
        for issue in result.issues:
            # .get keeps an unexpected severity label from raising here.
            sev_counts[issue.severity] = sev_counts.get(issue.severity, 0) + 1
            state = latest_state_by_entity.get(issue.entity_id, {})
            signals = state.get("signals") or {}
            issues_dicts.append({
                "entity_id": issue.entity_id,
                "severity": issue.severity,
                "score": issue.score,
                "title": issue.title,
                "signals": {k: signals.get(k) for k in
                            ("delta", "psi", "xi", "gamma", "kappa")},
                "details": _compute_event_details(
                    window, issue.entity_id, cursor_iso, config,
                ),
            })
        points.append(BacktestPoint(
            cursor=cursor_iso,
            n_states=len(result.states),
            n_issues=len(result.issues),
            severity_counts=sev_counts,
            issues=issues_dicts,
        ))
        cursor += stride

    # Rollups across every recorded step.
    all_flagged: set = set()
    sev_totals: Dict[str, int] = {}
    n_total = 0
    for p in points:
        n_total += p.n_issues
        for i in p.issues:
            all_flagged.add(i["entity_id"])
        for s, n in p.severity_counts.items():
            sev_totals[s] = sev_totals.get(s, 0) + n
    return BacktestResult(
        tenant_id=tenant_id,
        entity_type=config.entity_type,
        step_days=step_days,
        n_steps=len(points),
        points=points,
        n_issues_total=n_total,
        n_unique_entities_ever_flagged=len(all_flagged),
        per_severity_total=sev_totals,
    )
|
|
|
|
| |
|
|
# Shared page prologue: doctype, <head> with the single inline stylesheet
# used by every rendered page, and the opening <body> tag. The drill-down
# renderer adds its own rules by replacing the literal "</style>" in this
# string, so it must contain exactly one such closing tag.
_HTML_HEAD = (
    "<!doctype html><html><head><meta charset=\"utf-8\">"
    "<title>OrgState backtest</title>"
    "<style>"
    "body{font-family:-apple-system,Segoe UI,Arial,sans-serif;"
    "max-width:1120px;margin:32px auto;padding:0 16px;line-height:1.5;color:#222}"
    "h1{font-size:1.6em;margin-bottom:0.2em}"
    "h2{margin-top:1.8em;border-bottom:1px solid #ddd;padding-bottom:0.2em}"
    "table{border-collapse:collapse;width:100%;margin:0.6em 0;font-size:0.94em}"
    "th,td{border:1px solid #ddd;padding:4px 8px;text-align:left}"
    "th{background:#f4f4f4}"
    # Severity pills (text) and severity bars (timeline) share one palette.
    ".sev{display:inline-block;padding:1px 6px;border-radius:3px;"
    "font-size:0.82em;font-weight:600;color:#fff}"
    ".sev-critical{background:#a00}.sev-high{background:#d35400}"
    ".sev-medium{background:#b8860b}.sev-low{background:#777}"
    ".meta{color:#555;font-size:0.93em}"
    "code{background:#f4f4f4;padding:1px 4px;border-radius:3px}"
    ".bar{display:inline-block;height:10px;vertical-align:middle;margin-right:2px}"
    ".bar-critical{background:#a00}.bar-high{background:#d35400}"
    ".bar-medium{background:#b8860b}.bar-low{background:#777}"
    ".cell-num{text-align:right;font-variant-numeric:tabular-nums}"
    # Recurring-drift callout box.
    ".recurring{padding:14px 18px;background:#fff7e6;border:1px solid #f0c674;"
    "border-radius:6px;margin:1em 0}"
    ".recurring h2{border:none;margin-top:0;color:#7a4a00}"
    ".recurring-badge{display:inline-block;padding:1px 8px;border-radius:3px;"
    "background:#d35400;color:#fff;font-size:0.82em;font-weight:600;margin-left:6px}"
    "</style></head><body>"
)
# Shared page epilogue closing what _HTML_HEAD opened.
_HTML_FOOT = "</body></html>"
|
|
|
|
def _esc(s):
    """Return ``str(s)`` HTML-escaped, quotes included, so the result can
    be placed in element text or inside a quoted attribute value."""
    from html import escape
    text = str(s)
    return escape(text, quote=True)
|
|
|
|
def _fmt_num(v) -> str:
    """Format a numeric cell value for the HTML tables.

    ``None`` renders as an em dash. Values whose magnitude is at least 100,
    and whole numbers, get one decimal place; everything else gets three.
    NaN renders as ``"nan"`` and infinities as ``"inf"`` / ``"-inf"``.
    """
    if v is None:
        return "—"
    # float.is_integer() is False for NaN, where int(v) would raise and
    # take the whole page render down with it.
    if abs(v) >= 100 or float(v).is_integer():
        return f"{v:.1f}"
    return f"{v:.3f}"
|
|
|
|
def _fmt_delta_pct(v) -> str:
    """Format a percentage change with one decimal place and an explicit
    ``+`` on non-negative values; ``None`` renders as an em dash."""
    if v is None:
        return "—"
    magnitude = f"{v:.1f}%"
    if v >= 0:
        return "+" + magnitude
    return magnitude
|
|
|
|
def _delta_class(v) -> str:
    """Pick the CSS class that colours a delta cell.

    The renderer only has the result, not the metric configuration, so it
    cannot know whether a rise is good or bad for a given metric. The
    convention is therefore purely by sign: a positive delta gets
    ``delta-pos``, anything else that is not zero or missing gets
    ``delta-neg``, and zero or ``None`` gets no class.
    """
    if v is None:
        return ""
    if v == 0:
        return ""
    if v > 0:
        return "delta-pos"
    return "delta-neg"
|
|
|
|
# Tooltip text for each engine signal, in the order the cells are rendered.
_SIGNAL_LEGEND = {
    "delta": "Δ directional change (higher = worse direction)",
    "psi": "ψ stability (lower = less stable)",
    "xi": "ξ anomaly spike (higher = sharper outlier)",
    "gamma": "γ SLA violation (higher = past target)",
    "kappa": "κ coherence (lower = signals disagree)",
}


def _render_signals_row(signals: dict) -> str:
    """Render the engine signals as a one-row table.

    One cell is produced per entry of ``_SIGNAL_LEGEND``, with the legend
    text as the tooltip; a signal missing from ``signals`` shows an em
    dash. An empty or missing ``signals`` mapping renders as nothing.
    """
    if not signals:
        return ""

    def _cell(name, label):
        value = signals.get(name)
        shown = "—" if value is None else _fmt_num(value)
        return (
            f'<td title="{_esc(label)}"><code>{_esc(name)}</code> '
            f'<strong>{shown}</strong></td>'
        )

    cells = "".join(_cell(name, label)
                    for name, label in _SIGNAL_LEGEND.items())
    return f'<table style="margin-top:0.4em"><tbody><tr>{cells}</tr></tbody></table>'
|
|
|
|
def _render_what_changed_blocks(flagged) -> str:
    """Render one ``<div class="changed-block">`` per flagged event.

    ``flagged`` is an iterable of ``(cursor, issue_dict)`` pairs. Each
    block has a heading with the cursor and severity, a per-metric table
    of baseline mean / recent mean / delta% when the issue has
    ``details``, and the signals row. Events with neither details nor
    signals are skipped.
    """
    table_open = (
        '<table><thead><tr>'
        '<th>Metric</th><th>Baseline mean</th><th>Recent mean</th>'
        '<th>Delta</th>'
        '</tr></thead><tbody>'
    )
    table_close = '</tbody></table>'

    blocks = []
    for cursor, match in flagged:
        details = match.get("details") or {}
        signals = match.get("signals") or {}
        if not (details or signals):
            continue

        metric_rows = []
        for metric, stats in details.items():
            pct = stats.get("delta_pct")
            baseline_txt = _fmt_num(stats.get("baseline_mean"))
            recent_txt = _fmt_num(stats.get("recent_mean"))
            metric_rows.append(
                "<tr>"
                f"<td><code>{_esc(metric)}</code></td>"
                f'<td class="cell-num">{baseline_txt}</td>'
                f'<td class="cell-num">{recent_txt}</td>'
                f'<td class="cell-num {_delta_class(pct)}">'
                f"{_fmt_delta_pct(pct)}</td>"
                "</tr>"
            )
        if metric_rows:
            metrics_table = table_open + "".join(metric_rows) + table_close
        else:
            metrics_table = ""

        severity = _esc(match["severity"])
        heading = (
            f'<h3><code>{_esc(cursor)}</code> · '
            f'<span class="sev sev-{severity}">'
            f'{severity}</span></h3>'
        )
        blocks.append(
            '<div class="changed-block">'
            + heading
            + metrics_table
            + _render_signals_row(signals)
            + '</div>'
        )
    return "".join(blocks)
|
|
|
|
def _slug(entity_id: str) -> str:
    """Turn an arbitrary entity id into a name usable as a file name and
    inside an href.

    Alphanumeric characters and ``.``, ``_``, ``-`` are kept; every other
    character (path separators, spaces, ...) becomes ``_``. Distinct ids
    can therefore map to the same slug.
    """
    keep = "._-"
    pieces = []
    for ch in entity_id:
        pieces.append(ch if (ch.isalnum() or ch in keep) else "_")
    return "".join(pieces)
|
|
|
|
def _bar_row(point: BacktestPoint, max_n: int) -> str:
    """Render one timeline row: cursor, issue count, a bar segment per
    severity and the four per-severity counts.

    Segment widths are scaled against ``max_n`` to at most 120px, with a
    2px floor so a small non-zero count stays visible. A row with nothing
    to draw shows an em dash in the bar cell.
    """
    levels = ("critical", "high", "medium", "low")
    counts = point.severity_counts

    segments = []
    for level in levels:
        count = counts.get(level, 0)
        if count <= 0 or max_n <= 0:
            continue
        px = max(2, int(120 * count / max_n))
        segments.append(
            f'<span class="bar bar-{level}" '
            f'style="width:{px}px" title="{count} {level}"></span>'
        )
    bar_html = "".join(segments) if segments else '<span class="meta">—</span>'

    count_cells = "".join(
        f'<td class="cell-num">{counts.get(level, 0)}</td>' for level in levels
    )
    return (
        "<tr>"
        + f"<td><code>{_esc(point.cursor)}</code></td>"
        + f'<td class="cell-num">{point.n_issues}</td>'
        + f"<td>{bar_html}</td>"
        + count_cells
        + "</tr>"
    )
|
|
|
|
def _entity_cell(entity_id: str, linked_entities) -> str:
    """Render an entity id for a table cell, linked to its drill-down page
    when one exists (Stage 18).

    ``linked_entities`` is the collection of ids that have a rendered
    drill-down page. Ids outside it, or any id when the collection is
    empty or ``None``, render as plain ``<code>`` so the report never
    links to a page that was not generated.
    """
    label = f"<code>{_esc(entity_id)}</code>"
    if linked_entities and entity_id in linked_entities:
        target = _esc(_slug(entity_id))
        return f'<a href="entities/{target}.html">{label}</a>'
    return label
|
|
|
|
def _recurring_section_html(recurring: List[RecurringEntity],
                            linked_entities=None) -> str:
    """Render the "Recurring drift" callout, or nothing when ``recurring``
    is empty.

    Each entity gets a row with its (optionally linked) id, event count,
    worst severity and first/last cursor.

    NOTE: the blurb always says "three or more" windows; it does not
    reflect the threshold the caller used to build ``recurring``.
    """
    if not recurring:
        return ""

    def _row(entity) -> str:
        severity = _esc(entity.top_severity)
        return (
            "<tr>"
            f"<td>{_entity_cell(entity.entity_id, linked_entities)}</td>"
            f'<td class="cell-num">{entity.n_events}</td>'
            f'<td><span class="sev sev-{severity}">'
            f"{severity}</span></td>"
            f"<td>{_esc(entity.first_cursor)}</td>"
            f"<td>{_esc(entity.last_cursor)}</td>"
            "</tr>"
        )

    table_rows = "".join(_row(entity) for entity in recurring)
    intro = (
        '<div class="recurring">'
        '<h2>Recurring drift <span class="recurring-badge">priority</span></h2>'
        '<p class="meta">Entities flagged in three or more distinct backtest '
        'windows. A repeating problem on the same entity is structurally '
        'different from a one-off — these are the targets for proactive '
        'intervention.</p>'
    )
    header = (
        '<table><thead><tr>'
        '<th>Entity</th><th>Events</th><th>Worst severity</th>'
        '<th>First</th><th>Last</th>'
        '</tr></thead><tbody>'
    )
    return intro + header + table_rows + '</tbody></table></div>'
|
|
|
|
| |
|
|
def render_entity_detail_html(result: BacktestResult,
                              entity_id: str) -> str:
    """Render the standalone drill-down page for one entity.

    The page shows:
      * the entity id with a summary (event count, worst severity,
        first and last flagged cursor);
      * a CSS-only sparkline with one cell per backtest cursor, the bar
        height proportional to the score and coloured by severity, empty
        where the entity was not flagged;
      * a chronological table of the flagged cursors with severity,
        score and title;
      * a "What changed" section with per-metric details, when available.

    An entity that was never flagged gets a short page saying so. If an
    entity has several issues at one cursor, only the first is shown.

    Args:
        result: the backtest result to read points and issues from.
        entity_id: id of the entity to render.

    Returns:
        A complete HTML document as a string.
    """
    # One (cursor, issue-or-None) pair per step, in step order.
    per_cursor = []
    for p in result.points:
        match = next((i for i in p.issues if i["entity_id"] == entity_id),
                     None)
        per_cursor.append((p.cursor, match))

    flagged = [(c, m) for c, m in per_cursor if m is not None]
    if not flagged:
        body = (
            f"<h1>{_esc(entity_id)}</h1>"
            '<p class="meta">This entity was not flagged in any backtest '
            'window — nothing to drill into.</p>'
        )
        return _HTML_HEAD + body + _HTML_FOOT

    n_events = len(flagged)
    first_cursor = flagged[0][0]
    last_cursor = flagged[-1][0]
    severities = [m["severity"] for _, m in flagged]
    top_severity = min(severities, key=lambda s: _SEV_RANK.get(s, 99))

    # Sparkline: a fixed-width cell per cursor; flagged cursors get a bar
    # whose height maps the score onto 2..32 px.
    spark_cells = []
    for cursor, match in per_cursor:
        if match is None:
            spark_cells.append(
                f'<div class="spark-cell" title="{_esc(cursor)} — not flagged">'
                '</div>'
            )
        else:
            sev = match["severity"]
            score = float(match["score"])
            h = max(2, min(32, int(round(score * 32))))
            spark_cells.append(
                f'<div class="spark-cell" '
                f'title="{_esc(cursor)} — {_esc(sev)} (score {score:.3f})">'
                f'<div class="spark-bar spark-{_esc(sev)}" '
                f'style="height:{h}px"></div>'
                '</div>'
            )
    sparkline = (
        '<div class="spark-extra">'
        + "".join(spark_cells)
        + '</div>'
    )

    # Chronological table of the flagged cursors.
    rows = []
    for cursor, match in flagged:
        rows.append(
            "<tr>"
            f"<td><code>{_esc(cursor)}</code></td>"
            f'<td><span class="sev sev-{_esc(match["severity"])}">'
            f'{_esc(match["severity"])}</span></td>'
            f'<td class="cell-num">{float(match["score"]):.3f}</td>'
            f"<td>{_esc(match['title'])}</td>"
            "</tr>"
        )

    what_changed = _render_what_changed_blocks(flagged)

    # Bare CSS rules with no <style> wrapper: they are spliced *inside* the
    # shared head's existing <style> element. A nested "<style>" there is
    # read as CSS text, which invalidates the first rule (.spark-extra, the
    # sparkline's flex layout) and leaves a stray closing tag behind.
    extra_css = (
        ".spark-extra{display:flex;align-items:flex-end;gap:2px;"
        "height:40px;padding:6px 8px;background:#f9f9f9;"
        "border:1px solid #e0e0e0;border-radius:4px;overflow-x:auto}"
        ".spark-cell{flex:0 0 6px;height:34px;display:flex;align-items:flex-end}"
        ".spark-bar{width:6px;border-radius:1px}"
        ".spark-critical{background:#a00}.spark-high{background:#d35400}"
        ".spark-medium{background:#b8860b}.spark-low{background:#777}"
        ".back-link{display:inline-block;margin-bottom:1em;color:#555;"
        "text-decoration:none;font-size:0.95em}"
        ".back-link:hover{text-decoration:underline}"
        ".changed-block{padding:10px 14px;background:#fcfcfc;"
        "border:1px solid #e0e0e0;border-radius:4px;margin:0.8em 0}"
        ".changed-block h3{margin:0 0 0.4em 0;font-size:1em;color:#444}"
        ".delta-pos{color:#a00}.delta-neg{color:#2c8a3e}"
    )

    body = (
        '<a class="back-link" href="../backtest.html">← back to backtest</a>'
        f"<h1>{_esc(entity_id)}</h1>"
        '<ul>'
        f'<li><strong>Events:</strong> {n_events}</li>'
        f'<li><strong>Worst severity:</strong> '
        f'<span class="sev sev-{_esc(top_severity)}">'
        f'{_esc(top_severity)}</span></li>'
        f'<li><strong>First:</strong> <code>{_esc(first_cursor)}</code></li>'
        f'<li><strong>Last:</strong> <code>{_esc(last_cursor)}</code></li>'
        '</ul>'
        '<h2>Score trajectory</h2>'
        '<p class="meta">One bar per backtest cursor. Bar height is the '
        'drift score (0..1); colour is the severity. Empty cells are '
        'cursors where this entity was below threshold.</p>'
        + sparkline
        + '<h2>Flagged events</h2>'
        + '<table><thead><tr>'
        '<th>Cursor</th><th>Severity</th><th>Score</th><th>Title</th>'
        '</tr></thead><tbody>'
        + "".join(rows)
        + '</tbody></table>'
        + (('<h2>What changed</h2>'
            '<p class="meta">Per-metric baseline (the entity\'s own '
            "history before each event) vs recent (the window that "
            "tripped the engine). This is the operator-facing 'why' "
            "behind the bare drift score.</p>"
            + what_changed) if what_changed else "")
    )

    # Append the page-specific rules just before the head's closing tag.
    head = _HTML_HEAD.replace("</style>", extra_css + "</style>", 1)
    return head + body + _HTML_FOOT
|
|
|
|
def render_html(result: BacktestResult,
                min_recurring_events: int = 3,
                link_entities=False) -> str:
    """Render the backtest report as one self-contained HTML page.

    The page has a rollup list at the top, the recurring-drift callout
    (Stage 17), a timeline table with stacked severity bars, and a
    per-entity totals table. A result with no points renders a short
    "not enough history" page instead.

    Args:
        result: the backtest result to render.
        min_recurring_events: threshold passed to
            ``find_recurring_entities`` for the callout.
        link_entities: controls drill-down links on entity ids.
            * ``False`` or ``None`` (default): ids render as plain code.
            * ``True``: every flagged entity id is linked. Use only when
              every flagged entity has a drill-down page.
            * a collection of entity ids: only those ids are linked, which
              avoids dead links when only some pages were rendered.

    Returns:
        A complete HTML document as a string.
    """
    if not result.points:
        return (
            _HTML_HEAD
            + "<h1>OrgState backtest</h1>"
            + '<p class="meta">No steps produced — the dataset does not have '
            "enough history for the engine's baseline+recent windows.</p>"
            + _HTML_FOOT
        )

    # Normalise link_entities into either None (no links) or a set of ids.
    if link_entities is True:
        linked = {i["entity_id"] for p in result.points for i in p.issues}
    elif link_entities is False or link_entities is None:
        linked = None
    else:
        linked = set(link_entities)

    severities = ("critical", "high", "medium", "low")

    # Rollup list.
    rollup_parts = [
        f"<li><strong>Entity type:</strong> {_esc(result.entity_type)}</li>",
        f"<li><strong>Step:</strong> every {result.step_days} day(s)</li>",
        f"<li><strong>Steps run:</strong> {result.n_steps}</li>",
        f"<li><strong>Total issue-events:</strong> {result.n_issues_total}</li>",
        (f"<li><strong>Unique entities ever flagged:</strong> "
         f"{result.n_unique_entities_ever_flagged}</li>"),
    ]
    sev_breakdown = " · ".join(
        f"{result.per_severity_total.get(s, 0)} {s}" for s in severities
    )
    rollup_parts.append(
        f"<li><strong>By severity:</strong> {sev_breakdown}</li>"
    )

    # Timeline; result.points is non-empty here, so max() is safe.
    max_per_step = max(p.n_issues for p in result.points)
    rows = "".join(_bar_row(p, max_per_step) for p in result.points)

    # Per-entity totals, most flagged first.
    by_entity: Dict[str, Dict[str, int]] = {}
    for p in result.points:
        for issue in p.issues:
            d = by_entity.setdefault(
                issue["entity_id"],
                {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0},
            )
            d["total"] += 1
            # .get: a severity label outside the four displayed ones still
            # counts towards the total instead of raising KeyError.
            sev = issue["severity"]
            d[sev] = d.get(sev, 0) + 1
    entity_rows = "".join(
        "<tr>"
        f"<td>{_entity_cell(eid, linked)}</td>"
        f'<td class="cell-num">{d["total"]}</td>'
        f'<td class="cell-num">{d["critical"]}</td>'
        f'<td class="cell-num">{d["high"]}</td>'
        f'<td class="cell-num">{d["medium"]}</td>'
        f'<td class="cell-num">{d["low"]}</td>'
        "</tr>"
        for eid, d in sorted(by_entity.items(),
                             key=lambda kv: -kv[1]["total"])
    )
    if entity_rows:
        entity_section = (
            "<table><thead><tr>"
            "<th>Entity</th><th>Total</th><th>Crit</th><th>High</th>"
            "<th>Med</th><th>Low</th></tr></thead><tbody>"
            + entity_rows
            + "</tbody></table>"
        )
    else:
        entity_section = '<p class="meta">No entities flagged in any window.</p>'

    recurring = find_recurring_entities(result, min_events=min_recurring_events)

    return (
        _HTML_HEAD
        + f"<h1>OrgState backtest — <code>{_esc(result.tenant_id)}</code></h1>"
        + "<ul>" + "".join(rollup_parts) + "</ul>"
        + _recurring_section_html(recurring, linked_entities=linked)
        + "<h2>Timeline (issues per cursor)</h2>"
        + "<table><thead><tr>"
        "<th>Cursor</th><th>Issues</th><th>Severity bars</th>"
        "<th>Crit</th><th>High</th><th>Med</th><th>Low</th>"
        "</tr></thead><tbody>"
        + rows
        + "</tbody></table>"
        + "<h2>Per-entity totals (most flagged first)</h2>"
        + entity_section
        + _HTML_FOOT
    )
|
|
|
|
| |
|
|
def _cli(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: load a CSV, run the backtest, write the
    report.

    Writes ``backtest.html`` and ``backtest.json`` into ``--out`` and, with
    ``--render-entities``, one drill-down page per qualifying entity under
    ``entities/``. A JSON summary is printed to stdout.

    Args:
        argv: argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 on success, 1 when the CSV yields no observations. Invalid
        arguments exit through argparse with status 2.
    """
    parser = argparse.ArgumentParser(
        prog="python -m delivery.backtest",
        description="Rolling backtest of the OrgState engine on a customer CSV.",
    )
    parser.add_argument("--vertical", required=True)
    parser.add_argument("--csv", required=True)
    parser.add_argument("--out", required=True,
                        help="output directory for backtest.html + backtest.json")
    parser.add_argument("--entity-type", default=None)
    parser.add_argument("--step", type=int, default=7,
                        help="step the cursor forward this many days each iteration")
    parser.add_argument("--top-n-owners", type=int, default=None,
                        help="(salesforce) cap owners by total accounts")
    parser.add_argument(
        "--render-entities", action="store_true",
        help="also write entities/<entity_id>.html drill-down pages and "
             "link them from the backtest report (Stage 18)",
    )
    parser.add_argument(
        "--entity-min-events", type=int, default=1,
        help="(with --render-entities) only render drill-downs for entities "
             "flagged at least this many times (default 1 = all flagged ones)",
    )
    args = parser.parse_args(argv)
    # A stride below one day never advances the backtest cursor; reject it
    # before any data is loaded.
    if args.step < 1:
        parser.error(f"--step must be >= 1 (got {args.step})")

    vcfg = get_vertical_config(args.vertical)
    # Default to the first entity type the vertical declares.
    entity_type = args.entity_type or next(iter(vcfg.entity_types))
    cfg = vcfg.entity_type(entity_type)

    # Imported here rather than at module level, as in the original.
    from .real_pilot import _load_via_vertical
    obs = _load_via_vertical(args.vertical, args.csv,
                             top_n_owners=args.top_n_owners,
                             entity_type=entity_type)
    if not obs:
        print(json.dumps({"error": f"zero observations from {args.csv!r}"}),
              flush=True)
        return 1

    result = run_backtest(obs, cfg, step_days=args.step,
                          tenant_id=f"backtest_{args.vertical}")
    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)
    html_path = out_dir / "backtest.html"
    json_path = out_dir / "backtest.json"

    rendered_entities = []
    if args.render_entities:
        from collections import Counter
        per_entity_events = Counter(
            issue["entity_id"] for p in result.points for issue in p.issues
        )
        targets = sorted(eid for eid, n in per_entity_events.items()
                         if n >= args.entity_min_events)
        ent_dir = out_dir / "entities"
        ent_dir.mkdir(exist_ok=True)
        for eid in targets:
            (ent_dir / f"{_slug(eid)}.html").write_text(
                render_entity_detail_html(result, eid), encoding="utf-8",
            )
            rendered_entities.append(eid)

    # Link only the entities whose pages were actually written.
    html_path.write_text(
        render_html(result,
                    link_entities=set(rendered_entities) if rendered_entities
                    else False),
        encoding="utf-8",
    )

    from dataclasses import asdict
    json_path.write_text(json.dumps(asdict(result), indent=2,
                                    default=str, sort_keys=True),
                         encoding="utf-8")

    print(json.dumps({
        "vertical": args.vertical,
        "entity_type": entity_type,
        "n_observations": len(obs),
        "n_steps": result.n_steps,
        "n_issues_total": result.n_issues_total,
        "n_unique_entities_ever_flagged": result.n_unique_entities_ever_flagged,
        "per_severity_total": result.per_severity_total,
        "html": str(html_path.resolve()),
        "json": str(json_path.resolve()),
        "entity_pages": len(rendered_entities),
    }, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(_cli())
|
|