| |
| """Merge the five Search-UI SQLite databases into one. |
| |
| Search-UI currently keeps five separate SQLite files: |
| |
| database.db — subtitles + scripture + exclusions |
| images_database.db — visual frame categories + embeddings |
| faces_database.db — persons + face embeddings + metadata |
| speakers_database.db — speaker reference + segment embeddings |
| publications_database.db — processed publications + extracted images |
| |
| They never join across files; the split exists because the project grew |
| one subsystem at a time. The 2026 refactor consolidates them into a |
| single file so backups, WAL pragmas, schema bootstrap, and future |
| migrations all happen once instead of five times. |
| |
| This tool handles all three table flavors Search-UI uses: |
| |
| * Regular tables — copied row-for-row with INSERT OR REPLACE, indices |
| included. |
| * vec0 virtual tables (sqlite-vec) — the embeddings: subtitle_embeddings, |
| subtitle_chunk_embeddings, ad_transcription_embeddings, image_embeddings, |
| face_embeddings, speaker_segment_embeddings, publication_image_embeddings. |
| Recreated from their stored CREATE statement, then re-populated |
| preserving rowid (the rowid links each embedding to its metadata row). |
| * FTS5 virtual tables — the full-text indices: subtitles_fts, |
| ad_transcriptions_fts, publication_image_text. Same approach. |
| |
| Shadow tables (vec0's *_chunks/*_rowids/*_vector_chunks00 and FTS5's |
| *_data/*_idx/*_content) are NOT copied directly — recreating the parent |
| virtual table recreates them, and copying the parent's logical rows |
| repopulates them. |
| |
| Loading vec0 requires the sqlite-vec Python package. If a source contains |
| vec0 tables and sqlite-vec can't be loaded, the tool refuses to run rather |
| than silently dropping the embeddings. |
| |
| Steps: |
| 1. Validate every source DB exists and is readable. |
| 2. If any source has vec0 tables, require sqlite-vec to be importable. |
| 3. Check for table-name collisions across sources. |
| 4. Create the target DB (refuses to overwrite unless --force). |
| 5. ATTACH each source; copy regular tables, then virtual tables |
| (rowid-preserving), skipping shadow tables. |
| 6. Verify logical row counts match between source and destination. |
| 7. Leave the source files untouched. |
| |
| Operating at real scale (multi-GB embeddings): |
| * The copy runs in a single transaction per source and commits once at |
| the end, so the write-ahead log (the ``-wal`` file next to the output) |
| can grow to roughly the size of the largest source before commit. |
| Make sure the output directory has free space >= the size of your |
| biggest source DB. RAM is not the constraint — copies are server-side |
| ``INSERT ... SELECT`` over an ATTACHed source, not materialized in |
| Python. |
| * The per-table row counts are printed in the summary; compare them |
| against the sources before deleting/archiving the originals. (The |
| source files are never modified, so the originals remain your rollback |
| path regardless.) |
| * Schema scope: tables, indices, and vec0/FTS5 virtual tables are copied. |
| The Search-UI schema defines no triggers or views, so none are needed; |
| if that ever changes, this tool would need extending. |
| |
| Usage: |
| python scripts/merge_databases.py --output ~/searchui-merged.db --dry-run |
| python scripts/merge_databases.py --output ~/searchui-merged.db |
| python scripts/merge_databases.py --output ~/searchui-merged.db --force |
| |
| After the merge, the user can flip the app to single-DB mode by setting: |
| SEARCH_UI_SEARCH_DB_PATH=~/searchui-merged.db |
| SEARCH_UI_IMAGE_DB_PATH=~/searchui-merged.db |
| SEARCH_UI_FACE_DB_PATH=~/searchui-merged.db |
| SEARCH_UI_SPEAKER_DB_PATH=~/searchui-merged.db |
| SEARCH_UI_PUBLICATIONS_DB_PATH=~/searchui-merged.db |
| |
| The original DB files stay in place for at least two weeks of normal use |
| as the rollback path. |
| """ |
|
|
| from __future__ import annotations |
|
|
import argparse
import os
import re
import sqlite3
import sys
from dataclasses import dataclass
from pathlib import Path
|
|
# Repository root, taken as the directory two levels above this file (the
# usage text runs the script as scripts/merge_databases.py).
REPO_ROOT = Path(__file__).resolve().parent.parent
# Put <repo>/backend at the front of the import search path.
# NOTE(review): nothing in the visible part of this file imports from
# backend/ — confirm this is still needed.
sys.path.insert(0, str(REPO_ROOT / "backend"))
|
|
|
|
def _try_load_sqlite_vec(conn: sqlite3.Connection) -> bool:
    """Try to load the sqlite-vec extension onto ``conn``.

    This is a best-effort probe: it never raises. Any failure (package not
    installed, Python built without extension support, load error) is
    reported as ``False``.

    Extension loading is switched back off once the attempt is over, so the
    connection is not left able to load arbitrary shared libraries through
    SQL for the rest of its life.

    Args:
        conn: An open SQLite connection.

    Returns:
        True if sqlite-vec was loaded onto the connection, False otherwise.
    """
    try:
        import sqlite_vec
    except ImportError:
        return False

    try:
        conn.enable_load_extension(True)
    except Exception:
        # Deliberately broad: the contract of this probe is "return a bool",
        # and this call fails in several ways (AttributeError when Python was
        # built without extension support, sqlite3.Error otherwise).
        return False

    try:
        sqlite_vec.load(conn)
    except Exception:
        # Third-party loader; any failure simply means "not available".
        return False
    finally:
        # Runs on both success and failure: re-disable extension loading.
        conn.enable_load_extension(False)
    return True
