Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Consolidate partitioned datasets back into single Parquet files. | |
| This script reads partitioned datasets (state=AL/, state=CA/, etc.) and | |
| combines them into single consolidated files that work with HuggingFace datasets. | |
| Usage: | |
| python scripts/consolidate_partitioned_datasets.py | |
| """ | |
| import pyarrow.parquet as pq | |
| import pyarrow as pa | |
| from pathlib import Path | |
| import shutil | |
| import sys | |
def _write_consolidated(table: "pa.Table", output_file: Path) -> None:
    """Write ``table`` to ``output_file`` as Snappy-compressed Parquet and print its size."""
    print(f"💾 Writing consolidated file to {output_file}...")
    pq.write_table(
        table,
        output_file,
        compression='snappy',
        use_dictionary=True,
        write_statistics=True,
    )
    file_size = output_file.stat().st_size / 1024 / 1024
    print(f"✓ Wrote {output_file.name} ({file_size:.1f} MB)")


def _concat_with_promotion(tables: list) -> "pa.Table":
    """Concatenate ``tables`` into one table, unifying their schemas."""
    # PyArrow 14 replaced the boolean ``promote`` flag with ``promote_options``;
    # pick whichever spelling the installed version understands so neither old
    # nor new releases warn or break.
    if int(pa.__version__.split(".")[0]) >= 14:
        return pa.concat_tables(tables, promote_options="default")
    return pa.concat_tables(tables, promote=True)


def consolidate_dataset(partitioned_dir: Path, output_file: Path) -> None:
    """
    Read a partitioned dataset and write it as a single consolidated file.

    The directory is first read in one pass as a partitioned Parquet dataset.
    If that read (or the subsequent write) fails, every ``*.parquet`` file
    below the directory is read on its own and the tables are concatenated
    with schema promotion before being written.

    A missing directory, or a directory without any ``*.parquet`` file in the
    fallback pass, is reported on stdout and the function returns without
    writing ``output_file``.

    Args:
        partitioned_dir: Path to partitioned dataset directory
        output_file: Path to output consolidated parquet file

    Raises:
        RuntimeError: If both the primary read and the per-file fallback fail.
            The partitioned input is never modified by this function, so it is
            still intact when this is raised.
    """
    print(f"\n{'='*70}")
    print(f"Consolidating: {partitioned_dir.name}")
    print(f"{'='*70}")

    if not partitioned_dir.exists():
        print(f"⚠️ Directory not found: {partitioned_dir}")
        return

    try:
        # Read the whole directory in one pass. ``read_table`` is used rather
        # than ``ParquetDataset(..., use_legacy_dataset=False)`` because that
        # keyword is deprecated in recent PyArrow releases.
        print(f"📖 Reading partitioned data from {partitioned_dir}...")
        table = pq.read_table(str(partitioned_dir))
        print(f"✓ Loaded {len(table):,} rows")
        print(f"📊 Schema: {table.schema}")
        print(f"💾 Memory size: {table.nbytes / 1024 / 1024:.1f} MB")
        _write_consolidated(table, output_file)
        return
    except Exception as e:
        # Keep a reference: Python unbinds ``e`` when the except block ends.
        primary_error = e
        print(f"❌ Error consolidating {partitioned_dir.name}: {e}")
        print("   Will try reading with schema unification...")

    try:
        # Alternative approach: read all parquet files and concatenate.
        # Sorted so the row order of the output is reproducible across runs.
        parquet_files = sorted(partitioned_dir.rglob("*.parquet"))
        if not parquet_files:
            print(f"   No parquet files found in {partitioned_dir}")
            return
        print(f"   Found {len(parquet_files)} partition files")

        tables = []
        for i, pq_file in enumerate(parquet_files):
            if i % 10 == 0:
                print(f"   Reading partition {i+1}/{len(parquet_files)}...")
            tables.append(pq.read_table(pq_file))

        print(f"   Concatenating {len(tables)} tables...")
        combined_table = _concat_with_promotion(tables)
        print(f"✓ Combined {len(combined_table):,} rows")
        print(f"📊 Unified schema: {combined_table.schema}")
        _write_consolidated(combined_table, output_file)
    except Exception as e2:
        print(f"❌ Failed with alternative approach too: {e2}")
        # Raise instead of calling sys.exit(): the caller decides whether one
        # bad dataset should abort the whole batch.
        raise RuntimeError(
            f"could not consolidate {partitioned_dir.name}: "
            f"primary read failed ({primary_error}); "
            f"per-file fallback failed ({e2})"
        ) from e2
def main() -> int:
    """Main consolidation process.

    Consolidates each known partitioned dataset under ``data/gold`` into
    ``data/gold/<name>.parquet`` and then moves the partitioned directory into
    ``data/gold/partitioned_backup``. A dataset directory is only moved once
    its consolidated file actually exists.

    Returns:
        Exit status for the process: ``0`` when no dataset failed, ``1`` when
        the gold directory is missing or at least one dataset failed.
    """
    gold_dir = Path("data/gold")

    # Partitioned datasets to consolidate
    partitioned_datasets = [
        "nonprofits_organizations",
        "nonprofits_locations",
        "nonprofits_financials",
        "nonprofits_programs",
        "jurisdictions_cities",
        "jurisdictions_counties",
        "jurisdictions_school_districts",
        "jurisdictions_townships",
        "domains_gsa_domains",
    ]

    print("🔄 Consolidating Partitioned Datasets to Single Files")
    print("="*70)
    print(f"Gold directory: {gold_dir.absolute()}")
    print(f"Datasets to consolidate: {len(partitioned_datasets)}")
    print()

    # Fail with a clear message instead of a traceback from mkdir() below,
    # and avoid creating stray directories when run from the wrong cwd.
    if not gold_dir.is_dir():
        print(f"❌ Gold directory not found: {gold_dir.absolute()}")
        return 1

    # Create backup directory
    backup_dir = gold_dir / "partitioned_backup"
    backup_dir.mkdir(exist_ok=True)
    print(f"📦 Backup directory: {backup_dir}")
    print()

    consolidated_count = 0
    failed_count = 0

    for dataset_name in partitioned_datasets:
        partitioned_dir = gold_dir / dataset_name
        output_file = gold_dir / f"{dataset_name}.parquet"

        if not partitioned_dir.exists():
            print(f"⚠️ Skipping {dataset_name} (directory not found)")
            continue
        if not partitioned_dir.is_dir():
            print(f"⚠️ Skipping {dataset_name} (not a directory)")
            continue

        try:
            consolidate_dataset(partitioned_dir, output_file)

            # consolidate_dataset() can return without writing anything
            # (e.g. no parquet files found). Do not retire the source
            # directory or report success unless the output really exists.
            if not output_file.is_file():
                print(f"❌ Failed to consolidate {dataset_name}: no consolidated file was written")
                failed_count += 1
                continue

            # Move partitioned dir to backup
            backup_path = backup_dir / dataset_name
            if backup_path.exists():
                shutil.rmtree(backup_path)
            shutil.move(str(partitioned_dir), str(backup_path))
            print(f"📦 Moved partitioned dir to backup: {backup_path}")
            consolidated_count += 1
        except Exception as e:
            print(f"❌ Failed to consolidate {dataset_name}: {e}")
            failed_count += 1

    print(f"\n{'='*70}")
    if failed_count:
        print("⚠️ CONSOLIDATION FINISHED WITH FAILURES")
    else:
        print("✅ CONSOLIDATION COMPLETE")
    print(f"{'='*70}")
    print(f"✓ Consolidated: {consolidated_count} datasets")
    print(f"✗ Failed: {failed_count} datasets")
    print(f"📦 Partitioned directories backed up to: {backup_dir}")
    print()
    print("Next steps:")
    print("  1. Verify the consolidated files work with HuggingFace datasets")
    print("  2. Upload to HuggingFace Hub")
    print("  3. Remove backup directory once confirmed")
    print()

    return 1 if failed_count else 0


if __name__ == "__main__":
    # Propagate failures to the shell so CI and calling scripts can detect them.
    sys.exit(main())