open-navigator / scripts /datasources /census /census_ingestion.py
jcbowyer's picture
Clean HuggingFace deployment without binary files
61d29fc
"""
Census Bureau Data Ingestion
Downloads and processes Census of Governments data to create master list
of all U.S. local government jurisdictions.
Data Sources:
- Census Bureau Government Integrated Directory (GID)
- Individual State Descriptions
- FIPS codes for standardized identification
"""
import asyncio
import csv
import io
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import httpx
from loguru import logger
try:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit
PYSPARK_AVAILABLE = True
except ImportError:
PYSPARK_AVAILABLE = False
SparkSession = None
DataFrame = None
from config import settings
class CensusGovernmentIngestion:
"""Ingest Census Bureau government entity data."""
# Census Bureau API endpoint
CENSUS_API_BASE = "https://api.census.gov/data"
# Census Gazetteer Files 2024 - Actual individual jurisdiction listings with names, FIPS, coordinates, population
# These provide complete listings of all government entities, not summary statistics
GID_URLS = {
# All 3,144 counties with names, FIPS codes, lat/lon, land area, water area
"counties": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_counties_national.zip",
# All 19,502+ incorporated places (cities, towns, villages, boroughs)
"municipalities": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_place_national.zip",
# All 36,011+ county subdivisions (townships, boroughs, census county divisions, unorganized territories)
"townships": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_cousubs_national.zip",
# Elementary school districts
"school_districts_elem": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_elsd_national.zip",
# Secondary school districts
"school_districts_sec": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_scsd_national.zip",
# Unified school districts
"school_districts_unified": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_unsd_national.zip",
}
# Set to False to use real Census data
USE_MOCK_DATA = False
def __init__(self, spark: Optional[SparkSession] = None):
"""Initialize ingestion with Spark session."""
if not PYSPARK_AVAILABLE:
logger.warning("PySpark not available - using mock data mode")
self.spark = None
else:
self.spark = spark or SparkSession.builder.appName("CensusIngestion").getOrCreate()
self.cache_dir = Path("data/cache/census")
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _create_mock_data(self, jurisdiction_type: str) -> List[Dict[str, Any]]:
"""
Create mock jurisdiction data for local testing.
This allows the system to run without downloading Census data.
"""
logger.info(f"Using mock data for {jurisdiction_type}")
mock_data = {
"counties": [
{"name": "Los Angeles County", "state": "CA", "state_code": "06", "fips": "06037", "population": "10014009"},
{"name": "Cook County", "state": "IL", "state_code": "17", "fips": "17031", "population": "5275541"},
{"name": "Harris County", "state": "TX", "state_code": "48", "fips": "48201", "population": "4731145"},
{"name": "Maricopa County", "state": "AZ", "state_code": "04", "fips": "04013", "population": "4485414"},
{"name": "San Diego County", "state": "CA", "state_code": "06", "fips": "06073", "population": "3286069"},
{"name": "Orange County", "state": "CA", "state_code": "06", "fips": "06059", "population": "3167809"},
{"name": "Miami-Dade County", "state": "FL", "state_code": "12", "fips": "12086", "population": "2716940"},
{"name": "Dallas County", "state": "TX", "state_code": "48", "fips": "48113", "population": "2647787"},
{"name": "Kings County", "state": "NY", "state_code": "36", "fips": "36047", "population": "2559903"},
{"name": "Riverside County", "state": "CA", "state_code": "06", "fips": "06065", "population": "2470546"},
],
"municipalities": [
{"name": "New York City", "state": "NY", "state_code": "36", "fips": "3651000", "population": "8336817"},
{"name": "Los Angeles", "state": "CA", "state_code": "06", "fips": "0644000", "population": "3979576"},
{"name": "Chicago", "state": "IL", "state_code": "17", "fips": "1714000", "population": "2746388"},
{"name": "Houston", "state": "TX", "state_code": "48", "fips": "4835000", "population": "2314157"},
{"name": "Phoenix", "state": "AZ", "state_code": "04", "fips": "0455000", "population": "1680992"},
{"name": "Philadelphia", "state": "PA", "state_code": "42", "fips": "4260000", "population": "1584064"},
{"name": "San Antonio", "state": "TX", "state_code": "48", "fips": "4865000", "population": "1547253"},
{"name": "San Diego", "state": "CA", "state_code": "06", "fips": "0666000", "population": "1423851"},
{"name": "Dallas", "state": "TX", "state_code": "48", "fips": "4819000", "population": "1343573"},
{"name": "San Jose", "state": "CA", "state_code": "06", "fips": "0668000", "population": "1013240"},
],
"townships": [
{"name": "Bloomfield Township", "state": "MI", "state_code": "26", "fips": "2609320", "population": "44253"},
{"name": "Canton Township", "state": "MI", "state_code": "26", "fips": "2613960", "population": "98659"},
{"name": "Wayne Township", "state": "IN", "state_code": "18", "fips": "1883718", "population": "97878"},
],
"school_districts": [
{"name": "Los Angeles Unified School District", "state": "CA", "state_code": "06", "fips": "0600001", "population": "600000"},
{"name": "Chicago Public Schools", "state": "IL", "state_code": "17", "fips": "1700002", "population": "340000"},
{"name": "Miami-Dade County Public Schools", "state": "FL", "state_code": "12", "fips": "1200003", "population": "330000"},
],
"special_districts": [
{"name": "Metropolitan Water District", "state": "CA", "state_code": "06", "fips": "06SD001", "population": "19000000"},
{"name": "Port Authority of NY & NJ", "state": "NY", "state_code": "36", "fips": "36SD002", "population": "21000000"},
]
}
return mock_data.get(jurisdiction_type, [])
async def download_census_data(self, jurisdiction_type: str) -> Path:
"""
Download Census Gazetteer data for a jurisdiction type.
Census Gazetteer files are tab-delimited text files inside ZIP archives.
These contain actual jurisdiction listings with names, FIPS codes, coordinates, and population.
Args:
jurisdiction_type: One of 'counties', 'municipalities', 'townships',
'school_districts', 'census_places'
Returns:
Path to extracted CSV file
"""
if jurisdiction_type not in self.GID_URLS:
raise ValueError(f"Invalid jurisdiction type: {jurisdiction_type}")
url = self.GID_URLS[jurisdiction_type]
cache_file = self.cache_dir / f"{jurisdiction_type}_{datetime.now().strftime('%Y%m%d')}.csv"
# Use cached file if exists and is recent (< 7 days old)
if cache_file.exists() and (datetime.now().timestamp() - cache_file.stat().st_mtime) < 604800:
logger.info(f"Using cached {jurisdiction_type} data from {cache_file}")
return cache_file
logger.info(f"Downloading {jurisdiction_type} data from Census Bureau...")
logger.info(f"URL: {url}")
logger.info(f"This may take 2-5 minutes for large files...")
# Increase timeout for large Census files (some are 100MB+)
async with httpx.AsyncClient(timeout=300.0, follow_redirects=True) as client:
try:
response = await client.get(url)
response.raise_for_status()
# Save ZIP file
zip_file = self.cache_dir / f"{jurisdiction_type}_temp.zip"
zip_file.write_bytes(response.content)
logger.success(f"Downloaded {len(response.content)} bytes")
# Extract ZIP file
import zipfile
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
# Extract all files
extract_dir = self.cache_dir / f"{jurisdiction_type}_extracted"
extract_dir.mkdir(exist_ok=True)
zip_ref.extractall(extract_dir)
# Find tab-delimited text file (.txt) for Gazetteer files
# or CSV/Excel for school districts
txt_files = list(extract_dir.glob("*.txt"))
csv_files = list(extract_dir.glob("*.csv"))
excel_files = list(extract_dir.glob("*.xlsx")) + list(extract_dir.glob("*.xls"))
data_file = None
if txt_files:
# Gazetteer files (tab-delimited)
data_file = txt_files[0]
logger.info(f"Found Gazetteer file: {data_file.name}")
# Convert tab-delimited to CSV using pandas
import pandas as pd
df = pd.read_csv(data_file, sep='\t', encoding='latin-1', low_memory=False)
df.to_csv(cache_file, index=False)
logger.success(f"Converted to CSV: {cache_file}")
elif csv_files:
# Already CSV (some sources)
data_file = csv_files[0]
logger.info(f"Found CSV file: {data_file.name}")
import shutil
shutil.copy(data_file, cache_file)
logger.success(f"Copied to cache: {cache_file}")
elif excel_files:
# Excel files (school districts)
data_file = excel_files[0]
logger.info(f"Found Excel file: {data_file.name}")
import pandas as pd
df = pd.read_excel(data_file, engine='openpyxl')
df.to_csv(cache_file, index=False)
logger.success(f"Converted to CSV: {cache_file}")
else:
raise FileNotFoundError(f"No data file found in ZIP for {jurisdiction_type}")
# Clean up
zip_file.unlink()
import shutil
shutil.rmtree(extract_dir)
return cache_file
except httpx.TimeoutException as e:
logger.error(f"Timeout downloading {jurisdiction_type} data after 5 minutes")
logger.error(f"URL: {url}")
logger.warning(f"Census server may be slow or file is very large. Try again later or skip {jurisdiction_type}.")
raise
except httpx.HTTPError as e:
logger.error(f"HTTP error downloading {jurisdiction_type} data: {e}")
logger.error(f"URL that failed: {url}")
logger.warning(f"Check if Census Bureau website is accessible or file exists.")
raise
except Exception as e:
logger.error(f"Error processing {jurisdiction_type} data: {e}")
logger.error(f"File: {url}")
raise
def parse_csv_to_dataframe(self, csv_path: Path, jurisdiction_type: str) -> DataFrame:
"""
Parse Census Gazetteer CSV into Spark DataFrame.
Gazetteer files contain actual jurisdiction listings with names, FIPS codes,
coordinates, land area, and population (when available).
Args:
csv_path: Path to CSV file
jurisdiction_type: Type of jurisdiction
Returns:
Spark DataFrame with standardized columns
"""
logger.info(f"Parsing {csv_path} into DataFrame...")
# Read CSV with Spark
df = self.spark.read.csv(
str(csv_path),
header=True,
inferSchema=True
)
# Clean column names (Delta Lake doesn't allow spaces or special chars)
for col_name in df.columns:
clean_name = col_name.replace(" ", "_").replace("(", "").replace(")", "").replace(",", "_").replace(".", "_")
if clean_name != col_name:
df = df.withColumnRenamed(col_name, clean_name)
# Add standardized metadata columns
df = df.withColumn("jurisdiction_type", lit(jurisdiction_type))
df = df.withColumn("ingestion_date", lit(datetime.now().isoformat()))
# Gazetteer files have columns like:
# - USPS (state abbreviation)
# - GEOID (FIPS code)
# - NAME (jurisdiction name)
# - INTPTLAT, INTPTLONG (coordinates)
# - ALAND, AWATER (land and water area in sq meters)
# - ALAND_SQMI, AWATER_SQMI (in square miles)
# Add aliases for common columns (if they exist)
if "USPS" in df.columns:
df = df.withColumnRenamed("USPS", "state_code")
if "GEOID" in df.columns:
df = df.withColumnRenamed("GEOID", "fips_code")
if "NAME" in df.columns:
df = df.withColumnRenamed("NAME", "name")
if "INTPTLAT" in df.columns:
df = df.withColumnRenamed("INTPTLAT", "latitude")
if "INTPTLONG" in df.columns:
df = df.withColumnRenamed("INTPTLONG", "longitude")
logger.success(f"Parsed {df.count()} records from {csv_path}")
return df
async def ingest_all_jurisdictions(self, skip_school_districts: bool = False) -> Dict[str, DataFrame]:
"""
Download and parse all jurisdiction types.
Args:
skip_school_districts: If True, skip school districts (they're large and optional)
Returns:
Dictionary mapping jurisdiction type to DataFrame
"""
logger.info("Starting full Census data ingestion...")
dataframes = {}
jurisdiction_types = list(self.GID_URLS.keys())
# Optionally skip school districts (they're very large files and optional for core functionality)
if skip_school_districts:
jurisdiction_types = [jt for jt in jurisdiction_types if not jt.startswith('school_districts')]
logger.info("Skipping school districts (use skip_school_districts=False to include)")
for jurisdiction_type in jurisdiction_types:
try:
# Download
csv_path = await self.download_census_data(jurisdiction_type)
# Parse
df = self.parse_csv_to_dataframe(csv_path, jurisdiction_type)
dataframes[jurisdiction_type] = df
except Exception as e:
logger.error(f"Failed to ingest {jurisdiction_type}: {e}")
# School districts are optional - warn but continue
if jurisdiction_type.startswith('school_districts'):
logger.warning(f"Skipping {jurisdiction_type} (optional). Counties, municipalities, and townships are sufficient for most use cases.")
continue
logger.success(f"Ingested {len(dataframes)} jurisdiction types")
return dataframes
def write_to_bronze_layer(self, dataframes: Dict[str, DataFrame]):
"""
Write jurisdiction data to Delta Lake Bronze layer.
Bronze layer stores raw Census Gazetteer data with individual jurisdiction listings.
Each jurisdiction has name, FIPS code, state, coordinates, and area information.
Args:
dataframes: Dictionary of jurisdiction DataFrames
"""
logger.info("Writing jurisdiction data to Bronze layer...")
bronze_path = f"{settings.delta_lake_path}/bronze/jurisdictions"
# Write individual tables
for jurisdiction_type, df in dataframes.items():
table_path = f"{bronze_path}/{jurisdiction_type}"
# Write without partitioning for Bronze (raw data)
df.write \
.format("delta") \
.mode("overwrite") \
.save(table_path)
logger.success(f"Wrote {jurisdiction_type} to {table_path}")
# Note: Each jurisdiction type is stored separately in Bronze layer
# Gazetteer files have similar schemas (name, FIPS, state, coordinates)
# Silver layer will create a unified view with standardized columns
total_records = sum(df.count() for df in dataframes.values())
logger.success(f"Bronze layer complete: {len(dataframes)} tables with {total_records:,} total records")
return {
"total_records": total_records,
"tables": len(dataframes)
}
async def main():
"""Run Census ingestion pipeline."""
ingestion = CensusGovernmentIngestion()
# Download and parse all data
# Skip school districts by default (very large files, optional for core functionality)
# Set skip_school_districts=False if you specifically need school district data
dataframes = await ingestion.ingest_all_jurisdictions(skip_school_districts=True)
# Write to Delta Lake
ingestion.write_to_bronze_layer(dataframes)
# Print summary statistics
for jtype, df in dataframes.items():
count = df.count()
states = df.select("state_fips").distinct().count()
print(f"{jtype}: {count:,} jurisdictions across {states} states")
if __name__ == "__main__":
asyncio.run(main())