""" delivery.backtest — "what would we have caught had we been running?" (Stage 16). Stage 15 ran the engine once on a Salesforce export and surfaced zero current issues, because the LAST 14 days of that file were stable. But the per-state scores hinted at historical drift events. This module turns that hint into a measurable timeline: it walks a cursor forward day-by-day across the dataset, runs the engine on the window ending at each cursor, and records what the engine would have alerted on. This is operationally useful for two reasons: 1. **Pitch**: a customer who hands us an export can see the engine replayed against their own history — "if you'd been running this since day 1, here are the events you would have been told about." 2. **Threshold tuning**: backtest counts per severity per month show whether the defaults are too noisy or too quiet on this customer's data, before flipping the switch on live alerting. stdlib only. Calibration is re-derived per cursor from the data up to that cursor (the engine never gets to see the future). """ from __future__ import annotations import argparse import json from dataclasses import dataclass, field from datetime import date, timedelta from pathlib import Path from typing import Dict, List, Optional from core import calibrate_from_observations, run_pipeline from core.config import EntityTypeConfig from core.pipeline import Observation from verticals import get_vertical_config @dataclass class BacktestPoint: """One step in the rolling backtest. ``cursor`` is the day the engine is "as of"; the issues are what it would have alerted on at that point. Issues are stored as plain dicts (asdict of DriftIssue) so downstream rendering doesn't need to import from core.""" cursor: str n_states: int n_issues: int severity_counts: Dict[str, int] = field(default_factory=dict) issues: List[dict] = field(default_factory=list) @dataclass class BacktestResult: tenant_id: str entity_type: str step_days: int n_steps: int points: List[BacktestPoint] = field(default_factory=list) # rollups for convenience — derived from points but useful in viz n_issues_total: int = 0 n_unique_entities_ever_flagged: int = 0 per_severity_total: Dict[str, int] = field(default_factory=dict) @dataclass class RecurringEntity: """An entity that drifted in multiple distinct windows. Stage 17 — the single most actionable signal that came out of running the backtest on real data: an owner who keeps drifting is a systemic issue, not a one-off.""" entity_id: str n_events: int first_cursor: str last_cursor: str top_severity: str # worst severity ever seen cursors: List[str] = field(default_factory=list) # Severity ordering, worst -> best, used to summarise the "top severity # ever seen" per recurring entity. _SEV_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3} def find_recurring_entities( result: BacktestResult, *, min_events: int = 3, ) -> List[RecurringEntity]: """Return entities flagged in at least ``min_events`` distinct backtest windows, ordered most-recurring first (ties broken by recency of last event, then by entity_id for stability). A "recurring" entity is structurally different from a one-off: the same problem keeps coming back, which is exactly what an operator wants surfaced for proactive intervention. """ from collections import defaultdict per_entity_cursors: Dict[str, List[str]] = defaultdict(list) per_entity_worst: Dict[str, str] = {} for p in result.points: for issue in p.issues: eid = issue["entity_id"] per_entity_cursors[eid].append(p.cursor) sev = issue["severity"] current = per_entity_worst.get(eid) if current is None or _SEV_RANK.get(sev, 99) < _SEV_RANK.get(current, 99): per_entity_worst[eid] = sev out: List[RecurringEntity] = [] for eid, cursors in per_entity_cursors.items(): if len(cursors) >= min_events: ordered = sorted(cursors) out.append(RecurringEntity( entity_id=eid, n_events=len(cursors), first_cursor=ordered[0], last_cursor=ordered[-1], top_severity=per_entity_worst[eid], cursors=ordered, )) out.sort(key=lambda r: (-r.n_events, _negate_iso(r.last_cursor), r.entity_id)) return out def _negate_iso(iso_day: str) -> str: """Helper so ``sort`` treats more-recent cursors as 'smaller'. We can't negate a string, so we exploit ISO-8601's lexicographic ordering and use a reverse-by-padding trick: a higher day produces a 'smaller' key after we subtract from a sentinel. The lazy way is to use a separate reverse list, but we want it inline.""" # YYYY-MM-DD is 10 chars; just translate digits so '9' < '0' etc. flip = str.maketrans("0123456789", "9876543210") return iso_day.translate(flip) def _compute_event_details( observations: List[Observation], entity_id: str, cursor_iso: str, config: EntityTypeConfig, ) -> Dict[str, dict]: """Per-metric ``{baseline_mean, recent_mean, delta_pct}`` for one ``(entity, cursor)`` event (Stage 19). This is the operator-facing explanation of *why* the engine flagged the entity at that cursor — far more actionable than the bare drift score. Windows use the same boundaries the engine itself used: recent = (cursor - recent_window + 1, cursor] baseline = (cursor - recent_window - baseline_lag - baseline_window + 1, cursor - recent_window - baseline_lag] """ cursor = date.fromisoformat(cursor_iso) recent_start = cursor - timedelta(days=config.recent_window - 1) baseline_end = recent_start - timedelta(days=config.baseline_lag + 1) baseline_start = baseline_end - timedelta(days=config.baseline_window - 1) rs, re_, bs, be = (recent_start.isoformat(), cursor_iso, baseline_start.isoformat(), baseline_end.isoformat()) metric_names = [m.name for m in config.metrics] baseline_vals: Dict[str, list] = {m: [] for m in metric_names} recent_vals: Dict[str, list] = {m: [] for m in metric_names} for o in observations: if o.entity_id != entity_id: continue if rs <= o.day <= re_: for m in metric_names: v = o.values.get(m) if v is not None: recent_vals[m].append(float(v)) elif bs <= o.day <= be: for m in metric_names: v = o.values.get(m) if v is not None: baseline_vals[m].append(float(v)) out: Dict[str, dict] = {} for m in metric_names: b = baseline_vals[m] r = recent_vals[m] bm = (sum(b) / len(b)) if b else None rm = (sum(r) / len(r)) if r else None if bm is not None and rm is not None and bm != 0: delta_pct = (rm - bm) / abs(bm) * 100.0 else: delta_pct = None out[m] = { "baseline_mean": bm, "recent_mean": rm, "delta_pct": delta_pct, } return out def run_backtest( observations: List[Observation], config: EntityTypeConfig, *, step_days: int = 7, tenant_id: str = "backtest", ) -> BacktestResult: """Walk a cursor forward in ``step_days`` increments. At each cursor, re-calibrate on the observations up to cursor and run the pipeline. The engine never sees the future at any cursor — each step is a causal as-of-that-day snapshot. Calibration is re-derived rather than reused because what the engine deems "normal" itself evolves. Each issue dict gains a ``details`` field (Stage 19) — per-metric baseline-vs-recent comparison that powers the "What changed" explanation on the entity drill-down page. """ if not observations: return BacktestResult(tenant_id=tenant_id, entity_type=config.entity_type, step_days=step_days, n_steps=0) days = sorted({o.day for o in observations}) start_iso = days[0] end_iso = days[-1] start = date.fromisoformat(start_iso) end = date.fromisoformat(end_iso) # The first useful cursor needs at least baseline + lag + recent days # of data behind it; before that, the engine cannot compute drift. min_history = (config.baseline_window + config.baseline_lag + config.recent_window) first_cursor = start + timedelta(days=min_history - 1) if first_cursor > end: return BacktestResult(tenant_id=tenant_id, entity_type=config.entity_type, step_days=step_days, n_steps=0) points: List[BacktestPoint] = [] cursor = first_cursor while cursor <= end: cursor_iso = cursor.isoformat() # observations the engine sees: everything up to and including cursor window = [o for o in observations if o.day <= cursor_iso] try: calibration = calibrate_from_observations(window, config, tenant_id=tenant_id) result = run_pipeline(window, config, calibration, tenant_id=tenant_id, top_n=50) except Exception: # a degenerate window (no variance, no rows for some entity) # should not crash the whole backtest — record an empty point cursor += timedelta(days=step_days) continue sev_counts = {s: 0 for s in ("critical", "high", "medium", "low")} # latest state per entity carries the signals breakdown the engine # actually used to make its decision — we surface it on each issue # so the "What changed" view is grounded in the same numbers. latest_state_by_entity: Dict[str, dict] = {} for s in result.states: cur = latest_state_by_entity.get(s["entity_id"]) if cur is None or s["day"] > cur["day"]: latest_state_by_entity[s["entity_id"]] = s issues_dicts = [] for issue in result.issues: sev_counts[issue.severity] = sev_counts.get(issue.severity, 0) + 1 state = latest_state_by_entity.get(issue.entity_id, {}) signals = state.get("signals") or {} issues_dicts.append({ "entity_id": issue.entity_id, "severity": issue.severity, "score": issue.score, "title": issue.title, "signals": {k: signals.get(k) for k in ("delta", "psi", "xi", "gamma", "kappa")}, "details": _compute_event_details( window, issue.entity_id, cursor_iso, config, ), }) points.append(BacktestPoint( cursor=cursor_iso, n_states=len(result.states), n_issues=len(result.issues), severity_counts=sev_counts, issues=issues_dicts, )) cursor += timedelta(days=step_days) # rollups all_flagged: set = set() sev_totals: Dict[str, int] = {} n_total = 0 for p in points: n_total += p.n_issues for i in p.issues: all_flagged.add(i["entity_id"]) for s, n in p.severity_counts.items(): sev_totals[s] = sev_totals.get(s, 0) + n return BacktestResult( tenant_id=tenant_id, entity_type=config.entity_type, step_days=step_days, n_steps=len(points), points=points, n_issues_total=n_total, n_unique_entities_ever_flagged=len(all_flagged), per_severity_total=sev_totals, ) # --- HTML renderer ------------------------------------------------------ _HTML_HEAD = ( "" "OrgState backtest" "" ) _HTML_FOOT = "" def _esc(s): import html as _html return _html.escape(str(s), quote=True) def _fmt_num(v) -> str: if v is None: return "—" if abs(v) >= 100 or v == int(v): return f"{v:.1f}" return f"{v:.3f}" def _fmt_delta_pct(v) -> str: if v is None: return "—" sign = "+" if v >= 0 else "" return f"{sign}{v:.1f}%" def _delta_class(v) -> str: """Colour-code the delta cell. We don't know per-metric whether the direction is "higher_is_worse" without the config; the renderer only sees the BacktestResult. So we pick a neutral convention: positive delta -> warm (red), negative delta -> cool (green). On a higher_is_worse metric this matches operator intuition; on lower_is_worse the colour is inverted but the number is right. Operators reading the page have the metric direction in context.""" if v is None or v == 0: return "" return "delta-pos" if v > 0 else "delta-neg" _SIGNAL_LEGEND = { "delta": "Δ directional change (higher = worse direction)", "psi": "ψ stability (lower = less stable)", "xi": "ξ anomaly spike (higher = sharper outlier)", "gamma": "γ SLA violation (higher = past target)", "kappa": "κ coherence (lower = signals disagree)", } def _render_signals_row(signals: dict) -> str: """Tiny per-signal row underneath the per-metric table. Shows the five OrgState signals the engine actually combined into the drift score, so the operator can tell at a glance which of them lit up.""" if not signals: return "" parts = [] for name, label in _SIGNAL_LEGEND.items(): v = signals.get(name) v_s = _fmt_num(v) if v is not None else "—" parts.append( f'{_esc(name)} ' f'{v_s}' ) return ( '' + "".join(parts) + '
' ) def _render_what_changed_blocks(flagged) -> str: """One ``
`` per flagged event with a per-metric baseline / recent / delta% table AND a signals row so the operator sees both 'what the numbers did' and 'which signals the engine combined into the alert'.""" out = [] for cursor, match in flagged: details = match.get("details") or {} signals = match.get("signals") or {} if not details and not signals: continue rows = [] for metric, d in details.items(): delta = d.get("delta_pct") rows.append( "" f"{_esc(metric)}" f'{_fmt_num(d.get("baseline_mean"))}' f'{_fmt_num(d.get("recent_mean"))}' f'' f"{_fmt_delta_pct(delta)}" "" ) body = ( '' '' '' '' + "".join(rows) + '
MetricBaseline meanRecent meanDelta
' ) if rows else "" out.append( '
' f'

{_esc(cursor)} · ' f'' f'{_esc(match["severity"])}

' + body + _render_signals_row(signals) + '
' ) return "".join(out) def _slug(entity_id: str) -> str: """Make a filesystem- and URL-safe slug from an arbitrary entity id. The Salesforce ids that drove this stage are already safe, but user-controlled ids could contain '/', '..', spaces — and we are about to use this as a filename + an href.""" return "".join(c if c.isalnum() or c in "._-" else "_" for c in entity_id) def _bar_row(point: BacktestPoint, max_n: int) -> str: """One row of the timeline table: cursor + stacked severity bars + counts.""" parts = [] for sev in ("critical", "high", "medium", "low"): n = point.severity_counts.get(sev, 0) if n > 0 and max_n > 0: width = max(2, int(120 * n / max_n)) parts.append(f'') bar = "".join(parts) if parts else '' return ( "" f"{_esc(point.cursor)}" f'{point.n_issues}' f"{bar}" f'{point.severity_counts.get("critical", 0)}' f'{point.severity_counts.get("high", 0)}' f'{point.severity_counts.get("medium", 0)}' f'{point.severity_counts.get("low", 0)}' "" ) def _entity_cell(entity_id: str, linked_entities) -> str: """Render an entity-id table cell, optionally linking to its per-entity drill-down page (Stage 18). ``linked_entities`` is the set of entity ids that actually have a rendered drill-down page (passed by the CLI). Entities not in the set render as plain code — otherwise we'd link to 404s for one-off-flagged entities whose pages were never generated. """ eid = _esc(entity_id) if not linked_entities or entity_id not in linked_entities: return f"{eid}" return f'{eid}' def _recurring_section_html(recurring: List[RecurringEntity], linked_entities=None) -> str: if not recurring: return "" rows = [] for r in recurring: rows.append( "" f"{_entity_cell(r.entity_id, linked_entities)}" f'{r.n_events}' f'' f"{_esc(r.top_severity)}" f"{_esc(r.first_cursor)}" f"{_esc(r.last_cursor)}" "" ) return ( '
' '

