jw-search / scripts /merge_databases.py
jw-tools's picture
deploy: latest main (lazy-ML cold start, durable launcher, web-image search, scene search) + full-app data refresh
7ea1851 verified
#!/usr/bin/env python3
"""Merge the five Search-UI SQLite databases into one.
Search-UI currently keeps five separate SQLite files:
database.db — subtitles + scripture + exclusions
images_database.db — visual frame categories + embeddings
faces_database.db — persons + face embeddings + metadata
speakers_database.db — speaker reference + segment embeddings
publications_database.db — processed publications + extracted images
They never join across files; the split exists because the project grew
one subsystem at a time. The 2026 refactor consolidates them into a
single file so backups, WAL pragmas, schema bootstrap, and future
migrations all happen once instead of five times.
This tool handles all three table flavors Search-UI uses:
* Regular tables — copied row-for-row with INSERT OR REPLACE, indices
included.
* vec0 virtual tables (sqlite-vec) — the embeddings: subtitle_embeddings,
subtitle_chunk_embeddings, ad_transcription_embeddings, image_embeddings,
face_embeddings, speaker_segment_embeddings, publication_image_embeddings.
Recreated from their stored CREATE statement, then re-populated
preserving rowid (the rowid links each embedding to its metadata row).
* FTS5 virtual tables — the full-text indices: subtitles_fts,
ad_transcriptions_fts, publication_image_text. Same approach.
Shadow tables (vec0's *_chunks/*_rowids/*_vector_chunks00 and FTS5's
*_data/*_idx/*_content) are NOT copied directly — recreating the parent
virtual table recreates them, and copying the parent's logical rows
repopulates them.
Loading vec0 requires the sqlite-vec Python package. If a source contains
vec0 tables and sqlite-vec can't be loaded, the tool refuses to run rather
than silently dropping the embeddings.
Steps:
1. Validate every source DB exists and is readable.
2. If any source has vec0 tables, require sqlite-vec to be importable.
3. Check for table-name collisions across sources.
4. Create the target DB (refuses to overwrite unless --force).
5. ATTACH each source; copy regular tables, then virtual tables
(rowid-preserving), skipping shadow tables.
6. Verify logical row counts match between source and destination.
7. Leave the source files untouched.
Operating at real scale (multi-GB embeddings):
* The copy runs in a single transaction per source and commits once at
the end of each source, so the write-ahead log (the ``-wal`` file next to
the output) can grow to roughly the size of the largest source before commit.
Make sure the output directory has free space >= the size of your
biggest source DB. RAM is not the constraint — copies are server-side
``INSERT ... SELECT`` over an ATTACHed source, not materialized in
Python.
* The per-table row counts are printed in the summary; compare them
against the sources before deleting/archiving the originals. (The
source files are never modified, so the originals remain your rollback
path regardless.)
* Schema scope: tables, indices, and vec0/FTS5 virtual tables are copied.
The Search-UI schema defines no triggers or views, so none are needed;
if that ever changes, this tool would need extending.
Usage:
python scripts/merge_databases.py --output ~/searchui-merged.db --dry-run
python scripts/merge_databases.py --output ~/searchui-merged.db
python scripts/merge_databases.py --output ~/searchui-merged.db --force
After the merge, the user can flip the app to single-DB mode by setting:
SEARCH_UI_SEARCH_DB_PATH=~/searchui-merged.db
SEARCH_UI_IMAGE_DB_PATH=~/searchui-merged.db
SEARCH_UI_FACE_DB_PATH=~/searchui-merged.db
SEARCH_UI_SPEAKER_DB_PATH=~/searchui-merged.db
SEARCH_UI_PUBLICATIONS_DB_PATH=~/searchui-merged.db
The original DB files stay in place for at least two weeks of normal use
as the rollback path.
"""
from __future__ import annotations
import argparse
import contextlib
import os
import re
import sqlite3
import sys
from dataclasses import dataclass
from pathlib import Path
# Directory two levels above this file (the parent of the directory that
# contains this script).
REPO_ROOT = Path(__file__).resolve().parent.parent
# Put <REPO_ROOT>/backend at the front of the import search path.
# NOTE(review): nothing in this module imports from backend — confirm whether
# this side effect is still required.
sys.path.insert(0, str(REPO_ROOT / "backend"))
def _try_load_sqlite_vec(conn: sqlite3.Connection) -> bool:
    """Load the sqlite-vec extension onto *conn* and report whether it worked.

    Args:
        conn: An open SQLite connection that should gain the ``vec0`` module.

    Returns:
        True when the extension was loaded; False when the ``sqlite_vec``
        package is not importable, when this Python/SQLite build cannot load
        extensions, or when loading fails for any other reason. The function
        never raises — callers decide how to react to a False result.
    """
    try:
        import sqlite_vec
    except ImportError:
        return False

    try:
        conn.enable_load_extension(True)
    except Exception:
        # Deliberately broad: builds without extension support surface this
        # as AttributeError or sqlite3.NotSupportedError depending on version.
        return False

    try:
        sqlite_vec.load(conn)
        return True
    except Exception:
        # Best-effort by design; the caller turns False into a clear message.
        return False
    finally:
        # An already-loaded extension stays registered, so switch generic
        # extension loading back off instead of leaving it enabled for the
        # rest of the connection's life.
        conn.enable_load_extension(False)
