# orgstate/delivery/backtest.py
# Legal-i's picture
# Initial OrgState deploy via Stage 150 free-tier stack
# d2d1903 verified
"""
delivery.backtest — "what would we have caught had we been running?"
(Stage 16).
Stage 15 ran the engine once on a Salesforce export and surfaced zero
current issues, because the LAST 14 days of that file were stable. But
the per-state scores hinted at historical drift events. This module
turns that hint into a measurable timeline: it walks a cursor forward
day-by-day across the dataset, runs the engine on the window ending at
each cursor, and records what the engine would have alerted on.
This is operationally useful for two reasons:
1. **Pitch**: a customer who hands us an export can see the engine
replayed against their own history — "if you'd been running this
since day 1, here are the events you would have been told about."
2. **Threshold tuning**: backtest counts per severity per month
show whether the defaults are too noisy or too quiet on this
customer's data, before flipping the switch on live alerting.
stdlib only. Calibration is re-derived per cursor from the data up to
that cursor (the engine never gets to see the future).
"""
from __future__ import annotations
import argparse
import json
from dataclasses import dataclass, field
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from core import calibrate_from_observations, run_pipeline
from core.config import EntityTypeConfig
from core.pipeline import Observation
from verticals import get_vertical_config
@dataclass
class BacktestPoint:
    """Snapshot of a single step of the rolling backtest.

    ``cursor`` is the ISO day the engine was evaluated "as of", and
    ``issues`` holds what it would have alerted on at that moment. Issues
    are kept as plain dicts so that rendering code does not need to import
    anything from ``core``.
    """

    # ISO day this step is "as of".
    cursor: str
    # Number of states the engine produced at this step.
    n_states: int
    # Number of issues the engine raised at this step.
    n_issues: int
    # Severity name -> number of issues with that severity at this step.
    severity_counts: Dict[str, int] = field(default_factory=dict)
    # One plain dict per issue raised at this step.
    issues: List[dict] = field(default_factory=list)
@dataclass
class BacktestResult:
    """Outcome of one rolling backtest: the per-step points plus rollups."""

    # Tenant label the backtest was run under.
    tenant_id: str
    # Entity type the engine configuration applies to.
    entity_type: str
    # Distance, in days, between consecutive cursors.
    step_days: int
    # Number of steps that produced a point.
    n_steps: int
    # One entry per recorded step, in cursor order.
    points: List[BacktestPoint] = field(default_factory=list)
    # Rollups: derivable from ``points`` but kept here because the
    # renderers read them directly.
    n_issues_total: int = 0
    n_unique_entities_ever_flagged: int = 0
    per_severity_total: Dict[str, int] = field(default_factory=dict)
@dataclass
class RecurringEntity:
    """An entity that drifted in several distinct backtest windows (Stage 17).

    This was the most actionable signal to come out of running the
    backtest on real data: an owner who keeps drifting points at a
    systemic issue rather than a one-off.
    """

    entity_id: str
    # How many times the entity was flagged.
    n_events: int
    # Earliest and latest cursor at which the entity was flagged.
    first_cursor: str
    last_cursor: str
    # Worst severity ever seen for this entity.
    top_severity: str
    # Every cursor at which the entity was flagged.
    cursors: List[str] = field(default_factory=list)
# Severity names mapped to a sort rank, 0 being the worst. Used to pick the
# "top severity ever seen" for an entity with a plain ``min``; callers fall
# back to rank 99 for names that are not listed here.
_SEV_RANK = {
    name: rank
    for rank, name in enumerate(("critical", "high", "medium", "low"))
}
def find_recurring_entities(
    result: BacktestResult,
    *,
    min_events: int = 3,
) -> List[RecurringEntity]:
    """Return entities flagged in at least ``min_events`` distinct windows.

    A "recurring" entity is structurally different from a one-off: the
    same problem keeps coming back, which is exactly what an operator
    wants surfaced for proactive intervention.

    Events are counted per distinct cursor. Several issues raised for the
    same entity at one cursor therefore count as a single event, which
    keeps ``n_events`` in line with the per-entity drill-down page (it
    shows at most one event per cursor).

    Args:
        result: backtest output; every issue dict in ``result.points`` must
            carry ``entity_id`` and ``severity`` keys.
        min_events: minimum number of distinct cursors an entity must be
            flagged at to be reported.

    Returns:
        ``RecurringEntity`` records ordered most-recurring first. Ties are
        broken by the most recent last event, then by ``entity_id`` so the
        order is stable.
    """
    from collections import defaultdict

    cursors_by_entity: Dict[str, set] = defaultdict(set)
    severities_by_entity: Dict[str, List[str]] = defaultdict(list)
    for point in result.points:
        for issue in point.issues:
            eid = issue["entity_id"]
            # A set, so repeated issues within one window collapse to one event.
            cursors_by_entity[eid].add(point.cursor)
            severities_by_entity[eid].append(issue["severity"])

    recurring: List[RecurringEntity] = []
    for eid, cursor_set in cursors_by_entity.items():
        if len(cursor_set) < min_events:
            continue
        ordered = sorted(cursor_set)  # ISO days sort chronologically
        recurring.append(RecurringEntity(
            entity_id=eid,
            n_events=len(ordered),
            first_cursor=ordered[0],
            last_cursor=ordered[-1],
            # ``min`` keeps the first-seen severity among equally ranked
            # ones; unknown severity names rank last (99).
            top_severity=min(severities_by_entity[eid],
                             key=lambda sev: _SEV_RANK.get(sev, 99)),
            cursors=ordered,
        ))

    recurring.sort(key=lambda r: (-r.n_events, _negate_iso(r.last_cursor),
                                  r.entity_id))
    return recurring
