#!/usr/bin/env python3
"""
Consolidate partitioned datasets back into single Parquet files.

This script reads partitioned datasets (state=AL/, state=CA/, etc.) and
combines them into single consolidated files that work with HuggingFace
datasets. After each dataset is consolidated, its partitioned directory is
moved into a backup directory.

Usage:
    python scripts/consolidate_partitioned_datasets.py
"""
import pyarrow.parquet as pq
import pyarrow as pa
from pathlib import Path
import shutil
import sys
def _write_consolidated(table: "pa.Table", output_file: Path) -> None:
    """Write ``table`` to ``output_file`` as Snappy-compressed Parquet and print its size."""
    print(f"💾 Writing consolidated file to {output_file}...")
    pq.write_table(
        table,
        output_file,
        compression='snappy',
        use_dictionary=True,
        write_statistics=True,
    )
    file_size = output_file.stat().st_size / 1024 / 1024
    print(f"✅ Wrote {output_file.name} ({file_size:.1f} MB)")


def _read_partitions_individually(partitioned_dir: Path) -> "pa.Table":
    """Read every ``*.parquet`` file under ``partitioned_dir`` and concatenate them.

    Raises:
        FileNotFoundError: If no ``*.parquet`` file exists under the directory.
    """
    # Sorted so the row order of the result does not depend on the order in
    # which the filesystem happens to list the partition files.
    parquet_files = sorted(partitioned_dir.rglob("*.parquet"))
    if not parquet_files:
        raise FileNotFoundError(f"No parquet files found in {partitioned_dir}")
    print(f" Found {len(parquet_files)} partition files")

    # NOTE(review): each file is read on its own, so values that exist only in
    # the directory names (e.g. state=AL) are presumably not added back as
    # columns on this path - confirm before relying on the fallback output.
    tables = []
    for i, pq_file in enumerate(parquet_files):
        if i % 10 == 0:
            print(f" Reading partition {i+1}/{len(parquet_files)}...")
        tables.append(pq.read_table(pq_file))

    print(f" Concatenating {len(tables)} tables...")
    # ``promote=True`` is deprecated since PyArrow 14 in favour of
    # ``promote_options="default"``; older releases only know ``promote``.
    if int(pa.__version__.split(".")[0]) >= 14:
        return pa.concat_tables(tables, promote_options="default")
    return pa.concat_tables(tables, promote=True)


def consolidate_dataset(partitioned_dir: Path, output_file: Path) -> None:
    """
    Read a partitioned dataset and write it as a single consolidated file.

    The directory is first read as one dataset via ``pq.ParquetDataset``. If
    that read (or the following write) fails for any reason, every
    ``*.parquet`` file below the directory is read individually and the
    tables are concatenated with schema promotion instead.

    Args:
        partitioned_dir: Path to partitioned dataset directory
        output_file: Path to output consolidated parquet file

    Returns:
        None. If ``partitioned_dir`` does not exist, a warning is printed and
        nothing is written.

    Raises:
        RuntimeError: If both approaches fail (including when the directory
            contains no parquet files). The underlying error is attached as
            ``__cause__`` so the caller can decide whether to continue.
    """
    print(f"\n{'='*70}")
    print(f"Consolidating: {partitioned_dir.name}")
    print(f"{'='*70}")

    if not partitioned_dir.exists():
        print(f"⚠️ Directory not found: {partitioned_dir}")
        return

    try:
        # Read the entire partitioned dataset using PyArrow
        print(f"📖 Reading partitioned data from {partitioned_dir}...")
        dataset = pq.ParquetDataset(str(partitioned_dir), use_legacy_dataset=False)
        table = dataset.read()
        print(f"✅ Loaded {len(table):,} rows")
        print(f"📋 Schema: {table.schema}")
        print(f"💾 Memory size: {table.nbytes / 1024 / 1024:.1f} MB")
        _write_consolidated(table, output_file)
        return
    except Exception as e:
        print(f"❌ Error consolidating {partitioned_dir.name}: {e}")
        print(" Will try reading with schema unification...")

    # The fallback runs outside the ``except`` block above so the first
    # attempt's exception (and the frames it references) is already released.
    try:
        # Alternative approach: read all parquet files and concatenate
        combined_table = _read_partitions_individually(partitioned_dir)
        print(f"✅ Combined {len(combined_table):,} rows")
        print(f"📋 Unified schema: {combined_table.schema}")
        _write_consolidated(combined_table, output_file)
    except Exception as e2:
        print(f"❌ Failed with alternative approach too: {e2}")
        # Raise instead of exiting the interpreter: the caller keeps control,
        # can record the failure and move on to the next dataset.
        raise RuntimeError(
            f"Could not consolidate {partitioned_dir.name}: {e2}"
        ) from e2