Recurring drift priority

' '

Entities flagged in three or more distinct backtest ' 'windows. A repeating problem on the same entity is structurally ' 'different from a one-off — these are the targets for proactive ' 'intervention.

' '' '' '' '' + "".join(rows) + '
EntityEventsWorst severityFirstLast
' ) # --- per-entity drill-down (Stage 18) ---------------------------------- def render_entity_detail_html(result: BacktestResult, entity_id: str) -> str: """Standalone HTML page for a single entity — the drill-down the recurrence callout (Stage 17) points to. Shows: * the entity id + summary (n events, worst severity, first/last) * a CSS-only sparkline: one cell per backtest cursor, height proportional to score, colour by severity (gray when below threshold = not flagged at that cursor) * a chronological table of cursors at which the entity was flagged, with score / severity / title Pure stdlib; CSS-only graphics. Same visual language as the backtest page so the operator does not have to context-switch. """ # collect: for every cursor in the backtest, the entity's issue # (None if not flagged). The drill-down's whole story sits in # this list. per_cursor = [] for p in result.points: match = next((i for i in p.issues if i["entity_id"] == entity_id), None) per_cursor.append((p.cursor, match)) flagged = [(c, m) for c, m in per_cursor if m is not None] if not flagged: body = ( f"

{_esc(entity_id)}