def _negate_iso(iso_day: str) -> str:
    """Build a sort key under which more recent ISO days compare smaller.

    A string cannot be negated, so each ASCII digit ``d`` is swapped for
    ``9 - d`` while every other character (the dashes) is left alone.
    Because ``YYYY-MM-DD`` strings sort chronologically, their
    digit-complemented forms sort in reverse chronological order, which
    lets the caller keep a single ascending ``sort`` key.
    """
    complement = {str(digit): str(9 - digit) for digit in range(10)}
    return "".join(complement.get(ch, ch) for ch in iso_day)
def _compute_event_details(
    observations: List[Observation],
    entity_id: str,
    cursor_iso: str,
    config: EntityTypeConfig,
) -> Dict[str, dict]:
    """Explain one ``(entity, cursor)`` event metric by metric (Stage 19).

    For every metric in ``config.metrics`` the result holds
    ``{baseline_mean, recent_mean, delta_pct}``, the operator-facing
    "why" behind a bare drift score. Both windows are inclusive at both
    ends (NOTE(review): intended to mirror the engine's own windows —
    confirm against ``core``):

        recent   = [cursor - recent_window + 1, cursor]
        baseline = [recent_start - baseline_lag - baseline_window,
                    recent_start - baseline_lag - 1]

    A mean is ``None`` when its window has no values for the metric, and
    ``delta_pct`` is ``None`` when either mean is missing or the baseline
    mean is zero.
    """
    as_of = date.fromisoformat(cursor_iso)
    recent_first = as_of - timedelta(days=config.recent_window - 1)
    baseline_last = recent_first - timedelta(days=config.baseline_lag + 1)
    baseline_first = baseline_last - timedelta(days=config.baseline_window - 1)

    # Days are compared as ISO strings, which order chronologically.
    recent_lo = recent_first.isoformat()
    base_lo = baseline_first.isoformat()
    base_hi = baseline_last.isoformat()

    names = [metric.name for metric in config.metrics]
    recent_samples: Dict[str, list] = {name: [] for name in names}
    baseline_samples: Dict[str, list] = {name: [] for name in names}

    for obs in observations:
        if obs.entity_id != entity_id:
            continue
        # Pick the window this day belongs to; days in the lag gap or
        # outside both windows contribute nothing.
        if recent_lo <= obs.day <= cursor_iso:
            bucket = recent_samples
        elif base_lo <= obs.day <= base_hi:
            bucket = baseline_samples
        else:
            continue
        for name in names:
            raw = obs.values.get(name)
            if raw is not None:
                bucket[name].append(float(raw))

    def _mean(samples):
        return sum(samples) / len(samples) if samples else None

    details: Dict[str, dict] = {}
    for name in names:
        base_mean = _mean(baseline_samples[name])
        recent_mean = _mean(recent_samples[name])
        if base_mean is None or recent_mean is None or base_mean == 0:
            change = None
        else:
            change = (recent_mean - base_mean) / abs(base_mean) * 100.0
        details[name] = {
            "baseline_mean": base_mean,
            "recent_mean": recent_mean,
            "delta_pct": change,
        }
    return details
