Legal-i's picture
Initial OrgState deploy via Stage 150 free-tier stack
d2d1903 verified
"""
orgstate_client.client — the SDK's only public class.
The Client is tenant-scoped at construction (the customer holds one
API key, tied to one tenant; an SDK that requires passing tenant_id
on every call would be needlessly chatty). For admin keys you can
pass tenant_id=None and override via the per-method ``tenant_id``
argument, but the customer-shaped use case is the default.
Transport is injectable:
- default: stdlib urllib (no extra deps in production)
- tests: any object with ``.request(method, url, headers=...,
params=..., json=...)`` returning something with .status_code,
.json(), .text — httpx.Client and fastapi.TestClient both fit
That seam lets tests run against a real FastAPI app via TestClient
without spinning up a real socket.
"""
from __future__ import annotations
import json as _json
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional
class ClientError(Exception):
    """Raised when the API returns an error response.

    Attributes:
        status_code: HTTP status code of the failed response.
        body: The parsed JSON body when the response carried one,
            otherwise whatever the caller passed (typically the raw
            response text). Callers can branch on
            ``body["error"]["code"]`` when the body has that shape.

    The exception message is ``"HTTP <status>: <message>"`` where
    ``<message>`` is ``body["error"]["message"]`` if the body has that
    shape, and ``str(body)`` otherwise.
    """

    def __init__(self, status_code: int, body: Any):
        self.status_code = status_code
        self.body = body
        super().__init__(
            f"HTTP {status_code}: {self._message_from(body)}"
        )

    @staticmethod
    def _message_from(body: Any) -> Any:
        """Pick the most specific human-readable message out of *body*."""
        fallback = str(body)
        if not isinstance(body, dict):
            return fallback
        error = body.get("error")
        # ``error`` is only trusted when it is itself a mapping; a
        # string or null ``error`` value must not blow up while the
        # exception is being constructed, or the real HTTP failure
        # would be masked by an AttributeError.
        if isinstance(error, dict):
            return error.get("message", fallback)
        return fallback
class _UrllibResponse:
    """Thin adapter exposing the parts of an httpx-style response the
    SDK reads: ``status_code``, ``text`` and ``json()``.

    Args:
        status_code: HTTP status code of the response.
        body_bytes: Raw response body.
        content_type: Value of the ``Content-Type`` header ("" if the
            server sent none); used to pick the text encoding.
    """

    def __init__(self, status_code: int, body_bytes: bytes,
                 content_type: str):
        self.status_code = status_code
        self._body = body_bytes
        self._content_type = content_type

    def _charset(self) -> str:
        """Return the charset declared in Content-Type, else "utf-8"."""
        # Local import: only this adapter needs the header parser.
        from email.message import Message

        header = Message()
        header["content-type"] = self._content_type or ""
        return header.get_content_charset() or "utf-8"

    @property
    def text(self) -> str:
        """The body decoded to ``str``.

        Uses the charset from the Content-Type header when one is
        declared and known, UTF-8 otherwise. Undecodable bytes are
        replaced rather than raised, so reading the text of a
        malformed error page never hides the HTTP error itself.
        """
        try:
            return self._body.decode(self._charset(), errors="replace")
        except (LookupError, ValueError):
            # Unknown or non-text codec name sent by the server.
            return self._body.decode("utf-8", errors="replace")

    def json(self) -> Any:
        """Parse the body as JSON; an empty body yields ``None``."""
        if not self._body:
            return None
        return _json.loads(self._body)
