orgstate / infra /cli.py
Legal-i's picture
Initial OrgState deploy via Stage 150 free-tier stack
d2d1903 verified
"""
infra.cli — operator command-line interface (Stage 9).
Wraps the most common ``OrgStateService`` workflows so an operator can
do bootstrap and rotation without writing Python:
python -m infra tenant register acme "ACME Logistics"
python -m infra admin-key mint --name primary
python -m infra key mint acme --name webhook --expires-at 2026-12-31T00:00:00+00:00
python -m infra key list acme
python -m infra key revoke acme key_abcd1234
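Output is JSON to stdout (one object per command), so the operator can
``| jq`` everything. Exit codes: 0 on success, 1 on user error (invalid
argument, unknown tenant), 2 on argparse-level error. Some commands also
use 2 for invalid input they detect themselves (e.g. an unknown vertical),
and the alert-check / stale-check gates use exit 1 to mean "alert fired"
rather than an error.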
The DB target follows the same rules as the API and scheduler — pass
``--db`` or rely on ``ORGSTATE_DB_PATH`` (default: on-disk SQLite at the
package default).
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any, List, Optional
from infra import CalibrationNotFound, OrgStateService
from infra.storage import db_status
def _emit(obj: Any) -> None:
    """Print ``obj`` to stdout as one indented JSON document.

    ``sort_keys=True`` plus json's default ASCII escaping make the text
    byte-stable across runs, so operator pipelines can pin snapshots
    without key-order diffs. The object is first normalised with
    ``_jsonable``; anything json still cannot encode is rendered with
    ``str()``.
    """
    document = json.dumps(
        _jsonable(obj),
        default=str,
        indent=2,
        sort_keys=True,
    )
    print(document)
def _jsonable(obj: Any) -> Any:
    """Recursively convert ``obj`` into something ``json.dumps`` can encode.

    Dataclass *instances* become dicts via ``asdict``. Lists, tuples and
    dict values are walked recursively, so dataclasses nested inside any
    of them are converted too. Tuples come back as lists; JSON has no
    tuple type, so the encoded text is unchanged. Anything else is
    returned as-is and left to ``json.dumps`` (and its ``default=``).
    """
    # is_dataclass() is also True for dataclass *classes*; asdict() raises
    # TypeError on those, so only convert instances.
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    if isinstance(obj, (list, tuple)):
        return [_jsonable(x) for x in obj]
    if isinstance(obj, dict):
        return {k: _jsonable(v) for k, v in obj.items()}
    return obj
# --- handlers (one per subcommand) ---------------------------------------
def _cmd_tenant_register(svc: OrgStateService, args) -> int:
    """Register ``args.tenant_id`` named ``args.name`` under
    ``args.vertical`` and print the record the service returns."""
    created = svc.register_tenant(
        args.tenant_id,
        args.name,
        vertical=args.vertical,
    )
    _emit(created)
    return 0
def _cmd_tenant_list(svc: OrgStateService, args) -> int:
    """Print whatever ``svc.list_tenants()`` returns; always exit 0."""
    everything = svc.list_tenants()
    _emit(everything)
    return 0
def _cmd_tenant_get(svc: OrgStateService, args) -> int:
    """Print one tenant record.

    Exits 1 with a JSON error on stderr when ``svc.get_tenant`` returns
    ``None`` for ``args.tenant_id``.
    """
    tenant = svc.get_tenant(args.tenant_id)
    if tenant is not None:
        _emit(tenant)
        return 0
    print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_tenant_threshold_set(svc: OrgStateService, args) -> int:
    """Stage 58: set per-tenant severity threshold overrides.

    Supports --critical/--high/--medium/--low. The levels passed become
    the stored ``severity_thresholds`` mapping; levels not passed fall
    back to defaults at runtime.

    ``set_tenant_overrides`` replaces the whole (tenant, entity_type)
    row. Every other override key already stored there (e.g. the
    ``drift_weights`` written by the weights-set command) is therefore
    read back and carried over instead of being silently dropped.

    Returns 0 on success, 1 for an unknown tenant, and 2 when no level
    flag was passed. The emitted ``overrides`` is the full row as
    written.
    """
    if svc.get_tenant(args.tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
              file=sys.stderr)
        return 1
    st = {}
    for name in ("critical", "high", "medium", "low"):
        v = getattr(args, name)
        if v is not None:
            st[name] = v
    if not st:
        print(json.dumps({
            "error": "pass at least one of --critical/--high/--medium/--low",
        }), file=sys.stderr)
        return 2
    # PUT semantics: merge with what is already stored, so a drift_weights
    # override survives a threshold update (mirrors the weights setter).
    existing = svc.get_tenant_overrides(args.tenant_id,
                                        args.entity_type) or {}
    overrides = {**existing, "severity_thresholds": st}
    svc.set_tenant_overrides(
        args.tenant_id, args.entity_type, overrides,
        actor=args.actor,
    )
    _emit({"tenant_id": args.tenant_id,
           "entity_type": args.entity_type,
           "overrides": overrides})
    return 0
def _cmd_tenant_threshold_get(svc: OrgStateService, args) -> int:
    """Show stored overrides for a tenant.

    With ``args.entity_type`` set, prints that single (tenant,
    entity_type) row from ``get_tenant_overrides``. Otherwise prints
    ``list_tenant_overrides`` for the whole tenant. Unknown tenants
    exit 1 with a JSON error on stderr.
    """
    tid = args.tenant_id
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1
    if not args.entity_type:
        _emit({"tenant_id": tid,
               "overrides": svc.list_tenant_overrides(tid)})
        return 0
    scoped = svc.get_tenant_overrides(tid, args.entity_type)
    _emit({"tenant_id": tid,
           "entity_type": args.entity_type,
           "overrides": scoped})
    return 0
def _cmd_tenant_weights_set(svc: OrgStateService, args) -> int:
    """Stage 61: set per-tenant ``drift_weights`` overrides.

    Mirrors the threshold setter: only the signals passed
    (--delta/--psi/--xi/--gamma/--kappa) are overridden, and the rest
    keep their runtime defaults. The new weights are layered over any
    weights already stored. Every other override key in the (tenant,
    entity_type) row is carried over unchanged, because
    ``set_tenant_overrides`` writes the whole row (PUT semantics).

    Returns 0 on success, 1 for an unknown tenant, and 2 when no signal
    flag was passed.
    """
    tenant_id, entity_type = args.tenant_id, args.entity_type
    if svc.get_tenant(tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {tenant_id!r}"}),
              file=sys.stderr)
        return 1

    requested = {
        signal: getattr(args, signal)
        for signal in ("delta", "psi", "xi", "gamma", "kappa")
        if getattr(args, signal) is not None
    }
    if not requested:
        print(json.dumps({
            "error": "pass at least one of --delta/--psi/--xi/--gamma/--kappa",
        }), file=sys.stderr)
        return 2

    # keep thresholds (and any other stored key) when adding weights
    existing = svc.get_tenant_overrides(tenant_id, entity_type)
    weights = dict(existing.get("drift_weights", {}))
    weights.update(requested)
    merged = dict(existing)
    merged["drift_weights"] = weights

    svc.set_tenant_overrides(tenant_id, entity_type, merged,
                             actor=args.actor)
    _emit({"tenant_id": tenant_id,
           "entity_type": entity_type,
           "overrides": merged})
    return 0
def _cmd_tenant_threshold_clear(svc: OrgStateService, args) -> int:
    """Delete the whole override row for (tenant, entity_type).

    This removes thresholds and weights alike. Prints whatever
    ``clear_tenant_overrides`` returned as ``deleted``. Unknown tenants
    exit 1.
    """
    tid = args.tenant_id
    if svc.get_tenant(tid) is not None:
        deleted = svc.clear_tenant_overrides(
            tid, args.entity_type, actor=args.actor,
        )
        _emit({"tenant_id": tid,
               "entity_type": args.entity_type,
               "deleted": deleted})
        return 0
    print(json.dumps({"error": f"unknown tenant {tid!r}"}),
          file=sys.stderr)
    return 1
def _cmd_tenant_weights_clear(svc: OrgStateService, args) -> int:
    """Stage 61: drop only the ``drift_weights`` override.

    Any ``severity_thresholds`` (or other keys) stored on the same row
    are kept. This is distinct from `threshold clear`, which removes ALL
    overrides; operators tuning weights don't necessarily want to lose
    their thresholds. If the weights were the only thing stored, the
    whole row is cleared instead of writing an empty one.

    Emits ``removed_drift_weights`` (False when there was nothing to
    remove). Unknown tenants exit 1.
    """
    tid, etype = args.tenant_id, args.entity_type
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1

    existing = svc.get_tenant_overrides(tid, etype)
    had_weights = "drift_weights" in existing
    if had_weights:
        remaining = {key: val for key, val in existing.items()
                     if key != "drift_weights"}
        if not remaining:
            # nothing else stored -> drop the row entirely
            svc.clear_tenant_overrides(tid, etype, actor=args.actor)
        else:
            svc.set_tenant_overrides(tid, etype, remaining,
                                     actor=args.actor)
    _emit({"tenant_id": tid,
           "entity_type": etype,
           "removed_drift_weights": had_weights})
    return 0
def _cmd_tenant_set_status(svc: OrgStateService, args) -> int:
    """Stage 52: shared handler for `tenant pause` and `tenant resume`.

    The subparser fills ``args.status`` via ``set_defaults``, so one
    function serves both verbs. Prints the row returned by
    ``set_tenant_status``. Unknown tenants exit 1.
    """
    tenant_id = args.tenant_id
    if svc.get_tenant(tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {tenant_id!r}"}),
              file=sys.stderr)
        return 1
    updated = svc.set_tenant_status(tenant_id, args.status,
                                    actor=args.actor)
    _emit(updated)
    return 0
def _cmd_tenant_export(svc: OrgStateService, args) -> int:
    """Export everything ``svc.export_tenant`` returns for a tenant.

    Without ``args.out`` the export is printed to stdout. With it, the
    export is written to that path (parent dirs created) as sorted,
    indented UTF-8 JSON. Only a receipt is then printed: the absolute
    path and the file size in bytes. A ``KeyError`` from the service
    means the tenant is unknown, and the command exits 1.
    """
    try:
        payload = svc.export_tenant(args.tenant_id)
    except KeyError:
        print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
              file=sys.stderr)
        return 1
    if not args.out:
        _emit(payload)
        return 0
    target = Path(args.out)
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(_jsonable(payload), indent=2, sort_keys=True,
                            default=str)
    target.write_text(serialized, encoding="utf-8")
    _emit({"out": str(target.resolve()),
           "bytes": target.stat().st_size})
    return 0
def _cmd_tenant_billing_config(svc: OrgStateService, args) -> int:
    """Stage 105: store the tenant's default plan and Stripe customer.

    Later `billing invoice` / `billing push` runs then need no --plan /
    --customer flags. A field left as ``None`` (the argparse default)
    means "leave unchanged"; the service raises when both are ``None``.
    Any ``ValueError`` from the service exits 1 with a JSON error on
    stderr.
    """
    try:
        stored = svc.set_tenant_billing_config(
            args.tenant_id,
            plan_name=args.plan,
            stripe_customer_id=args.stripe_customer,
            actor=args.actor,
        )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(stored)
    return 0
def _cmd_tenant_billing_show(svc: OrgStateService, args) -> int:
    """Stage 105: read-only display of the stored billing config.

    A ``ValueError`` from the service exits 1 with a JSON error.
    """
    try:
        config = svc.get_tenant_billing_config(args.tenant_id)
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(config)
    return 0
def _cmd_tenant_plan_override_set(svc: OrgStateService, args) -> int:
    """Stage 109: write a per-tenant plan override row.

    ``--tiers`` takes a JSON list such as ``'[[10000,3],[null,1]]'`` for
    a cumulative tier walk; ``null`` in position 0 of a pair marks the
    unbounded final tier. ``--free-quota`` and
    ``--price-cents-per-unit`` are individual overrides for flat-mode
    contracts.

    Exits 1 (JSON error on stderr) for unparseable ``--tiers``, for a
    ``--tiers`` value that is not a JSON list, or for a ``ValueError``
    from the service.
    """
    tiers = None
    if args.tiers:
        try:
            tiers = json.loads(args.tiers)
        except ValueError as err:
            problem = f"--tiers is not valid JSON: {err}"
        else:
            problem = (None if isinstance(tiers, list)
                       else "--tiers must be a JSON list")
        if problem is not None:
            print(json.dumps({"error": problem}), file=sys.stderr)
            return 1

    try:
        row = svc.set_tenant_plan_override(
            args.tenant_id,
            args.metric,
            free_quota=args.free_quota,
            price_cents_per_unit=args.price_cents_per_unit,
            tiers=tiers,
            note=args.note,
            actor=args.actor,
        )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(row)
    return 0
def _cmd_tenant_plan_override_clear(svc: OrgStateService, args) -> int:
    """Remove the (tenant, metric) plan override row.

    Exits 1 with a JSON error when the service raises ``ValueError`` or
    reports that there was no such override to clear.
    """
    try:
        cleared = svc.clear_tenant_plan_override(
            args.tenant_id, args.metric, actor=args.actor,
        )
    except ValueError as err:
        message = str(err)
    else:
        if cleared:
            _emit({"tenant_id": args.tenant_id, "metric": args.metric,
                   "cleared": True})
            return 0
        message = (f"no override for {args.tenant_id!r} "
                   f"metric {args.metric!r}")
    print(json.dumps({"error": message}), file=sys.stderr)
    return 1
def _cmd_tenant_rate_limit_fail_closed_set(svc: OrgStateService, args) -> int:
    """Stage 128: opt the tenant into or out of rate-limit fail-closed mode.

    ``--value=true`` means 503 on a rate-limit backend hiccup.
    ``--value=false`` means keep serving. ``--clear`` removes the
    override, so the tenant follows the process default again.
    ``--clear`` takes precedence when both are given. Accepted spellings
    (case- and whitespace-insensitive) are true/1/yes/on and
    false/0/no/off. Bad or missing input, or a ``ValueError`` from the
    service, exits 1.
    """
    accepted = {
        "true": True, "1": True, "yes": True, "on": True,
        "false": False, "0": False, "no": False, "off": False,
    }
    value: Optional[bool]
    if args.clear:
        value = None
    elif args.value is None:
        print(json.dumps({
            "error": "must pass --value true/false OR --clear",
        }), file=sys.stderr)
        return 1
    else:
        token = args.value.strip().lower()
        if token not in accepted:
            print(json.dumps({
                "error": f"--value must be true/false, got {args.value!r}",
            }), file=sys.stderr)
            return 1
        value = accepted[token]

    try:
        row = svc.set_tenant_rate_limit_fail_closed(
            args.tenant_id, value, actor=args.actor,
        )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit({
        "tenant_id": args.tenant_id,
        "rate_limit_fail_closed": value,
        "tenant": row,
    })
    return 0
def _cmd_tenant_rate_limit_fail_closed_show(svc: OrgStateService, args) -> int:
    """Stage 128: report the per-tenant rate-limit fail-closed override.

    ``None`` means the tenant follows the process default; ``True`` /
    ``False`` are explicit overrides. Unknown tenants exit 1.
    """
    tid = args.tenant_id
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1
    setting = svc.get_tenant_rate_limit_fail_closed(tid)
    _emit({
        "tenant_id": tid,
        "rate_limit_fail_closed": setting,
        "follows_process_default": setting is None,
    })
    return 0
def _cmd_tenant_plan_override_show(svc: OrgStateService, args) -> int:
    """Read-only dump of a tenant's plan override rows plus the resolved
    effective plan, so the operator can verify what billing will use.

    For each effective plan line, tiered lines report their ``tiers``
    (with ``price_cents_per_unit`` null). Flat lines report
    ``price_cents_per_unit`` (with ``tiers`` null). A ``ValueError``
    from the service exits 1.
    """
    try:
        overrides = svc.list_tenant_plan_overrides(args.tenant_id)
        effective = svc.get_effective_plan(args.tenant_id)
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1

    def _describe(line):
        # one JSON-friendly view of a plan line, tiered or flat
        if line.is_tiered:
            return {
                "free_quota": line.free_quota,
                "price_cents_per_unit": None,
                "tiers": [list(t) for t in line.tiers],
            }
        return {
            "free_quota": line.free_quota,
            "price_cents_per_unit": line.price_cents_per_unit,
            "tiers": None,
        }

    effective_view = {
        "name": effective.name,
        "currency": effective.currency,
        "lines": {metric: _describe(line)
                  for metric, line in effective.lines.items()},
    }
    _emit({
        "tenant_id": args.tenant_id,
        "overrides": overrides,
        "effective_plan": effective_view,
    })
    return 0
# --- SCIM provisioning (Stage 103) ----------------------------------
def _cmd_scim_issue_token(svc: OrgStateService, args) -> int:
    """Mint a per-tenant SCIM bearer token (an API key with role='scim').

    The raw value is shown ONCE; the operator pastes it into the IdP's
    SCIM connector config. SCIM-role keys CANNOT access non-SCIM routes
    (see infra/auth/roles.py). The key name defaults to "scim". Unknown
    tenants or a ``ValueError`` from the service exit 1.
    """
    if svc.get_tenant(args.tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
              file=sys.stderr)
        return 1
    token_name = args.name or "scim"
    try:
        token = svc.create_api_key(
            args.tenant_id,
            name=token_name,
            expires_at=args.expires_at,
            role="scim",
        )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(token)
    return 0
def _cmd_scim_list_users(svc: OrgStateService, args) -> int:
    """Ops-facing read of SCIM-provisioned users for a tenant.

    Returns the persisted rows, NOT the SCIM JSON shape (call the API
    for that). Useful for sanity-checking what an IdP push actually
    wrote. Unknown tenants exit 1.
    """
    tenant_id = args.tenant_id
    if svc.get_tenant(tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {tenant_id!r}"}),
              file=sys.stderr)
        return 1
    # always read from start_index=1; args.limit is passed as ``count``
    users, total = svc.scim_users.list_for_tenant(
        tenant_id, start_index=1, count=args.limit,
    )
    _emit({"total": total, "users": users})
    return 0
def _cmd_scim_count(svc: OrgStateService, args) -> int:
    """Quick gauge of SCIM user and group counts for a tenant, for
    monitoring drift between the IdP and OrgState. Unknown tenants
    exit 1."""
    tid = args.tenant_id
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1
    counts = {
        "tenant_id": tid,
        "scim_users": svc.scim_users.count_for_tenant(tid),
        "scim_groups": svc.scim_groups.count_for_tenant(tid),
    }
    _emit(counts)
    return 0
def _cmd_scim_list_groups(svc: OrgStateService, args) -> int:
    """Stage 108: ops view of SCIM-provisioned groups, each copied and
    annotated with its ``members`` list. Unknown tenants exit 1."""
    tid = args.tenant_id
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1
    groups, total = svc.scim_groups.list_for_tenant(tid, count=args.limit)

    def _with_members(group):
        # copy so the repository's row object is never mutated
        members = svc.scim_groups.list_members(tid, group["scim_id"])
        annotated = dict(group)
        annotated["members"] = members
        return annotated

    _emit({"total": total, "groups": [_with_members(g) for g in groups]})
    return 0
def _cmd_tenant_delete(svc: OrgStateService, args) -> int:
    """Stage 82 (GDPR Article 17): cascade-delete every row belonging to
    <tenant_id>.

    This is double-confirmed: ``--confirm`` must equal the positional
    tenant id, otherwise the command exits 1 and deletes NOTHING. A
    ``KeyError`` from the service (unknown tenant) also exits 1. Take
    `infra db backup` first; the runbook spells this out.
    """
    tenant_id = args.tenant_id
    if args.confirm == tenant_id:
        try:
            result = svc.delete_tenant(tenant_id, actor=args.actor)
        except KeyError:
            print(json.dumps({"error": f"unknown tenant {tenant_id!r}"}),
                  file=sys.stderr)
            return 1
        _emit(result)
        return 0
    refusal = (
        f"deletion of tenant {tenant_id!r} requires "
        f"--confirm {tenant_id} — the flag value must "
        f"match the positional. Erasure is IRREVERSIBLE."
    )
    print(json.dumps({"error": refusal}), file=sys.stderr)
    return 1
# --- usage meter + quotas (Stage 89) --------------------------------
def _cmd_usage_show(svc: OrgStateService, args) -> int:
    """Print the usage summary for a tenant and ``args.period``.

    A ``ValueError`` from the service exits 1.
    """
    try:
        report = svc.usage_summary(args.tenant_id, period=args.period)
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(report)
    return 0
def _cmd_usage_set_cap(svc: OrgStateService, args) -> int:
    """Set a soft cap on (tenant, metric).

    ``--limit=0`` is a valid cap (at zero). Omitting ``--limit`` passes
    ``None``, which CLEARS the cap (unlimited). A ``ValueError`` from the
    service exits 1.
    """
    try:
        quota = svc.set_tenant_quota(
            args.tenant_id,
            args.metric,
            args.limit,  # None when omitted -> clear
            actor=args.actor,
        )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(quota)
    return 0
def _cmd_usage_list_caps(svc: OrgStateService, args) -> int:
    """Print every soft cap stored for a tenant.

    A ``ValueError`` from the service exits 1.
    """
    try:
        quotas = svc.get_tenant_quotas(args.tenant_id)
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit({"tenant_id": args.tenant_id, "quotas": quotas})
    return 0
# --- onboard wizard (Stage 85) --------------------------------------
def _cmd_onboard(svc: OrgStateService, args) -> int:
    """Stage 85 -- guided new-tenant setup.

    Bundles register + key minting into one emitted JSON summary. The
    customer success engineer can paste it into the customer's
    onboarding email.

    Steps:
      1. register the tenant. If it already exists, this is an error
         (exit 1) unless ``--resume`` was passed, in which case
         registration is skipped.
      2. always mint an admin key; the customer needs one for further
         key management.
      3. with ``--mint-operator``, also mint an operator-role key for
         programmatic use that doesn't need key-management permissions.
      4. emit the summary, whose ``next_steps`` lines are pre-filled
         with the tenant id and ``args.base_url``.

    ``args.vertical`` is validated against ``available_verticals()``
    even when resuming; an unknown vertical exits 1.

    The raw API keys are shown ONCE in the summary, because keys are
    persisted hash-only (Stage 5a). The CS engineer is expected to share
    the summary via a secure channel.
    """
    from verticals import available_verticals

    existing = svc.get_tenant(args.tenant_id)
    if existing is not None and not args.resume:
        print(json.dumps({
            "error": (
                f"tenant {args.tenant_id!r} already exists. "
                f"Pass --resume to mint a NEW key for the existing "
                f"tenant (e.g. customer lost their key) without "
                f"re-registering. The old keys stay valid until "
                f"you `infra key revoke` them."
            ),
        }), file=sys.stderr)
        return 1

    known_verticals = available_verticals()
    if args.vertical not in known_verticals:
        print(json.dumps({
            "error": (
                f"unknown vertical {args.vertical!r} — "
                f"available: {sorted(known_verticals)}"
            ),
        }), file=sys.stderr)
        return 1

    registered = existing is None
    if registered:
        tenant = svc.register_tenant(
            args.tenant_id, args.name, vertical=args.vertical,
            actor=args.actor,
        )
    else:
        tenant = existing

    def _key_entry(api_key, role):
        # the freshly minted key still carries its raw secret here
        return {
            "key_id": api_key.key_id,
            "raw_key": api_key.raw,
            "role": role,
            "name": api_key.name,
        }

    admin_key = svc.create_api_key(
        args.tenant_id, name=args.admin_key_name,
        role="admin", actor=args.actor,
    )
    keys_out = [_key_entry(admin_key, "admin")]
    if args.mint_operator:
        operator_key = svc.create_api_key(
            args.tenant_id, name="onboarding-operator",
            role="operator", actor=args.actor,
        )
        keys_out.append(_key_entry(operator_key, "operator"))

    key_var = "ORGSTATE_TENANT_KEY"
    base = args.base_url.rstrip("/")
    tenant_url = f"{base}/tenants/{args.tenant_id}"
    auth_header = f"-H \"Authorization: Bearer ${key_var}\""
    next_steps = [
        "# 1. export your admin key (shown ONCE — save it now):",
        f"export {key_var}={admin_key.raw}",
        "# 2. verify the tenant is live:",
        f"curl {auth_header} {tenant_url}",
        "# 3. register a webhook so alerts reach you:",
        f"curl -X POST {auth_header} "
        f"-H 'Content-Type: application/json' "
        f"-d '{{\"url\": \"https://your-receiver.example/hook\"}}' "
        f"{tenant_url}/webhooks",
        "# 4. push your first observations + trigger a run:",
        "# see CUSTOMER_QUICKSTART_HE.md for the full walkthrough",
    ]

    _emit({
        "tenant": {field: tenant[field]
                   for field in ("tenant_id", "name", "vertical")},
        "registered": registered,
        "resumed": not registered,
        "keys": keys_out,
        "next_steps": next_steps,
        "documentation": "CUSTOMER_QUICKSTART_HE.md",
        "warning": (
            "raw_key values appear ONCE in this response. After "
            "this they exist only as sha256 hashes server-side. "
            "Treat them like a password."
        ),
    })
    return 0
def _cmd_key_mint(svc: OrgStateService, args) -> int:
    """Mint a tenant API key with ``args.role`` and an optional expiry.

    The key name defaults to an empty string. Unknown tenants or a
    ``ValueError`` from the service exit 1.
    """
    tid = args.tenant_id
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1
    try:
        minted = svc.create_api_key(
            tid,
            name=args.name or "",
            expires_at=args.expires_at,
            role=args.role,
        )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(minted)  # ApiKey dataclass with ``raw`` populated
    return 0
def _cmd_key_list(svc: OrgStateService, args) -> int:
    """List a tenant's API keys, including revoked ones only when
    ``args.include_revoked`` is set. Unknown tenants exit 1."""
    tid = args.tenant_id
    if svc.get_tenant(tid) is None:
        print(json.dumps({"error": f"unknown tenant {tid!r}"}),
              file=sys.stderr)
        return 1
    keys = svc.list_api_keys(tid, include_revoked=args.include_revoked)
    _emit(keys)
    return 0
def _cmd_key_revoke(svc: OrgStateService, args) -> int:
    """Revoke one of a tenant's API keys.

    A key that does not exist and a key owned by a different tenant both
    get the same "unknown api key" error (exit 1). The command therefore
    never acts on another tenant's key. ``newly_revoked`` reports what
    ``revoke_api_key`` returned.
    """
    record = svc.api_keys.get(args.key_id)
    belongs = record is not None and record.tenant_id == args.tenant_id
    if not belongs:
        print(json.dumps({"error": f"unknown api key {args.key_id!r}"}),
              file=sys.stderr)
        return 1
    newly_revoked = svc.revoke_api_key(args.key_id)
    _emit({"key_id": args.key_id, "revoked": True,
           "newly_revoked": newly_revoked})
    return 0
def _cmd_admin_mint(svc: OrgStateService, args) -> int:
    """Mint a platform admin key; the name defaults to an empty string."""
    admin_key = svc.create_admin_key(
        name=args.name or "",
        expires_at=args.expires_at,
    )
    _emit(admin_key)
    return 0
def _cmd_admin_list(svc: OrgStateService, args) -> int:
    """List admin keys; revoked ones only when ``args.include_revoked``."""
    _emit(
        svc.list_admin_keys(include_revoked=args.include_revoked)
    )
    return 0
def _cmd_admin_revoke(svc: OrgStateService, args) -> int:
    """Revoke an admin key by id; unknown ids exit 1. ``newly_revoked``
    reports what ``revoke_admin_key`` returned."""
    key_id = args.key_id
    if svc.admin_keys.get(key_id) is not None:
        newly_revoked = svc.revoke_admin_key(key_id)
        _emit({"key_id": key_id, "revoked": True,
               "newly_revoked": newly_revoked})
        return 0
    print(json.dumps({"error": f"unknown admin key {key_id!r}"}),
          file=sys.stderr)
    return 1
# --- ops subcommands (Stage 10) ------------------------------------------
def _cmd_status(svc: OrgStateService, args) -> int:
    """Single 'is this thing alive?' answer.

    Reports the DB backend and schema version, plus platform-wide
    totals an operator wants at a glance. Open decisions are summed
    from one ``decision_summary`` call per tenant. Run and active-key
    counts come from direct COUNT queries.
    """
    info = db_status(svc.db)
    tenants = svc.list_tenants()
    open_decisions = sum(
        svc.decision_summary(t["tenant_id"]).get("open", 0)
        for t in tenants
    )

    def _count(sql):
        return svc.db.query_one(sql)["n"]

    _emit({
        "backend": info["backend"],
        "schema_version": info["schema_version"],
        "n_tenants": len(tenants),
        "n_runs": _count("SELECT COUNT(*) AS n FROM runs"),
        "n_open_decisions": open_decisions,
        "n_active_api_keys": _count(
            "SELECT COUNT(*) AS n FROM api_keys WHERE revoked_at IS NULL"
        ),
        "n_active_admin_keys": _count(
            "SELECT COUNT(*) AS n FROM admin_keys WHERE revoked_at IS NULL"
        ),
    })
    return 0
def _cmd_run_diff(svc: OrgStateService, args) -> int:
    """Print the diff of ``args.run_id`` against ``args.against``.

    The tenant is taken from the run record. An unknown run, or a
    ``KeyError`` from ``diff_runs``, exits 1.
    """
    run = svc.get_run(args.run_id)
    if run is None:
        print(json.dumps({"error": f"unknown run {args.run_id!r}"}),
              file=sys.stderr)
        return 1
    tenant_id = run["tenant_id"]
    try:
        result = svc.diff_runs(tenant_id, args.run_id,
                               against_run_id=args.against)
    except KeyError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(result)
    return 0
# Stage 22: severity threshold gate for cron/monitoring wraps. Ranks run
# worst -> best (critical=0 ... low=3), so "X satisfies the threshold"
# means "rank(X) <= rank(threshold)" -- the same convention the engine
# itself uses.
_SEVERITY_RANK = {
    level: rank
    for rank, level in enumerate(("critical", "high", "medium", "low"))
}
def _meets_threshold(severity: str, threshold: str) -> bool:
    """True iff ``severity`` is at least as severe as ``threshold``.

    Unrecognised names rank as 99, below every known level. An unknown
    severity therefore never meets a known threshold.
    """
    unranked = 99
    return (_SEVERITY_RANK.get(severity, unranked)
            <= _SEVERITY_RANK.get(threshold, unranked))
def _post_webhook(url: str, payload: dict,
                  secret_env: Optional[str] = None) -> dict:
    """Stage 40: best-effort POST of an alert payload to a webhook URL.

    Returns a small status dict (``status_code`` and ``ok``, plus
    ``error`` on failure) so the caller can include it in audit/log
    output. Never raises: HTTP or URL failures must not affect the gate
    exit code. The gate's job is the gate; the webhook is a courtesy.

    Only ``http``/``https`` URLs are attempted. Anything else (a typo
    with no scheme, ``file:``, ``ftp:``) is reported as an error instead
    of being handed to urllib. urllib would either raise ``ValueError``
    for such a URL or open a non-HTTP resource.

    HMAC-SHA256(body, secret) goes in ``X-OrgState-Signature`` when
    ``secret_env`` names an env var that holds the secret. Receivers
    verify with the same secret to confirm the payload came from us. If
    that env var is unset or empty, the request is sent unsigned.
    """
    import hashlib
    import hmac
    import http.client
    import os
    import urllib.error
    import urllib.parse
    import urllib.request

    def _failed(error: str) -> dict:
        return {"status_code": None, "ok": False, "error": error}

    try:
        scheme = urllib.parse.urlsplit(url).scheme.lower()
    except ValueError as e:  # e.g. an unbalanced IPv6 bracket
        return _failed(str(e))
    if scheme not in ("http", "https"):
        return _failed(f"unsupported webhook URL scheme {scheme!r}; "
                       f"expected http or https")

    body = json.dumps(payload, sort_keys=True).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "orgstate-alert-check/1",
    }
    if secret_env:
        secret = os.environ.get(secret_env)
        if secret:
            sig = hmac.new(secret.encode("utf-8"), body,
                           hashlib.sha256).hexdigest()
            headers["X-OrgState-Signature"] = f"sha256={sig}"
    try:
        req = urllib.request.Request(url, data=body, headers=headers,
                                     method="POST")
        with urllib.request.urlopen(req, timeout=10) as resp:
            return {"status_code": resp.status,
                    "ok": 200 <= resp.status < 300}
    except urllib.error.HTTPError as e:
        # the server answered with a non-2xx status: report it
        return {"status_code": e.code, "ok": False, "error": str(e)}
    except (urllib.error.URLError, OSError, TimeoutError,
            http.client.HTTPException, ValueError) as e:
        # DNS/connect/TLS/timeout failures, malformed responses, and
        # http.client.InvalidURL (e.g. a non-numeric port)
        return _failed(str(e))
def _cmd_run_alert_check(svc: OrgStateService, args) -> int:
    """Stage 22: gate on material change since the previous run.

    Exit codes:
      * 1 iff the run's diff against the previous run (or
        ``args.against``) contains any 'new' or 'severity_changed'
        entity at >= ``--threshold`` severity.
      * 0 otherwise.
      * 2 for an unknown threshold, an unknown run, or a ``KeyError``
        from ``diff_runs``.

    Designed to wrap in cron / Slack so the operator gets a ping only
    when something material changed. 'resolved' never triggers, because
    an alert about something getting BETTER is noise. 'unchanged' never
    triggers, because nothing happened. For 'severity_changed' the
    post-change severity is compared.

    On alert, the same alert body goes to the optional one-shot
    ``--webhook`` URL (Stage 40) and to the tenant's registered webhooks
    (Stage 72). Delivery outcomes are added only to the JSON printed for
    the operator. In particular, tenant receivers never see the result
    (status, error text) of the operator's global webhook call.
    """
    threshold = args.threshold
    if threshold not in _SEVERITY_RANK:
        print(json.dumps({
            "error": f"unknown threshold {threshold!r}; "
                     f"choose: {sorted(_SEVERITY_RANK)}",
        }), file=sys.stderr)
        return 2
    run = svc.get_run(args.run_id)
    if run is None:
        print(json.dumps({"error": f"unknown run {args.run_id!r}"}),
              file=sys.stderr)
        return 2
    try:
        diff = svc.diff_runs(run["tenant_id"], args.run_id,
                             against_run_id=args.against)
    except KeyError as e:
        print(json.dumps({"error": str(e)}), file=sys.stderr)
        return 2
    triggers = []
    for item in diff.get("new", []):
        if _meets_threshold(item.get("severity", ""), threshold):
            triggers.append({"bucket": "new",
                             "entity_id": item.get("entity_id"),
                             "severity": item.get("severity")})
    for item in diff.get("severity_changed", []):
        # surface the post-change severity -- that's what the alert is
        # about (something went TO this severity)
        if _meets_threshold(item.get("severity", ""), threshold):
            triggers.append({
                "bucket": "severity_changed",
                "entity_id": item.get("entity_id"),
                "severity": item.get("severity"),
                "from_severity": item.get("from_severity"),
            })
    # What receivers get. Kept separate from the operator-facing payload
    # below, so that delivery metadata never leaks into webhook bodies.
    alert_body = {
        "run_id": args.run_id,
        "against_run_id": diff.get("against_run_id"),
        "threshold": threshold,
        "alert": bool(triggers),
        "n_triggers": len(triggers),
        "triggers": triggers,
    }
    payload = dict(alert_body)
    # Stage 40: one-shot global webhook. Best-effort: its outcome is
    # recorded for the operator but does NOT change the exit code.
    if triggers and args.webhook:
        payload["webhook"] = _post_webhook(
            args.webhook, alert_body,
            secret_env=args.webhook_secret_env,
        )
    # Stage 72: ALSO deliver to the tenant's registered webhooks
    # (`infra webhook create`). Independent of --webhook; both fire
    # when alert=true.
    if triggers:
        deliveries = svc.deliver_webhooks(run["tenant_id"], alert_body)
        if deliveries:
            payload["webhook_deliveries"] = deliveries
    _emit(payload)
    return 1 if triggers else 0
def _cmd_run_trigger(svc: OrgStateService, args) -> int:
    """Stage 31: kick off a pipeline run from stored observations.

    This is the sibling of `calibration recalibrate` (Stage 30); together
    they close the silent-failure loop, detect (Stages 26-28) then fix
    (Stages 30-31). It uses ``run_from_stored``, so observations already
    pushed via webhook (Stage 7d) or another connector need no re-pull.

    Prints an operator-facing handle (ids and counts) rather than the
    full pipeline result. Exits 1 for an unknown tenant and 2 for an
    unknown vertical / entity_type. Other service errors propagate
    from here.
    """
    if svc.get_tenant(args.tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
              file=sys.stderr)
        return 1
    try:
        from verticals import get_vertical_config
        cfg = get_vertical_config(args.vertical).entity_type(args.entity_type)
    except (KeyError, ValueError) as err:
        print(json.dumps({
            "error": f"unknown vertical/entity_type: {err}",
        }), file=sys.stderr)
        return 2
    run = svc.run_from_stored(args.tenant_id, cfg, vertical=args.vertical)
    handle = {
        attr: getattr(run, attr)
        for attr in ("run_id", "tenant_id", "entity_type",
                     "n_states", "n_issues", "n_decisions")
    }
    handle["vertical"] = args.vertical
    _emit(handle)
    return 0
def _cmd_run_locks(svc: OrgStateService, args) -> int:
    """Stage 43: list every row in ``run_locks``, oldest acquisition first.

    This is the operator's window into "what is running right now". A
    lock held past its TTL is the smoking gun for a crashed process.
    The next acquire would reclaim it, but the operator can spot it here
    without waiting. Each row is annotated in place with a derived
    ``expired`` flag: ``expires_at`` compared, as a string, against the
    current UTC time in ISO format.
    """
    from datetime import datetime, timezone

    locks = svc.db.query_all(
        "SELECT lock_key, owner_id, acquired_at, expires_at "
        "FROM run_locks ORDER BY acquired_at"
    )
    cutoff = datetime.now(timezone.utc).isoformat()
    for lock in locks:
        lock["expired"] = lock["expires_at"] < cutoff
    _emit({"n_locks": len(locks), "locks": locks})
    return 0
def _cmd_run_unlock(svc: OrgStateService, args) -> int:
    """Stage 43: manually release the run lock for (tenant, entity_type).

    The lock key is ``run:<tenant_id>:<entity_type>``. Without
    ``--force``, the command releases the lock only if it has expired
    (same as cleanup_expired, scoped to one key) and refuses a live
    lock. With ``--force``, it deletes the row regardless of state.

    Exit codes:
      0 -- lock was free, expired-and-released, or forced
      1 -- lock is live and --force was not given (refused; details on
           stderr)
      2 -- argparse errors (handled by argparse itself)
    """
    from datetime import datetime, timezone

    lock_key = f"run:{args.tenant_id}:{args.entity_type}"
    info = svc.locks.get(lock_key)
    if info is None:
        _emit({"lock_key": lock_key, "lock_status": "free",
               "action": "noop"})
        return 0

    expired = info["expires_at"] < datetime.now(timezone.utc).isoformat()
    if not (expired or args.force):
        # live + not forced: refuse, so the operator must opt in explicitly
        refusal = {
            "lock_key": lock_key,
            "lock_status": "held",
            "action": "refused",
            "reason": "lock is currently held; pass --force to release anyway "
                      "(only do this if you are certain no process is running)",
            "lock_info": info,
        }
        print(json.dumps(refusal, indent=2, sort_keys=True, default=str),
              file=sys.stderr)
        return 1

    svc.db.execute(
        "DELETE FROM run_locks WHERE lock_key=?", (lock_key,),
    )
    _emit({
        "lock_key": lock_key,
        "lock_status": "expired" if expired else "held",
        "action": "released",
        "previous": info,
    })
    return 0
def _cmd_backfill(svc: OrgStateService, args) -> int:
    """Stage 65: point-in-time historical analysis.

    Walks cutoff days from ``args.from_day`` to ``args.until_day`` every
    ``args.step_days`` and runs the pipeline as of each cutoff, emitting
    one result per cutoff. The command is read-only: no DB writes. It
    exits 1 for an unknown tenant and 2 for an unknown vertical /
    entity_type.
    """
    if svc.get_tenant(args.tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
              file=sys.stderr)
        return 1
    try:
        from verticals import get_vertical_config
        cfg = get_vertical_config(args.vertical).entity_type(
            args.entity_type,
        )
    except (KeyError, ValueError) as err:
        print(json.dumps({
            "error": f"unknown vertical/entity_type: {err}",
        }), file=sys.stderr)
        return 2
    points = svc.backfill_from_stored(
        args.tenant_id, cfg, vertical=args.vertical,
        from_day=args.from_day, until_day=args.until_day,
        step_days=args.step_days,
    )
    report = {
        "tenant_id": args.tenant_id,
        "entity_type": args.entity_type,
        "vertical": args.vertical,
        "from_day": args.from_day,
        "until_day": args.until_day,
        "step_days": args.step_days,
        "n_points": len(points),
        "results": points,
    }
    _emit(report)
    return 0
def _cmd_run_preview(svc: OrgStateService, args) -> int:
    """Stage 37: dry-run analysis of the current stored state.

    Answers "what would the pipeline report?" WITHOUT writing anything.
    It is the sibling of `run trigger` (Stage 31), which runs the same
    pipeline but persists. Typical uses: "will this change break
    things?", debugging without polluting the runs table, and eyeballing
    the current issue set without creating a run row that schedules,
    alert-check and dashboards would react to. Exits 1 for an unknown
    tenant and 2 for an unknown vertical / entity_type.
    """
    tenant_id = args.tenant_id
    if svc.get_tenant(tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {tenant_id!r}"}),
              file=sys.stderr)
        return 1
    try:
        from verticals import get_vertical_config
        cfg = get_vertical_config(args.vertical).entity_type(args.entity_type)
    except (KeyError, ValueError) as err:
        print(json.dumps({
            "error": f"unknown vertical/entity_type: {err}",
        }), file=sys.stderr)
        return 2
    _emit(svc.preview_from_stored(tenant_id, cfg, vertical=args.vertical))
    return 0
def _cmd_run_stale_check(svc: OrgStateService, args) -> int:
    """Stage 27: gate on tenants whose runs have gone quiet.

    Reports what ``svc.list_stale_runs`` returns for the threshold:
    tenants whose last run started more than ``--max-gap-hours`` ago, or
    that have never run. Complements Stage 22's alert-check. That gate
    fires on drama inside a recent run; this one fires when there is no
    recent run at all (cron fell over silently, the scheduler skipped a
    tenant).

    Exit codes: 0 when nothing is stale, 1 when anything is, 2 for a
    negative threshold or an unknown ``--tenant``.
    """
    gap_hours = args.max_gap_hours
    if gap_hours < 0:
        problem = f"--max-gap-hours must be >= 0, got {gap_hours}"
        print(json.dumps({"error": problem}), file=sys.stderr)
        return 2

    scope = args.tenant
    if scope is not None and svc.get_tenant(scope) is None:
        print(json.dumps({"error": f"unknown tenant {scope!r}"}),
              file=sys.stderr)
        return 2

    entity_type = args.entity_type
    stale = svc.list_stale_runs(
        gap_hours, tenant_id=scope, entity_type=entity_type,
    )
    _emit({
        "max_gap_hours": gap_hours,
        "tenant_id": scope,
        "entity_type": entity_type,
        "n_stale": len(stale),
        "stale": stale,
    })
    return 1 if len(stale) > 0 else 0
def _cmd_calibration_recalibrate(svc: OrgStateService, args) -> int:
    """Stage 30: rebuild a tenant's calibration from stored observations.

    Closes the loop opened by Stages 26-29. ``recalibrate_from_stored``
    pulls the stored observations for (tenant, entity_type), derives a
    fresh calibration and saves it. This is the one-line response when
    the health gate reports a stale calibration. The entity-type config
    is resolved through the vertical registry, so no Python is needed.

    Exit codes: 0 on success, 1 for an unknown tenant, 2 when the
    vertical / entity type cannot be resolved.

    NOTE(review): the "too few observations" guardrail (governed by
    ``--min-observations``) is enforced inside the service. Nothing is
    caught here, so how it surfaces depends on the service. Confirm.
    """
    tenant_id = args.tenant_id
    if svc.get_tenant(tenant_id) is None:
        unknown = {"error": f"unknown tenant {tenant_id!r}"}
        print(json.dumps(unknown), file=sys.stderr)
        return 1

    try:
        from verticals import get_vertical_config
        cfg = get_vertical_config(args.vertical).entity_type(
            args.entity_type,
        )
    except (KeyError, ValueError) as exc:
        bad = {"error": f"unknown vertical/entity_type: {exc}"}
        print(json.dumps(bad), file=sys.stderr)
        return 2

    outcome = svc.recalibrate_from_stored(
        tenant_id,
        cfg,
        vertical=args.vertical,
        min_observations=args.min_observations,
        actor=args.actor,
    )
    _emit(outcome)
    return 0
def _cmd_calibration_check(svc: OrgStateService, args) -> int:
    """Stage 26: fail loudly when stored calibrations have aged out.

    Lists whatever ``svc.list_stale_calibrations`` reports as older than
    ``--max-age-days``. Calibration that silently rots makes the engine
    quietly less accurate, so this is meant to show up on the morning
    cron run.
    - ``--tenant`` narrows the scan to one tenant (an operator checking
      their own corner).
    - Omitting it scans the whole platform (admin / oncall).

    Exit codes: 0 clean, 1 stale calibrations found, 2 for a negative
    threshold or an unknown ``--tenant``.
    """
    max_age = args.max_age_days
    if max_age < 0:
        problem = f"--max-age-days must be >= 0, got {max_age}"
        print(json.dumps({"error": problem}), file=sys.stderr)
        return 2

    scope = args.tenant
    if scope is not None and svc.get_tenant(scope) is None:
        print(json.dumps({"error": f"unknown tenant {scope!r}"}),
              file=sys.stderr)
        return 2

    stale = svc.list_stale_calibrations(max_age, tenant_id=scope)
    _emit({
        "max_age_days": max_age,
        "tenant_id": scope,
        "n_stale": len(stale),
        "stale": stale,
    })
    return 1 if len(stale) > 0 else 0
def _cmd_health(svc: OrgStateService, args) -> int:
    """Stage 28: one cron line for both silent-failure gates.

    Runs the Stage 26 stale-calibration scan and the Stage 27 stale-run
    scan back to back. Both are emitted as one combined payload with a
    single exit code, so the checks cannot drift apart the way separately
    wired cron gates can.

    Exit codes (worst of the two): 0 when both scans are clean, 1 when
    either reports stale items, 2 when a threshold is negative or
    ``--tenant`` is unknown.
    """
    # thresholds are validated in order; the first negative one wins
    for flag, value in (("--max-age-days", args.max_age_days),
                        ("--max-gap-hours", args.max_gap_hours)):
        if value < 0:
            problem = f"{flag} must be >= 0, got {value}"
            print(json.dumps({"error": problem}), file=sys.stderr)
            return 2

    scope = args.tenant
    if scope is not None and svc.get_tenant(scope) is None:
        print(json.dumps({"error": f"unknown tenant {scope!r}"}),
              file=sys.stderr)
        return 2

    stale_cal = svc.list_stale_calibrations(
        args.max_age_days, tenant_id=scope,
    )
    stale_runs = svc.list_stale_runs(
        args.max_gap_hours, tenant_id=scope,
    )
    checks = {
        "calibration": {
            "max_age_days": args.max_age_days,
            "n_stale": len(stale_cal),
            "stale": stale_cal,
        },
        "runs": {
            "max_gap_hours": args.max_gap_hours,
            "n_stale": len(stale_runs),
            "stale": stale_runs,
        },
    }
    n_total = sum(section["n_stale"] for section in checks.values())
    _emit({
        "tenant_id": scope,
        "healthy": n_total == 0,
        "n_stale_total": n_total,
        "checks": checks,
    })
    return 0 if n_total == 0 else 1
def _summarize_source_fetch(obs: list, show_sample: Optional[int]) -> dict:
    """Build the ``source test`` report for one non-empty fetch.

    Contents of the report:
    - the observation count and distinct-entity count;
    - the first and last ``day`` seen;
    - the sorted union of metric names across every observation's
      ``values``;
    - when ``show_sample`` is a positive int, the first ``show_sample``
      observations, for eyeballing the shape.
    """
    days = sorted({o.day for o in obs})
    summary = {
        "n_observations": len(obs),
        "n_entities": len({o.entity_id for o in obs}),
        "day_range": [days[0], days[-1]] if days else None,
        "metrics": sorted({k for o in obs for k in o.values}),
    }
    if show_sample and show_sample > 0:
        summary["sample"] = [
            {"entity_id": o.entity_id, "day": o.day, "values": o.values}
            for o in obs[:show_sample]
        ]
    return summary


def _cmd_source_test(svc, args) -> int:
    """Stage 33: connector dry-run.

    Builds a Connector from ``--type`` plus config flags, fetches each
    entity type and reports observation counts, metric coverage and day
    range. Nothing is written to the DB; this is purely a shape check.

    Onboarding pain this addresses: a new tenant's first CSV drop or HTTP
    endpoint can have wrong column names, the wrong direction, an empty
    file or a broken network. Without this tool the operator only finds
    out via "no stored observations" after the analysis run fails. This
    command catches the same class of bug at the source end.

    Whatever ``entity_types()`` / ``fetch()`` return is materialised as a
    list, so connectors that hand back tuples or generators are handled.
    Errors raised while a lazy fetch is being consumed are reported
    per-type, like eager ones.

    Exit codes:
    - 0 if at least one observation was fetched;
    - 1 if the connector built fine but returned nothing (the silent
      failure this tool exists to catch);
    - 2 on bad args.
    """
    from infra.ingestion.connectors import build_connector

    # Forward only the flags the operator actually set; each connector
    # type reads the keys it understands.
    config: dict = {}
    for key in ("vertical", "data_dir", "base_url", "bearer_env"):
        value = getattr(args, key)
        if value is not None:
            config[key] = value

    try:
        connector = build_connector(args.connector_type, config)
    except (KeyError, ValueError) as e:
        print(json.dumps({
            "error": f"could not build connector: {e}",
        }), file=sys.stderr)
        return 2

    # --entity-type narrows the test; otherwise ask the connector.
    if args.entity_type is not None:
        types_to_test = [args.entity_type]
    else:
        # list() so the result can be truth-tested, iterated AND echoed
        # in the payload even if discovery yields a generator. None or
        # empty means the connector doesn't support discovery.
        types_to_test = list(connector.entity_types() or [])
        if not types_to_test:
            print(json.dumps({
                "error": f"connector {args.connector_type!r} does not support "
                         "entity-type discovery — pass --entity-type explicitly",
            }), file=sys.stderr)
            return 2

    by_type: dict = {}
    n_total = 0
    for et in types_to_test:
        try:
            # materialise inside the try so a lazily-raising fetch is
            # reported for this type instead of aborting the whole test
            obs = list(connector.fetch(et) or [])
        except (KeyError, FileNotFoundError, ValueError) as e:
            by_type[et] = {"error": str(e), "n_observations": 0}
            continue
        if not obs:
            by_type[et] = {"n_observations": 0, "n_entities": 0}
            continue
        by_type[et] = _summarize_source_fetch(obs, args.show_sample)
        n_total += len(obs)

    _emit({
        "connector_type": args.connector_type,
        "config": config,
        "entity_types_tested": types_to_test,
        "n_observations_total": n_total,
        "by_type": by_type,
    })
    return 0 if n_total > 0 else 1
def _cmd_schedule_create(svc: OrgStateService, args) -> int:
    """Stage 36: register a new scheduled-ingestion job for a tenant.

    The created schedule is then driven by ``schedule tick`` from cron.
    ``--connector-config`` is an optional JSON *object* forwarded as the
    connector's config; an empty or omitted value means ``{}``.

    Exit codes: 0 on success (schedule emitted as JSON), 1 for an unknown
    tenant, 2 when ``--connector-config`` is not valid JSON or is not a
    JSON object.
    """
    if svc.get_tenant(args.tenant_id) is None:
        print(json.dumps({"error": f"unknown tenant {args.tenant_id!r}"}),
              file=sys.stderr)
        return 1
    try:
        connector_config = (
            json.loads(args.connector_config) if args.connector_config else {}
        )
    except json.JSONDecodeError as e:
        print(json.dumps({
            "error": f"--connector-config must be valid JSON: {e}",
        }), file=sys.stderr)
        return 2
    # json.loads also accepts lists, numbers and strings. A connector
    # config is a key/value mapping (``source test`` builds one as a
    # dict), so reject anything else up front instead of persisting a
    # schedule with a config of the wrong shape.
    if not isinstance(connector_config, dict):
        print(json.dumps({
            "error": "--connector-config must be a JSON object, got "
                     f"{type(connector_config).__name__}",
        }), file=sys.stderr)
        return 2

    # imported only once the arguments are known-good
    from infra.ingestion.scheduler import IngestionService
    ing = IngestionService(svc)
    schedule = ing.register_schedule(
        tenant_id=args.tenant_id, vertical=args.vertical,
        entity_type=args.entity_type, connector_type=args.connector_type,
        connector_config=connector_config, frequency=args.frequency,
        enabled=not args.disabled,
    )
    _emit(schedule)
    return 0
def _cmd_schedule_list(svc: OrgStateService, args) -> int:
    """List ingestion schedules, optionally narrowed. Always exits 0.

    - ``--tenant`` / ``--enabled-only`` are pushed down to
      ``IngestionService.list_schedules``.
    - Stage 38's ``--status`` filter is then applied here, against each
      row's ``last_status`` (the outcome of its latest tick). So
      ``--status error`` answers "what is broken right now?".
    - ``--status never`` matches rows whose ``last_status`` is None,
      i.e. schedules that have never ticked.
    """
    from infra.ingestion.scheduler import IngestionService
    schedules = IngestionService(svc).list_schedules(
        tenant_id=args.tenant, enabled_only=args.enabled_only,
    )
    wanted = args.status
    if wanted is not None:
        def _matches(row) -> bool:
            latest = row.get("last_status")
            if wanted == "never":
                return latest is None
            return latest == wanted

        schedules = list(filter(_matches, schedules))
    _emit(schedules)
    return 0
def _cmd_schedule_get(svc: OrgStateService, args) -> int:
    """Show one ingestion schedule by id.

    Exit codes: 0 when found (schedule emitted as JSON), 1 when
    ``IngestionService.get_schedule`` returns None.
    """
    from infra.ingestion.scheduler import IngestionService
    schedule_id = args.schedule_id
    found = IngestionService(svc).get_schedule(schedule_id)
    if found is not None:
        _emit(found)
        return 0
    print(json.dumps({"error": f"unknown schedule {schedule_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_schedule_set_enabled(svc: OrgStateService, args) -> int:
    """Shared handler behind ``schedule enable`` and ``schedule disable``.

    The subparser's defaults put the target state in ``args.enabled``,
    so one function serves both commands. Emits the schedule as re-read
    after the change.

    Exit codes: 0 on success, 1 for an unknown schedule id.
    """
    from infra.ingestion.scheduler import IngestionService
    ingestion = IngestionService(svc)
    schedule_id = args.schedule_id
    if ingestion.get_schedule(schedule_id) is None:
        print(json.dumps({"error": f"unknown schedule {schedule_id!r}"}),
              file=sys.stderr)
        return 1
    ingestion.set_enabled(schedule_id, args.enabled)
    refreshed = ingestion.get_schedule(schedule_id)
    _emit(refreshed)
    return 0
def _cmd_schedule_tick(svc: OrgStateService, args) -> int:
    """Run every currently-due schedule once; the cron-facing surface.

    ``IngestionService.run_due`` collects failures per schedule, so one
    broken schedule never aborts the rest. The full result list is
    emitted together with ok / skipped / error tallies.

    Notifications, only when at least one schedule errored:
    - Stage 41: POST the whole payload to the operator's global
      ``--webhook`` (if given).
    - Stage 75: deliver a tenant-scoped ``schedule_error`` event through
      each affected tenant's registered webhooks. Each tenant only ever
      sees its own errors.

    Exit codes: 0 if nothing errored (including "nothing was due"), 1 if
    any schedule reported status ``error``. ``skipped`` (no calibration
    yet) is a known onboarding state and does not flip the exit.
    """
    from infra.ingestion.scheduler import IngestionService
    results = IngestionService(svc).run_due()
    statuses = [row.get("status") for row in results]
    n_errors = statuses.count("error")
    payload = {
        "n_ran": len(results),
        "n_ok": statuses.count("ok"),
        "n_skipped": statuses.count("skipped"),
        "n_errors": n_errors,
        "results": results,
    }

    if n_errors > 0 and args.webhook:
        payload["webhook"] = _post_webhook(
            args.webhook, payload,
            secret_env=args.webhook_secret_env,
        )

    if n_errors:
        # every run_due row carries its tenant_id (Stage 75); group the
        # failures so each tenant is only told about its own
        errors_by_tenant: dict = {}
        for row in results:
            if row.get("status") == "error":
                errors_by_tenant.setdefault(row["tenant_id"], []).append(row)

        deliveries = []
        for tid, tenant_errors in errors_by_tenant.items():
            event = {
                "event": "schedule_error",
                "tenant_id": tid,
                "n_errors": len(tenant_errors),
                "errors": tenant_errors,
            }
            for entry in svc.deliver_webhooks(tid, event):
                # tag each delivery so the operator's view shows who
                # received what
                entry["tenant_id"] = tid
                deliveries.append(entry)
        if deliveries:
            payload["webhook_deliveries"] = deliveries

    _emit(payload)
    return 1 if n_errors else 0
def _cmd_ops_summary(svc: OrgStateService, args) -> int:
    """Stage 44: one-shot bird's-eye view of the platform.

    Delegates entirely to ``svc.ops_summary``. Since the Stage 46
    refactor the HTTP endpoint shares that logic, so the CLI and the API
    cannot disagree on what they report.

    Informational only: the exit code is always 0. The Stage 28
    ``health`` gate is the command that fails loudly on stale state.
    """
    thresholds = {
        "max_age_days": args.max_age_days,
        "max_gap_hours": args.max_gap_hours,
    }
    _emit(svc.ops_summary(**thresholds))
    return 0
def _cmd_webhook_create(svc: OrgStateService, args) -> int:
    """Stage 71: register an outbound webhook for a tenant.

    The optional ``--secret`` is handed to ``svc.register_webhook``. Per
    the service contract it is hashed before persistence, and the raw
    value appears only in this one response.

    Exit codes: 0 with the created row, 1 for an unknown tenant, 2 when
    the service rejects the input with ValueError.
    """
    tenant_id = args.tenant_id
    if svc.get_tenant(tenant_id) is None:
        unknown = {"error": f"unknown tenant {tenant_id!r}"}
        print(json.dumps(unknown), file=sys.stderr)
        return 1
    try:
        created = svc.register_webhook(
            tenant_id, args.url, secret=args.secret, actor=args.actor,
        )
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 2
    _emit(created)
    return 0
def _cmd_webhook_list(svc: OrgStateService, args) -> int:
    """List a tenant's webhooks (``--enabled-only`` narrows the list).

    Each row's ``secret_hash`` is stripped before output.

    Exit codes: 0 on success, 1 for an unknown tenant.
    """
    tenant_id = args.tenant_id
    if svc.get_tenant(tenant_id) is None:
        unknown = {"error": f"unknown tenant {tenant_id!r}"}
        print(json.dumps(unknown), file=sys.stderr)
        return 1
    hooks = svc.list_webhooks(tenant_id, enabled_only=args.enabled_only)
    # Defense in depth: the hash alone grants nothing, but there is no
    # reason to surface it routinely.
    for hook in hooks:
        hook.pop("secret_hash", None)
    _emit({"tenant_id": tenant_id, "webhooks": hooks})
    return 0
def _cmd_webhook_get(svc: OrgStateService, args) -> int:
    """Show one webhook by id, with its ``secret_hash`` stripped.

    Exit codes: 0 when found, 1 when ``svc.get_webhook`` returns None.
    """
    webhook_id = args.webhook_id
    hook = svc.get_webhook(webhook_id)
    if hook is None:
        print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
              file=sys.stderr)
        return 1
    hook.pop("secret_hash", None)
    _emit(hook)
    return 0
def _cmd_webhook_set_enabled(svc: OrgStateService, args) -> int:
    """Shared handler behind ``webhook enable`` / ``webhook disable``.

    ``args.enabled`` comes from the subparser's ``set_defaults``. After
    the change, the webhook is re-read and emitted without its
    ``secret_hash``.

    Exit codes: 0 on success, 1 when ``svc.set_webhook_enabled``
    reports no change (unknown webhook).
    """
    webhook_id = args.webhook_id
    if not svc.set_webhook_enabled(webhook_id, args.enabled,
                                   actor=args.actor):
        print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
              file=sys.stderr)
        return 1
    hook = svc.get_webhook(webhook_id)
    hook.pop("secret_hash", None)
    _emit(hook)
    return 0
def _cmd_webhook_test(svc: OrgStateService, args) -> int:
    """Stage 74: fire a synthetic ping at one webhook.

    Lets a customer confirm their receiver works. The service result is
    emitted as-is, and the exit code follows its ``ok`` field: 0 when
    truthy, 1 otherwise. The original contract maps the 1 case to a
    non-2xx reply, an unknown webhook or a network error. That gives the
    caller gate semantics for alerting.
    """
    outcome = svc.test_webhook(args.webhook_id)
    _emit(outcome)
    delivered = bool(outcome.get("ok"))
    return 0 if delivered else 1
def _cmd_webhook_delete(svc: OrgStateService, args) -> int:
    """Delete one webhook.

    Exit codes: 0 with ``{"webhook_id", "deleted": true}`` when
    ``svc.delete_webhook`` reports success, 1 otherwise (unknown
    webhook).
    """
    webhook_id = args.webhook_id
    if svc.delete_webhook(webhook_id, actor=args.actor):
        _emit({"webhook_id": webhook_id, "deleted": True})
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
# --- delivery log + retry (Stage 76) -----------------------------------
def _cmd_webhook_deliveries_list(svc: OrgStateService, args) -> int:
    """Stage 76: list logged webhook delivery attempts.

    Selector rules:
    - ``--webhook`` wins when both selectors are given.
    - ``--limit`` applies on either path.
    - ``--status`` is only forwarded on the per-tenant path.

    Exit codes: 0 with ``{"deliveries", "n"}``, 1 when neither selector
    is given.
    """
    by_webhook = args.webhook_id
    by_tenant = args.tenant_id
    if not (by_webhook or by_tenant):
        problem = "either --tenant or --webhook is required"
        print(json.dumps({"error": problem}), file=sys.stderr)
        return 1
    log = svc.webhook_deliveries
    if by_webhook:
        rows = log.list_for_webhook(by_webhook, limit=args.limit)
    else:
        rows = log.list_for_tenant(
            by_tenant, status=args.status, limit=args.limit,
        )
    _emit({"deliveries": rows, "n": len(rows)})
    return 0
def _cmd_webhook_deliveries_get(svc: OrgStateService, args) -> int:
    """Show one logged delivery attempt by id.

    Exit codes: 0 when found, 1 when the delivery log returns None.
    """
    delivery_id = args.delivery_id
    record = svc.webhook_deliveries.get(delivery_id)
    if record is not None:
        _emit(record)
        return 0
    print(json.dumps({"error": f"unknown delivery {delivery_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_deliveries_retry(svc: OrgStateService, args) -> int:
    """Stage 76: re-fire pending retriable delivery failures.

    ``--dry-run`` lists what would be retried (capped at ``--limit``)
    without sending anything, as a sanity check before a scheduled job
    does it for real.

    Exit codes (gate semantics): 0 when every retry succeeded, nothing
    was pending, or in dry-run; 1 when any retry still failed.
    """
    tenant = args.tenant
    cap = args.max_attempts
    if args.dry_run:
        pending = svc.webhook_deliveries.list_pending_retries(
            tenant, max_attempts=cap,
        )
        preview = pending[:args.limit]
        _emit({"would_retry": preview, "n": len(preview), "dry_run": True})
        return 0

    attempts = svc.retry_failed_deliveries(
        tenant, max_attempts=cap, limit=args.limit,
    )
    n_ok = sum(1 for attempt in attempts if attempt.get("ok"))
    n_failed = len(attempts) - n_ok
    _emit({
        "n_retried": len(attempts),
        "n_ok": n_ok,
        "n_failed": n_failed,
        "results": attempts,
    })
    return 1 if n_failed > 0 else 0
def _cmd_webhook_retry_due(svc: OrgStateService, args) -> int:
    """Stage 122: cron-friendly retry pass.

    ``svc.deliver_due_retries`` only fires deliveries whose ``retry_at``
    has passed, so exponential backoff is respected. This is cheap enough
    to run every 1-5 minutes when nothing is due. The emitted payload
    tallies ok, failed and dead-lettered (``dlq``) results.

    Exit codes: 0 when every due delivery succeeded or nothing was due,
    1 when any retry still failed (DLQ transitions count as failures).
    """
    attempts = svc.deliver_due_retries(
        limit=args.limit, max_attempts=args.max_attempts,
    )
    n_ok = sum(1 for attempt in attempts if attempt.get("ok"))
    n_failed = len(attempts) - n_ok
    _emit({
        "n_retried": len(attempts),
        "n_ok": n_ok,
        "n_failed": n_failed,
        "n_dlq": sum(1 for attempt in attempts if attempt.get("dlq")),
        "results": attempts,
    })
    return 1 if n_failed > 0 else 0
def _cmd_webhook_dlq_list(svc: OrgStateService, args) -> int:
    """Stage 122: dump the webhook dead-letter queue for ops forensics.

    ``--tenant`` and ``--limit`` are passed through to the service.
    Always exits 0.
    """
    dead_letters = svc.webhook_dlq_list(tenant_id=args.tenant,
                                        limit=args.limit)
    _emit({"n": len(dead_letters), "rows": dead_letters})
    return 0
def _cmd_webhook_dlq_purge(svc: OrgStateService, args) -> int:
    """Stage 136: delete dead-letter rows older than ``--ttl-days``.

    Intended to run daily from cron. ``--dry-run`` reports what would be
    purged without deleting; ``--tenant`` scopes the purge.

    Exit codes: 0 with the service's result, 1 when the service rejects
    the arguments with ValueError.
    """
    options = {
        "ttl_days": args.ttl_days,
        "tenant_id": args.tenant,
        "dry_run": args.dry_run,
        "actor": args.actor,
    }
    try:
        outcome = svc.webhook_dlq_purge(**options)
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(outcome)
    return 0
def _cmd_webhook_dlq_retry(svc: OrgStateService, args) -> int:
    """Stage 122: revive one dead-lettered delivery as a fresh attempt.

    The revived row starts again at attempt 1 with a new retry window.
    Use it once whatever broke on the receiver side is fixed: a
    single-shot recovery, no SQL needed.

    Exit codes:
    - 0 when the new attempt's ``status`` is ``"ok"``;
    - 1 when the attempt failed (the regular retry cron will pick it
      up), or when the service returned None (unknown DLQ row, or its
      webhook was deleted/disabled).
    """
    delivery_id = args.delivery_id
    revived = svc.webhook_dlq_retry(delivery_id, actor=args.actor)
    if revived is None:
        problem = (f"unknown DLQ delivery {delivery_id!r} "
                   "(or webhook deleted/disabled)")
        print(json.dumps({"error": problem}), file=sys.stderr)
        return 1
    _emit(revived)
    succeeded = revived.get("status") == "ok"
    return 0 if succeeded else 1
def _cmd_webhook_backoff_set(svc: OrgStateService, args) -> int:
    """Stage 129: install a custom backoff schedule on one webhook.

    ``--schedule`` is a JSON list of positive integer seconds, e.g.
    ``'[30, 120, 600]'``. The service validates its contents. Revert with
    ``webhook backoff-clear``.

    Exit codes: 0 with the updated row, 1 for invalid JSON, a schedule
    the service rejects, or an unknown webhook.
    """
    webhook_id = args.webhook_id
    try:
        schedule = json.loads(args.schedule)
    except ValueError as exc:
        problem = f"--schedule is not valid JSON: {exc}"
        print(json.dumps({"error": problem}), file=sys.stderr)
        return 1
    try:
        updated = svc.set_webhook_backoff_schedule(
            webhook_id, schedule, actor=args.actor,
        )
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    if updated is not None:
        _emit(updated)
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_backoff_clear(svc: OrgStateService, args) -> int:
    """Stage 129: remove a webhook's custom backoff schedule.

    Afterwards the webhook follows the global default schedule again.

    Exit codes: 0 with the updated row, 1 when the service returns None
    (unknown webhook).
    """
    webhook_id = args.webhook_id
    updated = svc.clear_webhook_backoff_schedule(webhook_id,
                                                 actor=args.actor)
    if updated is not None:
        _emit(updated)
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_jitter_set(svc: OrgStateService, args) -> int:
    """Stage 142: give one webhook a retry jitter window.

    The value is the maximum ±N seconds added to ``retry_at`` on each
    attempt. Per the stage contract, the service accepts 1..3600.

    Exit codes: 0 with the updated row, 1 when the service rejects the
    value with ValueError or the webhook is unknown.
    """
    webhook_id = args.webhook_id
    try:
        updated = svc.set_webhook_jitter(webhook_id, args.value,
                                         actor=args.actor)
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    if updated is not None:
        _emit(updated)
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_jitter_clear(svc: OrgStateService, args) -> int:
    """Stage 142: remove a webhook's jitter override.

    Retries go back to the deterministic schedule.

    Exit codes: 0 with the updated row, 1 when the service returns None
    (unknown webhook).
    """
    webhook_id = args.webhook_id
    updated = svc.clear_webhook_jitter(webhook_id, actor=args.actor)
    if updated is not None:
        _emit(updated)
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_jitter_show(svc: OrgStateService, args) -> int:
    """Stage 142: report a webhook's jitter setting.

    ``effective`` is the override when set (and truthy), else 0.
    ``follows_global_default`` is true when no override is stored.

    Exit codes: 0 on success, 1 for an unknown webhook.
    """
    webhook_id = args.webhook_id
    if svc.get_webhook(webhook_id) is None:
        print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
              file=sys.stderr)
        return 1
    jitter = svc.get_webhook_jitter(webhook_id)
    _emit({
        "webhook_id": webhook_id,
        "jitter_seconds": jitter,
        "effective": jitter if jitter else 0,
        "follows_global_default": jitter is None,
    })
    return 0
def _cmd_webhook_max_attempts_set(svc: OrgStateService, args) -> int:
    """Stage 135: give one webhook a custom retry cap.

    Per the stage contract, the service accepts integers 1..50.

    Exit codes: 0 with the updated row, 1 when the service rejects the
    value with ValueError or the webhook is unknown.
    """
    webhook_id = args.webhook_id
    try:
        updated = svc.set_webhook_max_attempts(webhook_id, args.value,
                                               actor=args.actor)
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    if updated is not None:
        _emit(updated)
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_max_attempts_clear(svc: OrgStateService, args) -> int:
    """Stage 135: remove a webhook's retry-cap override.

    The webhook falls back to the global default cap, which was 5 when
    this stage was written.

    Exit codes: 0 with the updated row, 1 when the service returns None
    (unknown webhook).
    """
    webhook_id = args.webhook_id
    updated = svc.clear_webhook_max_attempts(webhook_id, actor=args.actor)
    if updated is not None:
        _emit(updated)
        return 0
    print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
          file=sys.stderr)
    return 1
def _cmd_webhook_max_attempts_show(svc: OrgStateService, args) -> int:
    """Stage 135: report a webhook's effective retry cap.

    Shows the per-webhook override (or None), the global default
    ``OrgStateService._DEFAULT_MAX_ATTEMPTS``, and whichever of the two
    is in force.

    Exit codes: 0 on success, 1 for an unknown webhook.
    """
    webhook_id = args.webhook_id
    if svc.get_webhook(webhook_id) is None:
        print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
              file=sys.stderr)
        return 1
    override = svc.get_webhook_max_attempts(webhook_id)
    fallback = OrgStateService._DEFAULT_MAX_ATTEMPTS
    uses_default = override is None
    _emit({
        "webhook_id": webhook_id,
        "override": override,
        "global_default": fallback,
        "effective": fallback if uses_default else override,
        "follows_global_default": uses_default,
    })
    return 0
def _cmd_webhook_backoff_show(svc: OrgStateService, args) -> int:
    """Stage 129: report the backoff schedule in force for a webhook.

    Shows the per-webhook override (or None), the global default
    ``OrgStateService._RETRY_BACKOFF_SECONDS`` as a list, and the
    effective schedule: the override when present, else the default.

    Exit codes: 0 on success, 1 for an unknown webhook.
    """
    webhook_id = args.webhook_id
    if svc.get_webhook(webhook_id) is None:
        print(json.dumps({"error": f"unknown webhook {webhook_id!r}"}),
              file=sys.stderr)
        return 1
    override = svc.get_webhook_backoff_schedule(webhook_id)
    fallback = list(OrgStateService._RETRY_BACKOFF_SECONDS)
    uses_default = override is None
    _emit({
        "webhook_id": webhook_id,
        "override": override,
        "global_default": fallback,
        "effective": fallback if uses_default else override,
        "follows_global_default": uses_default,
    })
    return 0
# --- backup / restore (Stage 81) ------------------------------------
def _cmd_backup_create(svc, args) -> int:
    """Stage 81: copy the SQLite DB at ``args.db`` to ``args.out``.

    Registered with needs_db=False, so ``svc`` is None here.
    ``backup_sqlite`` opens the source itself, so this process never
    holds its own SqliteDatabase on the source while the VACUUM INTO
    copy runs.

    Exit codes: 0 with the backup result, 1 on BackupError.
    """
    from infra.storage.backup import BackupError, backup_sqlite
    try:
        outcome = backup_sqlite(args.db, args.out)
    except BackupError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(outcome)
    return 0
def _cmd_backup_verify(svc, args) -> int:
    """Stage 81: check a backup file via ``verify_sqlite``.

    ``svc`` is not used. Exit codes: 0 with the verification result, 1
    on BackupError.
    """
    from infra.storage.backup import BackupError, verify_sqlite
    try:
        report = verify_sqlite(args.infile)
    except BackupError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(report)
    return 0
def _cmd_backup_restore(svc, args) -> int:
    """Stage 81: OFFLINE restore of ``args.infile`` over ``args.db``.

    The operator MUST stop the service first. That cannot be enforced
    here, because SQLite offers no cross-process advisory lock. The
    restore refuses a target that already holds data unless ``--force``
    is given.

    Exit codes: 0 with the restore result, 1 on BackupError.
    """
    from infra.storage.backup import BackupError, restore_sqlite
    try:
        outcome = restore_sqlite(args.infile, args.db, force=args.force)
    except BackupError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(outcome)
    return 0
# --- data retention auto-purge (Stage 91) ---------------------------
def _cmd_retention_show(svc: OrgStateService, args) -> int:
    """Preview the active retention policy (Stage 91).

    Loads the policy from the environment and runs a dry-run purge to
    count what each window would delete. The result also lists every
    known table that has no window configured (``not_configured``), so
    gaps such as an unset audit-log window are visible before
    ``retention purge`` is scheduled.

    Exit codes: 0 on success, 1 when the env policy is invalid.
    """
    from infra.retention import RetentionPolicy, known_tables
    try:
        policy = RetentionPolicy.from_env()
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    report = svc.purge_per_policy(policy=policy, dry_run=True)
    configured = policy.windows
    report["not_configured"] = [
        table for table in known_tables() if table not in configured
    ]
    _emit(report)
    return 0
def _cmd_retention_purge(svc: OrgStateService, args) -> int:
    """Apply the retention policy from the environment (Stage 91).

    - ``--dry-run`` only counts (like ``retention show``). Otherwise the
      service deletes rows and, per its contract, writes an audit row.
    - With no policy configured the command is a cron-friendly no-op
      reporting zero counts.
    - ``--strict`` turns that no-op into an error instead.

    Exit codes: 0 on success, 1 for an invalid env policy or an empty
    policy under ``--strict``.
    """
    from infra.retention import RetentionPolicy
    try:
        policy = RetentionPolicy.from_env()
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    if policy.is_empty and args.strict:
        problem = (
            "no retention policy configured — set "
            "ORGSTATE_RETENTION_<TABLE>_DAYS env vars first"
        )
        print(json.dumps({"error": problem}), file=sys.stderr)
        return 1
    outcome = svc.purge_per_policy(
        policy=policy, dry_run=args.dry_run, actor=args.actor,
    )
    _emit(outcome)
    return 0
# --- billing pipeline (Stage 92) ------------------------------------
def _cmd_billing_invoice(svc: OrgStateService, args) -> int:
    """Stage 92: generate (or regenerate) the invoice for (tenant, period).

    Per the service contract this is idempotent. A re-run overwrites the
    line items and total but keeps ``invoice_id`` stable.

    Exit codes: 0 with the invoice, 1 when the service raises ValueError.
    """
    try:
        invoice = svc.generate_invoice(
            args.tenant_id,
            period=args.period,
            plan_name=args.plan,
            actor=args.actor,
        )
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(invoice)
    return 0
def _cmd_billing_get(svc: OrgStateService, args) -> int:
    """Show the stored invoice for (tenant, period).

    Exit codes: 0 when found, 1 when the service raises ValueError or
    there is no invoice for that period.
    """
    tenant_id, period = args.tenant_id, args.period
    try:
        invoice = svc.get_invoice(tenant_id, period)
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    if invoice is not None:
        _emit(invoice)
        return 0
    problem = f"no invoice for {tenant_id!r} period {period!r}"
    print(json.dumps({"error": problem}), file=sys.stderr)
    return 1
def _cmd_billing_list(svc: OrgStateService, args) -> int:
    """List every stored invoice for a tenant.

    Exit codes: 0 with ``{"tenant_id", "invoices", "n"}``, 1 when the
    service raises ValueError.
    """
    tenant_id = args.tenant_id
    try:
        invoices = svc.list_invoices(tenant_id)
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit({"tenant_id": tenant_id, "invoices": invoices,
           "n": len(invoices)})
    return 0
def _cmd_billing_push(svc: OrgStateService, args) -> int:
    """Stage 99: push a generated invoice to the external provider (Stripe).

    - Idempotent per the service contract: a re-run returns the cached
      ``external_invoice_id`` instead of pushing again.
    - The operator supplies the Stripe customer via ``--customer``.
      Per-tenant customer mapping is left for Stage 99.1.

    Exit codes: 0 with the push result, 1 on ValueError or
    StripeUnavailable.
    """
    from infra.billing_stripe import StripeUnavailable
    try:
        pushed = svc.push_invoice_to_external(
            args.tenant_id,
            args.period,
            customer_id=args.customer,
            provider=args.provider,
            actor=args.actor,
        )
    except (ValueError, StripeUnavailable) as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(pushed)
    return 0
def _cmd_encryption_rotate_keys(svc: OrgStateService, args) -> int:
    """Stage 114: re-encrypt encrypted columns under the active Fernet key.

    The active key is the first one in the rotation. Per the service
    contract this is idempotent: rows already under the active key are
    skipped.

    Operator flow:
      1. set ORGSTATE_FIELD_ENCRYPTION_KEY=NEW,OLD (CSV)
      2. restart
      3. ``infra encryption rotate-keys --dry-run`` to count the work
      4. ``infra encryption rotate-keys`` to apply
      5. once the output shows 0 rewrites, drop OLD from the env

    Exit codes: 0 with the rotation result, 1 on ValueError.
    """
    try:
        rotated = svc.rotate_field_encryption_keys(
            dry_run=args.dry_run, actor=args.actor,
        )
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(rotated)
    return 0
def _cmd_billing_reconcile(svc: OrgStateService, args) -> int:
    """Stage 124: reconcile local invoices against recent Stripe invoices.

    Recovers local rows still marked ``issued`` that Stripe already shows
    as ``paid``. Run it from cron (daily or hourly, depending on volume);
    per the service contract it is idempotent.

    Exit codes: 0 whether or not drift was found (both are success), 1
    only for StripeUnavailable (Stripe config / network).
    """
    from infra.billing_stripe import StripeUnavailable
    try:
        report = svc.reconcile_stripe_invoices(
            since_seconds_ago=args.since_seconds,
            limit=args.limit,
            actor=args.actor,
        )
    except StripeUnavailable as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit(report)
    return 0
def _cmd_billing_webhook_replay(svc: OrgStateService, args) -> int:
    """Stage 106: replay a Stripe webhook event from a JSON file.

    Escape hatch for ops when Stripe's automatic retry isn't enough. For
    example, our endpoint was down past the retry window, or a ``paid``
    status must be back-filled by hand.

    The file MUST contain one Stripe Event payload (a JSON object):
    exactly what the HTTP receiver would have parsed after signature
    verification. ``stripe events list --limit N`` piped to a file
    produces one. There is NO signature verification here; the operator
    is the authority, and this is the manual-override path.

    The file is decoded as UTF-8 (Stripe's payload encoding) instead of
    the platform's locale default.

    Exit codes:
    - 0 with ``handle_event``'s result;
    - 1 when the file is unreadable, is not valid UTF-8 JSON, is not a
      JSON object, or ``handle_event`` raises StripeWebhookError.
    """
    try:
        with open(args.event_file, encoding="utf-8") as f:
            event = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError
        print(json.dumps({"error": f"cannot read event: {e}"}),
              file=sys.stderr)
        return 1
    # A Stripe Event is always an object. Reject lists/scalars here
    # rather than letting them reach handle_event as something it was
    # never meant to parse.
    if not isinstance(event, dict):
        print(json.dumps({
            "error": "cannot read event: expected a JSON object, got "
                     f"{type(event).__name__}",
        }), file=sys.stderr)
        return 1

    from infra.billing_stripe import StripeWebhookError, handle_event
    try:
        result = handle_event(svc, event)
    except StripeWebhookError as e:
        print(json.dumps({"error": str(e)}), file=sys.stderr)
        return 1
    _emit(result)
    return 0
def _cmd_billing_plans(svc, args) -> int:
    """List the named billing plans; needs_db=False, so ``svc`` is not used.

    Reads the ``PLANS`` constant from ``infra.billing``. For each plan,
    ordered by its registry key, it emits the name, currency, and every
    metric's free quota and per-unit price in cents.
    """
    from infra.billing import PLANS

    def _describe(plan) -> dict:
        # one output entry per plan; lines are keyed by metric name
        return {
            "name": plan.name,
            "currency": plan.currency,
            "lines": {
                metric: {
                    "free_quota": line.free_quota,
                    "price_cents_per_unit": line.price_cents_per_unit,
                }
                for metric, line in plan.lines.items()
            },
        }

    _emit({"plans": [_describe(PLANS[key]) for key in sorted(PLANS)]})
    return 0
# --- SSO providers (Stage 97) ---------------------------------------
def _saml_cert_and_sso_url(args) -> tuple[str, str]:
    """Collect the SAML-only inputs for ``sso provider create``.

    Returns ``(cert_pem_text, sso_url)``. Raises ``ValueError`` with an
    operator-facing message when the cert path is missing or unreadable,
    or when ``--sso-url`` is missing (checked in that order).
    """
    pem_path = getattr(args, "x509_cert_pem_file", None)
    if not pem_path:
        raise ValueError(
            "--x509-cert-pem-file is required for "
            "--type saml (path to the IdP's signing cert "
            "in PEM format)",
        )
    try:
        with open(pem_path) as handle:
            pem_text = handle.read()
    except OSError as err:
        raise ValueError(
            f"cannot read cert file {pem_path!r}: {err}",
        ) from err
    idp_sso_url = getattr(args, "sso_url", None)
    if not idp_sso_url:
        raise ValueError(
            "--sso-url is required for --type saml "
            "(the IdP's SAML SSO endpoint, often called "
            "'SSO URL' or 'Login URL' in the IdP admin "
            "console)",
        )
    return pem_text, idp_sso_url


def _cmd_sso_provider_create(svc: OrgStateService, args) -> int:
    """Register an SSO provider for a tenant.

    ``--type oidc`` (the default, also used when ``args`` has no ``type``):
    registers via ``svc.register_sso_provider`` with the client secret;
    the Stage 97 contract is that the secret is REQUIRED and shown ONCE.

    ``--type saml`` (Stage 102): ``--sso-url`` and
    ``--x509-cert-pem-file`` are required instead and the client secret
    is ignored (SAML uses asymmetric signing). ``--issuer-url`` is passed
    as the IdP EntityID and ``--client-id`` as the SP EntityID.

    Any ``ValueError`` — from argument validation or from the service —
    is printed as a JSON error on stderr and exits 1; success emits the
    service's result and exits 0.
    """
    kind = getattr(args, "type", "oidc")
    try:
        if kind != "saml":
            created = svc.register_sso_provider(
                args.tenant_id, args.name, args.issuer_url,
                args.client_id, args.client_secret,
                allowed_email_domains=args.allowed_email_domains or "",
                actor=args.actor,
            )
        else:
            pem_text, idp_sso_url = _saml_cert_and_sso_url(args)
            created = svc.register_saml_provider(
                args.tenant_id, args.name,
                idp_entity_id=args.issuer_url,
                sp_entity_id=args.client_id,
                sso_url=idp_sso_url,
                x509_cert_pem=pem_text,
                allowed_email_domains=args.allowed_email_domains or "",
                actor=args.actor,
            )
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    _emit(created)
    return 0
def _cmd_sso_provider_list(svc: OrgStateService, args) -> int:
    """Emit the SSO providers registered for one tenant.

    Output shape: ``{"tenant_id", "providers", "n"}``. A ``ValueError``
    from the service (presumably an unknown tenant — not visible here)
    becomes a JSON error on stderr and exit 1.
    """
    tenant = args.tenant_id
    try:
        providers = svc.list_sso_providers(tenant)
    except ValueError as err:
        print(json.dumps({"error": str(err)}), file=sys.stderr)
        return 1
    payload = {
        "tenant_id": tenant,
        "providers": providers,
        "n": len(providers),
    }
    _emit(payload)
    return 0
def _cmd_sso_provider_get(svc: OrgStateService, args) -> int:
    """Emit one SSO provider by id.

    Exit 0 with the provider row on stdout; exit 1 with a JSON error on
    stderr when ``svc.get_sso_provider`` returns ``None``.
    """
    provider_id = args.provider_id
    found = svc.get_sso_provider(provider_id)
    if found is not None:
        _emit(found)
        return 0
    message = f"unknown SSO provider {provider_id!r}"
    print(json.dumps({"error": message}), file=sys.stderr)
    return 1
def _cmd_sso_provider_delete(svc: OrgStateService, args) -> int:
    """Delete an SSO provider, guarded by ``--confirm <provider_id>``.

    Exit 1 (JSON error on stderr) when ``--confirm`` does not match the
    provider id exactly — the service is not called in that case — or
    when ``svc.delete_sso_provider`` reports a falsy result (unknown
    provider). On success emits ``{"provider_id", "deleted": True}``.
    """
    provider_id = args.provider_id

    def _fail(message: str) -> int:
        print(json.dumps({"error": message}), file=sys.stderr)
        return 1

    if args.confirm != provider_id:
        return _fail(
            f"deletion of SSO provider {provider_id!r} "
            f"requires --confirm {provider_id}. Existing "
            f"sessions stay valid until they expire — revoke "
            f"via `sso session revoke` if you need to kill them."
        )
    if not svc.delete_sso_provider(provider_id, actor=args.actor):
        return _fail(f"unknown SSO provider {provider_id!r}")
    _emit({"provider_id": provider_id, "deleted": True})
    return 0
# --- SSO sessions (Stage 98) ---------------------------------------
def _cmd_sso_session_list(svc: OrgStateService, args) -> int:
    """List SSO sessions for one tenant.

    Defaults to active sessions only; ``--include-revoked`` and
    ``--include-expired`` widen the listing for forensics. Emits
    ``{"tenant_id", "sessions", "n"}`` and exits 0; a ``ValueError``
    from the service becomes a JSON error on stderr and exit 1.
    """
    filters = {
        "user_email": args.user_email,
        "include_revoked": args.include_revoked,
        "include_expired": args.include_expired,
        "limit": args.limit,
    }
    try:
        sessions = svc.list_sso_sessions(args.tenant_id, **filters)
    except ValueError as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    _emit({
        "tenant_id": args.tenant_id,
        "sessions": sessions,
        "n": len(sessions),
    })
    return 0
def _cmd_sso_session_revoke(svc: OrgStateService, args) -> int:
    """Revoke one SSO session by token, or all sessions for a user.

    Single-token form (``args.token`` truthy): calls
    ``svc.revoke_sso_session`` and always exits 0, echoing only the
    first 12 characters of the token plus an ellipsis.

    Bulk form: requires both ``args.tenant_id`` and ``args.user_email``
    (exit 1 with a usage error otherwise), then exits 0 with
    ``revoked_count`` — even when it is 0, so it is cron-friendly. A
    ``ValueError`` from the service exits 1.
    """
    def _fail(message: str) -> int:
        print(json.dumps({"error": message}), file=sys.stderr)
        return 1

    token = args.token
    if token:
        was_revoked = svc.revoke_sso_session(token)
        _emit({"token_prefix": token[:12] + "…",
               "revoked": was_revoked})
        return 0

    # Same short-circuit order as "not tenant or not email".
    if not (args.tenant_id and args.user_email):
        return _fail(
            "either --token, or both --tenant + --user-email, "
            "must be provided"
        )

    try:
        revoked_count = svc.revoke_sso_sessions_for_user(
            args.tenant_id, args.user_email, actor=args.actor,
        )
    except ValueError as exc:
        return _fail(str(exc))

    _emit({
        "tenant_id": args.tenant_id,
        "user_email": args.user_email,
        "revoked_count": revoked_count,
    })
    return 0
def _cmd_sso_session_purge(svc: OrgStateService, args) -> int:
    """Delete SSO sessions past their ``expires_at``.

    Emits ``{"deleted_count": N}`` and always exits 0, so it can run from
    cron. Per this command's original contract, the service writes an
    audit row only when something was deleted (not verifiable here).
    """
    purged = svc.purge_expired_sso_sessions(actor=args.actor)
    _emit({"deleted_count": purged})
    return 0
# --- security posture (Stage 95) ------------------------------------
def _cmd_security_encryption_status(svc, args) -> int:
    """Report the encryption-at-rest posture derived from env vars.

    Builds the report via ``infra.security.current_status`` from a copy of
    the process environment and the ``--db`` target, then emits it. Exits
    0 by default (a status command, not a gate); with ``--strict`` it exits
    1 whenever the report lists any gaps — useful in cron to catch
    misconfigured deploys.
    """
    import os
    from infra.security import current_status

    report = current_status(env=dict(os.environ), db_dsn=args.db)
    _emit(report.to_dict())
    gate_tripped = bool(args.strict and report.gaps)
    return 1 if gate_tripped else 0
def _cmd_security_kms_refresh(svc: OrgStateService, args) -> int:
    """Stage 143: re-fetch the field-encryption key from KMS.

    Emits the service's result dict (``{"changed": false}`` on the usual
    no-rotation run). Intended to be scheduled every ~15 min so a KMS
    rotation propagates without a restart.

    Exit codes, chosen from the result's ``reason``:
        0 — succeeded (whether the key changed or not).
        1 — ``reason == "fetch_failed"`` (network / IAM / invalid key).
        2 — ``reason == "kms_not_configured"`` (env vars missing —
            likely run on the wrong host).
    """
    outcome = svc.refresh_field_encryption_from_kms(actor=args.actor)
    _emit(outcome)
    reason = outcome.get("reason")
    if reason == "kms_not_configured":
        return 2
    return 1 if reason == "fetch_failed" else 0
def _cmd_audit_list(svc: OrgStateService, args) -> int:
    """Emit audit-log rows matching the CLI filters; always exits 0.

    The action filter is read from ``args.action_filter`` — presumably
    because ``action`` is already taken as a subparser dest elsewhere in
    this CLI — and forwarded as ``action=``; the tenant filter comes from
    ``args.tenant``.
    """
    query = {
        "actor": args.actor,
        "action": args.action_filter,
        "tenant_id": args.tenant,
        "since": args.since,
        "until": args.until,
        "limit": args.limit,
        "offset": args.offset,
    }
    _emit(svc.audit.list(**query))
    return 0
def _cmd_backup(svc: OrgStateService, args) -> int:
    """Write a consistent SQLite snapshot to ``args.out``.

    Uses ``sqlite3.Connection.backup`` (the online backup API) from the
    service's live connection, so the snapshot is consistent even while
    the DB is in use. Parent directories of the destination are created
    as needed. Non-SQLite backends are refused with exit 1 and a JSON
    error on stderr — use the native dump tool (e.g. pg_dump) instead.
    On success emits the resolved path, size in bytes and backend.
    """
    backend = svc.db.backend
    if backend != "sqlite":
        refusal = (
            f"backup is sqlite-only; for backend {backend!r} use "
            "the native dump tool (e.g. pg_dump for Postgres)"
        )
        print(json.dumps({"error": refusal}), file=sys.stderr)
        return 1

    import sqlite3

    target = Path(args.out)
    target.parent.mkdir(parents=True, exist_ok=True)
    snapshot = sqlite3.connect(str(target))
    try:
        svc.db.conn.backup(snapshot)
    finally:
        # Connection's context manager only commits/rolls back; it does
        # not close, hence the explicit close here.
        snapshot.close()

    n_bytes = target.stat().st_size
    _emit({
        "backup_path": str(target.resolve()),
        "bytes": n_bytes,
        "backend": svc.db.backend,
    })
    return 0
# --- parser ---------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="python -m infra",
description="OrgState operator CLI (Stage 9).",
)
p.add_argument(
"--db", default=None,
help="DB target — a path, ':memory:', or a postgresql:// URL. "
"Defaults to ORGSTATE_DB_PATH or the on-disk SQLite default.",
)
sub = p.add_subparsers(dest="command", required=True, metavar="<command>")
# --- onboard wizard (Stage 85) -----------------------------------
# Top-level (not nested under tenant) because customer success
# discovers `infra --help` first; "onboard" reads at the same
# level as "tenant" / "key" — it's a workflow, not an entity.
onp = sub.add_parser(
"onboard",
help="bundled new-tenant setup — register + mint admin key + "
"emit a paste-ready summary for the customer email "
"(Stage 85)",
)
onp.add_argument("tenant_id",
help="short stable identifier — used in URLs "
"and audit logs; e.g. 'acme', 'globex'")
onp.add_argument("name",
help="human-readable name for dashboards")
onp.add_argument("--vertical", default="logistics",
help="which vertical adapter — default logistics")
onp.add_argument("--mint-operator", action="store_true",
dest="mint_operator",
help="also mint an operator-role key for "
"programmatic ingestion (SDK use)")
onp.add_argument("--resume", action="store_true",
help="tenant already exists — mint a fresh "
"admin key without re-registering (e.g. "
"customer lost their key)")
onp.add_argument("--base-url", default="http://localhost:8000",
dest="base_url",
help="base URL substituted into curl examples "
"in next_steps (default localhost; pass "
"your prod URL for paste-ready emails)")
onp.add_argument("--admin-key-name", default="onboarding-admin",
dest="admin_key_name",
help="audit-log name for the minted admin key")
onp.add_argument("--actor", default="customer_success",
help="audit-log actor name for the operations "
"this command performs")
onp.set_defaults(func=_cmd_onboard)
# --- usage meter + soft caps (Stage 89) --------------------------
usp = sub.add_parser(
"usage",
help="per-tenant usage meter + soft caps (Stage 89). "
"Tracks runs + observations per month; quotas warn "
"via audit on crossing (don't block).",
)
ussub = usp.add_subparsers(dest="action", required=True,
metavar="<action>")
us_show = ussub.add_parser(
"show",
help="show usage counts + soft caps for a tenant in a period",
)
us_show.add_argument("tenant_id")
us_show.add_argument(
"--period", default=None,
help="YYYY-MM (UTC). Default: current month.",
)
us_show.set_defaults(func=_cmd_usage_show)
us_cap = ussub.add_parser(
"set-cap",
help="set or clear a soft cap on (tenant, metric)",
)
us_cap.add_argument("tenant_id")
us_cap.add_argument("metric",
help="metric name — 'runs' or 'observations' today")
us_cap.add_argument(
"--limit", type=int, default=None,
help="soft cap (>=0). Omit to CLEAR (unlimited).",
)
us_cap.add_argument("--actor", default="operator")
us_cap.set_defaults(func=_cmd_usage_set_cap)
us_caps = ussub.add_parser(
"list-caps",
help="list all soft caps configured for a tenant",
)
us_caps.add_argument("tenant_id")
us_caps.set_defaults(func=_cmd_usage_list_caps)
# --- tenant ------------------------------------------------------
tp = sub.add_parser("tenant", help="tenant management")
tsub = tp.add_subparsers(dest="action", required=True, metavar="<action>")
p1 = tsub.add_parser("register", help="register a new tenant")
p1.add_argument("tenant_id")
p1.add_argument("name")
p1.add_argument("--vertical", default="logistics")
p1.set_defaults(func=_cmd_tenant_register)
p2 = tsub.add_parser("list", help="list every tenant")
p2.set_defaults(func=_cmd_tenant_list)
p3 = tsub.add_parser("get", help="get one tenant")
p3.add_argument("tenant_id")
p3.set_defaults(func=_cmd_tenant_get)
# Stage 52: pause/resume — set tenants.status. Two parsers share
# _cmd_tenant_set_status via set_defaults(status=...).
p_pause = tsub.add_parser(
"pause",
help="pause a tenant: writes (run/calibrate/ingest/schedule) "
"are rejected; reads still work (Stage 52)",
)
p_pause.add_argument("tenant_id")
p_pause.add_argument("--actor", default="operator")
p_pause.set_defaults(func=_cmd_tenant_set_status, status="paused")
p_resume = tsub.add_parser(
"resume",
help="resume a paused tenant — writes go back to normal",
)
p_resume.add_argument("tenant_id")
p_resume.add_argument("--actor", default="operator")
p_resume.set_defaults(func=_cmd_tenant_set_status, status="active")
# Stage 58: per-tenant severity threshold overrides. Three verbs
# under `tenant threshold`: set / get / clear. Overrides merge
# with the vertical's defaults at runtime — caller declares only
# what they want different.
p_thr = tsub.add_parser(
"threshold",
help="per-tenant severity threshold overrides (Stage 58)",
)
thr_sub = p_thr.add_subparsers(dest="threshold_action",
required=True,
metavar="<action>")
thr_set = thr_sub.add_parser(
"set",
help="set per-tenant severity overrides for one entity_type "
"(only the levels passed are overridden)",
)
thr_set.add_argument("tenant_id")
thr_set.add_argument("--entity-type", required=True,
dest="entity_type")
thr_set.add_argument("--critical", type=float, default=None,
help="override critical score threshold "
"(e.g. 0.85)")
thr_set.add_argument("--high", type=float, default=None)
thr_set.add_argument("--medium", type=float, default=None)
thr_set.add_argument("--low", type=float, default=None)
thr_set.add_argument("--actor", default="operator")
thr_set.set_defaults(func=_cmd_tenant_threshold_set)
thr_get = thr_sub.add_parser(
"get",
help="show per-tenant overrides — pass --entity-type for one "
"type, omit for all of the tenant's overrides",
)
thr_get.add_argument("tenant_id")
thr_get.add_argument("--entity-type", default=None,
dest="entity_type")
thr_get.set_defaults(func=_cmd_tenant_threshold_get)
thr_clear = thr_sub.add_parser(
"clear",
help="remove per-tenant overrides for one entity_type — "
"revert to vertical defaults",
)
thr_clear.add_argument("tenant_id")
thr_clear.add_argument("--entity-type", required=True,
dest="entity_type")
thr_clear.add_argument("--actor", default="operator")
thr_clear.set_defaults(func=_cmd_tenant_threshold_clear)
# Stage 61: per-tenant drift_weights overrides — symmetric to
# threshold. `set` upserts (merges with existing); `clear` removes
# only the drift_weights key (preserves severity_thresholds if
# both are set). For `get`, use `tenant threshold get` — that
# already shows the full overrides dict.
p_weights = tsub.add_parser(
"weights",
help="per-tenant drift signal weight overrides (Stage 61)",
)
w_sub = p_weights.add_subparsers(dest="weights_action",
required=True,
metavar="<action>")
w_set = w_sub.add_parser(
"set",
help="override drift weights — only signals passed (--delta "
"etc.) are overridden, the rest stay defaults",
)
w_set.add_argument("tenant_id")
w_set.add_argument("--entity-type", required=True,
dest="entity_type")
w_set.add_argument("--delta", type=float, default=None,
help="weight for Δ (directional change)")
w_set.add_argument("--psi", type=float, default=None,
help="weight for ψ (stability)")
w_set.add_argument("--xi", type=float, default=None,
help="weight for ξ (volatility)")
w_set.add_argument("--gamma", type=float, default=None,
help="weight for γ (anomaly)")
w_set.add_argument("--kappa", type=float, default=None,
help="weight for κ (coherence)")
w_set.add_argument("--actor", default="operator")
w_set.set_defaults(func=_cmd_tenant_weights_set)
w_clear = w_sub.add_parser(
"clear",
help="remove ONLY the drift_weights override (preserves "
"severity_thresholds if also set)",
)
w_clear.add_argument("tenant_id")
w_clear.add_argument("--entity-type", required=True,
dest="entity_type")
w_clear.add_argument("--actor", default="operator")
w_clear.set_defaults(func=_cmd_tenant_weights_clear)
p4 = tsub.add_parser(
"export",
help="dump everything we hold for a tenant (compliance / offboarding)",
)
p4.add_argument("tenant_id")
p4.add_argument("--out", default=None,
help="write the JSON to FILE instead of stdout")
p4.set_defaults(func=_cmd_tenant_export)
# Stage 82: GDPR right-to-erasure. Cascade-delete every row
# belonging to <tenant_id>. Double-confirm via --confirm <tid>
# matching the positional — operator typo "acme" vs "acne"
# caught here, not in production aftermath.
p_del = tsub.add_parser(
"delete",
help="CASCADE-DELETE everything for a tenant (Stage 82 / "
"GDPR Article 17). Requires --confirm <tenant_id>. "
"Best to `infra db backup` first.",
)
p_del.add_argument("tenant_id")
p_del.add_argument(
"--confirm", required=True,
help="must match <tenant_id> exactly — typo-protection on a "
"destructive operation",
)
p_del.add_argument("--actor", default="operator")
p_del.set_defaults(func=_cmd_tenant_delete)
# Stage 105: per-tenant billing config. Closes the V1.2
# billing chain so operators stop passing --plan and
# --customer on every `billing invoice` / `billing push`.
p_bcfg = tsub.add_parser(
"billing-config",
help="store the tenant's default plan + Stripe customer "
"(Stage 105). Subsequent `billing invoice` / `billing "
"push` calls inherit these unless overridden.",
)
p_bcfg.add_argument("tenant_id")
p_bcfg.add_argument(
"--plan", default=None,
help="default plan name (e.g. starter / growth / "
"enterprise). Pass empty string to clear.",
)
p_bcfg.add_argument(
"--stripe-customer", default=None,
dest="stripe_customer",
help="default Stripe customer id (cus_...). Pass empty "
"string to clear.",
)
p_bcfg.add_argument("--actor", default="operator")
p_bcfg.set_defaults(func=_cmd_tenant_billing_config)
# Stage 109 — per-tenant plan overrides (custom contracts)
p_pov_set = tsub.add_parser(
"plan-override-set",
help="Stage 109: bespoke per-metric pricing override. "
"Layers on top of the base plan; NULL fields inherit. "
"Use plan-override-clear to drop a row.",
)
p_pov_set.add_argument("tenant_id")
p_pov_set.add_argument("metric",
help="metric name (e.g. 'runs', 'observations')")
p_pov_set.add_argument(
"--free-quota", type=int, default=None,
dest="free_quota",
help="override the free units before billing kicks in",
)
p_pov_set.add_argument(
"--price-cents-per-unit", type=int, default=None,
dest="price_cents_per_unit",
help="flat-mode override of the per-unit price",
)
p_pov_set.add_argument(
"--tiers", default=None,
help='tiered-mode override; JSON list of [cap, rate] pairs, '
'e.g. \'[[10000,3],[100000,2],[null,1]]\' (null='
'unbounded final tier). Takes precedence over '
'--price-cents-per-unit.',
)
p_pov_set.add_argument(
"--note", default=None,
help="human-readable note (contract ref, ticket id)",
)
p_pov_set.add_argument("--actor", default="operator")
p_pov_set.set_defaults(func=_cmd_tenant_plan_override_set)
p_pov_clear = tsub.add_parser(
"plan-override-clear",
help="Stage 109: remove a per-tenant override row "
"(metric falls back to base plan).",
)
p_pov_clear.add_argument("tenant_id")
p_pov_clear.add_argument("metric")
p_pov_clear.add_argument("--actor", default="operator")
p_pov_clear.set_defaults(func=_cmd_tenant_plan_override_clear)
p_pov_show = tsub.add_parser(
"plan-override-show",
help="Stage 109: dump override rows + the resolved "
"effective plan for a tenant.",
)
p_pov_show.add_argument("tenant_id")
p_pov_show.set_defaults(func=_cmd_tenant_plan_override_show)
p_bshow = tsub.add_parser(
"billing-show",
help="show the tenant's stored billing config "
"(Stage 105) — plan_name + stripe_customer_id",
)
p_bshow.add_argument("tenant_id")
p_bshow.set_defaults(func=_cmd_tenant_billing_show)
# Stage 128 — per-tenant rate-limit fail-closed override
p_rl_set = tsub.add_parser(
"rate-limit-fail-closed-set",
help="Stage 128: opt the tenant into / out of rate-limit "
"fail-closed mode. Per-tenant override beats the "
"process-wide ORGSTATE_RATE_LIMIT_REDIS_FAIL_CLOSED.",
)
p_rl_set.add_argument("tenant_id")
p_rl_set.add_argument(
"--value", default=None,
help="true / false. true = regulated tenant: 503 on backend "
"hiccup. false = best-effort: keep serving (fail-open).",
)
p_rl_set.add_argument(
"--clear", action="store_true",
help="remove the override; tenant follows process default again",
)
p_rl_set.add_argument("--actor", default="operator")
p_rl_set.set_defaults(func=_cmd_tenant_rate_limit_fail_closed_set)
p_rl_show = tsub.add_parser(
"rate-limit-fail-closed-show",
help="Stage 128: read the per-tenant rate-limit fail-closed "
"override (None = follow process default).",
)
p_rl_show.add_argument("tenant_id")
p_rl_show.set_defaults(func=_cmd_tenant_rate_limit_fail_closed_show)
# --- key (per-tenant API keys) ----------------------------------
kp = sub.add_parser("key", help="per-tenant API key management")
ksub = kp.add_subparsers(dest="action", required=True, metavar="<action>")
k1 = ksub.add_parser("mint", help="mint a new key (raw shown ONCE)")
k1.add_argument("tenant_id")
k1.add_argument("--name", default="")
k1.add_argument("--expires-at", default=None,
help="ISO-8601 UTC, e.g. 2026-12-31T00:00:00+00:00")
k1.add_argument("--role", default="admin",
choices=("readonly", "operator", "admin", "scim"),
help="permission level (default admin for backwards compat; "
"use 'operator' for service accounts that ingest "
"data, 'readonly' for dashboards, 'scim' for IdP "
"provisioning tokens — see `infra scim issue-token`)")
k1.set_defaults(func=_cmd_key_mint)
k2 = ksub.add_parser("list", help="list a tenant's keys")
k2.add_argument("tenant_id")
k2.add_argument("--include-revoked", action="store_true")
k2.set_defaults(func=_cmd_key_list)
k3 = ksub.add_parser("revoke", help="revoke a key (soft delete)")
k3.add_argument("tenant_id")
k3.add_argument("key_id")
k3.set_defaults(func=_cmd_key_revoke)
# --- admin-key --------------------------------------------------
ap = sub.add_parser("admin-key", help="platform-level admin key management")
asub = ap.add_subparsers(dest="action", required=True, metavar="<action>")
a1 = asub.add_parser("mint", help="mint a new admin key (raw shown ONCE)")
a1.add_argument("--name", default="")
a1.add_argument("--expires-at", default=None)
a1.set_defaults(func=_cmd_admin_mint)
a2 = asub.add_parser("list", help="list admin keys")
a2.add_argument("--include-revoked", action="store_true")
a2.set_defaults(func=_cmd_admin_list)
a3 = asub.add_parser("revoke", help="revoke an admin key")
a3.add_argument("key_id")
a3.set_defaults(func=_cmd_admin_revoke)
# --- SCIM (Stage 103) ------------------------------------------
scimp = sub.add_parser(
"scim", help="SCIM 2.0 user provisioning (V1.2)",
)
scimsub = scimp.add_subparsers(
dest="action", required=True, metavar="<action>",
)
scim_token = scimsub.add_parser(
"issue-token",
help="mint a SCIM bearer for a tenant (raw shown ONCE; "
"paste into IdP SCIM connector)",
)
scim_token.add_argument("tenant_id")
scim_token.add_argument("--name", default="scim",
help="human label for the key (default 'scim')")
scim_token.add_argument("--expires-at", default=None,
help="ISO-8601 UTC; omit for no expiry")
scim_token.set_defaults(func=_cmd_scim_issue_token)
scim_list = scimsub.add_parser(
"list-users",
help="dump SCIM-provisioned users for a tenant (ops view)",
)
scim_list.add_argument("tenant_id")
scim_list.add_argument("--limit", type=int, default=1000)
scim_list.set_defaults(func=_cmd_scim_list_users)
scim_cnt = scimsub.add_parser(
"count",
help="counts of SCIM users + groups for a tenant "
"(monitoring helper)",
)
scim_cnt.add_argument("tenant_id")
scim_cnt.set_defaults(func=_cmd_scim_count)
scim_glist = scimsub.add_parser(
"list-groups",
help="dump SCIM-provisioned groups + members for a tenant "
"(Stage 108)",
)
scim_glist.add_argument("tenant_id")
scim_glist.add_argument("--limit", type=int, default=1000)
scim_glist.set_defaults(func=_cmd_scim_list_groups)
# --- ops (Stage 10) --------------------------------------------
sp = sub.add_parser(
"status", help="DB backend, schema version, platform totals",
)
sp.set_defaults(func=_cmd_status)
bp = sub.add_parser(
"backup",
help="SQLite-only: write a consistent snapshot to <out>",
)
bp.add_argument("out", help="destination path for the snapshot file")
bp.set_defaults(func=_cmd_backup)
# --- run diff (Stage 20) --------------------------------------
rp = sub.add_parser("run", help="run inspection (Stage 20)")
rsub = rp.add_subparsers(dest="action", required=True,
metavar="<action>")
rd = rsub.add_parser("diff",
help="diff a run against another (default: previous on tenant)")
rd.add_argument("run_id")
rd.add_argument("--against", default=None,
help="run_id to compare against (default: previous run on tenant)")
rd.set_defaults(func=_cmd_run_diff)
ra = rsub.add_parser(
"alert-check",
help="exit 1 if the run's diff contains new/escalated issues "
"at >= threshold (for cron / monitoring wrappers)",
)
ra.add_argument("run_id")
ra.add_argument("--against", default=None,
help="run_id to compare against (default: previous run)")
ra.add_argument("--threshold", default="critical",
choices=("critical", "high", "medium", "low"),
help="alert when severity is at least this "
"(default: critical — only criticals page)")
# Stage 40: push notifications. Pure additive to the existing
# pull-only gate — same exit code, same stdout payload, plus an
# optional POST when the gate fires. HTTP failures don't change
# the exit code (the gate is the gate).
ra.add_argument("--webhook", default=None,
help="POST the trigger payload as JSON to this URL "
"when the alert fires (Stage 40 — Slack-/PagerDuty-"
"friendly push)")
ra.add_argument("--webhook-secret-env", default=None,
dest="webhook_secret_env",
help="env var holding the HMAC secret. When set, the "
"POST includes X-OrgState-Signature: sha256=<hex>")
ra.set_defaults(func=_cmd_run_alert_check)
# --- run stale-check (Stage 27) -------------------------------
rs = rsub.add_parser(
"stale-check",
help="exit 1 if any tenant has gone too long without a run "
"(or has never run) — companion to alert-check",
)
rs.add_argument("--max-gap-hours", type=float, default=48.0,
dest="max_gap_hours",
help="page when last run started more than this many "
"hours ago (default 48 — most tenants run daily)")
rs.add_argument("--tenant", default=None,
help="scope the check to one tenant "
"(default: scan every tenant)")
rs.add_argument("--entity-type", default=None, dest="entity_type",
help="optionally scope the check to one entity_type "
"within the tenant(s)")
rs.set_defaults(func=_cmd_run_stale_check)
rt = rsub.add_parser(
"trigger",
help="run the pipeline from stored observations — the fix step "
"for Stage 27's detection; symmetric to "
"`calibration recalibrate`",
)
rt.add_argument("tenant_id")
rt.add_argument("--vertical", required=True,
help="vertical name (e.g. 'logistics', 'saas')")
rt.add_argument("--entity-type", required=True, dest="entity_type",
help="entity_type within the vertical")
rt.set_defaults(func=_cmd_run_trigger)
rpre = rsub.add_parser(
"preview",
help="dry-run analysis: report what the next run WOULD see "
"without writing anything (Stage 37)",
)
rpre.add_argument("tenant_id")
rpre.add_argument("--vertical", required=True)
rpre.add_argument("--entity-type", required=True, dest="entity_type")
rpre.set_defaults(func=_cmd_run_preview)
# Stage 43: operator view + manual release for the Stage 42 locks.
# The TTL already auto-recovers from crashes; these commands cover
# "I want to see/release one right now, not wait."
rl = rsub.add_parser(
"locks",
help="list every currently-held run lock (Stage 43)",
)
rl.set_defaults(func=_cmd_run_locks)
ru = rsub.add_parser(
"unlock",
help="manually release one run lock; refuses live locks "
"unless --force (Stage 43)",
)
ru.add_argument("tenant_id")
ru.add_argument("--entity-type", required=True, dest="entity_type")
ru.add_argument("--force", action="store_true",
help="release even if the lock is currently live "
"(only when certain no process is running)")
ru.set_defaults(func=_cmd_run_unlock)
# --- calibration (Stage 26) -----------------------------------
cp = sub.add_parser(
"calibration",
help="calibration freshness inspection (Stage 26)",
)
csub = cp.add_subparsers(dest="action", required=True,
metavar="<action>")
cc = csub.add_parser(
"check",
help="exit 1 if any stored calibration is older than --max-age-days "
"(for cron / monitoring wrappers)",
)
cc.add_argument("--max-age-days", type=float, default=90.0,
dest="max_age_days",
help="flag calibrations older than this many days "
"(default 90 — re-calibrate quarterly)")
cc.add_argument("--tenant", default=None,
help="scope the check to one tenant "
"(default: scan every tenant)")
cc.set_defaults(func=_cmd_calibration_check)
cr = csub.add_parser(
"recalibrate",
help="derive a fresh calibration from stored observations and "
"persist it — the fix step for Stage 26's detection",
)
cr.add_argument("tenant_id")
cr.add_argument("--vertical", required=True,
help="vertical name (e.g. 'logistics', 'saas', "
"'salesforce')")
cr.add_argument("--entity-type", required=True, dest="entity_type",
help="entity_type within the vertical "
"(e.g. 'warehouse', 'sales_owner')")
cr.add_argument("--min-observations", type=int, default=14,
dest="min_observations",
help="refuse to calibrate on fewer than this many "
"stored observations (default 14 — matches the "
"typical baseline_window)")
cr.add_argument("--actor", default="operator",
help="actor name for the audit log entry "
"(default 'operator')")
cr.set_defaults(func=_cmd_calibration_recalibrate)
# --- health (Stage 28) ----------------------------------------
hp = sub.add_parser(
"health",
help="unified silent-failure gate — runs both calibration and "
"run-stale checks in one shot (Stage 28)",
)
hp.add_argument("--max-age-days", type=float, default=90.0,
dest="max_age_days",
help="calibration freshness threshold "
"(default 90 days)")
hp.add_argument("--max-gap-hours", type=float, default=48.0,
dest="max_gap_hours",
help="run-gap threshold (default 48 hours)")
hp.add_argument("--tenant", default=None,
help="scope both checks to one tenant "
"(default: scan every tenant)")
hp.set_defaults(func=_cmd_health)
# --- source (Stage 33: connector dry-run) ---------------------
sp = sub.add_parser(
"source",
help="inspect data sources without writing to the DB (Stage 33)",
)
ssub = sp.add_subparsers(dest="action", required=True,
metavar="<action>")
st = ssub.add_parser(
"test",
help="build a connector and report observation counts / shape "
"without persisting anything — onboarding sanity check",
)
st.add_argument("--type", required=True, dest="connector_type",
help="connector type (e.g. 'csv_folder', 'http_json', "
"'sftp')")
st.add_argument("--vertical", default=None,
help="vertical name (for connectors that need it, "
"like csv_folder)")
st.add_argument("--data-dir", default=None, dest="data_dir",
help="data directory (for csv_folder)")
st.add_argument("--base-url", default=None, dest="base_url",
help="endpoint URL (for http_json)")
st.add_argument("--bearer-env", default=None, dest="bearer_env",
help="env var holding bearer token (for http_json)")
st.add_argument("--entity-type", default=None, dest="entity_type",
help="probe one entity type only "
"(default: every type the connector advertises)")
st.add_argument("--show-sample", type=int, default=0,
dest="show_sample",
help="emit the first N observations per entity type "
"so the shape is visible (default 0 = counts only)")
# this command is intentionally DB-free — it must never touch a
# production DB during a dry-run. main() skips svc creation when
# needs_db is False.
st.set_defaults(func=_cmd_source_test, needs_db=False)
# --- schedule (Stage 36) --------------------------------------
schp = sub.add_parser(
"schedule",
help="scheduled ingestion management — create, list, toggle, "
"and `tick` to run every due schedule once (cron-friendly)",
)
schsub = schp.add_subparsers(dest="action", required=True,
metavar="<action>")
sch_create = schsub.add_parser(
"create",
help="register a new scheduled-ingestion job for a tenant",
)
sch_create.add_argument("tenant_id")
sch_create.add_argument("--vertical", required=True)
sch_create.add_argument("--entity-type", required=True,
dest="entity_type")
sch_create.add_argument("--connector-type", required=True,
dest="connector_type",
help="connector type (e.g. csv_folder, "
"http_json, sftp)")
sch_create.add_argument("--connector-config", default=None,
dest="connector_config",
help="JSON string of the connector config "
"(default '{}')")
sch_create.add_argument("--frequency", default="daily",
choices=("hourly", "daily", "weekly"))
sch_create.add_argument("--disabled", action="store_true",
help="create the schedule in disabled state "
"(useful for staged rollouts)")
sch_create.set_defaults(func=_cmd_schedule_create)
sch_list = schsub.add_parser("list", help="list schedules")
sch_list.add_argument("--tenant", default=None,
help="scope to one tenant (default: all)")
sch_list.add_argument("--enabled-only", action="store_true",
dest="enabled_only")
sch_list.add_argument("--status", default=None,
choices=("ok", "skipped", "error", "never"),
help="filter by latest tick outcome — Stage 38; "
"'never' matches schedules that have not "
"ticked yet")
sch_list.set_defaults(func=_cmd_schedule_list)
sch_get = schsub.add_parser("get", help="get one schedule by id")
sch_get.add_argument("schedule_id")
sch_get.set_defaults(func=_cmd_schedule_get)
sch_enable = schsub.add_parser("enable",
help="enable a schedule")
sch_enable.add_argument("schedule_id")
sch_enable.set_defaults(func=_cmd_schedule_set_enabled, enabled=True)
sch_disable = schsub.add_parser("disable",
help="disable a schedule")
sch_disable.add_argument("schedule_id")
sch_disable.set_defaults(func=_cmd_schedule_set_enabled, enabled=False)
sch_tick = schsub.add_parser(
"tick",
help="run every currently-due schedule once "
"(cron-friendly entry point — exit 1 iff any errored)",
)
# Stage 41: push notification when at least one schedule errors.
# Same shape as Stage 40's alert-check --webhook — same helper,
# same HMAC pattern. Silence stays silent (no POST on n_errors=0).
sch_tick.add_argument("--webhook", default=None,
help="POST the tick payload as JSON to this URL "
"when at least one schedule errored "
"(Stage 41 — Slack/PagerDuty push)")
sch_tick.add_argument("--webhook-secret-env", default=None,
dest="webhook_secret_env",
help="env var holding the HMAC secret. When set, "
"the POST includes X-OrgState-Signature: "
"sha256=<hex>")
sch_tick.set_defaults(func=_cmd_schedule_tick)
# --- ops-summary (Stage 44) -----------------------------------
osp = sub.add_parser(
"ops-summary",
help="bird's-eye view of the whole platform in one command "
"(tenants, schedules, runs, calibrations, locks, audit). "
"Always exits 0 — info command, not a gate (Stage 44).",
)
osp.add_argument("--max-age-days", type=float, default=90.0,
dest="max_age_days",
help="calibration freshness threshold for the "
"stale count (default 90)")
osp.add_argument("--max-gap-hours", type=float, default=48.0,
dest="max_gap_hours",
help="run-gap threshold for the unhealthy-tenants "
"count (default 48)")
osp.set_defaults(func=_cmd_ops_summary)
# --- backfill (Stage 65) -------------------------------------
bfp = sub.add_parser(
"backfill",
help="point-in-time historical analysis — walks cutoff days "
"and reports what each as-of date would have flagged. "
"Read-only, no DB writes (Stage 65).",
)
bfp.add_argument("tenant_id")
bfp.add_argument("--vertical", required=True)
bfp.add_argument("--entity-type", required=True, dest="entity_type")
bfp.add_argument("--from", default=None, dest="from_day",
help="earliest cutoff (ISO day, e.g. 2026-01-01). "
"Default: earliest observation.")
bfp.add_argument("--until", default=None, dest="until_day",
help="latest cutoff (ISO day). "
"Default: latest observation.")
bfp.add_argument("--step", type=int, default=1, dest="step_days",
help="evaluate every Nth day (default 1 = every "
"day; pass 7 for weekly, 30 for monthly)")
bfp.set_defaults(func=_cmd_backfill)
# --- webhook (Stage 71) ----------------------------------------
whp = sub.add_parser(
"webhook",
help="per-tenant outbound webhook registry (Stage 71)",
)
whsub = whp.add_subparsers(dest="action", required=True,
metavar="<action>")
wh_create = whsub.add_parser(
"create",
help="register a new webhook URL for a tenant. The raw "
"--secret (if passed) is hashed before persistence "
"and shown ONCE in the response.",
)
wh_create.add_argument("tenant_id")
wh_create.add_argument("--url", required=True,
help="https://… endpoint to deliver to")
wh_create.add_argument("--secret", default=None,
help="raw secret string for HMAC "
"signing on delivery (Stage 72 wiring); "
"stored as sha256")
wh_create.add_argument("--actor", default="operator")
wh_create.set_defaults(func=_cmd_webhook_create)
wh_list = whsub.add_parser("list",
help="list webhooks for a tenant")
wh_list.add_argument("tenant_id")
wh_list.add_argument("--enabled-only", action="store_true",
dest="enabled_only")
wh_list.set_defaults(func=_cmd_webhook_list)
wh_get = whsub.add_parser("get",
help="inspect one webhook by id")
wh_get.add_argument("webhook_id")
wh_get.set_defaults(func=_cmd_webhook_get)
wh_enable = whsub.add_parser("enable",
help="enable a webhook")
wh_enable.add_argument("webhook_id")
wh_enable.add_argument("--actor", default="operator")
wh_enable.set_defaults(func=_cmd_webhook_set_enabled, enabled=True)
wh_disable = whsub.add_parser("disable",
help="disable a webhook (preserves "
"row + history; soft-pause)")
wh_disable.add_argument("webhook_id")
wh_disable.add_argument("--actor", default="operator")
wh_disable.set_defaults(func=_cmd_webhook_set_enabled, enabled=False)
wh_del = whsub.add_parser("delete",
help="permanently delete a webhook row")
wh_del.add_argument("webhook_id")
wh_del.add_argument("--actor", default="operator")
wh_del.set_defaults(func=_cmd_webhook_delete)
# Stage 74: synthetic ping for end-to-end verification —
# customer registers a webhook, runs this, knows in 1 second
# whether their receiver is wired correctly.
wh_test = whsub.add_parser(
"test",
help="send a synthetic 'ping' payload to a webhook to "
"verify the receiver works (Stage 74)",
)
wh_test.add_argument("webhook_id")
wh_test.set_defaults(func=_cmd_webhook_test)
# Stage 76: delivery audit log + retry queue. customer asks
# "did the alert fire?" → list. ops responds to 5xx storm →
# retry. each retry is a NEW row (attempt+1) so the audit
# trail is complete.
wh_deliveries = whsub.add_parser(
"deliveries",
help="webhook delivery audit log + retry (Stage 76)",
)
wh_dsub = wh_deliveries.add_subparsers(
dest="dsub", required=True, metavar="<subcmd>",
)
wh_dlist = wh_dsub.add_parser(
"list",
help="list delivery attempts (filter by --tenant OR "
"--webhook; optional --status)",
)
wh_dlist.add_argument("--tenant", dest="tenant_id", default=None)
wh_dlist.add_argument("--webhook", dest="webhook_id", default=None)
wh_dlist.add_argument(
"--status", default=None,
choices=("ok", "failed_retriable", "failed_permanent"),
help="filter by delivery status (only honored with --tenant)",
)
wh_dlist.add_argument("--limit", type=int, default=100)
wh_dlist.set_defaults(func=_cmd_webhook_deliveries_list)
wh_dget = wh_dsub.add_parser(
"get",
help="show one delivery row including full payload + error",
)
wh_dget.add_argument("delivery_id")
wh_dget.set_defaults(func=_cmd_webhook_deliveries_get)
wh_dretry = wh_dsub.add_parser(
"retry",
help="re-fire pending retriable failures (5xx + network). "
"Per-payload attempt cap; --dry-run for preview.",
)
wh_dretry.add_argument("--tenant", default=None,
help="scope retries to this tenant; "
"default: all tenants (operator use)")
wh_dretry.add_argument("--max-attempts", type=int, default=5,
dest="max_attempts",
help="give up on a payload once it has "
"been attempted this many times")
wh_dretry.add_argument("--limit", type=int, default=100,
help="cap retries fired per call (cron-safe)")
wh_dretry.add_argument("--dry-run", action="store_true",
help="list what would be retried without sending")
wh_dretry.set_defaults(func=_cmd_webhook_deliveries_retry)
# Stage 122 — cron-friendly retry that respects backoff
wh_due = wh_dsub.add_parser(
"retry-due",
help="Stage 122: re-fire only deliveries whose retry_at "
"is past (respects exponential backoff). Use in a "
"cron (every 1-5min). Exit 1 if any retry failed.",
)
wh_due.add_argument("--limit", type=int, default=100)
wh_due.add_argument("--max-attempts", type=int, default=5,
dest="max_attempts")
wh_due.set_defaults(func=_cmd_webhook_retry_due)
wh_dlq = wh_dsub.add_parser(
"dlq-list",
help="Stage 122: dump dead-letter queue (payloads that "
"exhausted max_attempts).",
)
wh_dlq.add_argument("--tenant", default=None)
wh_dlq.add_argument("--limit", type=int, default=100)
wh_dlq.set_defaults(func=_cmd_webhook_dlq_list)
wh_dlq_r = wh_dsub.add_parser(
"dlq-retry",
help="Stage 122: revive ONE DLQ row as attempt=1 (fresh "
"window). Use after fixing the receiver. The DLQ "
"row itself stays for forensics.",
)
wh_dlq_r.add_argument("delivery_id")
wh_dlq_r.add_argument("--actor", default="operator")
wh_dlq_r.set_defaults(func=_cmd_webhook_dlq_retry)
# Stage 136 — DLQ TTL purge (cron-friendly cleanup)
wh_dlq_p = wh_dsub.add_parser(
"dlq-purge",
help="Stage 136: delete DLQ rows older than --ttl-days. "
"Run from cron (daily). Use --dry-run first to "
"preview what would be purged.",
)
wh_dlq_p.add_argument(
"--ttl-days", type=int, required=True,
dest="ttl_days",
help="purge DLQ rows older than N days (1..3650). "
"Common: 30 (one month forensics window) or "
"90 (quarterly compliance review).",
)
wh_dlq_p.add_argument("--tenant", default=None,
help="restrict to one tenant_id")
wh_dlq_p.add_argument(
"--dry-run", action="store_true",
help="list delivery_ids that would be purged; no delete",
)
wh_dlq_p.add_argument("--actor", default="cron")
wh_dlq_p.set_defaults(func=_cmd_webhook_dlq_purge)
# Stage 129 — per-webhook custom backoff schedule
wh_bo_set = whsub.add_parser(
"backoff-set",
help="Stage 129: install a custom retry backoff for "
"ONE webhook. Pass --schedule as JSON list of "
"positive integer seconds (e.g. '[30,120,600]'). "
"Override beats the global default. Use "
"backoff-clear to drop the override.",
)
wh_bo_set.add_argument("webhook_id")
wh_bo_set.add_argument(
"--schedule", required=True,
help='JSON list of seconds, e.g. \'[30, 120, 600]\' for '
'a fast-retry webhook (payment receivers etc.)',
)
wh_bo_set.add_argument("--actor", default="operator")
wh_bo_set.set_defaults(func=_cmd_webhook_backoff_set)
wh_bo_clear = whsub.add_parser(
"backoff-clear",
help="Stage 129: drop the per-webhook backoff override. "
"Webhook returns to the global default.",
)
wh_bo_clear.add_argument("webhook_id")
wh_bo_clear.add_argument("--actor", default="operator")
wh_bo_clear.set_defaults(func=_cmd_webhook_backoff_clear)
wh_bo_show = whsub.add_parser(
"backoff-show",
help="Stage 129: show the effective backoff schedule for "
"a webhook (override OR global default).",
)
wh_bo_show.add_argument("webhook_id")
wh_bo_show.set_defaults(func=_cmd_webhook_backoff_show)
# Stage 135 — per-webhook max_attempts override
wh_ma_set = whsub.add_parser(
"max-attempts-set",
help="Stage 135: install a custom per-webhook retry cap. "
"Integer 1..50. Payment integrations want 3 (fast "
"DLQ to alert ops); analytics tolerate 8.",
)
wh_ma_set.add_argument("webhook_id")
wh_ma_set.add_argument(
"value", type=int,
help="max attempts (1..50)",
)
wh_ma_set.add_argument("--actor", default="operator")
wh_ma_set.set_defaults(func=_cmd_webhook_max_attempts_set)
wh_ma_clear = whsub.add_parser(
"max-attempts-clear",
help="Stage 135: drop the per-webhook retry cap. Webhook "
"returns to the global default (5).",
)
wh_ma_clear.add_argument("webhook_id")
wh_ma_clear.add_argument("--actor", default="operator")
wh_ma_clear.set_defaults(func=_cmd_webhook_max_attempts_clear)
wh_ma_show = whsub.add_parser(
"max-attempts-show",
help="Stage 135: show the effective retry cap for a "
"webhook (override OR global default).",
)
wh_ma_show.add_argument("webhook_id")
wh_ma_show.set_defaults(func=_cmd_webhook_max_attempts_show)
# Stage 142 — per-webhook jitter
wh_jt_set = whsub.add_parser(
"jitter-set",
help="Stage 142: install a per-webhook jitter (max "
"±N seconds added to retry_at per attempt). "
"Smooths thundering-herd retry bursts. 1..3600.",
)
wh_jt_set.add_argument("webhook_id")
wh_jt_set.add_argument(
"value", type=int,
help="max jitter seconds (1..3600)",
)
wh_jt_set.add_argument("--actor", default="operator")
wh_jt_set.set_defaults(func=_cmd_webhook_jitter_set)
wh_jt_clear = whsub.add_parser(
"jitter-clear",
help="Stage 142: drop the per-webhook jitter. Returns "
"to the deterministic schedule.",
)
wh_jt_clear.add_argument("webhook_id")
wh_jt_clear.add_argument("--actor", default="operator")
wh_jt_clear.set_defaults(func=_cmd_webhook_jitter_clear)
wh_jt_show = whsub.add_parser(
"jitter-show",
help="Stage 142: show the per-webhook jitter setting.",
)
wh_jt_show.add_argument("webhook_id")
wh_jt_show.set_defaults(func=_cmd_webhook_jitter_show)
# --- audit (Stage 12) ------------------------------------------
audp = sub.add_parser("audit", help="audit log inspection")
audsub = audp.add_subparsers(dest="action", required=True,
metavar="<action>")
ag1 = audsub.add_parser("list", help="list audit entries (newest first)")
ag1.add_argument("--actor", default=None)
ag1.add_argument("--action", dest="action_filter", default=None,
help="filter by action name (mint_api_key, etc.)")
ag1.add_argument("--tenant", default=None)
# Stage 51: time-range filters for compliance pulls.
# since is INCLUSIVE (>=), until is EXCLUSIVE (<), so a Q1
# query passes --since 2026-01-01 --until 2026-04-01 without
# double-counting the boundary.
ag1.add_argument("--since", default=None,
help="created_at >= this ISO-8601 timestamp "
"(inclusive) — Stage 51")
ag1.add_argument("--until", default=None,
help="created_at < this ISO-8601 timestamp "
"(exclusive) — Stage 51")
ag1.add_argument("--limit", type=int, default=100)
ag1.add_argument("--offset", type=int, default=0)
ag1.set_defaults(func=_cmd_audit_list)
# --- db management (Stage 81) ----------------------------------
# `infra db backup OUT` / `infra db verify IN` / `infra db
# restore IN`. The legacy `infra backup OUT` (Stage 10) stays
# for backwards compatibility — it's the same code path via
# the old _cmd_backup. The new subgroup adds verify+restore
# without breaking existing pipelines that call `infra backup`.
dbp = sub.add_parser(
"db",
help="DB management — backup/verify/restore (Stage 81; "
"SQLite only — Postgres uses pg_dump directly)",
)
dbsub = dbp.add_subparsers(dest="action", required=True,
metavar="<action>")
db_backup = dbsub.add_parser(
"backup",
help="atomic online backup to OUTFILE via VACUUM INTO "
"(safe while service runs)",
)
db_backup.add_argument("out",
help="destination path for the backup file")
db_backup.set_defaults(func=_cmd_backup_create, needs_db=False)
db_verify = dbsub.add_parser(
"verify",
help="confirm INFILE is a valid OrgState backup; print "
"schema version + per-table row counts",
)
db_verify.add_argument("infile")
db_verify.set_defaults(func=_cmd_backup_verify, needs_db=False)
db_restore = dbsub.add_parser(
"restore",
help="REPLACE the live DB with INFILE. Refuses if target "
"has data unless --force. STOP the service first — "
"live restore corrupts SQLite.",
)
db_restore.add_argument("infile")
db_restore.add_argument(
"--force", action="store_true",
help="overwrite target even if it contains data "
"(DESTRUCTIVE — current contents are LOST)",
)
db_restore.set_defaults(func=_cmd_backup_restore, needs_db=False)
# --- data retention auto-purge (Stage 91) ----------------------
rtp = sub.add_parser(
"retention",
help="data retention policy + auto-purge (Stage 91). "
"Per-table windows via ORGSTATE_RETENTION_<TABLE>_DAYS "
"env vars; unset = never purge.",
)
rtsub = rtp.add_subparsers(dest="action", required=True,
metavar="<action>")
rt_show = rtsub.add_parser(
"show",
help="print the active policy + dry-run counts of rows that "
"WOULD be deleted now. Doesn't change anything.",
)
rt_show.set_defaults(func=_cmd_retention_show)
rt_purge = rtsub.add_parser(
"purge",
help="execute the policy. With --dry-run, prints what WOULD "
"be deleted; without, actually deletes + writes an audit "
"row. Cron-friendly: exit 0 with zero counts when no "
"policy configured (unless --strict).",
)
rt_purge.add_argument(
"--dry-run", action="store_true",
help="print would-delete counts; don't touch the DB",
)
rt_purge.add_argument(
"--strict", action="store_true",
help="exit 1 instead of 0 when no policy is configured "
"(catches misconfigured cron jobs)",
)
rt_purge.add_argument(
"--actor", default="cron",
help="audit log actor for the purge run (default: cron)",
)
rt_purge.set_defaults(func=_cmd_retention_purge)
# --- billing pipeline (Stage 92) -------------------------------
blp = sub.add_parser(
"billing",
help="generate invoices from the usage meter (Stage 92). "
"Idempotent — re-runs upsert. Plans are code-defined "
"in infra/billing.py.",
)
blsub = blp.add_subparsers(dest="action", required=True,
metavar="<action>")
bl_inv = blsub.add_parser(
"invoice",
help="generate (or re-generate) the invoice for "
"(tenant, period). Default plan: starter.",
)
bl_inv.add_argument("tenant_id")
bl_inv.add_argument(
"--period", default=None,
help="YYYY-MM (UTC). Default: current month.",
)
bl_inv.add_argument(
"--plan", default=None,
help="plan name (see `infra billing plans`). "
"Default: starter.",
)
bl_inv.add_argument("--actor", default="billing_cron")
bl_inv.set_defaults(func=_cmd_billing_invoice)
bl_get = blsub.add_parser(
"get",
help="fetch an existing invoice (no regeneration)",
)
bl_get.add_argument("tenant_id")
bl_get.add_argument("period",
help="YYYY-MM (UTC) bucket of the invoice")
bl_get.set_defaults(func=_cmd_billing_get)
bl_list = blsub.add_parser(
"list",
help="list all invoices for a tenant (newest first)",
)
bl_list.add_argument("tenant_id")
bl_list.set_defaults(func=_cmd_billing_list)
bl_plans = blsub.add_parser(
"plans",
help="list the named pricing plans available",
)
bl_plans.set_defaults(func=_cmd_billing_plans, needs_db=False)
# Stage 99: push generated invoice to external billing system.
# V1.2 supports Stripe only; provider arg leaves room for
# Lemon Squeezy / Chargebee / etc when a customer asks.
bl_push = blsub.add_parser(
"push",
help="push a generated invoice to the external billing "
"system (Stripe). Idempotent: re-runs return the "
"cached external_invoice_id without duplicating.",
)
bl_push.add_argument("tenant_id")
bl_push.add_argument("period",
help="YYYY-MM UTC bucket the invoice covers")
bl_push.add_argument(
"--customer", default=None,
help="Stripe customer id (cus_...). Stage 105: when "
"omitted, falls back to the tenant's stored "
"stripe_customer_id (set via `tenant billing-config "
"--stripe-customer`). Missing in BOTH places → "
"exit 1 with a clear error.",
)
bl_push.add_argument(
"--provider", default="stripe",
choices=("stripe",),
help="external billing system. Only 'stripe' in V1.2.",
)
bl_push.add_argument("--actor", default="billing_cron")
bl_push.set_defaults(func=_cmd_billing_push)
# Stage 106: manually replay a Stripe webhook event from a
# JSON file (escape hatch for ops; the HTTP receiver is the
# primary path).
bl_replay = blsub.add_parser(
"webhook-replay",
help="replay a Stripe webhook event from a JSON file "
"(ops escape hatch — no sig check). The file must "
"be a Stripe Event payload as it would arrive at "
"POST /webhooks/stripe.",
)
bl_replay.add_argument("event_file",
help="path to a JSON file containing a "
"Stripe Event payload")
bl_replay.set_defaults(func=_cmd_billing_webhook_replay)
# Stage 124 — reconcile cron (covers webhook drops).
bl_recon = blsub.add_parser(
"reconcile",
help="Stage 124: pull recent Stripe invoices, recover "
"any local rows still 'issued' that Stripe shows "
"'paid'. Idempotent — use in a cron (daily for "
"low volume, hourly for high).",
)
bl_recon.add_argument(
"--since-seconds", type=int, default=7 * 24 * 3600,
dest="since_seconds",
help="lookback window in seconds. Default 7d covers "
"Stripe's 3d webhook retry window + slack.",
)
bl_recon.add_argument(
"--limit", type=int, default=100,
help="max Stripe invoices to scan per run (cron-safe)",
)
bl_recon.add_argument("--actor", default="billing_cron")
bl_recon.set_defaults(func=_cmd_billing_reconcile)
# --- security posture (Stage 95) -------------------------------
secp = sub.add_parser(
"security",
help="security posture inspection — encryption-at-rest, "
"TLS, DSN sslmode (Stage 95).",
)
secsub = secp.add_subparsers(dest="action", required=True,
metavar="<action>")
sec_enc = secsub.add_parser(
"encryption-status",
help="report current encryption-at-rest posture from env "
"vars + DSN. Always exits 0 unless --strict.",
)
sec_enc.add_argument(
"--strict", action="store_true",
help="exit 1 if any encryption gap exists — for "
"compliance cron jobs that should fail loud",
)
sec_enc.set_defaults(
func=_cmd_security_encryption_status, needs_db=False,
)
# Stage 143 — KMS key rotation watcher
sec_kms = secsub.add_parser(
"kms-refresh",
help="Stage 143: re-fetch the encryption key from KMS. "
"Run from cron (every ~15 min). KMS rotation "
"propagates without process restart. Exits 0 on "
"success (changed or not), 1 on fetch failure, "
"2 if KMS not configured.",
)
sec_kms.add_argument("--actor", default="cron")
sec_kms.set_defaults(func=_cmd_security_kms_refresh)
# --- Stage 114: field-encryption key rotation -----------------
encp = sub.add_parser(
"encryption",
help="field-encryption key management (Stage 114).",
)
encsub = encp.add_subparsers(dest="action", required=True,
metavar="<action>")
enc_rot = encsub.add_parser(
"rotate-keys",
help="re-encrypt every encrypted column under the "
"ACTIVE (first) Fernet key. Idempotent. Operator "
"first adds the new key in position 0 of "
"ORGSTATE_FIELD_ENCRYPTION_KEY (CSV) + restarts; "
"rotation then walks webhooks.secret_raw + "
"audit_logs.payload_json.",
)
enc_rot.add_argument(
"--dry-run", action="store_true",
help="count rows that WOULD be rewritten without "
"touching the DB",
)
enc_rot.add_argument("--actor", default="operator")
enc_rot.set_defaults(func=_cmd_encryption_rotate_keys)
# --- SSO provider management (Stage 97) ------------------------
ssop = sub.add_parser(
"sso",
help="OIDC SSO provider management (Stage 97). HTTP "
"login flow lives at /sso/{tid}/login.",
)
ssosub = ssop.add_subparsers(dest="action", required=True,
metavar="<action>")
ssopv = ssosub.add_parser(
"provider",
help="CRUD for OIDC providers (the per-tenant IdP config)",
)
ssopvsub = ssopv.add_subparsers(dest="subaction", required=True,
metavar="<subaction>")
ssoc = ssopvsub.add_parser(
"create",
help="register a new SSO provider for a tenant "
"(OIDC default; --type saml for SAML 2.0). "
"client_secret is shown ONCE in the response for "
"OIDC; SAML uses asymmetric signing (no secret).",
)
ssoc.add_argument("tenant_id")
ssoc.add_argument("--name", required=True,
help="human-readable label, e.g. 'Okta Prod'")
ssoc.add_argument(
"--type", default="oidc", choices=("oidc", "saml"),
help="protocol — 'oidc' (default; Stage 97) or 'saml' "
"(Stage 102, requires --sso-url + "
"--x509-cert-pem-file)",
)
ssoc.add_argument(
"--issuer-url", required=True, dest="issuer_url",
help="OIDC: issuer URL (e.g. https://acme.okta.com). "
"SAML: IdP EntityID (the URN/URL the IdP advertises "
"in its metadata as entityID).",
)
ssoc.add_argument(
"--client-id", required=True, dest="client_id",
help="OIDC: client_id from the IdP app registration. "
"SAML: SP EntityID (what we register at the IdP "
"as the SP, often the same as the ACS URL).",
)
ssoc.add_argument(
"--client-secret", default="", dest="client_secret",
help="OIDC: client_secret from the IdP app registration "
"(REQUIRED for --type oidc). SAML: ignored.",
)
ssoc.add_argument(
"--sso-url", default=None, dest="sso_url",
help="SAML only: the IdP's SAML SSO endpoint URL "
"(where we POST the AuthnRequest).",
)
ssoc.add_argument(
"--x509-cert-pem-file", default=None,
dest="x509_cert_pem_file",
help="SAML only: path to the IdP's signing certificate "
"in PEM format (BEGIN CERTIFICATE / END CERTIFICATE).",
)
ssoc.add_argument(
"--allowed-email-domains", default="",
dest="allowed_email_domains",
help="CSV of email domain suffixes (acme.com,partner.io); "
"empty allows any verified email",
)
ssoc.add_argument("--actor", default="operator")
ssoc.set_defaults(func=_cmd_sso_provider_create)
ssol = ssopvsub.add_parser(
"list",
help="list registered SSO providers for a tenant",
)
ssol.add_argument("tenant_id")
ssol.set_defaults(func=_cmd_sso_provider_list)
ssog = ssopvsub.add_parser(
"get",
help="fetch one SSO provider (client_secret omitted)",
)
ssog.add_argument("provider_id")
ssog.set_defaults(func=_cmd_sso_provider_get)
ssod = ssopvsub.add_parser(
"delete",
help="remove an SSO provider. Existing sessions stay "
"valid until they expire; revoke separately if "
"you need them killed now. Requires --confirm.",
)
ssod.add_argument("provider_id")
ssod.add_argument(
"--confirm", required=True,
help="must match provider_id exactly (typo guard)",
)
ssod.add_argument("--actor", default="operator")
ssod.set_defaults(func=_cmd_sso_provider_delete)
# --- SSO sessions (Stage 98) -----------------------------------
sses = ssosub.add_parser(
"session",
help="SSO session inspection + revocation (Stage 98).",
)
ssessub = sses.add_subparsers(dest="subaction", required=True,
metavar="<subaction>")
ssesl = ssessub.add_parser(
"list",
help="list active SSO sessions for a tenant (default: "
"exclude revoked + expired)",
)
ssesl.add_argument("tenant_id")
ssesl.add_argument("--user-email", dest="user_email",
default=None,
help="filter to one user's sessions")
ssesl.add_argument("--include-revoked", action="store_true",
dest="include_revoked",
help="include revoked sessions (forensics)")
ssesl.add_argument("--include-expired", action="store_true",
dest="include_expired",
help="include past-expires_at sessions")
ssesl.add_argument("--limit", type=int, default=100)
ssesl.set_defaults(func=_cmd_sso_session_list)
ssesr = ssessub.add_parser(
"revoke",
help="revoke ONE session by token, OR ALL sessions for "
"a (tenant, user_email) pair. Use the bulk form for "
"employee offboarding / credential compromise.",
)
ssesr.add_argument("--token", default=None,
help="exact session_token to revoke "
"(operator copies from `session list`)")
ssesr.add_argument("--tenant", dest="tenant_id", default=None,
help="for the bulk form: tenant_id")
ssesr.add_argument("--user-email", dest="user_email",
default=None,
help="for the bulk form: email whose "
"sessions to kill across all devices")
ssesr.add_argument("--actor", default="operator")
ssesr.set_defaults(func=_cmd_sso_session_revoke)
ssesp = ssessub.add_parser(
"purge",
help="delete sessions past their expires_at. Cron-friendly; "
"audit row only when count > 0.",
)
ssesp.add_argument("--actor", default="cron")
ssesp.set_defaults(func=_cmd_sso_session_purge)
return p
def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv*, run the selected subcommand handler, return its exit code.

    When ``argv`` is ``None`` argparse reads ``sys.argv[1:]``. Argument
    errors are raised by argparse itself (``SystemExit``) before any
    handler runs.

    Handlers registered with ``needs_db=False`` are called with ``None``
    in place of the service, so they never open a database. Every other
    handler gets an ``OrgStateService`` bound to ``args.db``. That service
    is closed whether the handler returns or raises.

    A ``ValueError`` from any handler is written to stderr as a one-line
    JSON object ``{"error": ...}`` and mapped to exit code 1. A
    ``CalibrationNotFound`` from a DB-backed handler is handled the same
    way. Any other exception propagates unchanged.
    """
    args = _build_parser().parse_args(argv)

    # Opt-out check: subcommands that must stay DB-free (e.g. `source test`,
    # `db backup|verify|restore`, `billing plans`) set needs_db=False.
    # Anything that never set the default is treated as DB-backed.
    if not getattr(args, "needs_db", True):
        try:
            return args.func(None, args)
        except ValueError as exc:
            print(json.dumps({"error": str(exc)}), file=sys.stderr)
            return 1

    service = OrgStateService(args.db)
    try:
        return args.func(service, args)
    except (CalibrationNotFound, ValueError) as exc:
        # Service-layer guardrails (unknown tenant, etc.) are operator
        # mistakes: report them as JSON instead of a traceback.
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        return 1
    finally:
        service.close()
if __name__ == "__main__":  # pragma: no cover
    # Run the CLI and hand main()'s integer result to the interpreter as
    # the process exit status (sys.exit raises SystemExit with that code).
    sys.exit(main())