open-navigator / scripts /data /create_partitioned_datasets.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
896453f verified
#!/usr/bin/env python3
"""
Create partitioned parquet datasets for efficient state-based queries.
This converts consolidated files into partitioned datasets where each partition
represents a state. This allows:
- Querying the full national dataset
- Automatically filtering to only read needed states (partition pruning)
- Efficient analytics with tools like Spark, DuckDB, Pandas
Example:
# Read only Alabama data (only reads AL partition)
df = pd.read_parquet('data/gold/nonprofits_organizations',
filters=[('state', '=', 'AL')])
# Read multiple states
df = pd.read_parquet('data/gold/nonprofits_organizations',
filters=[('state', 'in', ['AL', 'GA', 'FL'])])
# Read everything (reads all partitions)
df = pd.read_parquet('data/gold/nonprofits_organizations')
Usage:
    # Create all partitioned datasets
    python scripts/data/create_partitioned_datasets.py --all
    # Create specific dataset
    python scripts/data/create_partitioned_datasets.py --file nonprofits_organizations.parquet
    # Dry run (report what would be written without creating anything)
    python scripts/data/create_partitioned_datasets.py --all --dry-run
"""
import argparse
from pathlib import Path
import pandas as pd
from loguru import logger
from typing import List, Dict
class PartitionedDatasetCreator:
    """Create state-partitioned parquet datasets from consolidated gold files.

    Each configured file is read from ``gold_dir``, given a ``state`` column
    (copied from its own state column, or joined in from the organizations
    data by EIN), and written under ``output_dir`` as a directory dataset
    partitioned on ``state`` (one ``state=XX`` sub-directory per value).
    """

    # Files that have direct 'state' column
    STATE_COLUMN_FILES = {
        'nonprofits_organizations.parquet': 'state',
        'nonprofits_locations.parquet': 'state',
    }

    # Files that need state added via join with organizations (by EIN)
    EIN_JOIN_FILES = {
        'nonprofits_financials.parquet': 'ein',
        'nonprofits_programs.parquet': 'ein',
    }

    # Files that have 'State' column (capitalized)
    STATE_UPPER_FILES = {
        'domains_gsa_domains.parquet': 'State',
    }

    # Files that have 'USPS' column (state abbreviation)
    USPS_FILES = {
        'jurisdictions_cities.parquet': 'USPS',
        'jurisdictions_counties.parquet': 'USPS',
        'jurisdictions_school_districts.parquet': 'USPS',
        'jurisdictions_townships.parquet': 'USPS',
    }

    def __init__(self, gold_dir: str = "data/gold", output_dir: str = None):
        """
        Initialize creator.

        Args:
            gold_dir: Directory containing the consolidated gold parquet files.
            output_dir: Directory for partitioned datasets (defaults to gold_dir).
        """
        self.gold_dir = Path(gold_dir)
        self.output_dir = Path(output_dir) if output_dir else self.gold_dir

        # Combined mapping. Order matters for create_all(): organizations is
        # listed first and the EIN-joined files last, so the organizations
        # data is available before it is needed for the join.
        self.all_files = {
            **self.STATE_COLUMN_FILES,
            **self.STATE_UPPER_FILES,
            **self.USPS_FILES,
            **self.EIN_JOIN_FILES,
        }

        # Cache for organizations EIN→state mapping (loaded once if needed)
        self._ein_state_map = None

    def get_state_column(self, filename: str) -> str:
        """Return the name of the column that holds the state for ``filename``.

        Raises:
            ValueError: If ``filename`` is not one of the configured files.
        """
        for mapping in (self.STATE_COLUMN_FILES, self.STATE_UPPER_FILES, self.USPS_FILES):
            if filename in mapping:
                return mapping[filename]
        if filename in self.EIN_JOIN_FILES:
            return 'state'  # Will be added via join
        raise ValueError(f"Unknown file: {filename}")

    @staticmethod
    def _dataset_name(filename: str) -> str:
        """Return ``filename`` without its trailing ``.parquet`` extension."""
        suffix = '.parquet'
        if filename.endswith(suffix):
            return filename[:-len(suffix)]
        return filename

    @staticmethod
    def _join_state(df, ein_state_map, ein_col: str):
        """Left-join a ``state`` column onto ``df`` using the EIN→state mapping.

        The mapping is reduced to one row per EIN first. Without that, a
        duplicated EIN in the organizations data would multiply the matching
        rows of ``df``, and null EINs on both sides would match each other.

        Args:
            df: Frame containing the ``ein_col`` column.
            ein_state_map: Frame with ``ein`` and ``state`` columns.
            ein_col: Name of the EIN column in ``df``.

        Returns:
            A new frame with the same rows as ``df`` plus a ``state`` column
            (null where the EIN has no known state).
        """
        # NOTE(review): assumes the EIN dtype is the same in both frames;
        # pandas refuses to merge e.g. string keys against integer keys.
        lookup = (
            ein_state_map[['ein', 'state']]
            .dropna(subset=['ein', 'state'])
            .drop_duplicates(subset='ein', keep='first')
            .rename(columns={'ein': ein_col})
        )
        return df.merge(lookup, on=ein_col, how='left', validate='many_to_one')

    def _load_ein_state_mapping(self):
        """Load EIN→state mapping from organizations (cached).

        The partitioned organizations dataset in ``output_dir`` is preferred;
        the consolidated file in ``gold_dir`` is the fallback.

        Raises:
            FileNotFoundError: If neither source exists.
        """
        if self._ein_state_map is not None:
            return self._ein_state_map

        import pyarrow.dataset as ds

        logger.info(" Loading EIN→state mapping from organizations...")
        org_path = self.output_dir / 'nonprofits_organizations'
        org_file = self.gold_dir / 'nonprofits_organizations.parquet'

        if org_path.exists():
            # Try partitioned dataset first
            dataset = ds.dataset(org_path, format='parquet', partitioning='hive')
            self._ein_state_map = dataset.to_table(columns=['ein', 'state']).to_pandas()
        elif org_file.exists():
            # Fall back to consolidated file
            self._ein_state_map = pd.read_parquet(org_file, columns=['ein', 'state'])
        else:
            raise FileNotFoundError(
                "Cannot find nonprofits_organizations dataset or file. "
                "Create it first before processing EIN-based files."
            )

        logger.info(f" Loaded {len(self._ein_state_map):,} EIN→state mappings")
        return self._ein_state_map

    def create_partitioned_dataset(self, filename: str, dry_run: bool = False) -> bool:
        """
        Create a partitioned dataset from a consolidated file.

        Any dataset directory left by a previous run is removed before the
        new one is written, so re-running does not append duplicate rows.

        Args:
            filename: Name of file to partition (e.g., 'nonprofits_organizations.parquet')
            dry_run: If True, only report what would be done

        Returns:
            True if successful, False otherwise

        Raises:
            ValueError: If ``filename`` exists but is not a configured file.
            FileNotFoundError: If an EIN join is needed and no organizations
                data can be found.
        """
        import shutil

        input_path = self.gold_dir / filename
        if not input_path.exists():
            logger.warning(f"File not found: {input_path}")
            return False

        logger.info(f"📂 Processing: {filename}")

        # Resolve the state column before reading, so an unconfigured file is
        # rejected without loading it into memory first.
        state_col = self.get_state_column(filename)

        df = pd.read_parquet(input_path)
        logger.info(f" Total records: {len(df):,}")

        if filename in self.EIN_JOIN_FILES:
            ein_col = self.EIN_JOIN_FILES[filename]
            if ein_col not in df.columns:
                logger.error(f" ❌ Column '{ein_col}' not found in {filename}")
                return False
            if 'state' in df.columns:
                # Merging would produce state_x/state_y and leave no 'state'
                # column to partition on, so use the column that is there.
                logger.info(" Column 'state' already present; skipping join with organizations")
            else:
                ein_state_map = self._load_ein_state_mapping()
                logger.info(" Joining with organizations to add state column...")
                df = self._join_state(df, ein_state_map, ein_col)
            logger.info(f" Records with state: {df['state'].notna().sum():,} / {len(df):,}")
        elif state_col not in df.columns:
            logger.error(f" ❌ Column '{state_col}' not found in {filename}")
            return False
        elif state_col != 'state':
            # Normalize column name to 'state' for consistent partitioning.
            # The original column is kept too for backward compatibility.
            df['state'] = df[state_col]

        # Surface rows without a state. NOTE(review): how null partition keys
        # are written depends on the installed pyarrow version - confirm.
        missing_state = int(df['state'].isna().sum())
        if missing_state:
            logger.warning(f" {missing_state:,} records have no state value")

        output_path = self.output_dir / self._dataset_name(filename)

        if dry_run:
            unique_states = df['state'].nunique()
            total_size = input_path.stat().st_size / 1024 / 1024
            logger.info(" [DRY RUN] Would create partitioned dataset:")
            logger.info(f" Path: {output_path}")
            logger.info(f" Partitions: {unique_states} states")
            logger.info(f" Total size: {total_size:.2f} MB")
            return True

        if output_path == input_path:
            # Only possible for a name without the .parquet suffix; never
            # delete or overwrite the input itself.
            logger.error(f" ❌ Output path would overwrite input: {input_path}")
            return False

        if output_path.is_dir():
            # The parquet writer adds uniquely named files to an existing
            # directory, so a stale dataset must be removed first; otherwise
            # every re-run would duplicate the data.
            logger.info(f" Removing existing dataset at: {output_path}")
            shutil.rmtree(output_path)

        logger.info(f" Writing partitioned dataset to: {output_path}")
        df.to_parquet(
            output_path,
            engine='pyarrow',
            partition_cols=['state'],
            index=False,
        )

        # Get statistics from what was actually written to disk
        partitions = list(output_path.glob('state=*'))
        total_bytes = sum(
            f.stat().st_size
            for partition in partitions
            for f in partition.rglob('*.parquet')
        )
        total_size = total_bytes / 1024 / 1024

        logger.success("✅ Created partitioned dataset:")
        logger.success(f" Path: {output_path}")
        logger.success(f" Partitions: {len(partitions)} states")
        logger.success(f" Total size: {total_size:.2f} MB")
        logger.success(f" Query example: pd.read_parquet('{output_path}', filters=[('state', '=', 'AL')])")
        return True

    def create_all(self, dry_run: bool = False) -> Dict[str, bool]:
        """
        Create partitioned datasets for all configured files.

        A failure in one file is logged and recorded as False; the remaining
        files are still processed.

        Args:
            dry_run: If True, only report what would be done

        Returns:
            Dict mapping filename to success status
        """
        logger.info("🚀 Creating partitioned datasets...")
        logger.info(f" Input directory: {self.gold_dir}")
        logger.info(f" Output directory: {self.output_dir}")
        logger.info("")

        # Create output directory
        if not dry_run:
            self.output_dir.mkdir(parents=True, exist_ok=True)

        results = {}
        for filename in self.all_files:
            try:
                results[filename] = self.create_partitioned_dataset(filename, dry_run=dry_run)
            except Exception as e:
                # Batch boundary: keep going so one bad file does not abort the run
                logger.error(f"❌ Error processing {filename}: {e}")
                results[filename] = False
            logger.info("")

        # Summary - do not report at success level when something failed
        successful = sum(1 for ok in results.values() if ok)
        report = logger.success if successful == len(results) else logger.warning
        report("=" * 70)
        report(f"✅ Created {successful}/{len(results)} partitioned datasets")
        report(f"📂 Output directory: {self.output_dir}")
        report("=" * 70)
        return results