class _UrllibTransport:
    """Default transport built on the standard library only.

    Exposes the single method the SDK needs, ``request``, with the
    same keyword arguments an httpx-style client accepts, so either
    kind of object can be injected into ``Client``.

    Args:
        base_url: Scheme + host (+ optional prefix) that path-only
            URLs are joined onto. A trailing "/" is dropped.
        timeout: Timeout in seconds handed to ``urlopen``.
    """

    def __init__(self, base_url: str, timeout: float = 30.0):
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def _build_url(self, url: str,
                   params: Optional[Dict[str, Any]] = None) -> str:
        """Resolve *url* against ``base_url`` and append *params*.

        An absolute http(s) URL is used as-is; anything else is
        treated as a path and concatenated onto ``base_url``. Params
        whose value is ``None`` are dropped before encoding.
        """
        if url.startswith(("http://", "https://")):
            full = url
        else:
            full = self.base_url + url
        if params:
            query = urllib.parse.urlencode(
                {k: v for k, v in params.items() if v is not None}
            )
            if query:
                # Extend an existing query string instead of starting
                # a second one.
                full += ("&" if "?" in full else "?") + query
        return full

    def request(self, method: str, url: str, *,
                headers: Optional[Dict[str, str]] = None,
                params: Optional[Dict[str, Any]] = None,
                json: Any = None) -> "_UrllibResponse":
        """Send one HTTP request and return the response adapter.

        Args:
            method: HTTP verb.
            url: Absolute URL or path relative to ``base_url``.
            headers: Extra request headers (copied, never mutated).
            params: Query parameters; ``None`` values are omitted.
            json: If not ``None``, serialized as the JSON request body
                and ``Content-Type: application/json`` is set unless
                the caller already supplied that header.

        Returns:
            A ``_UrllibResponse`` for every HTTP status, including
            4xx/5xx, so the caller decides how to report errors.
            Failures other than ``HTTPError`` are not caught here.
        """
        payload: Optional[bytes] = None
        out_headers = dict(headers or {})
        if json is not None:
            payload = _json.dumps(json).encode("utf-8")
            out_headers.setdefault("Content-Type", "application/json")
        req = urllib.request.Request(
            self._build_url(url, params),
            data=payload, method=method, headers=out_headers,
        )
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                return _UrllibResponse(
                    status_code=resp.status,
                    body_bytes=resp.read(),
                    content_type=resp.headers.get("content-type", ""),
                )
        except urllib.error.HTTPError as err:
            # urllib raises on 4xx/5xx; hand it back as a response so
            # the SDK can wrap it uniformly.
            content_type = (
                err.headers.get("content-type", "") if err.headers else ""
            )
            if err.fp is None:
                error_body = b""
            else:
                # HTTPError doubles as the response object; close it
                # once the body has been read so the underlying
                # connection is released deterministically.
                with err:
                    error_body = err.read()
            return _UrllibResponse(
                status_code=err.code,
                body_bytes=error_body,
                content_type=content_type,
            )