def run_backtest(
    observations: List[Observation],
    config: EntityTypeConfig,
    *,
    step_days: int = 7,
    tenant_id: str = "backtest",
) -> BacktestResult:
    """Walk a cursor forward in ``step_days`` increments over the dataset.

    At each cursor the engine is re-calibrated on the observations up to
    and including that day and the pipeline is run on the same window, so
    every step is a causal as-of-that-day snapshot: the engine never sees
    the future. Calibration is re-derived rather than reused because what
    the engine deems "normal" itself evolves.

    Each issue dict carries the engine's ``signals`` for the entity's
    latest state plus a ``details`` field (Stage 19): the per-metric
    baseline-vs-recent comparison that powers the "What changed"
    explanation on the entity drill-down page.

    Args:
        observations: all observations for the dataset; ``day`` values are
            ISO ``YYYY-MM-DD`` strings.
        config: engine configuration for the entity type.
        step_days: distance between consecutive cursors, in days (>= 1).
        tenant_id: label forwarded to calibration and the pipeline.

    Returns:
        A ``BacktestResult``. It has zero steps when there are no
        observations or the data spans fewer days than
        ``baseline_window + baseline_lag + recent_window``.

    Raises:
        ValueError: if ``step_days`` is smaller than 1 — the cursor would
            never advance and the walk would not terminate.
    """
    import logging

    if step_days < 1:
        raise ValueError(f"step_days must be >= 1, got {step_days!r}")

    if not observations:
        return BacktestResult(tenant_id=tenant_id,
                              entity_type=config.entity_type,
                              step_days=step_days, n_steps=0)

    logger = logging.getLogger(__name__)
    start = date.fromisoformat(min(o.day for o in observations))
    end = date.fromisoformat(max(o.day for o in observations))

    # The first useful cursor needs at least baseline + lag + recent days
    # of data behind it; before that, the engine cannot compute drift.
    min_history = (config.baseline_window + config.baseline_lag
                   + config.recent_window)
    first_cursor = start + timedelta(days=min_history - 1)
    if first_cursor > end:
        return BacktestResult(tenant_id=tenant_id,
                              entity_type=config.entity_type,
                              step_days=step_days, n_steps=0)

    severity_names = ("critical", "high", "medium", "low")
    signal_names = ("delta", "psi", "xi", "gamma", "kappa")
    step = timedelta(days=step_days)

    points: List[BacktestPoint] = []
    cursor = first_cursor
    while cursor <= end:
        cursor_iso = cursor.isoformat()
        # observations the engine sees: everything up to and including cursor
        window = [o for o in observations if o.day <= cursor_iso]
        try:
            calibration = calibrate_from_observations(window, config,
                                                      tenant_id=tenant_id)
            result = run_pipeline(window, config, calibration,
                                  tenant_id=tenant_id, top_n=50)
        except Exception as exc:
            # A degenerate window (no variance, no rows for some entity)
            # should not crash the whole backtest. The cursor is skipped —
            # no point is recorded for it — and the reason is logged so
            # the gap in the timeline can be diagnosed.
            logger.warning("backtest: skipping cursor %s: %s: %s",
                           cursor_iso, type(exc).__name__, exc)
            cursor += step
            continue

        sev_counts = {s: 0 for s in severity_names}
        # latest state per entity carries the signals breakdown the engine
        # actually used to make its decision — we surface it on each issue
        # so the "What changed" view is grounded in the same numbers.
        latest_state_by_entity: Dict[str, dict] = {}
        for s in result.states:
            cur = latest_state_by_entity.get(s["entity_id"])
            if cur is None or s["day"] > cur["day"]:
                latest_state_by_entity[s["entity_id"]] = s

        issues_dicts = []
        for issue in result.issues:
            # .get keeps severities outside the four known names countable
            sev_counts[issue.severity] = sev_counts.get(issue.severity, 0) + 1
            state = latest_state_by_entity.get(issue.entity_id, {})
            signals = state.get("signals") or {}
            issues_dicts.append({
                "entity_id": issue.entity_id,
                "severity": issue.severity,
                "score": issue.score,
                "title": issue.title,
                "signals": {k: signals.get(k) for k in signal_names},
                "details": _compute_event_details(
                    window, issue.entity_id, cursor_iso, config,
                ),
            })

        points.append(BacktestPoint(
            cursor=cursor_iso,
            n_states=len(result.states),
            n_issues=len(result.issues),
            severity_counts=sev_counts,
            issues=issues_dicts,
        ))
        cursor += step

    # rollups
    all_flagged: set = set()
    sev_totals: Dict[str, int] = {}
    n_total = 0
    for p in points:
        n_total += p.n_issues
        for i in p.issues:
            all_flagged.add(i["entity_id"])
        for s, n in p.severity_counts.items():
            sev_totals[s] = sev_totals.get(s, 0) + n

    return BacktestResult(
        tenant_id=tenant_id,
        entity_type=config.entity_type,
        step_days=step_days,
        n_steps=len(points),
        points=points,
        n_issues_total=n_total,
        n_unique_entities_ever_flagged=len(all_flagged),
        per_severity_total=sev_totals,
    )
# --- HTML renderer ------------------------------------------------------
# Shared document prologue for every page this module renders: doctype,
# <head> with the inline stylesheet, and the opening <body>. The string
# contains exactly one "</style>", which render_entity_detail_html uses as
# the splice point for its page-specific rules — keep it that way.
_HTML_HEAD = (
    "<!doctype html><html><head><meta charset=\"utf-8\">"
    "<title>OrgState backtest</title>"
    "<style>"
    "body{font-family:-apple-system,Segoe UI,Arial,sans-serif;"
    "max-width:1120px;margin:32px auto;padding:0 16px;line-height:1.5;color:#222}"
    "h1{font-size:1.6em;margin-bottom:0.2em}"
    "h2{margin-top:1.8em;border-bottom:1px solid #ddd;padding-bottom:0.2em}"
    "table{border-collapse:collapse;width:100%;margin:0.6em 0;font-size:0.94em}"
    "th,td{border:1px solid #ddd;padding:4px 8px;text-align:left}"
    "th{background:#f4f4f4}"
    # severity pills (".sev sev-<name>") used in tables and headings
    ".sev{display:inline-block;padding:1px 6px;border-radius:3px;"
    "font-size:0.82em;font-weight:600;color:#fff}"
    ".sev-critical{background:#a00}.sev-high{background:#d35400}"
    ".sev-medium{background:#b8860b}.sev-low{background:#777}"
    ".meta{color:#555;font-size:0.93em}"
    "code{background:#f4f4f4;padding:1px 4px;border-radius:3px}"
    # stacked severity bars of the timeline table (see _bar_row)
    ".bar{display:inline-block;height:10px;vertical-align:middle;margin-right:2px}"
    ".bar-critical{background:#a00}.bar-high{background:#d35400}"
    ".bar-medium{background:#b8860b}.bar-low{background:#777}"
    ".cell-num{text-align:right;font-variant-numeric:tabular-nums}"
    # recurring-drift callout (see _recurring_section_html)
    ".recurring{padding:14px 18px;background:#fff7e6;border:1px solid #f0c674;"
    "border-radius:6px;margin:1em 0}"
    ".recurring h2{border:none;margin-top:0;color:#7a4a00}"
    ".recurring-badge{display:inline-block;padding:1px 8px;border-radius:3px;"
    "background:#d35400;color:#fff;font-size:0.82em;font-weight:600;margin-left:6px}"
    "</style></head><body>"
)
# Closes the <body> and <html> opened by _HTML_HEAD.
_HTML_FOOT = "</body></html>"
def _esc(s):
    """HTML-escape ``str(s)``, quotes included.

    Quote escaping is on so the result is safe both as element text and
    inside a quoted attribute value.
    """
    from html import escape

    text = str(s)
    return escape(text, quote=True)