def main():
    """Consolidate every known partitioned dataset under ``data/gold``.

    For each dataset name, ``data/gold/<name>/`` is consolidated into
    ``data/gold/<name>.parquet`` and the partitioned directory is then moved
    to ``data/gold/partitioned_backup/<name>`` (replacing any earlier backup
    of the same name). Datasets whose directory is missing are skipped.

    The partitioned directory is only moved once the consolidated file
    actually exists, so a dataset that produced no output is reported as
    failed and left in place.

    Exits with status 1 when the gold directory is missing or when at least
    one dataset failed; otherwise returns ``None``.
    """
    gold_dir = Path("data/gold")

    # Partitioned datasets to consolidate
    partitioned_datasets = [
        "nonprofits_organizations",
        "nonprofits_locations",
        "nonprofits_financials",
        "nonprofits_programs",
        "jurisdictions_cities",
        "jurisdictions_counties",
        "jurisdictions_school_districts",
        "jurisdictions_townships",
        "domains_gsa_domains",
    ]

    print("🔄 Consolidating Partitioned Datasets to Single Files")
    print("="*70)
    print(f"Gold directory: {gold_dir.absolute()}")
    print(f"Datasets to consolidate: {len(partitioned_datasets)}")
    print()

    # Fail with a clear message (and without creating anything) when the
    # script is started from a directory that has no data/gold.
    if not gold_dir.is_dir():
        print(f"❌ Gold directory not found: {gold_dir.absolute()}")
        sys.exit(1)

    # Create backup directory
    backup_dir = gold_dir / "partitioned_backup"
    backup_dir.mkdir(exist_ok=True)
    print(f"📦 Backup directory: {backup_dir}")
    print()

    consolidated_count = 0
    failed_count = 0

    for dataset_name in partitioned_datasets:
        partitioned_dir = gold_dir / dataset_name
        output_file = gold_dir / f"{dataset_name}.parquet"

        if not partitioned_dir.exists():
            print(f"⚠️ Skipping {dataset_name} (directory not found)")
            continue
        if not partitioned_dir.is_dir():
            print(f"⚠️ Skipping {dataset_name} (not a directory)")
            continue

        try:
            consolidate_dataset(partitioned_dir, output_file)

            # consolidate_dataset can return without having written anything;
            # never move the source data away unless the output is there.
            if not output_file.is_file():
                print(
                    f"❌ Failed to consolidate {dataset_name}: "
                    f"no output file was written to {output_file}"
                )
                failed_count += 1
                continue

            # Move partitioned dir to backup
            backup_path = backup_dir / dataset_name
            if backup_path.exists():
                shutil.rmtree(backup_path)
            shutil.move(str(partitioned_dir), str(backup_path))
            print(f"📦 Moved partitioned dir to backup: {backup_path}")
            consolidated_count += 1
        except Exception as e:
            print(f"❌ Failed to consolidate {dataset_name}: {e}")
            failed_count += 1

    print(f"\n{'='*70}")
    print("✅ CONSOLIDATION COMPLETE")
    print(f"{'='*70}")
    print(f"✅ Consolidated: {consolidated_count} datasets")
    print(f"❌ Failed: {failed_count} datasets")
    print(f"📦 Partitioned directories backed up to: {backup_dir}")
    print()
    print("Next steps:")
    print(" 1. Verify the consolidated files work with HuggingFace datasets")
    print(" 2. Upload to HuggingFace Hub")
    print(" 3. Remove backup directory once confirmed")
    print()

    # Make failures visible to shells / CI, not only in the printed summary.
    if failed_count:
        sys.exit(1)
# Run the consolidation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|