popcorn-api / database.py
multimedia-cryptography-benchmarks's picture
Upload database.py with huggingface_hub
04414e1 verified
Raw
History Blame Contribute Delete
2.44 kB
import aiohttp
from config import TURSO_DB_URL, TURSO_AUTH_TOKEN
# HTTP endpoint used for every query: the configured database URL with its
# "libsql://" scheme swapped for "https://" and the "/v2/pipeline" path appended.
_API_URL = TURSO_DB_URL.replace("libsql://", "https://") + "/v2/pipeline"
# Module-wide aiohttp session; stays None until _get_session() first builds it.
_session: aiohttp.ClientSession | None = None
async def _get_session() -> aiohttp.ClientSession:
    """Return the shared aiohttp session, building it when necessary.

    The session is stored in the module-level ``_session`` and carries the
    Turso bearer token in its default ``Authorization`` header.

    Returns:
        An open ``aiohttp.ClientSession``.
    """
    global _session
    # Rebuild when nothing is cached yet or when the cached session was closed;
    # handing back a closed session would make every later request fail.
    if _session is None or _session.closed:
        _session = aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {TURSO_AUTH_TOKEN}"}
        )
    return _session
async def _query(sql: str, args: list | None = None) -> dict:
    """Run a single SQL statement through the Turso pipeline endpoint.

    Args:
        sql: The statement text.
        args: Optional positional parameters. ``None`` entries are sent as
            SQL null; every other value is sent as text via ``str()``.
            NOTE(review): numbers and bytes are therefore stringified too;
            confirm this is what callers expect.

    Returns:
        The ``result`` object found under ``response`` in the first pipeline
        reply.

    Raises:
        Exception: If the reply carries no usable ``results`` list, or if the
            first entry reports an error.
    """
    session = await _get_session()

    stmt = {"sql": sql}
    if args:
        stmt["args"] = [
            {"type": "null"} if a is None else {"type": "text", "value": str(a)}
            for a in args
        ]

    payload = {
        "requests": [
            {"type": "execute", "stmt": stmt},
            # Ask the server to release the stream once the statement has run;
            # the reply to this request is the second entry of "results" and
            # is not inspected here.
            {"type": "close"},
        ]
    }

    async with session.post(_API_URL, json=payload) as resp:
        data = await resp.json()

    # Treat an absent, empty, or non-dict reply the same way so callers get
    # one descriptive error instead of a bare IndexError/TypeError.
    results = data.get("results") if isinstance(data, dict) else None
    if not results:
        raise Exception(f"Turso response missing 'results': {data}")

    result = results[0]
    if result["type"] == "error":
        err = result.get("error", {})
        raise Exception(f"Turso error: {err.get('message', str(result))}")
    return result["response"]["result"]
def _convert_val(cell: dict):
    """Turn one typed cell of a result row into a plain Python value.

    Args:
        cell: A mapping with a ``type`` tag and, depending on the type, a
            ``value`` entry or (for blobs) a ``base64`` entry.

    Returns:
        ``bytes`` for blob cells, ``int`` for ``integer``, ``float`` for
        ``float``/``real``, ``bool`` for ``boolean``, ``None`` when the cell
        has no payload, and the raw value unchanged for anything else.
    """
    kind = cell.get("type")

    # Blob cells keep their payload under "base64" rather than "value", so
    # they have to be handled before the generic "no value -> None" check.
    if kind == "blob" and cell.get("base64") is not None:
        import base64  # local import: only needed for blob cells

        encoded = cell["base64"]
        # Re-add any stripped "=" padding so b64decode accepts both forms.
        return base64.b64decode(encoded + "=" * (-len(encoded) % 4))

    value = cell.get("value")
    if value is None:
        return None
    if kind == "integer":
        return int(value)
    if kind in ("real", "float"):
        return float(value)
    if kind == "boolean":
        # str() first so a native bool/int payload does not break .lower().
        return str(value).lower() in ("true", "1")
    return value
class _Result:
    """Parsed view of one query result: column names plus converted rows."""

    def __init__(self, data: dict):
        # Keep the untouched payload next to the parsed attributes.
        self._data = data

        # Column names, in the order given under "cols".
        names = []
        for col in data.get("cols", []):
            names.append(col["name"])
        self.columns = tuple(names)

        # Each row becomes a list of Python values, one per cell.
        converted = []
        for raw_row in data.get("rows", []):
            converted.append([_convert_val(cell) for cell in raw_row])
        self.rows = converted
async def fetch(sql: str, args: list = None):
    """Execute *sql* with optional *args* and wrap the reply in a ``_Result``."""
    return _Result(await _query(sql, args))
async def fetch_all(sql: str, args: list = None) -> list[dict]:
    """Execute *sql* and return every row as a dict keyed by column name."""
    payload = await _query(sql, args)
    names = [col["name"] for col in payload.get("cols", [])]

    records = []
    for raw_row in payload.get("rows", []):
        # Convert the whole row first, then pair values with column names.
        values = [_convert_val(cell) for cell in raw_row]
        records.append(dict(zip(names, values)))
    return records
async def fetch_one(sql: str, args: list = None) -> dict | None:
    """Execute *sql* and return the first row as a dict, or None if no rows."""
    matches = await fetch_all(sql, args)
    if not matches:
        return None
    return matches[0]
def row_to_dict(row, columns: tuple = None) -> dict | None:
if row is None:
return None
if columns is not None:
return dict(zip(columns, row))
return dict(row) if isinstance(row, dict) else None