import os import sqlite3 import re import threading import time from pathlib import Path from contextlib import contextmanager # SD_DB_DIR lets Docker users put the database in a mounted volume. _db_dir = os.environ.get("SD_DB_DIR") DB_PATH = ( Path(_db_dir) / "scripture_detector.db" if _db_dir else Path(__file__).resolve().parent / "scripture_detector.db" ) # ── cache-db (per-session in-memory) mode ──────────────────────────────────── # Enabled by SCRIPTURE_DETECTOR_CACHE_DB=1 (set by app.py when --cache-db is # passed). Each browser session gets its own isolated SQLite :memory: database. # Sessions are keyed by a UUID stored in the signed Flask session cookie and # cleaned up after SESSION_TTL seconds of inactivity. _CACHE_MODE: bool = os.environ.get("SCRIPTURE_DETECTOR_CACHE_DB", "") in ("1", "true", "yes") SESSION_TTL: int = int(os.environ.get("SD_SESSION_TTL", "3600")) # default 1 hour # Maps session_id → {conn, lock, last_used} _sessions: dict[str, dict] = {} _sessions_lock = threading.Lock() # Thread-local holds the current session_id (set by app.py before_request hook) session_local = threading.local() _SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS sources ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, text TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS quotes ( id INTEGER PRIMARY KEY AUTOINCREMENT, source_id INTEGER NOT NULL, span_start INTEGER, span_end INTEGER, quote_text TEXT NOT NULL, quote_type TEXT NOT NULL DEFAULT 'allusion' CHECK(quote_type IN ('full','partial','paraphrase','allusion')), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (source_id) REFERENCES sources(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS quote_references ( id INTEGER PRIMARY KEY AUTOINCREMENT, quote_id INTEGER NOT NULL, reference TEXT NOT NULL, book_code TEXT, FOREIGN KEY (quote_id) REFERENCES quotes(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); """ def _new_session_conn() -> sqlite3.Connection: """Create a fresh in-memory SQLite connection with the full schema.""" conn = sqlite3.connect(":memory:", check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.executescript(_SCHEMA_SQL) conn.commit() return conn def _get_session_entry(session_id: str) -> dict: """Return (and lazily create) the session entry for *session_id*.""" with _sessions_lock: # Prune idle sessions now = time.monotonic() stale = [sid for sid, s in _sessions.items() if now - s["last_used"] > SESSION_TTL] for sid in stale: try: _sessions[sid]["conn"].close() except Exception: pass del _sessions[sid] if session_id not in _sessions: _sessions[session_id] = { "conn": _new_session_conn(), "lock": threading.Lock(), "last_used": now, } else: _sessions[session_id]["last_used"] = now return _sessions[session_id] @contextmanager def get_db(): if _CACHE_MODE: sid = getattr(session_local, "session_id", None) or "anonymous" entry = _get_session_entry(sid) with entry["lock"]: conn = entry["conn"] try: yield conn conn.commit() except Exception: conn.rollback() raise else: conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db(): if _CACHE_MODE: # In cache mode each session's schema is initialised when its # connection is first created (_new_session_conn). Nothing to do here. return with get_db() as conn: conn.executescript(_SCHEMA_SQL) def extract_book_code(reference: str) -> str: match = re.match(r"^([a-z0-9]+)_", reference.strip().lower()) return match.group(1) if match else "" # ── Sources ────────────────────────────────────────────────────────────────── def create_source(name: str, text: str) -> int: with get_db() as conn: cur = conn.execute( "INSERT INTO sources (name, text) VALUES (?, ?)", (name, text) ) return cur.lastrowid def get_source(source_id: int) -> dict | None: with get_db() as conn: row = conn.execute( "SELECT * FROM sources WHERE id = ?", (source_id,) ).fetchone() return dict(row) if row else None def get_all_sources() -> list[dict]: with get_db() as conn: rows = conn.execute( """SELECT s.id, s.name, s.created_at, s.updated_at, LENGTH(s.text) as text_length, COUNT(q.id) as quote_count FROM sources s LEFT JOIN quotes q ON q.source_id = s.id GROUP BY s.id ORDER BY s.created_at DESC""" ).fetchall() return [dict(r) for r in rows] def delete_source(source_id: int): with get_db() as conn: conn.execute("DELETE FROM sources WHERE id = ?", (source_id,)) # ── Quotes ─────────────────────────────────────────────────────────────────── def add_quote( source_id: int, span_start: int | None, span_end: int | None, quote_text: str, quote_type: str, references: list[str], ) -> int: with get_db() as conn: cur = conn.execute( """INSERT INTO quotes (source_id, span_start, span_end, quote_text, quote_type) VALUES (?, ?, ?, ?, ?)""", (source_id, span_start, span_end, quote_text, quote_type), ) quote_id = cur.lastrowid for ref in references: ref_clean = ref.strip().lower() book = extract_book_code(ref_clean) conn.execute( "INSERT INTO quote_references (quote_id, reference, book_code) VALUES (?, ?, ?)", (quote_id, ref_clean, book), ) return quote_id def update_quote( quote_id: int, quote_text: str = None, quote_type: str = None, span_start: int = None, span_end: int = None, references: list[str] = None, ): with get_db() as conn: if quote_text is not None: conn.execute( "UPDATE quotes SET quote_text=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", (quote_text, quote_id), ) if quote_type is not None: conn.execute( "UPDATE quotes SET quote_type=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", (quote_type, quote_id), ) if span_start is not None: conn.execute( "UPDATE quotes SET span_start=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", (span_start, quote_id), ) if span_end is not None: conn.execute( "UPDATE quotes SET span_end=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", (span_end, quote_id), ) if references is not None: conn.execute("DELETE FROM quote_references WHERE quote_id=?", (quote_id,)) for ref in references: ref_clean = ref.strip().lower() book = extract_book_code(ref_clean) conn.execute( "INSERT INTO quote_references (quote_id, reference, book_code) VALUES (?, ?, ?)", (quote_id, ref_clean, book), ) def delete_quote(quote_id: int): with get_db() as conn: conn.execute("DELETE FROM quotes WHERE id = ?", (quote_id,)) def get_quotes_for_source(source_id: int) -> list[dict]: with get_db() as conn: quotes = conn.execute( "SELECT * FROM quotes WHERE source_id = ? ORDER BY span_start", (source_id,), ).fetchall() result = [] for q in quotes: qd = dict(q) refs = conn.execute( "SELECT reference, book_code FROM quote_references WHERE quote_id = ?", (q["id"],), ).fetchall() qd["references"] = [dict(r) for r in refs] result.append(qd) return result def delete_quotes_for_source(source_id: int): with get_db() as conn: conn.execute("DELETE FROM quotes WHERE source_id = ?", (source_id,)) def delete_quotes_in_range(source_id: int, start: int, end: int): with get_db() as conn: conn.execute( """DELETE FROM quotes WHERE source_id = ? AND span_start IS NOT NULL AND span_end IS NOT NULL AND NOT (span_end <= ? OR span_start >= ?)""", (source_id, start, end), ) # ── Settings ───────────────────────────────────────────────────────────────── def get_setting(key: str, default: str = None) -> str | None: with get_db() as conn: row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone() return row["value"] if row else default def set_setting(key: str, value: str): with get_db() as conn: conn.execute( "INSERT INTO settings (key, value) VALUES (?, ?) " "ON CONFLICT(key) DO UPDATE SET value=excluded.value", (key, value), ) def get_all_settings() -> dict: with get_db() as conn: rows = conn.execute("SELECT key, value FROM settings").fetchall() return {r["key"]: r["value"] for r in rows} # ── Analytics ──────────────────────────────────────────────────────────────── def get_book_distribution(source_id: int = None) -> list[dict]: with get_db() as conn: if source_id: rows = conn.execute( """SELECT qr.book_code, COUNT(*) as count FROM quote_references qr JOIN quotes q ON qr.quote_id = q.id WHERE q.source_id = ? AND qr.book_code != '' GROUP BY qr.book_code ORDER BY count DESC""", (source_id,), ).fetchall() else: rows = conn.execute( """SELECT qr.book_code, COUNT(*) as count FROM quote_references qr JOIN quotes q ON qr.quote_id = q.id WHERE qr.book_code != '' GROUP BY qr.book_code ORDER BY count DESC""" ).fetchall() return [dict(r) for r in rows] def get_quote_type_distribution(source_id: int = None) -> dict: with get_db() as conn: if source_id: rows = conn.execute( "SELECT quote_type, COUNT(*) as count FROM quotes WHERE source_id=? GROUP BY quote_type", (source_id,), ).fetchall() else: rows = conn.execute( "SELECT quote_type, COUNT(*) as count FROM quotes GROUP BY quote_type" ).fetchall() return {r["quote_type"]: r["count"] for r in rows} def search_sources(filters: list[dict], logic: str = "AND") -> dict: """ Search sources by text content and/or scripture references. filters: list of dicts, each with: - type: 'text' | 'book' | 'chapter' | 'verse' - value: the query string (already lowercased by caller) logic: 'AND' (source must match all filters) | 'OR' (source must match any filter) Returns {'total': int, 'results': [enriched source dicts]} """ valid = [f for f in filters if f.get("value", "").strip()] if not valid: return {"total": 0, "results": []} with get_db() as conn: # --- per-filter: map source_id → list of evidence dicts --------------- filter_evidence: list[dict[int, list[dict]]] = [] for f in valid: ftype = f["type"] fval = f["value"].strip().lower() ev: dict[int, list[dict]] = {} if ftype == "text": rows = conn.execute( "SELECT id, name, text FROM sources " "WHERE lower(text) LIKE ? OR lower(name) LIKE ?", [f"%{fval}%", f"%{fval}%"], ).fetchall() for r in rows: full = r["text"] idx = full.lower().find(fval) if idx >= 0: s0 = max(0, idx - 100) s1 = min(len(full), idx + len(fval) + 100) pre = ("…" if s0 > 0 else "") post = ("…" if s1 < len(full) else "") snippet = pre + full[s0:s1] + post else: # Query matched the source name, not the text body snippet = r["name"] ev[r["id"]] = ev.get(r["id"], []) ev[r["id"]].append({ "kind": "text", "snippet": snippet, "query": f["value"], "offset": idx, }) elif ftype == "book": rows = conn.execute( """SELECT DISTINCT q.source_id, qr.reference, q.quote_text, q.quote_type FROM quotes q JOIN quote_references qr ON qr.quote_id = q.id WHERE qr.book_code = ?""", [fval], ).fetchall() for r in rows: sid = r["source_id"] ev.setdefault(sid, []) ev[sid].append({ "kind": "ref", "reference": r["reference"], "quote_text": r["quote_text"][:120], "quote_type": r["quote_type"], }) elif ftype == "chapter": # fval is e.g. "gen_1" rows = conn.execute( """SELECT DISTINCT q.source_id, qr.reference, q.quote_text, q.quote_type FROM quotes q JOIN quote_references qr ON qr.quote_id = q.id WHERE qr.reference LIKE ?""", [f"{fval}:%"], ).fetchall() for r in rows: sid = r["source_id"] ev.setdefault(sid, []) ev[sid].append({ "kind": "ref", "reference": r["reference"], "quote_text": r["quote_text"][:120], "quote_type": r["quote_type"], }) elif ftype == "verse": # fval is e.g. "gen_1:1" rows = conn.execute( """SELECT DISTINCT q.source_id, qr.reference, q.quote_text, q.quote_type FROM quotes q JOIN quote_references qr ON qr.quote_id = q.id WHERE qr.reference = ?""", [fval], ).fetchall() for r in rows: sid = r["source_id"] ev.setdefault(sid, []) ev[sid].append({ "kind": "ref", "reference": r["reference"], "quote_text": r["quote_text"][:120], "quote_type": r["quote_type"], }) filter_evidence.append(ev) # --- combine filter result sets --------------------------------------- id_sets = [set(ev.keys()) for ev in filter_evidence] if logic == "AND": matching: set[int] = id_sets[0] for s in id_sets[1:]: matching &= s else: matching = set() for s in id_sets: matching |= s if not matching: return {"total": 0, "results": []} # --- fetch source rows with aggregate stats -------------------------- placeholders = ",".join("?" * len(matching)) src_rows = conn.execute( f"""SELECT s.id, s.name, s.created_at, COUNT(q.id) as quote_count FROM sources s LEFT JOIN quotes q ON q.source_id = s.id WHERE s.id IN ({placeholders}) GROUP BY s.id ORDER BY s.created_at DESC""", list(matching), ).fetchall() results = [] for src in src_rows: sd = dict(src) # type distribution td_rows = conn.execute( "SELECT quote_type, COUNT(*) as c FROM quotes " "WHERE source_id=? GROUP BY quote_type", [sd["id"]], ).fetchall() sd["type_distribution"] = {r["quote_type"]: r["c"] for r in td_rows} # book distribution (top 5) bd_rows = conn.execute( """SELECT qr.book_code, COUNT(*) as count FROM quote_references qr JOIN quotes q ON qr.quote_id = q.id WHERE q.source_id = ? AND qr.book_code != '' GROUP BY qr.book_code ORDER BY count DESC LIMIT 5""", [sd["id"]], ).fetchall() sd["book_distribution"] = [dict(r) for r in bd_rows] # aggregate evidence from all matching filters, deduplicate refs all_ev: list[dict] = [] seen_refs: set[str] = set() for ev in filter_evidence: if sd["id"] in ev: for item in ev[sd["id"]]: if item["kind"] == "ref": key = item["reference"] if key in seen_refs: continue seen_refs.add(key) all_ev.append(item) sd["match_evidence"] = all_ev[:10] results.append(sd) return {"total": len(results), "results": results} def get_dashboard_data() -> dict: with get_db() as conn: source_count = conn.execute("SELECT COUNT(*) as c FROM sources").fetchone()["c"] quote_count = conn.execute("SELECT COUNT(*) as c FROM quotes").fetchone()["c"] ref_count = conn.execute("SELECT COUNT(*) as c FROM quote_references").fetchone()["c"] sources = conn.execute( """SELECT s.id, s.name, s.created_at, COUNT(q.id) as quote_count FROM sources s LEFT JOIN quotes q ON q.source_id = s.id GROUP BY s.id ORDER BY s.created_at DESC""" ).fetchall() return { "source_count": source_count, "quote_count": quote_count, "reference_count": ref_count, "sources": [dict(s) for s in sources], }