def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 when every requested dataset was created (or
        only the help text was printed), 1 when at least one failed.
    """
    parser = argparse.ArgumentParser(
        description="Create partitioned parquet datasets by state",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Create all partitioned datasets
  python scripts/data/create_partitioned_datasets.py --all

  # Create specific dataset
  python scripts/data/create_partitioned_datasets.py --file nonprofits_organizations.parquet

  # Dry run
  python scripts/data/create_partitioned_datasets.py --all --dry-run

Query Examples (paths assume the default --gold-dir / --output-dir):
  # Read only Alabama data (efficient - only reads AL partition)
  import pandas as pd
  df = pd.read_parquet('data/gold/nonprofits_organizations',
                       filters=[('state', '=', 'AL')])

  # Read multiple states
  df = pd.read_parquet('data/gold/nonprofits_organizations',
                       filters=[('state', 'in', ['AL', 'GA', 'FL'])])

  # Read all states
  df = pd.read_parquet('data/gold/nonprofits_organizations')
"""
    )
    parser.add_argument('--all', action='store_true',
                        help='Create all partitioned datasets')
    parser.add_argument('--file', type=str,
                        help='Create partitioned dataset for specific file')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without creating datasets')
    parser.add_argument('--gold-dir', type=str, default='data/gold',
                        help='Directory containing gold parquet files (default: data/gold)')
    parser.add_argument('--output-dir', type=str,
                        help='Output directory for partitioned datasets (default: same as gold-dir)')
    args = parser.parse_args(argv)

    # Nothing requested: show usage and stop before building the creator
    if not (args.all or args.file):
        parser.print_help()
        return 0

    creator = PartitionedDatasetCreator(
        gold_dir=args.gold_dir,
        output_dir=args.output_dir
    )

    # --all takes precedence when both --all and --file are given
    if args.all:
        results = creator.create_all(dry_run=args.dry_run)
        succeeded = all(results.values())
    else:
        succeeded = creator.create_partitioned_dataset(args.file, dry_run=args.dry_run)

    return 0 if succeeded else 1


if __name__ == "__main__":
    # Propagate failures to the shell instead of always exiting 0
    raise SystemExit(main())