open-navigator / scripts /data /split_gold_by_state.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
896453f verified
#!/usr/bin/env python3
"""
Split gold parquet files by state for easier distribution and access.
This script splits large monolithic parquet files into state-specific files:
- nonprofits_organizations.parquet β†’ nonprofits_organizations_AL.parquet, nonprofits_organizations_AK.parquet, etc.
- nonprofits_locations.parquet β†’ nonprofits_locations_AL.parquet, nonprofits_locations_AK.parquet, etc.
- jurisdictions_*.parquet β†’ jurisdictions_*_AL.parquet, etc.
- domains_gsa_domains.parquet β†’ domains_gsa_domains_AL.parquet, etc.
Benefits:
- Smaller file sizes (easier downloads)
- Faster queries (load only needed states)
- Better HuggingFace upload (avoid file size limits)
Usage:
# Split all files
python scripts/split_gold_by_state.py --all
# Split specific file
python scripts/split_gold_by_state.py --file nonprofits_organizations.parquet
# Dry run (see what would happen)
python scripts/split_gold_by_state.py --all --dry-run
"""
import argparse
from pathlib import Path
import pandas as pd
from loguru import logger
from typing import Dict, List
class GoldFileSplitter:
"""Split gold parquet files by state."""
# Files that have direct 'state' column
STATE_COLUMN_FILES = {
'nonprofits_organizations.parquet': 'state',
'nonprofits_locations.parquet': 'state',
'nonprofits_financials.parquet': 'state',
'nonprofits_programs.parquet': 'state',
'nonprofits_tuscaloosa_form990.parquet': 'state',
}
# Files that have 'State' column (capitalized)
STATE_UPPER_FILES = {
'domains_gsa_domains.parquet': 'State',
}
# Files that have 'USPS' column (state abbreviation)
USPS_FILES = {
'jurisdictions_cities.parquet': 'USPS',
'jurisdictions_counties.parquet': 'USPS',
'jurisdictions_school_districts.parquet': 'USPS',
'jurisdictions_townships.parquet': 'USPS',
}
def __init__(self, gold_dir: str = "data/gold", output_dir: str = None):
"""
Initialize splitter.
DEPRECATED: Use create_partitioned_datasets.py instead for better performance.
Args:
gold_dir: Directory containing gold parquet files
output_dir: Directory to write split files (defaults to gold_dir/by_state/)
"""
self.gold_dir = Path(gold_dir)
self.output_dir = Path(output_dir) if output_dir else self.gold_dir / "by_state"
# Create output directory
self.output_dir.mkdir(parents=True, exist_ok=True)
# Combined mapping of all files to split
self.all_files = {
**self.STATE_COLUMN_FILES,
**self.STATE_UPPER_FILES,
**self.USPS_FILES,
}
def get_state_column(self, filename: str) -> str:
"""Get the state column name for a file."""
if filename in self.STATE_COLUMN_FILES:
return self.STATE_COLUMN_FILES[filename]
elif filename in self.STATE_UPPER_FILES:
return self.STATE_UPPER_FILES[filename]
elif filename in self.USPS_FILES:
return self.USPS_FILES[filename]
else:
raise ValueError(f"Unknown file: {filename}")
def split_file(self, filename: str, dry_run: bool = False) -> Dict[str, int]:
"""
Split a single parquet file by state.
Args:
filename: Name of file to split (e.g., 'nonprofits_organizations.parquet')
dry_run: If True, only report what would be done
Returns:
Dict mapping state abbreviation to record count
"""
input_path = self.gold_dir / filename
if not input_path.exists():
logger.warning(f"File not found: {input_path}")
return {}
logger.info(f"πŸ“‚ Processing: {filename}")
# Read the file
df = pd.read_parquet(input_path)
logger.info(f" Total records: {len(df):,}")
# Get state column
state_col = self.get_state_column(filename)
if state_col not in df.columns:
logger.error(f" ❌ Column '{state_col}' not found in {filename}")
logger.error(f" Available columns: {df.columns.tolist()}")
return {}
# Get unique states
unique_states = sorted(df[state_col].dropna().unique())
logger.info(f" Unique states: {len(unique_states)}")
# Split by state
state_counts = {}
base_name = filename.replace('.parquet', '')
for state in unique_states:
state_df = df[df[state_col] == state]
count = len(state_df)
state_counts[state] = count
# Create output filename
output_filename = f"{base_name}_{state}.parquet"
output_path = self.output_dir / output_filename
if dry_run:
logger.info(f" [DRY RUN] Would create: {output_filename} ({count:,} records)")
else:
# Write state-specific file
state_df.to_parquet(output_path, index=False, engine='pyarrow')
size_mb = output_path.stat().st_size / 1024 / 1024
logger.success(f" βœ… Created: {output_filename} ({count:,} records, {size_mb:.2f} MB)")
return state_counts
def split_all(self, dry_run: bool = False) -> None:
"""
Split all configured files by state.
Args:
dry_run: If True, only report what would be done
"""
logger.info("πŸš€ Splitting all gold files by state...")
logger.info(f" Input directory: {self.gold_dir}")
logger.info(f" Output directory: {self.output_dir}")
logger.info("")
total_files = 0
total_states = 0
for filename in self.all_files.keys():
try:
state_counts = self.split_file(filename, dry_run=dry_run)
if state_counts:
total_files += 1
total_states += len(state_counts)
logger.info("")
except Exception as e:
logger.error(f"❌ Error processing {filename}: {e}")
logger.info("")
logger.success("=" * 60)
logger.success(f"βœ… Split {total_files} files into {total_states} state-specific files")
logger.success(f"πŸ“‚ Output directory: {self.output_dir}")
logger.success("=" * 60)
def list_split_files(self) -> List[Path]:
"""List all split files in the output directory."""
return sorted(self.output_dir.glob("*.parquet"))
def get_split_stats(self) -> pd.DataFrame:
"""Get statistics about split files."""
files = self.list_split_files()
stats = []
for f in files:
df = pd.read_parquet(f)
stats.append({
'filename': f.name,
'records': len(df),
'size_mb': f.stat().st_size / 1024 / 1024,
'columns': len(df.columns)
})
return pd.DataFrame(stats)
def main():
parser = argparse.ArgumentParser(
description="Split gold parquet files by state",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Split all files
python scripts/split_gold_by_state.py --all
# Split specific file
python scripts/split_gold_by_state.py --file nonprofits_organizations.parquet
# Dry run (see what would happen)
python scripts/split_gold_by_state.py --all --dry-run
# View statistics
python scripts/split_gold_by_state.py --stats
"""
)
parser.add_argument('--all', action='store_true',
help='Split all configured files')
parser.add_argument('--file', type=str,
help='Split a specific file')
parser.add_argument('--dry-run', action='store_true',
help='Show what would be done without actually splitting')
parser.add_argument('--stats', action='store_true',
help='Show statistics about split files')
parser.add_argument('--gold-dir', type=str, default='data/gold',
help='Directory containing gold parquet files (default: data/gold)')
parser.add_argument('--output-dir', type=str,
help='Output directory for split files (default: data/gold/by_state)')
args = parser.parse_args()
# Initialize splitter
splitter = GoldFileSplitter(
gold_dir=args.gold_dir,
output_dir=args.output_dir
)
# Handle commands
if args.stats:
logger.info("πŸ“Š Split file statistics:")
stats_df = splitter.get_split_stats()
if len(stats_df) == 0:
logger.warning("No split files found. Run with --all first.")
else:
print(stats_df.to_string(index=False))
print(f"\nTotal files: {len(stats_df)}")
print(f"Total records: {stats_df['records'].sum():,}")
print(f"Total size: {stats_df['size_mb'].sum():.2f} MB")
elif args.all:
splitter.split_all(dry_run=args.dry_run)
elif args.file:
splitter.split_file(args.file, dry_run=args.dry_run)
else:
parser.print_help()
if __name__ == "__main__":
main()