Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Create partitioned parquet datasets for efficient state-based queries. | |
| This converts consolidated files into partitioned datasets where each partition | |
| represents a state. This allows: | |
| - Querying the full national dataset | |
| - Automatically filtering to only read needed states (partition pruning) | |
| - Efficient analytics with tools like Spark, DuckDB, Pandas | |
| Example: | |
| # Read only Alabama data (only reads AL partition) | |
| df = pd.read_parquet('data/gold/nonprofits_organizations', | |
| filters=[('state', '=', 'AL')]) | |
| # Read multiple states | |
| df = pd.read_parquet('data/gold/nonprofits_organizations', | |
| filters=[('state', 'in', ['AL', 'GA', 'FL'])]) | |
| # Read everything (reads all partitions) | |
| df = pd.read_parquet('data/gold/nonprofits_organizations') | |
| Usage: | |
| # Create all partitioned datasets | |
| python scripts/create_partitioned_datasets.py --all | |
| # Create specific dataset | |
| python scripts/create_partitioned_datasets.py --file nonprofits_organizations.parquet | |
| # Dry run | |
| python scripts/create_partitioned_datasets.py --all --dry-run | |
| """ | |
| import argparse | |
| from pathlib import Path | |
| import pandas as pd | |
| from loguru import logger | |
| from typing import List, Dict | |
class PartitionedDatasetCreator:
    """Create hive-partitioned parquet datasets keyed by a ``state`` column.

    Each consolidated ``<name>.parquet`` file found in ``gold_dir`` is
    rewritten as a directory ``<output_dir>/<name>/state=XX/...``. The
    class-level mappings below declare, per input file, which column carries
    the state value, or which column to join on when the state has to be
    looked up from the organizations data.
    """

    # Files that have direct 'state' column
    STATE_COLUMN_FILES = {
        'nonprofits_organizations.parquet': 'state',
        'nonprofits_locations.parquet': 'state',
    }

    # Files that need state added via join with organizations (by EIN)
    EIN_JOIN_FILES = {
        'nonprofits_financials.parquet': 'ein',
        'nonprofits_programs.parquet': 'ein',
    }

    # Files that have 'State' column (capitalized)
    STATE_UPPER_FILES = {
        'domains_gsa_domains.parquet': 'State',
    }

    # Files that have 'USPS' column (state abbreviation)
    USPS_FILES = {
        'jurisdictions_cities.parquet': 'USPS',
        'jurisdictions_counties.parquet': 'USPS',
        'jurisdictions_school_districts.parquet': 'USPS',
        'jurisdictions_townships.parquet': 'USPS',
    }

    def __init__(self, gold_dir: str = "data/gold", output_dir: str = None):
        """
        Initialize creator.

        Args:
            gold_dir: Directory containing the consolidated gold parquet files.
            output_dir: Directory for partitioned datasets (defaults to gold_dir).
        """
        self.gold_dir = Path(gold_dir)
        self.output_dir = Path(output_dir) if output_dir else self.gold_dir

        # Combined mapping. Insertion order matters for create_all(): the
        # EIN-join files come last, after organizations has been processed.
        self.all_files = {
            **self.STATE_COLUMN_FILES,
            **self.STATE_UPPER_FILES,
            **self.USPS_FILES,
            **self.EIN_JOIN_FILES,
        }

        # Cache for organizations EIN→state mapping (loaded once if needed)
        self._ein_state_map = None

    def get_state_column(self, filename: str) -> str:
        """Return the name of the column that holds the state for ``filename``.

        For EIN-join files the answer is always ``'state'``, because that
        column is added by the join rather than read from the file.

        Raises:
            ValueError: If ``filename`` is not one of the configured files.
        """
        direct_mappings = (
            self.STATE_COLUMN_FILES,
            self.STATE_UPPER_FILES,
            self.USPS_FILES,
        )
        for mapping in direct_mappings:
            if filename in mapping:
                return mapping[filename]
        if filename in self.EIN_JOIN_FILES:
            return 'state'  # Will be added via join
        raise ValueError(f"Unknown file: {filename}")

    def _load_ein_state_mapping(self):
        """Load the EIN→state mapping from organizations (cached).

        The partitioned ``nonprofits_organizations`` dataset in ``output_dir``
        is preferred; the consolidated file in ``gold_dir`` is the fallback.

        Returns:
            DataFrame with the columns ``ein`` and ``state``.

        Raises:
            FileNotFoundError: If neither source exists.
        """
        if self._ein_state_map is not None:
            return self._ein_state_map

        import pyarrow.dataset as ds

        logger.info(" Loading EIN→state mapping from organizations...")
        org_path = self.output_dir / 'nonprofits_organizations'

        # Try partitioned dataset first
        if org_path.exists():
            dataset = ds.dataset(org_path, format='parquet', partitioning='hive')
            table = dataset.to_table(columns=['ein', 'state'])
            self._ein_state_map = table.to_pandas()
        else:
            # Fall back to consolidated file
            org_file = self.gold_dir / 'nonprofits_organizations.parquet'
            if not org_file.exists():
                raise FileNotFoundError(
                    "Cannot find nonprofits_organizations dataset or file. "
                    "Create it first before processing EIN-based files."
                )
            self._ein_state_map = pd.read_parquet(org_file, columns=['ein', 'state'])

        logger.info(f" Loaded {len(self._ein_state_map):,} EIN→state mappings")
        return self._ein_state_map

    @staticmethod
    def _attach_state(df, ein_state_map, ein_col):
        """Left-join a ``state`` column onto ``df`` without changing its row count.

        Args:
            df: Frame to enrich; must contain ``ein_col``.
            ein_state_map: Frame with ``ein`` and ``state`` columns.
            ein_col: Name of the EIN column in ``df``.

        Returns:
            A new frame with one row per row of ``df``. Rows whose EIN has no
            match get a null state. If ``df`` already had a ``state`` column,
            its non-null values are kept and only the gaps are filled.
        """
        # One row per EIN: a duplicated key on the right side of a left merge
        # would multiply the matching rows of df. The first state seen wins.
        lookup = (
            ein_state_map[['ein', 'state']]
            .dropna(subset=['ein', 'state'])
            .drop_duplicates(subset='ein', keep='first')
            .rename(columns={'ein': ein_col, 'state': '_org_state'})
        )
        merged = df.merge(lookup, on=ein_col, how='left')

        if 'state' in df.columns:
            # Merging a second 'state' column directly would produce
            # state_x/state_y and leave no 'state' column to partition on.
            merged['state'] = merged['state'].fillna(merged['_org_state'])
            return merged.drop(columns='_org_state')
        return merged.rename(columns={'_org_state': 'state'})

    def create_partitioned_dataset(self, filename: str, dry_run: bool = False) -> bool:
        """
        Create a partitioned dataset from a consolidated file.

        Args:
            filename: Name of file to partition (e.g., 'nonprofits_organizations.parquet')
            dry_run: If True, only report what would be done

        Returns:
            True if successful, False if the input file or a required column
            is missing.

        Raises:
            ValueError: If ``filename`` is not one of the configured files.
            FileNotFoundError: If an EIN-join file is processed and no
                organizations data can be found.
        """
        input_path = self.gold_dir / filename
        if not input_path.exists():
            logger.warning(f"File not found: {input_path}")
            return False

        # Resolve the state column before the (potentially large) read so an
        # unconfigured filename fails immediately.
        state_col = self.get_state_column(filename)

        logger.info(f"π Processing: {filename}")

        # Read the file
        df = pd.read_parquet(input_path)
        logger.info(f" Total records: {len(df):,}")

        # Check if we need to add state via join
        if filename in self.EIN_JOIN_FILES:
            ein_col = self.EIN_JOIN_FILES[filename]
            if ein_col not in df.columns:
                logger.error(f" β Column '{ein_col}' not found in {filename}")
                return False

            # Load EIN→state mapping
            ein_state_map = self._load_ein_state_mapping()

            # Join to add state
            logger.info(" Joining with organizations to add state column...")
            df = self._attach_state(df, ein_state_map, ein_col)
            logger.info(f" Records with state: {df['state'].notna().sum():,} / {len(df):,}")
        elif state_col not in df.columns:
            logger.error(f" β Column '{state_col}' not found in {filename}")
            return False

        # Normalize column name to 'state' for consistent partitioning.
        # The original column is kept too for backward compatibility.
        if state_col != 'state':
            df['state'] = df[state_col]

        # Output directory name: strip only a trailing '.parquet' extension.
        suffix = '.parquet'
        dataset_name = filename[:-len(suffix)] if filename.endswith(suffix) else filename
        output_path = self.output_dir / dataset_name

        if dry_run:
            unique_states = df['state'].nunique()
            total_size = input_path.stat().st_size / 1024 / 1024
            logger.info(" [DRY RUN] Would create partitioned dataset:")
            logger.info(f" Path: {output_path}")
            logger.info(f" Partitions: {unique_states} states")
            logger.info(f" Total size: {total_size:.2f} MB")
            return True

        # Write partitioned dataset
        logger.info(f" Writing partitioned dataset to: {output_path}")
        df.to_parquet(
            output_path,
            engine='pyarrow',
            partition_cols=['state'],
            index=False,
            # Each write produces uniquely named files and pyarrow's default
            # behaviour keeps whatever is already there, so re-running would
            # store every row twice. 'delete_matching' clears each partition
            # directory that is about to be written. Partitions for states
            # absent from the new data are left as they are. Requires a
            # pyarrow version whose write_to_dataset accepts this argument.
            existing_data_behavior='delete_matching',
        )

        # Get statistics
        partitions = list(output_path.glob('state=*'))
        total_bytes = sum(
            parquet_file.stat().st_size
            for partition in partitions
            for parquet_file in partition.rglob('*.parquet')
        )
        total_size = total_bytes / 1024 / 1024

        logger.success("β Created partitioned dataset:")
        logger.success(f" Path: {output_path}")
        logger.success(f" Partitions: {len(partitions)} states")
        logger.success(f" Total size: {total_size:.2f} MB")
        logger.success(f" Query example: pd.read_parquet('{output_path}', filters=[('state', '=', 'AL')])")
        return True

    def create_all(self, dry_run: bool = False) -> Dict[str, bool]:
        """
        Create partitioned datasets for all configured files.

        A failure in one file (including an exception) is logged and recorded
        as False; processing continues with the next file.

        Args:
            dry_run: If True, only report what would be done

        Returns:
            Dict mapping filename to success status
        """
        logger.info("π Creating partitioned datasets...")
        logger.info(f" Input directory: {self.gold_dir}")
        logger.info(f" Output directory: {self.output_dir}")
        logger.info("")

        # Create output directory
        if not dry_run:
            self.output_dir.mkdir(parents=True, exist_ok=True)

        results = {}
        for filename in self.all_files:
            try:
                results[filename] = self.create_partitioned_dataset(filename, dry_run=dry_run)
            except Exception as e:
                # Best effort: one broken input must not stop the other files.
                logger.error(f"β Error processing {filename}: {e}")
                results[filename] = False
            logger.info("")

        # Summary
        successful = sum(1 for ok in results.values() if ok)
        logger.success("=" * 70)
        logger.success(f"β Created {successful}/{len(results)} partitioned datasets")
        logger.success(f"π Output directory: {self.output_dir}")
        logger.success("=" * 70)
        return results
def main():
    """Command-line entry point: parse arguments and run the requested action.

    ``--all`` takes precedence over ``--file`` when both are given. With
    neither, the help text is printed.

    Returns:
        0 if every requested dataset was processed successfully (or only the
        help text was shown), 1 if any dataset failed.
    """
    parser = argparse.ArgumentParser(
        description="Create partitioned parquet datasets by state",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The query examples use data/gold/<dataset>, which is where datasets
        # land with the default --gold-dir and no --output-dir.
        epilog="""