def _fmt_num(v) -> str:
    """Format a number for a table cell.

    ``None`` renders as an em dash. Values that are whole numbers or at
    least 100 in magnitude get one decimal; everything else gets three so
    small scores stay readable.

    The whole-number test uses ``float.is_integer`` rather than
    ``v == int(v)``: ``int(nan)`` raises ``ValueError``, which would take
    the whole page down over a single NaN metric. NaN now renders as
    ``"nan"``.
    """
    if v is None:
        return "—"
    if abs(v) >= 100 or float(v).is_integer():
        return f"{v:.1f}"
    return f"{v:.3f}"
def _fmt_delta_pct(v) -> str:
    """Format a percentage change with one decimal and an explicit sign.

    ``None`` renders as an em dash; values ``>= 0`` get a leading ``+``,
    negative values keep the minus sign produced by the formatting itself.
    """
    if v is None:
        return "—"
    if v >= 0:
        return f"+{v:.1f}%"
    return f"{v:.1f}%"
def _delta_class(v) -> str:
    """Pick the CSS class that colours a delta cell.

    The renderer only sees the ``BacktestResult``, not the config, so it
    cannot know whether a metric is "higher_is_worse". It therefore uses
    one neutral convention: a positive delta is warm (``delta-pos``), a
    negative delta is cool (``delta-neg``), and a missing or zero delta
    gets no class. On a higher-is-worse metric this matches intuition; on
    a lower-is-worse metric the colour is inverted but the number is still
    right, and the operator has the metric's direction in context.
    """
    if v is None or v == 0:
        return ""
    if v > 0:
        return "delta-pos"
    return "delta-neg"
# Signal key -> human-readable legend. _render_signals_row iterates this
# mapping, so the insertion order here is the column order of the signals
# row, and each label ends up as that cell's ``title`` tooltip.
_SIGNAL_LEGEND = {
    "delta": "Δ directional change (higher = worse direction)",
    "psi": "ψ stability (lower = less stable)",
    "xi": "ξ anomaly spike (higher = sharper outlier)",
    "gamma": "γ SLA violation (higher = past target)",
    "kappa": "κ coherence (lower = signals disagree)",
}
def _render_signals_row(signals: dict) -> str:
    """Render the one-row table of engine signals shown under the metrics.

    One cell per entry of ``_SIGNAL_LEGEND`` (the legend text becomes the
    tooltip), so the operator can see at a glance which signals lit up.
    A signal that is absent or ``None`` renders as an em dash. Returns an
    empty string when ``signals`` is empty.
    """
    if not signals:
        return ""

    def _cell(name: str, label: str) -> str:
        value = signals.get(name)
        shown = "—" if value is None else _fmt_num(value)
        return (
            f'<td title="{_esc(label)}"><code>{_esc(name)}</code> '
            f'<strong>{shown}</strong></td>'
        )

    cells = "".join(_cell(name, label)
                    for name, label in _SIGNAL_LEGEND.items())
    return (
        f'<table style="margin-top:0.4em"><tbody><tr>{cells}</tr>'
        '</tbody></table>'
    )
