Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
"""
Rebuild Gold Tables - Consolidated Structure

This script backs up the current gold directory and writes a minimal,
consolidated copy of it to a sibling ``gold_consolidated`` directory for
HuggingFace deployment. The existing gold directory is not modified; once the
output has been verified, swap the two directories manually.

BEFORE (86 files):
  - data/gold/states/AL/bills_bills.parquet
  - data/gold/states/GA/bills_bills.parquet
  - data/gold/states/MA/bills_bills.parquet
  - ...repeated for 5 states across 15 table types

AFTER (~15-20 files, once gold_consolidated has been swapped in as data/gold):
  - data/gold/bills_bills.parquet (all states combined, with a `state` column)
  - data/gold/nonprofits_organizations.parquet (all states combined)
  - data/gold/events.parquet (all states combined)
  - ...one file per per-state table name, plus the national and reference tables

Benefits:
  - Reduces the file count by roughly 80%
  - Easier to query (no need to union across states)
  - Better for HuggingFace size limits (large files can be partitioned if needed)
  - Simpler API code (read one file instead of one per state directory)

Usage:
  python scripts/data/rebuild_consolidated_gold.py
  python scripts/data/rebuild_consolidated_gold.py --dry-run
"""
| import pandas as pd | |
| from pathlib import Path | |
| from typing import List, Dict | |
| from loguru import logger | |
| import shutil | |
| import sys | |
# Prepend the project root (three `.parent` steps above this file, i.e. above
# scripts/data/) so project-local packages import when run as a plain script.
sys.path[0:0] = [str(Path(__file__).parent.parent.parent)]
class GoldRebuilder:
    """Rebuild gold tables in a consolidated, one-file-per-table layout.

    Reads per-state tables from ``<gold_dir>/states/<STATE>/<table>.parquet``,
    concatenates each table type across states into
    ``<gold_dir>/../gold_consolidated/<table>.parquet`` and copies the
    ``national`` and ``reference`` tables next to them. The source gold
    directory is only read (and optionally backed up), never modified.
    """

    # Consolidated files larger than this (in MB) are flagged for partitioning
    # (the recommended HuggingFace per-file limit this script checks against).
    LARGE_FILE_MB = 500

    def __init__(self, gold_dir: str = "data/gold", dry_run: bool = False):
        """Configure the source directory, dry-run mode and backup location.

        Args:
            gold_dir: Existing gold directory containing ``states/`` and,
                optionally, ``national/`` and ``reference/`` subdirectories.
            dry_run: If True, only log what would be done; write nothing.
        """
        from datetime import datetime

        self.gold_dir = Path(gold_dir)
        self.dry_run = dry_run

        # Timestamped sibling of gold_dir so successive runs keep separate
        # backups (second resolution).
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.backup_dir = self.gold_dir.parent / f"gold_backup_{timestamp}"

        logger.info(f"Gold directory: {self.gold_dir}")
        logger.info(f"Backup directory: {self.backup_dir}")
        logger.info(f"Dry run: {dry_run}")

    def backup_current_gold(self):
        """Copy the whole gold directory to ``self.backup_dir``.

        Logs a warning and returns when the gold directory does not exist.
        In dry-run mode only logs the intended destination.

        Raises:
            FileExistsError: if ``self.backup_dir`` already exists (raised by
                ``shutil.copytree``), so an earlier backup is never merged into.
        """
        if not self.gold_dir.exists():
            logger.warning(f"{self.gold_dir} does not exist - nothing to back up")
            return

        logger.info(f"Backing up {self.gold_dir} to {self.backup_dir}...")
        if self.dry_run:
            logger.info(f"[DRY RUN] Would backup to {self.backup_dir}")
            return

        shutil.copytree(self.gold_dir, self.backup_dir)
        logger.success(f"β Backup created at {self.backup_dir}")

    def consolidate_table_type(self, table_name: str, states: List[str]) -> pd.DataFrame:
        """Concatenate one table type across several states.

        Args:
            table_name: File stem of the per-state table, e.g. "bills_bills"
                or "nonprofits_organizations".
            states: State directory names under ``states/``, e.g.
                ["AL", "GA", "MA", "WA", "WI"].

        Returns:
            The row-wise concatenation (fresh 0..n-1 index) of every state's
            table. A ``state`` column holding the directory name is added to
            any frame that lacks one; an existing ``state`` column is kept
            as-is. An empty DataFrame is returned when no state has the file.
        """
        logger.info(f"Consolidating {table_name} across {len(states)} states...")

        dfs = []
        for state in states:
            state_file = self.gold_dir / "states" / state / f"{table_name}.parquet"
            if not state_file.exists():
                logger.warning(f" β {state}: File not found at {state_file}")
                continue

            df = pd.read_parquet(state_file)
            if "state" not in df.columns:
                df["state"] = state
            dfs.append(df)
            logger.debug(f" β {state}: {len(df):,} rows")

        if not dfs:
            logger.warning(f"No data found for {table_name}")
            return pd.DataFrame()

        combined = pd.concat(dfs, ignore_index=True)
        logger.success(f"β Combined {len(combined):,} total rows")
        return combined

    def rebuild_consolidated_structure(self):
        """Write consolidated tables to a ``gold_consolidated`` sibling directory.

        Every table type found under ANY state directory is concatenated across
        all states; then the ``national`` and ``reference`` tables are copied
        alongside. Nothing is written in dry-run mode. Logs an error and
        returns early when ``states/`` is missing or has no subdirectories.
        """
        logger.info("=" * 70)
        logger.info("REBUILDING GOLD TABLES - CONSOLIDATED STRUCTURE")
        logger.info("=" * 70)

        states_dir = self.gold_dir / "states"
        if not states_dir.exists():
            logger.error(f"No states directory found at {states_dir}")
            return

        states = sorted(d.name for d in states_dir.iterdir() if d.is_dir())
        logger.info(f"Found states: {', '.join(states)}")
        if not states:
            logger.error("No states found")
            return

        table_files = self._discover_table_types(states_dir, states)
        logger.info(f"Found table types: {', '.join(table_files)}")

        new_gold_dir = self.gold_dir.parent / "gold_consolidated"
        if not self.dry_run:
            # mkdir(exist_ok=True) reuses an existing directory without
            # clearing it, so leftovers from an earlier run would linger.
            if new_gold_dir.exists() and any(new_gold_dir.glob("*.parquet")):
                logger.warning(
                    f"{new_gold_dir} already contains parquet files; files from a "
                    "previous run are not removed and will be counted in the summary"
                )
            new_gold_dir.mkdir(exist_ok=True)
            logger.info(f"Created {new_gold_dir}")

        # Output file names produced (or, in dry-run, planned) during this
        # run; the output directory is flat, so later copies must not
        # overwrite these.
        written_names = set()
        consolidated_files: Dict[str, int] = {}
        for table_name in table_files:
            logger.info("")
            logger.info(f"π Processing: {table_name}")
            logger.info("-" * 70)

            combined_df = self.consolidate_table_type(table_name, states)
            if combined_df.empty:
                continue

            output_file = new_gold_dir / f"{table_name}.parquet"
            self._write_table(combined_df, output_file, table_name)
            written_names.add(output_file.name)
            consolidated_files[table_name] = len(combined_df)

        self._copy_shared_tables(new_gold_dir, written_names)
        self._log_summary(new_gold_dir, consolidated_files)

    @staticmethod
    def _discover_table_types(states_dir: Path, states: List[str]) -> List[str]:
        """Return the sorted union of parquet file stems across all state dirs."""
        # Union over every state: a table type missing from one state must not
        # be dropped for all the others.
        stems = set()
        for state in states:
            stems.update(f.stem for f in (states_dir / state).glob("*.parquet"))
        return sorted(stems)

    def _write_table(self, df: pd.DataFrame, output_file: Path, table_name: str) -> None:
        """Save one consolidated table, or log the intent in dry-run mode."""
        if self.dry_run:
            logger.info(f"[DRY RUN] Would save {output_file.name}: {len(df):,} rows")
            return

        df.to_parquet(output_file, index=False)
        size_mb = output_file.stat().st_size / (1024 * 1024)
        logger.success(f"β Saved {output_file.name}: {len(df):,} rows, {size_mb:.2f} MB")

        # Check if file is too large for HuggingFace (>500MB recommended limit)
        if size_mb > self.LARGE_FILE_MB:
            logger.warning(f"β οΈ {table_name} is large ({size_mb:.2f} MB) - consider partitioning")

    def _copy_shared_tables(self, new_gold_dir: Path, written_names: set) -> None:
        """Copy national/reference tables (already multi-state) into new_gold_dir.

        A file whose name matches one already produced in this run is skipped
        with a warning rather than silently overwriting it. Adds each copied
        (or, in dry-run, planned) name to ``written_names``.
        """
        logger.info("")
        logger.info("π Copying national and reference tables...")
        logger.info("-" * 70)

        for category in ["national", "reference"]:
            category_dir = self.gold_dir / category
            if not category_dir.exists():
                continue

            for parquet_file in sorted(category_dir.glob("*.parquet")):
                if parquet_file.name in written_names:
                    logger.warning(
                        f" Skipping {category}/{parquet_file.name}: a file with that "
                        f"name was already written to {new_gold_dir}"
                    )
                    continue
                written_names.add(parquet_file.name)

                if self.dry_run:
                    logger.info(f" [DRY RUN] Would copy {parquet_file.name}")
                    continue

                shutil.copy2(parquet_file, new_gold_dir / parquet_file.name)
                size_mb = parquet_file.stat().st_size / (1024 * 1024)
                logger.info(f" β Copied {parquet_file.name} ({size_mb:.2f} MB)")

    def _log_summary(self, new_gold_dir: Path, consolidated_files: Dict[str, int]) -> None:
        """Log before/after file counts and the manual follow-up steps."""
        logger.info("")
        logger.info("=" * 70)
        logger.info("CONSOLIDATION SUMMARY")
        logger.info("=" * 70)

        if self.dry_run:
            logger.info("[DRY RUN] No files were modified")
            logger.info(f"Would create {len(consolidated_files)} consolidated state files")
            logger.info("Plus national and reference files")
            return

        old_file_count = len(list(self.gold_dir.rglob("*.parquet")))
        new_file_count = len(list(new_gold_dir.glob("*.parquet")))
        logger.info(f"Before: {old_file_count} files")
        logger.info(f"After: {new_file_count} files")
        # A gold tree with no parquet files would otherwise divide by zero.
        if old_file_count:
            logger.info(f"Reduction: {old_file_count - new_file_count} files ({(1 - new_file_count/old_file_count)*100:.1f}%)")

        logger.info("")
        logger.info("Next steps:")
        logger.info("1. Verify the consolidated files:")
        logger.info(f" ls -lh {new_gold_dir}/*.parquet")
        logger.info("")
        logger.info("2. Replace old gold directory:")
        logger.info(f" mv {self.gold_dir} {self.gold_dir.parent}/gold_old")
        logger.info(f" mv {new_gold_dir} {self.gold_dir}")
        logger.info("")
        logger.info("3. Update API code to remove state-specific paths:")
        logger.info(" api/main.py, api/routes/search.py, etc.")
def main():
    """Command-line entry point: back up the gold directory, then consolidate it.

    Flags:
        --dry-run  Log the planned actions without writing anything.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Rebuild gold tables in consolidated format")
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes",
    )
    options = cli.parse_args()

    builder = GoldRebuilder(dry_run=options.dry_run)
    # Snapshot the existing gold tables before producing the new layout.
    builder.backup_current_gold()
    builder.rebuild_consolidated_structure()


if __name__ == "__main__":
    main()