"""
src/database/db.py

Fix #9 — Persistent query history per user
Fix #3 — URL page cache with TTL

Schema:
    users       → id, username, email, password_hash, created_at
    queries     → id, user_id, session_id, question, answer, context_preview,
                  confidence, intent, timestamp
    url_cache   → url_hash, url, text, cached_at
    comparisons → id, user_id, urls_json, question, results_json, timestamp
"""

import hashlib
import logging
import sqlite3
import time
from contextlib import closing, contextmanager
from pathlib import Path


# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)

# Default location of the SQLite file (relative to the working directory).
DB_PATH = Path("data") / "ecom_qa.db"

# Cached pages older than this many seconds are treated as expired (6 hours).
CACHE_TTL_SECONDS = 60 * 60 * 6


def _url_hash(url: str) -> str:
    """Return the cache key used for *url* in the ``url_cache`` table.

    The key is the first 32 hexadecimal characters (128 bits) of the SHA-256
    digest of the UTF-8 encoded URL. The URL is hashed exactly as given: no
    normalisation is applied, so two spellings of the same page (different
    case, trailing slash, query order) produce different keys.

    Args:
        url: The page URL to derive a key for.

    Returns:
        A 32-character lowercase hex string.
    """
    return hashlib.sha256(url.encode("utf-8")).hexdigest()[:32]


| class Database: |
| def __init__(self, path: Path = DB_PATH): |
| self.path = path |
| self.path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| @contextmanager |
| def _conn(self): |
| conn = sqlite3.connect(str(self.path)) |
| conn.row_factory = sqlite3.Row |
| conn.execute("PRAGMA journal_mode=WAL") |
| conn.execute("PRAGMA foreign_keys=ON") |
| try: |
| yield conn |
| conn.commit() |
| except Exception: |
| conn.rollback() |
| raise |
| finally: |
| conn.close() |
|
|
| |
| def init_tables(self): |
| with self._conn() as conn: |
| conn.executescript(""" |
| CREATE TABLE IF NOT EXISTS users ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| username TEXT UNIQUE NOT NULL, |
| email TEXT DEFAULT '', |
| password_hash TEXT NOT NULL, |
| created_at REAL DEFAULT (strftime('%s','now')) |
| ); |
| |
| CREATE TABLE IF NOT EXISTS queries ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER REFERENCES users(id) ON DELETE CASCADE, |
| session_id TEXT, |
| question TEXT NOT NULL, |
| answer TEXT DEFAULT '', |
| context_preview TEXT DEFAULT '', |
| confidence REAL, |
| intent TEXT DEFAULT 'factual', |
| timestamp REAL DEFAULT (strftime('%s','now')) |
| ); |
| |
| CREATE TABLE IF NOT EXISTS url_cache ( |
| url_hash TEXT PRIMARY KEY, |
| url TEXT NOT NULL, |
| text TEXT NOT NULL, |
| cached_at REAL NOT NULL |
| ); |
| |
| CREATE TABLE IF NOT EXISTS comparisons ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER REFERENCES users(id) ON DELETE CASCADE, |
| urls_json TEXT NOT NULL, |
| question TEXT NOT NULL, |
| results_json TEXT, |
| timestamp REAL DEFAULT (strftime('%s','now')) |
| ); |
| |
| CREATE INDEX IF NOT EXISTS idx_queries_user ON queries(user_id); |
| CREATE INDEX IF NOT EXISTS idx_queries_session ON queries(session_id); |
| CREATE INDEX IF NOT EXISTS idx_cache_hash ON url_cache(url_hash); |
| """) |
| logger.info("Database initialised at %s", self.path) |
|
|
| |
| def create_user(self, username: str, email: str, password_hash: str) -> int: |
| with self._conn() as conn: |
| cur = conn.execute( |
| "INSERT INTO users (username, email, password_hash) VALUES (?,?,?)", |
| (username, email, password_hash), |
| ) |
| return cur.lastrowid |
|
|
| def get_user_by_username(self, username: str) -> dict | None: |
| with self._conn() as conn: |
| row = conn.execute( |
| "SELECT * FROM users WHERE username = ?", (username,) |
| ).fetchone() |
| return dict(row) if row else None |
|
|
| def get_user_by_id(self, user_id: int) -> dict | None: |
| with self._conn() as conn: |
| row = conn.execute( |
| "SELECT * FROM users WHERE id = ?", (user_id,) |
| ).fetchone() |
| return dict(row) if row else None |
|
|
| |
| def save_query(self, user_id, session_id, question, answer, |
| context_preview="", confidence=None, intent="factual"): |
| with self._conn() as conn: |
| conn.execute( |
| """INSERT INTO queries |
| (user_id, session_id, question, answer, context_preview, confidence, intent) |
| VALUES (?,?,?,?,?,?,?)""", |
| (user_id, session_id, question, answer, context_preview, confidence, intent), |
| ) |
|
|
| def get_user_history(self, user_id: int, limit: int = 200) -> list[dict]: |
| with self._conn() as conn: |
| rows = conn.execute( |
| """SELECT id, question, answer, confidence, intent, |
| datetime(timestamp, 'unixepoch', 'localtime') AS timestamp |
| FROM queries WHERE user_id = ? |
| ORDER BY timestamp DESC LIMIT ?""", |
| (user_id, limit), |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
| def get_session_history(self, session_id: str, limit: int = 50) -> list[dict]: |
| with self._conn() as conn: |
| rows = conn.execute( |
| """SELECT question, answer, confidence, intent, |
| datetime(timestamp, 'unixepoch', 'localtime') AS timestamp |
| FROM queries WHERE session_id = ? |
| ORDER BY timestamp DESC LIMIT ?""", |
| (session_id, limit), |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
| def delete_query(self, query_id: int, user_id: int): |
| with self._conn() as conn: |
| conn.execute( |
| "DELETE FROM queries WHERE id=? AND user_id=?", (query_id, user_id) |
| ) |
|
|
| |
| def get_cached_page(self, url: str) -> str | None: |
| key = _url_hash(url) |
| now = time.time() |
| with self._conn() as conn: |
| row = conn.execute( |
| "SELECT text, cached_at FROM url_cache WHERE url_hash=?", (key,) |
| ).fetchone() |
| if row and (now - row["cached_at"]) < CACHE_TTL_SECONDS: |
| return row["text"] |
| if row: |
| conn.execute("DELETE FROM url_cache WHERE url_hash=?", (key,)) |
| return None |
|
|
| def cache_page(self, url: str, text: str): |
| key = _url_hash(url) |
| with self._conn() as conn: |
| conn.execute( |
| """INSERT OR REPLACE INTO url_cache (url_hash, url, text, cached_at) |
| VALUES (?,?,?,?)""", |
| (key, url, text, time.time()), |
| ) |
|
|
| def clear_expired_cache(self): |
| cutoff = time.time() - CACHE_TTL_SECONDS |
| with self._conn() as conn: |
| conn.execute("DELETE FROM url_cache WHERE cached_at < ?", (cutoff,)) |
|
|
| |
| def save_comparison(self, user_id, urls_json, question, results_json): |
| with self._conn() as conn: |
| conn.execute( |
| """INSERT INTO comparisons (user_id, urls_json, question, results_json) |
| VALUES (?,?,?,?)""", |
| (user_id, urls_json, question, results_json), |
| ) |
|
|