Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """Offline: convert Athena vocabulary export to DuckDB.""" | |
| from __future__ import annotations | |
| import argparse | |
| from pathlib import Path | |
# Repository root: two directory levels above the directory holding this script.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Default output location for the generated DuckDB file (overridable via --out).
DEFAULT_DUCKDB_PATH = REPO_ROOT.joinpath("data", "derived", "concepts.duckdb")
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the Athena -> DuckDB conversion.

    Args:
        argv: Argument strings to parse, excluding the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, which matches
            the previous zero-argument behaviour. Passing a list lets callers
            and tests drive the CLI programmatically.

    Returns:
        Namespace with ``athena_dir``, ``out``, ``overwrite``, ``threads``,
        ``schema``, ``sep`` and ``encoding`` attributes.
    """
    parser = argparse.ArgumentParser(description="Build DuckDB from Athena vocabulary export.")
    parser.add_argument("--athena-dir", required=True, help="Directory containing Athena CSV/TSV files.")
    parser.add_argument(
        "--out",
        default=str(DEFAULT_DUCKDB_PATH),
        help=f"Output DuckDB path (default: {DEFAULT_DUCKDB_PATH}).",
    )
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing DuckDB file.")
    # None is forwarded as-is to the loader; the help text describes it as "auto".
    parser.add_argument("--threads", type=int, default=None, help="DuckDB threads (default: auto).")
    parser.add_argument("--schema", default="main", help="Target schema name (default: main).")
    parser.add_argument("--sep", default="\t", help="Input delimiter (default: tab).")
    parser.add_argument("--encoding", default="UTF-8", help="Input encoding (default: UTF-8).")
    return parser.parse_args(argv)
def run(args: argparse.Namespace) -> int:
    """Build a DuckDB database from an Athena export and verify row counts.

    Loads every vocabulary file found in ``args.athena_dir`` into the DuckDB
    file at ``args.out`` via ``athena2duckdb.load_vocab_dir``. It then compares
    each table's row count against its source file with ``verify_row_counts``
    and prints one status line per table.

    Args:
        args: Parsed CLI namespace. Reads ``athena_dir``, ``out``,
            ``overwrite``, ``threads``, ``schema``, ``sep`` and ``encoding``.

    Returns:
        ``0`` if every table's row count matches its source file, ``2`` if at
        least one mismatch was reported.

    Raises:
        FileNotFoundError: If ``args.athena_dir`` does not exist.
        NotADirectoryError: If ``args.athena_dir`` exists but is not a directory.
        RuntimeError: If importing ``athena2duckdb`` raises ``SyntaxError``.
    """
    athena_dir = Path(args.athena_dir).expanduser().resolve()
    out_path = Path(args.out).expanduser().resolve()

    # Validate input before any side effect, so a bad --athena-dir does not
    # leave a freshly created output directory behind.
    if not athena_dir.exists():
        raise FileNotFoundError(f"Athena directory does not exist: {athena_dir}")
    if not athena_dir.is_dir():
        raise NotADirectoryError(f"Athena path is not a directory: {athena_dir}")

    # Imported inside run() rather than at module level, so argument parsing
    # does not depend on this package being importable.
    try:
        from athena2duckdb import CSVOptions, load_vocab_dir, verify_row_counts
    except SyntaxError as exc:
        raise RuntimeError(
            "Failed to import athena2duckdb due to an upstream syntax error. "
            "Current workaround: use an already-built DuckDB file (for example a previous vocab.duckdb) "
            "and continue with scripts/offline/build_lancedb_index.py."
        ) from exc

    out_path.parent.mkdir(parents=True, exist_ok=True)

    csv_options = CSVOptions(sep=args.sep, encoding=args.encoding)
    summary = load_vocab_dir(
        input_dir=athena_dir,
        out_path=out_path,
        csv_options=csv_options,
        overwrite=bool(args.overwrite),
        threads=args.threads,
        schema=args.schema,
    )
    print(f"Loaded {len(summary.vocab_files)} tables into {summary.db_path} (schema {summary.schema}).")

    results = verify_row_counts(
        db_path=summary.db_path,
        vocab_files=summary.vocab_files,
        csv_options=csv_options,
        threads=args.threads,
        schema=summary.schema,
    )
    mismatches = [result for result in results if not result.matches]
    for result in results:
        status = "OK" if result.matches else "MISMATCH"
        print(
            f"{status:9s} table={result.table_name:<25s} "
            f"csv_rows={result.csv_rows:,} table_rows={result.table_rows:,}"
        )
    if mismatches:
        print(f"Found {len(mismatches)} row-count mismatches.")
        return 2
    return 0
def main() -> int:
    """Command-line entry point: parse arguments from ``sys.argv`` and return ``run``'s exit status."""
    return run(parse_args())
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit code.
    exit_status = main()
    raise SystemExit(exit_status)