def _render_what_changed_blocks(flagged) -> str:
    """Render one ``<div class="changed-block">`` per flagged event.

    ``flagged`` is an iterable of ``(cursor, issue_dict)`` pairs. Each
    block pairs a per-metric baseline / recent / delta% table ('what the
    numbers did') with the signals row ('which signals the engine combined
    into the alert'). Events carrying neither ``details`` nor ``signals``
    are skipped; the metric table is omitted when there are no details.
    """
    table_open = (
        '<table><thead><tr>'
        '<th>Metric</th><th>Baseline mean</th><th>Recent mean</th>'
        '<th>Delta</th>'
        '</tr></thead><tbody>'
    )
    table_close = '</tbody></table>'

    def _metric_row(metric, stats) -> str:
        delta = stats.get("delta_pct")
        cells = (
            f"<td><code>{_esc(metric)}</code></td>",
            f'<td class="cell-num">{_fmt_num(stats.get("baseline_mean"))}</td>',
            f'<td class="cell-num">{_fmt_num(stats.get("recent_mean"))}</td>',
            f'<td class="cell-num {_delta_class(delta)}">'
            f"{_fmt_delta_pct(delta)}</td>",
        )
        return "<tr>" + "".join(cells) + "</tr>"

    blocks = []
    for cursor, match in flagged:
        details = match.get("details") or {}
        signals = match.get("signals") or {}
        if not (details or signals):
            continue
        metric_rows = "".join(_metric_row(metric, stats)
                              for metric, stats in details.items())
        table = table_open + metric_rows + table_close if metric_rows else ""
        severity = _esc(match["severity"])
        blocks.append(
            '<div class="changed-block">'
            f'<h3><code>{_esc(cursor)}</code> &middot; '
            f'<span class="sev sev-{severity}">'
            f'{severity}</span></h3>'
            + table
            + _render_signals_row(signals)
            + '</div>'
        )
    return "".join(blocks)
def _slug(entity_id: str) -> str:
    """Turn an arbitrary entity id into a filename- and URL-safe slug.

    Alphanumeric characters and ``.``, ``_``, ``-`` are kept; every other
    character (``/``, spaces, ...) becomes ``_``. The Salesforce ids that
    drove this stage are already safe, but user-controlled ids are not,
    and the slug is used both as a filename and inside an href.
    """
    keep = "._-"
    pieces = []
    for ch in entity_id:
        pieces.append(ch if (ch.isalnum() or ch in keep) else "_")
    return "".join(pieces)
def _bar_row(point: BacktestPoint, max_n: int) -> str:
    """Render one timeline row: cursor, issue count, stacked bars, counts.

    Each severity with a non-zero count gets a bar whose width is its
    share of ``max_n`` scaled to 120px, with a 2px floor so small counts
    stay visible. An em dash placeholder is shown when nothing is drawn.
    """
    severities = ("critical", "high", "medium", "low")
    counts = [point.severity_counts.get(sev, 0) for sev in severities]

    spans = [
        f'<span class="bar bar-{sev}" '
        f'style="width:{max(2, int(120 * n / max_n))}px" title="{n} {sev}"></span>'
        for sev, n in zip(severities, counts)
        if n > 0 and max_n > 0
    ]
    bar = "".join(spans) or '<span class="meta">—</span>'
    count_cells = "".join(f'<td class="cell-num">{n}</td>' for n in counts)

    return (
        "<tr>"
        f"<td><code>{_esc(point.cursor)}</code></td>"
        f'<td class="cell-num">{point.n_issues}</td>'
        f"<td>{bar}</td>"
        + count_cells
        + "</tr>"
    )
def _entity_cell(entity_id: str, linked_entities) -> str:
    """Render an entity id for a table cell, linked when a page exists.

    ``linked_entities`` is the collection of entity ids that actually have
    a rendered drill-down page (Stage 18). Ids outside it — or every id,
    when the collection is empty or ``None`` — render as plain ``<code>``,
    so the report never links to a page that was not generated.
    """
    label = f"<code>{_esc(entity_id)}</code>"
    has_page = bool(linked_entities) and entity_id in linked_entities
    if has_page:
        href = _esc(_slug(entity_id))
        return f'<a href="entities/{href}.html">{label}</a>'
    return label
def _recurring_section_html(recurring: List[RecurringEntity],
                            linked_entities=None) -> str:
    """Render the "Recurring drift" callout, or ``""`` if nothing recurs.

    One table row per recurring entity: id (linked when it is in
    ``linked_entities``), event count, worst severity, first and last
    cursor.

    NOTE(review): the blurb hard-codes "three or more", while render_html
    accepts a configurable ``min_recurring_events`` — the wording is only
    accurate for the default threshold.
    """
    if not recurring:
        return ""

    def _row(entry) -> str:
        severity = _esc(entry.top_severity)
        return (
            "<tr>"
            f"<td>{_entity_cell(entry.entity_id, linked_entities)}</td>"
            f'<td class="cell-num">{entry.n_events}</td>'
            f'<td><span class="sev sev-{severity}">'
            f"{severity}</span></td>"
            f"<td>{_esc(entry.first_cursor)}</td>"
            f"<td>{_esc(entry.last_cursor)}</td>"
            "</tr>"
        )

    opening = (
        '<div class="recurring">'
        '<h2>Recurring drift <span class="recurring-badge">priority</span></h2>'
        '<p class="meta">Entities flagged in three or more distinct backtest '
        'windows. A repeating problem on the same entity is structurally '
        'different from a one-off — these are the targets for proactive '
        'intervention.</p>'
        '<table><thead><tr>'
        '<th>Entity</th><th>Events</th><th>Worst severity</th>'
        '<th>First</th><th>Last</th>'
        '</tr></thead><tbody>'
    )
    closing = '</tbody></table></div>'
    return opening + "".join(_row(entry) for entry in recurring) + closing