" '

This entity was not flagged in any backtest ' 'window — nothing to drill into.

' ) return _HTML_HEAD + body + _HTML_FOOT n_events = len(flagged) first_cursor = flagged[0][0] last_cursor = flagged[-1][0] severities = [m["severity"] for _, m in flagged] top_severity = min(severities, key=lambda s: _SEV_RANK.get(s, 99)) # CSS-only sparkline. Each cursor is one fixed-width cell. Cells # where the entity was flagged get a coloured bar at the right # height for the score; other cells stay empty. # Heights are in px, range up to 32 (0..1 -> 0..32). spark_cells = [] for cursor, match in per_cursor: if match is None: spark_cells.append( f'
' '
' ) else: sev = match["severity"] score = float(match["score"]) h = max(2, min(32, int(round(score * 32)))) spark_cells.append( f'
' f'
' '
' ) sparkline = ( '
' + "".join(spark_cells) + '
' ) # event table — chronological rows = [] for cursor, match in flagged: rows.append( "" f"{_esc(cursor)}" f'' f'{_esc(match["severity"])}' f'{float(match["score"]):.3f}' f"{_esc(match['title'])}" "" ) # "What changed" panels (Stage 19) — one per flagged event, showing # per-metric baseline vs recent. This is the operator-facing "why" # behind the bare drift score. what_changed = _render_what_changed_blocks(flagged) extra_css = ( "" ) body = ( '← back to backtest' f"

