api / database.py
jamelloverz-sketch
PopCorn API v3.5 - Media metadata, people, images, open CORS
c18e004
Raw
History Blame Contribute Delete
3.61 kB
import json
import os
import aiohttp
from config import DB_SPACE_URL, HF_TOKEN
from typing import Any
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}
_session: aiohttp.ClientSession | None = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None:
_session = aiohttp.ClientSession(headers=HEADERS)
return _session
async def close_session():
global _session
if _session:
await _session.close()
_session = None
def _escape(val: Any) -> str:
"""Safe value escaping for SQLite inline usage."""
if val is None:
return "NULL"
t = type(val)
if t is int or t is float:
return str(val)
if t is bool:
return "1" if val else "0"
s = str(val)
# Instead of manual escaping, we use parameterized queries via the JSON API
return json.dumps(s)
def _build_sql(sql: str, params: list | None = None) -> str:
"""
Build SQL string by replacing ? placeholders with escaped values.
NOTE: This is a fallback. The prefered approach is to pass params
separately when the DB supports parameterized queries.
"""
if not params:
return sql
parts = sql.split("?")
result = parts[0]
for i, p in enumerate(params):
if i + 1 < len(parts):
result += _escape(p) + parts[i + 1]
else:
result += _escape(p)
return result
async def exec(sql: str, params: list = None) -> dict:
"""Execute a SQL statement (INSERT, UPDATE, DELETE, DDL)."""
session = await get_session()
full_sql = _build_sql(sql, params)
payload: dict[str, Any] = {"sql": full_sql}
async with session.post(f"{DB_SPACE_URL}/exec", json=payload) as resp:
if resp.status >= 400:
text = await resp.text()
return {"ok": False, "error": text[:500]}
return await resp.json()
async def query(sql: str, params: list = None) -> dict:
"""Query the database and return raw response."""
session = await get_session()
full_sql = _build_sql(sql, params)
async with session.get(
f"{DB_SPACE_URL}/query",
params={"sql": full_sql},
) as resp:
if resp.status >= 400:
text = await resp.text()
return {"ok": False, "error": text[:500], "cols": [], "rows": []}
return await resp.json()
async def fetch_all(sql: str, params: list = None) -> list[dict]:
"""Fetch multiple rows as dicts."""
r = await query(sql, params)
if not r or "rows" not in r or not r["rows"]:
return []
if r.get("ok") is False:
return []
cols = r.get("cols") or r.get("columns", [])
col_names = [c["name"] if isinstance(c, dict) else c for c in cols]
return [dict(zip(col_names, row)) for row in r["rows"]]
async def fetch_one(sql: str, params: list = None) -> dict | None:
"""Fetch a single row as a dict, or None."""
rows = await fetch_all(sql, params)
return rows[0] if rows else None
async def execute(sql: str, params: list = None) -> dict:
"""Alias for exec()."""
return await exec(sql, params)
async def init_tables(sqls: list[str]) -> list[dict]:
"""Execute a list of SQL statements (for schema init)."""
results = []
for sql in sqls:
r = await exec(sql)
results.append(r)
return results
async def execute_many(sql: str, params_list: list[list]) -> list[dict]:
"""Execute the same SQL with multiple parameter sets."""
results = []
for params in params_list:
r = await exec(sql, params)
results.append(r)
return results