class Client:
    """Tenant-scoped SDK over the OrgState HTTP API.

    One instance is bound to one tenant and one API key. Every call
    sends ``Authorization: Bearer <api_key>`` through ``transport``:
    any object with an httpx-style
    ``.request(method, url, headers=..., params=..., json=...)`` whose
    result has ``.status_code``, ``.json()`` and ``.text``. When no
    transport is injected, a stdlib urllib transport is built from
    ``base_url`` and ``timeout``.

    The class covers the read surface (health, runs, issues, CSV
    exports, overrides, preview, backfill), the operator-role write
    surface (observations, calibrations, schedules) and webhook
    management.

    Non-2xx responses raise ``ClientError``; ``health()`` additionally
    treats a 503 carrying a JSON payload as a normal result.

    Identifiers interpolated into URL paths (tenant, run, schedule and
    webhook ids) are percent-encoded, so an id containing "/", "?" or
    whitespace cannot change which route is addressed.
    """

    def __init__(self, *,
                 base_url: str,
                 tenant_id: str,
                 api_key: str,
                 transport: Any = None,
                 timeout: float = 30.0):
        self.base_url = base_url.rstrip("/")
        self.tenant_id = tenant_id
        self.api_key = api_key
        # ``is None`` rather than truthiness: an injected transport
        # that happens to be falsy (e.g. defines __len__) must not be
        # silently swapped for the network transport.
        if transport is None:
            transport = _UrllibTransport(base_url, timeout=timeout)
        self.transport = transport

    # --- internals ---------------------------------------------------

    @staticmethod
    def _seg(value: Any) -> str:
        """Percent-encode *value* for use as a single URL path segment."""
        return urllib.parse.quote(str(value), safe="")

    def _tenant_path(self, resource: str) -> str:
        """Return ``/tenants/<tenant_id>/<resource>`` for this client."""
        return f"/tenants/{self._seg(self.tenant_id)}/{resource}"

    def _send(self, method: str, path: str, *,
              params: Optional[Dict[str, Any]] = None,
              json: Any = None) -> Any:
        """Issue one authenticated request and return the raw response.

        The path is passed to the transport unchanged; both the
        default transport and httpx-style ones (e.g. TestClient) join
        it with their own base URL.
        """
        headers = {"Authorization": f"Bearer {self.api_key}"}
        return self.transport.request(
            method, path, headers=headers, params=params, json=json,
        )

    @staticmethod
    def _error_from(resp: Any) -> "ClientError":
        """Build the ``ClientError`` describing a failed response."""
        try:
            body = resp.json()
        except Exception:
            # Deliberately broad: transports are injectable and may
            # raise anything on a non-JSON body; the raw text is the
            # best remaining description of the failure.
            body = resp.text
        return ClientError(resp.status_code, body)

    def _request(self, method: str, path: str, *,
                 params: Optional[Dict[str, Any]] = None,
                 json: Any = None) -> Any:
        """Issue a request, raising ``ClientError`` on non-2xx.

        Returns the transport's response object so the caller chooses
        between ``.json()`` and ``.text``.
        """
        resp = self._send(method, path, params=params, json=json)
        if not (200 <= resp.status_code < 300):
            raise self._error_from(resp)
        return resp

    @staticmethod
    def _run_filters(entity_type: Optional[str],
                     since: Optional[str]) -> Dict[str, Any]:
        """Return the run-listing filters that were actually supplied."""
        # Omit unset filters entirely: httpx serializes None as the
        # literal string "None", which the server would then treat as
        # a filter value matching no rows. Building the dict
        # selectively sidesteps the None-handling difference between
        # urllib and httpx.
        filters: Dict[str, Any] = {}
        if entity_type is not None:
            filters["entity_type"] = entity_type
        if since is not None:
            filters["since"] = since
        return filters

    @staticmethod
    def _backfill_params(entity_type: str,
                         from_day: Optional[str],
                         until_day: Optional[str],
                         step_days: int) -> Dict[str, Any]:
        """Return the query parameters shared by both backfill routes."""
        params: Dict[str, Any] = {
            "entity_type": entity_type,
            "step_days": step_days,
        }
        # The route's keys are `from`/`until`: Python keywords, hence
        # the *_day argument names, but valid query-string keys.
        if from_day is not None:
            params["from"] = from_day
        if until_day is not None:
            params["until"] = until_day
        return params

    # --- read surface ------------------------------------------------

    def health(self) -> dict:
        """GET /tenants/{tid}/health — readiness state for the tenant.

        **503 is a normal response**, not an error: the endpoint
        follows the k8s readiness convention where 503 means "not
        ready" and carries the payload that explains why. The body is
        returned for both 200 and 503, so the caller branches on
        ``payload["healthy"]`` rather than on an exception.

        Raises:
            ClientError: for any other status (401, 403, 404, ...),
                and for a 503 whose body is not a JSON object (for
                example an error page from a proxy in front of the
                API), since that is not the readiness payload.
        """
        resp = self._send("GET", self._tenant_path("health"))
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code == 503:
            try:
                payload = resp.json()
            except Exception:
                payload = None
            if isinstance(payload, dict):
                return payload
        raise self._error_from(resp)

    def list_runs(self, *,
                  entity_type: Optional[str] = None,
                  since: Optional[str] = None,
                  limit: int = 50,
                  offset: int = 0) -> dict:
        """GET /tenants/{tid}/runs — paginated run list as JSON.

        ``entity_type`` and ``since`` are optional filters and are
        only sent when given; ``limit`` and ``offset`` are always sent.
        """
        params: Dict[str, Any] = {"limit": limit, "offset": offset}
        params.update(self._run_filters(entity_type, since))
        return self._request(
            "GET", self._tenant_path("runs"), params=params,
        ).json()

    def get_run(self, run_id: str) -> dict:
        """GET /runs/{run_id} — the run detail row."""
        return self._request("GET", f"/runs/{self._seg(run_id)}").json()

    def get_issues(self, run_id: str) -> dict:
        """GET /runs/{run_id}/issues — issues list as JSON."""
        return self._request(
            "GET", f"/runs/{self._seg(run_id)}/issues",
        ).json()

    def download_issues_csv(self, run_id: str) -> str:
        """GET /runs/{run_id}/issues.csv — issues as CSV text (the
        caller writes it to disk or pipes it to pandas)."""
        return self._request(
            "GET", f"/runs/{self._seg(run_id)}/issues.csv",
        ).text

    def download_decisions_csv(self, run_id: str) -> str:
        """GET /runs/{run_id}/decisions.csv — decisions snapshot
        merged with triage state (status/owner/notes), one wide row
        per decision. Returns CSV text."""
        return self._request(
            "GET", f"/runs/{self._seg(run_id)}/decisions.csv",
        ).text

    def download_runs_csv(self, *,
                          entity_type: Optional[str] = None,
                          since: Optional[str] = None) -> str:
        """GET /tenants/{tid}/runs.csv — the compliance "all runs in
        Q1" pull. Same filters as ``list_runs`` (entity_type
        narrowing, since cutoff). Returns CSV text."""
        params = self._run_filters(entity_type, since)
        return self._request(
            "GET", self._tenant_path("runs.csv"),
            params=params or None,
        ).text

    def get_overrides(self) -> dict:
        """GET /tenants/{tid}/overrides — the customer's current
        threshold + weights overrides."""
        return self._request(
            "GET", self._tenant_path("overrides"),
        ).json()

    def preview_run(self, entity_type: str) -> dict:
        """GET /tenants/{tid}/runs/preview — dry-run analysis, no DB
        writes. Returns top issues + counts."""
        return self._request(
            "GET", self._tenant_path("runs/preview"),
            params={"entity_type": entity_type},
        ).json()

    def download_backfill_csv(self, entity_type: str, *,
                              from_day: Optional[str] = None,
                              until_day: Optional[str] = None,
                              step_days: int = 1) -> str:
        """GET /tenants/{tid}/backfill.csv — backfill as plain CSV
        text so the caller can write it to disk or feed pandas. The
        chart-friendly wide layout matches the HTTP route exactly."""
        return self._request(
            "GET", self._tenant_path("backfill.csv"),
            params=self._backfill_params(
                entity_type, from_day, until_day, step_days,
            ),
        ).text

    def backfill(self, entity_type: str, *,
                 from_day: Optional[str] = None,
                 until_day: Optional[str] = None,
                 step_days: int = 1) -> dict:
        """GET /tenants/{tid}/backfill — point-in-time historical
        analysis over a date range. Read-only, no DB writes. Customer
        dashboard widget pattern: "show me how we looked over Q1"."""
        return self._request(
            "GET", self._tenant_path("backfill"),
            params=self._backfill_params(
                entity_type, from_day, until_day, step_days,
            ),
        ).json()

    # --- write surface (operator role) -------------------------------

    def ingest_observations(self, entity_type: str,
                            observations: list) -> dict:
        """POST /tenants/{tid}/observations — push observations to
        the buffer. Idempotent on (entity_id, day) per
        (tenant, entity_type): re-pushing the same day overwrites.
        Requires an operator-role tenant key."""
        return self._request(
            "POST", self._tenant_path("observations"),
            json={"entity_type": entity_type,
                  "observations": observations},
        ).json()

    def register_calibration(self, vertical: str, entity_type: str,
                             observations: list) -> dict:
        """POST /tenants/{tid}/calibrations — derive + persist a
        calibration from the passed observations. Requires an
        operator-role tenant key.

        There is deliberately no "calibrate from stored observations"
        shortcut on the customer SDK; recalibration is operationally
        significant and should go through ops via the admin endpoint.
        """
        return self._request(
            "POST", self._tenant_path("calibrations"),
            json={"vertical": vertical, "entity_type": entity_type,
                  "observations": observations},
        ).json()

    def create_schedule(self, *, vertical: str, entity_type: str,
                        connector_type: str,
                        connector_config: Optional[dict] = None,
                        frequency: str = "daily",
                        enabled: bool = True) -> dict:
        """POST /tenants/{tid}/schedules — register a new scheduled
        ingestion so a customer can self-serve onboard a connector.
        ``connector_config`` is only sent when given. Requires an
        operator-role tenant key."""
        body: Dict[str, Any] = {
            "vertical": vertical,
            "entity_type": entity_type,
            "connector_type": connector_type,
            "frequency": frequency,
            "enabled": enabled,
        }
        if connector_config is not None:
            body["connector_config"] = connector_config
        return self._request(
            "POST", self._tenant_path("schedules"), json=body,
        ).json()

    def enable_schedule(self, schedule_id: str) -> dict:
        """POST /schedules/{sid}/enable — toggle a paused schedule
        back on. Requires an operator-role tenant key (the route
        checks that the schedule's tenant matches the caller)."""
        return self._request(
            "POST", f"/schedules/{self._seg(schedule_id)}/enable",
        ).json()

    def disable_schedule(self, schedule_id: str) -> dict:
        """POST /schedules/{sid}/disable — pause a schedule without
        deleting it. Useful for testing windows or holidays."""
        return self._request(
            "POST", f"/schedules/{self._seg(schedule_id)}/disable",
        ).json()

    # --- webhooks ----------------------------------------------------

    def list_webhooks(self, *, enabled_only: bool = False) -> dict:
        """GET /tenants/{tid}/webhooks — list registered webhooks.
        secret_hash and secret_raw are NOT in the response (stripped
        server-side). ``enabled_only`` is only sent when true."""
        params: Dict[str, Any] = {}
        if enabled_only:
            params["enabled_only"] = enabled_only
        return self._request(
            "GET", self._tenant_path("webhooks"),
            params=params or None,
        ).json()

    def create_webhook(self, url: str, *,
                       secret: Optional[str] = None) -> dict:
        """POST /tenants/{tid}/webhooks — register a webhook. The raw
        secret (if passed) comes back in the response ONCE and is
        never retrievable again; the caller wires it into their
        receiver's signature verification."""
        body: Dict[str, Any] = {"url": url}
        if secret is not None:
            body["secret"] = secret
        return self._request(
            "POST", self._tenant_path("webhooks"), json=body,
        ).json()

    def get_webhook(self, webhook_id: str) -> dict:
        """GET /webhooks/{wid} — one webhook's details."""
        return self._request(
            "GET", f"/webhooks/{self._seg(webhook_id)}",
        ).json()

    def enable_webhook(self, webhook_id: str) -> dict:
        """POST /webhooks/{wid}/enable — resume deliveries."""
        return self._request(
            "POST", f"/webhooks/{self._seg(webhook_id)}/enable",
        ).json()

    def disable_webhook(self, webhook_id: str) -> dict:
        """POST /webhooks/{wid}/disable — soft-pause: row preserved,
        no future deliveries."""
        return self._request(
            "POST", f"/webhooks/{self._seg(webhook_id)}/disable",
        ).json()

    def delete_webhook(self, webhook_id: str) -> dict:
        """DELETE /webhooks/{wid} — idempotent: returns
        deleted=false on a row that doesn't exist, no error."""
        return self._request(
            "DELETE", f"/webhooks/{self._seg(webhook_id)}",
        ).json()

    def test_webhook(self, webhook_id: str) -> dict:
        """POST /webhooks/{wid}/test — sends a synthetic ping payload
        (event=ping) to the registered URL so the customer can verify
        their receiver works. Returns the delivery result dict
        (ok / status_code / error)."""
        return self._request(
            "POST", f"/webhooks/{self._seg(webhook_id)}/test",
        ).json()