open-navigator / scripts /data /consolidate_partitioned_datasets.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
896453f verified
#!/usr/bin/env python3
"""
Consolidate partitioned datasets back into single Parquet files.
This script reads partitioned datasets (state=AL/, state=CA/, etc.) and
combines them into single consolidated files that work with HuggingFace datasets.
Usage:
python scripts/consolidate_partitioned_datasets.py
"""
import pyarrow.parquet as pq
import pyarrow as pa
from pathlib import Path
import shutil
import sys
def consolidate_dataset(partitioned_dir: Path, output_file: Path) -> None:
"""
Read a partitioned dataset and write it as a single consolidated file.
Args:
partitioned_dir: Path to partitioned dataset directory
output_file: Path to output consolidated parquet file
"""
print(f"\n{'='*70}")
print(f"Consolidating: {partitioned_dir.name}")
print(f"{'='*70}")
if not partitioned_dir.exists():
print(f"⚠️ Directory not found: {partitioned_dir}")
return
try:
# Read the entire partitioned dataset using PyArrow
# This will handle any schema inconsistencies by taking the union of schemas
print(f"πŸ“– Reading partitioned data from {partitioned_dir}...")
dataset = pq.ParquetDataset(str(partitioned_dir), use_legacy_dataset=False)
table = dataset.read()
print(f"βœ… Loaded {len(table):,} rows")
print(f"πŸ“Š Schema: {table.schema}")
print(f"πŸ’Ύ Memory size: {table.nbytes / 1024 / 1024:.1f} MB")
# Write consolidated file
print(f"πŸ’Ύ Writing consolidated file to {output_file}...")
pq.write_table(
table,
output_file,
compression='snappy',
use_dictionary=True,
write_statistics=True
)
file_size = output_file.stat().st_size / 1024 / 1024
print(f"βœ… Wrote {output_file.name} ({file_size:.1f} MB)")
except Exception as e:
print(f"❌ Error consolidating {partitioned_dir.name}: {e}")
print(f" Will try reading with schema unification...")
try:
# Alternative approach: read all parquet files and concatenate
parquet_files = list(partitioned_dir.rglob("*.parquet"))
if not parquet_files:
print(f" No parquet files found in {partitioned_dir}")
return
print(f" Found {len(parquet_files)} partition files")
# Read all tables
tables = []
for i, pq_file in enumerate(parquet_files):
if i % 10 == 0:
print(f" Reading partition {i+1}/{len(parquet_files)}...")
tables.append(pq.read_table(pq_file))
# Concatenate with schema promotion
print(f" Concatenating {len(tables)} tables...")
combined_table = pa.concat_tables(tables, promote=True)
print(f"βœ… Combined {len(combined_table):,} rows")
print(f"πŸ“Š Unified schema: {combined_table.schema}")
# Write consolidated file
print(f"πŸ’Ύ Writing consolidated file to {output_file}...")
pq.write_table(
combined_table,
output_file,
compression='snappy',
use_dictionary=True,
write_statistics=True
)
file_size = output_file.stat().st_size / 1024 / 1024
print(f"βœ… Wrote {output_file.name} ({file_size:.1f} MB)")
except Exception as e2:
print(f"❌ Failed with alternative approach too: {e2}")
sys.exit(1)
def main():
"""Main consolidation process."""
gold_dir = Path("data/gold")
# Partitioned datasets to consolidate
partitioned_datasets = [
"nonprofits_organizations",
"nonprofits_locations",
"nonprofits_financials",
"nonprofits_programs",
"jurisdictions_cities",
"jurisdictions_counties",
"jurisdictions_school_districts",
"jurisdictions_townships",
"domains_gsa_domains"
]
print("πŸ”„ Consolidating Partitioned Datasets to Single Files")
print("="*70)
print(f"Gold directory: {gold_dir.absolute()}")
print(f"Datasets to consolidate: {len(partitioned_datasets)}")
print()
# Create backup directory
backup_dir = gold_dir / "partitioned_backup"
backup_dir.mkdir(exist_ok=True)
print(f"πŸ“¦ Backup directory: {backup_dir}")
print()
consolidated_count = 0
failed_count = 0
for dataset_name in partitioned_datasets:
partitioned_dir = gold_dir / dataset_name
output_file = gold_dir / f"{dataset_name}.parquet"
if not partitioned_dir.exists():
print(f"⚠️ Skipping {dataset_name} (directory not found)")
continue
if not partitioned_dir.is_dir():
print(f"⚠️ Skipping {dataset_name} (not a directory)")
continue
try:
consolidate_dataset(partitioned_dir, output_file)
# Move partitioned dir to backup
backup_path = backup_dir / dataset_name
if backup_path.exists():
shutil.rmtree(backup_path)
shutil.move(str(partitioned_dir), str(backup_path))
print(f"πŸ“¦ Moved partitioned dir to backup: {backup_path}")
consolidated_count += 1
except Exception as e:
print(f"❌ Failed to consolidate {dataset_name}: {e}")
failed_count += 1
print(f"\n{'='*70}")
print("βœ… CONSOLIDATION COMPLETE")
print(f"{'='*70}")
print(f"βœ… Consolidated: {consolidated_count} datasets")
print(f"❌ Failed: {failed_count} datasets")
print(f"πŸ“¦ Partitioned directories backed up to: {backup_dir}")
print()
print("Next steps:")
print(" 1. Verify the consolidated files work with HuggingFace datasets")
print(" 2. Upload to HuggingFace Hub")
print(" 3. Remove backup directory once confirmed")
print()
if __name__ == "__main__":
main()