# --- per-entity drill-down (Stage 18) ----------------------------------
def render_entity_detail_html(result: BacktestResult,
                              entity_id: str) -> str:
    """Render the standalone drill-down page for one entity (Stage 18).

    This is the page the recurrence callout (Stage 17) links to. It shows:

    * the entity id + summary (n events, worst severity, first/last)
    * a CSS-only sparkline: one cell per backtest cursor, bar height
      proportional to score, colour by severity (empty when the entity
      was not flagged at that cursor)
    * a chronological table of cursors at which the entity was flagged,
      with severity / score / title
    * the "What changed" panels (Stage 19), one per flagged event

    Pure stdlib; CSS-only graphics. Same visual language as the backtest
    page so the operator does not have to context-switch. An entity that
    was never flagged gets a short "nothing to drill into" page.

    Args:
        result: the backtest whose points are scanned for the entity.
        entity_id: id of the entity to render.

    Returns:
        A complete HTML document as a string.
    """
    # For every cursor in the backtest: the entity's issue at that cursor,
    # or None if it was not flagged. When several issues exist for the
    # entity at one cursor, the first one is used.
    per_cursor = [
        (p.cursor,
         next((i for i in p.issues if i["entity_id"] == entity_id), None))
        for p in result.points
    ]
    flagged = [(c, m) for c, m in per_cursor if m is not None]

    if not flagged:
        body = (
            f"<h1>{_esc(entity_id)}</h1>"
            '<p class="meta">This entity was not flagged in any backtest '
            'window — nothing to drill into.</p>'
        )
        return _HTML_HEAD + body + _HTML_FOOT

    n_events = len(flagged)
    first_cursor = flagged[0][0]
    last_cursor = flagged[-1][0]
    severities = [m["severity"] for _, m in flagged]
    top_severity = min(severities, key=lambda s: _SEV_RANK.get(s, 99))

    # CSS-only sparkline. Each cursor is one fixed-width cell. Cells
    # where the entity was flagged get a coloured bar at the right
    # height for the score; other cells stay empty.
    # Heights are in px: score 0..1 maps to 0..32, clamped to 2..32 so a
    # flagged cell is always visible.
    spark_cells = []
    for cursor, match in per_cursor:
        if match is None:
            spark_cells.append(
                f'<div class="spark-cell" title="{_esc(cursor)} — not flagged">'
                '</div>'
            )
        else:
            sev = match["severity"]
            score = float(match["score"])
            h = max(2, min(32, int(round(score * 32))))
            spark_cells.append(
                f'<div class="spark-cell" '
                # cursor and severity are separated the same way as in the
                # "not flagged" tooltip; they used to run together
                f'title="{_esc(cursor)} — {_esc(sev)} (score {score:.3f})">'
                f'<div class="spark-bar spark-{_esc(sev)}" '
                f'style="height:{h}px"></div>'
                '</div>'
            )
    sparkline = (
        '<div class="spark-extra">'
        + "".join(spark_cells)
        + '</div>'
    )

    # event table — chronological
    rows = []
    for cursor, match in flagged:
        rows.append(
            "<tr>"
            f"<td><code>{_esc(cursor)}</code></td>"
            f'<td><span class="sev sev-{_esc(match["severity"])}">'
            f'{_esc(match["severity"])}</span></td>'
            f'<td class="cell-num">{float(match["score"]):.3f}</td>'
            f"<td>{_esc(match['title'])}</td>"
            "</tr>"
        )

    # "What changed" panels (Stage 19) — one per flagged event, showing
    # per-metric baseline vs recent. This is the operator-facing "why"
    # behind the bare drift score.
    what_changed = _render_what_changed_blocks(flagged)

    # Page-specific rules as bare CSS text. They are spliced INSIDE the
    # shared <style> element, so they must not bring their own
    # <style>...</style> wrapper: nested tags are read as CSS, which turns
    # the first selector into "<style>.spark-extra" and makes the browser
    # drop the sparkline layout rule.
    extra_css = (
        ".spark-extra{display:flex;align-items:flex-end;gap:2px;"
        "height:40px;padding:6px 8px;background:#f9f9f9;"
        "border:1px solid #e0e0e0;border-radius:4px;overflow-x:auto}"
        ".spark-cell{flex:0 0 6px;height:34px;display:flex;align-items:flex-end}"
        ".spark-bar{width:6px;border-radius:1px}"
        ".spark-critical{background:#a00}.spark-high{background:#d35400}"
        ".spark-medium{background:#b8860b}.spark-low{background:#777}"
        ".back-link{display:inline-block;margin-bottom:1em;color:#555;"
        "text-decoration:none;font-size:0.95em}"
        ".back-link:hover{text-decoration:underline}"
        ".changed-block{padding:10px 14px;background:#fcfcfc;"
        "border:1px solid #e0e0e0;border-radius:4px;margin:0.8em 0}"
        ".changed-block h3{margin:0 0 0.4em 0;font-size:1em;color:#444}"
        ".delta-pos{color:#a00}.delta-neg{color:#2c8a3e}"
    )
    head = _HTML_HEAD.replace("</style>", extra_css + "</style>", 1)

    body = (
        '<a class="back-link" href="../backtest.html">&larr; back to backtest</a>'
        f"<h1>{_esc(entity_id)}</h1>"
        '<ul>'
        f'<li><strong>Events:</strong> {n_events}</li>'
        f'<li><strong>Worst severity:</strong> '
        f'<span class="sev sev-{_esc(top_severity)}">'
        f'{_esc(top_severity)}</span></li>'
        f'<li><strong>First:</strong> <code>{_esc(first_cursor)}</code></li>'
        f'<li><strong>Last:</strong> <code>{_esc(last_cursor)}</code></li>'
        '</ul>'
        '<h2>Score trajectory</h2>'
        '<p class="meta">One bar per backtest cursor. Bar height is the '
        'drift score (0..1); colour is the severity. Empty cells are '
        'cursors where this entity was below threshold.</p>'
        + sparkline
        + '<h2>Flagged events</h2>'
        + '<table><thead><tr>'
          '<th>Cursor</th><th>Severity</th><th>Score</th><th>Title</th>'
          '</tr></thead><tbody>'
        + "".join(rows)
        + '</tbody></table>'
        + (('<h2>What changed</h2>'
            '<p class="meta">Per-metric baseline (the entity\'s own '
            "history before each event) vs recent (the window that "
            "tripped the engine). This is the operator-facing 'why' "
            "behind the bare drift score.</p>"
            + what_changed) if what_changed else "")
    )
    return head + body + _HTML_FOOT
