""" orgstate_client.client — the SDK's only public class. The Client is tenant-scoped at construction (the customer holds one API key, tied to one tenant; an SDK that requires passing tenant_id on every call would be needlessly chatty). For admin keys you can pass tenant_id=None and override via the per-method ``tenant_id`` argument, but the customer-shaped use case is the default. Transport is injectable: - default: stdlib urllib (no extra deps in production) - tests: any object with ``.request(method, url, headers=..., params=..., json=...)`` returning something with .status_code, .json(), .text — httpx.Client and fastapi.TestClient both fit That seam lets tests run against a real FastAPI app via TestClient without spinning up a real socket. """ from __future__ import annotations import json as _json import urllib.error import urllib.parse import urllib.request from typing import Any, Dict, Optional class ClientError(Exception): """Raised when the API returns an error response. Carries the parsed JSON body when available so the caller can branch on ``error.code``.""" def __init__(self, status_code: int, body: Any): self.status_code = status_code self.body = body msg = ( body.get("error", {}).get("message", str(body)) if isinstance(body, dict) else str(body) ) super().__init__(f"HTTP {status_code}: {msg}") class _UrllibResponse: """Thin adapter — mirrors the bits of httpx.Response the SDK uses.""" def __init__(self, status_code: int, body_bytes: bytes, content_type: str): self.status_code = status_code self._body = body_bytes self._content_type = content_type @property def text(self) -> str: return self._body.decode("utf-8") def json(self) -> Any: return _json.loads(self._body) if self._body else None class _UrllibTransport: """Default transport — stdlib only. Compatible-ish with httpx.Client for the small surface the SDK needs (just ``.request``).""" def __init__(self, base_url: str, timeout: float = 30.0): self.base_url = base_url.rstrip("/") self.timeout = timeout def request(self, method: str, url: str, *, headers: Optional[Dict[str, str]] = None, params: Optional[Dict[str, Any]] = None, json: Any = None): # url may be absolute (test passes httpx-style) or path-only # — normalize against base_url if url.startswith(("http://", "https://")): full = url else: full = self.base_url + url if params: qs = urllib.parse.urlencode( {k: v for k, v in params.items() if v is not None} ) if qs: full = full + ("&" if "?" in full else "?") + qs body: Optional[bytes] = None h = dict(headers or {}) if json is not None: body = _json.dumps(json).encode("utf-8") h.setdefault("Content-Type", "application/json") req = urllib.request.Request(full, data=body, method=method, headers=h) try: with urllib.request.urlopen(req, timeout=self.timeout) as resp: return _UrllibResponse( status_code=resp.status, body_bytes=resp.read(), content_type=resp.headers.get("content-type", ""), ) except urllib.error.HTTPError as e: # 4xx/5xx — return as a response so the SDK can wrap body_bytes = e.read() if e.fp else b"" return _UrllibResponse( status_code=e.code, body_bytes=body_bytes, content_type=e.headers.get("content-type", "") if e.headers else "", ) class Client: """Tenant-scoped read-only SDK over the OrgState HTTP API. Stage 62 covers the customer-shaped read surface. Future stages add write methods (set_overrides, recalibrate) and admin methods (cross-tenant list, etc.).""" def __init__(self, *, base_url: str, tenant_id: str, api_key: str, transport: Any = None, timeout: float = 30.0): self.base_url = base_url.rstrip("/") self.tenant_id = tenant_id self.api_key = api_key self.transport = transport or _UrllibTransport( base_url, timeout=timeout, ) # --- internals --------------------------------------------------- def _request(self, method: str, path: str, *, params: Optional[Dict[str, Any]] = None, json: Any = None) -> Any: """Issue a request through the transport, raise ClientError on non-2xx, return the parsed response object so callers can decide whether they want .json() or .text.""" headers = {"Authorization": f"Bearer {self.api_key}"} # for the default transport, path is enough; for httpx-style # transports (TestClient), pass the path as-is — TestClient # joins it with its own base_url url = path if hasattr(self.transport, "_base_url") else path resp = self.transport.request( method, url, headers=headers, params=params, json=json, ) if not (200 <= resp.status_code < 300): try: body = resp.json() except Exception: body = resp.text raise ClientError(resp.status_code, body) return resp # --- read surface ------------------------------------------------ def health(self) -> dict: """GET /tenants/{tid}/health — the readiness state for the tenant. Returns the same payload as Stage 34. **503 is a normal response**, not an error: the endpoint follows the k8s readiness convention where 503 means "not ready" (carrying the payload that explains why). The SDK returns the body in both cases — the caller branches on ``payload["healthy"]``, not on a raised exception. Other 4xx (401 no key, 403 cross-tenant) still raise.""" # bypass _request's non-2xx raising for this one route headers = {"Authorization": f"Bearer {self.api_key}"} resp = self.transport.request( "GET", f"/tenants/{self.tenant_id}/health", headers=headers, params=None, json=None, ) if resp.status_code in (200, 503): return resp.json() # 401/403/404 etc. — those genuinely are errors try: body = resp.json() except Exception: body = resp.text raise ClientError(resp.status_code, body) def list_runs(self, *, entity_type: Optional[str] = None, since: Optional[str] = None, limit: int = 50, offset: int = 0) -> dict: """GET /tenants/{tid}/runs — same filters as the JSON variant of Stage 49.""" # filter None at construction — httpx serializes None as the # literal string "None", which FastAPI then treats as the # filter value (matching no rows). Building the dict # selectively avoids the transport-level None-handling # difference between urllib and httpx. params: Dict[str, Any] = {"limit": limit, "offset": offset} if entity_type is not None: params["entity_type"] = entity_type if since is not None: params["since"] = since return self._request( "GET", f"/tenants/{self.tenant_id}/runs", params=params, ).json() def get_run(self, run_id: str) -> dict: """GET /runs/{run_id} — the run detail row.""" return self._request("GET", f"/runs/{run_id}").json() def get_issues(self, run_id: str) -> dict: """GET /runs/{run_id}/issues — issues list as JSON.""" return self._request( "GET", f"/runs/{run_id}/issues", ).json() def download_issues_csv(self, run_id: str) -> str: """GET /runs/{run_id}/issues.csv — Stage 47 CSV download as a plain string (caller writes to disk or pipes to pandas).""" return self._request( "GET", f"/runs/{run_id}/issues.csv", ).text def download_decisions_csv(self, run_id: str) -> str: """GET /runs/{run_id}/decisions.csv — Stage 48. Decisions snapshot merged with triage state (status/owner/notes) in one wide row per decision. Returns CSV text.""" return self._request( "GET", f"/runs/{run_id}/decisions.csv", ).text def download_runs_csv(self, *, entity_type: Optional[str] = None, since: Optional[str] = None) -> str: """GET /tenants/{tid}/runs.csv — Stage 50. Compliance ""all runs in Q1"" pull. Same filters as list_runs JSON (entity_type narrowing, since cutoff). Returns CSV text.""" params: Dict[str, Any] = {} if entity_type is not None: params["entity_type"] = entity_type if since is not None: params["since"] = since return self._request( "GET", f"/tenants/{self.tenant_id}/runs.csv", params=params or None, ).text def get_overrides(self) -> dict: """GET /tenants/{tid}/overrides — Stage 59. Returns the customer's current threshold + weights overrides.""" return self._request( "GET", f"/tenants/{self.tenant_id}/overrides", ).json() def preview_run(self, entity_type: str) -> dict: """GET /tenants/{tid}/runs/preview — Stage 45. Dry-run analysis, no DB writes. Returns top issues + counts.""" return self._request( "GET", f"/tenants/{self.tenant_id}/runs/preview", params={"entity_type": entity_type}, ).json() def download_backfill_csv(self, entity_type: str, *, from_day: Optional[str] = None, until_day: Optional[str] = None, step_days: int = 1) -> str: """GET /tenants/{tid}/backfill.csv — Stage 67. Returns plain CSV text so the caller can write to disk or feed pandas. The chart-friendly wide layout matches the HTTP route exactly.""" params: Dict[str, Any] = { "entity_type": entity_type, "step_days": step_days, } if from_day is not None: params["from"] = from_day if until_day is not None: params["until"] = until_day return self._request( "GET", f"/tenants/{self.tenant_id}/backfill.csv", params=params, ).text def backfill(self, entity_type: str, *, from_day: Optional[str] = None, until_day: Optional[str] = None, step_days: int = 1) -> dict: """GET /tenants/{tid}/backfill — Stage 66. Point-in-time historical analysis over a date range. Read-only, no DB writes. Customer dashboard widget pattern: "show me how we looked over Q1".""" params: Dict[str, Any] = { "entity_type": entity_type, "step_days": step_days, } # the route accepts `from`/`until` (Python keywords elsewhere, # but valid query-string keys); httpx will encode them safely if from_day is not None: params["from"] = from_day if until_day is not None: params["until"] = until_day return self._request( "GET", f"/tenants/{self.tenant_id}/backfill", params=params, ).json() # --- write surface (operator role, Stage 63) ------------------- def ingest_observations(self, entity_type: str, observations: list) -> dict: """POST /tenants/{tid}/observations — push observations to the buffer (Stage 7d). Idempotent on (entity_id, day) per (tenant, entity_type) — re-pushing the same day overwrites. Requires operator-role tenant key.""" return self._request( "POST", f"/tenants/{self.tenant_id}/observations", json={"entity_type": entity_type, "observations": observations}, ).json() def register_calibration(self, vertical: str, entity_type: str, observations: list) -> dict: """POST /tenants/{tid}/calibrations — derive + persist a calibration from the passed observations. Requires operator-role tenant key. For "calibrate from stored observations", use the admin endpoint via an AdminClient (future Stage) — there's deliberately no shortcut on the customer SDK; recalibration is operationally significant and should go through ops.""" return self._request( "POST", f"/tenants/{self.tenant_id}/calibrations", json={"vertical": vertical, "entity_type": entity_type, "observations": observations}, ).json() def create_schedule(self, *, vertical: str, entity_type: str, connector_type: str, connector_config: Optional[dict] = None, frequency: str = "daily", enabled: bool = True) -> dict: """POST /tenants/{tid}/schedules — register a new scheduled ingestion (Stage 39). Customer can self-serve onboard their connector. Requires operator-role tenant key.""" body: Dict[str, Any] = { "vertical": vertical, "entity_type": entity_type, "connector_type": connector_type, "frequency": frequency, "enabled": enabled, } if connector_config is not None: body["connector_config"] = connector_config return self._request( "POST", f"/tenants/{self.tenant_id}/schedules", json=body, ).json() def enable_schedule(self, schedule_id: str) -> dict: """POST /schedules/{sid}/enable — toggle a paused schedule back on. Requires operator-role tenant key (the route checks that the schedule's tenant matches the caller).""" return self._request( "POST", f"/schedules/{schedule_id}/enable", ).json() def disable_schedule(self, schedule_id: str) -> dict: """POST /schedules/{sid}/disable — pause a schedule without deleting it. Useful for testing windows or holidays.""" return self._request( "POST", f"/schedules/{schedule_id}/disable", ).json() # --- webhooks (Stage 73) ----------------------------------------- def list_webhooks(self, *, enabled_only: bool = False) -> dict: """GET /tenants/{tid}/webhooks — Stage 73. secret_hash and secret_raw are NOT in the response (stripped server-side).""" params: Dict[str, Any] = {} if enabled_only: params["enabled_only"] = enabled_only return self._request( "GET", f"/tenants/{self.tenant_id}/webhooks", params=params or None, ).json() def create_webhook(self, url: str, *, secret: Optional[str] = None) -> dict: """POST /tenants/{tid}/webhooks — register a webhook. The raw secret (if passed) comes back in the response ONCE and is never retrievable again; the caller wires it into their receiver's signature verification.""" body: Dict[str, Any] = {"url": url} if secret is not None: body["secret"] = secret return self._request( "POST", f"/tenants/{self.tenant_id}/webhooks", json=body, ).json() def get_webhook(self, webhook_id: str) -> dict: """GET /webhooks/{wid} — Stage 73.""" return self._request( "GET", f"/webhooks/{webhook_id}", ).json() def enable_webhook(self, webhook_id: str) -> dict: """POST /webhooks/{wid}/enable — Stage 73.""" return self._request( "POST", f"/webhooks/{webhook_id}/enable", ).json() def disable_webhook(self, webhook_id: str) -> dict: """POST /webhooks/{wid}/disable — Stage 73. Soft-pause: row preserved, no future deliveries.""" return self._request( "POST", f"/webhooks/{webhook_id}/disable", ).json() def delete_webhook(self, webhook_id: str) -> dict: """DELETE /webhooks/{wid} — Stage 73. Idempotent: returns deleted=false on a row that doesn't exist, no error.""" return self._request( "DELETE", f"/webhooks/{webhook_id}", ).json() def test_webhook(self, webhook_id: str) -> dict: """POST /webhooks/{wid}/test — Stage 74. Sends a synthetic ping payload (event=ping) to the registered URL so the customer can verify their receiver works. Returns the delivery result dict (ok / status_code / error).""" return self._request( "POST", f"/webhooks/{webhook_id}/test", ).json()