Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Migration script to load data from Gold parquet files into Neon Postgres | |
| Optimized for HuggingFace deployment - loads aggregate and search data only | |
| """ | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional | |
| import psycopg2 | |
| from psycopg2.extras import execute_values | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from loguru import logger | |
# Pull settings from a local .env file (if one exists) into the environment.
load_dotenv()

# Connection strings: the DEV URL wins whenever it is set; the production URL
# is only used as a fallback.
NEON_DATABASE_URL_DEV = os.environ.get('NEON_DATABASE_URL_DEV')
NEON_DATABASE_URL = os.environ.get('NEON_DATABASE_URL')
DATABASE_URL = NEON_DATABASE_URL_DEV if NEON_DATABASE_URL_DEV else NEON_DATABASE_URL

# Fail at import time when no connection string is configured at all.
if not DATABASE_URL:
    raise ValueError("Neither NEON_DATABASE_URL_DEV nor NEON_DATABASE_URL set in environment")

logger.info(f"Using: {'DEV' if NEON_DATABASE_URL_DEV else 'PROD'} database")

# Root of the Gold-layer parquet files, relative to the working directory.
GOLD_DIR = Path("data/gold")
def parse_yyyymm_date(yyyymm):
    """Convert a YYYYMM value (e.g. '195504' or 195504.0) to a date object.

    Args:
        yyyymm: Year-month as a string, int or float; may be None/NaN.

    Returns:
        ``datetime.date`` for the first day of that month, or None when the
        value is missing, falsy (0, ''), does not have exactly six characters
        after integer conversion, or is not a valid year/month combination.
    """
    if pd.isna(yyyymm) or not yyyymm:
        return None
    try:
        # int() drops a trailing ".0" from float-typed parquet columns.
        digits = str(int(yyyymm))
        if len(digits) != 6:
            return None
        return datetime(int(digits[:4]), int(digits[4:]), 1).date()
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or an impossible month/year.
        # OverflowError: int() of an infinite float.
        return None
def clean_numeric(value):
    """Return *value* as a float, or None when it is missing or not numeric.

    None and pandas NA/NaN values map to None, as does anything ``float()``
    rejects with ValueError or TypeError.
    """
    if value is None or pd.isna(value):
        return None
    try:
        as_float = float(value)
    except (ValueError, TypeError):
        return None
    # A string such as "nan" converts successfully but is still not a number.
    return None if pd.isna(as_float) else as_float
def get_db_connection():
    """Open and return a new psycopg2 connection to ``DATABASE_URL``."""
    connection = psycopg2.connect(DATABASE_URL)
    return connection
def execute_schema(conn):
    """Execute neon/schema.sql to create the database tables.

    Args:
        conn: Open psycopg2 connection.

    Returns:
        True when the schema was executed and committed; False when the file
        is missing, cannot be read, or the SQL fails (the transaction is
        rolled back in that case).
    """
    schema_path = Path("neon/schema.sql")
    if not schema_path.exists():
        logger.error(f"Schema file not found: {schema_path}")
        return False

    logger.info("π Creating database schema...")
    try:
        # Read inside the try block so an unreadable file is reported as a
        # failed step rather than an uncaught exception; decode explicitly as
        # UTF-8 instead of relying on the platform default encoding.
        schema_sql = schema_path.read_text(encoding="utf-8")
        with conn.cursor() as cur:
            cur.execute(schema_sql)
        conn.commit()
    except Exception as e:
        logger.error(f"β Schema creation failed: {e}")
        conn.rollback()
        return False

    logger.success("β Schema created successfully")
    return True
def load_stats_aggregates(conn):
    """
    Load pre-computed statistics aggregates
    This is the most critical table for fast dashboard loading

    Inserts one national row plus one row per state directory under
    GOLD_DIR/states for which calculate_state_stats() returns data, commits,
    then records the resulting table row count via record_sync().

    Args:
        conn: Open psycopg2 connection.

    Returns:
        True on success; False when any step raised (the transaction is
        rolled back and the error is logged).
    """
    logger.info("π Loading statistics aggregates...")
    try:
        # The context manager closes the cursor on every exit path.
        with conn.cursor() as cursor:
            # National totals first.
            insert_stat(cursor, **calculate_national_stats())

            # Then one row per state directory that has data.
            states_dir = GOLD_DIR / "states"
            if states_dir.exists():
                for state_dir in states_dir.iterdir():
                    if not state_dir.is_dir():
                        continue
                    state = state_dir.name
                    logger.info(f" Processing state: {state}")
                    state_stats = calculate_state_stats(state)
                    if state_stats:
                        insert_stat(cursor, **state_stats)
            conn.commit()

            # Report the size of the whole table, not only this run's rows.
            cursor.execute("SELECT COUNT(*) FROM stats_aggregates")
            count = cursor.fetchone()[0]

        logger.success(f"β Loaded {count} statistics aggregates")
        record_sync(conn, 'stats_aggregates', count)
        return True
    except Exception as e:
        logger.error(f"β Failed to load stats aggregates: {e}")
        conn.rollback()
        return False
def calculate_national_stats():
    """Calculate national-level statistics from the Gold parquet files.

    Returns:
        dict with the keys insert_stat() expects: 'level' ('national'),
        'state'/'county'/'city' (all None) and the count/total metrics.
        'bills_count' is always 0 because no bills source is read here.
        Every metric is a plain Python int.
    """
    stats = {
        'level': 'national',
        'state': None,
        'county': None,
        'city': None,
        'jurisdictions_count': 0,
        'school_districts_count': 0,
        'nonprofits_count': 0,
        'events_count': 0,
        'bills_count': 0,
        'contacts_count': 0,
        'total_revenue': 0,
        'total_assets': 0,
    }
    reference_dir = GOLD_DIR / 'reference'

    # Count jurisdictions (cities + counties + townships)
    for filename in ('jurisdictions_cities.parquet', 'jurisdictions_counties.parquet',
                     'jurisdictions_townships.parquet'):
        file_path = reference_dir / filename
        if file_path.exists():
            stats['jurisdictions_count'] += len(pd.read_parquet(file_path))

    # Count school districts
    sd_file = reference_dir / 'jurisdictions_school_districts.parquet'
    if sd_file.exists():
        stats['school_districts_count'] = len(pd.read_parquet(sd_file))

    # Per-state files: nonprofits (with financial totals), events, contacts
    states_dir = GOLD_DIR / "states"
    if not states_dir.exists():
        return stats

    for state_dir in states_dir.iterdir():
        if not state_dir.is_dir():
            continue

        np_file = state_dir / "nonprofits_organizations.parquet"
        if np_file.exists():
            df = pd.read_parquet(np_file)
            stats['nonprofits_count'] += len(df)
            # Cast to int, as calculate_state_stats() does: a pandas sum is a
            # numpy scalar, and numpy.int64 cannot be adapted by psycopg2 when
            # the row is inserted.
            # NOTE(review): other loaders in this file read lowercase column
            # names (e.g. 'revenue_amt'); confirm 'REVENUE'/'ASSETS' exist in
            # these files, otherwise the totals stay 0.
            if 'REVENUE' in df.columns:
                stats['total_revenue'] += int(df['REVENUE'].fillna(0).sum())
            if 'ASSETS' in df.columns:
                stats['total_assets'] += int(df['ASSETS'].fillna(0).sum())

        events_file = state_dir / "events.parquet"
        if events_file.exists():
            stats['events_count'] += len(pd.read_parquet(events_file))

        # Only nonprofit officers are counted as contacts here.
        contacts_file = state_dir / "contacts_nonprofit_officers.parquet"
        if contacts_file.exists():
            stats['contacts_count'] += len(pd.read_parquet(contacts_file))

    return stats
def calculate_state_stats(state: str):
    """Calculate statistics for a single state.

    Reference files are filtered by a case-insensitive match on their
    'state' (or 'STATE') column; per-state files are read from
    GOLD_DIR/states/<state>.

    Returns:
        dict of insert_stat() keyword arguments, or None when the state has
        no nonprofit rows.
    """
    reference_dir = GOLD_DIR / 'reference'
    state_dir = GOLD_DIR / "states" / state

    def rows_in_state(parquet_path):
        """Count rows of a reference file that belong to this state (0 if unavailable)."""
        if not parquet_path.exists():
            return 0
        frame = pd.read_parquet(parquet_path)
        column = 'state' if 'state' in frame.columns else 'STATE'
        if column not in frame.columns:
            return 0
        matches = frame[frame[column].str.upper() == state.upper()]
        return len(matches)

    # Cities + counties + townships located in this state
    jurisdictions = 0
    for filename in ('jurisdictions_cities.parquet', 'jurisdictions_counties.parquet',
                     'jurisdictions_townships.parquet'):
        jurisdictions += rows_in_state(reference_dir / filename)

    # School districts located in this state
    school_districts = rows_in_state(reference_dir / 'jurisdictions_school_districts.parquet')

    # Nonprofits and their financial totals
    nonprofits = 0
    revenue = 0
    assets = 0
    nonprofits_path = state_dir / "nonprofits_organizations.parquet"
    if nonprofits_path.exists():
        orgs = pd.read_parquet(nonprofits_path)
        nonprofits = len(orgs)
        if 'REVENUE' in orgs.columns:
            revenue = int(orgs['REVENUE'].fillna(0).sum())
        if 'ASSETS' in orgs.columns:
            assets = int(orgs['ASSETS'].fillna(0).sum())

    # Events
    events = 0
    events_path = state_dir / "events.parquet"
    if events_path.exists():
        events = len(pd.read_parquet(events_path))

    # Contacts (nonprofit officers only)
    contacts = 0
    contacts_path = state_dir / "contacts_nonprofit_officers.parquet"
    if contacts_path.exists():
        contacts = len(pd.read_parquet(contacts_path))

    # A state without nonprofit rows produces no aggregate row.
    if not nonprofits > 0:
        return None

    return {
        'level': 'state',
        'state': state,
        'county': None,
        'city': None,
        'jurisdictions_count': jurisdictions,
        'school_districts_count': school_districts,
        'nonprofits_count': nonprofits,
        'events_count': events,
        'bills_count': 0,
        'contacts_count': contacts,
        'total_revenue': revenue,
        'total_assets': assets,
    }
def insert_stat(cursor, level, state, county, city, **metrics):
    """Upsert one row into stats_aggregates.

    Args:
        cursor: DB-API cursor used to run the statement.
        level, state, county, city: Values of the conflict-target columns.
        **metrics: Count/total values keyed by column name; any metric that
            is not supplied is written as 0.

    NOTE(review): national/state rows pass None for state/county/city. With
    a plain UNIQUE constraint PostgreSQL treats NULLs as distinct, so the
    ON CONFLICT clause may never fire for those rows - confirm against
    neon/schema.sql.
    """
    metric_columns = (
        'jurisdictions_count',
        'school_districts_count',
        'nonprofits_count',
        'events_count',
        'bills_count',
        'contacts_count',
        'total_revenue',
        'total_assets',
    )
    # Parameter order must match the column list in the statement below.
    params = (
        level, state, county, city,
        *(metrics.get(column, 0) for column in metric_columns),
        datetime.now(),
    )
    cursor.execute("""
        INSERT INTO stats_aggregates
        (level, state, county, city, jurisdictions_count, school_districts_count,
        nonprofits_count, events_count, bills_count, contacts_count,
        total_revenue, total_assets, last_updated)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (level, state, county, city)
        DO UPDATE SET
        jurisdictions_count = EXCLUDED.jurisdictions_count,
        school_districts_count = EXCLUDED.school_districts_count,
        nonprofits_count = EXCLUDED.nonprofits_count,
        events_count = EXCLUDED.events_count,
        bills_count = EXCLUDED.bills_count,
        contacts_count = EXCLUDED.contacts_count,
        total_revenue = EXCLUDED.total_revenue,
        total_assets = EXCLUDED.total_assets,
        last_updated = EXCLUDED.last_updated
    """, params)
def load_nonprofits_search(conn, limit_states: Optional[list] = None):
    """
    Upsert nonprofit organizations into the nonprofits_search table.

    Args:
        conn: Open psycopg2 connection.
        limit_states: List of state codes to load (e.g., ['MA', 'CA']); None
            or an empty list loads every directory under GOLD_DIR/states.

    Returns:
        True once all states are processed and the transaction is committed.
        Errors are not caught here; they propagate to the caller.
    """
    logger.info("π’ Loading nonprofits search data...")

    # Decide which states to process: the explicit list, or every state folder.
    states_root = GOLD_DIR / "states"
    if limit_states:
        states_to_load = limit_states
    elif states_root.exists():
        states_to_load = [entry.name for entry in states_root.iterdir() if entry.is_dir()]
    else:
        states_to_load = []

    upsert_sql = """
        INSERT INTO nonprofits_search
        (ein, name, street_address, city, state, zip_code, county,
        ntee_code, ntee_description, subsection_code, affiliation_code, classification_code,
        revenue, assets, income, ruling_date, foundation_code, pf_filing_requirement_code,
        accounting_period, asset_code, income_code, filing_requirement_code,
        exempt_organization_status_code, tax_period, asset_amount, income_amount,
        form_990_revenue_amount, source, last_updated)
        VALUES %s
        ON CONFLICT (ein) DO UPDATE SET
        name = EXCLUDED.name,
        city = EXCLUDED.city,
        state = EXCLUDED.state,
        revenue = EXCLUDED.revenue,
        assets = EXCLUDED.assets,
        last_updated = EXCLUDED.last_updated
    """

    def build_record(row, state):
        """Map one source row to the column order of the INSERT statement."""
        return (
            row.get('ein'),
            row.get('name', ''),
            row.get('street', ''),
            row.get('city', ''),
            state,  # taken from the directory name, not from the row
            row.get('zip', ''),
            '',  # county - not in source data
            row.get('ntee_cd', ''),
            None,  # ntee_description - join later
            row.get('subsection', ''),
            row.get('affiliation', ''),
            row.get('classification', ''),
            clean_numeric(row.get('form_990_total_revenue')),
            clean_numeric(row.get('form_990_total_assets')),
            clean_numeric(row.get('income_amt')),
            parse_yyyymm_date(row.get('ruling')),  # YYYYMM -> date
            row.get('foundation', ''),
            row.get('pf_filing_req_cd', ''),
            clean_numeric(row.get('acct_pd')),
            row.get('asset_cd', ''),
            row.get('income_cd', ''),
            row.get('filing_req_cd', ''),
            row.get('status', ''),  # stored as exempt_organization_status_code
            clean_numeric(row.get('tax_period')),
            clean_numeric(row.get('asset_amt')),
            clean_numeric(row.get('income_amt')),
            clean_numeric(row.get('revenue_amt')),
            'irs_bmf',
            datetime.now(),
        )

    total_loaded = 0
    cursor = conn.cursor()
    for state in states_to_load:
        source_file = states_root / state / "nonprofits_organizations.parquet"
        if not source_file.exists():
            logger.warning(f" No nonprofits file for {state}")
            continue

        logger.info(f" Loading nonprofits from {state}...")
        frame = pd.read_parquet(source_file)

        # Rows without an EIN cannot be upserted (EIN is the conflict key).
        with_ein = frame[frame['ein'].notna()].copy()
        records = [build_record(row, state) for _, row in with_ein.iterrows()]
        if not records:
            continue

        # One batched statement per state.
        execute_values(cursor, upsert_sql, records)
        total_loaded += len(records)
        logger.info(f" Loaded {len(records)} nonprofits from {state}")

    conn.commit()
    logger.success(f"β Loaded {total_loaded} nonprofits into search table")
    record_sync(conn, 'nonprofits_search', total_loaded)
    return True
def load_reference_data(conn):
    """Upsert the reference tables (NTEE codes and causes).

    Each source file is optional; a missing file is skipped silently.

    Returns:
        True once the available files are loaded and committed.
    """
    logger.info("π Loading reference data...")
    reference_dir = GOLD_DIR / "reference"
    cursor = conn.cursor()
    total = 0

    # NTEE codes -> reference_ntee_codes
    ntee_path = reference_dir / "causes_ntee_codes.parquet"
    if ntee_path.exists():
        ntee_rows = []
        for _, row in pd.read_parquet(ntee_path).iterrows():
            # category / subcategory are not populated from this file.
            ntee_rows.append(
                (row['ntee_code'], row.get('description', ''), None, None, 'irs', datetime.now())
            )
        execute_values(cursor, """
            INSERT INTO reference_ntee_codes (code, description, category, subcategory, source, last_updated)
            VALUES %s
            ON CONFLICT (code) DO UPDATE SET description = EXCLUDED.description
        """, ntee_rows)
        total += len(ntee_rows)
        logger.info(f" Loaded {len(ntee_rows)} NTEE codes")

    # Causes -> reference_causes
    causes_path = reference_dir / "causes_everyorg_causes.parquet"
    if causes_path.exists():
        cause_rows = []
        for _, row in pd.read_parquet(causes_path).iterrows():
            # cause_id is stored in the cause_slug column; parent_category stays NULL.
            cause_rows.append(
                (row['cause_id'], row['cause_name'], row.get('description'), None, 'everyorg', datetime.now())
            )
        execute_values(cursor, """
            INSERT INTO reference_causes (cause_slug, cause_name, description, parent_category, source, last_updated)
            VALUES %s
            ON CONFLICT (cause_slug) DO UPDATE SET cause_name = EXCLUDED.cause_name
        """, cause_rows)
        total += len(cause_rows)
        logger.info(f" Loaded {len(cause_rows)} causes")

    conn.commit()
    logger.success(f"β Loaded {total} reference records")
    return True
def _load_jurisdiction_file(cursor, parquet_path, jurisdiction_type, state_column):
    """Upsert every row of one jurisdictions parquet file; return the row count."""
    df = pd.read_parquet(parquet_path)
    records = [
        (row.get('NAME', ''), jurisdiction_type, row.get(state_column, ''), None,  # name, type, state, county
         row.get('GEOID'), None,  # geoid, fips_code
         None, clean_numeric(row.get('ALAND_SQMI')),  # population, area_sq_miles
         'census', datetime.now())
        for _, row in df.iterrows()
    ]
    # NOTE(review): county is always NULL here. With a plain UNIQUE constraint
    # PostgreSQL treats NULLs as distinct, so this ON CONFLICT clause may never
    # fire and re-runs could insert duplicates - confirm against neon/schema.sql.
    execute_values(cursor, """
        INSERT INTO jurisdictions_search
        (name, type, state, county, geoid, fips_code, population, area_sq_miles, source, last_updated)
        VALUES %s
        ON CONFLICT (name, type, state, county) DO UPDATE SET
        geoid = EXCLUDED.geoid,
        area_sq_miles = EXCLUDED.area_sq_miles
    """, records)
    return len(records)


def load_jurisdictions_search(conn):
    """Load jurisdictions (cities, counties, townships, school districts).

    Each reference file is optional; a missing file is skipped. The four
    files share one loading routine and differ only in the jurisdiction type
    written and the source column that holds the state.

    Args:
        conn: Open psycopg2 connection.

    Returns:
        True once all available files are loaded and committed. Errors are
        not caught here; they propagate to the caller.
    """
    logger.info("ποΈ Loading jurisdictions search data...")

    # (file name, jurisdiction type, state column in the file, label for the log line)
    sources = (
        ("jurisdictions_cities.parquet", 'city', 'USPS', 'cities'),
        ("jurisdictions_counties.parquet", 'county', 'USPS', 'counties'),
        ("jurisdictions_townships.parquet", 'township', 'USPS', 'townships'),
        ("jurisdictions_school_districts.parquet", 'school_district', 'STATE', 'school districts'),
    )

    total_loaded = 0
    # The context manager closes the cursor on every exit path.
    with conn.cursor() as cursor:
        for filename, jurisdiction_type, state_column, label in sources:
            parquet_path = GOLD_DIR / "reference" / filename
            if not parquet_path.exists():
                continue
            loaded = _load_jurisdiction_file(cursor, parquet_path, jurisdiction_type, state_column)
            total_loaded += loaded
            logger.info(f" Loaded {loaded:,} {label}")

    conn.commit()
    logger.success(f"β Loaded {total_loaded:,} jurisdictions into search table")
    record_sync(conn, 'jurisdictions_search', total_loaded)
    return True
def _split_event_start(start_date):
    """Split an event start value into (date, time); parts that cannot be derived are None."""
    event_date = None
    event_time = None
    if not start_date:
        return event_date, event_time
    try:
        if isinstance(start_date, str):
            from dateutil import parser
            moment = parser.parse(start_date)
        elif hasattr(start_date, 'date'):
            moment = start_date
        else:
            return event_date, event_time
        event_date = moment.date()
        event_time = moment.time()
    except (ValueError, TypeError, OverflowError, AttributeError):
        # Best effort: an unparseable or NaT start simply leaves the fields
        # empty. Only parsing/conversion errors are swallowed, so
        # KeyboardInterrupt and SystemExit still propagate.
        pass
    return event_date, event_time


def load_events_search(conn, limit_states=None):
    """Load events from the per-state events.parquet files into events_search.

    Args:
        conn: Open psycopg2 connection.
        limit_states: List of state codes to load; None or an empty list
            loads every directory under GOLD_DIR/states.

    Returns:
        True once all states are processed and the transaction is committed.
        Errors are not caught here; they propagate to the caller.

    NOTE(review): the INSERT has no ON CONFLICT clause, so running the
    migration twice presumably duplicates events unless the schema script
    recreates the table - confirm against neon/schema.sql.
    """
    logger.info("π Loading events search data...")

    states_to_load = limit_states or []
    # If no limit, scan all states
    if not limit_states:
        states_dir = GOLD_DIR / "states"
        if states_dir.exists():
            states_to_load = [d.name for d in states_dir.iterdir() if d.is_dir()]

    total_loaded = 0
    # The context manager closes the cursor on every exit path.
    with conn.cursor() as cursor:
        for state in states_to_load:
            events_file = GOLD_DIR / "states" / state / "events.parquet"
            if not events_file.exists():
                continue

            logger.info(f" Loading events from {state}...")
            df = pd.read_parquet(events_file)

            records = []
            for _, row in df.iterrows():
                event_date, event_time = _split_event_start(row.get('start_date'))
                records.append((
                    row.get('event_name', ''),
                    row.get('description', ''),
                    event_date,
                    event_time,
                    row.get('jurisdiction_name', ''),
                    None,  # jurisdiction_type
                    state,
                    None,  # city
                    row.get('location_id'),  # location
                    row.get('classification', ''),  # meeting_type
                    row.get('status', ''),
                    None,  # agenda_url
                    None,  # minutes_url
                    None,  # video_url
                    'openstates',
                    datetime.now(),
                ))

            if records:
                execute_values(cursor, """
                    INSERT INTO events_search
                    (title, description, event_date, event_time, jurisdiction_name, jurisdiction_type,
                    state, city, location, meeting_type, status, agenda_url, minutes_url, video_url,
                    source, last_updated)
                    VALUES %s
                """, records)
                total_loaded += len(records)
                logger.info(f" Loaded {len(records):,} events from {state}")

    conn.commit()
    logger.success(f"β Loaded {total_loaded:,} events into search table")
    record_sync(conn, 'events_search', total_loaded)
    return True
def load_contacts_search(conn, limit_states=None):
    """Load contacts (local officials and nonprofit officers) into contacts_search.

    Args:
        conn: Open psycopg2 connection.
        limit_states: List of state codes to load; None or an empty list
            loads every directory under GOLD_DIR/states.

    Returns:
        True once all states are processed and the transaction is committed.

    NOTE(review): the INSERT has no ON CONFLICT clause, so re-running the
    migration presumably duplicates contacts - confirm against the schema.
    """
    logger.info("π₯ Loading contacts search data...")

    # Decide which states to process: the explicit list, or every state folder.
    states_root = GOLD_DIR / "states"
    if limit_states:
        states_to_load = limit_states
    elif states_root.exists():
        states_to_load = [entry.name for entry in states_root.iterdir() if entry.is_dir()]
    else:
        states_to_load = []

    # Both contact kinds are written with the same statement.
    insert_sql = """
        INSERT INTO contacts_search
        (name, title, organization_name, organization_ein, email, phone,
        street_address, city, state, zip_code, role_type, compensation,
        hours_per_week, source, tax_year, last_updated)
        VALUES %s
    """

    def official_record(row, state):
        """Map a local-official row to the contacts_search column order."""
        return (
            row.get('name', ''),
            row.get('title', ''),
            row.get('jurisdiction', ''),  # organization_name
            None,  # organization_ein
            None,  # email
            None,  # phone
            None,  # street_address
            None,  # city
            state,
            None,  # zip_code
            'government_official',  # role_type
            None,  # compensation
            None,  # hours_per_week
            'meeting_transcript',
            None,  # tax_year
            datetime.now(),
        )

    def officer_record(row, state):
        """Map a nonprofit-officer row to the contacts_search column order."""
        return (
            row.get('name', ''),
            row.get('title', ''),
            row.get('organization_name', ''),
            row.get('ein', ''),  # organization_ein
            None,  # email
            None,  # phone
            None,  # street_address
            None,  # city
            state,
            None,  # zip_code
            'nonprofit_officer',  # role_type
            clean_numeric(row.get('compensation')),
            clean_numeric(row.get('hours_per_week')),
            'irs_form990',
            row.get('tax_year'),
            datetime.now(),
        )

    total_loaded = 0
    cursor = conn.cursor()
    for state in states_to_load:
        state_dir = states_root / state

        # Local officials
        officials_path = state_dir / "contacts_local_officials.parquet"
        if officials_path.exists():
            officials = pd.read_parquet(officials_path)
            batch = [official_record(row, state) for _, row in officials.iterrows()]
            if batch:
                execute_values(cursor, insert_sql, batch)
                total_loaded += len(batch)
                logger.info(f" Loaded {len(batch):,} officials from {state}")

        # Nonprofit officers (file is optional)
        officers_path = state_dir / "contacts_nonprofit_officers.parquet"
        if officers_path.exists():
            officers = pd.read_parquet(officers_path)
            batch = [officer_record(row, state) for _, row in officers.iterrows()]
            if batch:
                execute_values(cursor, insert_sql, batch)
                total_loaded += len(batch)
                logger.info(f" Loaded {len(batch):,} nonprofit officers from {state}")

    conn.commit()
    logger.success(f"β Loaded {total_loaded:,} contacts into search table")
    record_sync(conn, 'contacts_search', total_loaded)
    return True
def record_sync(conn, table_name: str, records_synced: int, status: str = 'success', error: Optional[str] = None):
    """Upsert the last_sync row for *table_name* and commit.

    Args:
        conn: Open database connection.
        table_name: Name of the table that was synced (the conflict key).
        records_synced: Number of records reported for this sync.
        status: Value stored in sync_status.
        error: Optional message stored in error_message.
    """
    upsert_sql = """
        INSERT INTO last_sync (table_name, last_sync_time, records_synced, sync_status, error_message)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (table_name) DO UPDATE SET
        last_sync_time = EXCLUDED.last_sync_time,
        records_synced = EXCLUDED.records_synced,
        sync_status = EXCLUDED.sync_status,
        error_message = EXCLUDED.error_message
    """
    sync_cursor = conn.cursor()
    params = (table_name, datetime.now(), records_synced, status, error)
    sync_cursor.execute(upsert_sql, params)
    conn.commit()
def main():
    """Run the full migration and return a process exit code.

    Steps run in order: schema, stats aggregates, reference data, nonprofits
    (MA only), jurisdictions, events (MA only), contacts (MA only), then a
    summary read from last_sync. The first failing step aborts the run.

    Returns:
        0 on success; 1 when a step reports failure or an exception is raised.
    """
    logger.info("π Starting Neon migration...")
    logger.info(f"π Gold directory: {GOLD_DIR.absolute()}")

    conn = None
    try:
        conn = get_db_connection()
        logger.success("β Connected to Neon database")

        # Step 1: Create schema
        if not execute_schema(conn):
            return 1

        # Step 2: Load aggregates (critical for dashboard)
        if not load_stats_aggregates(conn):
            return 1

        # Step 3: Load reference data
        if not load_reference_data(conn):
            return 1

        # Step 4: Load nonprofit search data (start with MA as example)
        logger.info("β οΈ Loading only MA nonprofits (full load would be 3M+ records)")
        logger.info(" To load all states, modify limit_states parameter")
        if not load_nonprofits_search(conn, limit_states=['MA']):
            return 1

        # Step 5: Load jurisdictions (all jurisdictions - reference data)
        if not load_jurisdictions_search(conn):
            return 1

        # Step 6: Load events (MA only, same as nonprofits)
        if not load_events_search(conn, limit_states=['MA']):
            return 1

        # Step 7: Load contacts (MA only, same as nonprofits)
        if not load_contacts_search(conn, limit_states=['MA']):
            return 1

        # Show summary
        with conn.cursor() as cursor:
            cursor.execute("SELECT table_name, records_synced, last_sync_time FROM last_sync ORDER BY table_name")
            summary_rows = cursor.fetchall()
        logger.info("\nπ Migration Summary:")
        logger.info("=" * 60)
        for row in summary_rows:
            logger.info(f" {row[0]:<30} {row[1]:>10,} records ({row[2]})")
        logger.info("=" * 60)

        logger.success("\nπ Migration completed successfully!")
        logger.info("\nπ‘ Next steps:")
        logger.info(" 1. Test queries: SELECT * FROM stats_aggregates LIMIT 5;")
        logger.info(" 2. Update API routes to use Neon")
        logger.info(" 3. Add NEON_DATABASE_URL to HuggingFace Secrets")
        return 0
    except Exception as e:
        logger.error(f"\nβ Migration failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return 1
    finally:
        # Close the connection on every path: success, a failed step's early
        # return, or an exception. conn is still None if connecting failed.
        if conn is not None:
            conn.close()
| if __name__ == "__main__": | |
| sys.exit(main()) | |