def render_html(result: BacktestResult,
                min_recurring_events: int = 3,
                link_entities=False) -> str:
    """Render the backtest as a single self-contained HTML page.

    Layout: rollup at top, **recurring drift callout (Stage 17)**,
    timeline table with stacked severity bars, then a per-entity totals
    table. A result without points renders a short "not enough history"
    page instead.

    Args:
        result: the backtest to render.
        min_recurring_events: minimum number of events for an entity to
            appear in the recurring-drift callout.
        link_entities: controls drill-down links on entity ids:

            * False (default) or None — entity ids render as plain code,
              no links
            * True — every flagged entity id is wrapped in a link (use
              this only if you know every entity has a drill-down page;
              otherwise prefer a set, below)
            * a set / collection of entity ids — only those ids are
              linked, the rest render as plain code. This avoids 404s
              when only some entities have drill-down pages rendered.

    Returns:
        A complete HTML document as a string.
    """
    if not result.points:
        return (
            _HTML_HEAD
            + "<h1>OrgState backtest</h1>"
            + '<p class="meta">No steps produced — the dataset does not have '
              "enough history for the engine's baseline+recent windows.</p>"
            + _HTML_FOOT
        )

    # Resolve link_entities up front so every cell-rendering helper can
    # see it. True -> link everyone; False/None -> link no one; set ->
    # link only those ids.
    if link_entities is True:
        linked = {i["entity_id"] for p in result.points for i in p.issues}
    elif link_entities is False or link_entities is None:
        linked = None
    else:
        linked = set(link_entities)

    severity_names = ("critical", "high", "medium", "low")

    # rollup
    rollup_parts = [
        f"<li><strong>Entity type:</strong> {_esc(result.entity_type)}</li>",
        f"<li><strong>Step:</strong> every {result.step_days} day(s)</li>",
        f"<li><strong>Steps run:</strong> {result.n_steps}</li>",
        f"<li><strong>Total issue-events:</strong> {result.n_issues_total}</li>",
        (f"<li><strong>Unique entities ever flagged:</strong> "
         f"{result.n_unique_entities_ever_flagged}</li>"),
    ]
    sev_breakdown = " &middot; ".join(
        f"{result.per_severity_total.get(s, 0)} {s}"
        for s in severity_names
    )
    rollup_parts.append(
        f"<li><strong>By severity:</strong> {sev_breakdown}</li>"
    )

    # timeline table; points is non-empty here, so max() is safe
    max_per_step = max(p.n_issues for p in result.points)
    rows = "".join(_bar_row(p, max_per_step) for p in result.points)

    # per-entity total
    by_entity: Dict[str, Dict[str, int]] = {}
    for p in result.points:
        for issue in p.issues:
            eid = issue["entity_id"]
            d = by_entity.setdefault(eid, {"total": 0, "critical": 0,
                                           "high": 0, "medium": 0, "low": 0})
            d["total"] += 1
            # .get: a severity outside the four known names still counts
            # toward the total instead of raising KeyError mid-render.
            sev = issue["severity"]
            d[sev] = d.get(sev, 0) + 1
    entity_rows = "".join(
        "<tr>"
        f"<td>{_entity_cell(eid, linked)}</td>"
        f'<td class="cell-num">{d["total"]}</td>'
        f'<td class="cell-num">{d["critical"]}</td>'
        f'<td class="cell-num">{d["high"]}</td>'
        f'<td class="cell-num">{d["medium"]}</td>'
        f'<td class="cell-num">{d["low"]}</td>'
        "</tr>"
        # stable sort: ties keep first-flagged order
        for eid, d in sorted(by_entity.items(),
                             key=lambda kv: -kv[1]["total"])
    )

    recurring = find_recurring_entities(result, min_events=min_recurring_events)

    return (
        _HTML_HEAD
        + f"<h1>OrgState backtest &mdash; <code>{_esc(result.tenant_id)}</code></h1>"
        + "<ul>" + "".join(rollup_parts) + "</ul>"
        + _recurring_section_html(recurring, linked_entities=linked)
        + "<h2>Timeline (issues per cursor)</h2>"
        + "<table><thead><tr>"
          "<th>Cursor</th><th>Issues</th><th>Severity bars</th>"
          "<th>Crit</th><th>High</th><th>Med</th><th>Low</th>"
          "</tr></thead><tbody>"
        + rows
        + "</tbody></table>"
        + "<h2>Per-entity totals (most flagged first)</h2>"
        + ("<table><thead><tr>"
           "<th>Entity</th><th>Total</th><th>Crit</th><th>High</th>"
           "<th>Med</th><th>Low</th></tr></thead><tbody>"
           + entity_rows
           + "</tbody></table>" if entity_rows
           else '<p class="meta">No entities flagged in any window.</p>')
        + _HTML_FOOT
    )
