orgstate / infra /api /app.py
Legal-i's picture
Stage 239: per-delivery retry (infra/api/app.py)
8f1492f verified
"""
infra.api.app — the FastAPI shell.
Deliberately thin: every route is a few lines that call a handler in
handlers.py. All real logic and validation lives in the handlers, which have
no fastapi dependency and are fully unit-tested. This file is just wiring +
HTTP concerns (status codes, the ApiError -> JSON mapping, OpenAPI metadata).
fastapi/uvicorn are optional deps (requirements.txt) — importing this module
without them raises a clear error rather than a cryptic ImportError.
Run it:
pip install fastapi uvicorn
uvicorn infra.api.app:app --reload
On async: the routes are declared ``async`` by FastAPI convention, but the
handlers and the storage layer are synchronous (SQLite). True async I/O lands
with the Postgres backend — see ../MIGRATION_MAP.md.
"""
from __future__ import annotations
from typing import Optional
from infra.auth import ROLE_ADMIN, ROLE_OPERATOR, ROLE_READONLY, ApiKey
from infra.service import OrgStateService
from . import handlers
from .auth import (
make_dependency,
require_admin,
require_role,
require_run_access,
require_tenant_access,
require_tenant_or_admin,
)
from .errors import ApiError
from .metrics import CONTENT_TYPE as METRICS_CT
from .metrics import render_metrics
from .ratelimit import (
EXCLUDED_PATHS as RATE_LIMIT_EXCLUDED_PATHS,
)
from .ratelimit import (
RateLimitBackendUnavailable as _RateLimitBackendUnavailable,
)
from .ratelimit import (
RateLimiter,
bucket_key_for_request,
)
from .request_context import (
clear_request_context,
generate_request_id,
sanitize_upstream_id,
set_request_context,
)
from .schemas import (
AdminKeyMintBody,
AdminRecalibrateBody,
AdminTriggerRunBody,
ApiKeyMintBody,
BillingCheckoutBody,
CalibrateBody,
ConnectorTestBody,
DecisionPatchBody,
ObservationsIngestBody,
ObservationsRunBody,
RunAnalysisBody,
ScheduleCreateBody,
TenantOverridesBody,
TenantRegisterBody,
WebhookCreateBody,
)
from .security import SecurityConfig, always_on_headers
from .telemetry import ErrorCounter, init_sentry, should_record
from .tls import (
EXCLUDED_PATHS as TLS_EXCLUDED_PATHS,
)
from .tls import (
request_is_https,
require_https_from_env,
)
try:
from fastapi import Depends, FastAPI, Header, Query, Request
from fastapi.responses import JSONResponse, PlainTextResponse, Response
_FASTAPI_AVAILABLE = True
except ImportError: # pragma: no cover - exercised only when fastapi is absent
_FASTAPI_AVAILABLE = False
def _resolve_tenant_fail_closed(svc: "OrgStateService",
                                authorization: Optional[str],
                                cache=None,
                                ) -> Optional[bool]:
    """Stage 128/134 — best-effort per-tenant fail-closed lookup.

    Maps the ``Authorization`` header -> tenant_id -> per-tenant
    fail-closed override. Used by the rate-limit middleware.

    Args:
        svc: service passed through to the cached lookup helper.
        authorization: raw ``Authorization`` header value (may be None).
        cache: optional ``TenantAuthCache`` (typically
            ``app.state.tenant_auth_cache``). Cached lookups skip the
            per-request DB query, which matters because the same
            Authorization header repeats at high request rates. Pass
            ``None`` to disable caching (used by tests that want
            deterministic DB behavior).

    Returns:
        * None — no override: anonymous, unknown token, tenant has no
          override, or the lookup raised. The backend then falls back
          to its own ``self.fail_closed`` (env).
        * True/False — explicit tenant override.

    The rate-limit middleware calls this OUTSIDE its try/except, so the
    fail-soft contract (any failure -> None) is enforced here as well.
    Otherwise a raising lookup would turn every request into a 500.
    The failure is logged at DEBUG only, because this runs on every
    non-excluded request and a persistent failure would flood WARNING.
    """
    # Import outside the try: a missing module is a packaging bug that
    # should fail loudly, not silently disable tenant overrides.
    from .auth_cache import resolve_tenant_fail_closed_cached
    try:
        return resolve_tenant_fail_closed_cached(svc, authorization, cache)
    except Exception:  # deliberate best-effort boundary (see docstring)
        import logging
        logging.getLogger(__name__).debug(
            "tenant fail-closed lookup failed; using backend default",
            exc_info=True,
        )
        return None
def create_app(db_path: Optional[str] = None,
*, rate_limiter: Optional[RateLimiter] = None,
security_config: Optional[SecurityConfig] = None,
error_counter: Optional[ErrorCounter] = None,
require_https: Optional[bool] = None):
"""Build the FastAPI app. ``db_path`` is passed straight to
OrgStateService β€” pass ':memory:' in tests.
``rate_limiter`` (Stage 79) is optional; default reads from env
vars (``ORGSTATE_RATE_LIMIT_PER_KEY`` /
``ORGSTATE_RATE_LIMIT_PER_IP``) with generous defaults that
don't trip in normal test traffic. Inject a low-capacity
limiter to test rate-limit behavior.
``security_config`` (Stage 80) is optional; default reads
from env vars (``ORGSTATE_HSTS_ENABLED``,
``ORGSTATE_HSTS_MAX_AGE``, ``ORGSTATE_CORS_ORIGINS``). Pass
explicitly to test header behavior or to enable HSTS / CORS
without env coupling.
``error_counter`` (Stage 83) is optional; default builds a
fresh ErrorCounter. Tests inject one to verify increments
deterministically. The counter is exposed via /metrics as
``orgstate_http_errors_total`` with a status label.
``require_https`` (Stage 88) is optional; default reads
``ORGSTATE_REQUIRE_HTTPS`` env var. True β‡’ middleware refuses
plain-HTTP requests with 400; uptime/metrics endpoints
excluded. Pass explicitly in tests."""
if not _FASTAPI_AVAILABLE:
raise RuntimeError(
"infra.api.app needs FastAPI β€” `pip install fastapi uvicorn`. "
"The API logic itself lives in infra.api.handlers and is usable "
"and tested without fastapi."
)
app = FastAPI(
title="OrgState Engine API",
version="0.1.0",
description="Early operational drift detection β€” thin HTTP shell over "
"OrgStateService.",
)
svc = OrgStateService(db_path)
app.state.svc = svc
auth_dep = make_dependency(svc)
# Stage 83: error counter + optional Sentry init. Counter is
# always on (zero-dep, in-process); Sentry is opt-in via
# SENTRY_DSN env var + sentry-sdk install. We initialize
# Sentry once at app-build time so the SDK can auto-capture
# unhandled exceptions throughout the request lifecycle.
counter = error_counter if error_counter is not None else ErrorCounter()
app.state.error_counter = counter
app.state.sentry_init = init_sentry()
# Stage 79: rate-limit middleware. Constructed first so it
# wraps everything underneath. Excluded paths (probes/docs)
# are filtered inside the middleware, NOT here β€” keeping the
# exclusion list in one place (ratelimit.py).
limiter = rate_limiter if rate_limiter is not None else RateLimiter.from_env()
app.state.rate_limiter = limiter
# Stage 134 β€” auth-header β†’ tenant policy cache. Shared by
# the rate-limit middleware (Stage 128 per-tenant fail-
# closed lookup) so the same Authorization header doesn't
# re-query the DB thousands of times/second. Stored on
# app.state so the service layer can call invalidate_tenant
# after a policy change.
from .auth_cache import TenantAuthCache
app.state.tenant_auth_cache = TenantAuthCache()
svc.tenant_auth_cache = app.state.tenant_auth_cache
@app.middleware("http")
async def _rate_limit(request: "Request", call_next):
    """Stage 79 rate-limit middleware (per-key / per-IP buckets).

    Excluded paths and a disabled limiter pass straight through.
    Otherwise the request is bucketed by credentials or client IP and is:
      * forwarded when within budget;
      * rejected with 429 and a Retry-After header when over budget;
      * rejected with 503 when the backend is unreachable in fail-closed
        mode.
    """
    # Uptime monitors carry no credentials. Throttling liveness,
    # readiness or metrics would page on-call with false outages.
    bypass = (request.url.path in RATE_LIMIT_EXCLUDED_PATHS
              or not limiter.enabled)
    if bypass:
        return await call_next(request)

    auth_header = request.headers.get("authorization")
    peer = request.client.host if request.client else None
    bucket, is_authed = bucket_key_for_request(auth_header, peer)

    # Stage 128/134: per-tenant fail-closed override. Cache-backed and
    # best-effort; None means the backend uses its own default.
    tenant_override = _resolve_tenant_fail_closed(
        svc, auth_header, cache=app.state.tenant_auth_cache,
    )
    try:
        allowed, retry_after = limiter.check(
            bucket, is_authed, fail_closed_override=tenant_override,
        )
    except _RateLimitBackendUnavailable:
        # Stage 118: the operator opted into 503 over silently
        # over-allowing. Retry-After: 1 asks for a brief back-off.
        return JSONResponse(
            status_code=503,
            content={"error": {
                "code": "rate_limit_backend_unavailable",
                "message": "rate-limit backend is currently "
                           "unreachable β€” retry shortly",
            }},
            headers={"Retry-After": "1"},
        )

    if allowed:
        return await call_next(request)

    # Same {"error": {...}} envelope as 400/403/404 so SDKs parse it
    # uniformly. Wait = int(retry_after) + 1, never below 1 second.
    wait_seconds = max(1, int(retry_after) + 1)
    return JSONResponse(
        status_code=429,
        content={"error": {
            "code": "rate_limited",
            "message": "too many requests β€” slow down "
                       "and retry after the Retry-After "
                       "header value (seconds)",
        }},
        headers={"Retry-After": str(wait_seconds)},
    )
# Stage 88: TLS enforcement. Registered BEFORE security_headers
# so security_headers WRAPS the 400 tls_required response β€”
# even a TLS rejection gets X-Content-Type-Options etc. Default
# OFF (env-driven); operators turn it on AFTER verifying their
# TLS terminator works for every route they expect to serve.
https_required = (require_https if require_https is not None
else require_https_from_env())
app.state.require_https = https_required
@app.middleware("http")
async def _tls_enforcement(request: "Request", call_next):
    """Stage 88: refuse plain-HTTP requests with 400 ``tls_required``.

    The request passes through when any of these hold:
      * enforcement is off;
      * the path is an excluded uptime/metrics probe (same rationale as
        the rate limiter);
      * ``request_is_https`` reports the request arrived over HTTPS.
    """
    let_through = (not https_required
                   or request.url.path in TLS_EXCLUDED_PATHS
                   or request_is_https(request))
    if let_through:
        return await call_next(request)

    guidance = (
        "this endpoint requires HTTPS β€” retry with "
        "https://. If you are behind a TLS-terminating "
        "proxy, ensure it sets X-Forwarded-Proto: https."
    )
    return JSONResponse(
        status_code=400,
        content={"error": {"code": "tls_required", "message": guidance}},
    )
# Stage 80: security headers + CORS.
# Middleware order matters β€” FastAPI/Starlette runs middleware
# in REVERSE registration order on the request, normal order on
# the response. To get hardening headers on EVERY response
# (including 429 from rate-limit above, 422 from Pydantic,
# 401/403 from auth dependencies, 400 from TLS above), security
# headers is registered AFTER the TLS middleware so it's the
# outer wrapper of both.
sec = (security_config if security_config is not None
else SecurityConfig.from_env())
app.state.security_config = sec
# CORS first (so it sees preflight before security_headers
# decorates the response). Only mount if there's an allowlist;
# absent allowlist = browser-side same-origin enforcement, which
# is the safest default.
if sec.cors_origins:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=list(sec.cors_origins),
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "PATCH",
"DELETE", "OPTIONS"],
# Authorization is the bearer header; Content-Type
# covers JSON POST/PUT. If we add custom headers
# later (X-Request-ID, etc.) extend this list.
allow_headers=["Authorization", "Content-Type"],
)
@app.middleware("http")
async def _security_headers(request: "Request", call_next):
    """Stage 80/83: add hardening headers and count platform errors.

    Registered after the rate-limit and TLS middleware, so it wraps them.
    Their 429/400 responses get the headers too.
    """
    response = await call_next(request)
    outgoing = response.headers
    for header_name, header_value in always_on_headers(sec).items():
        # Never clobber a header a handler set on purpose, e.g. a
        # route-specific CSP on an HTML response.
        outgoing.setdefault(header_name, header_value)

    # Stage 83: 4xx are caller errors (422/401/403/429/404) and are not
    # alert-worthy. should_record decides which statuses count (5xx).
    status = response.status_code
    if should_record(status):
        counter.increment(status)
    return response
# Stage 84: request correlation. Registered LAST so it's the
# outermost wrapper β€” every other middleware runs INSIDE this
# one, meaning the contextvars set here are visible to:
# - rate-limit middleware (logs include request_id)
# - security_headers (debug logs if any)
# - tls enforcement (400 response carries request_id)
# - auth dependency (sets tenant_id once known)
# - handlers + DB layer (all their logger calls)
# On the way out we stamp X-Request-ID on the response so the
# client sees it and can report it back in support tickets.
@app.middleware("http")
async def _request_context(request: "Request", call_next):
    """Stage 84: bind a correlation id for the life of the request.

    Reuses a sanitized upstream ``X-Request-ID`` if one was sent, and
    otherwise mints one. The id is bound (tenant unknown yet) via
    ``set_request_context``, exposed as ``request.state.request_id``
    and echoed back as the ``X-Request-ID`` response header.
    ``clear_request_context`` is always called with the returned tokens.
    """
    request_id = (
        sanitize_upstream_id(request.headers.get("x-request-id"))
        or generate_request_id()
    )
    ctx_tokens = set_request_context(request_id, tenant_id=None)

    # Stage 154: legacy auth helpers that never see the Request read the
    # SSO session cookie from context. A plain-dict copy keeps the stored
    # value decoupled from Starlette's cookie view.
    from .request_context import set_request_cookies
    set_request_cookies(dict(request.cookies))

    try:
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
    finally:
        clear_request_context(ctx_tokens)
def _admin_actor(authorization: Optional[str]) -> str:
    """Map an admin-route ``Authorization`` header to an audit actor.

    Returns:
      * the admin DB key's ``key_id`` when the bearer matches one;
      * ``"env_admin"`` when a bearer was sent but matches no DB key.
        require_admin already accepted it, so it is the env-var secret;
        a stable name is logged instead of the secret;
      * ``"anonymous"`` when no bearer is present (dev mode with no
        admin credential configured).
    """
    from .auth import parse_bearer
    token = parse_bearer(authorization)
    if token is None:
        return "anonymous"
    db_key = svc.verify_admin_key(token)
    return "env_admin" if db_key is None else db_key.key_id
def _tenant_or_admin_actor(authorization: Optional[str],
                           tenant_id: str) -> str:
    """Audit actor for routes open to either a tenant key or an admin.

    A bearer that verifies as an API key for ``tenant_id`` yields that
    key's ``key_id``. Anything else (no bearer, unknown key, or a key
    for another tenant) is attributed via ``_admin_actor``.
    """
    from .auth import parse_bearer
    token = parse_bearer(authorization)
    if token is not None:
        api_key = svc.verify_api_key(token)
        if api_key is not None and api_key.tenant_id == tenant_id:
            return api_key.key_id
    return _admin_actor(authorization)
@app.exception_handler(ApiError)
async def _api_error_handler(_request: "Request", exc: ApiError):
    """Render a route's ApiError as its JSON error envelope."""
    envelope = exc.to_dict()
    return JSONResponse(content=envelope, status_code=exc.status)
# --- open routes (no auth) ----------------------------------------
# /health is the probe; POST /tenants is the v1 bootstrap (until a
# future admin-auth slice lands). The bootstrap gap is documented in
# MIGRATION_MAP β€” operators are expected to mint the first key for a
# newly-created tenant out-of-band via svc.create_api_key().
@app.get("/health", tags=["system"])
async def health():
    """Liveness probe; the payload comes from ``handlers.health``."""
    payload = handlers.health(svc)
    return payload
# Stage 32: readiness gate over HTTP β€” same signal as the CLI
# `infra health`. Liveness (above) says 'process is alive';
# readiness says 'gates are passing'. K8s convention is to route
# traffic away on 503-ready without restarting the pod (which is
# what failing liveness would do).
@app.get("/health/ready", tags=["system"])
async def health_ready(
    max_age_days: float = Query(default=90.0),
    max_gap_hours: float = Query(default=48.0),
    tenant_id: Optional[str] = Query(default=None),
):
    """Stage 32 readiness gate, the same signal as CLI ``infra health``.

    ``handlers.readiness`` supplies both the HTTP status (a 503 tells the
    router to send traffic elsewhere) and the JSON payload.
    """
    code, body = handlers.readiness(
        svc,
        max_age_days=max_age_days,
        max_gap_hours=max_gap_hours,
        tenant_id=tenant_id,
    )
    return JSONResponse(content=body, status_code=code)
# Prometheus scrape endpoint (Stage 11). Unauthenticated β€” the
# convention for /metrics; lock down at the network or proxy layer
# if your deployment exposes this beyond the scrape network.
# Stage 83: also surfaces the error counter so Prometheus alerting
# on `rate(orgstate_http_errors_total[5m]) > X` works out of the box.
@app.get("/metrics", tags=["system"], response_class=PlainTextResponse)
async def metrics():
    """Prometheus scrape endpoint.

    Covers service metrics, the HTTP error counter and limiter stats.
    """
    exposition = render_metrics(svc, error_counter=counter,
                                rate_limiter=limiter)
    return PlainTextResponse(exposition, media_type=METRICS_CT)
# Stage 96: public status page. Unauthenticated, excluded
# from rate limiting + TLS enforcement (uptime monitors poll
# this from internal networks). Distinct from /health
# (machine probe) and /health/ready (routing decision) β€”
# this is the human-facing trust signal.
from fastapi.responses import HTMLResponse


@app.get("/status", tags=["system"], response_class=HTMLResponse)
async def status_html():
    """Human-facing status page (Stage 96), rendered as HTML.

    Declared with ``response_class=HTMLResponse`` so the OpenAPI schema
    advertises ``text/html``. The previous PlainTextResponse declaration
    documented it as ``text/plain`` even though the body is HTML. The
    explicit media_type keeps the Content-Type header byte-identical
    (``text/html; charset=utf-8``).
    """
    from .status import build_status, render_html
    report = build_status(svc, error_counter=counter)
    return HTMLResponse(
        render_html(report),
        media_type="text/html; charset=utf-8",
    )
@app.get("/status.json", tags=["system"])
async def status_json():
    """Machine-readable twin of ``/status``: the same report as a dict."""
    from .status import build_status
    return build_status(svc, error_counter=counter).to_dict()
# --- Stage 97: OIDC SSO routes -----------------------------------
# Login lives at GET /sso/{tid}/login β†’ 302 to the IdP authorize
# URL. The PKCE verifier + state + nonce are stashed in a short-
# lived signed cookie so the callback can verify without DB load.
# Callback at GET /sso/{tid}/callback β†’ exchange code β†’ create
# session β†’ set session cookie β†’ 302 to the customer dashboard.
# /sso/logout invalidates the session; /sso/me echoes who you are.
SSO_FLOW_COOKIE = "orgstate_sso_flow"
SSO_SESSION_COOKIE = "orgstate_sso_session"
SSO_REFRESH_COOKIE = "orgstate_sso_refresh" # Stage 111
def _sso_secret() -> bytes:
    """Return the HMAC key used to sign SSO flow cookies.

    Prefers ``ORGSTATE_SSO_COOKIE_SECRET`` (UTF-8 encoded). If it is
    unset or empty, a random 32-byte key is generated once and cached
    on ``app.state`` for this process.

    The fallback is safe, because flow cookies simply stop validating
    after a restart. It breaks horizontal scale, though, since each
    replica gets its own key. Multi-replica deploys MUST set the env var.
    """
    import os
    configured = os.environ.get("ORGSTATE_SSO_COOKIE_SECRET", "")
    if configured:
        return configured.encode("utf-8")
    state = app.state
    try:
        return state._sso_secret
    except AttributeError:
        # First use in this process: mint and remember a key so all
        # requests served by this process agree on it.
        import secrets as _secrets
        state._sso_secret = _secrets.token_bytes(32)
        return state._sso_secret
def _sign_flow_cookie(data: dict) -> str:
    """Serialize ``data`` for the SSO flow cookie as ``<payload>.<sig>``.

    ``payload`` is URL-safe base64 of key-sorted JSON. ``sig`` is the hex
    HMAC-SHA256 of ``payload`` under ``_sso_secret()``. The contents are
    signed, not encrypted. ``_verify_flow_cookie`` is the inverse.
    """
    import base64
    import hmac
    import json
    raw_json = json.dumps(data, sort_keys=True).encode("utf-8")
    payload = base64.urlsafe_b64encode(raw_json).decode("ascii")
    mac = hmac.new(_sso_secret(), payload.encode("ascii"), "sha256")
    return payload + "." + mac.hexdigest()
def _verify_flow_cookie(raw: Optional[str]) -> Optional[dict]:
    """Verify and decode a cookie produced by ``_sign_flow_cookie``.

    Returns the decoded dict, or None when the cookie is absent,
    malformed, non-ASCII, badly signed, or does not decode to a JSON
    object.

    The value is client-controlled, so this must never raise.
    ``str.encode("ascii")`` and ``hmac.compare_digest`` on non-ASCII
    ``str`` both raise. Left unhandled, a tampered cookie would become a
    500 instead of the caller's clean "flow expired" 400. Both sides
    are therefore compared as ASCII bytes.
    """
    if not raw or "." not in raw:
        return None
    import base64
    import hmac as _hmac
    import json as _json
    payload, _, sig = raw.rpartition(".")
    try:
        payload_bytes = payload.encode("ascii")
        sig_bytes = sig.encode("ascii")
    except UnicodeEncodeError:
        # our signer only emits base64 + hex, so non-ASCII = forged
        return None
    expected = _hmac.new(
        _sso_secret(), payload_bytes, "sha256",
    ).hexdigest().encode("ascii")
    if not _hmac.compare_digest(expected, sig_bytes):
        return None
    try:
        data = _json.loads(
            base64.urlsafe_b64decode(payload_bytes).decode("utf-8"),
        )
    except ValueError:
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
        # ValueError subclasses
        return None
    # callers index the result like a dict; anything else is unusable
    return data if isinstance(data, dict) else None
def _sso_redirect_uri(request: "Request", tenant_id: str) -> str:
    """Construct the absolute OIDC callback URL sent to the IdP.

    Without an override, the URL is built from the inbound request.
    ``request_is_https`` picks the scheme (honoring X-Forwarded-Proto
    behind TLS terminators) and the ``host`` header supplies the
    authority.

    Stage 154: behind a same-origin proxy (e.g. Vercel rewriting
    ``orgstate.example/api/*`` to the HF Space backend), the inbound
    ``host`` is the backend's own hostname, not the public URL the
    browser sees. That gives the IdP an off-origin callback, and the
    browser then blocks the cookie the callback sets as third-party.
    ``ORGSTATE_PUBLIC_BASE_URL`` anchors the redirect_uri to the public
    base instead.

    The override is whitespace-stripped (a stray newline or space in
    the env value would otherwise corrupt the URL) and then has
    trailing slashes removed. A value that is empty after stripping is
    treated as unset. This matches how ORGSTATE_SSO_POST_LOGIN_URL is
    read.
    """
    import os as _os
    public_base = (
        _os.environ.get("ORGSTATE_PUBLIC_BASE_URL") or ""
    ).strip().rstrip("/")
    if public_base:
        return f"{public_base}/sso/{tenant_id}/callback"
    proto = "https" if request_is_https(request) else "http"
    host = request.headers.get("host", "localhost")
    return f"{proto}://{host}/sso/{tenant_id}/callback"
@app.get("/sso/{tenant_id}/login", tags=["sso"])
async def sso_login(tenant_id: str, request: "Request",
                    provider_id: Optional[str] = Query(default=None)):
    """Start the OIDC authorization-code + PKCE flow for ``tenant_id``.

    Provider resolution considers only the tenant's non-SAML providers.
    SAML providers have their own ``/sso/{tid}/saml/login``, and the
    SAML route already filters the other way. Counting them here would:
      * force a needless ``?provider_id=…`` on a tenant with one OIDC
        and one SAML provider;
      * hand a SAML provider to OIDC discovery on a SAML-only tenant.
    With exactly one OIDC provider it is used implicitly; otherwise the
    caller passes ``?provider_id=…``.

    On success, responds 302 to the IdP authorize URL. It also sets a
    signed, 600-second, HttpOnly flow cookie carrying state, nonce, PKCE
    verifier, provider_id and tenant_id for the callback to check.

    Raises ApiError:
      * 503 ``sso_unavailable`` when authlib is not installed;
      * 404 ``sso_not_configured`` or ``sso_provider_not_found``;
      * 400 ``sso_provider_required``;
      * ``sso_discovery_failed`` with the status carried by
        ``SSOUnavailable``.
    """
    from infra.sso.providers import PROVIDER_TYPE_SAML

    from ..sso import (
        SSOUnavailable,
        build_authorize_url,
        generate_nonce,
        generate_pkce_pair,
        generate_state,
        is_authlib_available,
    )
    if not is_authlib_available():
        raise ApiError(
            "sso_unavailable",
            "OIDC SSO requires the `authlib` package β€” "
            "operator must `pip install -r requirements-sso.txt`",
            status=503,
        )
    providers = [p for p in svc.list_sso_providers(tenant_id)
                 if p.get("provider_type") != PROVIDER_TYPE_SAML]
    if not providers:
        raise ApiError(
            "sso_not_configured",
            f"no SSO providers registered for tenant "
            f"{tenant_id!r} β€” operator runs `infra sso "
            f"provider create` first",
            status=404,
        )
    if provider_id is None:
        if len(providers) > 1:
            raise ApiError(
                "sso_provider_required",
                f"multiple SSO providers exist for tenant "
                f"{tenant_id!r}; pass ?provider_id=…",
                status=400,
            )
        provider_id = providers[0]["provider_id"]
    prov = svc.sso_providers.get(provider_id)
    # An explicit SAML provider_id is rejected too: OIDC discovery
    # cannot drive a SAML IdP.
    if (prov is None or prov.tenant_id != tenant_id
            or prov.provider_type == PROVIDER_TYPE_SAML):
        raise ApiError(
            "sso_provider_not_found",
            f"SSO provider {provider_id!r} not registered "
            f"for tenant {tenant_id!r}",
            status=404,
        )
    state = generate_state()
    nonce = generate_nonce()
    verifier, challenge = generate_pkce_pair()
    redirect_uri = _sso_redirect_uri(request, tenant_id)
    try:
        authorize_url = build_authorize_url(
            prov, redirect_uri,
            state=state, nonce=nonce,
            code_challenge=challenge,
        )
    except SSOUnavailable as e:
        raise ApiError("sso_discovery_failed",
                       str(e), status=e.status)
    flow = _sign_flow_cookie({
        "state": state,
        "nonce": nonce,
        "verifier": verifier,
        "provider_id": provider_id,
        "tenant_id": tenant_id,
    })
    resp = Response(status_code=302, headers={"location": authorize_url})
    # Lax suffices here: the OIDC callback is a top-level GET navigation.
    resp.set_cookie(
        SSO_FLOW_COOKIE, flow,
        max_age=600, httponly=True, samesite="lax",
        secure=request_is_https(request),
    )
    return resp
@app.get("/sso/{tenant_id}/callback", tags=["sso"])
async def sso_callback(
    tenant_id: str, request: "Request",
    code: Optional[str] = Query(default=None),
    state: Optional[str] = Query(default=None),
    error: Optional[str] = Query(default=None),
):
    """OIDC redirect target: ``?code=...&state=...`` or ``?error=...``.

    Validates the signed flow cookie against the tenant and the ``state``
    (CSRF guard), then exchanges the code. Next it gates on the
    ``email_verified`` / ``email`` claims, mints an SSO session plus
    refresh token, sets both cookies and 302s to
    ORGSTATE_SSO_POST_LOGIN_URL (default ``/``).

    Every client-influenced failure maps to a 4xx ApiError rather than
    a 500:
      * ``state`` is compared as UTF-8 bytes, because
        ``hmac.compare_digest`` raises TypeError on non-ASCII ``str``;
      * a SAML flow cookie (same cookie name, but no state/nonce/
        verifier) is treated as "no OIDC flow";
      * a missing ``email`` claim gets its own 403 instead of a
        KeyError.
    """
    from ..sso import SSOUnavailable, handle_callback, is_authlib_available
    if error:
        raise ApiError("sso_idp_error", f"IdP returned: {error}",
                       status=400)
    if not code or not state:
        raise ApiError("sso_callback_invalid",
                       "missing code or state", status=400)
    flow = _verify_flow_cookie(
        request.cookies.get(SSO_FLOW_COOKIE),
    )
    # saml_login writes the same cookie name with {"saml": True, ...};
    # its payload lacks the keys read below.
    if flow is None or flow.get("saml"):
        raise ApiError("sso_flow_expired",
                       "no valid flow cookie β€” start over at "
                       "/sso/{tid}/login", status=400)
    if flow["tenant_id"] != tenant_id:
        raise ApiError("sso_tenant_mismatch",
                       "callback tenant does not match flow",
                       status=400)
    if not is_authlib_available():
        raise ApiError("sso_unavailable",
                       "authlib not installed", status=503)
    import hmac as _hmac
    # `state` is attacker-controlled; bytes keep compare_digest from
    # raising on non-ASCII input.
    if not _hmac.compare_digest(flow["state"].encode("utf-8"),
                                state.encode("utf-8")):
        raise ApiError("sso_state_mismatch",
                       "state does not match β€” CSRF guard",
                       status=400)
    prov = svc.sso_providers.get(flow["provider_id"])
    if prov is None:
        raise ApiError("sso_provider_not_found",
                       "provider deleted mid-flow",
                       status=404)
    try:
        claims = handle_callback(
            prov,
            code=code,
            redirect_uri=_sso_redirect_uri(request, tenant_id),
            code_verifier=flow["verifier"],
            expected_nonce=flow["nonce"],
        )
    except SSOUnavailable as e:
        raise ApiError("sso_callback_failed",
                       str(e), status=e.status)
    # email_verified is best practice: refuse the session when the IdP
    # says the email is unverified. Some IdPs (private OPs without
    # email verification) omit the field, so absence counts as OK.
    if claims.get("email_verified") is False:
        raise ApiError("sso_email_unverified",
                       "IdP says email is not verified",
                       status=403)
    # IdPs configured without the email scope omit the claim entirely.
    email = claims.get("email")
    if not email:
        raise ApiError("sso_email_missing",
                       "IdP did not return an email claim",
                       status=403)
    try:
        login = svc.complete_sso_login(
            flow["provider_id"], email,
            issue_refresh=True,
        )
    except PermissionError as e:
        raise ApiError("sso_domain_not_allowed",
                       str(e), status=403)
    except ValueError as e:
        raise ApiError("sso_provider_not_found",
                       str(e), status=404)
    # Stage 152: ORGSTATE_SSO_POST_LOGIN_URL points the post-login
    # redirect at the dashboard origin. "/" keeps single-origin deploys
    # working without env config.
    import os as _os
    post_login = _os.environ.get(
        "ORGSTATE_SSO_POST_LOGIN_URL", "/",
    ).strip() or "/"
    # Stage 152: a dashboard on another site needs SameSite=None, which
    # browsers only accept together with Secure. Local http:// dev keeps
    # Lax, since post_login then starts with "/".
    cross_origin = post_login.startswith("http")
    samesite_mode = "none" if cross_origin else "lax"
    cookie_secure = request_is_https(request) or cross_origin
    resp = Response(status_code=302, headers={"location": post_login})
    resp.set_cookie(
        SSO_SESSION_COOKIE, login["session_token"],
        max_age=12 * 3600, httponly=True,
        samesite=samesite_mode, secure=cookie_secure,
    )
    # Stage 111: the long-lived refresh cookie is path-scoped to
    # /sso/refresh so it is not sent on every API request.
    if login.get("refresh_token"):
        resp.set_cookie(
            SSO_REFRESH_COOKIE, login["refresh_token"],
            max_age=30 * 24 * 3600, httponly=True,
            samesite=samesite_mode, path="/sso/refresh",
            secure=cookie_secure,
        )
    resp.delete_cookie(SSO_FLOW_COOKIE)
    return resp
# ---------- Stage 102 β€” SAML 2.0 SSO ----------
def _saml_acs_uri(request: "Request", tenant_id: str) -> str:
    """Stage 102: absolute SP Assertion Consumer Service URL.

    The operator hands this URL to the IdP when configuring the SP.
    Without an override it is built from the inbound request: the scheme
    comes from ``request_is_https`` and the authority from the ``host``
    header.

    Stage 154: ``ORGSTATE_PUBLIC_BASE_URL`` anchors it to the public base
    for proxied deploys. The value is whitespace-stripped and then has
    trailing slashes removed; empty after stripping means unset.
    Stripping stops a stray newline or space in the env value from
    corrupting the URL.
    """
    import os as _os
    public_base = (
        _os.environ.get("ORGSTATE_PUBLIC_BASE_URL") or ""
    ).strip().rstrip("/")
    if public_base:
        return f"{public_base}/sso/{tenant_id}/saml/acs"
    proto = "https" if request_is_https(request) else "http"
    host = request.headers.get("host", "localhost")
    return f"{proto}://{host}/sso/{tenant_id}/saml/acs"
@app.get("/sso/{tenant_id}/saml/login", tags=["sso"])
async def saml_login(
    tenant_id: str, request: "Request",
    provider_id: Optional[str] = Query(default=None),
):
    """SP-initiated SAML AuthnRequest.

    Provider resolution matches OIDC: exactly one SAML provider for the
    tenant is used implicitly, otherwise ``?provider_id=`` is required.
    Responds 302 to the IdP SSO URL and stores the request id, RelayState,
    provider and tenant in a signed, 600-second, HttpOnly flow cookie.

    The IdP answers through the HTTP-POST binding, i.e. a cross-site form
    POST to the ACS. Browsers do not attach SameSite=Lax cookies to
    cross-site POSTs, so a Lax flow cookie never reaches the ACS. On
    HTTPS the cookie is therefore SameSite=None; Secure. Plain-HTTP
    local dev keeps Lax, because browsers reject SameSite=None without
    Secure. Binding the ACS POST to this browser's login attempt is left
    to the ACS handler's flow-cookie and RelayState checks.
    """
    from infra.sso.providers import PROVIDER_TYPE_SAML
    from infra.sso.saml import build_authn_request
    providers = [p for p in svc.list_sso_providers(tenant_id)
                 if p.get("provider_type") == PROVIDER_TYPE_SAML]
    if not providers:
        raise ApiError(
            "saml_not_configured",
            f"no SAML providers registered for tenant "
            f"{tenant_id!r} β€” operator runs `infra sso "
            f"provider create --type saml` first",
            status=404,
        )
    if provider_id is None:
        if len(providers) > 1:
            raise ApiError(
                "saml_provider_required",
                f"multiple SAML providers exist for tenant "
                f"{tenant_id!r}; pass ?provider_id=",
                status=400,
            )
        provider_id = providers[0]["provider_id"]
    prov = svc.sso_providers.get(provider_id)
    if prov is None or prov.tenant_id != tenant_id or \
            prov.provider_type != PROVIDER_TYPE_SAML:
        raise ApiError(
            "saml_provider_not_found",
            f"SAML provider {provider_id!r} not registered "
            f"for tenant {tenant_id!r}",
            status=404,
        )
    sso_url = prov.config.get("sso_url")
    if not sso_url:
        raise ApiError(
            "saml_provider_misconfigured",
            "provider missing config.sso_url β€” operator must "
            "re-create with the IdP's SSO endpoint",
            status=500,
        )
    sp_entity_id = prov.client_id  # for SAML, client_id stores SP EntityID
    acs_url = _saml_acs_uri(request, tenant_id)
    authn = build_authn_request(
        sso_url=sso_url, sp_entity_id=sp_entity_id, acs_url=acs_url,
    )
    flow = _sign_flow_cookie({
        "saml": True,
        "request_id": authn.request_id,
        "relay_state": authn.relay_state,
        "provider_id": provider_id,
        "tenant_id": tenant_id,
    })
    resp = Response(status_code=302,
                    headers={"location": authn.url})
    is_https = request_is_https(request)
    resp.set_cookie(
        SSO_FLOW_COOKIE, flow,
        max_age=600, httponly=True,
        samesite="none" if is_https else "lax",
        secure=is_https,
    )
    return resp
@app.post("/sso/{tenant_id}/saml/acs", tags=["sso"])
async def saml_acs(tenant_id: str, request: "Request"):
    """Assertion Consumer Service: the IdP POSTs a form-encoded SAMLResponse.

    Steps:
      1. Validate the signed flow cookie and the echoed RelayState
         (CSRF guard).
      2. Verify the assertion signature and audience.
      3. Gate on allowed_email_domains (via ``complete_sso_login``).
      4. Mint the SSO session and set the cookies.

    RelayState handling: when the flow recorded a relay_state, the POST
    MUST echo it exactly, as the SAML bindings spec requires of IdPs.
    Previously a POST without RelayState skipped the check, so a forged
    POST carrying an attacker's own valid assertion could log a victim
    into the attacker's account. The comparison is constant-time on
    bytes. Form fields arriving as multipart uploads instead of strings
    are rejected rather than handed to the decoder.
    """
    import hmac as _hmac

    from infra.sso.providers import PROVIDER_TYPE_SAML
    from infra.sso.saml import (
        SamlVerificationError,
        decode_saml_response,
        default_verifier,
    )
    flow = _verify_flow_cookie(request.cookies.get(SSO_FLOW_COOKIE))
    if flow is None or not flow.get("saml"):
        raise ApiError(
            "saml_flow_expired",
            "no valid SAML flow cookie β€” start over at "
            "/sso/{tid}/saml/login", status=400,
        )
    if flow["tenant_id"] != tenant_id:
        raise ApiError("saml_tenant_mismatch",
                       "ACS tenant does not match flow",
                       status=400)
    form = await request.form()
    relay_posted = form.get("RelayState")
    relay_expected = flow.get("relay_state")
    if relay_expected or relay_posted:
        relay_ok = (
            isinstance(relay_posted, str)
            and isinstance(relay_expected, str)
            and _hmac.compare_digest(relay_posted.encode("utf-8"),
                                     relay_expected.encode("utf-8"))
        )
        if not relay_ok:
            raise ApiError("saml_relay_mismatch",
                           "RelayState does not match β€” CSRF guard",
                           status=400)
    saml_b64 = form.get("SAMLResponse")
    # a multipart UploadFile here is not a SAML POST-binding response
    if not saml_b64 or not isinstance(saml_b64, str):
        raise ApiError("saml_response_missing",
                       "no SAMLResponse field in POST body",
                       status=400)
    prov = svc.sso_providers.get(flow["provider_id"])
    if prov is None or prov.provider_type != PROVIDER_TYPE_SAML:
        raise ApiError("saml_provider_not_found",
                       "provider deleted mid-flow",
                       status=404)
    cert = prov.config.get("x509_cert_pem")
    if not cert:
        raise ApiError("saml_provider_misconfigured",
                       "provider missing config.x509_cert_pem",
                       status=500)
    verifier = getattr(app.state, "saml_verifier", None) \
        or default_verifier()
    try:
        xml = decode_saml_response(saml_b64)
        extracted = verifier.verify_and_extract(
            xml, cert,
            expected_audience=prov.client_id,
        )
    except SamlVerificationError:
        # Never leak which check failed (signature vs audience vs
        # missing NameID): same secrecy as Stage 106.
        raise ApiError(
            "saml_invalid",
            "SAML assertion verification failed",
            status=401,
        )
    try:
        login = svc.complete_sso_login(
            flow["provider_id"], extracted["email"],
            issue_refresh=True,
        )
    except PermissionError as e:
        raise ApiError("sso_domain_not_allowed",
                       str(e), status=403)
    except ValueError as e:
        raise ApiError("sso_provider_not_found",
                       str(e), status=404)
    # Stage 152: same post-login target and cross-site cookie attributes
    # as the OIDC callback.
    import os as _os
    post_login = _os.environ.get(
        "ORGSTATE_SSO_POST_LOGIN_URL", "/",
    ).strip() or "/"
    cross_origin = post_login.startswith("http")
    samesite_mode = "none" if cross_origin else "lax"
    cookie_secure = request_is_https(request) or cross_origin
    resp = Response(status_code=302, headers={"location": post_login})
    resp.set_cookie(
        SSO_SESSION_COOKIE, login["session_token"],
        max_age=12 * 3600, httponly=True,
        samesite=samesite_mode, secure=cookie_secure,
    )
    if login.get("refresh_token"):
        resp.set_cookie(
            SSO_REFRESH_COOKIE, login["refresh_token"],
            max_age=30 * 24 * 3600, httponly=True,
            samesite=samesite_mode, path="/sso/refresh",
            secure=cookie_secure,
        )
    resp.delete_cookie(SSO_FLOW_COOKIE)
    return resp
@app.post("/sso/refresh", tags=["sso"])
async def sso_refresh(
    request: "Request",
    authorization: Optional[str] = Header(default=None),
):
    """Stage 111: rotate a refresh token.

    The refresh token comes from one of two places:
      * the Authorization header, for SDK clients using the ``Bearer``
        or ``Bearer-Refresh`` scheme (the distinct scheme keeps refresh
        tokens recognizable in logs);
      * otherwise the HttpOnly refresh cookie, for browsers.

    On success the response carries the new session_token and its
    expiry/tenant/user metadata, and both cookies are re-set. Header
    (SDK) callers also get the rotated ``refresh_token`` in the body:
    the old token is spent, so without it they could never refresh
    again. Browser callers only get it in the HttpOnly cookie, keeping
    it out of page scripts' reach. On a detected replay the chain is
    revoked and both cookies are cleared.

    Cookie attributes mirror the login callbacks (Stage 152). When
    ORGSTATE_SSO_POST_LOGIN_URL points at another site, the cookies are
    SameSite=None; Secure. Re-setting them as SameSite=Lax would
    overwrite the login cookies and break the cross-site dashboard after
    the first refresh.
    """
    from infra.sso import RefreshTokenReplay
    raw = None
    if authorization:
        parts = authorization.strip().split(None, 1)
        if len(parts) == 2 and parts[0].lower() in (
            "bearer", "bearer-refresh",
        ):
            raw = parts[1].strip() or None
    from_header = raw is not None
    if raw is None:
        raw = request.cookies.get(SSO_REFRESH_COOKIE)
    if not raw:
        raise ApiError(
            "sso_no_refresh",
            "no refresh token in cookie or Authorization header",
            status=401,
        )
    try:
        result = svc.refresh_sso_session(raw)
    except RefreshTokenReplay:
        # Replay detected: the chain is now revoked. Answer with the
        # same generic error as an invalid token, so an attacker cannot
        # probe whether a stolen token worked once.
        resp_replay = JSONResponse(
            status_code=401,
            content={"error": {
                "code": "sso_refresh_invalid",
                "message": "refresh failed β€” re-authenticate",
            }},
        )
        resp_replay.delete_cookie(SSO_SESSION_COOKIE)
        resp_replay.delete_cookie(SSO_REFRESH_COOKIE,
                                  path="/sso/refresh")
        return resp_replay
    if result is None:
        return JSONResponse(
            status_code=401,
            content={"error": {
                "code": "sso_refresh_invalid",
                "message": "refresh failed β€” re-authenticate",
            }},
        )
    body = {
        "session_token": result["session_token"],
        "expires_at": result["expires_at"],
        "refresh_expires_at": result["refresh_expires_at"],
        "tenant_id": result["tenant_id"],
        "user_email": result["user_email"],
    }
    if from_header:
        body["refresh_token"] = result["refresh_token"]
    # Same derivation as the SSO login callbacks (Stage 152).
    import os as _os
    post_login = _os.environ.get(
        "ORGSTATE_SSO_POST_LOGIN_URL", "/",
    ).strip() or "/"
    cross_origin = post_login.startswith("http")
    samesite_mode = "none" if cross_origin else "lax"
    cookie_secure = request_is_https(request) or cross_origin
    resp = JSONResponse(status_code=200, content=body)
    resp.set_cookie(
        SSO_SESSION_COOKIE, result["session_token"],
        max_age=12 * 3600, httponly=True,
        samesite=samesite_mode, secure=cookie_secure,
    )
    resp.set_cookie(
        SSO_REFRESH_COOKIE, result["refresh_token"],
        max_age=30 * 24 * 3600, httponly=True,
        samesite=samesite_mode, path="/sso/refresh",
        secure=cookie_secure,
    )
    return resp
@app.post("/sso/logout", tags=["sso"])
async def sso_logout(request: "Request",
                     authorization: Optional[str] = Header(default=None)):
    """Revoke the current session and clear the SSO cookies.

    Idempotent: it returns 200 ``{"ok": true}`` whether or not a session
    token was presented, which avoids leaking whether a token exists.

    Stage 111: also clears the refresh cookie. It doesn't revoke the
    refresh chain server-side, though. Explicit logout means "this
    device", not "everywhere". Operators nuke all chains for a user via
    the Stage 98 CLI.

    Because the chain stays valid, removing the refresh cookie from the
    browser is what actually ends this device's session. The deletions
    therefore use the same SameSite/Secure attributes the callbacks set
    (Stage 152). For a cross-origin dashboard, browsers such as Chrome
    ignore a ``SameSite=Lax`` Set-Cookie on a cross-site fetch response.
    """
    import os as _os

    token = _extract_session_token(request, authorization)
    if token:
        svc.revoke_sso_session(token)
    post_login = _os.environ.get(
        "ORGSTATE_SSO_POST_LOGIN_URL", "/",
    ).strip() or "/"
    cross_origin = post_login.startswith("http")
    samesite_mode = "none" if cross_origin else "lax"
    cookie_secure = request_is_https(request) or cross_origin
    resp = JSONResponse({"ok": True})
    resp.delete_cookie(
        SSO_SESSION_COOKIE, httponly=True,
        samesite=samesite_mode, secure=cookie_secure,
    )
    resp.delete_cookie(
        SSO_REFRESH_COOKIE, path="/sso/refresh", httponly=True,
        samesite=samesite_mode, secure=cookie_secure,
    )
    return resp
@app.get("/sso/me", tags=["sso"])
async def sso_me(request: "Request",
                 authorization: Optional[str] = Header(default=None)):
    """Return the caller's active SSO session info.

    Dashboards use this to answer "am I logged in?". It responds with a
    401 ``sso_no_session`` when no token is presented (header or cookie)
    or when the service does not recognise the token.
    """
    session_token = _extract_session_token(request, authorization)
    session_info = None
    if session_token:
        session_info = svc.verify_sso_session(session_token)
    if session_info is not None:
        return session_info
    raise ApiError("sso_no_session",
                   "no active SSO session", status=401)
def _extract_session_token(request: "Request",
authorization: Optional[str]) -> Optional[str]:
"""Prefer Authorization: Bearer over cookie β€” same as
the API key path. Returns the bare token or None."""
if authorization and authorization.lower().startswith("bearer "):
return authorization[7:].strip() or None
return request.cookies.get(SSO_SESSION_COOKIE)
# Stage 106 — Stripe webhook receiver. Open route (no API
# key), authenticated by HMAC-SHA256 signature in the
# Stripe-Signature header against STRIPE_WEBHOOK_SECRET.
# Reads RAW body bytes — re-serializing parsed JSON breaks
# the signature (whitespace + key order differ from what
# Stripe signed). Returns 200 on accepted events even when
# we deliberately ignore the type, so Stripe doesn't retry
# forever.
@app.post("/webhooks/stripe", tags=["webhooks"])
async def stripe_webhook(
    request: "Request",
    stripe_signature: Optional[str] = Header(
        default=None, alias="Stripe-Signature",
    ),
):
    """Verify and dispatch one Stripe webhook event.

    The steps run in this order (do not reorder):
    1. 503 if STRIPE_WEBHOOK_SECRET is unset.
    2. Verify the signature over the raw body. Failures use the
       status carried by StripeWebhookError.
    3. Parse JSON; failures → 400.
    4. Hand the event to ``handle_event``. Its StripeWebhookError maps
       to its own status; otherwise return 200 with the handler's
       result.
    Error bodies never say which check failed.
    """
    import json as _json
    import os as _os
    from infra.billing_stripe import (
        StripeWebhookError,
        handle_event,
        verify_webhook_signature,
    )
    secret = _os.environ.get("STRIPE_WEBHOOK_SECRET", "").strip()
    if not secret:
        # 503 is honest — the operator didn't configure the
        # receiver. The {"error": {...}} envelope is built inline as a
        # JSONResponse (the original note calls it "SCIM-style") rather
        # than raised as ApiError, since this isn't an authenticated route.
        return JSONResponse(
            status_code=503,
            content={"error": {
                "code": "webhook_not_configured",
                "message": (
                    "Stripe webhook receiver disabled β€” set "
                    "STRIPE_WEBHOOK_SECRET to enable"
                ),
            }},
        )
    # Raw bytes, not parsed JSON: the signature covers the exact payload.
    raw = await request.body()
    try:
        verify_webhook_signature(raw, stripe_signature or "", secret)
    except StripeWebhookError as exc:
        # Never leak WHY verification failed — attackers
        # probing the endpoint should learn nothing useful.
        return JSONResponse(
            status_code=exc.status,
            content={"error": {"code": "webhook_invalid",
                               "message": "webhook signature "
                                          "verification failed"}},
        )
    try:
        event = _json.loads(raw.decode("utf-8"))
    except (ValueError, UnicodeDecodeError):
        return JSONResponse(
            status_code=400,
            content={"error": {"code": "webhook_invalid",
                               "message": "body is not "
                                          "valid JSON"}},
        )
    try:
        result = handle_event(svc, event)
    except StripeWebhookError as exc:
        return JSONResponse(
            status_code=exc.status,
            content={"error": {"code": "webhook_invalid",
                               "message": "event payload "
                                          "malformed"}},
        )
    return JSONResponse(status_code=200, content=result)
@app.post("/tenants", tags=["tenants"], status_code=201)
async def register_tenant(
    body: TenantRegisterBody,
    authorization: Optional[str] = Header(default=None),
):
    """Register a new tenant (201). Delegates to ``handlers.register_tenant``."""
    # Open when no admin credential is configured anywhere (env var or
    # DB) — that is the v1 dev/bootstrap workflow. Once an admin
    # credential exists, this route is admin-only so randoms can't
    # enumerate the platform in prod.
    require_admin(authorization, svc=svc)
    return handlers.register_tenant(
        svc, body.model_dump(), actor=_admin_actor(authorization),
    )
# --- authenticated routes -----------------------------------------
@app.get("/tenants", tags=["tenants"])
async def list_tenants(key: ApiKey = Depends(auth_dep)):
    """List tenants visible to the caller: a one-element list with its own."""
    # narrowed to the caller's own tenant — never enumerate across.
    # NOTE(review): unlike GET /tenants/{id}, no require_role() call here;
    # presumably any authenticated key may see its own tenant — confirm.
    t = handlers.get_tenant(svc, key.tenant_id)
    return [t]
@app.get("/tenants/{tenant_id}", tags=["tenants"])
async def get_tenant(tenant_id: str, key: ApiKey = Depends(auth_dep)):
    """Return one tenant; the key must belong to it and hold readonly+."""
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    return handlers.get_tenant(svc, tenant_id)
# Stage 34: per-tenant readiness for customer dashboards.
# /health/ready?tenant_id=X is open (ops/uptime monitors); this one
# is authenticated and returns ONLY the caller's own tenant — so a
# customer can wire "is my engine healthy?" into their dashboard
# without seeing anyone else's state.
@app.get("/tenants/{tenant_id}/health", tags=["tenants"])
async def tenant_health(
    tenant_id: str,
    max_age_days: float = Query(default=90.0),
    max_gap_hours: float = Query(default=48.0),
    authorization: Optional[str] = Header(default=None),
):
    """Per-tenant readiness check.

    The HTTP status and payload come straight from
    ``handlers.readiness``, so the status code itself reports health.
    """
    # tenant-or-admin: a customer dashboard uses their own key;
    # an ops dashboard with the admin key can read any tenant.
    key = require_tenant_or_admin(svc, authorization, tenant_id)
    if key is not None:  # tenant path — readonly enough
        require_role(key, ROLE_READONLY)
    status, payload = handlers.readiness(
        svc, max_age_days=max_age_days,
        max_gap_hours=max_gap_hours, tenant_id=tenant_id,
    )
    return JSONResponse(status_code=status, content=payload)
@app.get("/tenants/{tenant_id}/export", tags=["tenants"])
async def export_tenant(tenant_id: str,
                        key: ApiKey = Depends(auth_dep)):
    """Export the tenant's data bundle (tenant admin role required)."""
    # admin only: the export bundles audit logs and run history. A
    # readonly key should not be able to pull the full audit trail.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_ADMIN)
    return handlers.export_tenant(svc, tenant_id)
# Stage 89: per-tenant usage meter. Readonly — a customer
# dashboard's "this month so far" widget polls here. Soft caps
# live with the data so the dashboard can also show the cap.
@app.get("/tenants/{tenant_id}/usage", tags=["tenants"])
async def get_tenant_usage(
    tenant_id: str,
    period: Optional[str] = Query(default=None,
        description="YYYY-MM UTC bucket. Default: current month."),
    key: ApiKey = Depends(auth_dep),
):
    """Return ``handlers.usage_summary`` for one YYYY-MM period."""
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    return handlers.usage_summary(svc, tenant_id, period=period)
# Stage 92: per-tenant invoices (read-only on the tenant
# surface — generation happens via the CLI/admin path so
# customers can't trigger their own billing).
@app.get("/tenants/{tenant_id}/invoices", tags=["tenants"])
async def list_tenant_invoices(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    """List the tenant's stored invoices plus a count.

    A ValueError from the service is reported as 404 ``not_found``.
    """
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        invoices = svc.list_invoices(tenant_id)
    except ValueError as e:
        raise ApiError("not_found", str(e), status=404)
    invoice_count = len(invoices)
    return {"tenant_id": tenant_id, "invoices": invoices,
            "n": invoice_count}
# Stage 107 — billing preview for the customer dashboard.
# Pure dry-run over current usage; no DB writes, no audit
# so customers can poll this on every pageview without
# polluting state.
@app.get("/tenants/{tenant_id}/billing/preview",
         tags=["tenants"])
async def billing_preview(
    tenant_id: str,
    period: Optional[str] = Query(default=None,
        description="YYYY-MM UTC. Default: current month."),
    key: ApiKey = Depends(auth_dep),
):
    """Return ``svc.preview_invoice`` for the period.

    Errors from the service map as: ValueError → 400 ``invalid_request``,
    KeyError → 404 ``not_found``.
    """
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        return svc.preview_invoice(tenant_id, period=period)
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    except KeyError as e:
        raise ApiError("not_found", str(e), status=404)
@app.post("/tenants/{tenant_id}/billing/checkout",
          tags=["tenants"])
async def billing_checkout(
    tenant_id: str,
    body: BillingCheckoutBody,
    key: ApiKey = Depends(auth_dep),
):
    """Stage 159 (β2) — kick off a Stripe Checkout session for a
    plan upgrade.

    Admin tier: only the tenant's admin can move the plan. Returns
    ``{checkout_url, session_id, plan}``; the dashboard redirects to
    ``checkout_url``. An unknown tenant → 404.

    Returns 503 when the deployment has no Stripe API key OR no
    price_id env var for the requested plan, so the UI can switch to a
    "contact sales" affordance gracefully.
    """
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_ADMIN)
    from infra.billing_stripe import StripeUnavailable, create_checkout
    tenant = svc.get_tenant(tenant_id)
    if tenant is None:
        raise ApiError("not_found",
                       f"unknown tenant {tenant_id!r}", status=404)
    try:
        # NOTE(review): create_checkout presumably calls Stripe over the
        # network synchronously, blocking the event loop inside this
        # async route — confirm.
        return create_checkout(
            tenant_id,
            body.plan,
            success_url=body.success_url,
            cancel_url=body.cancel_url,
            stripe_customer_id=tenant.get("stripe_customer_id"),
        )
    except StripeUnavailable as e:
        raise ApiError("stripe_unavailable", str(e), status=503)
@app.get("/tenants/{tenant_id}/invoices/{period}",
         tags=["tenants"])
async def get_tenant_invoice(
    tenant_id: str,
    period: str,
    key: ApiKey = Depends(auth_dep),
):
    """Fetch one stored invoice by period.

    Both a ValueError from the service and a missing invoice are
    reported as 404 ``not_found``.
    """
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        invoice = svc.get_invoice(tenant_id, period)
    except ValueError as e:
        raise ApiError("not_found", str(e), status=404)
    if invoice is not None:
        return invoice
    raise ApiError(
        "not_found",
        f"no invoice for tenant {tenant_id!r} period {period!r}",
        status=404,
    )
# Stage 82 — GDPR right-to-erasure. PLATFORM admin only (NOT
# tenant key, even with admin role). The reason: a stolen
# tenant admin key shouldn't be able to wipe its own tenant.
# Erasure is destructive enough that we want a human at the
# platform-admin level. Require ?confirm=<tenant_id> in the
# query — a hand-typed double-confirmation that the caller
# really means to delete THIS tenant and not the one they
# last opened in another tab.
@app.delete("/tenants/{tenant_id}", tags=["tenants"])
async def delete_tenant_route(
    tenant_id: str,
    confirm: Optional[str] = Query(default=None),
    authorization: Optional[str] = Header(default=None),
):
    """Erase a tenant. Needs platform admin and ``?confirm=<tenant_id>``.

    Errors: a mismatched or missing ``confirm`` → 400
    ``confirm_required``; an unknown tenant (KeyError from the service)
    → 404.
    """
    require_admin(authorization, svc=svc)
    # Exact string match against the path parameter.
    if confirm != tenant_id:
        raise ApiError(
            "confirm_required",
            f"DELETE /tenants/{tenant_id} requires "
            f"?confirm={tenant_id} β€” the query parameter must "
            f"match the path. Erasure is irreversible.",
            status=400,
        )
    try:
        result = svc.delete_tenant(
            tenant_id, actor=_admin_actor(authorization),
        )
    except KeyError:
        raise ApiError("not_found",
                       f"unknown tenant {tenant_id!r}",
                       status=404)
    return result
# Stage 229 — per-tenant chronic threshold override.
# Three endpoints: GET (read effective), PUT (set), DELETE
# (reset to class default).
@app.get(
    "/admin/tenants/{tenant_id}/forecast_cron/chronic_threshold",
    tags=["admin"],
)
async def admin_get_chronic_threshold(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Return ``svc.get_chronic_threshold`` (platform admin; ValueError → 400)."""
    require_admin(authorization, svc=svc)
    try:
        return svc.get_chronic_threshold(tenant_id)
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.put(
    "/admin/tenants/{tenant_id}/forecast_cron/chronic_threshold",
    tags=["admin"],
)
async def admin_set_chronic_threshold(
    tenant_id: str, body: dict,
    authorization: Optional[str] = Header(default=None),
):
    """Set a tenant's chronic-threshold override (platform admin).

    The body must be ``{"min_warnings": int, "window_days": int}``.
    Range validation is the service's job; its ValueError maps to 400
    ``bad_request``.
    """
    require_admin(authorization, svc=svc)
    min_warnings = body.get("min_warnings")
    window_days = body.get("window_days")
    # bool is an int subclass, so a JSON true/false would otherwise be
    # accepted as 1/0. Reject it, matching the webhook max-attempts and
    # jitter routes.
    if isinstance(min_warnings, bool) or not isinstance(min_warnings, int):
        raise ApiError(
            "bad_request",
            "min_warnings must be an integer", status=400,
        )
    if isinstance(window_days, bool) or not isinstance(window_days, int):
        raise ApiError(
            "bad_request",
            "window_days must be an integer", status=400,
        )
    try:
        return svc.set_chronic_threshold(
            tenant_id,
            min_warnings=min_warnings,
            window_days=window_days,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.delete(
    "/admin/tenants/{tenant_id}/forecast_cron/chronic_threshold",
    tags=["admin"],
)
async def admin_clear_chronic_threshold(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Drop the tenant's chronic-threshold override (platform admin).

    Per the Stage 229 note, the tenant returns to the class default.
    ValueError from the service → 400.
    """
    require_admin(authorization, svc=svc)
    try:
        return svc.clear_chronic_threshold(
            tenant_id,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
# Stage 212 — admin trigger for the forecast-threshold cron
# sweep. Iterates every active tenant and invokes
# check_forecast_thresholds for each. The endpoint exists so
# an external cron (HF Space, GitHub Actions, systemd timer)
# can call it on a daily cadence without dropping to the CLI.
# Returns the same JSON shape as the CLI tick.
@app.post("/admin/forecast_cron/tick", tags=["admin"])
async def admin_forecast_cron_tick(
    body: Optional[dict] = None,
    authorization: Optional[str] = Header(default=None),
):
    """Run one forecast-threshold sweep (platform admin).

    The body is optional: ``{"days_threshold": int}``, default 7.
    ValueError from the service → 400 ``bad_request``.
    """
    require_admin(authorization, svc=svc)
    body = body or {}
    days_threshold = body.get("days_threshold", 7)
    # bool is an int subclass, so a JSON true/false would slip through
    # as 1/0. Reject it like the webhook routes do.
    if (isinstance(days_threshold, bool)
            or not isinstance(days_threshold, int)):
        raise ApiError(
            "bad_request",
            "days_threshold must be an integer", status=400,
        )
    try:
        return svc.run_forecast_threshold_cron(
            days_threshold=days_threshold,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
# =========================================================
# Stage 115 — admin HTTP routes for billing-config (Stage 105)
# and plan-overrides (Stage 109). Lets a CS team self-serve
# from an admin UI without dropping to the CLI.
# =========================================================
@app.get("/admin/tenants/{tenant_id}/billing-config",
         tags=["admin"])
async def admin_get_billing_config(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Return ``svc.get_tenant_billing_config`` (platform admin; ValueError → 400)."""
    require_admin(authorization, svc=svc)
    try:
        return svc.get_tenant_billing_config(tenant_id)
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
@app.put("/admin/tenants/{tenant_id}/billing-config",
         tags=["admin"])
async def admin_put_billing_config(
    request: "Request",
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Update a tenant's billing config (platform admin).

    The body is a JSON object that may carry ``plan_name`` and/or
    ``stripe_customer_id``. Only keys that are present are forwarded,
    so omitted fields stay unchanged. Malformed bodies and service
    ValueErrors → 400.
    """
    require_admin(authorization, svc=svc)
    try:
        payload = await request.json()
    except Exception:  # noqa: BLE001
        raise ApiError("invalid_request",
                       "body is not valid JSON", status=400)
    if not isinstance(payload, dict):
        raise ApiError("invalid_request",
                       "body must be a JSON object", status=400)
    updates = {
        field: payload[field]
        for field in ("plan_name", "stripe_customer_id")
        if field in payload
    }
    try:
        return svc.set_tenant_billing_config(
            tenant_id,
            actor=_admin_actor(authorization),
            **updates,
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
@app.get("/admin/tenants/{tenant_id}/plan-overrides",
         tags=["admin"])
async def admin_list_plan_overrides(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """List a tenant's per-metric plan overrides (platform admin; ValueError → 400)."""
    require_admin(authorization, svc=svc)
    try:
        overrides = svc.list_tenant_plan_overrides(tenant_id)
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    return {"tenant_id": tenant_id, "overrides": overrides}
@app.put("/admin/tenants/{tenant_id}/plan-overrides/{metric}",
         tags=["admin"])
async def admin_put_plan_override(
    request: "Request",
    tenant_id: str,
    metric: str,
    authorization: Optional[str] = Header(default=None),
):
    """Create or update one metric's plan override (platform admin).

    Recognised body keys are ``free_quota``, ``price_cents_per_unit``,
    ``tiers`` and ``note``. Only the keys present are forwarded to the
    service, and any other keys are ignored. Malformed bodies and
    service ValueErrors → 400.
    """
    require_admin(authorization, svc=svc)
    try:
        payload = await request.json()
    except Exception:  # noqa: BLE001
        raise ApiError("invalid_request",
                       "body is not valid JSON", status=400)
    if not isinstance(payload, dict):
        raise ApiError("invalid_request",
                       "body must be a JSON object", status=400)
    allowed = ("free_quota", "price_cents_per_unit", "tiers", "note")
    fields = {name: payload[name] for name in allowed if name in payload}
    try:
        return svc.set_tenant_plan_override(
            tenant_id, metric,
            actor=_admin_actor(authorization),
            **fields,
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
@app.delete("/admin/tenants/{tenant_id}/plan-overrides/{metric}",
            tags=["admin"])
async def admin_delete_plan_override(
    tenant_id: str,
    metric: str,
    authorization: Optional[str] = Header(default=None),
):
    """Remove one metric's plan override (platform admin).

    Errors: ValueError from the service → 400. A falsy return (no such
    override) → 404.
    """
    require_admin(authorization, svc=svc)
    try:
        ok = svc.clear_tenant_plan_override(
            tenant_id, metric,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    if not ok:
        raise ApiError(
            "not_found",
            f"no override for tenant {tenant_id!r} "
            f"metric {metric!r}",
            status=404,
        )
    return {"tenant_id": tenant_id, "metric": metric,
            "deleted": True}
# Stage 128 — per-tenant rate-limit fail-closed override.
# GET returns the current setting (None = follow process
# default). PUT writes True/False (body: {"value": bool}).
# DELETE clears the override → tenant follows process default.
@app.get("/admin/tenants/{tenant_id}/rate-limit-fail-closed",
         tags=["admin"])
async def admin_get_rate_limit_fail_closed(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Read the tenant's fail-closed override (platform admin; unknown tenant → 404)."""
    require_admin(authorization, svc=svc)
    if svc.get_tenant(tenant_id) is None:
        raise ApiError("not_found",
                       f"unknown tenant {tenant_id!r}",
                       status=404)
    value = svc.get_tenant_rate_limit_fail_closed(tenant_id)
    return {
        "tenant_id": tenant_id,
        "rate_limit_fail_closed": value,
        "follows_process_default": value is None,
    }
@app.put("/admin/tenants/{tenant_id}/rate-limit-fail-closed",
         tags=["admin"])
async def admin_put_rate_limit_fail_closed(
    request: "Request",
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Write the tenant's fail-closed override (platform admin).

    The body is ``{"value": true|false|null}``, where ``null`` clears the
    override. The response echoes the value and includes the tenant row
    returned by the service. Malformed bodies, non-boolean values and
    service ValueErrors → 400.
    """
    require_admin(authorization, svc=svc)
    try:
        payload = await request.json()
    except Exception:  # noqa: BLE001
        raise ApiError("invalid_request",
                       "body is not valid JSON", status=400)
    if not (isinstance(payload, dict) and "value" in payload):
        raise ApiError("invalid_request",
                       "body must be {\"value\": true|false}",
                       status=400)
    requested = payload["value"]
    # Only JSON booleans or null are accepted; 0/1/"true" are not coerced.
    if requested is not None and not isinstance(requested, bool):
        raise ApiError("invalid_request",
                       "value must be boolean (or null to clear "
                       "β€” or use DELETE)",
                       status=400)
    value: Optional[bool] = requested
    try:
        tenant_row = svc.set_tenant_rate_limit_fail_closed(
            tenant_id, value,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    return {
        "tenant_id": tenant_id,
        "rate_limit_fail_closed": value,
        "tenant": tenant_row,
    }
@app.delete("/admin/tenants/{tenant_id}/rate-limit-fail-closed",
            tags=["admin"])
async def admin_delete_rate_limit_fail_closed(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Clear the fail-closed override so the tenant follows the process default.

    Platform admin only. ValueError from the service → 400.
    """
    require_admin(authorization, svc=svc)
    try:
        svc.clear_tenant_rate_limit_fail_closed(
            tenant_id,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    return {"tenant_id": tenant_id, "cleared": True}
# Stage 129 — per-webhook custom backoff schedule.
# GET returns the effective schedule (override + global +
# which one is in force). PUT body: {"schedule": [int,...]}.
# DELETE drops the override → webhook follows global default.
@app.get(
    "/admin/webhooks/{webhook_id}/backoff-schedule",
    tags=["admin"],
)
async def admin_get_webhook_backoff_schedule(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Describe the webhook's retry backoff schedule (platform admin).

    The response has three schedules:
    - ``override``: the per-webhook schedule, or None.
    - ``global_default``: the service class's ``_RETRY_BACKOFF_SECONDS``.
    - ``effective``: whichever of the two is in force.
    An unknown webhook → 404.
    """
    require_admin(authorization, svc=svc)
    if svc.get_webhook(webhook_id) is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    override = svc.get_webhook_backoff_schedule(webhook_id)
    class_default = list(type(svc)._RETRY_BACKOFF_SECONDS)
    uses_default = override is None
    return {
        "webhook_id": webhook_id,
        "override": override,
        "global_default": class_default,
        "effective": class_default if uses_default else override,
        "follows_global_default": uses_default,
    }
@app.put(
    "/admin/webhooks/{webhook_id}/backoff-schedule",
    tags=["admin"],
)
async def admin_put_webhook_backoff_schedule(
    request: "Request",
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Set a per-webhook backoff schedule (platform admin).

    The body is ``{"schedule": [int, ...]}``; element validation is left
    to the service. Errors: malformed body or ValueError → 400; the
    service returning None (unknown webhook) → 404.
    """
    require_admin(authorization, svc=svc)
    try:
        body = await request.json()
    except Exception:  # noqa: BLE001
        raise ApiError("invalid_request",
                       "body is not valid JSON", status=400)
    if (not isinstance(body, dict)
            or "schedule" not in body):
        raise ApiError("invalid_request",
                       "body must be "
                       "{\"schedule\": [int,...]}",
                       status=400)
    try:
        row = svc.set_webhook_backoff_schedule(
            webhook_id, body["schedule"],
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    if row is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    return {
        "webhook_id": webhook_id,
        "schedule": body["schedule"],  # echoes the request as sent
        "webhook": row,
    }
@app.delete(
    "/admin/webhooks/{webhook_id}/backoff-schedule",
    tags=["admin"],
)
async def admin_delete_webhook_backoff_schedule(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Drop the webhook's backoff override (platform admin; unknown webhook → 404)."""
    require_admin(authorization, svc=svc)
    row = svc.clear_webhook_backoff_schedule(
        webhook_id, actor=_admin_actor(authorization),
    )
    if row is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    return {"webhook_id": webhook_id, "cleared": True}
# Stage 135 — per-webhook max_attempts override.
@app.get(
    "/admin/webhooks/{webhook_id}/max-attempts",
    tags=["admin"],
)
async def admin_get_webhook_max_attempts(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Describe the webhook's max delivery attempts (platform admin).

    The response reports the per-webhook override (or None), the
    service class's ``_DEFAULT_MAX_ATTEMPTS``, and whichever of the two
    is in force. An unknown webhook → 404.
    """
    require_admin(authorization, svc=svc)
    if svc.get_webhook(webhook_id) is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    override = svc.get_webhook_max_attempts(webhook_id)
    class_default = type(svc)._DEFAULT_MAX_ATTEMPTS
    uses_default = override is None
    return {
        "webhook_id": webhook_id,
        "override": override,
        "global_default": class_default,
        "effective": class_default if uses_default else override,
        "follows_global_default": uses_default,
    }
@app.put(
    "/admin/webhooks/{webhook_id}/max-attempts",
    tags=["admin"],
)
async def admin_put_webhook_max_attempts(
    request: "Request",
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Set the webhook's max delivery attempts (platform admin).

    The body is ``{"value": int}``; range checks are left to the service.
    Errors: malformed body, non-int value or ValueError → 400; unknown
    webhook → 404.
    """
    require_admin(authorization, svc=svc)
    try:
        body = await request.json()
    except Exception:  # noqa: BLE001
        raise ApiError("invalid_request",
                       "body is not valid JSON", status=400)
    if (not isinstance(body, dict)
            or "value" not in body):
        raise ApiError("invalid_request",
                       'body must be {"value": int}',
                       status=400)
    raw = body["value"]
    # Reject bool early — Python's True/False are int
    # subclasses; would slip through validate_max_attempts
    # via the isinstance path otherwise (the helper rejects
    # them too, but failing here gives a clearer 400).
    if isinstance(raw, bool) or not isinstance(raw, int):
        raise ApiError("invalid_request",
                       "value must be an integer",
                       status=400)
    try:
        row = svc.set_webhook_max_attempts(
            webhook_id, raw,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    if row is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    return {
        "webhook_id": webhook_id,
        "max_attempts": raw,
        "webhook": row,
    }
@app.delete(
    "/admin/webhooks/{webhook_id}/max-attempts",
    tags=["admin"],
)
async def admin_delete_webhook_max_attempts(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Drop the webhook's max-attempts override (platform admin; unknown webhook → 404)."""
    require_admin(authorization, svc=svc)
    row = svc.clear_webhook_max_attempts(
        webhook_id, actor=_admin_actor(authorization),
    )
    if row is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    return {"webhook_id": webhook_id, "cleared": True}
# Stage 142 — per-webhook jitter routes.
@app.get(
    "/admin/webhooks/{webhook_id}/jitter",
    tags=["admin"],
)
async def admin_get_webhook_jitter(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Describe the webhook's retry jitter (platform admin).

    ``effective`` is the override when it is set and non-zero, else 0.
    An unknown webhook → 404.
    """
    require_admin(authorization, svc=svc)
    if svc.get_webhook(webhook_id) is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    jitter = svc.get_webhook_jitter(webhook_id)
    uses_default = jitter is None
    return {
        "webhook_id": webhook_id,
        "jitter_seconds": jitter,
        "effective": jitter if jitter else 0,
        "follows_global_default": uses_default,
    }
@app.put(
    "/admin/webhooks/{webhook_id}/jitter",
    tags=["admin"],
)
async def admin_put_webhook_jitter(
    request: "Request",
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Set the webhook's retry jitter in seconds (platform admin).

    The body is ``{"value": int}``. Errors: malformed body, non-int or
    bool value, or ValueError → 400; unknown webhook → 404.
    """
    require_admin(authorization, svc=svc)
    try:
        body = await request.json()
    except Exception:  # noqa: BLE001
        raise ApiError("invalid_request",
                       "body is not valid JSON", status=400)
    if (not isinstance(body, dict)
            or "value" not in body):
        raise ApiError("invalid_request",
                       'body must be {"value": int}',
                       status=400)
    raw = body["value"]
    # bool is an int subclass; reject it explicitly.
    if isinstance(raw, bool) or not isinstance(raw, int):
        raise ApiError("invalid_request",
                       "value must be an integer",
                       status=400)
    try:
        row = svc.set_webhook_jitter(
            webhook_id, raw,
            actor=_admin_actor(authorization),
        )
    except ValueError as e:
        raise ApiError("invalid_request", str(e), status=400)
    if row is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    return {
        "webhook_id": webhook_id,
        "jitter_seconds": raw,
        "webhook": row,
    }
@app.delete(
    "/admin/webhooks/{webhook_id}/jitter",
    tags=["admin"],
)
async def admin_delete_webhook_jitter(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Drop the webhook's jitter override (platform admin; unknown webhook → 404)."""
    require_admin(authorization, svc=svc)
    row = svc.clear_webhook_jitter(
        webhook_id, actor=_admin_actor(authorization),
    )
    if row is None:
        raise ApiError("not_found",
                       f"unknown webhook {webhook_id!r}",
                       status=404)
    return {"webhook_id": webhook_id, "cleared": True}
# Stage 139 — SSO refresh chain inspection routes.
@app.get(
    "/admin/tenants/{tenant_id}/sso/refresh-chains",
    tags=["admin"],
)
async def admin_list_sso_refresh_chains(
    tenant_id: str,
    user_email: Optional[str] = Query(default=None),
    limit: int = Query(default=50, ge=1, le=500),
    authorization: Optional[str] = Header(default=None),
):
    """List one user's SSO refresh chains in a tenant (platform admin).

    ``user_email`` is required (400 if absent). ``limit`` is 1..500.
    A ValueError from the service is reported as 404.
    """
    require_admin(authorization, svc=svc)
    # Declared Optional so a missing value yields this clear 400 rather
    # than FastAPI's generic 422.
    if user_email is None:
        raise ApiError(
            "invalid_request",
            "user_email query parameter is required",
            status=400,
        )
    try:
        chains = svc.list_sso_refresh_chains(
            tenant_id, user_email, limit=limit,
        )
    except ValueError as e:
        raise ApiError("not_found", str(e), status=404)
    return {
        "tenant_id": tenant_id,
        "user_email": user_email,
        "chains": chains,
        "total": len(chains),  # count returned (capped by limit)
    }
@app.get(
    "/admin/sso/refresh-chains/{chain_id}",
    tags=["admin"],
)
async def admin_get_sso_refresh_chain(
    chain_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Return one refresh chain's summary (platform admin; unknown chain → 404)."""
    require_admin(authorization, svc=svc)
    summary = svc.get_sso_refresh_chain(chain_id)
    if summary is None:
        raise ApiError(
            "not_found",
            f"unknown chain {chain_id!r}",
            status=404,
        )
    return summary
@app.delete(
    "/admin/sso/refresh-chains/{chain_id}",
    tags=["admin"],
)
async def admin_revoke_sso_refresh_chain(
    chain_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Revoke a refresh chain and return its summary (platform admin; unknown → 404)."""
    require_admin(authorization, svc=svc)
    summary = svc.revoke_sso_refresh_chain(
        chain_id, actor=_admin_actor(authorization),
    )
    if summary is None:
        raise ApiError(
            "not_found",
            f"unknown chain {chain_id!r}",
            status=404,
        )
    return summary
@app.post("/tenants/{tenant_id}/calibrations", tags=["calibration"], status_code=201)
async def calibrate(tenant_id: str, body: CalibrateBody,
                    key: ApiKey = Depends(auth_dep)):
    """Create a calibration for the tenant (operator role; 201)."""
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    return handlers.calibrate(svc, tenant_id, body.model_dump())
@app.get("/tenants/{tenant_id}/calibrations", tags=["calibration"])
async def list_calibrations(tenant_id: str,
                            key: ApiKey = Depends(auth_dep)):
    """List the tenant's calibrations (readonly role)."""
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    return handlers.list_calibrations(svc, tenant_id)
@app.post("/tenants/{tenant_id}/runs", tags=["runs"], status_code=201)
async def run_analysis(tenant_id: str, body: RunAnalysisBody,
                       key: ApiKey = Depends(auth_dep)):
    """Start an analysis run for the tenant (operator role; 201)."""
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    return handlers.run_analysis(svc, tenant_id, body.model_dump())
@app.get("/tenants/{tenant_id}/runs", tags=["runs"])
async def list_runs(tenant_id: str,
                    limit: int = Query(50, ge=1, le=handlers.MAX_PAGE),
                    offset: int = Query(0, ge=0),
                    entity_type: Optional[str] = Query(default=None),
                    since: Optional[str] = Query(default=None),
                    key: ApiKey = Depends(auth_dep)):
    """Page through the tenant's runs (readonly role).

    Supports limit/offset paging, with ``limit`` capped at
    ``handlers.MAX_PAGE``. Optional ``entity_type`` / ``since`` filters
    narrow the result.
    """
    # Stage 49: ?entity_type=warehouse&since=2026-05-01T00:00:00Z
    # narrows to one pipeline since a date. Defaults preserve the
    # pre-Stage-49 behaviour (no filters → all runs).
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    return handlers.list_runs(
        svc, tenant_id, limit=limit, offset=offset,
        entity_type=entity_type, since=since,
    )
# Stage 50: CSV variant of the JSON list_runs. Customer compliance
# / monthly-report use case — pulls into Excel. Same filters as
# Stage 49 JSON, same auth, hard cap at MAX_CSV_ROWS so a
# runaway export can't block the worker.
@app.get("/tenants/{tenant_id}/runs.csv", tags=["runs"],
         response_class=PlainTextResponse)
async def list_runs_csv(
    tenant_id: str,
    entity_type: Optional[str] = Query(default=None),
    since: Optional[str] = Query(default=None),
    key: ApiKey = Depends(auth_dep),
):
    """Download the tenant's runs as a CSV attachment (readonly role).

    The download filename is ``<tenant>_runs_<entity_type|all>.csv``.
    Any character outside ``[A-Za-z0-9._-]`` is replaced with ``_``.
    """
    import re as _re

    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    csv_body = handlers.runs_csv(
        svc, tenant_id, entity_type=entity_type, since=since,
    )
    # filename hints about the scope so two parallel exports
    # (e.g. warehouse vs team) don't collide in Downloads.
    scope = entity_type or "all"
    # Both parts are caller-supplied (query string / path segment).
    # - A `"` or `;` would break out of the quoted filename parameter.
    # - Starlette encodes header values as latin-1, so other characters
    #   would turn the download into a 500.
    # Keep only filename-safe ASCII.
    safe_tenant = _re.sub(r"[^A-Za-z0-9._-]", "_", tenant_id)
    safe_scope = _re.sub(r"[^A-Za-z0-9._-]", "_", scope)
    return PlainTextResponse(
        csv_body, media_type="text/csv; charset=utf-8",
        headers={
            "Content-Disposition":
                f'attachment; filename="{safe_tenant}_runs_{safe_scope}.csv"',
        },
    )
# Stage 66: HTTP mirror of the Stage 65 CLI `backfill`. Customer
# SDK / dashboard widget that wants to show "how would we have
# looked over the last 6 months" calls this. Read-only, same
# auth as preview.
@app.get("/tenants/{tenant_id}/backfill", tags=["runs"])
async def backfill_route(
    tenant_id: str,
    entity_type: str = Query(...),
    from_day: Optional[str] = Query(default=None, alias="from"),
    until_day: Optional[str] = Query(default=None, alias="until"),
    step_days: int = Query(default=1, ge=1),
    authorization: Optional[str] = Header(default=None),
):
    """Return ``handlers.backfill`` for one entity type.

    The window is given by ``?from=``/``?until=`` and stepped every
    ``step_days`` (at least 1). The caller needs a tenant key with the
    readonly role, or the platform admin key.
    """
    key = require_tenant_or_admin(svc, authorization, tenant_id)
    if key is not None:  # tenant key path; the admin path returns None
        require_role(key, ROLE_READONLY)
    return handlers.backfill(
        svc, tenant_id, entity_type,
        from_day=from_day, until_day=until_day,
        step_days=step_days,
    )
# Stage 67: CSV variant — chart-friendly wide table. Analyst
# opens in Excel, plots n_issues over time, hands the chart to
# leadership. Same auth + read-only contract as the JSON route.
@app.get("/tenants/{tenant_id}/backfill.csv", tags=["runs"],
         response_class=PlainTextResponse)
async def backfill_csv_route(
    tenant_id: str,
    entity_type: str = Query(...),
    from_day: Optional[str] = Query(default=None, alias="from"),
    until_day: Optional[str] = Query(default=None, alias="until"),
    step_days: int = Query(default=1, ge=1),
    authorization: Optional[str] = Header(default=None),
):
    """Download ``handlers.backfill_csv`` as a CSV attachment.

    The filename is ``<tenant>_backfill_<entity_type>.csv``. Characters
    outside ``[A-Za-z0-9._-]`` are replaced with ``_``, so neither value
    can break the Content-Disposition header.
    """
    import re as _re

    key = require_tenant_or_admin(svc, authorization, tenant_id)
    if key is not None:  # tenant key path; the admin path returns None
        require_role(key, ROLE_READONLY)
    csv_body = handlers.backfill_csv(
        svc, tenant_id, entity_type,
        from_day=from_day, until_day=until_day,
        step_days=step_days,
    )
    # tenant_id is a raw path segment on the admin path and entity_type
    # a raw query string. A quote/semicolon would break out of the
    # quoted filename, and non-latin-1 text can't be encoded into the
    # header at all (Starlette encodes header values as latin-1).
    safe_tenant = _re.sub(r"[^A-Za-z0-9._-]", "_", tenant_id)
    safe_entity = _re.sub(r"[^A-Za-z0-9._-]", "_", entity_type)
    return PlainTextResponse(
        csv_body, media_type="text/csv; charset=utf-8",
        headers={
            "Content-Disposition":
                f'attachment; filename='
                f'"{safe_tenant}_backfill_{safe_entity}.csv"',
        },
    )
# Stage 45: HTTP mirror of the CLI `run preview`. Customer SDKs and
# dashboard widgets can ask "what would the next run report?" without
# polluting the run history. Pure read, so the readonly role suffices.
# The caller supplies only entity_type; handlers.preview_run works out
# the rest for the tenant.
@app.get("/tenants/{tenant_id}/runs/preview", tags=["runs"])
async def preview_run_route(
    tenant_id: str,
    entity_type: str = Query(...),
    authorization: Optional[str] = Header(default=None),
):
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_READONLY)
    preview = handlers.preview_run(svc, tenant_id, entity_type)
    return preview
@app.get("/runs/{run_id}", tags=["runs"])
async def get_run(
    run_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Access to the specific run is checked before the role floor.
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_READONLY)
    run_view = handlers.get_run(svc, run_id)
    return run_view
@app.get("/runs/{run_id}/issues", tags=["runs"])
async def get_run_issues(
    run_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Run-scoped access first, then the readonly role floor.
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_READONLY)
    issues = handlers.get_run_issues(svc, run_id)
    return issues
# Stage 47: CSV download for non-technical consumers -- finance opens
# it in Excel; an analyst pipes it through an existing pandas notebook.
# Same auth as the JSON variant.
@app.get("/runs/{run_id}/issues.csv", tags=["runs"],
         response_class=PlainTextResponse)
async def get_run_issues_csv(
    run_id: str,
    key: ApiKey = Depends(auth_dep),
):
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_READONLY)
    rows_text = handlers.issues_csv(svc, run_id)
    disposition = f'attachment; filename="run_{run_id}_issues.csv"'
    # text/csv with utf-8 is the canonical content type for CSV
    # downloads and what browsers / Excel / Sheets expect.
    return PlainTextResponse(
        rows_text,
        media_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
# Stage 48: CSV download for decisions. Same pattern as
# /runs/{rid}/issues.csv (Stage 47), but it combines snapshot fields
# with the triage state (status/owner/notes) so management sees the
# actionable picture in one sheet.
@app.get("/runs/{run_id}/decisions.csv", tags=["runs"],
         response_class=PlainTextResponse)
async def get_run_decisions_csv(
    run_id: str,
    key: ApiKey = Depends(auth_dep),
):
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_READONLY)
    rows_text = handlers.decisions_csv(svc, run_id)
    disposition = f'attachment; filename="run_{run_id}_decisions.csv"'
    return PlainTextResponse(
        rows_text,
        media_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
@app.get("/runs/{run_id}/decisions", tags=["runs"])
async def get_run_decisions(
    run_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Same gate as the other /runs/{run_id} reads: run access, then
    # the readonly role floor.
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_READONLY)
    decisions = handlers.get_run_decisions(svc, run_id)
    return decisions
# Stage 173 -- everything the entity drilldown page needs in one
# round-trip. Callers that prefer to compose it themselves can still
# hit the underlying /history, /decisions and /mutes routes.
@app.get("/tenants/{tenant_id}/entities/{entity_id}/drilldown",
         tags=["runs"])
async def get_entity_drilldown_route(
    tenant_id: str,
    entity_id: str,
    history_limit: int = Query(default=20, ge=1, le=100),
    decision_limit: int = Query(default=50, ge=1, le=200),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    limits = {
        "history_limit": history_limit,
        "decision_limit": decision_limit,
    }
    return svc.get_entity_drilldown(tenant_id, entity_id, **limits)
# Stage 193 -- drift forecast: "if the recent trajectory keeps going,
# what severity will this entity be at in N days?" Readonly tier; the
# projection is computed from existing run history.
@app.get("/tenants/{tenant_id}/entities/{entity_id}/forecast",
         tags=["runs"])
async def get_entity_forecast_route(
    tenant_id: str,
    entity_id: str,
    horizon_days: int = Query(default=7, ge=1, le=90),
    history_limit: int = Query(default=14, ge=2, le=100),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        projection = svc.get_entity_forecast(
            tenant_id,
            entity_id,
            horizon_days=horizon_days,
            history_limit=history_limit,
        )
    except ValueError as exc:
        # Service-level validation failures become a 400.
        raise ApiError("bad_request", str(exc), status=400) from exc
    return projection
# Stage 195 -- bulk forecast (POST /tenants/{tenant_id}/forecasts/batch,
# defined further down). The Decision Queue uses it to render
# projected-severity badges on every card without N round-trips. It is a
# POST (vs GET) so the entity_id list rides in the body and we don't bump
# into URL-length caps for tenants with many entities.
# Stage 226 -- the tenant's top "chronic oscillator" entities: those
# with the most threshold_crossed events in the recent window. The
# Reports page renders the top 10 for monthly review and root-cause
# prioritization.
@app.get("/tenants/{tenant_id}/forecast/top_oscillators",
         tags=["runs"])
async def get_top_oscillators_route(
    tenant_id: str,
    days: int = Query(30, ge=1, le=365),
    limit: int = Query(10, ge=1, le=50),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        ranked = svc.list_top_threshold_oscillators(
            tenant_id, days=days, limit=limit,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return {"entities": ranked, "days": days}
# Stage 225 -- chronological warn/clear timeline for one entity. Powers
# the EntityDrilldown "oscillation history" panel (warn -> clear ->
# warn again), which helps tell chronic thresholds from one-offs.
@app.get(
    "/tenants/{tenant_id}/entities/{entity_id}/forecast/threshold_events",
    tags=["runs"],
)
async def get_entity_threshold_events_route(
    tenant_id: str, entity_id: str,
    limit: int = Query(50, ge=1, le=200),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        timeline = svc.list_entity_threshold_events(
            tenant_id, entity_id, limit=limit,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return {"events": timeline}
# Stage 232 -- actually POST the sample payload to an operator-provided
# URL, so operators can check Slack/Teams formatting end-to-end against
# their real channel before subscribing to live events. Operator role
# required because this makes an outbound HTTP request from our server.
@app.post(
    "/tenants/{tenant_id}/webhooks/event_samples/send",
    tags=["webhooks"],
)
async def post_webhook_event_sample_send_route(
    tenant_id: str, body: dict,
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)

    def _reject(message):
        raise ApiError("bad_request", message, status=400)

    event, url, secret = (body.get(f) for f in ("event", "url", "secret"))
    if not isinstance(event, str):
        _reject("body must include 'event' as a string")
    if not isinstance(url, str):
        _reject("body must include 'url' as a string")
    if not (secret is None or isinstance(secret, str)):
        _reject("'secret' must be a string if provided")
    # NOTE(review): url is caller-supplied and the request is made
    # server-side; confirm send_sample_payload restricts destinations
    # (e.g. rejects internal/private addresses) to avoid SSRF.
    try:
        outcome = svc.send_sample_payload(
            tenant_id, event=event, url=url, secret=secret,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return outcome
# Stage 231 -- sample webhook payload preview. Operators use it to check
# Slack/Teams channel formatting before subscribing to a real event.
# Readonly role.
@app.get(
    "/tenants/{tenant_id}/webhooks/event_samples",
    tags=["webhooks"],
)
async def get_webhook_event_sample_route(
    tenant_id: str,
    event: str = Query(...),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        sample = svc.sample_webhook_payload(event, tenant_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return {"payload": sample, "event": event}
# Stage 224 -- last forecast_cron_tick for the tenant. Powers the
# DecisionQueue header tile, so an operator can see "the overnight sweep
# ran 6 hours ago, found 2 new warnings" without leaving the page. The
# response is always {"tick": ...}; "tick" is whatever
# get_last_forecast_cron_tick returns (presumably null when no tick has
# run yet).
@app.get("/tenants/{tenant_id}/forecast/last_cron_tick",
         tags=["runs"])
async def get_last_cron_tick_route(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    return {"tick": svc.get_last_forecast_cron_tick(tenant_id)}
# Stage 223 -- sibling of Stage 220's recent_warnings: entities that
# fired forecast.threshold_cleared in the recent window. DecisionQueue
# uses it to mark cards that recovered.
@app.get("/tenants/{tenant_id}/forecast/recent_clearances",
         tags=["runs"])
async def get_recent_threshold_clearances_route(
    tenant_id: str,
    hours: int = Query(24, ge=1, le=720),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        cleared = svc.list_recent_threshold_clearances(
            tenant_id, hours=hours,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return cleared
# Stage 235 -- tenant-scoped chronic-threshold endpoints (GET/PUT/DELETE).
# They mirror the Stage 229 admin trio, but use the regular API-key auth
# instead of X-Admin-Key, so the Settings page can configure the
# threshold directly. The admin trio stays for CS / cross-tenant ops.
# Writes (PUT/DELETE) need the operator role, the same authority needed
# to change anything else that affects this tenant's behavior. This GET
# needs only readonly.
@app.get(
    "/tenants/{tenant_id}/forecast_cron/chronic_threshold",
    tags=["runs"],
)
async def get_tenant_chronic_threshold(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        current = svc.get_chronic_threshold(tenant_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return current
@app.put(
    "/tenants/{tenant_id}/forecast_cron/chronic_threshold",
    tags=["runs"],
)
async def put_tenant_chronic_threshold(
    tenant_id: str, body: dict,
    key: ApiKey = Depends(auth_dep),
):
    # Set the tenant's chronic threshold from a JSON body
    # {"min_warnings": int, "window_days": int}. Operator role; the
    # change is attributed to the calling key. ValueErrors from the
    # service (e.g. out-of-range values) become a 400.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    min_warnings = body.get("min_warnings")
    window_days = body.get("window_days")
    # bool is a subclass of int, so a JSON true/false would otherwise
    # pass the isinstance check and be stored as 1/0.
    if not isinstance(min_warnings, int) or isinstance(min_warnings, bool):
        raise ApiError(
            "bad_request",
            "min_warnings must be an integer", status=400,
        )
    if not isinstance(window_days, int) or isinstance(window_days, bool):
        raise ApiError(
            "bad_request",
            "window_days must be an integer", status=400,
        )
    try:
        return svc.set_chronic_threshold(
            tenant_id,
            min_warnings=min_warnings,
            window_days=window_days,
            actor=key.key_id,
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.delete(
    "/tenants/{tenant_id}/forecast_cron/chronic_threshold",
    tags=["runs"],
)
async def delete_tenant_chronic_threshold(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Drop the tenant's chronic-threshold override; operator role, and
    # the calling key is recorded as the actor.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    try:
        cleared = svc.clear_chronic_threshold(tenant_id, actor=key.key_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return cleared
# Stage 236 -- tenant-scoped days_threshold endpoints. Mirrors the
# chronic_threshold trio (Stage 235) and the forecast_cron config CLI
# pattern.
@app.get(
    "/tenants/{tenant_id}/forecast_cron/days_threshold",
    tags=["runs"],
)
async def get_tenant_days_threshold(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        configured = svc.get_days_threshold(tenant_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return {"days": configured}
@app.put(
    "/tenants/{tenant_id}/forecast_cron/days_threshold",
    tags=["runs"],
)
async def put_tenant_days_threshold(
    tenant_id: str, body: dict,
    key: ApiKey = Depends(auth_dep),
):
    # Set the tenant's days_threshold from a JSON body {"days": int}.
    # Operator role; returns {"days": <value returned by the service>}.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    days = body.get("days")
    # Reject JSON booleans: bool subclasses int and would otherwise be
    # accepted as 1/0.
    if not isinstance(days, int) or isinstance(days, bool):
        raise ApiError(
            "bad_request",
            "days must be an integer", status=400,
        )
    try:
        return {
            "days": svc.set_days_threshold(
                tenant_id, days=days, actor=key.key_id,
            ),
        }
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.delete(
    "/tenants/{tenant_id}/forecast_cron/days_threshold",
    tags=["runs"],
)
async def delete_tenant_days_threshold(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Clear the tenant's days_threshold override (operator role) and
    # echo whatever value the service reports back under "days".
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    try:
        remaining = svc.clear_days_threshold(tenant_id, actor=key.key_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return {"days": remaining}
# Stage 233 -- forecast event mutes. POST creates a mute, GET lists the
# active ones, DELETE removes one. Writes need the operator role; the
# list needs only readonly, so readonly users can see the mute banner
# without being able to create mutes themselves.
@app.post("/tenants/{tenant_id}/forecast/mutes", tags=["runs"])
async def post_forecast_mute_route(
    tenant_id: str, body: dict,
    key: ApiKey = Depends(auth_dep),
):
    # Body: {"entity_id": str, "days": int, "reason": str | None}.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    entity_id = body.get("entity_id")
    days = body.get("days")
    reason = body.get("reason")
    if not isinstance(entity_id, str):
        raise ApiError(
            "bad_request",
            "body must include 'entity_id' as a string",
            status=400,
        )
    # bool is a subclass of int; reject JSON true/false explicitly.
    if not isinstance(days, int) or isinstance(days, bool):
        raise ApiError(
            "bad_request",
            "body must include 'days' as an integer",
            status=400,
        )
    if reason is not None and not isinstance(reason, str):
        raise ApiError(
            "bad_request",
            "'reason' must be a string if provided",
            status=400,
        )
    try:
        return svc.mute_forecast(
            tenant_id, entity_id,
            days=days, reason=reason,
            actor=key.key_id,
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.get("/tenants/{tenant_id}/forecast/mutes", tags=["runs"])
async def get_forecast_mutes_route(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Readonly listing of the tenant's active forecast mutes.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        active = svc.list_active_forecast_mutes(tenant_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return {"mutes": active}
@app.delete(
    "/tenants/{tenant_id}/forecast/mutes/{mute_id}",
    tags=["runs"], status_code=204,
)
async def delete_forecast_mute_route(
    tenant_id: str, mute_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Remove one forecast mute (operator role). 204 on success, 404 when
    # unmute_forecast reports nothing was removed.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    # NOTE(review): mute_id is not checked against tenant_id here --
    # unmute_forecast receives only the id. Confirm the service scopes
    # the delete to the tenant, otherwise an operator of one tenant could
    # remove another tenant's mute by guessing its id.
    removed = svc.unmute_forecast(mute_id, actor=key.key_id)
    if removed:
        return None
    raise ApiError(
        "not_found",
        f"unknown mute {mute_id!r}", status=404,
    )
# Stage 220 -- entity_ids that fired forecast.threshold_crossed in the
# recent window. DecisionQueue uses it to mark cards under an active
# warning. Readonly role: this is a narrowed, informational projection
# (the audit-log endpoint itself is admin-only by design).
@app.get("/tenants/{tenant_id}/forecast/recent_warnings",
         tags=["runs"])
async def get_recent_threshold_warnings_route(
    tenant_id: str,
    hours: int = Query(24, ge=1, le=720),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        warned = svc.list_recent_threshold_warnings(
            tenant_id, hours=hours,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return warned
# Stage 216 -- backward-looking forecast accuracy for one entity,
# shown in the EntityDrilldown "Forecast accuracy" panel. Readonly.
@app.get("/tenants/{tenant_id}/entities/{entity_id}/forecast/accuracy",
         tags=["runs"])
async def get_entity_forecast_accuracy_route(
    tenant_id: str, entity_id: str,
    horizon_days: int = Query(7, ge=1, le=90),
    max_samples: int = Query(10, ge=1, le=100),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        accuracy = svc.get_forecast_accuracy(
            tenant_id,
            entity_id,
            horizon_days=horizon_days,
            max_samples=max_samples,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return accuracy
# Stage 204 -- tenant-wide forecast aggregate for the TopBar trajectory
# chip. Plain GET with no body; readonly role is enough.
@app.get("/tenants/{tenant_id}/forecast/summary", tags=["runs"])
async def get_forecast_summary_route(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        summary = svc.get_forecast_summary(tenant_id)
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return summary
# Stage 211 -- manual trigger for the forecast-threshold scan. Admin
# role is required: the scan can fire webhooks and write audit rows, so
# whoever runs it should have the same authority as someone who would
# configure a cron. The intended runtime is a daily cron; for now the
# dashboard / curl is the trigger.
@app.post("/tenants/{tenant_id}/forecast/check_thresholds",
          tags=["runs"])
async def post_forecast_check_thresholds_route(
    tenant_id: str, body: dict,
    key: ApiKey = Depends(auth_dep),
):
    # Body: {"days_threshold": int} (defaults to 7 when omitted).
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_ADMIN)
    days_threshold = body.get("days_threshold", 7)
    # bool subclasses int; a JSON true/false must not pass as 1/0.
    if (not isinstance(days_threshold, int)
            or isinstance(days_threshold, bool)):
        raise ApiError(
            "bad_request",
            "days_threshold must be an integer", status=400,
        )
    try:
        return svc.check_forecast_thresholds(
            tenant_id, days_threshold=days_threshold,
            actor=key.key_id,
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
# Stage 195 -- bulk forecast. The Decision Queue uses this to render
# projected-severity badges on every card without N round-trips. POST
# (vs GET) so the entity_id list rides in the body and we don't bump
# into URL-length caps for tenants with many entities.
@app.post("/tenants/{tenant_id}/forecasts/batch", tags=["runs"])
async def post_forecasts_batch_route(
    tenant_id: str,
    body: dict,
    key: ApiKey = Depends(auth_dep),
):
    # Body: {"entity_ids": [str, ...], "horizon_days": int (default 7)}.
    # Returns {"forecasts": <svc.get_forecasts_batch result>}.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    entity_ids = body.get("entity_ids")
    if not isinstance(entity_ids, list):
        raise ApiError("bad_request",
                       "body must include 'entity_ids' "
                       "(list of strings)",
                       status=400)
    if not all(isinstance(eid, str) for eid in entity_ids):
        raise ApiError("bad_request",
                       "every entity_id must be a string",
                       status=400)
    horizon = body.get("horizon_days", 7)
    # bool subclasses int; reject JSON true/false explicitly.
    if not isinstance(horizon, int) or isinstance(horizon, bool):
        raise ApiError("bad_request",
                       "horizon_days must be an integer",
                       status=400)
    try:
        forecasts = svc.get_forecasts_batch(
            tenant_id, entity_ids, horizon_days=horizon,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc),
                       status=400) from exc
    return {"forecasts": forecasts}
# Stage 192 -- peer benchmarking: how does this entity rank against the
# other entities of the same entity_type within the tenant? Per-metric
# percentile, median, IQR and range. When peers are missing the metrics
# list is empty and the dashboard panel just hides.
@app.get("/tenants/{tenant_id}/entities/{entity_id}/peers",
         tags=["runs"])
async def get_entity_peers_route(
    tenant_id: str,
    entity_id: str,
    entity_type: Optional[str] = Query(default=None),
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    comparison = svc.get_entity_peer_comparison(
        tenant_id, entity_id, entity_type=entity_type,
    )
    return comparison
@app.get("/tenants/{tenant_id}/entities/{entity_id}/history",
         tags=["runs"])
async def get_entity_history(
    tenant_id: str,
    entity_id: str,
    limit: int = Query(default=10, ge=1, le=100),
    key: ApiKey = Depends(auth_dep),
):
    """Stage 157 -- one entity's score timeline across the tenant's
    runs, most recent first. Feeds the DecisionCard sparkline. Needs
    read access (readonly tier or higher)."""
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    timeline = svc.entity_score_history(tenant_id, entity_id, limit=limit)
    return {
        "tenant_id": tenant_id,
        "entity_id": entity_id,
        "history": timeline,
    }
@app.patch("/runs/{run_id}/decisions/{decision_id}", tags=["runs"])
async def patch_run_decision(
    run_id: str,
    decision_id: str,
    body: DecisionPatchBody,
    key: ApiKey = Depends(auth_dep),
):
    """Stage 156: triage one decision (status / owner / notes).
    Requires at least the operator role -- readonly users can read
    the queue but cannot change it."""
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_OPERATOR)
    # exclude_unset: only the fields the client actually sent are
    # forwarded to the handler.
    changes = body.model_dump(exclude_unset=True)
    return handlers.patch_decision(svc, run_id, decision_id, changes)
# Stage 198 -- bulk decision PATCH. POST so the updates list rides in
# the body (a URL would balloon for ~30 decisions). Every distinct
# run_id in the body is checked for membership in the path tenant, so a
# cross-tenant request can't sneak through under a single
# require_tenant_access. The service fires one decisions.bulk_changed
# aggregate event for the successful subset; per-decision
# decision.status_changed events still fire from update_decision, so
# opting into noise reduction is the operator's choice on the Webhooks
# page.
@app.post("/tenants/{tenant_id}/decisions/bulk", tags=["runs"])
async def post_bulk_update_decisions(
    tenant_id: str, body: dict,
    key: ApiKey = Depends(auth_dep),
):
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    updates = body.get("updates")
    if not isinstance(updates, list):
        raise ApiError(
            "bad_request",
            "body must include 'updates' (list of objects)",
            status=400,
        )
    # require_tenant_access only matched the key to the path tenant;
    # this loop guards against a fabricated run_id pointing at another
    # tenant's data. Each run_id is looked up at most once.
    verified_runs: set[str] = set()
    for idx, update in enumerate(updates):
        if not isinstance(update, dict):
            raise ApiError(
                "bad_request",
                f"updates[{idx}] must be an object", status=400,
            )
        run_id = update.get("run_id")
        if not isinstance(run_id, str):
            raise ApiError(
                "bad_request",
                f"updates[{idx}].run_id must be a string",
                status=400,
            )
        if run_id not in verified_runs:
            verified_runs.add(run_id)
            row = svc.runs.get(run_id)
            if row is None or row["tenant_id"] != tenant_id:
                raise ApiError(
                    "not_found",
                    f"run {run_id} not found in tenant {tenant_id}",
                    status=404,
                )
    try:
        outcome = svc.bulk_update_decisions(
            tenant_id, updates, actor=key.key_id,
        )
    except ValueError as exc:
        raise ApiError("bad_request", str(exc), status=400) from exc
    return outcome
@app.post("/runs/{run_id}/decisions/{decision_id}/notify",
          tags=["runs"])
async def notify_decision(
    run_id: str,
    decision_id: str,
    key: ApiKey = Depends(auth_dep),
):
    """Stage 157 (alpha-4) -- fan a decision out to the tenant's
    webhooks via ``svc.deliver_webhooks`` as event
    ``decision.task_requested``.

    Use case: a customer's Zapier / Make / n8n / custom endpoint
    receives the payload and files a Jira / Linear / Asana ticket.
    Operator tier; readonly keys can read decisions but not fire this.

    Payload shape (stable contract for downstream integrations):
        event: "decision.task_requested"
        tenant_id: the run's tenant
        run_id: the run that produced the decision
        decision: the decision dict from svc.get_run_decisions
            (snapshot plus triage state)
        issue: the run issue whose issue_id matches the decision's,
            or null if none matches
        evidence: that issue's "evidence" list ([] when absent)
        triggered_by: {actor: key_id, actor_role, at: UTC ISO-8601}

    Returns ``{"delivered": N, "results": [...]}`` where each result is
    trimmed to webhook_id / status / status_code / error. N is
    ``len(results)``, i.e. every entry deliver_webhooks reported,
    including failed attempts, not only successful deliveries.

    Raises ApiError 404 when the decision is not on the run.
    """
    from datetime import datetime, timezone

    run = require_run_access(svc, key, run_id)
    require_role(key, ROLE_OPERATOR)
    tenant_id = run["tenant_id"]
    # Resolve the decision and its matching issue. The decision dict
    # already carries the triage state folded in (Stage 156).
    decisions = svc.get_run_decisions(run_id)
    decision = next((d for d in decisions
                     if d.get("decision_id") == decision_id), None)
    if decision is None:
        raise ApiError("not_found",
                       f"decision {decision_id!r} not found on run {run_id!r}",
                       status=404)
    issues = svc.get_run_issues(run_id)
    issue = next((i for i in issues
                  if i.get("issue_id") == decision.get("issue_id")), None)
    payload = {
        "event": "decision.task_requested",
        "tenant_id": tenant_id,
        "run_id": run_id,
        "decision": decision,
        "issue": issue,
        "evidence": (issue or {}).get("evidence", []),
        "triggered_by": {
            "actor": key.key_id,
            "actor_role": key.role,
            "at": datetime.now(timezone.utc).isoformat(),
        },
    }
    results = svc.deliver_webhooks(tenant_id, payload)
    return {
        "delivered": len(results),
        "results": [
            {"webhook_id": r.get("webhook_id"),
             "status": r.get("status"),
             "status_code": r.get("status_code"),
             "error": r.get("error")}
            for r in results
        ],
    }
@app.get("/runs/{run_id}/report.pdf", tags=["runs"])
async def get_run_report_pdf(run_id: str,
                             key: ApiKey = Depends(auth_dep)):
    # Render the customer report for a run as a PDF attachment.
    # 404 if no report can be built for the run; 503 if render_pdf
    # raises RuntimeError (per the original note, fpdf2 missing from
    # this runtime).
    require_run_access(svc, key, run_id)
    require_role(key, ROLE_READONLY)
    from delivery.reports import (
        build_customer_report_from_service,
        render_pdf,
    )
    report = build_customer_report_from_service(svc, run_id)
    if report is None:  # pragma: no cover
        raise ApiError("not_found", f"run {run_id!r} not found",
                       status=404)
    try:
        pdf_bytes = render_pdf(report)
    except RuntimeError as e:
        # Chain the cause, as every other handler in this module does,
        # so the underlying RuntimeError stays visible in logs/tracebacks.
        raise ApiError("pdf_unavailable", str(e), status=503) from e
    return Response(
        content=pdf_bytes,
        media_type="application/pdf",
        headers={
            "Content-Disposition":
                f'attachment; filename="orgstate_{run_id}.pdf"',
        },
    )
# --- Stage 185: email digest previews ------------------------------
# Render each of the four email templates against live tenant data
# so the dashboard's Reports page can offer a preview/download per
# template. No email is sent β€” the customer's ESP (or a future
# OrgState relay) handles delivery. Read-only role.
from fastapi.responses import HTMLResponse as _HTMLResponse
@app.get("/tenants/{tenant_id}/emails/welcome",
         tags=["emails"], response_class=_HTMLResponse)
async def email_welcome(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Preview of the welcome email rendered for this tenant's name.
    from delivery.emails import render_welcome
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    tenant = svc.get_tenant(tenant_id)
    if tenant is None:
        raise ApiError("not_found",
                       f"tenant {tenant_id!r} not found",
                       status=404)
    html = render_welcome(tenant_name=tenant["name"])
    return _HTMLResponse(html)
@app.get("/tenants/{tenant_id}/emails/trial_expiring",
         tags=["emails"], response_class=_HTMLResponse)
async def email_trial_expiring(
    tenant_id: str,
    days_left: int = Query(default=3, ge=0, le=30),
    key: ApiKey = Depends(auth_dep),
):
    # Preview of the trial-expiring email. The trial end date shown is
    # simply today (UTC) + days_left; counts come from up to 200 open
    # decisions.
    from datetime import datetime, timedelta, timezone
    from delivery.emails import render_trial_expiring
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    tenant = svc.get_tenant(tenant_id)
    if tenant is None:
        raise ApiError("not_found",
                       f"tenant {tenant_id!r} not found",
                       status=404)
    open_decisions = svc.list_decisions(tenant_id, status="open",
                                        limit=200)
    severities = [(d.get("severity") or "").lower()
                  for d in open_decisions]
    ends_on = datetime.now(timezone.utc) + timedelta(days=days_left)
    html = render_trial_expiring(
        tenant_name=tenant["name"],
        days_left=days_left,
        n_open_issues=len(open_decisions),
        n_critical=severities.count("critical"),
        trial_ends_iso=ends_on.date().isoformat(),
    )
    return _HTMLResponse(html)
@app.get("/tenants/{tenant_id}/emails/daily_digest",
         tags=["emails"], response_class=_HTMLResponse)
async def email_daily_digest(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Preview of the daily digest: the first five open decisions
    # (of up to 50 fetched), labelled with today's UTC date as DD.MM.
    from datetime import datetime, timezone
    from delivery.emails import render_daily_digest
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    tenant = svc.get_tenant(tenant_id)
    if tenant is None:
        raise ApiError("not_found",
                       f"tenant {tenant_id!r} not found",
                       status=404)
    open_decisions = svc.list_decisions(tenant_id, status="open",
                                        limit=50)
    digest_fields = ("entity_id", "entity_type", "severity", "score",
                     "urgency")
    issues = [{name: d.get(name) for name in digest_fields}
              for d in open_decisions[:5]]
    date_label = datetime.now(timezone.utc).strftime("%d.%m")
    html = render_daily_digest(
        tenant_name=tenant["name"],
        date_label=date_label,
        issues=issues,
    )
    return _HTMLResponse(html)
@app.get("/tenants/{tenant_id}/emails/weekly_executive",
         tags=["emails"], response_class=_HTMLResponse)
async def email_weekly_executive(
    tenant_id: str,
    key: ApiKey = Depends(auth_dep),
):
    # Preview of the weekly executive summary: open / critical / high
    # counts over up to 200 open decisions plus their mean confidence
    # as a 0-100 int (0 when no decision carries a confidence).
    from datetime import datetime, timezone
    from delivery.emails import render_weekly_executive
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    tenant = svc.get_tenant(tenant_id)
    if tenant is None:
        raise ApiError("not_found",
                       f"tenant {tenant_id!r} not found",
                       status=404)
    open_decisions = svc.list_decisions(tenant_id, status="open",
                                        limit=200)
    severities = [(d.get("severity") or "").lower()
                  for d in open_decisions]
    confidences = [float(d["confidence"]) for d in open_decisions
                   if d.get("confidence") is not None]
    if confidences:
        confidence_pct = int(round(
            sum(confidences) / len(confidences) * 100))
    else:
        confidence_pct = 0
    week_label = datetime.now(timezone.utc).strftime("יום Χ‘Χ³ %H:%M")
    html = render_weekly_executive(
        tenant_name=tenant["name"],
        week_label=week_label,
        n_open=len(open_decisions),
        n_critical=severities.count("critical"),
        n_high=severities.count("high"),
        confidence_pct=confidence_pct,
    )
    return _HTMLResponse(html)
# --- Stage 191: actually send email via the configured ESP -----
# Same four templates as the Stage 185 previews, but sent via POST with
# {to} in the body and dispatched through infra.email_transport.
# Operator role (sending email is an outbound side effect, not a read).
def _render_email(svc_, tenant_id_: str, template: str) -> tuple:
    """Render ``(subject, html)`` for the named template from live
    tenant data.

    Used by the POST ``/tenants/{tenant_id}/emails/{template}/send``
    route; an unknown template id produces a clean 400 instead of a
    crash.

    Args:
        svc_: service object providing ``get_tenant`` and
            ``list_decisions``.
        tenant_id_: tenant whose data fills the template.
        template: one of ``welcome``, ``trial_expiring``,
            ``daily_digest``, ``weekly_executive``.

    Returns:
        A ``(subject, html)`` tuple.

    Raises:
        ApiError: 404 if the tenant does not exist (checked first);
            400 if the template name is unknown.
    """
    from datetime import datetime, timedelta, timezone
    from delivery.emails import (
        render_daily_digest, render_trial_expiring,
        render_weekly_executive, render_welcome,
    )

    def _count_severity(decisions: list, level: str) -> int:
        # Case-insensitive match; a missing/None severity never counts.
        return sum(1 for d in decisions
                   if (d.get("severity") or "").lower() == level)

    t = svc_.get_tenant(tenant_id_)
    if t is None:
        raise ApiError("not_found",
                       f"tenant {tenant_id_!r} not found",
                       status=404)
    if template == "welcome":
        return ("ברוכים הבאים Χœβ€‘OrgState",
                render_welcome(tenant_name=t["name"]))
    if template == "trial_expiring":
        # The subject literal below hard-codes "3" days; keep it in
        # sync with trial_days.
        trial_days = 3
        decisions = svc_.list_decisions(
            tenant_id_, status="open", limit=200)
        trial_ends = (
            datetime.now(timezone.utc) + timedelta(days=trial_days)
        ).date().isoformat()
        return ("Χ”Χ Χ™Χ‘Χ™Χ•ΧŸ שלך מבΧͺיים Χ‘Χ’Χ•Χ“ 3 Χ™ΧžΧ™Χ",
                render_trial_expiring(
                    tenant_name=t["name"], days_left=trial_days,
                    n_open_issues=len(decisions),
                    n_critical=_count_severity(decisions, "critical"),
                    trial_ends_iso=trial_ends,
                ))
    if template == "daily_digest":
        decisions = svc_.list_decisions(
            tenant_id_, status="open", limit=50)
        issues = [{
            "entity_id": d.get("entity_id"),
            "entity_type": d.get("entity_type"),
            "severity": d.get("severity"),
            "score": d.get("score"),
            "urgency": d.get("urgency"),
        } for d in decisions[:5]]
        date_label = datetime.now(timezone.utc).strftime("%d.%m")
        return (f"OrgState Β· ביכום Χ™Χ•ΧžΧ™ Β· {date_label}",
                render_daily_digest(
                    tenant_name=t["name"],
                    date_label=date_label,
                    issues=issues,
                ))
    if template == "weekly_executive":
        decisions = svc_.list_decisions(
            tenant_id_, status="open", limit=200)
        confs = [float(d["confidence"]) for d in decisions
                 if d.get("confidence") is not None]
        # Mean confidence as a 0-100 int; 0 when nothing carries one.
        avg_conf = (int(round(sum(confs) / len(confs) * 100))
                    if confs else 0)
        week_label = datetime.now(timezone.utc).strftime(
            "יום Χ‘Χ³ %H:%M")
        # Plain string: the previous f-prefix had no placeholders.
        return ("OrgState Β· ביכום Χ©Χ‘Χ•Χ’Χ™ ΧœΧ”Χ Χ”ΧœΧ”",
                render_weekly_executive(
                    tenant_name=t["name"],
                    week_label=week_label,
                    n_open=len(decisions),
                    n_critical=_count_severity(decisions, "critical"),
                    n_high=_count_severity(decisions, "high"),
                    confidence_pct=avg_conf,
                ))
    raise ApiError("bad_request",
                   f"unknown email template {template!r}; "
                   "valid: welcome, trial_expiring, "
                   "daily_digest, weekly_executive",
                   status=400)
@app.post("/tenants/{tenant_id}/emails/{template}/send",
          tags=["emails"])
async def send_email_route(
    tenant_id: str,
    template: str,
    body: dict,
    authorization: Optional[str] = Header(default=None),
):
    """Render a tenant email template and send it to ``body["to"]``.

    Callers need access to ``tenant_id`` (operator role for tenant keys)
    or an admin credential. ``EmailSendError`` from the transport is
    re-raised as ``ApiError("email_send_failed")`` with an HTTP status
    chosen from the error's ``code``.
    """
    from infra.email_transport import EmailSendError

    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)

    recipient = body.get("to")
    if not (isinstance(recipient, str) and recipient):
        raise ApiError("bad_request",
                       "body must include 'to' (string email)",
                       status=400)

    subject, html = _render_email(svc, tenant_id, template)
    try:
        return svc.send_tenant_email(
            tenant_id,
            to=recipient, subject=subject, html=html,
            template=template,
            actor=_tenant_or_admin_actor(authorization, tenant_id),
        )
    except EmailSendError as err:
        # misconfigured -> 503 (operator must fix server config),
        # invalid_recipient -> 400, upstream ESP failure -> 502,
        # any other transport error -> 500.
        status_by_code = {
            "misconfigured": 503,
            "invalid_recipient": 400,
            "upstream": 502,
        }
        raise ApiError("email_send_failed", str(err),
                       status=status_by_code.get(err.code, 500)) from err
@app.get("/admin/email/transport", tags=["emails"])
async def email_transport_status(
    authorization: Optional[str] = Header(default=None),
):
    """Stage 191 -- report which email transport is currently active.

    Diagnostic for customer success ("did the trial-expiring email
    actually go out?"). Admin-only because it exposes server
    configuration: the transport name and the configured From address.
    """
    import os as _os
    require_admin(authorization, svc=svc)
    from infra.email_transport import active_transport_name

    transport = active_transport_name()
    sender = _os.environ.get("ORGSTATE_EMAIL_FROM",
                             "OrgState <noreply@orgstate.local>")
    return {
        "transport": transport,
        "is_dry_run": transport == "dry_run",
        "from_addr": sender,
    }
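# --- api key management (Stage 5b) --------------------------------
# The /tenants/{tenant_id}/api_keys routes are defined further down,
# after the invitation routes. Access rule: admin OR a tenant key bound
# to the path tenant -- so an admin can rotate keys for any tenant, and
# a tenant can manage its own.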
# --- Stage 170: per-entity mute / snooze --------------------------
@app.post("/tenants/{tenant_id}/mutes", tags=["mutes"],
          status_code=201)
async def create_mute_route(
    tenant_id: str,
    body: dict,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 170 -- mute (snooze) one entity of a tenant.

    Body fields:
      * ``entity_id`` -- required, non-empty string.
      * ``days`` -- integer, defaults to 7.
      * ``reason`` -- optional string.

    Tenant keys need the operator role; admins may act on any tenant.
    A ``ValueError`` raised by ``svc.mute_entity`` becomes a 400.
    """
    key = require_tenant_or_admin(svc, authorization, tenant_id)
    if key is not None:
        require_role(key, ROLE_OPERATOR)
    entity_id = body.get("entity_id")
    if not isinstance(entity_id, str) or not entity_id:
        raise ApiError(
            "bad_request",
            "body needs 'entity_id' (string)",
            status=400,
        )
    days = body.get("days", 7)
    # bool is a subclass of int, so without the explicit bool check a
    # JSON ``true`` would be accepted as a 1-day mute.
    if isinstance(days, bool) or not isinstance(days, int):
        raise ApiError(
            "bad_request",
            "body 'days' must be an integer",
            status=400,
        )
    reason = body.get("reason")
    # Reject non-string reasons (objects, numbers) at the API boundary
    # instead of handing them to the persistence layer.
    if reason is not None and not isinstance(reason, str):
        raise ApiError(
            "bad_request",
            "body 'reason' must be a string",
            status=400,
        )
    try:
        return svc.mute_entity(
            tenant_id, entity_id,
            days=days,
            reason=reason,
            actor=_tenant_or_admin_actor(authorization, tenant_id),
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.get("/tenants/{tenant_id}/mutes", tags=["mutes"])
async def list_mutes_route(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """List a tenant's currently active mutes.

    Any credential with access to ``tenant_id`` (or an admin) may read;
    there is no additional role requirement.
    """
    require_tenant_or_admin(svc, authorization, tenant_id)
    active = svc.list_active_mutes(tenant_id)
    return {"mutes": active}
@app.delete("/mutes/{mute_id}", tags=["mutes"], status_code=204)
async def delete_mute_route(
    mute_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Remove a mute by id.

    The mute row is loaded first so authorization can be checked against
    the tenant that owns it; an unknown id is a 404. Tenant keys need the
    operator role.
    """
    mute = svc.mutes.get(mute_id)
    if mute is None:
        raise ApiError("not_found",
                       f"unknown mute {mute_id!r}", status=404)
    owner = mute["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    svc.unmute(mute_id,
               actor=_tenant_or_admin_actor(authorization, owner))
    return Response(status_code=204)
# --- Stage 194: tenant invitations ---------------------------------
# An admin invites a coworker by email. The invitee follows a
# one-shot link, picks a display name, and the backend hands
# them an API key bound to their name + the invited role.
# The flow uses the Stage 185/191 email infra to deliver the link.
def _invite_link(token: str) -> str:
    """Build the public accept URL for an invitation token.

    The dashboard base URL comes from ``ORGSTATE_DASHBOARD_URL``. When that
    variable is unset or blank, ``https://orgstate.1bigfam.com`` is used.
    Trailing slashes are trimmed. The frontend renders ``/invite/<token>``,
    which posts to ``/invitations/accept``.

    The token is percent-encoded as a single path segment, so a token
    containing ``/``, ``?`` or ``#`` cannot alter the URL structure.
    URL-safe tokens (letters, digits, ``-``, ``_``) pass through unchanged.
    """
    # Local imports keep this helper self-contained, matching the
    # function-scope imports used by the routes around it.
    import os as _os
    from urllib.parse import quote

    base = (_os.environ.get("ORGSTATE_DASHBOARD_URL", "").strip()
            or "https://orgstate.1bigfam.com")
    base = base.rstrip("/")
    return f"{base}/invite/{quote(token, safe='')}"
@app.post("/tenants/{tenant_id}/invitations",
          tags=["invitations"], status_code=201)
async def create_invitation_route(
    tenant_id: str,
    body: dict,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 194 -- invite a coworker to a tenant by email.

    Creates the invitation via ``svc.invite_user`` and then tries to email
    the accept link. Tenant keys need the admin role.

    If email delivery raises ``EmailSendError``, the invitation is NOT
    rolled back. The response reports ``email_sent`` (and ``email_error``
    on failure) and includes ``accept_url`` so the admin can share the
    link manually. ``ValueError`` from ``invite_user`` becomes a 400.
    """
    from delivery.emails import render_invitation
    from infra.email_transport import EmailSendError
    key = require_tenant_or_admin(svc, authorization, tenant_id)
    if key is not None:
        require_role(key, ROLE_ADMIN)
    email = body.get("email")
    role = body.get("role", "operator")
    if not isinstance(email, str) or not email:
        raise ApiError("bad_request",
                       "body must include 'email' (string)",
                       status=400)
    # Resolve the acting identity once and reuse it for the invitation
    # record and for the email send.
    actor = _tenant_or_admin_actor(authorization, tenant_id)
    try:
        invite = svc.invite_user(
            tenant_id, email, role=role,
            invited_by=actor,
            actor=actor,
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
    # Attempt to send the invite email. Failure does NOT roll back the
    # invitation; `email_sent` flags the outcome so the dashboard can
    # show the right message.
    link = _invite_link(invite["raw_token"])
    t = svc.get_tenant(tenant_id)
    tenant_name = t["name"] if t else tenant_id
    html = render_invitation(
        tenant_name=tenant_name,
        invite_link=link,
        role=invite["role"],
    )
    email_sent = False
    email_error = None
    try:
        svc.send_tenant_email(
            tenant_id, to=email,
            subject=f"Χ”Χ–ΧžΧ Χ” Χœβ€‘{tenant_name} ב‑OrgState",
            html=html, template="invitation",
            actor=actor,
        )
        email_sent = True
    except EmailSendError as e:
        email_error = str(e)
    # Never return token_hash, and drop the bare raw_token field from the
    # row. Note that the raw token IS still embedded in `accept_url`
    # below. That is deliberate: it lets the admin hand the link over
    # manually when the email did not go out.
    result = {k: v for k, v in invite.items()
              if k not in ("token_hash", "raw_token")}
    result["email_sent"] = email_sent
    if email_error:
        result["email_error"] = email_error
    result["accept_url"] = link
    return result
@app.get("/tenants/{tenant_id}/invitations",
         tags=["invitations"])
async def list_invitations_route(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """List a tenant's pending invitations.

    Requires a platform admin or a tenant key with the admin role.
    ``token_hash`` is removed from every row before it is returned.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_ADMIN)
    pending = svc.list_pending_invitations(tenant_id)
    sanitized = []
    for row in pending:
        sanitized.append(
            {field: value for field, value in row.items()
             if field != "token_hash"}
        )
    return {"invitations": sanitized}
@app.delete("/tenants/{tenant_id}/invitations/{invite_id}",
            tags=["invitations"], status_code=204)
async def revoke_invitation_route(
    tenant_id: str,
    invite_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Revoke an invitation (platform admin or tenant admin-role key).

    ``ValueError`` from the service becomes a 400. A ``None`` result
    means the invitation does not exist and becomes a 404.

    NOTE(review): authorization is checked against the *path* tenant
    only. Nothing here confirms that ``invite_id`` belongs to
    ``tenant_id``, so unless ``svc.revoke_invitation`` enforces that
    itself, a tenant admin could revoke another tenant's invitation.
    Confirm and add an ownership check.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_ADMIN)
    try:
        revoked = svc.revoke_invitation(
            invite_id,
            actor=_tenant_or_admin_actor(authorization, tenant_id),
        )
    except ValueError as err:
        raise ApiError("bad_request", str(err), status=400) from err
    if revoked is not None:
        return Response(status_code=204)
    raise ApiError("not_found",
                   f"invitation {invite_id!r} not found",
                   status=404)
@app.post("/invitations/accept", tags=["invitations"])
async def accept_invitation_route(body: dict):
    """Redeem an invitation token for a newly minted API key.

    Public route: the token itself is the credential, so no
    Authorization header is read. As with API-key creation, the raw key
    in the response is shown exactly once.

    ``ValueError`` from the service is mapped by message text:
    "not found" -> 404; "expired" / "revoked" / "already accepted" ->
    410; anything else -> 400.
    """
    token = body.get("token")
    display_name = body.get("display_name", "")
    if not (isinstance(token, str) and token):
        raise ApiError("bad_request",
                       "body must include 'token' (string)",
                       status=400)
    if not (isinstance(display_name, str) and display_name.strip()):
        raise ApiError("bad_request",
                       "body must include 'display_name' (string)",
                       status=400)
    try:
        return svc.accept_invitation(token, display_name=display_name)
    except ValueError as err:
        text = str(err)
        lowered = text.lower()
        if "not found" in lowered:
            status = 404
        elif any(marker in lowered
                 for marker in ("expired", "revoked", "already accepted")):
            status = 410
        else:
            status = 400
        raise ApiError("bad_request", text, status=status) from err
@app.post("/tenants/{tenant_id}/api_keys", tags=["api_keys"], status_code=201)
async def mint_api_key(
    tenant_id: str,
    body: ApiKeyMintBody = ApiKeyMintBody(),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 5b -- mint a new API key for ``tenant_id``.

    Platform admins may mint for any tenant. A tenant key must be bound
    to ``tenant_id`` and carry the admin role.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_ADMIN)
    payload = body.model_dump()
    actor = _tenant_or_admin_actor(authorization, tenant_id)
    return handlers.mint_api_key(svc, tenant_id, payload, actor=actor)
@app.get("/tenants/{tenant_id}/api_keys", tags=["api_keys"])
async def list_api_keys_route(
    tenant_id: str,
    include_revoked: bool = Query(False),
    authorization: Optional[str] = Header(default=None),
):
    """List a tenant's API keys, optionally including revoked ones.

    Requires a platform admin or a tenant key with the admin role.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_ADMIN)
    listing = handlers.list_api_keys(svc, tenant_id,
                                     include_revoked=include_revoked)
    return listing
@app.delete("/tenants/{tenant_id}/api_keys/{key_id}",
            tags=["api_keys"], status_code=204)
async def revoke_api_key_route(
    tenant_id: str,
    key_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Revoke one of a tenant's API keys; responds 204 on success.

    Requires a platform admin or a tenant key with the admin role.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_ADMIN)
    actor = _tenant_or_admin_actor(authorization, tenant_id)
    handlers.revoke_api_key(svc, tenant_id, key_id, actor=actor)
    return Response(status_code=204)
# --- admin key management (Stage 5c) ------------------------------
# The /admin_keys routes are defined further down, after the admin
# audit / ops-summary / health routes.
# Bootstrap: an operator needs ONE admin credential first -- either set
# ORGSTATE_ADMIN_KEY in the env (Stage 5b) or mint the first admin DB
# key via svc.create_admin_key() out-of-band. After that, all admin key
# management is over HTTP and rotation no longer needs a restart.
# Stage 73: webhook REST API. Customers configure their own
# slack/pagerduty endpoint from a dashboard; admins can do so
# cross-tenant. Same role gating as schedules β€” reads readonly,
# writes operator. The raw secret returns ONCE on create; never
# appears on read endpoints.
@app.get("/tenants/{tenant_id}/webhooks", tags=["webhooks"])
async def list_webhooks_route(
    tenant_id: str,
    enabled_only: bool = Query(default=False),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 73 -- list a tenant's webhooks (readonly role or admin).

    Secrets are never part of the read shape; they are returned only
    once, at creation time.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_READONLY)
    webhooks = handlers.list_webhooks(svc, tenant_id,
                                      enabled_only=enabled_only)
    return webhooks
@app.post("/tenants/{tenant_id}/webhooks", tags=["webhooks"],
          status_code=201)
async def create_webhook_route(
    tenant_id: str,
    body: WebhookCreateBody,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 73 -- register a webhook for ``tenant_id``.

    Writes need the operator role (or an admin). The raw secret is
    returned once, in this response only.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    payload = body.model_dump()
    actor = _tenant_or_admin_actor(authorization, tenant_id)
    return handlers.create_webhook(svc, tenant_id, payload, actor=actor)
@app.get("/webhooks/{webhook_id}", tags=["webhooks"])
async def get_webhook_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Fetch one webhook.

    The webhook is looked up first (``handlers.get_webhook`` handles
    unknown ids), then the caller's access to its owning tenant is
    checked; tenant keys need at least the readonly role.

    NOTE(review): because lookup precedes auth, an unknown id and a
    foreign id fail differently. This hides existence only if webhook
    ids are unguessable -- confirm.
    """
    webhook = handlers.get_webhook(svc, webhook_id)
    caller = require_tenant_or_admin(svc, authorization,
                                     webhook["tenant_id"])
    if caller is not None:
        require_role(caller, ROLE_READONLY)
    return webhook
@app.post("/webhooks/{webhook_id}/enable", tags=["webhooks"])
async def enable_webhook_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Turn a webhook on (operator role on its tenant, or admin)."""
    webhook = handlers.get_webhook(svc, webhook_id)
    owner = webhook["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    actor = _tenant_or_admin_actor(authorization, owner)
    return handlers.set_webhook_enabled(svc, webhook_id, True, actor=actor)
@app.post("/webhooks/{webhook_id}/disable", tags=["webhooks"])
async def disable_webhook_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Turn a webhook off (operator role on its tenant, or admin)."""
    webhook = handlers.get_webhook(svc, webhook_id)
    owner = webhook["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    actor = _tenant_or_admin_actor(authorization, owner)
    return handlers.set_webhook_enabled(svc, webhook_id, False, actor=actor)
@app.delete("/webhooks/{webhook_id}", tags=["webhooks"])
async def delete_webhook_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Delete a webhook (operator role on its tenant, or admin).

    Returns whatever ``handlers.delete_webhook`` produces.
    """
    webhook = handlers.get_webhook(svc, webhook_id)
    owner = webhook["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    actor = _tenant_or_admin_actor(authorization, owner)
    return handlers.delete_webhook(svc, webhook_id, actor=actor)
# Stage 166 β€” per-webhook severity floor. PUT installs a floor,
# DELETE clears it. Tenant-scoped (not /admin/) because this is
# a customer-facing setting: "I don't want a Slack ping for
# every clean run". Read goes through the regular list_webhooks
# response (min_severity is part of the row shape).
@app.put("/webhooks/{webhook_id}/min-severity", tags=["webhooks"])
async def put_webhook_min_severity_route(
    webhook_id: str,
    body: dict,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 166 -- install a severity floor on a webhook.

    Body: ``{"min_severity": <string>}``. The service validates the
    value; its ``ValueError`` becomes a 400. Operator role (or admin).
    """
    webhook = handlers.get_webhook(svc, webhook_id)
    owner = webhook["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    floor = body.get("min_severity")
    if not isinstance(floor, str):
        raise ApiError(
            "bad_request",
            "body must include 'min_severity' (string)",
            status=400,
        )
    try:
        updated = svc.set_webhook_min_severity(
            webhook_id, floor,
            actor=_tenant_or_admin_actor(authorization, owner),
        )
    except ValueError as err:
        raise ApiError("bad_request", str(err), status=400) from err
    return updated
@app.delete("/webhooks/{webhook_id}/min-severity",
            tags=["webhooks"])
async def delete_webhook_min_severity_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 166 -- remove a webhook's severity floor (operator or admin)."""
    webhook = handlers.get_webhook(svc, webhook_id)
    owner = webhook["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    actor = _tenant_or_admin_actor(authorization, owner)
    return svc.clear_webhook_min_severity(webhook_id, actor=actor)
# Stage 189 β€” per-webhook event subscription. PUT installs a
# filter (list of subscribed event names), DELETE clears it.
# Tenant-scoped β€” operator can manage their own webhooks; admin
# can manage any.
@app.put("/webhooks/{webhook_id}/subscribed-events",
         tags=["webhooks"])
async def put_webhook_subscribed_events_route(
    webhook_id: str,
    body: dict,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 189 -- restrict a webhook to a list of event names.

    Body: ``{"events": [<event name>, ...]}``. The list and each of its
    entries are type-checked here; the service validates the names, and
    its ``ValueError`` becomes a 400. Operator role on the webhook's
    tenant (or admin).
    """
    wh = handlers.get_webhook(svc, webhook_id)
    key = require_tenant_or_admin(svc, authorization, wh["tenant_id"])
    if key is not None:
        require_role(key, ROLE_OPERATOR)
    events = body.get("events")
    # Every entry must be an event-name string. Rejecting numbers, nulls
    # or nested objects here keeps malformed payloads at a clean 400
    # instead of depending on how the service reacts to them.
    if (not isinstance(events, list)
            or not all(isinstance(ev, str) for ev in events)):
        raise ApiError(
            "bad_request",
            "body must include 'events' (list of event names)",
            status=400,
        )
    try:
        return svc.set_webhook_subscribed_events(
            webhook_id, events,
            actor=_tenant_or_admin_actor(
                authorization, wh["tenant_id"]),
        )
    except ValueError as e:
        raise ApiError("bad_request", str(e), status=400) from e
@app.delete("/webhooks/{webhook_id}/subscribed-events",
            tags=["webhooks"])
async def delete_webhook_subscribed_events_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 189 -- drop a webhook's event filter (operator or admin)."""
    webhook = handlers.get_webhook(svc, webhook_id)
    owner = webhook["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    actor = _tenant_or_admin_actor(authorization, owner)
    return svc.clear_webhook_subscribed_events(webhook_id, actor=actor)
# Stage 168 -- delivery log for a single webhook
# (GET /webhooks/{webhook_id}/deliveries, defined after the retry route
# below). Read-only; any tenant role can see it (debugging "why didn't I
# get the Slack ping" needs analyst-tier access, not operator).
# Stage 239 -- tenant-wide delivery list (cross-webhook). Used by the
# Webhooks page "failed deliveries" section.
@app.get("/tenants/{tenant_id}/webhook_deliveries",
         tags=["webhooks"])
async def list_tenant_webhook_deliveries_route(
    tenant_id: str,
    status: Optional[str] = Query(default=None),
    limit: int = Query(100, ge=1, le=500),
    key: ApiKey = Depends(auth_dep),
):
    """Stage 239 -- recent deliveries across all of a tenant's webhooks.

    Optional ``status`` filter; a ``ValueError`` from the service (e.g. a
    status value it rejects) becomes a 400. Authenticated through
    ``auth_dep``, then tenant access plus the readonly role.
    """
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_READONLY)
    try:
        rows = svc.list_tenant_webhook_deliveries(
            tenant_id, status=status, limit=limit,
        )
    except ValueError as err:
        raise ApiError("bad_request", str(err), status=400) from err
    return {"deliveries": rows}
# Stage 239 β€” manual retry of one specific delivery.
@app.post("/webhook_deliveries/{delivery_id}/retry",
          tags=["webhooks"])
async def retry_one_delivery_route(
    delivery_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 239 -- manually re-send one recorded webhook delivery.

    The delivery row is loaded first so authorization is checked against
    the tenant that owns it; a caller cannot retry another tenant's
    delivery just by supplying its id. Tenant keys need the operator
    role. Service ``KeyError`` maps to 404 and ``ValueError`` to 400.
    """
    delivery = svc.webhook_deliveries.get(delivery_id)
    if delivery is None:
        raise ApiError(
            "not_found",
            f"unknown delivery {delivery_id!r}", status=404,
        )
    caller = require_tenant_or_admin(
        svc, authorization, delivery["tenant_id"],
    )
    if caller is None:
        actor = _admin_actor(authorization)
    else:
        require_role(caller, ROLE_OPERATOR)
        actor = caller.key_id
    try:
        return svc.retry_one_delivery(delivery_id, actor=actor)
    except KeyError as err:
        raise ApiError("not_found", str(err), status=404) from err
    except ValueError as err:
        raise ApiError("bad_request", str(err), status=400) from err
@app.get("/webhooks/{webhook_id}/deliveries", tags=["webhooks"])
async def list_webhook_deliveries_route(
    webhook_id: str,
    limit: int = Query(50, ge=1, le=200),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 168 -- delivery log for a single webhook.

    Only tenant access (or admin) is required; no role check is applied,
    so any role on the owning tenant can read the log.
    """
    webhook = handlers.get_webhook(svc, webhook_id)
    require_tenant_or_admin(svc, authorization, webhook["tenant_id"])
    return {
        "deliveries": svc.list_webhook_deliveries(webhook_id, limit=limit),
    }
# Stage 74: synthetic ping. Operator-only (writes a side effect
# over the network β€” readonly-key should not be able to fire it).
@app.post("/webhooks/{webhook_id}/test", tags=["webhooks"])
async def test_webhook_route(
    webhook_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 74 -- fire a synthetic ping at a webhook.

    Operator-only: it produces a network side effect, so a readonly key
    must not be able to trigger it.
    """
    webhook = handlers.get_webhook(svc, webhook_id)
    caller = require_tenant_or_admin(svc, authorization,
                                     webhook["tenant_id"])
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    return svc.test_webhook(webhook_id)
# Stage 39: REST surface over the existing scheduler. Customer
# dashboards can list / create / toggle schedules without going
# through the CLI. Two URL families (the schedule routes follow the
# Stage 157 connector-test probe directly below):
#   /tenants/{tid}/schedules  -- list + create (tenant-scoped)
#   /schedules/{sid}          -- get + enable/disable/delete (one schedule)
@app.post("/tenants/{tenant_id}/connectors/test", tags=["schedules"])
async def test_connector(
    tenant_id: str,
    body: ConnectorTestBody,
    key: ApiKey = Depends(auth_dep),
):
    """Stage 157 -- dry-run probe of a connector config before it is
    committed to a Schedule.

    Always answers 200 with an ``{ok: bool, ...}`` body, so the dashboard
    can render sample stats or the verbatim error inline (no 5xx
    gymnastics). On failure, ``stage`` names the step that failed
    (``build_connector`` or ``fetch``). Operator-tier: otherwise a
    readonly key could mint a scheduled job through misuse.
    """
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    from infra.ingestion.connectors import build_connector
    try:
        conn = build_connector(body.connector_type,
                               body.connector_config)
    except Exception as e:
        return {
            "ok": False,
            "stage": "build_connector",
            "error_type": type(e).__name__,
            "error": str(e),
        }
    # Pick the entity_type to test: the caller's hint first, then the
    # first one the connector advertises, else "warehouse" (the most
    # common demo path). Materialise the advertised types so indexing
    # and JSON serialisation work even if the connector yields lazily.
    try:
        advertised = (list(conn.entity_types())
                      if hasattr(conn, "entity_types") else [])
    except Exception:
        advertised = []
    entity_type = (body.entity_type
                   or (advertised[0] if advertised else "warehouse"))
    try:
        # list() inside the try: a connector that returns a lazy
        # iterable would otherwise fail in len()/slicing below (a 500),
        # and errors raised while iterating belong to the "fetch" stage.
        obs = list(conn.fetch(entity_type))
    except Exception as e:
        return {
            "ok": False,
            "stage": "fetch",
            "entity_type_tested": entity_type,
            "error_type": type(e).__name__,
            "error": str(e),
        }
    # Summarise: observation count plus a few distinct entity ids, so an
    # empty result is visible to the wizard (0 observations = config is
    # reachable but yields nothing -- probably a wrong path or filter).
    # Falsy ids (missing / None / "") are dropped BEFORE sorting so a
    # None id cannot make sorted() raise on a str/None comparison.
    n = len(obs)
    sample_entities = sorted({
        eid
        for eid in (getattr(o, "entity_id", "") for o in obs[:50])
        if eid
    })
    return {
        "ok": True,
        "entity_type_tested": entity_type,
        "advertised_entity_types": advertised,
        "n_observations": n,
        "sample_entity_ids": sample_entities[:10],
    }
@app.get("/tenants/{tenant_id}/schedules", tags=["schedules"])
async def list_schedules_route(
    tenant_id: str,
    enabled_only: bool = Query(default=False),
    status: Optional[str] = Query(default=None),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 39 -- list a tenant's schedules (readonly role or admin)."""
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_READONLY)
    schedules = handlers.list_schedules(
        svc, tenant_id, enabled_only=enabled_only, status=status,
    )
    return schedules
@app.post("/tenants/{tenant_id}/schedules", tags=["schedules"],
          status_code=201)
async def create_schedule_route(
    tenant_id: str,
    body: ScheduleCreateBody,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 39 -- create a schedule for ``tenant_id``.

    A schedule changes how and when analysis fires, so tenant keys need
    operator-tier authority rather than readonly.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    payload = body.model_dump()
    actor = _tenant_or_admin_actor(authorization, tenant_id)
    return handlers.create_schedule(svc, tenant_id, payload, actor=actor)
@app.get("/schedules/{schedule_id}", tags=["schedules"])
async def get_schedule_route(
    schedule_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Fetch one schedule.

    The schedule is resolved first (``handlers.get_schedule`` handles
    unknown ids), then the caller's access to its tenant is checked;
    tenant keys need at least the readonly role.

    NOTE(review): lookup-before-auth means an unknown id and a foreign
    id fail differently; existence stays hidden only if schedule ids
    are unguessable -- confirm.
    """
    schedule = handlers.get_schedule(svc, schedule_id)
    caller = require_tenant_or_admin(svc, authorization,
                                     schedule["tenant_id"])
    if caller is not None:
        require_role(caller, ROLE_READONLY)
    return schedule
@app.post("/schedules/{schedule_id}/enable", tags=["schedules"])
async def enable_schedule_route(
    schedule_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Enable a schedule (operator role on its tenant, or admin)."""
    owner = handlers.get_schedule(svc, schedule_id)["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    return handlers.set_schedule_enabled(svc, schedule_id, True)
@app.post("/schedules/{schedule_id}/disable", tags=["schedules"])
async def disable_schedule_route(
    schedule_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Disable a schedule (operator role on its tenant, or admin)."""
    owner = handlers.get_schedule(svc, schedule_id)["tenant_id"]
    caller = require_tenant_or_admin(svc, authorization, owner)
    if caller is not None:
        require_role(caller, ROLE_OPERATOR)
    return handlers.set_schedule_enabled(svc, schedule_id, False)
# Stage 238 -- hard delete a schedule. Operator-tier (or admin
# cross-tenant). The route itself writes a "delete_schedule" entry via
# svc.audit.log once the delete succeeds. Past runs remain intact in
# the runs table.
@app.delete("/schedules/{schedule_id}",
            tags=["schedules"], status_code=204)
async def delete_schedule_route(
    schedule_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 238 -- permanently delete a schedule.

    The schedule is resolved first so authorization is checked against
    its tenant (operator role for tenant keys, or admin). If the
    ingestion layer reports nothing was deleted, the route answers 404.
    On success, an audit entry records the schedule's entity type,
    connector type and frequency, and the route responds 204 with an
    empty body.
    """
    schedule = handlers.get_schedule(svc, schedule_id)
    key = require_tenant_or_admin(svc, authorization,
                                  schedule["tenant_id"])
    if key is not None:
        require_role(key, ROLE_OPERATOR)
    actor = (key.key_id if key is not None
             else _admin_actor(authorization))
    from infra.ingestion.scheduler import IngestionService
    ing = IngestionService(svc)
    ok = ing.delete_schedule(schedule_id)
    if not ok:
        raise ApiError(
            "not_found",
            f"unknown schedule {schedule_id!r}", status=404,
        )
    svc.audit.log(
        actor, "delete_schedule",
        tenant_id=schedule["tenant_id"],
        target_id=schedule_id,
        payload={
            "entity_type": schedule.get("entity_type"),
            "connector_type": schedule.get("connector_type"),
            "frequency": schedule.get("frequency"),
        },
    )
    # Return an explicit empty 204, like every other 204 route in this
    # file, rather than relying on the framework to serialise `None`.
    return Response(status_code=204)
# Stage 54: admin tenant lifecycle over HTTP. Admin dashboards
# need a cross-tenant list + pause/resume buttons. CLI commands
# exist but admin operators want point-and-click; tenant API
# keys must NOT be able to enumerate.
@app.get("/admin/tenants", tags=["admin"])
async def admin_list_all_tenants(
    authorization: Optional[str] = Header(default=None),
):
    """Stage 54 -- list every tenant on the platform.

    Guarded by ``require_admin`` so tenant keys cannot enumerate the
    platform.
    """
    require_admin(authorization, svc=svc)
    tenants = handlers.list_all_tenants(svc)
    return tenants
@app.post("/admin/tenants/{tenant_id}/seed-demo", tags=["admin"])
async def admin_seed_demo(
    tenant_id: str,
    days_back: int = Query(default=1, ge=1, le=30),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 160 -- fill a freshly created tenant with demo data.

    Admin-only. Gives the dashboard something meaningful to render on
    first visit. Delegates to
    ``delivery.demo_seed.seed_logistics_demo.seed_tenant_via_service``,
    the same generator the CLI ships, running in-process.

    Calibration is overwritten on each call, but every call APPENDS one
    run per entity type, so calling twice doubles the run count. The
    tenant-creation wizard calls it exactly once. Unknown tenant -> 404.
    """
    require_admin(authorization, svc=svc)
    tenant = svc.get_tenant(tenant_id)
    if tenant is None:
        raise ApiError("not_found",
                       f"unknown tenant {tenant_id!r}", status=404)
    from delivery.demo_seed.seed_logistics_demo import (
        seed_tenant_via_service,
    )
    return seed_tenant_via_service(svc, tenant_id, days_back=days_back)
@app.post("/admin/tenants/{tenant_id}/pause", tags=["admin"])
async def admin_pause_tenant(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 54 -- set a tenant's status to "paused" (admin-only)."""
    require_admin(authorization, svc=svc)
    new_status = "paused"
    return handlers.set_tenant_status(
        svc, tenant_id, new_status, actor=_admin_actor(authorization),
    )
@app.post("/admin/tenants/{tenant_id}/resume", tags=["admin"])
async def admin_resume_tenant(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 54 -- set a tenant's status back to "active" (admin-only)."""
    require_admin(authorization, svc=svc)
    new_status = "active"
    return handlers.set_tenant_status(
        svc, tenant_id, new_status, actor=_admin_actor(authorization),
    )
# Stage 57: admin cross-tenant schedule list. Stage 39's
# /tenants/{tid}/schedules is per-tenant; this one lets an admin
# see the whole platform in one call β€” for dashboards that pivot
# to "which tenants have errored schedules right now?".
@app.get("/admin/schedules", tags=["admin"])
async def admin_list_all_schedules(
    enabled_only: bool = Query(default=False),
    status: Optional[str] = Query(default=None),
    tenant_id: Optional[str] = Query(default=None),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 57 -- cross-tenant schedule list (admin-only).

    Optional filters: ``enabled_only``, ``status`` and a single
    ``tenant_id``.
    """
    require_admin(authorization, svc=svc)
    schedules = handlers.list_all_schedules(
        svc,
        enabled_only=enabled_only,
        status=status,
        tenant_id=tenant_id,
    )
    return schedules
# Stage 56: admin force-tick. Stage 39's REST CRUD covers
# list/get/create/enable/disable per schedule, but tick (run
# every due schedule once) was CLI-only. An admin dashboard
# with a "run now" button needs this β€” no waiting for cron.
@app.post("/admin/schedules/tick", tags=["admin"])
async def admin_tick_schedules(
    authorization: Optional[str] = Header(default=None),
):
    """Stage 56 -- run one scheduler tick now (admin-only).

    Backs a dashboard "run now" button so admins need not wait for cron.
    """
    require_admin(authorization, svc=svc)
    outcome = handlers.admin_tick_schedules(svc)
    return outcome
# Stage 59: HTTP for tenant overrides (Stage 58 over REST).
# Reads: tenant key (readonly) OR admin β€” the customer dashboard
# widget should be able to surface "your thresholds are X".
# Writes (PUT/DELETE): admin-only β€” customers cannot flip their
# own thresholds; the change goes through customer success.
@app.get("/tenants/{tenant_id}/overrides", tags=["tenants"])
async def list_tenant_overrides_route(
    tenant_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 59 -- read a tenant's threshold overrides.

    Readable by a tenant key (readonly role) or an admin; writes live
    under /admin/.
    """
    caller = require_tenant_or_admin(svc, authorization, tenant_id)
    if caller is not None:
        require_role(caller, ROLE_READONLY)
    overrides = handlers.list_tenant_overrides(svc, tenant_id)
    return overrides
@app.put("/admin/tenants/{tenant_id}/overrides/{entity_type}",
         tags=["admin"])
async def admin_set_overrides(
    tenant_id: str, entity_type: str,
    body: TenantOverridesBody,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 59 -- set a tenant's overrides for one entity type (admin-only).

    Partial update: fields left as None in the body are dropped
    (``exclude_none``), meaning "leave unchanged" rather than "clear".
    """
    require_admin(authorization, svc=svc)
    changes = body.model_dump(exclude_none=True)
    actor = _admin_actor(authorization)
    return handlers.admin_set_tenant_overrides(
        svc, tenant_id, entity_type, changes, actor=actor,
    )
@app.delete("/admin/tenants/{tenant_id}/overrides/{entity_type}",
            tags=["admin"])
async def admin_clear_overrides(
    tenant_id: str, entity_type: str,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 59 -- clear a tenant's overrides for one entity type (admin-only)."""
    require_admin(authorization, svc=svc)
    actor = _admin_actor(authorization)
    return handlers.admin_clear_tenant_overrides(
        svc, tenant_id, entity_type, actor=actor,
    )
# Stage 55: admin fix-it commands β€” recalibrate + run trigger
# over HTTP. The existing /tenants/{tid}/calibrations and /runs
# routes work for tenant keys (operator role); admin keys go
# through auth_dep too rigidly. These routes let an admin
# dashboard drive the loop without needing a tenant credential.
@app.post("/admin/tenants/{tenant_id}/recalibrate", tags=["admin"])
async def admin_recalibrate_tenant(
    tenant_id: str,
    body: AdminRecalibrateBody,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 55 -- recalibrate a tenant from the admin dashboard.

    Admin-only; needs no tenant credential.
    """
    require_admin(authorization, svc=svc)
    payload = body.model_dump()
    return handlers.admin_recalibrate(
        svc, tenant_id, payload, actor=_admin_actor(authorization),
    )
@app.post("/admin/tenants/{tenant_id}/runs/trigger", tags=["admin"])
async def admin_trigger_tenant_run(
    tenant_id: str,
    body: AdminTriggerRunBody,
    authorization: Optional[str] = Header(default=None),
):
    """Stage 55 -- trigger an analysis run for a tenant (admin-only)."""
    require_admin(authorization, svc=svc)
    payload = body.model_dump()
    return handlers.admin_trigger_run(
        svc, tenant_id, payload, actor=_admin_actor(authorization),
    )
# Stage 53: audit log over HTTP β€” admin-only. JSON for ops
# dashboards, CSV for compliance pulls (the auditor's preferred
# format). Same filters as the CLI `audit list` + Stage 51
# since/until time bounds.
@app.get("/admin/audit", tags=["admin"])
async def admin_list_audit(
    actor: Optional[str] = Query(default=None),
    action: Optional[str] = Query(default=None),
    tenant_id: Optional[str] = Query(default=None),
    since: Optional[str] = Query(default=None),
    until: Optional[str] = Query(default=None),
    limit: int = Query(100, ge=1, le=handlers.MAX_PAGE),
    offset: int = Query(0, ge=0),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 53 -- paginated audit log as JSON (admin-only).

    Supports the actor / action / tenant filters plus ``since`` /
    ``until`` time bounds.
    """
    require_admin(authorization, svc=svc)
    filters = dict(actor=actor, action=action, tenant_id=tenant_id,
                   since=since, until=until)
    return handlers.list_audit(svc, **filters, limit=limit, offset=offset)
@app.get("/admin/audit.csv", tags=["admin"],
         response_class=PlainTextResponse)
async def admin_audit_csv(
    actor: Optional[str] = Query(default=None),
    action: Optional[str] = Query(default=None),
    tenant_id: Optional[str] = Query(default=None),
    since: Optional[str] = Query(default=None),
    until: Optional[str] = Query(default=None),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 53 -- audit log as a CSV download (admin-only).

    Same filters as the JSON endpoint (without pagination). The download
    filename encodes the active tenant / actor / action filters
    (``audit_all.csv`` when none are set), so parallel exports do not
    collide.
    """
    import re as _re
    require_admin(authorization, svc=svc)
    csv_body = handlers.audit_csv(
        svc, actor=actor, action=action, tenant_id=tenant_id,
        since=since, until=until,
    )
    scope = (
        "_".join(filter(None, [
            f"tenant-{tenant_id}" if tenant_id else None,
            f"actor-{actor}" if actor else None,
            f"action-{action}" if action else None,
        ])) or "all"
    )
    # The filter values are caller-supplied query strings. Collapse
    # anything outside a conservative filename alphabet so a double
    # quote, CR/LF or non-latin-1 character cannot corrupt (or fail to
    # encode) the Content-Disposition header. Plain ids such as
    # "tenant-acme_1" pass through unchanged.
    scope = _re.sub(r"[^A-Za-z0-9._-]+", "_", scope)
    return PlainTextResponse(
        csv_body, media_type="text/csv; charset=utf-8",
        headers={
            "Content-Disposition":
                f'attachment; filename="audit_{scope}.csv"',
        },
    )
# Stage 46: HTTP mirror of the Stage 44 CLI `ops-summary`. Ops
# dashboards speak HTTP, not CLI. Same `svc.ops_summary` method
# under both surfaces so there's no possible drift.
@app.get("/admin/ops-summary", tags=["admin"])
async def admin_ops_summary(
    max_age_days: float = Query(default=90.0),
    max_gap_hours: float = Query(default=48.0),
    authorization: Optional[str] = Header(default=None),
):
    """Stage 46 -- HTTP mirror of the CLI ``ops-summary`` (admin-only).

    Both surfaces call ``svc.ops_summary`` directly, so they cannot drift.
    """
    require_admin(authorization, svc=svc)
    summary = svc.ops_summary(
        max_age_days=max_age_days,
        max_gap_hours=max_gap_hours,
    )
    return summary
# Stage 35 — ops dashboard endpoint: the health of every tenant in a
# single call, ordered worst-first. Deliberately admin-only, because it
# enumerates the whole platform — something a tenant key must never do.
@app.get("/admin/tenants/health", tags=["admin"])
async def admin_platform_health(
    max_age_days: float = Query(default=90.0),
    max_gap_hours: float = Query(default=48.0),
    authorization: Optional[str] = Header(default=None),
):
    require_admin(authorization, svc=svc)
    # Staleness thresholds are forwarded to the handler as-is.
    thresholds = {
        "max_age_days": max_age_days,
        "max_gap_hours": max_gap_hours,
    }
    return handlers.platform_health(svc, **thresholds)
@app.post("/admin_keys", tags=["admin_keys"], status_code=201)
async def mint_admin_key(
    body: AdminKeyMintBody = AdminKeyMintBody(),
    authorization: Optional[str] = Header(default=None),
):
    # Mint a new admin API key (201 on success). The JSON body is optional:
    # when omitted, an empty AdminKeyMintBody() is used. The calling admin,
    # as resolved by _admin_actor, is passed to the handler as `actor`.
    require_admin(authorization, svc=svc)
    payload = body.model_dump()
    acting_admin = _admin_actor(authorization)
    return handlers.mint_admin_key(svc, payload, actor=acting_admin)
@app.get("/admin_keys", tags=["admin_keys"])
async def list_admin_keys_route(
    include_revoked: bool = Query(default=False),
    authorization: Optional[str] = Header(default=None),
):
    # Admin-only listing of admin keys. The include_revoked flag (default
    # False) is handed straight to the handler.
    require_admin(authorization, svc=svc)
    keys = handlers.list_admin_keys(svc, include_revoked=include_revoked)
    return keys
@app.delete("/admin_keys/{key_id}",
            tags=["admin_keys"], status_code=204)
async def revoke_admin_key_route(
    key_id: str,
    authorization: Optional[str] = Header(default=None),
):
    # Revoke one admin key by id; the revoking admin is passed as `actor`.
    require_admin(authorization, svc=svc)
    revoker = _admin_actor(authorization)
    handlers.revoke_admin_key(svc, key_id, actor=revoker)
    # 204 means "no body": return an explicit empty Response instead of
    # letting FastAPI serialize whatever the handler returned.
    return Response(status_code=204)
# --- audit log (Stage 12) ---------------------------------------
@app.get("/audit_logs", tags=["audit"])
async def list_audit_logs(
    actor: Optional[str] = Query(default=None),
    action: Optional[str] = Query(default=None),
    tenant_id: Optional[str] = Query(default=None),
    limit: int = Query(default=100, ge=1, le=handlers.MAX_PAGE),
    offset: int = Query(default=0, ge=0),
    authorization: Optional[str] = Header(default=None),
):
    # Platform-wide audit log, admin-only. Any mix of actor / action /
    # tenant_id filters may be supplied; paging is limit/offset with limit
    # bounded to [1, handlers.MAX_PAGE] by the Query validators above.
    require_admin(authorization, svc=svc)
    filters = {"actor": actor, "action": action, "tenant_id": tenant_id}
    return handlers.list_audit_logs(
        svc, limit=limit, offset=offset, **filters,
    )
# Stage 157 — tenant-scoped audit log view. /audit_logs is the
# admin-only, platform-wide view; this route lets a tenant's own admin
# read *their* rows without platform-level credentials. The tenant filter
# always comes from the path, which require_tenant_access checks against
# the caller's bound tenant. No tenant_id query parameter is declared, so a
# ?tenant_id= in the URL has no effect — the path scope wins.
@app.get("/tenants/{tenant_id}/audit_logs", tags=["audit"])
async def list_tenant_audit_logs(
    tenant_id: str,
    actor: Optional[str] = Query(default=None),
    action: Optional[str] = Query(default=None),
    entity_id: Optional[str] = Query(default=None),
    limit: int = Query(default=100, ge=1, le=handlers.MAX_PAGE),
    offset: int = Query(default=0, ge=0),
    key: ApiKey = Depends(auth_dep),
):
    # Two gates, in this order: the key must be bound to this tenant, and
    # it must hold the admin role (audit reads are admin-tier).
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_ADMIN)
    return handlers.list_audit_logs(
        svc,
        tenant_id=tenant_id,
        actor=actor,
        action=action,
        entity_id=entity_id,
        limit=limit,
        offset=offset,
    )
# --- pushed observations (Stage 7d) -------------------------------
# Separate "store this" from "run analysis" so a webhook integration
# can push hourly and the analysis can be triggered on a different
# cadence (cron, on-demand from the UI).
@app.post("/tenants/{tenant_id}/observations",
          tags=["observations"], status_code=202)
async def ingest_observations(
    tenant_id: str,
    body: ObservationsIngestBody,
    key: ApiKey = Depends(auth_dep),
):
    # "Store this" half of the ingest/run split: hand the pushed batch to
    # the ingest handler and answer 202. Requires a tenant-bound key with
    # at least the operator role.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    payload = body.model_dump()
    return handlers.ingest_observations(svc, tenant_id, payload)
@app.post("/tenants/{tenant_id}/observations/run",
          tags=["observations"], status_code=201)
async def run_from_stored(
    tenant_id: str,
    body: ObservationsRunBody,
    key: ApiKey = Depends(auth_dep),
):
    # "Run analysis" half of the ingest/run split: trigger an analysis over
    # previously stored observations (201 on success). Same gates as ingest:
    # tenant-bound key plus the operator role.
    require_tenant_access(key, tenant_id)
    require_role(key, ROLE_OPERATOR)
    payload = body.model_dump()
    return handlers.run_from_stored(svc, tenant_id, payload)
# Stage 103: SCIM 2.0 user provisioning. Mounted as a sub-router
# at /scim/v2/{tenant_id}/* — the SCIM module owns its own
# auth path (role='scim' bearer + tenant binding); the regular
# auth_dep is NOT applied to these routes. Rate limiting + TLS
# enforcement DO apply (Okta honors Retry-After and runs over
# HTTPS by default), so we deliberately don't add /scim to
# those EXCLUDED_PATHS sets.
from infra.scim.routes import build_router as _build_scim_router
app.include_router(_build_scim_router())
return app
# Module-level `app` for `uvicorn infra.api.app:app`, built lazily via a
# PEP 562 module __getattr__. Eagerly calling create_app() at import time
# would open the default on-disk DB just to import the module, which breaks
# both tests (that pass their own :memory: path) and any tooling that
# imports this module without intending to start the API.
#
# Python consults a module __getattr__ only when normal attribute lookup
# fails, so the built app is written back into the module globals on first
# access. Every later `module.app` / `from infra.api.app import app` then
# gets that same instance instead of constructing a fresh application (and
# opening the DB again) on each access.
def __getattr__(name: str):
    """Resolve the lazily-built module attribute ``app``.

    Returns the cached application (building it on first use), or None when
    FastAPI is not installed (nothing is cached in that case). Any other
    name raises AttributeError, as normal module attribute lookup would.
    """
    if name != "app":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if not _FASTAPI_AVAILABLE:
        return None
    built = create_app()
    # Cache so subsequent lookups bypass __getattr__ entirely.
    globals()["app"] = built
    return built