|
|
|
|
@dataclass(frozen=True)
class SourceDb:
    """Immutable description of one source database that feeds the merge."""

    # Short identifier used in log lines, summary keys, and the ATTACH alias.
    label: str
    # Environment variable whose value, when set, gives the file location.
    env_var: str
    # File name used when the environment variable is not set.
    default_filename: str
    # Table names associated with this source.
    # NOTE(review): no code visible in this file reads this field.
    known_tables: tuple[str, ...]
|
|
|
|
# The source databases, in merge order. plan_merge() walks this tuple, and
# merge() processes whatever subset of it is present, in this order.
#
# For each entry, `env_var` (when set) gives the file location and
# `default_filename` is the fallback; see _resolve_source_path().
#
# NOTE(review): `known_tables` is not read by any code visible in this file,
# and the lists look non-exhaustive — the module docstring names virtual
# tables (e.g. publication_image_embeddings, publication_image_text) that do
# not appear here. Treat them as informational unless a caller elsewhere
# depends on them.
SOURCES: tuple[SourceDb, ...] = (
    SourceDb(
        label="search",
        env_var="SEARCH_UI_SEARCH_DB_PATH",
        default_filename="database.db",
        known_tables=(
            "subtitles_fts",
            "subtitle_embeddings",
            "subtitle_chunks",
            "subtitle_chunk_embeddings",
            "subtitle_embedding_status",
            "scripture_references",
            "video_exclusions",
            "exclusion_rules",
            "ad_transcriptions_fts",
            "ad_transcription_embeddings",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="images",
        env_var="SEARCH_UI_IMAGE_DB_PATH",
        default_filename="images_database.db",
        known_tables=(
            "image_categories",
            "image_embeddings",
            "processed_videos",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="faces",
        env_var="SEARCH_UI_FACE_DB_PATH",
        default_filename="faces_database.db",
        known_tables=(
            "persons",
            "person_embeddings",
            "face_embeddings",
            "face_metadata",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="speakers",
        env_var="SEARCH_UI_SPEAKER_DB_PATH",
        default_filename="speakers_database.db",
        known_tables=(
            "speakers",
            "speaker_reference_embeddings",
            "speaker_segment_embeddings",
            "speaker_segment_metadata",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="publications",
        env_var="SEARCH_UI_PUBLICATIONS_DB_PATH",
        default_filename="publications_database.db",
        known_tables=(
            "processed_publications",
            "images",
            "_schema_metadata",
        ),
    ),
)
|
|
| |
| |
| |
| |
| |
| |
| |
# Maps (source label, source table name) -> table name to use in the merged
# DB. merge() looks each regular table up here and falls back to the original
# name; detect_collisions() consults the same keys. Empty means no renames are
# configured. `_schema_metadata` is special-cased by those functions and does
# not need an entry.
COLLISION_RENAMES: dict[tuple[str, str], str] = {}
|
|
|
|
def _resolve_source_path(source: SourceDb, db_dir: Path | None = None) -> Path:
    """Work out where a source database lives on disk.

    Precedence: the source's environment variable (if set and non-empty),
    then ``db_dir / default_filename``, then ``default_filename`` relative
    to the current working directory.

    A leading ``~`` is expanded in both the environment value and ``db_dir``.
    Values such as ``~/searchui-merged.db`` do not get shell expansion when
    they come from an env file or a quoted assignment; without expansion they
    would resolve to a literal ``~`` directory under the working directory.

    Args:
        source: The source description; only ``env_var`` and
            ``default_filename`` are read.
        db_dir: Optional directory that holds the source databases.

    Returns:
        An absolute path. The file is not required to exist.
    """
    configured = os.environ.get(source.env_var)
    if configured:
        return Path(configured).expanduser().resolve()
    if db_dir is not None:
        return (Path(db_dir).expanduser() / source.default_filename).resolve()
    return Path(source.default_filename).resolve()
|
|
|
|
def _list_user_tables(conn: sqlite3.Connection) -> list[tuple[str, str]]:
    """List every user-created schema object as ``(name, type)``, by name.

    Despite the function name, the result is not limited to tables: indices
    and any other schema objects are included, so callers filter on the type.
    Objects whose name matches ``sqlite_%`` are left out.
    """
    query = (
        "SELECT name, type FROM sqlite_master "
        "WHERE name NOT LIKE 'sqlite_%' "
        "ORDER BY name"
    )
    listing = conn.execute(query).fetchall()
    return [(entry[0], entry[1]) for entry in listing]
|
|
|
|
def detect_virtual_tables(present: list[tuple[SourceDb, Path]]) -> dict[str, list[str]]:
    """Return ``{label: [virtual_table_names]}`` for each source that has any.

    Sources without virtual tables are omitted from the result.

    Args:
        present: ``(source, path)`` pairs for the databases to inspect.
    """
    findings: dict[str, list[str]] = {}
    for source, path in present:
        conn = sqlite3.connect(str(path))
        try:
            virtuals = [name for name, _sql in _virtual_tables(conn)]
        finally:
            # A sqlite3 connection used as a context manager only commits or
            # rolls back; it does not close. Close explicitly so the source
            # file handle is released.
            conn.close()
        if virtuals:
            findings[source.label] = virtuals
    return findings
|
|
|
|
def _is_virtual(sql: str | None) -> bool:
    """Tell whether a stored CREATE statement declares a virtual table.

    Missing or empty SQL counts as "not virtual". The comparison ignores
    leading whitespace and letter case.
    """
    if not sql:
        return False
    statement = sql.lstrip().upper()
    return statement.startswith("CREATE VIRTUAL")
|
|
|
|
def _virtual_tables(conn: sqlite3.Connection, alias: str = "main") -> list[tuple[str, str]]:
    """Collect ``(name, create_sql)`` for each virtual table in one schema.

    Args:
        conn: Connection to query.
        alias: Schema name to inspect ("main" or the alias of an attached
            database). It is interpolated into the SQL text, so it must be a
            trusted identifier.
    """
    cursor = conn.execute(
        f"SELECT name, sql FROM {alias}.sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
    )
    found: list[tuple[str, str]] = []
    for name, sql in cursor.fetchall():
        if _is_virtual(sql):
            found.append((name, sql))
    return found
|
|
|
|
def _uses_vec0(create_sql: str) -> bool:
    """Tell whether a CREATE VIRTUAL TABLE statement uses the vec0 module.

    The match ignores letter case and treats any run of whitespace (several
    spaces, tabs, newlines) between ``USING`` and ``vec0`` as one space, so
    hand-formatted multi-line statements are recognised too.
    """
    normalized = " ".join(create_sql.upper().split())
    return "USING VEC0" in normalized
|
|
|
|
def shadow_table_names(virtual_names: list[str], all_table_names: list[str]) -> set[str]:
    """Pick out the shadow tables that back the given virtual tables.

    A table is treated as a shadow table when its name begins with
    ``<virtual_table_name>_`` for some virtual table and it is not itself one
    of the virtual tables. vec0 creates names such as ``<vt>_chunks``,
    ``<vt>_rowids`` and ``<vt>_vector_chunks00``; FTS5 creates ``<vt>_data``,
    ``<vt>_idx``, ``<vt>_content`` and so on. Recreating the parent virtual
    table recreates them, so the regular-table copy has to skip them.
    """
    parents = set(virtual_names)
    # str.startswith accepts a tuple; an empty tuple never matches.
    prefixes = tuple(f"{parent}_" for parent in virtual_names)
    return {
        candidate
        for candidate in all_table_names
        if candidate not in parents and candidate.startswith(prefixes)
    }
|
|
|
|
def detect_collisions(source_tables: dict[str, list[str]]) -> list[tuple[str, list[str]]]:
    """Return destination table names that more than one source would write.

    Each ``(label, table)`` pair is first mapped through COLLISION_RENAMES to
    the name it will have in the merged DB; collisions are then detected on
    those destination names. This means that:

    * renaming one of three colliding tables still reports the other two, and
    * a rename that lands on a name another source already uses is reported.

    ``_schema_metadata` is never reported: the merge keeps a single copy.

    Args:
        source_tables: ``{source label: [table names]}``.

    Returns:
        ``[(destination_table, [labels...]), ...]`` in first-seen order; an
        empty list when there is nothing to resolve.
    """
    owners: dict[str, list[str]] = {}
    for label, tables in source_tables.items():
        for table in tables:
            destination = COLLISION_RENAMES.get((label, table), table)
            owners.setdefault(destination, []).append(label)
    return [
        (table, labels)
        for table, labels in owners.items()
        if len(labels) > 1 and table != "_schema_metadata"
    ]
|
|
|
|
def _table_row_count(conn: sqlite3.Connection, table: str) -> int:
    """Count the rows of ``table``; -1 signals that the count query failed.

    A failure here is an ``sqlite3.OperationalError``, for instance when the
    table does not exist or is a virtual table whose module is not loaded.
    """
    query = f"SELECT COUNT(*) FROM \"{table}\""
    try:
        first_row = conn.execute(query).fetchone()
    except sqlite3.OperationalError:
        return -1
    return int(first_row[0])
|
|
|
|
def plan_merge(
    *,
    db_dir: Path | None = None,
    require_all: bool = False,
) -> tuple[list[tuple[SourceDb, Path]], list[str]]:
    """Split the configured sources into those on disk and those missing.

    Args:
        db_dir: Optional directory passed through to the path resolution.
        require_all: When true, any missing source is an error.

    Returns:
        ``(present, missing_labels)`` where ``present`` holds
        ``(source, resolved_path)`` pairs in SOURCES order.

    Raises:
        FileNotFoundError: If ``require_all`` is set and a source is missing.
    """
    found: list[tuple[SourceDb, Path]] = []
    absent: list[str] = []
    for source in SOURCES:
        candidate = _resolve_source_path(source, db_dir=db_dir)
        if not candidate.is_file():
            absent.append(source.label)
            continue
        found.append((source, candidate))

    if absent and require_all:
        raise FileNotFoundError(
            f"Missing source databases: {absent}. Pass --skip-missing to ignore."
        )
    return found, absent
|
|
|
|
def _copy_regular_table(
    dst: sqlite3.Connection,
    source_table: str,
    dest_table: str,
    src_alias: str,
) -> int:
    """Copy a non-virtual table from the attached source. Returns rows copied.

    The table is created in the destination from the source's stored CREATE
    statement and filled with one server-side ``INSERT ... SELECT``.

    * When ``dest_table`` differs from ``source_table``, only the table name
      in the ``CREATE TABLE <name>`` header is rewritten. Column names,
      constraints and defaults that happen to contain the table name are left
      alone.
    * For rowid tables with no ``INTEGER PRIMARY KEY`` alias, the implicit
      rowid is copied explicitly, so rows keep the rowid they had in the
      source. Without this, gaps left by deletions would be compacted and any
      other table that refers to these rows by rowid would point at the wrong
      row.
    * ``WITHOUT ROWID`` tables are copied by their declared columns only.

    Args:
        dst: Destination connection with the source attached as ``src_alias``.
        source_table: Table name in the source database.
        dest_table: Table name to create in the destination.
        src_alias: Schema alias of the attached source (trusted identifier).

    Returns:
        Row count of the destination table after the copy, or 0 when the
        source has no such table or it exposes no columns.

    Raises:
        ValueError: If a rename is requested but the stored CREATE statement
            does not start with a recognisable ``CREATE TABLE <name>``.
    """
    src_ident = '"' + source_table.replace('"', '""') + '"'
    dest_ident = '"' + dest_table.replace('"', '""') + '"'

    stored = dst.execute(
        f"SELECT sql FROM {src_alias}.sqlite_master WHERE type='table' AND name = ?",
        (source_table,),
    ).fetchone()
    if not stored or not stored[0]:
        return 0

    create_sql = stored[0].strip()
    # Match only the statement header: CREATE TABLE [IF NOT EXISTS] <name>,
    # where <name> may be "quoted", `quoted`, [quoted], 'quoted' or bare.
    header = re.match(
        r"""CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?"""
        r"""(?:"(?:[^"]|"")*"|`(?:[^`]|``)*`|\[[^\]]*\]|'(?:[^']|'')*'|[^\s(]+)""",
        create_sql,
        flags=re.IGNORECASE,
    )
    if header is not None:
        create_sql = f"CREATE TABLE {dest_ident}{create_sql[header.end():]}"
    elif dest_table != source_table:
        raise ValueError(
            f"Cannot rename table {source_table!r} to {dest_table!r}: "
            "unrecognised CREATE TABLE statement."
        )
    dst.execute(create_sql)

    col_rows = dst.execute(f"PRAGMA {src_alias}.table_info({src_ident})").fetchall()
    cols = [row[1] for row in col_rows]
    if not cols:
        return 0
    col_list = ", ".join('"' + col.replace('"', '""') + '"' for col in cols)

    # A single INTEGER PRIMARY KEY column is an alias for the rowid, so the
    # rowid already travels with the declared columns in that case.
    pk_rows = [row for row in col_rows if row[5]]
    rowid_is_aliased = len(pk_rows) == 1 and (pk_rows[0][2] or "").upper() == "INTEGER"

    transfer_cols = col_list
    if not rowid_is_aliased:
        declared = {col.lower() for col in cols}
        # Use a rowid keyword that is not hidden by a column of the same name.
        rowid_name = next(
            (kw for kw in ("rowid", "_rowid_", "oid") if kw not in declared),
            None,
        )
        if rowid_name is not None:
            try:
                dst.execute(f"SELECT {rowid_name} FROM {src_alias}.{src_ident} LIMIT 0")
            except sqlite3.OperationalError:
                # WITHOUT ROWID table: there is no rowid to carry over.
                rowid_name = None
        if rowid_name is not None:
            transfer_cols = f"{rowid_name}, {col_list}"

    dst.execute(
        f"INSERT OR REPLACE INTO {dest_ident} ({transfer_cols}) "
        f"SELECT {transfer_cols} FROM {src_alias}.{src_ident}"
    )
    return _table_row_count(dst, dest_table)
|
|
|
|
def _copy_indices(dst: sqlite3.Connection, src_alias: str, table: str) -> int:
    """Recreate the explicitly defined indices of a source table.

    Each index's stored CREATE statement is replayed on the destination.
    Automatic indices (no stored SQL, or named ``sqlite_autoindex*``) are
    skipped, because creating the table already recreates them.

    This stays best-effort: an index that cannot be created does not abort
    the merge. The failure is reported on stderr instead of being dropped
    silently, so a missing index is visible to the operator.

    Args:
        dst: Destination connection with the source attached as ``src_alias``.
        src_alias: Schema alias of the attached source (trusted identifier).
        table: Name of the source table whose indices are copied.

    Returns:
        Number of indices created.
    """
    rows = dst.execute(
        f"SELECT name, sql FROM {src_alias}.sqlite_master "
        "WHERE type = 'index' AND tbl_name = ? AND sql IS NOT NULL",
        (table,),
    ).fetchall()
    copied = 0
    for name, sql in rows:
        if name.startswith("sqlite_autoindex"):
            continue
        try:
            dst.execute(sql)
        except sqlite3.OperationalError as exc:
            print(
                f"  warning: index {name!r} on table {table!r} was not recreated: {exc}",
                file=sys.stderr,
            )
            continue
        copied += 1
    return copied
|
|
|
|
def _copy_virtual_table(
    dst: sqlite3.Connection,
    create_sql: str,
    table: str,
    src_alias: str,
) -> int:
    """Rebuild a vec0/FTS5 virtual table in the destination and refill it.

    The table is created from the source's stored CREATE statement, which in
    turn recreates its shadow tables. Its logical rows are then copied with a
    single ``INSERT ... SELECT``, choosing the column list as follows:

    * no columns reported by ``table_info``: copy the rowid only;
    * a primary-key column, or a column literally named ``rowid``, is
      reported: copy the declared columns as they are;
    * otherwise: copy ``rowid`` together with the declared columns, so each
      row keeps its rowid.

    Returns:
        Row count of the destination table after the copy.
    """
    dst.execute(create_sql.replace("IF NOT EXISTS", "").strip())

    info = dst.execute(f"PRAGMA {src_alias}.table_info(\"{table}\")").fetchall()
    names = [entry[1] for entry in info]

    if not names:
        transfer = "rowid"
    else:
        quoted = ", ".join(f"\"{name}\"" for name in names)
        # entry[5] is the pk flag reported by table_info.
        carries_key = "rowid" in names or any(entry[5] for entry in info)
        transfer = quoted if carries_key else f"rowid, {quoted}"

    dst.execute(
        f"INSERT INTO \"{table}\"({transfer}) "
        f"SELECT {transfer} FROM {src_alias}.\"{table}\""
    )
    return _table_row_count(dst, table)
|
|
|
|
def _dry_run_summary(
    sources: list[tuple[SourceDb, Path]],
) -> dict[str, dict[str, int]]:
    """Count rows per table in every source without creating any output."""
    summary: dict[str, dict[str, int]] = {}
    for source, path in sources:
        src = sqlite3.connect(str(path))
        try:
            # Best effort: without sqlite-vec, vec0 tables simply count as -1.
            _try_load_sqlite_vec(src)
            summary[source.label] = {
                name: _table_row_count(src, name)
                for name, kind in _list_user_tables(src)
                if kind == "table"
            }
        finally:
            # `with sqlite3.connect(...)` would not close the connection.
            src.close()
    return summary


def _require_matching_counts(
    dst: sqlite3.Connection,
    src_alias: str,
    table: str,
    copied: int,
) -> None:
    """Raise RuntimeError unless ``copied`` equals the source table's row count."""
    expected = int(
        dst.execute(f"SELECT COUNT(*) FROM {src_alias}.\"{table}\"").fetchone()[0]
    )
    if copied != expected:
        raise RuntimeError(
            f"Row-count mismatch for table '{table}' from {src_alias}: "
            f"source has {expected:,} rows, merged DB has {copied:,}."
        )


def _discard_partial_output(output: Path) -> None:
    """Best-effort removal of an incomplete output DB and its sidecar files."""
    for suffix in ("", "-wal", "-shm", "-journal"):
        leftover = str(output) + suffix
        try:
            os.remove(leftover)
        except FileNotFoundError:
            continue
        except OSError as exc:
            # Called while another error is propagating; do not mask it.
            print(f"  warning: could not remove {leftover}: {exc}", file=sys.stderr)


def merge(
    sources: list[tuple[SourceDb, Path]],
    output: Path,
    *,
    dry_run: bool = False,
) -> dict[str, dict[str, int]]:
    """Execute the merge. Returns a per-source, per-table row-count summary.

    For each source, in order: ATTACH it, copy its regular tables (with their
    indices), recreate and refill its virtual tables, check that every copied
    table has as many rows as its source, commit, and DETACH. Shadow tables
    of virtual tables are never copied directly. Source files are only read.

    ``_schema_metadata`` is copied once, from the first source that has it;
    later copies are skipped.

    If anything fails, the incomplete output file (and its ``-wal``/``-shm``
    sidecars) is removed before the exception propagates. This is safe
    because the function refuses to start when ``output`` already exists, so
    the file can only be one created here.

    Args:
        sources: ``(source, path)`` pairs to merge, in merge order.
        output: Path of the merged database to create.
        dry_run: When true, write nothing and return the row counts of the
            sources instead.

    Returns:
        ``{source label: {table name: row count}}``. In a real merge, virtual
        tables are keyed as ``"<name> [vec0]"`` or ``"<name> [fts5]"``. In a
        dry run a count of -1 means the table could not be counted.

    Raises:
        FileExistsError: ``output`` already exists (real merge only).
        RuntimeError: A source has vec0 tables but sqlite-vec could not be
            loaded, or a copied table's row count differs from its source.
    """
    if dry_run:
        print(f"DRY RUN — would write merged DB to {output}")
        return _dry_run_summary(sources)
    if output.exists():
        raise FileExistsError(
            f"Output already exists: {output}. Pass --force to overwrite."
        )

    largest = max((p.stat().st_size for _s, p in sources), default=0)
    if largest > 512 * 1024 * 1024:
        print(
            f"Note: largest source is {largest / 1e9:.1f} GB. The merge runs in "
            f"one transaction; ensure {output.parent} has at least that much "
            f"free space for the -wal file before continuing.\n"
        )

    summary: dict[str, dict[str, int]] = {}
    schema_metadata_copied = False
    completed = False
    dst = sqlite3.connect(str(output))
    try:
        vec_loaded = _try_load_sqlite_vec(dst)
        dst.execute("PRAGMA journal_mode=WAL")
        for source, path in sources:
            print(f"Merging {source.label:14} from {path}")
            src_alias = f"src_{source.label}"
            dst.execute(f"ATTACH DATABASE ? AS {src_alias}", (str(path),))

            virtual = _virtual_tables(dst, src_alias)
            virtual_names = [name for name, _ in virtual]
            all_table_names = [
                name
                for (name,) in dst.execute(
                    f"SELECT name FROM {src_alias}.sqlite_master "
                    "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
                ).fetchall()
            ]
            # Virtual tables are rebuilt below and their shadow tables come
            # back with them; neither goes through the regular-table copy.
            skipped = set(virtual_names) | shadow_table_names(
                virtual_names, all_table_names
            )

            if not vec_loaded and any(_uses_vec0(sql) for _, sql in virtual):
                raise RuntimeError(
                    f"Source '{source.label}' has vec0 tables but the sqlite-vec "
                    "extension could not be loaded. Install it (pip install sqlite-vec) "
                    "and re-run, or the embeddings would be silently dropped."
                )

            table_summary: dict[str, int] = {}

            for table_name in all_table_names:
                if table_name in skipped:
                    continue
                dest_name = COLLISION_RENAMES.get(
                    (source.label, table_name), table_name
                )
                if dest_name == "_schema_metadata":
                    # Every source carries its own copy; keep the first one.
                    if schema_metadata_copied:
                        continue
                    schema_metadata_copied = True
                copied = _copy_regular_table(dst, table_name, dest_name, src_alias)
                _copy_indices(dst, src_alias, table_name)
                _require_matching_counts(dst, src_alias, table_name, copied)
                table_summary[table_name] = copied

            for table_name, create_sql in virtual:
                copied = _copy_virtual_table(dst, create_sql, table_name, src_alias)
                _require_matching_counts(dst, src_alias, table_name, copied)
                kind = "vec0" if _uses_vec0(create_sql) else "fts5"
                table_summary[f"{table_name} [{kind}]"] = copied

            summary[source.label] = table_summary

            # DETACH is not allowed inside a transaction, so commit first.
            dst.commit()
            dst.execute(f"DETACH DATABASE {src_alias}")
        dst.commit()
        completed = True
    finally:
        # Closing discards any uncommitted work of a failed source.
        dst.close()
        if not completed:
            _discard_partial_output(output)

    return summary
|
|
|
|
def _any_source_uses_vec0(present: list[tuple[SourceDb, Path]]) -> bool:
    """Return True if any present source defines a vec0 virtual table."""
    for _source, path in present:
        conn = sqlite3.connect(str(path))
        try:
            if any(_uses_vec0(sql) for _name, sql in _virtual_tables(conn)):
                return True
        finally:
            conn.close()
    return False


def main() -> int:
    """Command-line entry point. Returns the process exit status.

    Exit codes:
        0  merge (or dry run) completed
        1  output already exists and --force was not given
        2  source databases missing, or none found
        3  unresolved table-name collisions
        4  vec0 tables present but sqlite-vec cannot be loaded
        5  the output path is one of the source databases
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's layout (lists, usage block) in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--output",
        required=True,
        type=Path,
        help="Path for the merged DB file.",
    )
    parser.add_argument(
        "--db-dir",
        type=Path,
        default=None,
        help="Directory containing the source DBs. Defaults to env-driven paths.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Inspect sources and report row counts without writing.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Overwrite an existing output DB.",
    )
    parser.add_argument(
        "--skip-missing",
        action="store_true",
        help="Continue if some source DBs are not present.",
    )
    args = parser.parse_args()
    # The shell normally expands "~", but not when the value is quoted.
    output = args.output.expanduser()

    present, missing = plan_merge(db_dir=args.db_dir, require_all=False)
    if missing and not args.skip_missing:
        print(f"Missing source databases: {missing}")
        print("Pass --skip-missing to merge only the present ones.")
        return 2
    if missing:
        print(f"Skipping missing sources: {missing}")
    if not present:
        print("No source databases found.")
        return 2

    print("Sources to merge:")
    for source, path in present:
        print(f"  {source.label:14} {path}")
    print()

    # Once the app has been switched to single-DB mode, the SEARCH_UI_*_DB_PATH
    # variables all point at the merged file. Re-running with --force and the
    # same --output would then delete a source before reading it.
    resolved_output = output.resolve()
    clashing = [
        source.label for source, path in present if path.resolve() == resolved_output
    ]
    if clashing:
        print(
            f"Refusing to merge — output {output} is also the source database "
            f"for: {clashing}"
        )
        return 5

    virtual_findings = detect_virtual_tables(present)
    if virtual_findings:
        print("Virtual tables to reconstruct (vec0/FTS5):")
        for label, tables in virtual_findings.items():
            print(f"  {label}: {tables}")
        # sqlite-vec is only needed for vec0 tables; sources that hold only
        # FTS5 tables can be merged without it.
        if _any_source_uses_vec0(present):
            probe = sqlite3.connect(":memory:")
            try:
                vec_ok = _try_load_sqlite_vec(probe)
            finally:
                probe.close()
            if not vec_ok:
                print(
                    "\nRefusing to merge — sources contain vec0 virtual tables but the\n"
                    "sqlite-vec extension could not be loaded. Install it with\n"
                    "  pip install sqlite-vec\n"
                    "and re-run. (Without it, vec0 embeddings would be dropped.)"
                )
                return 4
        print()

    source_tables: dict[str, list[str]] = {}
    for source, path in present:
        conn = sqlite3.connect(str(path))
        try:
            source_tables[source.label] = [
                name for name, kind in _list_user_tables(conn) if kind == "table"
            ]
        finally:
            conn.close()
    collisions = detect_collisions(source_tables)
    if collisions:
        print("Refusing to merge — table-name collisions detected:")
        for table, labels in collisions:
            print(f"  '{table}' present in: {labels}")
        print("Add an entry to COLLISION_RENAMES and re-run.")
        return 3

    # A dry run must not modify anything, so --force only applies to a real
    # merge. Stale -wal/-shm files are removed too: left next to a brand-new
    # database file they could be replayed into it.
    if args.force and not args.dry_run and output.exists():
        output.unlink()
        for suffix in ("-wal", "-shm"):
            try:
                os.remove(str(output) + suffix)
            except FileNotFoundError:
                pass

    try:
        summary = merge(present, output, dry_run=args.dry_run)
    except FileExistsError as exc:
        print(exc)
        return 1

    print("\nSummary:")
    for label, tables in summary.items():
        print(f"  {label}:")
        for table, rows in sorted(tables.items()):
            note = "(virtual)" if rows == -1 else f"{rows:,} rows"
            print(f"    {table:40} {note}")
    if args.dry_run:
        print("\nDry run complete. Re-run without --dry-run to write the merged DB.")
    else:
        print(f"\nMerged DB written to {output}")
        print("Original source DBs left untouched. Test the merged DB before deleting them.")
    return 0
|
|
|
|
# Run the CLI only when this file is executed as a script; the integer
# returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|