# --- CLI ---------------------------------------------------------------
def _cli(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: run a backtest on a CSV and write reports.

    Writes ``backtest.html`` and ``backtest.json`` into ``--out`` and,
    with ``--render-entities``, one ``entities/<slug>.html`` drill-down
    page per sufficiently-flagged entity. A JSON summary is printed to
    stdout.

    Args:
        argv: argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 on success, 1 when the CSV yields zero observations. Invalid
        arguments (including ``--step`` below 1) exit through argparse
        with status 2.
    """
    parser = argparse.ArgumentParser(
        prog="python -m delivery.backtest",
        description="Rolling backtest of the OrgState engine on a customer CSV.",
    )
    parser.add_argument("--vertical", required=True)
    parser.add_argument("--csv", required=True)
    parser.add_argument("--out", required=True,
                        help="output directory for backtest.html + backtest.json")
    parser.add_argument("--entity-type", default=None)
    parser.add_argument("--step", type=int, default=7,
                        help="step the cursor forward this many days each iteration")
    parser.add_argument("--top-n-owners", type=int, default=None,
                        help="(salesforce) cap owners by total accounts")
    parser.add_argument(
        "--render-entities", action="store_true",
        help="also write entities/<entity_id>.html drill-down pages and "
             "link them from the backtest report (Stage 18)",
    )
    parser.add_argument(
        "--entity-min-events", type=int, default=1,
        help="(with --render-entities) only render drill-downs for entities "
             "flagged at least this many times (default 1 = all flagged ones)",
    )
    args = parser.parse_args(argv)

    # A zero or negative step would never move the cursor past the end of
    # the data; reject it here, before any loading work is done.
    if args.step < 1:
        parser.error(f"--step must be a positive integer, got {args.step}")

    vcfg = get_vertical_config(args.vertical)
    entity_type = args.entity_type or next(iter(vcfg.entity_types))
    cfg = vcfg.entity_type(entity_type)

    # source: same dispatch as real_pilot
    from .real_pilot import _load_via_vertical
    obs = _load_via_vertical(args.vertical, args.csv,
                             top_n_owners=args.top_n_owners,
                             entity_type=entity_type)
    if not obs:
        print(json.dumps({"error": f"zero observations from {args.csv!r}"}),
              flush=True)
        return 1

    result = run_backtest(obs, cfg, step_days=args.step,
                          tenant_id=f"backtest_{args.vertical}")

    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)
    html_path = out_dir / "backtest.html"
    json_path = out_dir / "backtest.json"

    rendered_entities = []
    if args.render_entities:
        # collect all entities that fired at least min_events times
        from collections import Counter
        per_entity_events = Counter(
            issue["entity_id"] for p in result.points for issue in p.issues
        )
        targets = sorted(eid for eid, n in per_entity_events.items()
                         if n >= args.entity_min_events)
        ent_dir = out_dir / "entities"
        ent_dir.mkdir(exist_ok=True)
        for eid in targets:
            # NOTE: _slug is lossy, so two ids that differ only in replaced
            # characters share one file name and the later page wins.
            (ent_dir / f"{_slug(eid)}.html").write_text(
                render_entity_detail_html(result, eid), encoding="utf-8",
            )
            rendered_entities.append(eid)

    # Link only the entities whose pages were actually written.
    html_path.write_text(
        render_html(result,
                    link_entities=set(rendered_entities) if rendered_entities
                    else False),
        encoding="utf-8",
    )

    # JSON: convert dataclasses
    from dataclasses import asdict
    json_path.write_text(json.dumps(asdict(result), indent=2,
                                    default=str, sort_keys=True),
                         encoding="utf-8")

    print(json.dumps({
        "vertical": args.vertical,
        "entity_type": entity_type,
        "n_observations": len(obs),
        "n_steps": result.n_steps,
        "n_issues_total": result.n_issues_total,
        "n_unique_entities_ever_flagged": result.n_unique_entities_ever_flagged,
        "per_severity_total": result.per_severity_total,
        "html": str(html_path.resolve()),
        "json": str(json_path.resolve()),
        "entity_pages": len(rendered_entities),
    }, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(_cli())