"""
Rebuild Gold Tables - Consolidated Structure

This script backs up the current gold directory and writes a minimal,
consolidated copy of it to a sibling ``gold_consolidated`` directory for
HuggingFace deployment. The existing gold directory is only read; swapping
the consolidated directory into place is a manual step printed at the end.

BEFORE (86 files):
- data/gold/states/AL/bills_bills.parquet
- data/gold/states/GA/bills_bills.parquet
- data/gold/states/MA/bills_bills.parquet
- ...repeated for 5 states across 15 table types

AFTER (~15-20 files), written to data/gold_consolidated/:
- bills_bills.parquet (all states combined, with a `state` column)
- nonprofits_organizations.parquet (all states combined)
- national/ and reference/ tables copied as-is
- ...etc (file names keep the per-state table names)

Benefits:
- Reduces file count substantially (roughly 75% for the layout above)
- Easier to query (no need to union across states)
- Better for HuggingFace size limits (can partition large files if needed)
- Simpler API code (read once vs read from 5 state directories)

Usage:
    python scripts/data/rebuild_consolidated_gold.py
    python scripts/data/rebuild_consolidated_gold.py --dry-run
    python scripts/data/rebuild_consolidated_gold.py --gold-dir path/to/gold
"""

import argparse
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List

import pandas as pd
from loguru import logger

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))