@dataclass(frozen=True)
class SourceDb:
    """Immutable description of one source SQLite file that feeds the merge.

    Attributes:
        label: Short identifier for the source. It is used to build the
            ATTACH alias (``src_<label>``) and as the key in the summary.
        env_var: Environment variable that, when set, gives the file's path.
        default_filename: File name used when ``env_var`` is not set.
        known_tables: Tables expected in this source (see the comment below).
    """

    label: str
    env_var: str
    default_filename: str
    # Expected tables for this source, intended for collision detection and
    # row-count verification; an empty tuple means "copy whatever's there."
    # NOTE(review): nothing in this module reads this field at the moment —
    # the merge discovers tables from sqlite_master instead.
    known_tables: tuple[str, ...]
# The five Search-UI source databases, in canonical merge order. The order
# matters: merge() copies _schema_metadata only from the first entry of the
# list it is given, and plan_merge() builds that list by walking this tuple.
SOURCES: tuple[SourceDb, ...] = (
    SourceDb(
        label="search",
        env_var="SEARCH_UI_SEARCH_DB_PATH",
        default_filename="database.db",
        known_tables=(
            "subtitles_fts",
            "subtitle_embeddings",
            "subtitle_chunks",
            "subtitle_chunk_embeddings",
            "subtitle_embedding_status",
            "scripture_references",
            "video_exclusions",
            "exclusion_rules",
            "ad_transcriptions_fts",
            "ad_transcription_embeddings",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="images",
        env_var="SEARCH_UI_IMAGE_DB_PATH",
        default_filename="images_database.db",
        known_tables=(
            "image_categories",
            "image_embeddings",
            "processed_videos",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="faces",
        env_var="SEARCH_UI_FACE_DB_PATH",
        default_filename="faces_database.db",
        known_tables=(
            "persons",
            "person_embeddings",
            "face_embeddings",
            "face_metadata",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="speakers",
        env_var="SEARCH_UI_SPEAKER_DB_PATH",
        default_filename="speakers_database.db",
        known_tables=(
            "speakers",
            "speaker_reference_embeddings",
            "speaker_segment_embeddings",
            "speaker_segment_metadata",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="publications",
        env_var="SEARCH_UI_PUBLICATIONS_DB_PATH",
        default_filename="publications_database.db",
        known_tables=(
            "processed_publications",
            "images",
            "_schema_metadata",
        ),
    ),
)
# Tables that may exist in multiple source DBs and should be kept distinct
# in the merged file. The key is (source_label, source_table); the value is
# the new name in the merged DB. App code that queries the renamed table must
# be updated separately — the merge tool does NOT rewrite SQL in the app.
#
# As of 2026-05, no genuine name collisions exist across the five sources,
# so this dict is empty. _schema_metadata is not renamed: merge() copies it
# exactly once, from the first source in its list, and skips it elsewhere.
COLLISION_RENAMES: dict[tuple[str, str], str] = {}
def _resolve_source_path(source: SourceDb, db_dir: Path | None = None) -> Path:
    """Work out where a source database lives on disk.

    Precedence: a non-empty value in the source's environment variable wins;
    otherwise the default filename inside *db_dir* when one is given;
    otherwise the default filename relative to the current directory.

    Args:
        source: The source description supplying ``env_var`` and
            ``default_filename``.
        db_dir: Optional directory holding the source databases.

    Returns:
        The resolved (absolute) path. The file is not required to exist.
    """
    override = os.environ.get(source.env_var)
    if override:
        candidate = Path(override)
    elif db_dir:
        candidate = db_dir / source.default_filename
    else:
        candidate = Path(source.default_filename)
    return candidate.resolve()
def _list_user_tables(conn: sqlite3.Connection) -> list[tuple[str, str]]:
    """List every schema object whose name does not look SQLite-internal.

    Despite the name, the result covers all object types recorded in
    ``sqlite_master`` (tables, indices, ...); callers filter on the type.

    Returns:
        ``[(name, type), ...]`` sorted by name.
    """
    query = (
        "SELECT name, type FROM sqlite_master "
        "WHERE name NOT LIKE 'sqlite_%' "
        "ORDER BY name"
    )
    return [(row[0], row[1]) for row in conn.execute(query)]
def detect_virtual_tables(present: list[tuple[SourceDb, Path]]) -> dict[str, list[str]]:
    """Find the virtual tables in each source database.

    Args:
        present: ``(source, path)`` pairs for the databases to inspect.

    Returns:
        ``{label: [virtual_table_names]}`` containing only the sources that
        have at least one virtual table.
    """
    findings: dict[str, list[str]] = {}
    for source, path in present:
        # sqlite3's own context manager only ends the transaction; closing()
        # is what actually releases the file handle.
        with contextlib.closing(sqlite3.connect(str(path))) as conn:
            virtuals = [name for name, _sql in _virtual_tables(conn)]
        if virtuals:
            findings[source.label] = virtuals
    return findings
def _is_virtual(sql: str | None) -> bool:
    """Return True when *sql* is a ``CREATE VIRTUAL ...`` statement.

    Leading whitespace and letter case are ignored; ``None`` and the empty
    string are treated as "not virtual".
    """
    if not sql:
        return False
    return sql.lstrip().upper().startswith("CREATE VIRTUAL")
def _virtual_tables(conn: sqlite3.Connection, alias: str = "main") -> list[tuple[str, str]]:
    """Collect the virtual tables stored in one schema of a connection.

    Args:
        conn: Connection to query.
        alias: Schema name to inspect — ``main`` or an ATTACH alias. It is
            interpolated into the SQL, so it must be a trusted identifier.

    Returns:
        ``[(name, create_sql), ...]`` for every table whose stored CREATE
        statement is a ``CREATE VIRTUAL ...`` statement.
    """
    query = (
        f"SELECT name, sql FROM {alias}.sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
    )
    found: list[tuple[str, str]] = []
    for name, sql in conn.execute(query).fetchall():
        if _is_virtual(sql):
            found.append((name, sql))
    return found
def _uses_vec0(create_sql: str) -> bool:
    """Return True when a CREATE VIRTUAL TABLE statement uses sqlite-vec.

    SQLite stores the CREATE text as the author wrote it, so ``USING`` and
    ``vec0`` may be separated by any run of whitespace (several spaces, a tab,
    a newline) and written in any case. Matching the literal ``"USING VEC0"``
    would miss those and mislabel the table as FTS5.
    """
    return re.search(r"USING\s+VEC0", create_sql, re.IGNORECASE) is not None
def shadow_table_names(virtual_names: list[str], all_table_names: list[str]) -> set[str]:
    """Pick out the shadow tables that back the given virtual tables.

    vec0 creates `<vt>_chunks`, `<vt>_rowids`, `<vt>_vector_chunks00`, etc.
    FTS5 creates `<vt>_data`, `<vt>_idx`, `<vt>_content`, etc. Every one of
    them carries the `<virtual_table_name>_` prefix, and recreating the
    parent virtual table recreates them, so the regular-table copy must skip
    them.

    The match is purely by prefix: any table that is not itself a virtual
    table and whose name starts with `<vt>_` is reported as a shadow.
    """
    virtual_set = set(virtual_names)
    # str.startswith accepts a tuple; an empty tuple never matches.
    prefixes = tuple(f"{vt}_" for vt in virtual_names)
    return {
        candidate
        for candidate in all_table_names
        if candidate not in virtual_set and candidate.startswith(prefixes)
    }
def detect_collisions(source_tables: dict[str, list[str]]) -> list[tuple[str, list[str]]]:
    """Report merged-DB table names that more than one source would claim.

    Every ``(source_label, table)`` pair is first mapped to its destination
    name through ``COLLISION_RENAMES``; collisions are then judged on those
    destination names. That way a rename entry only excuses the source it
    belongs to, and a rename whose target clashes with another table is
    caught as well.

    Args:
        source_tables: ``{source_label: [table_names]}``.

    Returns:
        ``[(destination_table, [labels])]`` for each destination name claimed
        by two or more sources. ``_schema_metadata`` is never reported because
        the merge consolidates it separately.
    """
    claimed_by: dict[str, list[str]] = {}
    for label, tables in source_tables.items():
        for table in tables:
            destination = COLLISION_RENAMES.get((label, table), table)
            claimed_by.setdefault(destination, []).append(label)
    return [
        (destination, labels)
        for destination, labels in claimed_by.items()
        if len(labels) > 1 and destination != "_schema_metadata"
    ]
def _table_row_count(conn: sqlite3.Connection, table: str) -> int:
    """Count the rows of *table*, or return -1 when the count cannot run.

    -1 is the sentinel for an ``sqlite3.OperationalError`` (for example a
    virtual table that does not support COUNT); the summary printer renders
    it as "(virtual)".
    """
    query = f"SELECT COUNT(*) FROM \"{table}\""
    try:
        (count,) = conn.execute(query).fetchone()
    except sqlite3.OperationalError:
        return -1
    return int(count)
def plan_merge(
    *,
    db_dir: Path | None = None,
    require_all: bool = False,
) -> tuple[list[tuple[SourceDb, Path]], list[str]]:
    """Split the configured sources into those on disk and those missing.

    Args:
        db_dir: Optional directory passed through to the path resolution.
        require_all: When True, any missing source is an error.

    Returns:
        ``(present, missing_labels)`` — ``present`` holds ``(source, path)``
        pairs in ``SOURCES`` order, ``missing_labels`` the labels of sources
        whose resolved path is not a regular file.

    Raises:
        FileNotFoundError: If *require_all* is set and a source is missing.
    """
    present: list[tuple[SourceDb, Path]] = []
    missing: list[str] = []
    for source in SOURCES:
        candidate = _resolve_source_path(source, db_dir=db_dir)
        if not candidate.is_file():
            missing.append(source.label)
            continue
        present.append((source, candidate))

    if missing and require_all:
        raise FileNotFoundError(
            f"Missing source databases: {missing}. Pass --skip-missing to ignore."
        )
    return present, missing
def _copy_regular_table(
    dst: sqlite3.Connection,
    source_table: str,
    dest_table: str,
    src_alias: str,
) -> int:
    """Copy a non-virtual table from the attached source into the merged DB.

    Args:
        dst: Destination connection with the source ATTACHed as *src_alias*.
        source_table: Table name inside the source database.
        dest_table: Name the table gets in the merged database.
        src_alias: ATTACH alias of the source database.

    Returns:
        The row count of the destination table after the copy, or 0 when the
        source has no CREATE statement or no columns for the table.

    Raises:
        RuntimeError: If a rename is requested but the table name cannot be
            located in the stored CREATE statement.
    """
    # Read schema via the attached source (no second connection -> no lock).
    create_sql_row = dst.execute(
        f"SELECT sql FROM {src_alias}.sqlite_master WHERE type='table' AND name = ?",
        (source_table,),
    ).fetchone()
    if not create_sql_row or not create_sql_row[0]:
        return 0

    create_for_dest = create_sql_row[0]
    if dest_table != source_table:
        # Rewrite only the table-name token that follows CREATE TABLE. A
        # blanket str.replace would also mangle column names, defaults and
        # constraints that merely contain the table name (e.g. renaming
        # "images" would turn a "num_images" column into something else).
        quoted_dest = '"' + dest_table.replace('"', '""') + '"'
        create_for_dest, replaced = re.subn(
            r"^(\s*CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?)"
            r"(?:\"(?:[^\"]|\"\")*\"|`[^`]*`|\[[^\]]*\]|'[^']*'|[^\s(]+)",
            # A callable replacement keeps backslashes in the name literal.
            lambda match: match.group(1) + quoted_dest,
            create_for_dest,
            count=1,
            flags=re.IGNORECASE,
        )
        if not replaced:
            raise RuntimeError(
                f"Cannot rename table '{source_table}' to '{dest_table}': "
                "unrecognised CREATE TABLE statement."
            )
    # Without IF NOT EXISTS an unexpected duplicate table fails loudly.
    dst.execute(create_for_dest.replace("IF NOT EXISTS", "").strip())

    col_rows = dst.execute(f"PRAGMA {src_alias}.table_info(\"{source_table}\")").fetchall()
    cols = [row[1] for row in col_rows]
    if not cols:
        return 0
    col_list = ", ".join(f"\"{c}\"" for c in cols)
    # Server-side INSERT ... SELECT: rows never pass through Python.
    dst.execute(
        f"INSERT OR REPLACE INTO \"{dest_table}\" ({col_list}) "
        f"SELECT {col_list} FROM {src_alias}.\"{source_table}\""
    )
    return _table_row_count(dst, dest_table)
def _copy_indices(dst: sqlite3.Connection, src_alias: str, table: str) -> int:
    """Recreate the source table's explicitly-created indices in the merged DB.

    Each index is rebuilt by replaying its stored CREATE INDEX statement.
    An index that cannot be created is skipped, with a warning on stderr so
    the omission is visible instead of silent.

    Args:
        dst: Destination connection with the source ATTACHed as *src_alias*.
        src_alias: ATTACH alias of the source database.
        table: Source table whose indices should be copied.

    Returns:
        The number of indices successfully created.
    """
    rows = dst.execute(
        f"SELECT name, sql FROM {src_alias}.sqlite_master "
        "WHERE type = 'index' AND tbl_name = ? AND sql IS NOT NULL",
        (table,),
    ).fetchall()
    copied = 0
    for name, sql in rows:
        # Auto-indices come back for free with the table's own constraints.
        if name.startswith("sqlite_autoindex"):
            continue
        try:
            dst.execute(sql)
        except sqlite3.OperationalError as exc:
            print(
                f"  warning: could not recreate index '{name}' on '{table}': {exc}",
                file=sys.stderr,
            )
            continue
        copied += 1
    return copied
def _copy_virtual_table(
    dst: sqlite3.Connection,
    create_sql: str,
    table: str,
    src_alias: str,
) -> int:
    """Rebuild a vec0/FTS5 virtual table in the merged DB, keeping rowids.

    The table is created from its stored CREATE statement and then filled
    with one ``INSERT ... SELECT`` from the attached source.

    The rowid is the docid/key that links embeddings and FTS rows to their
    metadata tables, so it MUST survive the copy. Which columns get named in
    the INSERT depends on what ``PRAGMA table_info`` reports:

    * no columns at all — copy the rowid only;
    * the rowid is already carried by the reported columns, either because
      a column is flagged as primary key (an INTEGER PRIMARY KEY aliases the
      rowid, e.g. ``vec0(image_id INTEGER PRIMARY KEY, ...)``) or because a
      column is literally named ``rowid`` (sqlite-vec lists it for
      implicit-rowid vec0 tables) — copy the columns verbatim. Naming an
      extra literal ``rowid`` would duplicate it, and for a PK alias vec0
      rejects the name outright;
    * otherwise (e.g. FTS5, which lists only its text columns) — name
      ``rowid`` explicitly alongside the columns.

    Every branch preserves the rowid, so the copy stays correct even if a
    future sqlite-vec version changes which branch a table takes.

    Returns:
        The row count of the destination table after the copy.
    """
    dst.execute(create_sql.replace("IF NOT EXISTS", "").strip())

    table_info = dst.execute(f"PRAGMA {src_alias}.table_info(\"{table}\")").fetchall()
    column_names = [info[1] for info in table_info]

    if not column_names:
        # No user-declared columns we can address; fall back to rowid-only.
        insert_columns = "rowid"
    else:
        quoted = ", ".join(f"\"{name}\"" for name in column_names)
        # info[5] is the table_info primary-key flag.
        if any(info[5] for info in table_info) or "rowid" in column_names:
            insert_columns = quoted
        else:
            insert_columns = f"rowid, {quoted}"

    dst.execute(
        f"INSERT INTO \"{table}\"({insert_columns}) "
        f"SELECT {insert_columns} FROM {src_alias}.\"{table}\""
    )
    return _table_row_count(dst, table)
def _source_row_count(conn: sqlite3.Connection, src_alias: str, table: str) -> int:
    """Count rows of *table* inside the attached source; -1 if COUNT fails."""
    try:
        row = conn.execute(f"SELECT COUNT(*) FROM {src_alias}.\"{table}\"").fetchone()
    except sqlite3.OperationalError:
        return -1
    return int(row[0])


def merge(
    sources: list[tuple[SourceDb, Path]],
    output: Path,
    *,
    dry_run: bool = False,
) -> dict[str, dict[str, int]]:
    """Execute the merge and return a per-table row-count summary.

    Args:
        sources: ``(source, path)`` pairs to merge, in order. The first entry
            is the only one whose ``_schema_metadata`` table is copied.
        output: Path of the merged database to create.
        dry_run: When True, only count rows in the sources; nothing is
            written and *output* is not checked or touched.

    Returns:
        ``{source_label: {table: row_count}}``. In a real merge, virtual
        tables are keyed as ``"<name> [vec0]"`` or ``"<name> [fts5]"``; a
        count of -1 means the count could not be computed.

    Raises:
        FileExistsError: If *output* already exists (non-dry-run only).
        RuntimeError: If a source has vec0 tables but sqlite-vec could not be
            loaded, or if a copied table's row count differs from its source.
    """
    summary: dict[str, dict[str, int]] = {}

    if dry_run:
        print(f"DRY RUN — would write merged DB to {output}")
        for source, path in sources:
            # closing() releases the handle; sqlite3's own context manager
            # would only end the transaction.
            with contextlib.closing(sqlite3.connect(str(path))) as src:
                _try_load_sqlite_vec(src)
                summary[source.label] = {
                    name: _table_row_count(src, name)
                    for name, kind in _list_user_tables(src)
                    if kind == "table"
                }
        return summary

    if output.exists():
        raise FileExistsError(
            f"Output already exists: {output}. Pass --force to overwrite."
        )

    largest = max((p.stat().st_size for _s, p in sources), default=0)
    if largest > 512 * 1024 * 1024:  # >512 MB — worth a heads-up
        print(
            f"Note: largest source is {largest / 1e9:.1f} GB. The merge runs in "
            f"one transaction; ensure {output.parent} has at least that much "
            f"free space for the -wal file before continuing.\n"
        )

    dst = sqlite3.connect(str(output))
    try:
        vec_loaded = _try_load_sqlite_vec(dst)
        dst.execute("PRAGMA journal_mode=WAL")
        for source, path in sources:
            print(f"Merging {source.label:14} from {path}")
            src_alias = f"src_{source.label}"
            dst.execute(f"ATTACH DATABASE ? AS {src_alias}", (str(path),))

            virtual = _virtual_tables(dst, src_alias)
            virtual_names = [name for name, _ in virtual]
            all_table_names = [
                name
                for name, kind in dst.execute(
                    f"SELECT name, type FROM {src_alias}.sqlite_master "
                    "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
                ).fetchall()
            ]
            shadows = shadow_table_names(virtual_names, all_table_names)
            # Built once per source: virtual tables and their shadows are
            # handled in step 2, never by the regular-table copy.
            skipped = set(virtual_names) | shadows

            if any(_uses_vec0(sql) for _, sql in virtual) and not vec_loaded:
                raise RuntimeError(
                    f"Source '{source.label}' has vec0 tables but the sqlite-vec "
                    "extension could not be loaded. Install it (pip install sqlite-vec) "
                    "and re-run, or the embeddings would be silently dropped."
                )

            table_summary: dict[str, int] = {}

            # 1) Regular tables (skip virtual tables and their shadows).
            for table_name in all_table_names:
                if table_name in skipped:
                    continue
                dest_name = COLLISION_RENAMES.get(
                    (source.label, table_name), table_name
                )
                if dest_name == "_schema_metadata" and source.label != sources[0][0].label:
                    # Copy _schema_metadata exactly once, from the first source
                    # in `sources` (which preserves SOURCES canonical order
                    # filtered to present DBs, so this is deterministic even
                    # with --skip-missing). Alembic will supersede this table.
                    continue
                copied = _copy_regular_table(dst, table_name, dest_name, src_alias)
                _copy_indices(dst, src_alias, table_name)
                expected = _source_row_count(dst, src_alias, table_name)
                if copied != expected:
                    raise RuntimeError(
                        f"Row-count mismatch for '{table_name}' from source "
                        f"'{source.label}': source has {expected}, merged DB "
                        f"has {copied}."
                    )
                table_summary[table_name] = copied

            # 2) Virtual tables (recreate + copy rows preserving rowid).
            for table_name, create_sql in virtual:
                copied = _copy_virtual_table(dst, create_sql, table_name, src_alias)
                expected = _source_row_count(dst, src_alias, table_name)
                if copied != expected:
                    raise RuntimeError(
                        f"Row-count mismatch for '{table_name}' from source "
                        f"'{source.label}': source has {expected}, merged DB "
                        f"has {copied}."
                    )
                kind = "vec0" if _uses_vec0(create_sql) else "fts5"
                table_summary[f"{table_name} [{kind}]"] = copied

            summary[source.label] = table_summary
            # Commit any open transaction before detaching, otherwise SQLite
            # refuses with "database X is locked".
            dst.commit()
            dst.execute(f"DETACH DATABASE {src_alias}")
        dst.commit()
    finally:
        dst.close()
    return summary
def main() -> int:
    """Command-line entry point.

    Parses the arguments, validates the sources (presence, sqlite-vec
    availability, table-name collisions), runs the merge or dry run, and
    prints a per-table summary.

    Returns:
        Process exit code: 0 on success, 2 when sources are missing (without
        ``--skip-missing``) or none are found, 3 on table-name collisions,
        4 when vec0 tables are present but sqlite-vec cannot be loaded.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks and lists in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--output",
        required=True,
        type=Path,
        help="Path for the merged DB file.",
    )
    parser.add_argument(
        "--db-dir",
        type=Path,
        default=None,
        help="Directory containing the source DBs. Defaults to env-driven paths.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Inspect sources and report row counts without writing.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Overwrite an existing output DB.",
    )
    parser.add_argument(
        "--skip-missing",
        action="store_true",
        help="Continue if some source DBs are not present.",
    )
    args = parser.parse_args()

    present, missing = plan_merge(db_dir=args.db_dir, require_all=False)
    if missing and not args.skip_missing:
        print(f"Missing source databases: {missing}")
        print("Pass --skip-missing to merge only the present ones.")
        return 2
    if missing:
        print(f"Skipping missing sources: {missing}")
    if not present:
        print("No source databases found.")
        return 2

    print("Sources to merge:")
    for source, path in present:
        print(f"  {source.label:14} {path}")
    print()

    # vec0 tables need sqlite-vec. Fail early with a clear message rather
    # than silently dropping embeddings mid-merge.
    virtual_findings = detect_virtual_tables(present)
    if virtual_findings:
        print("Virtual tables to reconstruct (vec0/FTS5):")
        for label, tables in virtual_findings.items():
            print(f"  {label}: {tables}")

        # Only vec0 needs the extension; FTS5 is served by SQLite itself, so
        # FTS5-only sources must not be refused.
        needs_vec = False
        for _source, path in present:
            with contextlib.closing(sqlite3.connect(str(path))) as conn:
                if any(_uses_vec0(sql) for _name, sql in _virtual_tables(conn)):
                    needs_vec = True
                    break
        if needs_vec:
            with contextlib.closing(sqlite3.connect(":memory:")) as probe:
                vec_ok = _try_load_sqlite_vec(probe)
            if not vec_ok:
                print(
                    "\nRefusing to merge — sources contain vec0 tables but the\n"
                    "sqlite-vec extension could not be loaded. Install it with\n"
                    "    pip install sqlite-vec\n"
                    "and re-run. (Without it, vec0 embeddings would be dropped.)"
                )
                return 4
        print()

    source_tables: dict[str, list[str]] = {}
    for source, path in present:
        # closing() releases each handle; a bare connect() would leak it.
        with contextlib.closing(sqlite3.connect(str(path))) as conn:
            source_tables[source.label] = [
                name for name, kind in _list_user_tables(conn) if kind == "table"
            ]
    collisions = detect_collisions(source_tables)
    if collisions:
        print("Refusing to merge — table-name collisions detected:")
        for table, labels in collisions:
            print(f"  '{table}' present in: {labels}")
        print("Add an entry to COLLISION_RENAMES and re-run.")
        return 3

    # A dry run must never delete anything, even when --force is given.
    if args.force and not args.dry_run:
        # Remove the old DB together with its sidecar files: a stale -wal or
        # -journal left next to a freshly created database can be replayed
        # into it and corrupt it.
        for suffix in ("", "-wal", "-shm", "-journal"):
            stale = Path(f"{args.output}{suffix}")
            if stale.exists():
                stale.unlink()

    summary = merge(present, args.output, dry_run=args.dry_run)

    print("\nSummary:")
    for label, tables in summary.items():
        print(f"  {label}:")
        for table, rows in sorted(tables.items()):
            note = "(virtual)" if rows == -1 else f"{rows:,} rows"
            print(f"    {table:40} {note}")

    if args.dry_run:
        print("\nDry run complete. Re-run without --dry-run to write the merged DB.")
    else:
        print(f"\nMerged DB written to {args.output}")
        print("Original source DBs left untouched. Test the merged DB before deleting them.")
    return 0
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())