open-navigator / scripts /data /rebuild_consolidated_gold.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
896453f verified
"""
Rebuild Gold Tables - Consolidated Structure
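This script backs up the current gold directory and writes a minimal,
consolidated copy of it to a sibling `gold_consolidated` directory for
HuggingFace deployment. The existing gold directory is not modified; swapping
the new directory into place is a manual follow-up step.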
BEFORE (86 files):
- data/gold/states/AL/bills_bills.parquet
- data/gold/states/GA/bills_bills.parquet
- data/gold/states/MA/bills_bills.parquet
- ...repeated for 5 states across 15 table types
AFTER (~15-20 files):
- data/gold/bills_bills.parquet (all states combined with `state` column)
- data/gold/nonprofits_organizations.parquet (all states combined)
- data/gold/events.parquet (all states combined)
- ...etc
Benefits:
- Reduces file count by 75%
- Easier to query (no need to union across states)
- Better for HuggingFace size limits (can partition large files if needed)
- Simpler API code (read once vs read from 5 state directories)
Usage:
python scripts/data/rebuild_consolidated_gold.py
python scripts/data/rebuild_consolidated_gold.py --dry-run
"""
import pandas as pd
from pathlib import Path
from typing import List, Dict
from loguru import logger
import shutil
import sys
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
class GoldRebuilder:
    """Rebuild the per-state gold tables as one consolidated file per table.

    Reads ``<gold_dir>/states/<STATE>/<table>.parquet`` and writes the combined
    tables to a sibling ``gold_consolidated`` directory. The source gold
    directory is only read (and backed up); it is never modified here.

    Attributes:
        gold_dir: Directory holding the current gold tables.
        dry_run: When true, nothing is written to disk; actions are only logged.
        backup_dir: Timestamped sibling directory that receives the backup.
    """

    # Sub-directories of the gold directory whose parquet files are copied
    # over unchanged because they are already consolidated.
    _SHARED_CATEGORIES = ("national", "reference")

    # Size (in MB) above which a consolidated file triggers a partitioning hint.
    _LARGE_FILE_MB = 500

    def __init__(self, gold_dir: str = "data/gold", dry_run: bool = False):
        """Record the source directory and derive the backup location.

        Args:
            gold_dir: Path of the gold directory to consolidate.
            dry_run: If true, only log what would be done.
        """
        from datetime import datetime

        self.gold_dir = Path(gold_dir)
        self.dry_run = dry_run

        # One backup directory per run, named after the start time.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.backup_dir = self.gold_dir.parent / f"gold_backup_{timestamp}"

        logger.info(f"Gold directory: {self.gold_dir}")
        logger.info(f"Backup directory: {self.backup_dir}")
        logger.info(f"Dry run: {dry_run}")

    def backup_current_gold(self):
        """Copy the whole gold directory to ``backup_dir`` (skipped in dry run).

        Does nothing when the gold directory does not exist.
        """
        if not self.gold_dir.exists():
            return

        logger.info(f"Backing up {self.gold_dir} to {self.backup_dir}...")
        if self.dry_run:
            logger.info(f"[DRY RUN] Would backup to {self.backup_dir}")
            return

        shutil.copytree(self.gold_dir, self.backup_dir)
        logger.success(f"✅ Backup created at {self.backup_dir}")

    def consolidate_table_type(self, table_name: str, states: List[str]) -> pd.DataFrame:
        """Concatenate one table type across several states.

        Args:
            table_name: Name like "bills_bills", "nonprofits_organizations", etc.
            states: List of state codes like ["AL", "GA", "MA", "WA", "WI"].

        Returns:
            Combined DataFrame with a ``state`` column (added from the directory
            name for any input that lacks one). An empty DataFrame is returned
            when no state has a file for this table.
        """
        logger.info(f"Consolidating {table_name} across {len(states)} states...")

        dfs = []
        for state in states:
            state_file = self.gold_dir / "states" / state / f"{table_name}.parquet"
            if not state_file.exists():
                logger.warning(f" ✗ {state}: File not found at {state_file}")
                continue

            df = pd.read_parquet(state_file)
            # Keep an existing state column; only fill it in when missing.
            if 'state' not in df.columns:
                df['state'] = state
            dfs.append(df)
            logger.debug(f" ✓ {state}: {len(df):,} rows")

        if not dfs:
            logger.warning(f"No data found for {table_name}")
            return pd.DataFrame()

        combined = pd.concat(dfs, ignore_index=True)
        logger.success(f"✅ Combined {len(combined):,} total rows")
        return combined

    @staticmethod
    def _list_table_names(states_dir: Path, states: List[str]) -> List[str]:
        """Return the sorted union of parquet table names found in any state.

        Looking at every state (not just the first) ensures a table that is
        missing from one state is still consolidated for the others.
        """
        names = {
            parquet.stem
            for state in states
            for parquet in (states_dir / state).glob("*.parquet")
        }
        return sorted(names)

    def _copy_shared_tables(self, new_gold_dir: Path, taken_names: set) -> None:
        """Copy the national/reference parquet files into ``new_gold_dir``.

        ``taken_names`` holds file names already placed in the flat output
        directory during this run; it is updated as files are copied so that
        name collisions are reported instead of overwriting silently.
        """
        for category in self._SHARED_CATEGORIES:
            category_dir = self.gold_dir / category
            if not category_dir.exists():
                continue

            for parquet_file in category_dir.glob("*.parquet"):
                if parquet_file.name in taken_names:
                    logger.warning(
                        f" {category}/{parquet_file.name} overwrites a file of the "
                        f"same name already placed in {new_gold_dir}"
                    )
                taken_names.add(parquet_file.name)

                if self.dry_run:
                    logger.info(f" [DRY RUN] Would copy {parquet_file.name}")
                    continue

                shutil.copy2(parquet_file, new_gold_dir / parquet_file.name)
                size_mb = parquet_file.stat().st_size / (1024 * 1024)
                logger.info(f" ✓ Copied {parquet_file.name} ({size_mb:.2f} MB)")

    def _log_summary(self, new_gold_dir: Path, consolidated_files: Dict[str, int]) -> None:
        """Log before/after file counts and the manual follow-up steps."""
        logger.info("")
        logger.info("=" * 70)
        logger.info("CONSOLIDATION SUMMARY")
        logger.info("=" * 70)

        if self.dry_run:
            logger.info("[DRY RUN] No files were modified")
            logger.info(f"Would create {len(consolidated_files)} consolidated state files")
            logger.info("Plus national and reference files")
            return

        old_file_count = len(list(self.gold_dir.rglob("*.parquet")))
        new_file_count = len(list(new_gold_dir.glob("*.parquet")))
        logger.info(f"Before: {old_file_count} files")
        logger.info(f"After: {new_file_count} files")
        if old_file_count:
            logger.info(f"Reduction: {old_file_count - new_file_count} files ({(1 - new_file_count/old_file_count)*100:.1f}%)")
        else:
            # No source files: a percentage is undefined, so report counts only.
            logger.info(f"Reduction: {old_file_count - new_file_count} files")
        logger.info("")
        logger.info("Next steps:")
        logger.info("1. Verify the consolidated files:")
        logger.info(f" ls -lh {new_gold_dir}/*.parquet")
        logger.info("")
        logger.info("2. Replace old gold directory:")
        logger.info(f" mv {self.gold_dir} {self.gold_dir.parent}/gold_old")
        logger.info(f" mv {new_gold_dir} {self.gold_dir}")
        logger.info("")
        logger.info("3. Update API code to remove state-specific paths:")
        logger.info(" api/main.py, api/routes/search.py, etc.")

    def rebuild_consolidated_structure(self):
        """Write one consolidated parquet file per table into ``gold_consolidated``.

        Steps: discover the state directories, collect every table name that
        appears in any of them, concatenate each table across states, copy the
        national/reference files alongside, and log a summary. Returns early
        (after logging an error) when there is no ``states`` directory or it
        contains no state sub-directories. In dry-run mode the inputs are still
        read, but nothing is written.
        """
        logger.info("=" * 70)
        logger.info("REBUILDING GOLD TABLES - CONSOLIDATED STRUCTURE")
        logger.info("=" * 70)

        # Detect which states we have
        states_dir = self.gold_dir / "states"
        if not states_dir.exists():
            logger.error(f"No states directory found at {states_dir}")
            return

        states = sorted(d.name for d in states_dir.iterdir() if d.is_dir())
        logger.info(f"Found states: {', '.join(states)}")
        if not states:
            logger.error("No states found")
            return

        # Detect table types across ALL states, not only the first one.
        table_files = self._list_table_names(states_dir, states)
        logger.info(f"Found table types: {', '.join(table_files)}")

        # The output goes to a sibling directory; the source stays untouched.
        new_gold_dir = self.gold_dir.parent / "gold_consolidated"
        if not self.dry_run:
            new_gold_dir.mkdir(exist_ok=True)
            logger.info(f"Created {new_gold_dir}")

        # table name -> number of combined rows, for non-empty tables only
        consolidated_files: Dict[str, int] = {}
        for table_name in table_files:
            logger.info("")
            logger.info(f"📊 Processing: {table_name}")
            logger.info("-" * 70)

            combined_df = self.consolidate_table_type(table_name, states)
            if combined_df.empty:
                continue

            output_file = new_gold_dir / f"{table_name}.parquet"
            if self.dry_run:
                logger.info(f"[DRY RUN] Would save {output_file.name}: {len(combined_df):,} rows")
            else:
                combined_df.to_parquet(output_file, index=False)
                size_mb = output_file.stat().st_size / (1024 * 1024)
                logger.success(f"✅ Saved {output_file.name}: {len(combined_df):,} rows, {size_mb:.2f} MB")
                # Check if file is too large for HuggingFace (>500MB recommended limit)
                if size_mb > self._LARGE_FILE_MB:
                    logger.warning(f"⚠️ {table_name} is large ({size_mb:.2f} MB) - consider partitioning")
            consolidated_files[table_name] = len(combined_df)

        # Copy national and reference tables (these are already consolidated)
        logger.info("")
        logger.info("📂 Copying national and reference tables...")
        logger.info("-" * 70)
        taken_names = {f"{name}.parquet" for name in consolidated_files}
        self._copy_shared_tables(new_gold_dir, taken_names)

        self._log_summary(new_gold_dir, consolidated_files)
def main():
    """Command-line entry point: back up the gold directory, then rebuild it.

    Accepts a single optional flag, ``--dry-run``, which is forwarded to
    ``GoldRebuilder`` so that both steps only log what they would do.
    """
    from argparse import ArgumentParser

    cli = ArgumentParser(description="Rebuild gold tables in consolidated format")
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes",
    )
    options = cli.parse_args()

    rebuilder = GoldRebuilder(dry_run=options.dry_run)

    # Back up first, then rebuild.
    rebuilder.backup_current_gold()
    rebuilder.rebuild_consolidated_structure()
# Run the rebuild only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()