class GoldRebuilder:
    """Rebuild gold tables in consolidated format.

    Reads per-state tables from ``<gold_dir>/states/<STATE>/<table>.parquet``,
    concatenates each table across states, and writes the results (plus the
    already-consolidated ``national`` and ``reference`` tables) into a flat
    ``gold_consolidated`` directory next to ``gold_dir``.
    """

    # Consolidated files above this size (MB) trigger a partitioning warning
    # (>500MB recommended limit for HuggingFace).
    LARGE_FILE_WARNING_MB = 500

    # Sub-directories of the gold root whose tables are already consolidated
    # and are copied verbatim.
    PASSTHROUGH_CATEGORIES = ("national", "reference")

    def __init__(self, gold_dir: str = "data/gold", dry_run: bool = False):
        """
        Args:
            gold_dir: Root of the current gold layout.
            dry_run: When True, log what would happen without writing anything.
        """
        self.gold_dir = Path(gold_dir)
        self.dry_run = dry_run

        # Timestamped sibling directory so separate runs get separate backups.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.backup_dir = self.gold_dir.parent / f"gold_backup_{timestamp}"

        logger.info(f"Gold directory: {self.gold_dir}")
        logger.info(f"Backup directory: {self.backup_dir}")
        logger.info(f"Dry run: {dry_run}")

    def backup_current_gold(self):
        """Backup current gold directory before rebuilding.

        Copies the whole gold tree to ``self.backup_dir`` (skipped in dry-run
        mode). Logs a warning instead of failing when there is nothing to copy.
        """
        if not self.gold_dir.exists():
            logger.warning(f"{self.gold_dir} does not exist - nothing to back up")
            return

        logger.info(f"Backing up {self.gold_dir} to {self.backup_dir}...")
        if self.dry_run:
            logger.info(f"[DRY RUN] Would backup to {self.backup_dir}")
            return

        shutil.copytree(self.gold_dir, self.backup_dir)
        logger.success(f"✅ Backup created at {self.backup_dir}")

    def consolidate_table_type(self, table_name: str, states: List[str]) -> pd.DataFrame:
        """
        Consolidate a single table type across multiple states

        Args:
            table_name: Name like "bills_bills", "nonprofits_organizations", etc
            states: List of state codes like ["AL", "GA", "MA", "WA", "WI"]

        Returns:
            Combined DataFrame with `state` column added (an existing `state`
            column is left untouched). Empty DataFrame if no state has the table.
        """
        logger.info(f"Consolidating {table_name} across {len(states)} states...")

        dfs = []
        for state in states:
            state_file = self.gold_dir / "states" / state / f"{table_name}.parquet"
            if not state_file.exists():
                logger.warning(f" ✗ {state}: File not found at {state_file}")
                continue

            df = pd.read_parquet(state_file)
            # Add state column if it doesn't exist
            if 'state' not in df.columns:
                df['state'] = state
            dfs.append(df)
            logger.debug(f" ✓ {state}: {len(df):,} rows")

        if not dfs:
            logger.warning(f"No data found for {table_name}")
            return pd.DataFrame()

        combined = pd.concat(dfs, ignore_index=True)
        logger.success(f"✅ Combined {len(combined):,} total rows")
        return combined

    @staticmethod
    def _discover_table_types(states_dir: Path, states: List[str]) -> List[str]:
        """Return the sorted union of table names present in ANY state directory.

        Using the union (rather than only the first state's files) ensures a
        table that some states lack is still consolidated from the others.
        """
        table_names = set()
        for state in states:
            table_names.update(f.stem for f in (states_dir / state).glob("*.parquet"))
        return sorted(table_names)

    def rebuild_consolidated_structure(self):
        """Rebuild gold directory with consolidated files.

        Output goes to ``<gold_dir parent>/gold_consolidated``; the existing
        gold directory is only read. Ends by logging a summary and the manual
        steps for swapping the directories.
        """
        logger.info("=" * 70)
        logger.info("REBUILDING GOLD TABLES - CONSOLIDATED STRUCTURE")
        logger.info("=" * 70)

        # Detect which states we have
        states_dir = self.gold_dir / "states"
        if not states_dir.exists():
            logger.error(f"No states directory found at {states_dir}")
            return

        states = sorted(d.name for d in states_dir.iterdir() if d.is_dir())
        logger.info(f"Found states: {', '.join(states)}")
        if not states:
            logger.error("No states found")
            return

        table_files = self._discover_table_types(states_dir, states)
        logger.info(f"Found table types: {', '.join(table_files)}")

        # Create new gold directory
        new_gold_dir = self.gold_dir.parent / "gold_consolidated"
        if not self.dry_run:
            if new_gold_dir.exists() and any(new_gold_dir.glob("*.parquet")):
                # Leftovers from an earlier run would be mixed into the output
                # and inflate the "After" file count.
                logger.warning(f"⚠️ {new_gold_dir} already contains parquet files - stale files may remain")
            new_gold_dir.mkdir(exist_ok=True)
            logger.info(f"Created {new_gold_dir}")

        # Consolidate each table type; maps table name -> combined row count.
        consolidated_files: Dict[str, int] = {}
        for table_name in table_files:
            logger.info("")
            logger.info(f"📊 Processing: {table_name}")
            logger.info("-" * 70)

            combined_df = self.consolidate_table_type(table_name, states)
            if combined_df.empty:
                continue

            output_file = new_gold_dir / f"{table_name}.parquet"
            if self.dry_run:
                logger.info(f"[DRY RUN] Would save {output_file.name}: {len(combined_df):,} rows")
            else:
                combined_df.to_parquet(output_file, index=False)
                size_mb = output_file.stat().st_size / (1024 * 1024)
                logger.success(f"✅ Saved {output_file.name}: {len(combined_df):,} rows, {size_mb:.2f} MB")

                # Check if file is too large for HuggingFace (>500MB recommended limit)
                if size_mb > self.LARGE_FILE_WARNING_MB:
                    logger.warning(f"⚠️ {table_name} is large ({size_mb:.2f} MB) - consider partitioning")

            consolidated_files[table_name] = len(combined_df)

        self._copy_passthrough_tables(new_gold_dir, set(consolidated_files))
        self._log_summary(new_gold_dir, consolidated_files)

    def _copy_passthrough_tables(self, new_gold_dir: Path, taken_names: set) -> None:
        """Copy already-consolidated national/reference tables into the output dir.

        Args:
            new_gold_dir: Flat output directory.
            taken_names: Table stems already written this run; a passthrough
                file with the same stem would overwrite it, so that is warned.
        """
        # Copy national and reference tables (these are already consolidated)
        logger.info("")
        logger.info("📂 Copying national and reference tables...")
        logger.info("-" * 70)

        for category in self.PASSTHROUGH_CATEGORIES:
            category_dir = self.gold_dir / category
            if not category_dir.exists():
                continue

            for parquet_file in category_dir.glob("*.parquet"):
                if parquet_file.stem in taken_names:
                    # Output is flat, so same-named files from different
                    # sources collide; the later copy wins.
                    logger.warning(f"⚠️ {category}/{parquet_file.name} overwrites an existing consolidated file of the same name")
                taken_names.add(parquet_file.stem)

                if self.dry_run:
                    logger.info(f" [DRY RUN] Would copy {parquet_file.name}")
                    continue

                shutil.copy2(parquet_file, new_gold_dir / parquet_file.name)
                size_mb = parquet_file.stat().st_size / (1024 * 1024)
                logger.info(f" ✓ Copied {parquet_file.name} ({size_mb:.2f} MB)")

    def _log_summary(self, new_gold_dir: Path, consolidated_files: Dict[str, int]) -> None:
        """Log before/after file counts and the manual next steps."""
        logger.info("")
        logger.info("=" * 70)
        logger.info("CONSOLIDATION SUMMARY")
        logger.info("=" * 70)

        if self.dry_run:
            logger.info("[DRY RUN] No files were modified")
            logger.info(f"Would create {len(consolidated_files)} consolidated state files")
            logger.info("Plus national and reference files")
            return

        old_file_count = len(list(self.gold_dir.rglob("*.parquet")))
        new_file_count = len(list(new_gold_dir.glob("*.parquet")))

        logger.info(f"Before: {old_file_count} files")
        logger.info(f"After: {new_file_count} files")
        if old_file_count:
            reduction_pct = (1 - new_file_count / old_file_count) * 100
            logger.info(f"Reduction: {old_file_count - new_file_count} files ({reduction_pct:.1f}%)")
        else:
            # Avoid ZeroDivisionError when the source tree had no parquet files.
            logger.info("Reduction: n/a (no source files)")

        logger.info("")
        logger.info("Next steps:")
        logger.info("1. Verify the consolidated files:")
        logger.info(f"   ls -lh {new_gold_dir}/*.parquet")
        logger.info("")
        logger.info("2. Replace old gold directory:")
        logger.info(f"   mv {self.gold_dir} {self.gold_dir.parent}/gold_old")
        logger.info(f"   mv {new_gold_dir} {self.gold_dir}")
        logger.info("")
        logger.info("3. Update API code to remove state-specific paths:")
        logger.info("   api/main.py, api/routes/search.py, etc.")


def main():
    """Parse CLI arguments, back up the gold directory, then rebuild it."""
    parser = argparse.ArgumentParser(description="Rebuild gold tables in consolidated format")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be done without making changes")
    parser.add_argument("--gold-dir", default="data/gold",
                        help="Gold directory to consolidate (default: data/gold)")
    args = parser.parse_args()

    rebuilder = GoldRebuilder(gold_dir=args.gold_dir, dry_run=args.dry_run)

    # Backup first
    rebuilder.backup_current_gold()

    # Rebuild
    rebuilder.rebuild_consolidated_structure()


if __name__ == "__main__":
    main()