File size: 9,764 Bytes
"""
orgstate_client.admin — admin-side SDK.

Stage 64: separate class for /admin/* endpoints. Admin operations
act ACROSS tenants (list_all, pause/resume, recalibrate, etc.) —
shaping them like the customer-side Client (which is tenant-scoped
at construction) would be misleading. AdminClient takes no
tenant_id at construction; tenant_id is passed per-method when
relevant.

Shares the transport seam with Client so tests can plug TestClient
in just the same way:

    AdminClient(base_url="http://test", admin_key=raw,
                transport=TestClient(create_app(dbfile)))

In production:

    AdminClient(base_url="https://orgstate.example.com",
                admin_key=os.environ["ORGSTATE_ADMIN_KEY"])
"""
from __future__ import annotations

from typing import Any, Dict, Optional
from urllib.parse import quote

from .client import ClientError, _UrllibTransport
class AdminClient:
    """Thin wrapper over the /admin/* HTTP routes.

    Methods follow the route names where natural — no fancy
    abstractions, just thin shells the operator can compose into
    automation scripts. The client is not bound to a tenant; methods
    that act on one tenant take ``tenant_id`` as an argument.

    Every call sends ``Authorization: Bearer <admin_key>`` and raises
    ``ClientError(status_code, body)`` when the response status is
    outside the 2xx range.
    """

    def __init__(self, *,
                 base_url: str,
                 admin_key: str,
                 transport: Any = None,
                 timeout: float = 30.0):
        """Create a client.

        Args:
            base_url: Server root URL; trailing slashes are stripped.
            admin_key: Raw admin key sent as the bearer token.
            transport: Object exposing ``request(method, path, headers=,
                params=, json=)``. When ``None`` a ``_UrllibTransport``
                is built from ``base_url`` and ``timeout``.
            timeout: Timeout handed to the default transport only.
        """
        self.base_url = base_url.rstrip("/")
        self.admin_key = admin_key
        # Test for None explicitly so a supplied transport is never
        # discarded just because it evaluates as falsy.
        if transport is None:
            # Hand over the normalised URL so the default transport and
            # ``self.base_url`` agree about the trailing slash.
            transport = _UrllibTransport(self.base_url, timeout=timeout)
        self.transport = transport

    # --- internals ---------------------------------------------------

    @staticmethod
    def _seg(value: Any) -> str:
        """Percent-encode ``value`` for use as ONE URL path segment.

        ``safe=""`` also encodes ``/``, so an identifier can never add
        path segments or start a query string / fragment.
        """
        return quote(str(value), safe="")

    @staticmethod
    def _without_none(**fields: Any) -> Dict[str, Any]:
        """Return ``fields`` minus the entries whose value is ``None``
        (keyword order is preserved)."""
        return {key: val for key, val in fields.items() if val is not None}

    def _request(self, method: str, path: str, *,
                 params: Optional[Dict[str, Any]] = None,
                 json: Any = None) -> Any:
        """Send one authenticated request through the transport.

        Returns:
            The transport's response object, for 2xx statuses.

        Raises:
            ClientError: for any non-2xx status. Carries the status
                code and the decoded JSON body, or the raw text when
                the body cannot be decoded.
        """
        headers = {"Authorization": f"Bearer {self.admin_key}"}
        resp = self.transport.request(
            method, path, headers=headers, params=params, json=json,
        )
        if 200 <= resp.status_code < 300:
            return resp
        try:
            body = resp.json()
        except Exception:
            # Deliberately broad: decoding is best-effort and must not
            # hide the HTTP failure that is about to be reported.
            body = resp.text
        raise ClientError(resp.status_code, body)

    # --- tenants -----------------------------------------------------

    def list_all_tenants(self) -> dict:
        """GET /admin/tenants — every tenant with its current status."""
        return self._request("GET", "/admin/tenants").json()

    def pause_tenant(self, tenant_id: str) -> dict:
        """POST /admin/tenants/{tid}/pause — freeze writes (run/
        calibrate/ingest/schedule tick), reads still work."""
        path = f"/admin/tenants/{self._seg(tenant_id)}/pause"
        return self._request("POST", path).json()

    def resume_tenant(self, tenant_id: str) -> dict:
        """POST /admin/tenants/{tid}/resume — un-freeze."""
        path = f"/admin/tenants/{self._seg(tenant_id)}/resume"
        return self._request("POST", path).json()

    def platform_health(self, *,
                        max_age_days: float = 90.0,
                        max_gap_hours: float = 48.0) -> dict:
        """GET /admin/tenants/health — Stage 35. Every tenant's
        health in one call, sorted worst-first."""
        return self._request(
            "GET", "/admin/tenants/health",
            params={"max_age_days": max_age_days,
                    "max_gap_hours": max_gap_hours},
        ).json()

    # --- fix-it commands (Stage 55) ---------------------------------

    def recalibrate(self, tenant_id: str, entity_type: str, *,
                    min_observations: int = 14) -> dict:
        """POST /admin/tenants/{tid}/recalibrate — Stage 30/55. Pulls
        every stored observation for (tenant, entity_type), derives
        a fresh calibration."""
        path = f"/admin/tenants/{self._seg(tenant_id)}/recalibrate"
        return self._request(
            "POST", path,
            json={"entity_type": entity_type,
                  "min_observations": min_observations},
        ).json()

    def trigger_run(self, tenant_id: str, entity_type: str) -> dict:
        """POST /admin/tenants/{tid}/runs/trigger — Stage 31/55. Run
        the pipeline now using stored observations + calibration."""
        path = f"/admin/tenants/{self._seg(tenant_id)}/runs/trigger"
        return self._request(
            "POST", path, json={"entity_type": entity_type},
        ).json()

    # --- overrides (Stages 58/59) -----------------------------------

    def set_overrides(self, tenant_id: str, entity_type: str,
                      overrides: dict) -> dict:
        """PUT /admin/tenants/{tid}/overrides/{et} — Stage 59. Body
        is the full overrides object (e.g.
        ``{"severity_thresholds": {"critical": 0.85}}``)."""
        path = (f"/admin/tenants/{self._seg(tenant_id)}"
                f"/overrides/{self._seg(entity_type)}")
        return self._request("PUT", path, json=overrides).json()

    def clear_overrides(self, tenant_id: str,
                        entity_type: str) -> dict:
        """DELETE /admin/tenants/{tid}/overrides/{et} — idempotent
        (deleted=false on a row that doesn't exist, 200 not 404)."""
        path = (f"/admin/tenants/{self._seg(tenant_id)}"
                f"/overrides/{self._seg(entity_type)}")
        return self._request("DELETE", path).json()

    # --- schedules (Stages 56/57) -----------------------------------

    def list_all_schedules(self, *,
                           enabled_only: bool = False,
                           status: Optional[str] = None,
                           tenant_id: Optional[str] = None) -> dict:
        """GET /admin/schedules — Stage 57. Cross-tenant view with
        optional filters (status='ok'/'skipped'/'error'/'never').

        ``enabled_only`` is always sent; ``status`` and ``tenant_id``
        are sent only when they are not ``None``.
        """
        params: Dict[str, Any] = {"enabled_only": enabled_only}
        params.update(self._without_none(status=status,
                                         tenant_id=tenant_id))
        return self._request(
            "GET", "/admin/schedules", params=params,
        ).json()

    def tick_schedules(self) -> dict:
        """POST /admin/schedules/tick — Stage 56. Force one tick now,
        same payload as the CLI `schedule tick`."""
        return self._request("POST", "/admin/schedules/tick").json()

    # --- ops summary + audit (Stages 46, 53) ------------------------

    def ops_summary(self, *,
                    max_age_days: float = 90.0,
                    max_gap_hours: float = 48.0) -> dict:
        """GET /admin/ops-summary — Stage 46. Bird's-eye platform
        overview (tenants/schedules/runs/calibrations counts)."""
        return self._request(
            "GET", "/admin/ops-summary",
            params={"max_age_days": max_age_days,
                    "max_gap_hours": max_gap_hours},
        ).json()

    def audit_list(self, *,
                   actor: Optional[str] = None,
                   action: Optional[str] = None,
                   tenant_id: Optional[str] = None,
                   since: Optional[str] = None,
                   until: Optional[str] = None,
                   limit: int = 100,
                   offset: int = 0) -> dict:
        """GET /admin/audit — Stage 53. All audit filters supported.

        ``limit`` and ``offset`` are always sent; each filter is sent
        only when it is not ``None``.
        """
        params: Dict[str, Any] = {"limit": limit, "offset": offset}
        params.update(self._without_none(
            actor=actor, action=action, tenant_id=tenant_id,
            since=since, until=until,
        ))
        return self._request(
            "GET", "/admin/audit", params=params,
        ).json()

    # --- webhooks (Stage 73) ---------------------------------------

    def list_webhooks(self, tenant_id: str, *,
                      enabled_only: bool = False) -> dict:
        """GET /tenants/{tid}/webhooks — admin variant takes the
        tenant_id explicitly (vs Client which is tenant-scoped).

        ``enabled_only`` is sent only when truthy; otherwise no query
        parameters are passed at all.
        """
        params: Dict[str, Any] = {}
        if enabled_only:
            params["enabled_only"] = enabled_only
        return self._request(
            "GET", f"/tenants/{self._seg(tenant_id)}/webhooks",
            params=params or None,
        ).json()

    def create_webhook(self, tenant_id: str, url: str, *,
                       secret: Optional[str] = None) -> dict:
        """POST /tenants/{tid}/webhooks. Raw secret returns ONCE.

        ``secret`` is included in the request body only when it is
        not ``None``.
        """
        body: Dict[str, Any] = {"url": url}
        if secret is not None:
            body["secret"] = secret
        return self._request(
            "POST", f"/tenants/{self._seg(tenant_id)}/webhooks",
            json=body,
        ).json()

    def delete_webhook(self, webhook_id: str) -> dict:
        """DELETE /webhooks/{wid} — idempotent."""
        return self._request(
            "DELETE", f"/webhooks/{self._seg(webhook_id)}",
        ).json()

    def test_webhook(self, webhook_id: str) -> dict:
        """POST /webhooks/{wid}/test — Stage 74. Synthetic ping
        for end-to-end verification. Same shape as customer SDK."""
        return self._request(
            "POST", f"/webhooks/{self._seg(webhook_id)}/test",
        ).json()

    def download_audit_csv(self, *,
                           actor: Optional[str] = None,
                           action: Optional[str] = None,
                           tenant_id: Optional[str] = None,
                           since: Optional[str] = None,
                           until: Optional[str] = None) -> str:
        """GET /admin/audit.csv — Stage 53. CSV download for
        compliance pulls (returns raw text).

        Each filter is sent only when it is not ``None``.
        """
        params = self._without_none(
            actor=actor, action=action, tenant_id=tenant_id,
            since=since, until=until,
        )
        return self._request(
            "GET", "/admin/audit.csv", params=params,
        ).text
|