Examples:
  # Create all partitioned datasets
  python scripts/create_partitioned_datasets.py --all

  # Create specific dataset
  python scripts/create_partitioned_datasets.py --file nonprofits_organizations.parquet

  # Dry run
  python scripts/create_partitioned_datasets.py --all --dry-run

Query Examples:
  # Read only Alabama data (efficient - only reads AL partition)
  import pandas as pd
  df = pd.read_parquet('data/gold/nonprofits_organizations',
                       filters=[('state', '=', 'AL')])

  # Read multiple states
  df = pd.read_parquet('data/gold/nonprofits_organizations',
                       filters=[('state', 'in', ['AL', 'GA', 'FL'])])

  # Read all states
  df = pd.read_parquet('data/gold/nonprofits_organizations')
"""
    )
    parser.add_argument('--all', action='store_true',
                        help='Create all partitioned datasets')
    parser.add_argument('--file', type=str,
                        help='Create partitioned dataset for specific file')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without creating datasets')
    parser.add_argument('--gold-dir', type=str, default='data/gold',
                        help='Directory containing gold parquet files (default: data/gold)')
    parser.add_argument('--output-dir', type=str,
                        help='Output directory for partitioned datasets (default: same as gold-dir)')
    args = parser.parse_args()

    # Initialize creator
    creator = PartitionedDatasetCreator(
        gold_dir=args.gold_dir,
        output_dir=args.output_dir
    )

    # Handle commands
    if args.all:
        results = creator.create_all(dry_run=args.dry_run)
        return 0 if all(results.values()) else 1
    if args.file:
        succeeded = creator.create_partitioned_dataset(args.file, dry_run=args.dry_run)
        return 0 if succeeded else 1

    parser.print_help()
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status. A return value
    # of None is treated as success (exit status 0).
    raise SystemExit(main())