Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Split gold parquet files by state for easier distribution and access. | |
| This script splits large monolithic parquet files into state-specific files: | |
| - nonprofits_organizations.parquet β nonprofits_organizations_AL.parquet, nonprofits_organizations_AK.parquet, etc. | |
| - nonprofits_locations.parquet β nonprofits_locations_AL.parquet, nonprofits_locations_AK.parquet, etc. | |
| - jurisdictions_*.parquet β jurisdictions_*_AL.parquet, etc. | |
| - domains_gsa_domains.parquet β domains_gsa_domains_AL.parquet, etc. | |
| Benefits: | |
| - Smaller file sizes (easier downloads) | |
| - Faster queries (load only needed states) | |
| - Better HuggingFace upload (avoid file size limits) | |
| Usage: | |
| # Split all files | |
| python scripts/split_gold_by_state.py --all | |
| # Split specific file | |
| python scripts/split_gold_by_state.py --file nonprofits_organizations.parquet | |
| # Dry run (see what would happen) | |
| python scripts/split_gold_by_state.py --all --dry-run | |
| """ | |
| import argparse | |
| from pathlib import Path | |
| import pandas as pd | |
| from loguru import logger | |
| from typing import Dict, List | |
| class GoldFileSplitter: | |
| """Split gold parquet files by state.""" | |
| # Files that have direct 'state' column | |
| STATE_COLUMN_FILES = { | |
| 'nonprofits_organizations.parquet': 'state', | |
| 'nonprofits_locations.parquet': 'state', | |
| 'nonprofits_financials.parquet': 'state', | |
| 'nonprofits_programs.parquet': 'state', | |
| 'nonprofits_tuscaloosa_form990.parquet': 'state', | |
| } | |
| # Files that have 'State' column (capitalized) | |
| STATE_UPPER_FILES = { | |
| 'domains_gsa_domains.parquet': 'State', | |
| } | |
| # Files that have 'USPS' column (state abbreviation) | |
| USPS_FILES = { | |
| 'jurisdictions_cities.parquet': 'USPS', | |
| 'jurisdictions_counties.parquet': 'USPS', | |
| 'jurisdictions_school_districts.parquet': 'USPS', | |
| 'jurisdictions_townships.parquet': 'USPS', | |
| } | |
| def __init__(self, gold_dir: str = "data/gold", output_dir: str = None): | |
| """ | |
| Initialize splitter. | |
| DEPRECATED: Use create_partitioned_datasets.py instead for better performance. | |
| Args: | |
| gold_dir: Directory containing gold parquet files | |
| output_dir: Directory to write split files (defaults to gold_dir/by_state/) | |
| """ | |
| self.gold_dir = Path(gold_dir) | |
| self.output_dir = Path(output_dir) if output_dir else self.gold_dir / "by_state" | |
| # Create output directory | |
| self.output_dir.mkdir(parents=True, exist_ok=True) | |
| # Combined mapping of all files to split | |
| self.all_files = { | |
| **self.STATE_COLUMN_FILES, | |
| **self.STATE_UPPER_FILES, | |
| **self.USPS_FILES, | |
| } | |
| def get_state_column(self, filename: str) -> str: | |
| """Get the state column name for a file.""" | |
| if filename in self.STATE_COLUMN_FILES: | |
| return self.STATE_COLUMN_FILES[filename] | |
| elif filename in self.STATE_UPPER_FILES: | |
| return self.STATE_UPPER_FILES[filename] | |
| elif filename in self.USPS_FILES: | |
| return self.USPS_FILES[filename] | |
| else: | |
| raise ValueError(f"Unknown file: {filename}") | |
| def split_file(self, filename: str, dry_run: bool = False) -> Dict[str, int]: | |
| """ | |
| Split a single parquet file by state. | |
| Args: | |
| filename: Name of file to split (e.g., 'nonprofits_organizations.parquet') | |
| dry_run: If True, only report what would be done | |
| Returns: | |
| Dict mapping state abbreviation to record count | |
| """ | |
| input_path = self.gold_dir / filename | |
| if not input_path.exists(): | |
| logger.warning(f"File not found: {input_path}") | |
| return {} | |
| logger.info(f"π Processing: {filename}") | |
| # Read the file | |
| df = pd.read_parquet(input_path) | |
| logger.info(f" Total records: {len(df):,}") | |
| # Get state column | |
| state_col = self.get_state_column(filename) | |
| if state_col not in df.columns: | |
| logger.error(f" β Column '{state_col}' not found in {filename}") | |
| logger.error(f" Available columns: {df.columns.tolist()}") | |
| return {} | |
| # Get unique states | |
| unique_states = sorted(df[state_col].dropna().unique()) | |
| logger.info(f" Unique states: {len(unique_states)}") | |
| # Split by state | |
| state_counts = {} | |
| base_name = filename.replace('.parquet', '') | |
| for state in unique_states: | |
| state_df = df[df[state_col] == state] | |
| count = len(state_df) | |
| state_counts[state] = count | |
| # Create output filename | |
| output_filename = f"{base_name}_{state}.parquet" | |
| output_path = self.output_dir / output_filename | |
| if dry_run: | |
| logger.info(f" [DRY RUN] Would create: {output_filename} ({count:,} records)") | |
| else: | |
| # Write state-specific file | |
| state_df.to_parquet(output_path, index=False, engine='pyarrow') | |
| size_mb = output_path.stat().st_size / 1024 / 1024 | |
| logger.success(f" β Created: {output_filename} ({count:,} records, {size_mb:.2f} MB)") | |
| return state_counts | |
| def split_all(self, dry_run: bool = False) -> None: | |
| """ | |
| Split all configured files by state. | |
| Args: | |
| dry_run: If True, only report what would be done | |
| """ | |
| logger.info("π Splitting all gold files by state...") | |
| logger.info(f" Input directory: {self.gold_dir}") | |
| logger.info(f" Output directory: {self.output_dir}") | |
| logger.info("") | |
| total_files = 0 | |
| total_states = 0 | |
| for filename in self.all_files.keys(): | |
| try: | |
| state_counts = self.split_file(filename, dry_run=dry_run) | |
| if state_counts: | |
| total_files += 1 | |
| total_states += len(state_counts) | |
| logger.info("") | |
| except Exception as e: | |
| logger.error(f"β Error processing {filename}: {e}") | |
| logger.info("") | |
| logger.success("=" * 60) | |
| logger.success(f"β Split {total_files} files into {total_states} state-specific files") | |
| logger.success(f"π Output directory: {self.output_dir}") | |
| logger.success("=" * 60) | |
| def list_split_files(self) -> List[Path]: | |
| """List all split files in the output directory.""" | |
| return sorted(self.output_dir.glob("*.parquet")) | |
| def get_split_stats(self) -> pd.DataFrame: | |
| """Get statistics about split files.""" | |
| files = self.list_split_files() | |
| stats = [] | |
| for f in files: | |
| df = pd.read_parquet(f) | |
| stats.append({ | |
| 'filename': f.name, | |
| 'records': len(df), | |
| 'size_mb': f.stat().st_size / 1024 / 1024, | |
| 'columns': len(df.columns) | |
| }) | |
| return pd.DataFrame(stats) | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Split gold parquet files by state", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| # Split all files | |
| python scripts/split_gold_by_state.py --all | |
| # Split specific file | |
| python scripts/split_gold_by_state.py --file nonprofits_organizations.parquet | |
| # Dry run (see what would happen) | |
| python scripts/split_gold_by_state.py --all --dry-run | |
| # View statistics | |
| python scripts/split_gold_by_state.py --stats | |
| """ | |
| ) | |
| parser.add_argument('--all', action='store_true', | |
| help='Split all configured files') | |
| parser.add_argument('--file', type=str, | |
| help='Split a specific file') | |
| parser.add_argument('--dry-run', action='store_true', | |
| help='Show what would be done without actually splitting') | |
| parser.add_argument('--stats', action='store_true', | |
| help='Show statistics about split files') | |
| parser.add_argument('--gold-dir', type=str, default='data/gold', | |
| help='Directory containing gold parquet files (default: data/gold)') | |
| parser.add_argument('--output-dir', type=str, | |
| help='Output directory for split files (default: data/gold/by_state)') | |
| args = parser.parse_args() | |
| # Initialize splitter | |
| splitter = GoldFileSplitter( | |
| gold_dir=args.gold_dir, | |
| output_dir=args.output_dir | |
| ) | |
| # Handle commands | |
| if args.stats: | |
| logger.info("π Split file statistics:") | |
| stats_df = splitter.get_split_stats() | |
| if len(stats_df) == 0: | |
| logger.warning("No split files found. Run with --all first.") | |
| else: | |
| print(stats_df.to_string(index=False)) | |
| print(f"\nTotal files: {len(stats_df)}") | |
| print(f"Total records: {stats_df['records'].sum():,}") | |
| print(f"Total size: {stats_df['size_mb'].sum():.2f} MB") | |
| elif args.all: | |
| splitter.split_all(dry_run=args.dry_run) | |
| elif args.file: | |
| splitter.split_file(args.file, dry_run=args.dry_run) | |
| else: | |
| parser.print_help() | |
| if __name__ == "__main__": | |
| main() | |