Spaces:
Runtime error
Runtime error
| """HTTP client for the WayyDB service.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any, NoReturn, Optional | |
| import httpx | |
| from wayy_db.cli.config import get_server_url | |
| # The API uses /api/v1/{db_name}/... for OLTP routes but db_name is unused | |
| # server-side (single global db). We hardcode "default" for forward compat. | |
| _DB_NAME = "default" | |
class WayyClientError(Exception):
    """Failure reported by, or while reaching, the WayyDB service.

    Attributes:
        status_code: HTTP status code associated with the failure.
        detail: Human-readable description of what went wrong.

    ``str(error)`` renders as ``"HTTP <status_code>: <detail>"``.
    """

    def __init__(self, status_code: int, detail: str) -> None:
        # Initialise the base exception first so args/str() are populated,
        # then expose the two parts individually for programmatic handling.
        super().__init__(f"HTTP {status_code}: {detail}")
        self.detail = detail
        self.status_code = status_code
class WayyClient:
    """Synchronous HTTP client for the WayyDB REST API.

    Every public method issues one request through ``_request`` and returns
    the decoded JSON body (or a value extracted from it).  Caller-supplied
    values that are interpolated into the URL path (table names, keys,
    primary keys, symbols, ...) are percent-encoded so that characters such
    as ``/``, ``?``, ``#`` or ``%`` cannot alter the route or be cut off as
    a query string or fragment.

    The client can be used as a context manager; leaving the ``with`` block
    closes the underlying ``httpx.Client``.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: float = 30.0) -> None:
        """Create the client.

        Args:
            base_url: Service root URL.  When falsy, ``get_server_url()`` is
                used instead.  A trailing ``/`` is stripped.
            timeout: Timeout passed to ``httpx.Client``.
        """
        self.base_url = (base_url or get_server_url()).rstrip("/")
        self._client = httpx.Client(base_url=self.base_url, timeout=timeout)

    # --- Internals ---

    @staticmethod
    def _seg(value: Any) -> str:
        """Return *value* percent-encoded for use as ONE URL path segment."""
        # Imported locally so this class does not depend on a new
        # module-level import.
        from urllib.parse import quote

        # safe="" also encodes "/", which would otherwise split the segment.
        return quote(str(value), safe="")

    @staticmethod
    def _error_detail(resp: Any) -> Any:
        """Extract the ``detail`` field of an error response.

        Falls back to the raw response text when the body is not JSON, is
        not a JSON object, or has no ``detail`` key.
        """
        try:
            body = resp.json()
        except ValueError:  # JSON / unicode decoding failures
            return resp.text
        if isinstance(body, dict):
            return body.get("detail", resp.text)
        return resp.text

    def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """Make an HTTP request and return the decoded JSON response.

        Args:
            method: HTTP method name.
            path: Request path, relative to ``base_url``.
            **kwargs: Forwarded unchanged to ``httpx.Client.request``.

        Returns:
            The decoded JSON body, or ``{}`` for a 204 / empty response.

        Raises:
            WayyClientError: With status ``0`` when the connection cannot be
                established, or with the HTTP status when it is >= 400.
                Other httpx exceptions are not caught here.
        """
        try:
            resp = self._client.request(method, path, **kwargs)
        except httpx.ConnectError as exc:
            # Chain explicitly so the underlying network error stays visible.
            raise WayyClientError(0, f"Cannot connect to {self.base_url}") from exc
        if resp.status_code >= 400:
            raise WayyClientError(resp.status_code, self._error_detail(resp))
        if resp.status_code == 204 or not resp.content:
            return {}
        return resp.json()

    # --- Health ---

    def health(self) -> dict[str, Any]:
        """GET ``/health``."""
        return self._request("GET", "/health")

    def info(self) -> dict[str, Any]:
        """GET ``/``."""
        return self._request("GET", "/")

    # --- Tables ---

    def list_tables(self) -> list[str]:
        """GET ``/tables`` and return its ``tables`` entry (``[]`` if absent)."""
        data = self._request("GET", "/tables")
        return data.get("tables", [])

    def get_table_info(self, name: str) -> dict[str, Any]:
        """GET ``/tables/{name}``."""
        return self._request("GET", f"/tables/{self._seg(name)}")

    def get_table_data(
        self, name: str, limit: int = 100, offset: int = 0
    ) -> dict[str, Any]:
        """GET ``/tables/{name}/data`` with ``limit``/``offset`` query params."""
        return self._request(
            "GET",
            f"/tables/{self._seg(name)}/data",
            params={"limit": limit, "offset": offset},
        )

    def create_table(
        self,
        name: str,
        columns: list[dict[str, str]],
        primary_key: Optional[str] = None,
        sorted_by: Optional[str] = None,
    ) -> dict[str, Any]:
        """POST a table definition to ``/api/v1/{db}/tables``.

        ``primary_key`` and ``sorted_by`` are always included in the payload,
        as ``None`` when not given.
        """
        payload = {
            "name": name,
            "columns": columns,
            "primary_key": primary_key,
            "sorted_by": sorted_by,
        }
        return self._request("POST", f"/api/v1/{_DB_NAME}/tables", json=payload)

    def drop_table(self, name: str) -> dict[str, Any]:
        """DELETE ``/tables/{name}``."""
        return self._request("DELETE", f"/tables/{self._seg(name)}")

    def upload_table(self, table_data: dict[str, Any]) -> dict[str, Any]:
        """POST *table_data* as JSON to ``/tables/upload``."""
        return self._request("POST", "/tables/upload", json=table_data)

    def append_rows(self, name: str, columns: list[dict[str, Any]]) -> dict[str, Any]:
        """POST ``{"columns": columns}`` to ``/tables/{name}/append``."""
        return self._request(
            "POST", f"/tables/{self._seg(name)}/append", json={"columns": columns}
        )

    # --- OLTP ---

    def insert_row(self, table: str, data: dict[str, Any]) -> dict[str, Any]:
        """POST ``{"data": data}`` to ``/api/v1/{db}/tables/{table}/rows``."""
        return self._request(
            "POST",
            f"/api/v1/{_DB_NAME}/tables/{self._seg(table)}/rows",
            json={"data": data},
        )

    def get_row(self, table: str, pk: str) -> dict[str, Any]:
        """GET ``/api/v1/{db}/tables/{table}/rows/{pk}``."""
        return self._request(
            "GET",
            f"/api/v1/{_DB_NAME}/tables/{self._seg(table)}/rows/{self._seg(pk)}",
        )

    def update_row(self, table: str, pk: str, data: dict[str, Any]) -> dict[str, Any]:
        """PUT ``{"data": data}`` to ``/api/v1/{db}/tables/{table}/rows/{pk}``."""
        return self._request(
            "PUT",
            f"/api/v1/{_DB_NAME}/tables/{self._seg(table)}/rows/{self._seg(pk)}",
            json={"data": data},
        )

    def delete_row(self, table: str, pk: str) -> dict[str, Any]:
        """DELETE ``/api/v1/{db}/tables/{table}/rows/{pk}``."""
        return self._request(
            "DELETE",
            f"/api/v1/{_DB_NAME}/tables/{self._seg(table)}/rows/{self._seg(pk)}",
        )

    def filter_rows(
        self, table: str, filters: Optional[dict[str, str]] = None, limit: int = 500
    ) -> dict[str, Any]:
        """GET ``/api/v1/{db}/tables/{table}/rows`` with *filters* as query params.

        The caller's *filters* dict is copied, not mutated.  ``limit`` is sent
        as a string and overrides any ``"limit"`` key present in *filters*.
        """
        params = dict(filters or {})
        params["limit"] = str(limit)
        return self._request(
            "GET", f"/api/v1/{_DB_NAME}/tables/{self._seg(table)}/rows", params=params
        )

    # --- Aggregations ---

    def aggregate(self, table: str, column: str, op: str) -> dict[str, Any]:
        """GET ``/tables/{table}/agg/{column}/{op}``."""
        return self._request(
            "GET",
            f"/tables/{self._seg(table)}/agg/{self._seg(column)}/{self._seg(op)}",
        )

    # --- Joins ---

    def as_of_join(
        self, left: str, right: str, on: list[str], as_of: str
    ) -> dict[str, Any]:
        """POST an as-of join request to ``/join/aj``."""
        payload = {"left_table": left, "right_table": right, "on": on, "as_of": as_of}
        return self._request("POST", "/join/aj", json=payload)

    def window_join(
        self,
        left: str,
        right: str,
        on: list[str],
        as_of: str,
        before: int,
        after: int,
    ) -> dict[str, Any]:
        """POST a window join request to ``/join/wj``.

        *before* and *after* are sent as ``window_before`` / ``window_after``.
        """
        payload = {
            "left_table": left,
            "right_table": right,
            "on": on,
            "as_of": as_of,
            "window_before": before,
            "window_after": after,
        }
        return self._request("POST", "/join/wj", json=payload)

    # --- Window functions ---

    def window_function(
        self,
        table: str,
        column: str,
        operation: str,
        window: Optional[int] = None,
        alpha: Optional[float] = None,
    ) -> dict[str, Any]:
        """POST a window-function request to ``/window``.

        ``window`` and ``alpha`` are only included in the payload when they
        are not ``None``.
        """
        payload: dict[str, Any] = {
            "table": table,
            "column": column,
            "operation": operation,
        }
        if window is not None:
            payload["window"] = window
        if alpha is not None:
            payload["alpha"] = alpha
        return self._request("POST", "/window", json=payload)

    # --- Streaming ---

    def ingest_tick(self, table: str, tick: dict[str, Any]) -> dict[str, Any]:
        """POST a single *tick* as JSON to ``/ingest/{table}``."""
        return self._request("POST", f"/ingest/{self._seg(table)}", json=tick)

    def ingest_batch(self, table: str, ticks: list[dict[str, Any]]) -> dict[str, Any]:
        """POST ``{"ticks": ticks}`` to ``/ingest/{table}/batch``."""
        return self._request(
            "POST", f"/ingest/{self._seg(table)}/batch", json={"ticks": ticks}
        )

    def get_streaming_stats(self) -> dict[str, Any]:
        """GET ``/streaming/stats``."""
        return self._request("GET", "/streaming/stats")

    def get_quote(self, table: str, symbol: str) -> dict[str, Any]:
        """GET ``/streaming/quote/{table}/{symbol}``."""
        return self._request(
            "GET", f"/streaming/quote/{self._seg(table)}/{self._seg(symbol)}"
        )

    def get_all_quotes(self, table: str) -> dict[str, Any]:
        """GET ``/streaming/quotes/{table}``."""
        return self._request("GET", f"/streaming/quotes/{self._seg(table)}")

    # --- KV Store ---

    def kv_get(self, key: str) -> Any:
        """GET ``/kv/{key}`` and return its ``value`` entry (``None`` if absent)."""
        data = self._request("GET", f"/kv/{self._seg(key)}")
        return data.get("value")

    def kv_set(self, key: str, value: Any, ttl: Optional[float] = None) -> dict[str, Any]:
        """POST ``{"value": value}`` to ``/kv/{key}``; ``ttl`` is added when given."""
        payload: dict[str, Any] = {"value": value}
        if ttl is not None:
            payload["ttl"] = ttl
        return self._request("POST", f"/kv/{self._seg(key)}", json=payload)

    def kv_delete(self, key: str) -> dict[str, Any]:
        """DELETE ``/kv/{key}``."""
        return self._request("DELETE", f"/kv/{self._seg(key)}")

    def kv_list(self, pattern: Optional[str] = None) -> list[str]:
        """GET ``/kv`` and return its ``keys`` entry (``[]`` if absent).

        A non-empty *pattern* is sent as the ``pattern`` query parameter.
        """
        params: dict[str, str] = {}
        if pattern:
            params["pattern"] = pattern
        data = self._request("GET", "/kv", params=params)
        return data.get("keys", [])

    # --- Checkpoint ---

    def checkpoint(self) -> dict[str, Any]:
        """POST to ``/api/v1/{db}/checkpoint``."""
        return self._request("POST", f"/api/v1/{_DB_NAME}/checkpoint")

    # --- Lifecycle ---

    def close(self) -> None:
        """Close the underlying ``httpx.Client``."""
        self._client.close()

    def __enter__(self) -> "WayyClient":
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()
def upload_csv(
    client: "WayyClient", name: str, file_path: Path, sorted_by: Optional[str] = None
) -> dict[str, Any]:
    """Read a CSV file and upload it as a table.

    Uses stdlib csv to avoid requiring pandas in CLI.

    The first row is the header; each following row contributes one value
    per header column.  Column dtypes are inferred by ``_infer_column``.

    Args:
        client: Client whose ``upload_table`` receives the payload.
        name: Name of the table to create.
        file_path: Path of the CSV file (read as UTF-8; a BOM is ignored).
        sorted_by: Forwarded in the payload as ``sorted_by``.

    Returns:
        Whatever ``client.upload_table`` returns.

    Raises:
        ValueError: If the file has no header row, has no data rows, or a
            data row has fewer fields than the header.
    """
    import csv

    rows: list[list[str]] = []
    # utf-8-sig strips a leading BOM (common in spreadsheet exports), which
    # would otherwise become part of the first column name.
    with open(file_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.reader(f)
        headers = next(reader, None)  # None instead of a leaked StopIteration
        if not headers:
            raise ValueError("CSV file is empty (no header row)")
        width = len(headers)
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; skip them.
                continue
            if len(row) < width:
                raise ValueError(
                    f"CSV line {reader.line_num}: expected {width} fields, "
                    f"got {len(row)}"
                )
            rows.append(row)

    if not rows:
        raise ValueError("CSV file is empty (no data rows)")

    columns: list[dict[str, Any]] = []
    # zip(*rows) transposes rows to columns; pairing with headers drops any
    # surplus trailing fields, which were ignored before as well.
    for header, raw_values in zip(headers, zip(*rows)):
        dtype, data = _infer_column(list(raw_values))
        columns.append({"name": header, "dtype": dtype, "data": data})

    payload = {"name": name, "columns": columns, "sorted_by": sorted_by}
    return client.upload_table(payload)
| def _infer_column(values: list[str]) -> tuple[str, list[Any]]: | |
| """Infer column dtype from string values. Returns (dtype_name, typed_data).""" | |
| non_empty = [v for v in values if v.strip()] | |
| if not non_empty: | |
| return ("float64", [0.0] * len(values)) | |
| # Try int64 | |
| try: | |
| data = [int(v) if v.strip() else 0 for v in values] | |
| return ("int64", data) | |
| except (ValueError, OverflowError): | |
| pass | |
| # Try float64 (handles empty cells as NaN) | |
| try: | |
| data = [float(v) if v.strip() else float("nan") for v in values] | |
| return ("float64", data) | |
| except (ValueError, OverflowError): | |
| pass | |
| raise ValueError( | |
| f"Non-numeric column detected. Values: {values[:3]}... " | |
| "CSV upload currently supports numeric columns only. " | |
| "Use the Python API with from_pandas() for string/symbol columns." | |
| ) | |
def upload_json_ticks(
    client: "WayyClient", table: str, file_path: Path
) -> dict[str, Any]:
    """Read a JSON file of ticks and batch-ingest them.

    The file must contain either a JSON list of ticks or an object with a
    ``"ticks"`` key holding that list.

    Args:
        client: Client whose ``ingest_batch`` receives the ticks.
        table: Target table name, forwarded to ``ingest_batch``.
        file_path: Path of the JSON file (read as UTF-8; a BOM is ignored).

    Returns:
        Whatever ``client.ingest_batch`` returns.

    Raises:
        ValueError: If the document is neither a list nor an object whose
            ``"ticks"`` entry is a list.
    """
    # JSON interchange is UTF-8; utf-8-sig additionally tolerates a leading
    # BOM, which json.load would otherwise reject.
    with open(file_path, encoding="utf-8-sig") as f:
        data = json.load(f)

    if isinstance(data, dict) and "ticks" in data:
        ticks = data["ticks"]
    else:
        ticks = data

    # Reject anything that is not a list before it reaches the server.
    if not isinstance(ticks, list):
        raise ValueError("JSON must be a list of ticks or {\"ticks\": [...]}")
    return client.ingest_batch(table, ticks)