{_esc(entity_id)}

" '' '

Score trajectory

' '

One bar per backtest cursor. Bar height is the ' 'drift score (0..1); colour is the severity. Empty cells are ' 'cursors where this entity was below threshold.

' + sparkline + '

Flagged events

' + '' '' '' + "".join(rows) + '
CursorSeverityScoreTitle
' + (('

What changed

' '

Per-metric baseline (the entity\'s own ' "history before each event) vs recent (the window that " "tripped the engine). This is the operator-facing 'why' " "behind the bare drift score.

" + what_changed) if what_changed else "") ) return _HTML_HEAD.replace("", extra_css + "") + body + _HTML_FOOT def render_html(result: BacktestResult, min_recurring_events: int = 3, link_entities=False) -> str: """Render the backtest HTML. ``link_entities`` can be: * False (default) — entity ids render as plain code, no links * True — every entity id is wrapped in a link (use this only if you know every entity has a drill-down page; otherwise prefer a set, below) * a set / collection of entity ids — only those ids are linked, the rest render as plain code. This avoids 404s when only the recurring entities have drill-down pages rendered. """ """A single self-contained HTML page: rollup at top, **recurring drift callout (Stage 17)**, timeline table with stacked severity bars, then a per-entity total table. """ if not result.points: return ( _HTML_HEAD + "

OrgState backtest

" + '

No steps produced — the dataset does not have ' "enough history for the engine's baseline+recent windows.

" + _HTML_FOOT ) # Resolve link_entities up front so every cell-rendering helper can # see it. True -> link everyone; False/None -> link no one; set -> # link only those ids. if link_entities is True: linked = {i["entity_id"] for p in result.points for i in p.issues} elif link_entities is False or link_entities is None: linked = None else: linked = set(link_entities) # rollup rollup_parts = [ f"
  • Entity type: {_esc(result.entity_type)}
  • ", f"
  • Step: every {result.step_days} day(s)
  • ", f"
  • Steps run: {result.n_steps}
  • ", f"
  • Total issue-events: {result.n_issues_total}
  • ", (f"
  • Unique entities ever flagged: " f"{result.n_unique_entities_ever_flagged}
  • "), ] sev_breakdown = " · ".join( f"{result.per_severity_total.get(s, 0)} {s}" for s in ("critical", "high", "medium", "low") ) rollup_parts.append( f"
  • By severity: {sev_breakdown}
  • " ) # timeline table max_per_step = max(p.n_issues for p in result.points) if result.points else 0 rows = "".join(_bar_row(p, max_per_step) for p in result.points) # per-entity total by_entity: Dict[str, Dict[str, int]] = {} for p in result.points: for issue in p.issues: eid = issue["entity_id"] d = by_entity.setdefault(eid, {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}) d["total"] += 1 d[issue["severity"]] += 1 entity_rows = "".join( "" f"{_entity_cell(eid, linked)}" f'{d["total"]}' f'{d["critical"]}' f'{d["high"]}' f'{d["medium"]}' f'{d["low"]}' "" for eid, d in sorted(by_entity.items(), key=lambda kv: -kv[1]["total"]) ) recurring = find_recurring_entities(result, min_events=min_recurring_events) return ( _HTML_HEAD + f"

    OrgState backtest — {_esc(result.tenant_id)}

    " + "" + _recurring_section_html(recurring, linked_entities=linked) + "

    Timeline (issues per cursor)

    " + "" "" "" "" + rows + "
    CursorIssuesSeverity barsCritHighMedLow
    " + "

    Per-entity totals (most flagged first)

    " + ("" "" "" + entity_rows + "
    EntityTotalCritHighMedLow
    " if entity_rows else '

    No entities flagged in any window.

    ') + _HTML_FOOT ) # --- CLI --------------------------------------------------------------- def _cli(argv: Optional[List[str]] = None) -> int: parser = argparse.ArgumentParser( prog="python -m delivery.backtest", description="Rolling backtest of the OrgState engine on a customer CSV.", ) parser.add_argument("--vertical", required=True) parser.add_argument("--csv", required=True) parser.add_argument("--out", required=True, help="output directory for backtest.html + backtest.json") parser.add_argument("--entity-type", default=None) parser.add_argument("--step", type=int, default=7, help="step the cursor forward this many days each iteration") parser.add_argument("--top-n-owners", type=int, default=None, help="(salesforce) cap owners by total accounts") parser.add_argument( "--render-entities", action="store_true", help="also write entities/.html drill-down pages and " "link them from the backtest report (Stage 18)", ) parser.add_argument( "--entity-min-events", type=int, default=1, help="(with --render-entities) only render drill-downs for entities " "flagged at least this many times (default 1 = all flagged ones)", ) args = parser.parse_args(argv) vcfg = get_vertical_config(args.vertical) entity_type = args.entity_type or next(iter(vcfg.entity_types)) cfg = vcfg.entity_type(entity_type) # source: same dispatch as real_pilot from .real_pilot import _load_via_vertical obs = _load_via_vertical(args.vertical, args.csv, top_n_owners=args.top_n_owners, entity_type=entity_type) if not obs: print(json.dumps({"error": f"zero observations from {args.csv!r}"}), flush=True) return 1 result = run_backtest(obs, cfg, step_days=args.step, tenant_id=f"backtest_{args.vertical}") out_dir = Path(args.out) out_dir.mkdir(parents=True, exist_ok=True) html_path = out_dir / "backtest.html" json_path = out_dir / "backtest.json" rendered_entities = [] if args.render_entities: # collect all entities that fired at least min_events times from collections import Counter per_entity_events = Counter() for p in result.points: for issue in p.issues: per_entity_events[issue["entity_id"]] += 1 targets = sorted(eid for eid, n in per_entity_events.items() if n >= args.entity_min_events) ent_dir = out_dir / "entities" ent_dir.mkdir(exist_ok=True) for eid in targets: (ent_dir / f"{_slug(eid)}.html").write_text( render_entity_detail_html(result, eid), encoding="utf-8", ) rendered_entities.append(eid) html_path.write_text( render_html(result, link_entities=set(rendered_entities) if rendered_entities else False), encoding="utf-8", ) # JSON: convert dataclasses from dataclasses import asdict json_path.write_text(json.dumps(asdict(result), indent=2, default=str, sort_keys=True), encoding="utf-8") print(json.dumps({ "vertical": args.vertical, "entity_type": entity_type, "n_observations": len(obs), "n_steps": result.n_steps, "n_issues_total": result.n_issues_total, "n_unique_entities_ever_flagged": result.n_unique_entities_ever_flagged, "per_severity_total": result.per_severity_total, "html": str(html_path.resolve()), "json": str(json_path.resolve()), "entity_pages": len(rendered_entities), }, indent=2, sort_keys=True)) return 0 if __name__ == "__main__": # pragma: no cover raise SystemExit(_cli())