open-navigator / scripts /datasources /census /create_zip_county_mapping.py
jcbowyer's picture
Clean HuggingFace deployment without binary files
61d29fc
#!/usr/bin/env python3
"""
Create ZIP code to county mapping table using HUD USPS ZIP Code Crosswalk data
Data source: HUD USPS ZIP Code Crosswalk Files
https://www.huduser.gov/portal/datasets/usps_crosswalk.html
For each ZIP code, we store:
- Primary county (highest residential ratio)
- All counties it spans (for multi-county ZIP codes)
"""
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
import requests
from io import StringIO
from loguru import logger
import sys
# Database connection
DATABASE_URL = 'postgresql://postgres:password@localhost:5433/open_navigator'
# HUD ZIP-County crosswalk (2024 Q1 - latest available)
# Note: This is a simplified approach. Full implementation would download from HUD.
# For now, we'll create from Census Bureau's ZIP-County relationship file
ZCTA_COUNTY_URL = "https://www2.census.gov/geo/docs/maps-data/data/rel2020/zcta520/tab20_zcta520_county20_natl.txt"
def create_zip_county_table():
"""Create the zip_county_mapping table"""
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
logger.info("📋 Creating zip_county_mapping table...")
cur.execute("""
DROP TABLE IF EXISTS zip_county_mapping CASCADE;
CREATE TABLE zip_county_mapping (
zip_code VARCHAR(10) NOT NULL,
state_fips VARCHAR(2) NOT NULL,
county_fips VARCHAR(3) NOT NULL,
state_abbr VARCHAR(2),
county_name VARCHAR(255),
residential_ratio NUMERIC(5, 4) DEFAULT 1.0,
is_primary BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (zip_code, state_fips, county_fips)
);
CREATE INDEX idx_zip_county_zip ON zip_county_mapping(zip_code);
CREATE INDEX idx_zip_county_state ON zip_county_mapping(state_abbr);
CREATE INDEX idx_zip_county_primary ON zip_county_mapping(zip_code, is_primary);
""")
conn.commit()
logger.info("✅ Table created")
conn.close()
def load_census_zcta_county_data():
"""
Load Census Bureau ZCTA to County relationship file
This gives us ZIP Code Tabulation Areas (ZCTA) to county mappings
"""
logger.info(f"📥 Downloading Census ZCTA-County data from {ZCTA_COUNTY_URL}")
try:
response = requests.get(ZCTA_COUNTY_URL, timeout=60)
response.raise_for_status()
# Parse the tab-delimited file
df = pd.read_csv(StringIO(response.text), sep='|', dtype=str)
logger.info(f"📊 Loaded {len(df)} ZCTA-County relationships")
logger.info(f"Columns: {list(df.columns)}")
return df
except Exception as e:
logger.error(f"❌ Failed to download Census data: {e}")
logger.info("💡 Using simplified state-based mapping as fallback...")
return None
def create_simplified_mapping():
"""
Create a simplified ZIP-county mapping using state + county data we already have
This won't be perfect but will cover most cases
"""
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
logger.info("🔧 Creating simplified ZIP-county mapping from existing data...")
# Get all unique ZIP codes from nonprofits data
states_to_process = ['AL', 'GA', 'MA', 'WA']
zip_mappings = []
for state in states_to_process:
logger.info(f"Processing {state}...")
# Load nonprofits locations to get ZIP codes
try:
import pandas as pd
df = pd.read_parquet(f'data/gold/states/{state}/nonprofits_locations.parquet')
# Get unique ZIP codes (5-digit only)
df['zip5'] = df['zip_code'].str[:5]
unique_zips = df['zip5'].dropna().unique()
logger.info(f" Found {len(unique_zips)} unique ZIP codes in {state}")
# Get counties in this state
cur.execute("""
SELECT name, geoid
FROM jurisdictions_search
WHERE state = %s AND type = 'county'
""", (state,))
counties = cur.fetchall()
logger.info(f" Found {len(counties)} counties in {state}")
# For simplicity: assign each ZIP to all counties in the state
# with equal weight (will be refined later with actual data)
# This is a placeholder - real implementation would use spatial join
# Get state FIPS from first county GEOID (first 2 digits)
if counties:
state_fips = counties[0][1][:2] if counties[0][1] else '01'
# For now, we'll create a basic mapping
# In production, you'd use a proper ZIP-County crosswalk file
logger.info(f" Using state FIPS: {state_fips}")
except Exception as e:
logger.warning(f" Error processing {state}: {e}")
continue
logger.info("⚠️ Simplified mapping not sufficient - need proper ZIP-County crosswalk")
logger.info("💡 Recommend downloading HUD USPS ZIP Code Crosswalk file")
conn.close()
return zip_mappings
def populate_from_census_data(df):
"""Populate zip_county_mapping from Census ZCTA data"""
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
logger.info("📊 Processing Census ZCTA-County data...")
# Use correct FIPS to state abbreviation mapping
# Source: https://www.census.gov/library/reference/code-lists/ansi.html
state_mapping = {
'01': 'AL', '02': 'AK', '04': 'AZ', '05': 'AR', '06': 'CA',
'08': 'CO', '09': 'CT', '10': 'DE', '11': 'DC', '12': 'FL',
'13': 'GA', '15': 'HI', '16': 'ID', '17': 'IL', '18': 'IN',
'19': 'IA', '20': 'KS', '21': 'KY', '22': 'LA', '23': 'ME',
'24': 'MD', '25': 'MA', '26': 'MI', '27': 'MN', '28': 'MS',
'29': 'MO', '30': 'MT', '31': 'NE', '32': 'NV', '33': 'NH',
'34': 'NJ', '35': 'NM', '36': 'NY', '37': 'NC', '38': 'ND',
'39': 'OH', '40': 'OK', '41': 'OR', '42': 'PA', '44': 'RI',
'45': 'SC', '46': 'SD', '47': 'TN', '48': 'TX', '49': 'UT',
'50': 'VT', '51': 'VA', '53': 'WA', '54': 'WV', '55': 'WI',
'56': 'WY', '72': 'PR'
}
logger.info(f"Using official FIPS-to-state mapping ({len(state_mapping)} states)")
# Process the Census data
# Expected columns: GEOID_ZCTA5_20, GEOID_COUNTY_20, ...
records = []
for _, row in df.iterrows():
zcta = row.get('GEOID_ZCTA5_20', '')
county_geoid = row.get('GEOID_COUNTY_20', '')
if not zcta or not county_geoid or len(county_geoid) < 5:
continue
state_fips = county_geoid[:2]
county_fips = county_geoid[2:5]
state_abbr = state_mapping.get(state_fips)
if not state_abbr:
continue
# Get county name from Census data or construct from GEOID
cur.execute("""
SELECT name FROM jurisdictions_search
WHERE type = 'county' AND geoid = %s
""", (county_geoid,))
result = cur.fetchone()
county_name = result[0] if result else row.get('NAMELSAD_COUNTY_20', None)
records.append((
zcta,
state_fips,
county_fips,
state_abbr,
county_name,
1.0, # residential_ratio - default to 1.0
True # is_primary - will update later for multi-county ZIPs
))
if records:
logger.info(f"💾 Inserting {len(records)} ZIP-county mappings...")
insert_query = """
INSERT INTO zip_county_mapping
(zip_code, state_fips, county_fips, state_abbr, county_name, residential_ratio, is_primary)
VALUES %s
ON CONFLICT (zip_code, state_fips, county_fips) DO NOTHING
"""
execute_values(cur, insert_query, records)
conn.commit()
logger.info(f"✅ Inserted {len(records)} mappings")
conn.close()
def mark_primary_counties():
"""
For ZIP codes spanning multiple counties, mark the one with highest
residential ratio as primary
"""
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
logger.info("🎯 Marking primary counties for multi-county ZIP codes...")
# Reset all to non-primary first
cur.execute("UPDATE zip_county_mapping SET is_primary = FALSE")
# Mark primary (highest residential ratio) for each ZIP
cur.execute("""
WITH ranked AS (
SELECT zip_code, state_fips, county_fips,
ROW_NUMBER() OVER (
PARTITION BY zip_code
ORDER BY residential_ratio DESC, county_fips
) as rank
FROM zip_county_mapping
)
UPDATE zip_county_mapping z
SET is_primary = TRUE
FROM ranked r
WHERE z.zip_code = r.zip_code
AND z.state_fips = r.state_fips
AND z.county_fips = r.county_fips
AND r.rank = 1
""")
conn.commit()
logger.info(f"✅ Marked primary counties")
# Show stats
cur.execute("""
SELECT
COUNT(DISTINCT zip_code) as total_zips,
COUNT(DISTINCT CASE WHEN is_primary THEN zip_code END) as zips_with_primary,
COUNT(*) as total_mappings
FROM zip_county_mapping
""")
total_zips, zips_primary, total_mappings = cur.fetchone()
logger.info(f"📊 Stats:")
logger.info(f" Total ZIP codes: {total_zips:,}")
logger.info(f" ZIP codes with primary county: {zips_primary:,}")
logger.info(f" Total ZIP-county mappings: {total_mappings:,}")
logger.info(f" Multi-county ZIPs: {total_mappings - total_zips:,}")
conn.close()
if __name__ == '__main__':
logger.info("🗺️ Creating ZIP Code to County Mapping")
logger.info("=" * 60)
# Step 1: Create table
create_zip_county_table()
# Step 2: Load Census data
census_df = load_census_zcta_county_data()
if census_df is not None:
# Step 3: Populate from Census data
populate_from_census_data(census_df)
# Step 4: Mark primary counties
mark_primary_counties()
logger.info("\n✅ ZIP-County mapping complete!")
else:
logger.warning("\n⚠️ Could not load Census data")
logger.info("Alternative: Use HUD USPS ZIP Code Crosswalk file")
logger.info("Download from: https://www.huduser.gov/portal/datasets/usps_crosswalk.html")