#!/usr/bin/env python3
"""Merge the five Search-UI SQLite databases into one.

Search-UI currently keeps five separate SQLite files:

    database.db              - subtitles + scripture + exclusions
    images_database.db       - visual frame categories + embeddings
    faces_database.db        - persons + face embeddings + metadata
    speakers_database.db     - speaker reference + segment embeddings
    publications_database.db - processed publications + extracted images

They never join across files; the split exists because the project grew
one subsystem at a time. The 2026 refactor consolidates them into a
single file so backups, WAL pragmas, schema bootstrap, and future
migrations all happen once instead of five times.

This tool handles all three table flavors Search-UI uses:

  * Regular tables - copied row-for-row with INSERT OR REPLACE, indices
    included.
  * vec0 virtual tables (sqlite-vec) - the embeddings: subtitle_embeddings,
    subtitle_chunk_embeddings, ad_transcription_embeddings, image_embeddings,
    face_embeddings, speaker_segment_embeddings, publication_image_embeddings.
    Recreated from their stored CREATE statement, then re-populated
    preserving rowid (the rowid links each embedding to its metadata row).
  * FTS5 virtual tables - the full-text indices: subtitles_fts,
    ad_transcriptions_fts, publication_image_text. Same approach.

Shadow tables (vec0's *_chunks/*_rowids/*_vector_chunks00 and FTS5's
*_data/*_idx/*_content) are NOT copied directly - recreating the parent
virtual table recreates them, and copying the parent's logical rows
repopulates them.

Loading vec0 requires the sqlite-vec Python package. If a source contains
vec0 tables and sqlite-vec can't be loaded, the tool refuses to run rather
than silently dropping the embeddings.

Steps:
  1. Validate every source DB exists and is readable.
  2. If any source has vec0 tables, require sqlite-vec to be importable.
  3. Check for table-name collisions across sources.
  4. Create the target DB (refuses to overwrite unless --force).
  5. ATTACH each source; copy regular tables, then virtual tables
     (rowid-preserving), skipping shadow tables.
  6. Print per-table row counts for the merged DB so they can be compared
     against the sources.
  7. Leave the source files untouched.

Operating at real scale (multi-GB embeddings):
  * The copy runs in a single transaction per source and commits once that
    source is finished, so the write-ahead log (the ``-wal`` file next to
    the output) can grow to roughly the size of the largest source before
    commit. Make sure the output directory has free space >= the size of
    your biggest source DB. RAM is not the constraint: copies run inside
    SQLite as ``INSERT ... SELECT`` over an ATTACHed source, not
    materialized in Python.
  * The per-table row counts are printed in the summary; compare them
    against the sources before deleting/archiving the originals. (The
    source files are never modified, so the originals remain your rollback
    path regardless.)
  * Schema scope: tables, indices, and vec0/FTS5 virtual tables are copied.
    The Search-UI schema defines no triggers or views, so none are needed;
    if that ever changes, this tool would need extending.

Usage:
    python scripts/merge_databases.py --output ~/searchui-merged.db --dry-run
    python scripts/merge_databases.py --output ~/searchui-merged.db
    python scripts/merge_databases.py --output ~/searchui-merged.db --force

After the merge, the user can flip the app to single-DB mode by setting:
    SEARCH_UI_SEARCH_DB_PATH=~/searchui-merged.db
    SEARCH_UI_IMAGE_DB_PATH=~/searchui-merged.db
    SEARCH_UI_FACE_DB_PATH=~/searchui-merged.db
    SEARCH_UI_SPEAKER_DB_PATH=~/searchui-merged.db
    SEARCH_UI_PUBLICATIONS_DB_PATH=~/searchui-merged.db

The original DB files stay in place for at least two weeks of normal use
as the rollback path.
"""

from __future__ import annotations

import argparse
import os
import sqlite3
import sys
from dataclasses import dataclass
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT / "backend"))


def _try_load_sqlite_vec(conn: sqlite3.Connection) -> bool:
    """Load the sqlite-vec extension onto a connection. Returns success."""
    try:
        import sqlite_vec
    except ImportError:
        return False
    try:
        conn.enable_load_extension(True)
        sqlite_vec.load(conn)
        return True
    except Exception:
        return False


@dataclass(frozen=True)
class SourceDb:
    """A database file feeding the merge."""

    label: str
    env_var: str
    default_filename: str
    # Tables we expect to copy. Used for collision detection and row-count
    # verification. Empty means "copy whatever's there."
    known_tables: tuple[str, ...]


SOURCES: tuple[SourceDb, ...] = (
    SourceDb(
        label="search",
        env_var="SEARCH_UI_SEARCH_DB_PATH",
        default_filename="database.db",
        known_tables=(
            "subtitles_fts",
            "subtitle_chunks",
            "subtitle_chunk_embeddings",
            "subtitle_embedding_status",
            "scripture_references",
            "video_exclusions",
            "exclusion_rules",
            "ad_transcriptions_fts",
            "ad_transcription_embeddings",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="images",
        env_var="SEARCH_UI_IMAGE_DB_PATH",
        default_filename="images_database.db",
        known_tables=(
            "image_categories",
            "image_embeddings",
            "processed_videos",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="faces",
        env_var="SEARCH_UI_FACE_DB_PATH",
        default_filename="faces_database.db",
        known_tables=(
            "persons",
            "person_embeddings",
            "face_embeddings",
            "face_metadata",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="speakers",
        env_var="SEARCH_UI_SPEAKER_DB_PATH",
        default_filename="speakers_database.db",
        known_tables=(
            "speakers",
            "speaker_reference_embeddings",
            "speaker_segment_embeddings",
            "speaker_segment_metadata",
            "_schema_metadata",
        ),
    ),
    SourceDb(
        label="publications",
        env_var="SEARCH_UI_PUBLICATIONS_DB_PATH",
        default_filename="publications_database.db",
        known_tables=(
            "processed_publications",
            "images",
            "_schema_metadata",
        ),
    ),
)

# Tables that may exist in multiple source DBs and should be kept distinct
# in the merged file. The key is (source_label, source_table); the value is
# the new name in the merged DB. App code that queries the renamed table must
# be updated separately - the merge tool does NOT rewrite SQL in the app.
#
# As of 2026-05, no genuine name collisions exist across the five sources,
# so this dict is empty. _schema_metadata is consolidated separately.
COLLISION_RENAMES: dict[tuple[str, str], str] = {}


def _resolve_source_path(source: SourceDb, db_dir: Path | None = None) -> Path:
    configured = os.environ.get(source.env_var)
    if configured:
        return Path(configured).resolve()
    if db_dir:
        return (db_dir / source.default_filename).resolve()
    return Path(source.default_filename).resolve()


def _list_user_tables(conn: sqlite3.Connection) -> list[tuple[str, str]]:
    """Return [(name, type), ...] for every user-created object."""
    rows = conn.execute(
        "SELECT name, type FROM sqlite_master "
        "WHERE name NOT LIKE 'sqlite_%' "
        "ORDER BY name"
    ).fetchall()
    return [(name, kind) for name, kind in rows]


def detect_virtual_tables(present: list[tuple[SourceDb, Path]]) -> dict[str, list[str]]:
    """Return {label: [virtual_table_names]} for each source DB that has any."""
    findings: dict[str, list[str]] = {}
    for source, path in present:
        conn = sqlite3.connect(str(path))
        try:
            virtuals = [name for name, _sql in _virtual_tables(conn)]
        finally:
            # sqlite3's context manager only commits/rolls back; close explicitly.
            conn.close()
        if virtuals:
            findings[source.label] = virtuals
    return findings


def _is_virtual(sql: str | None) -> bool:
    return bool(sql) and sql.lstrip().upper().startswith("CREATE VIRTUAL")


def _virtual_tables(conn: sqlite3.Connection, alias: str = "main") -> list[tuple[str, str]]:
    """Return [(name, create_sql), ...] for every virtual table in a DB/alias."""
    rows = conn.execute(
        f"SELECT name, sql FROM {alias}.sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
    ).fetchall()
    return [(name, sql) for name, sql in rows if _is_virtual(sql)]


def _uses_vec0(create_sql: str) -> bool:
    return "USING VEC0" in create_sql.upper()


def shadow_table_names(virtual_names: list[str], all_table_names: list[str]) -> set[str]:
    """Return the set of shadow tables backing the given virtual tables.

    vec0 creates `<vt>_chunks`, `<vt>_rowids`, `<vt>_vector_chunks00`, etc.
    FTS5 creates `<vt>_data`, `<vt>_idx`, `<vt>_content`, etc. All share the
    `<virtual_table_name>_` prefix. Recreating the parent virtual table
    recreates these, so they must be skipped by the regular-table copy.
    """
    shadows: set[str] = set()
    virtual_set = set(virtual_names)
    for table in all_table_names:
        if table in virtual_set:
            continue
        for vt in virtual_names:
            if table.startswith(f"{vt}_"):
                shadows.add(table)
                break
    return shadows
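

# Illustrative sketch only (not called by the merge): a minimal, self-contained
# restatement of the `<virtual_table_name>_` prefix rule described above. The
# function name and the table names in it are hypothetical. It also shows a
# caveat of a pure prefix match: a regular table that happens to start with
# `<vt>_` (here the made-up `subtitles_fts_archive`) would be classified as a
# shadow table and skipped by the regular-table copy.
def _demo_shadow_prefix_rule() -> set:
    virtual_names = ["subtitles_fts"]
    all_tables = [
        "subtitles_fts",          # the virtual table itself, never a shadow
        "subtitles_fts_data",     # FTS5 shadow table
        "subtitles_fts_idx",      # FTS5 shadow table
        "subtitles_fts_archive",  # hypothetical regular table, over-matched
        "scripture_references",   # unrelated regular table
    ]
    return {
        table
        for table in all_tables
        if table not in virtual_names
        and any(table.startswith(f"{vt}_") for vt in virtual_names)
    }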


def detect_collisions(source_tables: dict[str, list[str]]) -> list[tuple[str, list[str]]]:
    """Return tables that appear in more than one source and aren't handled."""
    seen: dict[str, list[str]] = {}
    for label, tables in source_tables.items():
        for table in tables:
            seen.setdefault(table, []).append(label)
    collisions: list[tuple[str, list[str]]] = []
    for table, labels in seen.items():
        if len(labels) <= 1:
            continue
        if table == "_schema_metadata":
            continue  # handled by consolidation
        if any((label, table) in COLLISION_RENAMES for label in labels):
            continue  # handled by rename
        collisions.append((table, labels))
    return collisions


def _table_row_count(conn: sqlite3.Connection, table: str) -> int:
    try:
        return int(conn.execute(f"SELECT COUNT(*) FROM \"{table}\"").fetchone()[0])
    except sqlite3.OperationalError:
        return -1  # virtual tables that don't support COUNT


def plan_merge(
    *,
    db_dir: Path | None = None,
    require_all: bool = False,
) -> tuple[list[tuple[SourceDb, Path]], list[str]]:
    """Decide which sources we'll merge. Returns (present, missing_labels)."""
    present: list[tuple[SourceDb, Path]] = []
    missing: list[str] = []
    for source in SOURCES:
        path = _resolve_source_path(source, db_dir=db_dir)
        if path.is_file():
            present.append((source, path))
        else:
            missing.append(source.label)
    if require_all and missing:
        raise FileNotFoundError(
            f"Missing source databases: {missing}. Pass --skip-missing to ignore."
        )
    return present, missing


def _copy_regular_table(
    dst: sqlite3.Connection,
    source_table: str,
    dest_table: str,
    src_alias: str,
) -> int:
    """Copy a non-virtual table from the attached source. Returns rows copied."""
    # Read schema via the attached source (no second connection -> no lock).
    create_sql_row = dst.execute(
        f"SELECT sql FROM {src_alias}.sqlite_master WHERE type='table' AND name = ?",
        (source_table,),
    ).fetchone()
    if not create_sql_row or not create_sql_row[0]:
        return 0

    create_for_dest = create_sql_row[0]
    if dest_table != source_table:
        create_for_dest = create_for_dest.replace(source_table, dest_table)
    dst.execute(create_for_dest.replace("IF NOT EXISTS", "").strip())

    col_rows = dst.execute(f"PRAGMA {src_alias}.table_info(\"{source_table}\")").fetchall()
    cols = [row[1] for row in col_rows]
    if not cols:
        return 0
    col_list = ", ".join(f"\"{c}\"" for c in cols)
    dst.execute(
        f"INSERT OR REPLACE INTO \"{dest_table}\" ({col_list}) "
        f"SELECT {col_list} FROM {src_alias}.\"{source_table}\""
    )
    return _table_row_count(dst, dest_table)


def _copy_indices(dst: sqlite3.Connection, src_alias: str, table: str) -> int:
    """Copy any non-auto indices defined on the source table."""
    rows = dst.execute(
        f"SELECT name, sql FROM {src_alias}.sqlite_master "
        "WHERE type = 'index' AND tbl_name = ? AND sql IS NOT NULL",
        (table,),
    ).fetchall()
    copied = 0
    for name, sql in rows:
        if name.startswith("sqlite_autoindex"):
            continue
        try:
            dst.execute(sql)
            copied += 1
        except sqlite3.OperationalError:
            continue
    return copied


def _copy_virtual_table(
    dst: sqlite3.Connection,
    create_sql: str,
    table: str,
    src_alias: str,
) -> int:
    """Recreate a vec0/FTS5 virtual table and copy its rows, preserving rowid."""
    dst.execute(create_sql.replace("IF NOT EXISTS", "").strip())

    col_rows = dst.execute(f"PRAGMA {src_alias}.table_info(\"{table}\")").fetchall()
    cols = [row[1] for row in col_rows]
    if not cols:
        # No user-declared columns we can address; fall back to rowid-only.
        dst.execute(
            f"INSERT INTO \"{table}\"(rowid) SELECT rowid FROM {src_alias}.\"{table}\""
        )
        return _table_row_count(dst, table)

    col_list = ", ".join(f"\"{c}\"" for c in cols)
    # The rowid is the docid/key that links embeddings and FTS rows to their
    # metadata tables - it MUST be preserved across the copy. We branch on
    # whether PRAGMA table_info already exposes the rowid as part of `cols`:
    #   - rowid carried by cols: either an INTEGER PRIMARY KEY column aliases
    #     rowid (e.g. publication_image_embeddings: vec0(image_id INTEGER
    #     PRIMARY KEY, ...); `row[5]` is the table_info pk flag), or rowid is
    #     itself listed as a column (sqlite-vec lists it for implicit-rowid
    #     vec0 tables). Copy the columns verbatim - naming an extra literal
    #     `rowid` would duplicate it, and for a PK alias vec0 outright REJECTS
    #     the name ("table ... has no column named rowid").
    #   - rowid NOT carried by cols (e.g. FTS5 tables list only their text
    #     columns): name rowid explicitly so it is preserved.
    # Both branches preserve the rowid for every vec0/FTS5 shape, so this stays
    # correct even if a future sqlite-vec version changes which branch a given
    # table takes.
    rowid_in_columns = any(row[5] for row in col_rows) or "rowid" in cols
    if rowid_in_columns:
        dst.execute(
            f"INSERT INTO \"{table}\"({col_list}) "
            f"SELECT {col_list} FROM {src_alias}.\"{table}\""
        )
    else:
        dst.execute(
            f"INSERT INTO \"{table}\"(rowid, {col_list}) "
            f"SELECT rowid, {col_list} FROM {src_alias}.\"{table}\""
        )
    return _table_row_count(dst, table)
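

# Illustrative sketch only (not called by the merge): the "rowid NOT carried
# by cols" branch above, reproduced with an ordinary rowid table so it runs
# without FTS5 or sqlite-vec. The function, alias, and table names are
# hypothetical. PRAGMA table_info lists only the declared column, so the copy
# has to name `rowid` explicitly to keep the source keys (10 and 42 here)
# instead of letting SQLite renumber the rows.
def _demo_rowid_preserving_copy() -> tuple:
    import sqlite3

    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("ATTACH DATABASE ':memory:' AS src_demo")
        conn.execute("CREATE TABLE src_demo.notes (body TEXT)")
        conn.executemany(
            "INSERT INTO src_demo.notes(rowid, body) VALUES (?, ?)",
            [(10, "a"), (42, "b")],
        )
        # table_info exposes only `body`; rowid is implicit.
        cols = [row[1] for row in conn.execute('PRAGMA src_demo.table_info("notes")')]
        conn.execute("CREATE TABLE main.notes (body TEXT)")
        conn.execute(
            "INSERT INTO main.notes(rowid, body) "
            "SELECT rowid, body FROM src_demo.notes"
        )
        rows = conn.execute(
            "SELECT rowid, body FROM main.notes ORDER BY rowid"
        ).fetchall()
        return cols, rows
    finally:
        conn.close()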


def merge(
    sources: list[tuple[SourceDb, Path]],
    output: Path,
    *,
    dry_run: bool = False,
) -> dict[str, dict[str, int]]:
    """Execute the merge. Returns per-table row-count summary."""
    summary: dict[str, dict[str, int]] = {}

    if dry_run:
        print(f"DRY RUN - would write merged DB to {output}")
    elif output.exists():
        raise FileExistsError(
            f"Output already exists: {output}. Pass --force to overwrite."
        )

    if dry_run:
        for source, path in sources:
            src = sqlite3.connect(str(path))
            try:
                _try_load_sqlite_vec(src)
                summary[source.label] = {
                    name: _table_row_count(src, name)
                    for name, kind in _list_user_tables(src)
                    if kind == "table"
                }
            finally:
                # `with sqlite3.connect(...)` only manages transactions; close explicitly.
                src.close()
        return summary

    largest = max((p.stat().st_size for _s, p in sources), default=0)
    if largest > 512 * 1024 * 1024:  # >512 MB - worth a heads-up
        print(
            f"Note: largest source is {largest / 1e9:.1f} GB. Each source is "
            f"merged in one transaction; ensure {output.parent} has at least "
            f"that much free space for the -wal file before continuing.\n"
        )

    dst = sqlite3.connect(str(output))
    vec_loaded = _try_load_sqlite_vec(dst)
    try:
        dst.execute("PRAGMA journal_mode=WAL")
        for source, path in sources:
            print(f"Merging {source.label:14} from {path}")
            src_alias = f"src_{source.label}"
            dst.execute(f"ATTACH DATABASE ? AS {src_alias}", (str(path),))

            virtual = _virtual_tables(dst, src_alias)
            virtual_names = [name for name, _ in virtual]
            all_table_names = [
                name
                for name, kind in dst.execute(
                    f"SELECT name, type FROM {src_alias}.sqlite_master "
                    "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
                ).fetchall()
            ]
            shadows = shadow_table_names(virtual_names, all_table_names)

            if any(_uses_vec0(sql) for _, sql in virtual) and not vec_loaded:
                raise RuntimeError(
                    f"Source '{source.label}' has vec0 tables but the sqlite-vec "
                    "extension could not be loaded. Install it (pip install sqlite-vec) "
                    "and re-run, or the embeddings would be silently dropped."
                )

            table_summary: dict[str, int] = {}

            # 1) Regular tables (skip virtual tables and their shadows).
            for table_name in all_table_names:
                if table_name in set(virtual_names) or table_name in shadows:
                    continue
                dest_name = COLLISION_RENAMES.get(
                    (source.label, table_name), table_name
                )
                if dest_name == "_schema_metadata" and source.label != sources[0][0].label:
                    # Copy _schema_metadata exactly once, from the first source
                    # in `sources` (which preserves SOURCES canonical order
                    # filtered to present DBs, so this is deterministic even
                    # with --skip-missing). Alembic will supersede this table.
                    continue
                copied = _copy_regular_table(dst, table_name, dest_name, src_alias)
                _copy_indices(dst, src_alias, table_name)
                table_summary[table_name] = copied

            # 2) Virtual tables (recreate + copy rows preserving rowid).
            for table_name, create_sql in virtual:
                copied = _copy_virtual_table(dst, create_sql, table_name, src_alias)
                kind = "vec0" if _uses_vec0(create_sql) else "fts5"
                table_summary[f"{table_name} [{kind}]"] = copied

            summary[source.label] = table_summary
            # Commit any open transaction before detaching, otherwise SQLite
            # refuses with "database X is locked".
            dst.commit()
            dst.execute(f"DETACH DATABASE {src_alias}")
        dst.commit()
    finally:
        dst.close()

    return summary
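

# Illustrative sketch only (not called by the merge): merge() reports the
# destination row counts but does not compare them with the sources itself.
# One possible way to automate that comparison is shown below, assuming it is
# run while the source is still ATTACHed (that is, before the DETACH in
# merge()). The function name and its parameters are hypothetical.
def _demo_compare_row_counts(conn, src_alias, tables) -> dict:
    """Return {table: (source_rows, dest_rows)} for tables whose counts differ."""
    mismatches = {}
    for table in tables:
        src_n = conn.execute(
            f'SELECT COUNT(*) FROM {src_alias}."{table}"'
        ).fetchone()[0]
        dst_n = conn.execute(
            f'SELECT COUNT(*) FROM main."{table}"'
        ).fetchone()[0]
        if src_n != dst_n:
            mismatches[table] = (src_n, dst_n)
    return mismatches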


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--output",
        required=True,
        type=Path,
        help="Path for the merged DB file.",
    )
    parser.add_argument(
        "--db-dir",
        type=Path,
        default=None,
        help="Directory containing the source DBs. Defaults to env-driven paths.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Inspect sources and report row counts without writing.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Overwrite an existing output DB.",
    )
    parser.add_argument(
        "--skip-missing",
        action="store_true",
        help="Continue if some source DBs are not present.",
    )
    args = parser.parse_args()

    present, missing = plan_merge(db_dir=args.db_dir, require_all=False)
    if missing and not args.skip_missing:
        print(f"Missing source databases: {missing}")
        print("Pass --skip-missing to merge only the present ones.")
        return 2
    if missing:
        print(f"Skipping missing sources: {missing}")
    if not present:
        print("No source databases found.")
        return 2

    print("Sources to merge:")
    for source, path in present:
        print(f"  {source.label:14} {path}")
    print()

    # vec0 tables need sqlite-vec. Fail early with a clear message rather
    # than silently dropping embeddings mid-merge.
    virtual_findings = detect_virtual_tables(present)
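    if virtual_findings:
        print("Virtual tables to reconstruct (vec0/FTS5):")
        for label, tables in virtual_findings.items():
            print(f"  {label}: {tables}")
        # Only vec0 tables need sqlite-vec; FTS5 is compiled into SQLite
        # rather than loaded through sqlite-vec.
        needs_vec = False
        for _source, path in present:
            conn = sqlite3.connect(str(path))
            try:
                if any(_uses_vec0(sql) for _name, sql in _virtual_tables(conn)):
                    needs_vec = True
            finally:
                conn.close()
        probe = sqlite3.connect(":memory:")
        vec_ok = _try_load_sqlite_vec(probe)
        probe.close()
        if needs_vec and not vec_ok:
            print(
                "\nRefusing to merge - sources contain vec0 tables but the\n"
                "sqlite-vec extension could not be loaded. Install it with\n"
                "  pip install sqlite-vec\n"
                "and re-run. (Without it, vec0 embeddings would be dropped.)"
            )
            return 4
        print()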

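    source_tables: dict[str, list[str]] = {}
    for source, path in present:
        # Open each source briefly and close it so no connection is leaked.
        conn = sqlite3.connect(str(path))
        try:
            source_tables[source.label] = [
                name for name, kind in _list_user_tables(conn) if kind == "table"
            ]
        finally:
            conn.close()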
    collisions = detect_collisions(source_tables)
    if collisions:
        print("Refusing to merge - table-name collisions detected:")
        for table, labels in collisions:
            print(f"  '{table}' present in: {labels}")
        print("Add an entry to COLLISION_RENAMES and re-run.")
        return 3

    # Never delete an existing output during a dry run.
    if args.force and not args.dry_run and args.output.exists():
        args.output.unlink()

    summary = merge(present, args.output, dry_run=args.dry_run)

    print("\nSummary:")
    for label, tables in summary.items():
        print(f"  {label}:")
        for table, rows in sorted(tables.items()):
            note = "(virtual)" if rows == -1 else f"{rows:,} rows"
            print(f"    {table:40} {note}")
    if args.dry_run:
        print("\nDry run complete. Re-run without --dry-run to write the merged DB.")
    else:
        print(f"\nMerged DB written to {args.output}")
        print("Original source DBs left untouched. Test the merged DB before deleting them.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())