wayydb-api / python /wayy_db /cli /client.py
rcgalbo's picture
Deploy wayyDB to HuggingFace Spaces
bf20cb7
"""HTTP client for the WayyDB service."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, NoReturn, Optional
import httpx
from wayy_db.cli.config import get_server_url
# The OLTP routes are addressed as /api/v1/{db_name}/..., but db_name is
# unused server-side (there is a single global database). "default" is sent
# as a placeholder for forward compatibility.
_DB_NAME = "default"
class WayyClientError(Exception):
    """Error reported by the WayyDB service or while trying to reach it.

    The exception message has the form ``HTTP <status_code>: <detail>``.
    Both parts are also kept as attributes so callers can branch on the
    status code without parsing the message.
    """

    def __init__(self, status_code: int, detail: str) -> None:
        """Build the error from an HTTP status code and a detail message."""
        message = f"HTTP {status_code}: {detail}"
        super().__init__(message)
        # Expose the raw parts for programmatic inspection.
        self.detail = detail
        self.status_code = status_code
class WayyClient:
    """Synchronous HTTP client for the WayyDB REST API.

    Wraps an ``httpx.Client`` bound to the server base URL. Every endpoint
    method goes through :meth:`_request`, which returns the decoded JSON body
    and raises :class:`WayyClientError` on connection failure, on any HTTP
    status >= 400, and on a success response whose body is not valid JSON.

    Caller-supplied values that are interpolated into a URL path (table
    names, column names, primary keys, symbols, KV keys) are percent-encoded
    so that characters such as ``?``, ``#`` or ``%`` cannot alter the request.

    The client can be used as a context manager; leaving the ``with`` block
    closes the underlying HTTP client.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: float = 30.0) -> None:
        """Create the client.

        Args:
            base_url: Server URL. When falsy, ``get_server_url()`` is used.
                A trailing ``/`` is stripped.
            timeout: Timeout in seconds handed to ``httpx.Client``.
        """
        self.base_url = (base_url or get_server_url()).rstrip("/")
        self._client = httpx.Client(base_url=self.base_url, timeout=timeout)

    @staticmethod
    def _path_segment(value: Any, safe: str = "") -> str:
        """Percent-encode *value* for safe interpolation into a URL path.

        Args:
            value: The raw value; converted with ``str()`` first.
            safe: Characters to leave unescaped. The default escapes ``/`` as
                well, so the value always stays a single path segment.
        """
        # Imported locally so the helper carries its own dependency.
        from urllib.parse import quote

        return quote(str(value), safe=safe)

    def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """Make an HTTP request and return the decoded JSON response.

        Args:
            method: HTTP verb.
            path: Path relative to the base URL.
            **kwargs: Passed through to ``httpx.Client.request``.

        Returns:
            The decoded JSON body, or ``{}`` for a 204 / empty body.

        Raises:
            WayyClientError: With status 0 when the server cannot be reached,
                with the HTTP status when it is >= 400, or with the HTTP
                status when a success body is not valid JSON.
        """
        try:
            resp = self._client.request(method, path, **kwargs)
        except httpx.ConnectError as exc:
            raise WayyClientError(0, f"Cannot connect to {self.base_url}") from exc

        if resp.status_code >= 400:
            # Best effort: prefer the JSON "detail" field, fall back to the raw
            # text when the body is not JSON or not a JSON object.
            try:
                detail = resp.json().get("detail", resp.text)
            except Exception:
                detail = resp.text
            raise WayyClientError(resp.status_code, detail)

        if resp.status_code == 204 or not resp.content:
            return {}

        try:
            return resp.json()
        except ValueError as exc:
            # e.g. an HTML page served by a proxy; report it through the
            # client's own error type instead of a bare JSON decode error.
            raise WayyClientError(
                resp.status_code,
                f"Invalid JSON in response body: {resp.text[:200]}",
            ) from exc

    # --- Health ---

    def health(self) -> dict[str, Any]:
        """GET ``/health``."""
        return self._request("GET", "/health")

    def info(self) -> dict[str, Any]:
        """GET ``/`` (service root)."""
        return self._request("GET", "/")

    # --- Tables ---

    def list_tables(self) -> list[str]:
        """Return the ``tables`` entry of GET ``/tables`` (``[]`` if absent)."""
        data = self._request("GET", "/tables")
        return data.get("tables", [])

    def get_table_info(self, name: str) -> dict[str, Any]:
        """GET ``/tables/{name}``."""
        return self._request("GET", f"/tables/{self._path_segment(name)}")

    def get_table_data(
        self, name: str, limit: int = 100, offset: int = 0
    ) -> dict[str, Any]:
        """GET ``/tables/{name}/data`` with ``limit`` / ``offset`` query params."""
        return self._request(
            "GET",
            f"/tables/{self._path_segment(name)}/data",
            params={"limit": limit, "offset": offset},
        )

    def create_table(
        self,
        name: str,
        columns: list[dict[str, str]],
        primary_key: Optional[str] = None,
        sorted_by: Optional[str] = None,
    ) -> dict[str, Any]:
        """POST a table definition to ``/api/v1/{db}/tables``.

        ``primary_key`` and ``sorted_by`` are always included in the payload,
        as ``None`` when not given.
        """
        payload = {
            "name": name,
            "columns": columns,
            "primary_key": primary_key,
            "sorted_by": sorted_by,
        }
        return self._request("POST", f"/api/v1/{_DB_NAME}/tables", json=payload)

    def drop_table(self, name: str) -> dict[str, Any]:
        """DELETE ``/tables/{name}``."""
        return self._request("DELETE", f"/tables/{self._path_segment(name)}")

    def upload_table(self, table_data: dict[str, Any]) -> dict[str, Any]:
        """POST *table_data* as JSON to ``/tables/upload``."""
        return self._request("POST", "/tables/upload", json=table_data)

    def append_rows(self, name: str, columns: list[dict[str, Any]]) -> dict[str, Any]:
        """POST ``{"columns": columns}`` to ``/tables/{name}/append``."""
        return self._request(
            "POST",
            f"/tables/{self._path_segment(name)}/append",
            json={"columns": columns},
        )

    # --- OLTP ---

    def insert_row(self, table: str, data: dict[str, Any]) -> dict[str, Any]:
        """POST ``{"data": data}`` to the table's ``rows`` collection."""
        return self._request(
            "POST",
            f"/api/v1/{_DB_NAME}/tables/{self._path_segment(table)}/rows",
            json={"data": data},
        )

    def get_row(self, table: str, pk: str) -> dict[str, Any]:
        """GET the row addressed by primary key *pk*."""
        return self._request(
            "GET",
            f"/api/v1/{_DB_NAME}/tables/{self._path_segment(table)}"
            f"/rows/{self._path_segment(pk)}",
        )

    def update_row(self, table: str, pk: str, data: dict[str, Any]) -> dict[str, Any]:
        """PUT ``{"data": data}`` to the row addressed by primary key *pk*."""
        return self._request(
            "PUT",
            f"/api/v1/{_DB_NAME}/tables/{self._path_segment(table)}"
            f"/rows/{self._path_segment(pk)}",
            json={"data": data},
        )

    def delete_row(self, table: str, pk: str) -> dict[str, Any]:
        """DELETE the row addressed by primary key *pk*."""
        return self._request(
            "DELETE",
            f"/api/v1/{_DB_NAME}/tables/{self._path_segment(table)}"
            f"/rows/{self._path_segment(pk)}",
        )

    def filter_rows(
        self, table: str, filters: Optional[dict[str, str]] = None, limit: int = 500
    ) -> dict[str, Any]:
        """GET the table's rows, sending *filters* as query parameters.

        A ``limit`` query parameter is always added; it overwrites a filter
        that happens to be named ``limit``. *filters* itself is not mutated.
        """
        params = dict(filters or {})
        params["limit"] = str(limit)
        return self._request(
            "GET",
            f"/api/v1/{_DB_NAME}/tables/{self._path_segment(table)}/rows",
            params=params,
        )

    # --- Aggregations ---

    def aggregate(self, table: str, column: str, op: str) -> dict[str, Any]:
        """GET ``/tables/{table}/agg/{column}/{op}``."""
        return self._request(
            "GET",
            f"/tables/{self._path_segment(table)}/agg/"
            f"{self._path_segment(column)}/{self._path_segment(op)}",
        )

    # --- Joins ---

    def as_of_join(
        self, left: str, right: str, on: list[str], as_of: str
    ) -> dict[str, Any]:
        """POST an as-of join request to ``/join/aj``."""
        payload = {"left_table": left, "right_table": right, "on": on, "as_of": as_of}
        return self._request("POST", "/join/aj", json=payload)

    def window_join(
        self,
        left: str,
        right: str,
        on: list[str],
        as_of: str,
        before: int,
        after: int,
    ) -> dict[str, Any]:
        """POST a window join request to ``/join/wj``.

        *before* / *after* are sent as ``window_before`` / ``window_after``.
        """
        payload = {
            "left_table": left,
            "right_table": right,
            "on": on,
            "as_of": as_of,
            "window_before": before,
            "window_after": after,
        }
        return self._request("POST", "/join/wj", json=payload)

    # --- Window functions ---

    def window_function(
        self,
        table: str,
        column: str,
        operation: str,
        window: Optional[int] = None,
        alpha: Optional[float] = None,
    ) -> dict[str, Any]:
        """POST a window-function request to ``/window``.

        ``window`` and ``alpha`` are only included in the payload when they
        are not ``None``.
        """
        payload: dict[str, Any] = {
            "table": table,
            "column": column,
            "operation": operation,
        }
        if window is not None:
            payload["window"] = window
        if alpha is not None:
            payload["alpha"] = alpha
        return self._request("POST", "/window", json=payload)

    # --- Streaming ---

    def ingest_tick(self, table: str, tick: dict[str, Any]) -> dict[str, Any]:
        """POST a single *tick* as JSON to ``/ingest/{table}``."""
        return self._request(
            "POST", f"/ingest/{self._path_segment(table)}", json=tick
        )

    def ingest_batch(self, table: str, ticks: list[dict[str, Any]]) -> dict[str, Any]:
        """POST ``{"ticks": ticks}`` to ``/ingest/{table}/batch``."""
        return self._request(
            "POST",
            f"/ingest/{self._path_segment(table)}/batch",
            json={"ticks": ticks},
        )

    def get_streaming_stats(self) -> dict[str, Any]:
        """GET ``/streaming/stats``."""
        return self._request("GET", "/streaming/stats")

    def get_quote(self, table: str, symbol: str) -> dict[str, Any]:
        """GET ``/streaming/quote/{table}/{symbol}``."""
        return self._request(
            "GET",
            f"/streaming/quote/{self._path_segment(table)}/"
            f"{self._path_segment(symbol)}",
        )

    def get_all_quotes(self, table: str) -> dict[str, Any]:
        """GET ``/streaming/quotes/{table}``."""
        return self._request(
            "GET", f"/streaming/quotes/{self._path_segment(table)}"
        )

    # --- KV Store ---
    # KV keys keep "/" unescaped so that keys containing slashes produce the
    # same request path as before; every other reserved character is escaped.

    def kv_get(self, key: str) -> Any:
        """Return the ``value`` entry of GET ``/kv/{key}`` (``None`` if absent)."""
        data = self._request("GET", f"/kv/{self._path_segment(key, safe='/')}")
        return data.get("value")

    def kv_set(self, key: str, value: Any, ttl: Optional[float] = None) -> dict[str, Any]:
        """POST ``{"value": value}`` to ``/kv/{key}``; ``ttl`` is added when given."""
        payload: dict[str, Any] = {"value": value}
        if ttl is not None:
            payload["ttl"] = ttl
        return self._request(
            "POST", f"/kv/{self._path_segment(key, safe='/')}", json=payload
        )

    def kv_delete(self, key: str) -> dict[str, Any]:
        """DELETE ``/kv/{key}``."""
        return self._request("DELETE", f"/kv/{self._path_segment(key, safe='/')}")

    def kv_list(self, pattern: Optional[str] = None) -> list[str]:
        """Return the ``keys`` entry of GET ``/kv`` (``[]`` if absent).

        A truthy *pattern* is sent as the ``pattern`` query parameter.
        """
        params: dict[str, str] = {}
        if pattern:
            params["pattern"] = pattern
        data = self._request("GET", "/kv", params=params)
        return data.get("keys", [])

    # --- Checkpoint ---

    def checkpoint(self) -> dict[str, Any]:
        """POST to ``/api/v1/{db}/checkpoint``."""
        return self._request("POST", f"/api/v1/{_DB_NAME}/checkpoint")

    def close(self) -> None:
        """Close the underlying ``httpx.Client``."""
        self._client.close()

    def __enter__(self) -> "WayyClient":
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Close the client when the ``with`` block ends."""
        self.close()
def upload_csv(
    client: WayyClient, name: str, file_path: Path, sorted_by: Optional[str] = None
) -> dict[str, Any]:
    """Read a CSV file and upload it as a table.

    Uses stdlib csv to avoid requiring pandas in CLI. The first record is the
    header row; every following non-blank record is a data row. Each column is
    typed by ``_infer_column``.

    Args:
        client: Client used for the upload (``client.upload_table``).
        name: Name of the table to create.
        file_path: Path of the CSV file.
        sorted_by: Sent unchanged as the payload's ``sorted_by`` entry.

    Returns:
        Whatever ``client.upload_table`` returns.

    Raises:
        ValueError: If the file has no header row, has no data rows, or
            contains a data row with fewer fields than the header. Also
            propagated from ``_infer_column`` for non-numeric columns.
    """
    import csv

    # utf-8-sig transparently drops a UTF-8 byte-order mark, which would
    # otherwise become part of the first column name.
    with open(file_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.reader(f)
        headers = next(reader, None)
        if not headers:
            raise ValueError("CSV file is empty (no header row)")

        width = len(headers)
        rows: list[list[str]] = []
        for record in reader:
            # csv.reader yields [] for blank lines (e.g. a trailing newline).
            if not record:
                continue
            # Validate up front so a short row yields a clear error instead
            # of an IndexError during column extraction. Extra trailing
            # fields are ignored.
            if len(record) < width:
                raise ValueError(
                    f"CSV line {reader.line_num}: expected {width} fields, "
                    f"got {len(record)}"
                )
            rows.append(record)

    if not rows:
        raise ValueError("CSV file is empty (no data rows)")

    columns: list[dict[str, Any]] = []
    for i, header in enumerate(headers):
        raw_values = [row[i] for row in rows]
        dtype, data = _infer_column(raw_values)
        columns.append({"name": header, "dtype": dtype, "data": data})

    payload = {"name": name, "columns": columns, "sorted_by": sorted_by}
    return client.upload_table(payload)
def _infer_column(values: list[str]) -> tuple[str, list[Any]]:
"""Infer column dtype from string values. Returns (dtype_name, typed_data)."""
non_empty = [v for v in values if v.strip()]
if not non_empty:
return ("float64", [0.0] * len(values))
# Try int64
try:
data = [int(v) if v.strip() else 0 for v in values]
return ("int64", data)
except (ValueError, OverflowError):
pass
# Try float64 (handles empty cells as NaN)
try:
data = [float(v) if v.strip() else float("nan") for v in values]
return ("float64", data)
except (ValueError, OverflowError):
pass
raise ValueError(
f"Non-numeric column detected. Values: {values[:3]}... "
"CSV upload currently supports numeric columns only. "
"Use the Python API with from_pandas() for string/symbol columns."
)
def upload_json_ticks(
    client: WayyClient, table: str, file_path: Path
) -> dict[str, Any]:
    """Read a JSON file of ticks and batch-ingest them.

    The file must contain either a JSON list of ticks or a JSON object with a
    ``"ticks"`` entry holding that list.

    Args:
        client: Client used for ingestion (``client.ingest_batch``).
        table: Name of the target table.
        file_path: Path of the JSON file.

    Returns:
        Whatever ``client.ingest_batch`` returns.

    Raises:
        ValueError: If the JSON is neither of the accepted shapes (including
            a ``"ticks"`` entry that is not a list) or is not valid JSON.
    """
    # Binary mode lets json detect the encoding itself (UTF-8 with or without
    # BOM, UTF-16, UTF-32) instead of depending on the platform locale.
    with open(file_path, "rb") as f:
        data = json.load(f)

    ticks = data["ticks"] if isinstance(data, dict) and "ticks" in data else data
    if not isinstance(ticks, list):
        raise ValueError("JSON must be a list of ticks or {\"ticks\": [...]}")

    return client.ingest_batch(table, ticks)