ecom-qa / src /database /db.py
rnyx's picture
Initial deploy
f48b219
Raw
History Blame Contribute Delete
8.44 kB
"""
src/database/db.py
Fix #9 — Persistent query history per user
Fix #3 — URL page cache with TTL
Schema:
users → id, username, email, password_hash, created_at
queries → id, user_id, session_id, question, answer, context_preview,
confidence, intent, timestamp
url_cache → url_hash, url, text, cached_at
comparisons → id, user_id, urls_json, question, results_json, timestamp
"""
import sqlite3
import time
import logging
from pathlib import Path
from contextlib import contextmanager
logger = logging.getLogger(__name__)
DB_PATH = Path("data") / "ecom_qa.db"
CACHE_TTL_SECONDS = 60 * 60 * 6 # 6 hours
import hashlib
def _url_hash(url: str) -> str:
return hashlib.sha256(url.encode()).hexdigest()[:32]
class Database:
def __init__(self, path: Path = DB_PATH):
self.path = path
self.path.parent.mkdir(parents=True, exist_ok=True)
@contextmanager
def _conn(self):
conn = sqlite3.connect(str(self.path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# ── Schema ────────────────────────────────────────────────────────────────
def init_tables(self):
with self._conn() as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT '',
password_hash TEXT NOT NULL,
created_at REAL DEFAULT (strftime('%s','now'))
);
CREATE TABLE IF NOT EXISTS queries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
session_id TEXT,
question TEXT NOT NULL,
answer TEXT DEFAULT '',
context_preview TEXT DEFAULT '',
confidence REAL,
intent TEXT DEFAULT 'factual',
timestamp REAL DEFAULT (strftime('%s','now'))
);
CREATE TABLE IF NOT EXISTS url_cache (
url_hash TEXT PRIMARY KEY,
url TEXT NOT NULL,
text TEXT NOT NULL,
cached_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS comparisons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
urls_json TEXT NOT NULL,
question TEXT NOT NULL,
results_json TEXT,
timestamp REAL DEFAULT (strftime('%s','now'))
);
CREATE INDEX IF NOT EXISTS idx_queries_user ON queries(user_id);
CREATE INDEX IF NOT EXISTS idx_queries_session ON queries(session_id);
CREATE INDEX IF NOT EXISTS idx_cache_hash ON url_cache(url_hash);
""")
logger.info("Database initialised at %s", self.path)
# ── User CRUD ─────────────────────────────────────────────────────────────
def create_user(self, username: str, email: str, password_hash: str) -> int:
with self._conn() as conn:
cur = conn.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?,?,?)",
(username, email, password_hash),
)
return cur.lastrowid
def get_user_by_username(self, username: str) -> dict | None:
with self._conn() as conn:
row = conn.execute(
"SELECT * FROM users WHERE username = ?", (username,)
).fetchone()
return dict(row) if row else None
def get_user_by_id(self, user_id: int) -> dict | None:
with self._conn() as conn:
row = conn.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
).fetchone()
return dict(row) if row else None
# ── Query history ─────────────────────────────────────────────────────────
def save_query(self, user_id, session_id, question, answer,
context_preview="", confidence=None, intent="factual"):
with self._conn() as conn:
conn.execute(
"""INSERT INTO queries
(user_id, session_id, question, answer, context_preview, confidence, intent)
VALUES (?,?,?,?,?,?,?)""",
(user_id, session_id, question, answer, context_preview, confidence, intent),
)
def get_user_history(self, user_id: int, limit: int = 200) -> list[dict]:
with self._conn() as conn:
rows = conn.execute(
"""SELECT id, question, answer, confidence, intent,
datetime(timestamp, 'unixepoch', 'localtime') AS timestamp
FROM queries WHERE user_id = ?
ORDER BY timestamp DESC LIMIT ?""",
(user_id, limit),
).fetchall()
return [dict(r) for r in rows]
def get_session_history(self, session_id: str, limit: int = 50) -> list[dict]:
with self._conn() as conn:
rows = conn.execute(
"""SELECT question, answer, confidence, intent,
datetime(timestamp, 'unixepoch', 'localtime') AS timestamp
FROM queries WHERE session_id = ?
ORDER BY timestamp DESC LIMIT ?""",
(session_id, limit),
).fetchall()
return [dict(r) for r in rows]
def delete_query(self, query_id: int, user_id: int):
with self._conn() as conn:
conn.execute(
"DELETE FROM queries WHERE id=? AND user_id=?", (query_id, user_id)
)
# ── URL cache ─────────────────────────────────────────────────────────────
def get_cached_page(self, url: str) -> str | None:
key = _url_hash(url)
now = time.time()
with self._conn() as conn:
row = conn.execute(
"SELECT text, cached_at FROM url_cache WHERE url_hash=?", (key,)
).fetchone()
if row and (now - row["cached_at"]) < CACHE_TTL_SECONDS:
return row["text"]
if row:
conn.execute("DELETE FROM url_cache WHERE url_hash=?", (key,))
return None
def cache_page(self, url: str, text: str):
key = _url_hash(url)
with self._conn() as conn:
conn.execute(
"""INSERT OR REPLACE INTO url_cache (url_hash, url, text, cached_at)
VALUES (?,?,?,?)""",
(key, url, text, time.time()),
)
def clear_expired_cache(self):
cutoff = time.time() - CACHE_TTL_SECONDS
with self._conn() as conn:
conn.execute("DELETE FROM url_cache WHERE cached_at < ?", (cutoff,))
# ── Comparisons ───────────────────────────────────────────────────────────
def save_comparison(self, user_id, urls_json, question, results_json):
with self._conn() as conn:
conn.execute(
"""INSERT INTO comparisons (user_id, urls_json, question, results_json)
VALUES (?,?,?,?)""",
(user_id, urls_json, question, results_json),
)