#!/usr/bin/env python3 """Merge the five Search-UI SQLite databases into one. Search-UI currently keeps five separate SQLite files: database.db — subtitles + scripture + exclusions images_database.db — visual frame categories + embeddings faces_database.db — persons + face embeddings + metadata speakers_database.db — speaker reference + segment embeddings publications_database.db — processed publications + extracted images They never join across files; the split exists because the project grew one subsystem at a time. The 2026 refactor consolidates them into a single file so backups, WAL pragmas, schema bootstrap, and future migrations all happen once instead of five times. This tool handles all three table flavors Search-UI uses: * Regular tables — copied row-for-row with INSERT OR REPLACE, indices included. * vec0 virtual tables (sqlite-vec) — the embeddings: subtitle_embeddings, subtitle_chunk_embeddings, ad_transcription_embeddings, image_embeddings, face_embeddings, speaker_segment_embeddings, publication_image_embeddings. Recreated from their stored CREATE statement, then re-populated preserving rowid (the rowid links each embedding to its metadata row). * FTS5 virtual tables — the full-text indices: subtitles_fts, ad_transcriptions_fts, publication_image_text. Same approach. Shadow tables (vec0's *_chunks/*_rowids/*_vector_chunks00 and FTS5's *_data/*_idx/*_content) are NOT copied directly — recreating the parent virtual table recreates them, and copying the parent's logical rows repopulates them. Loading vec0 requires the sqlite-vec Python package. If a source contains vec0 tables and sqlite-vec can't be loaded, the tool refuses to run rather than silently dropping the embeddings. Steps: 1. Validate every source DB exists and is readable. 2. If any source has vec0 tables, require sqlite-vec to be importable. 3. Check for table-name collisions across sources. 4. Create the target DB (refuses to overwrite unless --force). 5. ATTACH each source; copy regular tables, then virtual tables (rowid-preserving), skipping shadow tables. 6. Verify logical row counts match between source and destination. 7. Leave the source files untouched. Operating at real scale (multi-GB embeddings): * The copy runs in a single transaction per source and commits once at the end, so the write-ahead log (the ``-wal`` file next to the output) can grow to roughly the size of the largest source before commit. Make sure the output directory has free space >= the size of your biggest source DB. RAM is not the constraint — copies are server-side ``INSERT ... SELECT`` over an ATTACHed source, not materialized in Python. * The per-table row counts are printed in the summary; compare them against the sources before deleting/archiving the originals. (The source files are never modified, so the originals remain your rollback path regardless.) * Schema scope: tables, indices, and vec0/FTS5 virtual tables are copied. The Search-UI schema defines no triggers or views, so none are needed; if that ever changes, this tool would need extending. Usage: python scripts/merge_databases.py --output ~/searchui-merged.db --dry-run python scripts/merge_databases.py --output ~/searchui-merged.db python scripts/merge_databases.py --output ~/searchui-merged.db --force After the merge, the user can flip the app to single-DB mode by setting: SEARCH_UI_SEARCH_DB_PATH=~/searchui-merged.db SEARCH_UI_IMAGE_DB_PATH=~/searchui-merged.db SEARCH_UI_FACE_DB_PATH=~/searchui-merged.db SEARCH_UI_SPEAKER_DB_PATH=~/searchui-merged.db SEARCH_UI_PUBLICATIONS_DB_PATH=~/searchui-merged.db The original DB files stay in place for at least two weeks of normal use as the rollback path. """ from __future__ import annotations import argparse import os import sqlite3 import sys from dataclasses import dataclass from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT / "backend")) def _try_load_sqlite_vec(conn: sqlite3.Connection) -> bool: """Load the sqlite-vec extension onto a connection. Returns success.""" try: import sqlite_vec except ImportError: return False try: conn.enable_load_extension(True) sqlite_vec.load(conn) return True except Exception: return False @dataclass(frozen=True) class SourceDb: """A database file feeding the merge.""" label: str env_var: str default_filename: str # Tables we expect to copy. Used for collision detection and row-count # verification. Empty means "copy whatever's there." known_tables: tuple[str, ...] SOURCES: tuple[SourceDb, ...] = ( SourceDb( label="search", env_var="SEARCH_UI_SEARCH_DB_PATH", default_filename="database.db", known_tables=( "subtitles_fts", "subtitle_embeddings", "subtitle_chunks", "subtitle_chunk_embeddings", "subtitle_embedding_status", "scripture_references", "video_exclusions", "exclusion_rules", "ad_transcriptions_fts", "ad_transcription_embeddings", "_schema_metadata", ), ), SourceDb( label="images", env_var="SEARCH_UI_IMAGE_DB_PATH", default_filename="images_database.db", known_tables=( "image_categories", "image_embeddings", "processed_videos", "_schema_metadata", ), ), SourceDb( label="faces", env_var="SEARCH_UI_FACE_DB_PATH", default_filename="faces_database.db", known_tables=( "persons", "person_embeddings", "face_embeddings", "face_metadata", "_schema_metadata", ), ), SourceDb( label="speakers", env_var="SEARCH_UI_SPEAKER_DB_PATH", default_filename="speakers_database.db", known_tables=( "speakers", "speaker_reference_embeddings", "speaker_segment_embeddings", "speaker_segment_metadata", "_schema_metadata", ), ), SourceDb( label="publications", env_var="SEARCH_UI_PUBLICATIONS_DB_PATH", default_filename="publications_database.db", known_tables=( "processed_publications", "images", "_schema_metadata", ), ), ) # Tables that may exist in multiple source DBs and should be kept distinct # in the merged file. The key is (source_label, source_table); the value is # the new name in the merged DB. App code that queries the renamed table must # be updated separately — the merge tool does NOT rewrite SQL in the app. # # As of 2026-05, no genuine name collisions exist across the five sources, # so this dict is empty. _schema_metadata is consolidated separately. COLLISION_RENAMES: dict[tuple[str, str], str] = {} def _resolve_source_path(source: SourceDb, db_dir: Path | None = None) -> Path: configured = os.environ.get(source.env_var) if configured: return Path(configured).resolve() if db_dir: return (db_dir / source.default_filename).resolve() return Path(source.default_filename).resolve() def _list_user_tables(conn: sqlite3.Connection) -> list[tuple[str, str]]: """Return [(name, type), ...] for every user-created object.""" rows = conn.execute( "SELECT name, type FROM sqlite_master " "WHERE name NOT LIKE 'sqlite_%' " "ORDER BY name" ).fetchall() return [(name, kind) for name, kind in rows] def detect_virtual_tables(present: list[tuple[SourceDb, Path]]) -> dict[str, list[str]]: """Return {label: [virtual_table_names]} for each source DB that has any.""" findings: dict[str, list[str]] = {} for source, path in present: with sqlite3.connect(str(path)) as conn: virtuals = [name for name, _sql in _virtual_tables(conn)] if virtuals: findings[source.label] = virtuals return findings def _is_virtual(sql: str | None) -> bool: return bool(sql) and sql.lstrip().upper().startswith("CREATE VIRTUAL") def _virtual_tables(conn: sqlite3.Connection, alias: str = "main") -> list[tuple[str, str]]: """Return [(name, create_sql), ...] for every virtual table in a DB/alias.""" rows = conn.execute( f"SELECT name, sql FROM {alias}.sqlite_master " "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'" ).fetchall() return [(name, sql) for name, sql in rows if _is_virtual(sql)] def _uses_vec0(create_sql: str) -> bool: return "USING VEC0" in create_sql.upper() def shadow_table_names(virtual_names: list[str], all_table_names: list[str]) -> set[str]: """Return the set of shadow tables backing the given virtual tables. vec0 creates `_chunks`, `_rowids`, `_vector_chunks00`, etc. FTS5 creates `_data`, `_idx`, `_content`, etc. All share the `_` prefix. Recreating the parent virtual table recreates these, so they must be skipped by the regular-table copy. """ shadows: set[str] = set() virtual_set = set(virtual_names) for table in all_table_names: if table in virtual_set: continue for vt in virtual_names: if table.startswith(f"{vt}_"): shadows.add(table) break return shadows def detect_collisions(source_tables: dict[str, list[str]]) -> list[tuple[str, list[str]]]: """Return tables that appear in more than one source and aren't handled.""" seen: dict[str, list[str]] = {} for label, tables in source_tables.items(): for table in tables: seen.setdefault(table, []).append(label) collisions: list[tuple[str, list[str]]] = [] for table, labels in seen.items(): if len(labels) <= 1: continue if table == "_schema_metadata": continue # handled by consolidation if any((label, table) in COLLISION_RENAMES for label in labels): continue # handled by rename collisions.append((table, labels)) return collisions def _table_row_count(conn: sqlite3.Connection, table: str) -> int: try: return int(conn.execute(f"SELECT COUNT(*) FROM \"{table}\"").fetchone()[0]) except sqlite3.OperationalError: return -1 # virtual tables that don't support COUNT def plan_merge( *, db_dir: Path | None = None, require_all: bool = False, ) -> tuple[list[tuple[SourceDb, Path]], list[str]]: """Decide which sources we'll merge. Returns (present, missing_labels).""" present: list[tuple[SourceDb, Path]] = [] missing: list[str] = [] for source in SOURCES: path = _resolve_source_path(source, db_dir=db_dir) if path.is_file(): present.append((source, path)) else: missing.append(source.label) if require_all and missing: raise FileNotFoundError( f"Missing source databases: {missing}. Pass --skip-missing to ignore." ) return present, missing def _copy_regular_table( dst: sqlite3.Connection, source_table: str, dest_table: str, src_alias: str, ) -> int: """Copy a non-virtual table from the attached source. Returns rows copied.""" # Read schema via the attached source (no second connection -> no lock). create_sql_row = dst.execute( f"SELECT sql FROM {src_alias}.sqlite_master WHERE type='table' AND name = ?", (source_table,), ).fetchone() if not create_sql_row or not create_sql_row[0]: return 0 create_for_dest = create_sql_row[0] if dest_table != source_table: create_for_dest = create_for_dest.replace(source_table, dest_table) dst.execute(create_for_dest.replace("IF NOT EXISTS", "").strip()) col_rows = dst.execute(f"PRAGMA {src_alias}.table_info(\"{source_table}\")").fetchall() cols = [row[1] for row in col_rows] if not cols: return 0 col_list = ", ".join(f"\"{c}\"" for c in cols) dst.execute( f"INSERT OR REPLACE INTO \"{dest_table}\" ({col_list}) " f"SELECT {col_list} FROM {src_alias}.\"{source_table}\"" ) return _table_row_count(dst, dest_table) def _copy_indices(dst: sqlite3.Connection, src_alias: str, table: str) -> int: """Copy any non-auto indices defined on the source table.""" rows = dst.execute( f"SELECT name, sql FROM {src_alias}.sqlite_master " "WHERE type = 'index' AND tbl_name = ? AND sql IS NOT NULL", (table,), ).fetchall() copied = 0 for name, sql in rows: if name.startswith("sqlite_autoindex"): continue try: dst.execute(sql) copied += 1 except sqlite3.OperationalError: continue return copied def _copy_virtual_table( dst: sqlite3.Connection, create_sql: str, table: str, src_alias: str, ) -> int: """Recreate a vec0/FTS5 virtual table and copy its rows, preserving rowid.""" dst.execute(create_sql.replace("IF NOT EXISTS", "").strip()) col_rows = dst.execute(f"PRAGMA {src_alias}.table_info(\"{table}\")").fetchall() cols = [row[1] for row in col_rows] if not cols: # No user-declared columns we can address; fall back to rowid-only. dst.execute( f"INSERT INTO \"{table}\"(rowid) SELECT rowid FROM {src_alias}.\"{table}\"" ) return _table_row_count(dst, table) col_list = ", ".join(f"\"{c}\"" for c in cols) # The rowid is the docid/key that links embeddings and FTS rows to their # metadata tables — it MUST be preserved across the copy. We branch on # whether PRAGMA table_info already exposes the rowid as part of `cols`: # - rowid carried by cols: either an INTEGER PRIMARY KEY column aliases # rowid (e.g. publication_image_embeddings: vec0(image_id INTEGER # PRIMARY KEY, ...) — `row[5]` is the table_info pk flag), or rowid is # itself listed as a column (sqlite-vec lists it for implicit-rowid # vec0 tables). Copy the columns verbatim — naming an extra literal # `rowid` would duplicate it, and for a PK alias vec0 outright REJECTS # the name ("table ... has no column named rowid"). # - rowid NOT carried by cols (e.g. FTS5 tables list only their text # columns): name rowid explicitly so it is preserved. # Both branches preserve the rowid for every vec0/FTS5 shape, so this stays # correct even if a future sqlite-vec version changes which branch a given # table takes. rowid_in_columns = any(row[5] for row in col_rows) or "rowid" in cols if rowid_in_columns: dst.execute( f"INSERT INTO \"{table}\"({col_list}) " f"SELECT {col_list} FROM {src_alias}.\"{table}\"" ) else: dst.execute( f"INSERT INTO \"{table}\"(rowid, {col_list}) " f"SELECT rowid, {col_list} FROM {src_alias}.\"{table}\"" ) return _table_row_count(dst, table) def merge( sources: list[tuple[SourceDb, Path]], output: Path, *, dry_run: bool = False, ) -> dict[str, dict[str, int]]: """Execute the merge. Returns per-table row-count summary.""" summary: dict[str, dict[str, int]] = {} if dry_run: print(f"DRY RUN — would write merged DB to {output}") elif output.exists(): raise FileExistsError( f"Output already exists: {output}. Pass --force to overwrite." ) if dry_run: for source, path in sources: with sqlite3.connect(str(path)) as src: _try_load_sqlite_vec(src) summary[source.label] = { name: _table_row_count(src, name) for name, kind in _list_user_tables(src) if kind == "table" } return summary largest = max((p.stat().st_size for _s, p in sources), default=0) if largest > 512 * 1024 * 1024: # >512 MB — worth a heads-up print( f"Note: largest source is {largest / 1e9:.1f} GB. The merge runs in " f"one transaction; ensure {output.parent} has at least that much " f"free space for the -wal file before continuing.\n" ) dst = sqlite3.connect(str(output)) vec_loaded = _try_load_sqlite_vec(dst) try: dst.execute("PRAGMA journal_mode=WAL") for source, path in sources: print(f"Merging {source.label:14} from {path}") src_alias = f"src_{source.label}" dst.execute(f"ATTACH DATABASE ? AS {src_alias}", (str(path),)) virtual = _virtual_tables(dst, src_alias) virtual_names = [name for name, _ in virtual] all_table_names = [ name for name, kind in dst.execute( f"SELECT name, type FROM {src_alias}.sqlite_master " "WHERE type='table' AND name NOT LIKE 'sqlite_%'" ).fetchall() ] shadows = shadow_table_names(virtual_names, all_table_names) if any(_uses_vec0(sql) for _, sql in virtual) and not vec_loaded: raise RuntimeError( f"Source '{source.label}' has vec0 tables but the sqlite-vec " "extension could not be loaded. Install it (pip install sqlite-vec) " "and re-run, or the embeddings would be silently dropped." ) table_summary: dict[str, int] = {} # 1) Regular tables (skip virtual tables and their shadows). for table_name in all_table_names: if table_name in set(virtual_names) or table_name in shadows: continue dest_name = COLLISION_RENAMES.get( (source.label, table_name), table_name ) if dest_name == "_schema_metadata" and source.label != sources[0][0].label: # Copy _schema_metadata exactly once, from the first source # in `sources` (which preserves SOURCES canonical order # filtered to present DBs, so this is deterministic even # with --skip-missing). Alembic will supersede this table. continue copied = _copy_regular_table(dst, table_name, dest_name, src_alias) _copy_indices(dst, src_alias, table_name) table_summary[table_name] = copied # 2) Virtual tables (recreate + copy rows preserving rowid). for table_name, create_sql in virtual: copied = _copy_virtual_table(dst, create_sql, table_name, src_alias) kind = "vec0" if _uses_vec0(create_sql) else "fts5" table_summary[f"{table_name} [{kind}]"] = copied summary[source.label] = table_summary # Commit any open transaction before detaching, otherwise SQLite # refuses with "database X is locked". dst.commit() dst.execute(f"DETACH DATABASE {src_alias}") dst.commit() finally: dst.close() return summary def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--output", required=True, type=Path, help="Path for the merged DB file.", ) parser.add_argument( "--db-dir", type=Path, default=None, help="Directory containing the source DBs. Defaults to env-driven paths.", ) parser.add_argument( "--dry-run", action="store_true", help="Inspect sources and report row counts without writing.", ) parser.add_argument( "--force", action="store_true", help="Overwrite an existing output DB.", ) parser.add_argument( "--skip-missing", action="store_true", help="Continue if some source DBs are not present.", ) args = parser.parse_args() present, missing = plan_merge(db_dir=args.db_dir, require_all=False) if missing and not args.skip_missing: print(f"Missing source databases: {missing}") print("Pass --skip-missing to merge only the present ones.") return 2 if missing: print(f"Skipping missing sources: {missing}") if not present: print("No source databases found.") return 2 print("Sources to merge:") for source, path in present: print(f" {source.label:14} {path}") print() # vec0 tables need sqlite-vec. Fail early with a clear message rather # than silently dropping embeddings mid-merge. virtual_findings = detect_virtual_tables(present) if virtual_findings: probe = sqlite3.connect(":memory:") vec_ok = _try_load_sqlite_vec(probe) probe.close() print("Virtual tables to reconstruct (vec0/FTS5):") for label, tables in virtual_findings.items(): print(f" {label}: {tables}") if not vec_ok: print( "\nRefusing to merge — sources contain virtual tables but the\n" "sqlite-vec extension could not be loaded. Install it with\n" " pip install sqlite-vec\n" "and re-run. (Without it, vec0 embeddings would be dropped.)" ) return 4 print() source_tables = { source.label: [ name for name, kind in _list_user_tables(sqlite3.connect(str(path))) if kind == "table" ] for source, path in present } collisions = detect_collisions(source_tables) if collisions: print("Refusing to merge — table-name collisions detected:") for table, labels in collisions: print(f" '{table}' present in: {labels}") print("Add an entry to COLLISION_RENAMES and re-run.") return 3 if args.force and args.output.exists(): args.output.unlink() summary = merge(present, args.output, dry_run=args.dry_run) print("\nSummary:") for label, tables in summary.items(): print(f" {label}:") for table, rows in sorted(tables.items()): note = "(virtual)" if rows == -1 else f"{rows:,} rows" print(f" {table:40} {note}") if args.dry_run: print("\nDry run complete. Re-run without --dry-run to write the merged DB.") else: print(f"\nMerged DB written to {args.output}") print("Original source DBs left untouched. Test the merged DB before deleting them.") return 0 if __name__ == "__main__": raise SystemExit(main())