| import aiohttp |
| from config import TURSO_DB_URL, TURSO_AUTH_TOKEN |
|
|
| _API_URL = TURSO_DB_URL.replace("libsql://", "https://") + "/v2/pipeline" |
| _session: aiohttp.ClientSession | None = None |
|
|
|
|
| async def _get_session() -> aiohttp.ClientSession: |
| global _session |
| if _session is None: |
| _session = aiohttp.ClientSession( |
| headers={"Authorization": f"Bearer {TURSO_AUTH_TOKEN}"} |
| ) |
| return _session |
|
|
|
|
| async def _query(sql: str, args: list = None) -> dict: |
| session = await _get_session() |
| stmt = {"sql": sql} |
| if args: |
| stmt["args"] = [{"type": "text", "value": str(a)} if a is not None else {"type": "null"} for a in args] |
| payload = {"requests": [{"type": "execute", "stmt": stmt}]} |
| async with session.post(_API_URL, json=payload) as resp: |
| data = await resp.json() |
| if "results" not in data: |
| raise Exception(f"Turso response missing 'results': {data}") |
| result = data["results"][0] |
| if result["type"] == "error": |
| err = result.get("error", {}) |
| raise Exception(f"Turso error: {err.get('message', str(result))}") |
| return result["response"]["result"] |
|
|
|
|
| def _convert_val(cell: dict): |
| t = cell.get("type") |
| v = cell.get("value") |
| if v is None: |
| return None |
| if t == "integer": |
| return int(v) |
| if t == "real": |
| return float(v) |
| if t == "boolean": |
| return v.lower() in ("true", "1") |
| return v |
|
|
|
|
| class _Result: |
| def __init__(self, data: dict): |
| self._data = data |
| self.columns = tuple(c["name"] for c in data.get("cols", [])) |
| raw = data.get("rows", []) |
| self.rows = [[_convert_val(cell) for cell in row] for row in raw] |
|
|
|
|
| async def fetch(sql: str, args: list = None): |
| data = await _query(sql, args) |
| return _Result(data) |
|
|
|
|
| async def fetch_all(sql: str, args: list = None) -> list[dict]: |
| result = await _query(sql, args) |
| cols = [c["name"] for c in result.get("cols", [])] |
| rows = result.get("rows", []) |
| return [dict(zip(cols, [_convert_val(cell) for cell in row])) for row in rows] |
|
|
|
|
| async def fetch_one(sql: str, args: list = None) -> dict | None: |
| rows = await fetch_all(sql, args) |
| return rows[0] if rows else None |
|
|
|
|
| def row_to_dict(row, columns: tuple = None) -> dict | None: |
| if row is None: |
| return None |
| if columns is not None: |
| return dict(zip(columns, row)) |
| return dict(row) if isinstance(row, dict) else None |
|
|