""" orgstate_client.admin — admin-side SDK. Stage 64: separate class for /admin/* endpoints. Admin operations act ACROSS tenants (list_all, pause/resume, recalibrate, etc.) — shaping them like the customer-side Client (which is tenant-scoped at construction) would be misleading. AdminClient takes no tenant_id at construction; tenant_id is passed per-method when relevant. Shares the transport seam with Client so tests can plug TestClient in just the same way: AdminClient(base_url="http://test", admin_key=raw, transport=TestClient(create_app(dbfile))) In production: AdminClient(base_url="https://orgstate.example.com", admin_key=os.environ["ORGSTATE_ADMIN_KEY"]) """ from __future__ import annotations from typing import Any, Dict, Optional from .client import ClientError, _UrllibTransport class AdminClient: """Wraps the /admin/* HTTP routes. Methods follow the route names where natural — no fancy abstractions, just thin shells the operator can compose into automation scripts.""" def __init__(self, *, base_url: str, admin_key: str, transport: Any = None, timeout: float = 30.0): self.base_url = base_url.rstrip("/") self.admin_key = admin_key self.transport = transport or _UrllibTransport( base_url, timeout=timeout, ) # --- internals --------------------------------------------------- def _request(self, method: str, path: str, *, params: Optional[Dict[str, Any]] = None, json: Any = None) -> Any: headers = {"Authorization": f"Bearer {self.admin_key}"} resp = self.transport.request( method, path, headers=headers, params=params, json=json, ) if not (200 <= resp.status_code < 300): try: body = resp.json() except Exception: body = resp.text raise ClientError(resp.status_code, body) return resp # --- tenants ----------------------------------------------------- def list_all_tenants(self) -> dict: """GET /admin/tenants — every tenant with its current status.""" return self._request("GET", "/admin/tenants").json() def pause_tenant(self, tenant_id: str) -> dict: """POST /admin/tenants/{tid}/pause — freeze writes (run/ calibrate/ingest/schedule tick), reads still work.""" return self._request( "POST", f"/admin/tenants/{tenant_id}/pause", ).json() def resume_tenant(self, tenant_id: str) -> dict: """POST /admin/tenants/{tid}/resume — un-freeze.""" return self._request( "POST", f"/admin/tenants/{tenant_id}/resume", ).json() def platform_health(self, *, max_age_days: float = 90.0, max_gap_hours: float = 48.0) -> dict: """GET /admin/tenants/health — Stage 35. Every tenant's health in one call, sorted worst-first.""" return self._request( "GET", "/admin/tenants/health", params={"max_age_days": max_age_days, "max_gap_hours": max_gap_hours}, ).json() # --- fix-it commands (Stage 55) --------------------------------- def recalibrate(self, tenant_id: str, entity_type: str, *, min_observations: int = 14) -> dict: """POST /admin/tenants/{tid}/recalibrate — Stage 30/55. Pulls every stored observation for (tenant, entity_type), derives a fresh calibration.""" return self._request( "POST", f"/admin/tenants/{tenant_id}/recalibrate", json={"entity_type": entity_type, "min_observations": min_observations}, ).json() def trigger_run(self, tenant_id: str, entity_type: str) -> dict: """POST /admin/tenants/{tid}/runs/trigger — Stage 31/55. Run the pipeline now using stored observations + calibration.""" return self._request( "POST", f"/admin/tenants/{tenant_id}/runs/trigger", json={"entity_type": entity_type}, ).json() # --- overrides (Stages 58/59) ----------------------------------- def set_overrides(self, tenant_id: str, entity_type: str, overrides: dict) -> dict: """PUT /admin/tenants/{tid}/overrides/{et} — Stage 59. Body is the full overrides object (e.g. ``{"severity_thresholds": {"critical": 0.85}}``).""" return self._request( "PUT", f"/admin/tenants/{tenant_id}/overrides/{entity_type}", json=overrides, ).json() def clear_overrides(self, tenant_id: str, entity_type: str) -> dict: """DELETE /admin/tenants/{tid}/overrides/{et} — idempotent (deleted=false on a row that doesn't exist, 200 not 404).""" return self._request( "DELETE", f"/admin/tenants/{tenant_id}/overrides/{entity_type}", ).json() # --- schedules (Stages 56/57) ----------------------------------- def list_all_schedules(self, *, enabled_only: bool = False, status: Optional[str] = None, tenant_id: Optional[str] = None) -> dict: """GET /admin/schedules — Stage 57. cross-tenant view with optional filters (status='ok'/'skipped'/'error'/'never').""" params: Dict[str, Any] = {"enabled_only": enabled_only} if status is not None: params["status"] = status if tenant_id is not None: params["tenant_id"] = tenant_id return self._request( "GET", "/admin/schedules", params=params, ).json() def tick_schedules(self) -> dict: """POST /admin/schedules/tick — Stage 56. Force one tick now, same payload as the CLI `schedule tick`.""" return self._request( "POST", "/admin/schedules/tick", ).json() # --- ops summary + audit (Stages 46, 53) ------------------------ def ops_summary(self, *, max_age_days: float = 90.0, max_gap_hours: float = 48.0) -> dict: """GET /admin/ops-summary — Stage 46. Bird's-eye platform overview (tenants/schedules/runs/calibrations counts).""" return self._request( "GET", "/admin/ops-summary", params={"max_age_days": max_age_days, "max_gap_hours": max_gap_hours}, ).json() def audit_list(self, *, actor: Optional[str] = None, action: Optional[str] = None, tenant_id: Optional[str] = None, since: Optional[str] = None, until: Optional[str] = None, limit: int = 100, offset: int = 0) -> dict: """GET /admin/audit — Stage 53. All audit filters supported.""" params: Dict[str, Any] = {"limit": limit, "offset": offset} for k, v in (("actor", actor), ("action", action), ("tenant_id", tenant_id), ("since", since), ("until", until)): if v is not None: params[k] = v return self._request( "GET", "/admin/audit", params=params, ).json() # --- webhooks (Stage 73) --------------------------------------- def list_webhooks(self, tenant_id: str, *, enabled_only: bool = False) -> dict: """GET /tenants/{tid}/webhooks — admin variant takes the tenant_id explicitly (vs Client which is tenant-scoped).""" params: Dict[str, Any] = {} if enabled_only: params["enabled_only"] = enabled_only return self._request( "GET", f"/tenants/{tenant_id}/webhooks", params=params or None, ).json() def create_webhook(self, tenant_id: str, url: str, *, secret: Optional[str] = None) -> dict: """POST /tenants/{tid}/webhooks. raw secret returns ONCE.""" body: Dict[str, Any] = {"url": url} if secret is not None: body["secret"] = secret return self._request( "POST", f"/tenants/{tenant_id}/webhooks", json=body, ).json() def delete_webhook(self, webhook_id: str) -> dict: """DELETE /webhooks/{wid} — idempotent.""" return self._request( "DELETE", f"/webhooks/{webhook_id}", ).json() def test_webhook(self, webhook_id: str) -> dict: """POST /webhooks/{wid}/test — Stage 74. Synthetic ping for end-to-end verification. Same shape as customer SDK.""" return self._request( "POST", f"/webhooks/{webhook_id}/test", ).json() def download_audit_csv(self, *, actor: Optional[str] = None, action: Optional[str] = None, tenant_id: Optional[str] = None, since: Optional[str] = None, until: Optional[str] = None) -> str: """GET /admin/audit.csv — Stage 53. CSV download for compliance pulls (returns raw text).""" params: Dict[str, Any] = {} for k, v in (("actor", actor), ("action", action), ("tenant_id", tenant_id), ("since", since), ("until", until)): if v is not None: params[k] = v return self._request( "GET", "/admin/audit.csv", params=params, ).text