THIRAWAT-mapper-demo / scripts /offline /build_duckdb.py
na399's picture
Deploy THIRAWAT mapper app
25c66a0 verified
#!/usr/bin/env python3
"""Offline: convert Athena vocabulary export to DuckDB."""
from __future__ import annotations
import argparse
from pathlib import Path
# This script lives at <repo>/scripts/offline/, so the repository root is
# two directories above the folder containing this file.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Where the DuckDB database is written when --out is not given.
DEFAULT_DUCKDB_PATH = REPO_ROOT.joinpath("data", "derived", "concepts.duckdb")
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the Athena -> DuckDB build.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse read
            ``sys.argv[1:]``, which matches the previous zero-argument behaviour.
            Pass an explicit list to drive the script programmatically or from tests.

    Returns:
        Namespace with ``athena_dir``, ``out``, ``overwrite``, ``threads``,
        ``schema``, ``sep`` and ``encoding`` attributes.
    """
    parser = argparse.ArgumentParser(description="Build DuckDB from Athena vocabulary export.")
    parser.add_argument("--athena-dir", required=True, help="Directory containing Athena CSV/TSV files.")
    parser.add_argument(
        "--out",
        default=str(DEFAULT_DUCKDB_PATH),
        help=f"Output DuckDB path (default: {DEFAULT_DUCKDB_PATH}).",
    )
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing DuckDB file.")
    parser.add_argument("--threads", type=int, default=None, help="DuckDB threads (default: auto).")
    parser.add_argument("--schema", default="main", help="Target schema name (default: main).")
    # The value is passed through unmodified (no escape processing here), so a
    # shell argument like '\t' arrives as two characters; use e.g. $'\t' for a tab.
    parser.add_argument("--sep", default="\t", help="Input delimiter (default: tab).")
    parser.add_argument("--encoding", default="UTF-8", help="Input encoding (default: UTF-8).")
    return parser.parse_args(argv)
def _import_athena2duckdb():
    """Import athena2duckdb lazily; return ``(CSVOptions, load_vocab_dir, verify_row_counts)``.

    Raises:
        RuntimeError: if importing athena2duckdb fails with a SyntaxError; the
            message points at the pre-built-DuckDB workaround.
    """
    try:
        from athena2duckdb import CSVOptions, load_vocab_dir, verify_row_counts
    except SyntaxError as exc:
        raise RuntimeError(
            "Failed to import athena2duckdb due to an upstream syntax error. "
            "Current workaround: use an already-built DuckDB file (for example a previous vocab.duckdb) "
            "and continue with scripts/offline/build_lancedb_index.py."
        ) from exc
    return CSVOptions, load_vocab_dir, verify_row_counts


def _report_row_counts(results) -> int:
    """Print one status line per row-count check result and return the mismatch count.

    ``results`` is consumed in a single pass, so any iterable (including a
    one-shot iterator) is reported completely. Each item must expose
    ``matches``, ``table_name``, ``csv_rows`` and ``table_rows``.
    """
    mismatch_count = 0
    for result in results:
        if result.matches:
            status = "OK"
        else:
            status = "MISMATCH"
            mismatch_count += 1
        print(
            f"{status:9s} table={result.table_name:<25s} "
            f"csv_rows={result.csv_rows:,} table_rows={result.table_rows:,}"
        )
    if mismatch_count:
        print(f"Found {mismatch_count} row-count mismatches.")
    return mismatch_count


def run(args: argparse.Namespace) -> int:
    """Load an Athena vocabulary export into DuckDB and verify per-table row counts.

    Args:
        args: Parsed CLI options (``athena_dir``, ``out``, ``overwrite``,
            ``threads``, ``schema``, ``sep``, ``encoding``).

    Returns:
        0 when every table's row count matches its source file, 2 otherwise.

    Raises:
        FileNotFoundError: ``--athena-dir`` does not exist.
        NotADirectoryError: ``--athena-dir`` exists but is not a directory.
        RuntimeError: athena2duckdb fails to import with a SyntaxError.
    """
    athena_dir = Path(args.athena_dir).expanduser().resolve()
    out_path = Path(args.out).expanduser().resolve()

    # Validate the input before importing the loader or touching the output
    # location, so a bad --athena-dir fails fast and leaves no empty dirs behind.
    if not athena_dir.exists():
        raise FileNotFoundError(f"Athena directory does not exist: {athena_dir}")
    if not athena_dir.is_dir():
        raise NotADirectoryError(f"Athena path is not a directory: {athena_dir}")

    CSVOptions, load_vocab_dir, verify_row_counts = _import_athena2duckdb()

    out_path.parent.mkdir(parents=True, exist_ok=True)
    csv_options = CSVOptions(sep=args.sep, encoding=args.encoding)
    summary = load_vocab_dir(
        input_dir=athena_dir,
        out_path=out_path,
        csv_options=csv_options,
        overwrite=bool(args.overwrite),
        threads=args.threads,
        schema=args.schema,
    )
    print(f"Loaded {len(summary.vocab_files)} tables into {summary.db_path} (schema {summary.schema}).")

    results = verify_row_counts(
        db_path=summary.db_path,
        vocab_files=summary.vocab_files,
        csv_options=csv_options,
        threads=args.threads,
        schema=summary.schema,
    )
    return 2 if _report_row_counts(results) else 0
def main() -> int:
    """CLI entry point: parse the command line, run the build, return the exit status."""
    return run(parse_args())
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit code.
    exit_code = main()
    raise SystemExit(exit_code)