# open-navigator/scripts/datasources/census/acs_ingestion.py
# Last commit by jcbowyer: 61d29fc "Clean HuggingFace deployment without binary files"
"""
American Community Survey (ACS) Data Ingestion
Downloads and processes demographic, economic, housing, and social data from the
U.S. Census Bureau's American Community Survey (ACS) 5-Year Estimates.
Data Coverage:
- Demographics (age, race, ethnicity, language)
- Economics (income, employment, poverty)
- Housing (occupancy, value, rent)
- Social (education, disability, veteran status)
- Health insurance coverage
Data Granularity:
- National, State, County, Place (city/town), Tract, Block Group
"""
import asyncio
import csv
import zipfile
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import httpx
import pandas as pd
from loguru import logger
# PySpark is optional. When it cannot be imported, the Spark names are bound to
# None so that annotations such as Optional[SparkSession] still evaluate, and
# PYSPARK_AVAILABLE tells ACSDataIngestion not to create a Spark session.
try:
    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
    from pyspark.sql.functions import lit
    PYSPARK_AVAILABLE = True
except ImportError:
    PYSPARK_AVAILABLE = False
    SparkSession = None
    DataFrame = None
from config import settings
class ACSDataIngestion:
    """
    Ingest American Community Survey (ACS) data for civic engagement analysis.

    The ACS provides demographic, economic, housing, and social characteristics
    for all areas of the United States, Puerto Rico, and Island Areas.
    We use 5-Year Estimates (most reliable, covers all geographies).

    Two retrieval paths are offered:

    * ``download_acs_data_api`` / ``download_all_demographics`` query the
      Census Data API one table at a time and cache each result as Parquet
      under ``data_dir``.
    * ``download_bulk_files`` streams a summary-file ZIP archive from
      ``ACS_BASE_URL`` and extracts it under ``data_dir``.
    """

    # ACS 5-Year Estimates (2022) - Most recent complete dataset
    # These are summary files with pre-aggregated tables
    ACS_BASE_URL = "https://www2.census.gov/programs-surveys/acs/summary_file/2022/data"

    # Release year encoded in ACS_BASE_URL. The bulk-download URL does not change
    # with the ``year`` argument, so download_bulk_files warns on a mismatch.
    ACS_BASE_URL_YEAR = 2022

    # Key ACS tables for civic engagement and oral health policy
    ACS_TABLES = {
        # Demographics
        "B01001": "Sex by Age",
        "B02001": "Race",
        "B03002": "Hispanic or Latino Origin by Race",
        "B05001": "Nativity and Citizenship Status",
        "B16001": "Language Spoken at Home",
        # Economics
        "B19013": "Median Household Income",
        "B17001": "Poverty Status (Individual)",
        "B23025": "Employment Status",
        "C24010": "Sex by Occupation",
        # Housing
        "B25001": "Housing Units",
        "B25003": "Tenure (Owner vs Renter)",
        "B25077": "Median Home Value",
        "B25064": "Median Gross Rent",
        # Education
        "B15003": "Educational Attainment",
        "B14001": "School Enrollment by Age",
        # Health Insurance (Critical for oral health policy)
        "B27001": "Health Insurance Coverage Status by Age",
        "B27010": "Health Insurance Coverage by Age (Under 19)",
        "C27007": "Medicaid/Means-Tested Public Coverage",
        # Disability
        "B18101": "Sex by Age by Disability Status",
        # Veterans
        "B21001": "Veteran Status",
    }

    # Geography levels available
    GEO_LEVELS = {
        "us": "United States",
        "state": "State",
        "county": "County",
        "place": "Place (City/Town)",
        "tract": "Census Tract",
        "cousub": "County Subdivision",
    }

    # Census API ``for=`` predicate for each supported geography level.
    # Keys mirror GEO_LEVELS so every advertised level can actually be queried.
    API_GEO_PREDICATES = {
        "us": "us:*",
        "state": "state:*",
        "county": "county:*",
        "place": "place:*",
        "tract": "tract:*",
        "cousub": "county subdivision:*",
    }

    # Replaces the "*" (all states) wildcard in cache file names: "*" is not a
    # legal file-name character on Windows, and data_dir may be a Windows drive.
    _ALL_STATES_TOKEN = "all"

    def __init__(self, data_dir: Optional[Path] = None, spark: Optional[SparkSession] = None):
        """
        Initialize ACS ingestion.

        Args:
            data_dir: Base directory for data storage (default: data/cache/acs).
                Can be set to D:/ for D drive storage. Created if missing.
            spark: Optional Spark session for Delta Lake integration. Ignored
                when PySpark is not installed; when PySpark is installed and no
                session is given, one named "ACSIngestion" is obtained via
                ``getOrCreate``.
        """
        self.data_dir = Path("data/cache/acs") if data_dir is None else Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

        if not PYSPARK_AVAILABLE:
            logger.warning("PySpark not available - data will be stored as Parquet only")
            self.spark = None
        else:
            self.spark = spark or SparkSession.builder.appName("ACSIngestion").getOrCreate()

        # Census API key (optional but recommended for higher rate limits)
        self.api_key = getattr(settings, "CENSUS_API_KEY", None)

        logger.info(f"ACS data directory: {self.data_dir.absolute()}")

    def _cache_path(self, table: str, geography: str, state: str, year: int) -> Path:
        """Return the Parquet cache path for one API download (Windows-safe name)."""
        state_part = self._ALL_STATES_TOKEN if state == "*" else state
        return self.data_dir / f"{table}_{geography}_{state_part}_{year}.parquet"

    def _build_geo_params(self, geography: str, state: str) -> Dict[str, str]:
        """
        Build the Census API ``for``/``in`` query parameters.

        Args:
            geography: Key of API_GEO_PREDICATES (us, state, county, place, tract, cousub)
            state: State FIPS code, or "*" for all states

        Returns:
            Dict with a ``for`` entry and, when a state filter applies, an ``in`` entry.

        Raises:
            ValueError: if ``geography`` is not a supported level.
        """
        try:
            predicate = self.API_GEO_PREDICATES[geography]
        except KeyError:
            raise ValueError(
                f"Unsupported geography {geography!r}; "
                f"expected one of {sorted(self.API_GEO_PREDICATES)}"
            ) from None

        if state == "*":
            return {"for": predicate}
        if geography == "state":
            # A single state is selected directly, not as the parent of itself.
            return {"for": f"state:{state}"}
        if geography == "us":
            logger.warning(f"State filter {state!r} does not apply to geography 'us'; ignoring it")
            return {"for": predicate}
        return {"for": predicate, "in": f"state:{state}"}

    async def download_acs_data_api(
        self,
        table: str,
        geography: str = "county",
        state: str = "*",
        year: int = 2022
    ) -> pd.DataFrame:
        """
        Download ACS data using Census API.

        This is the recommended approach for targeted data extraction.
        Requires a Census API key for higher rate limits. Successful,
        non-empty results are cached as Parquet under ``data_dir``.

        Args:
            table: ACS table code (e.g., "B19013" for median household income)
            geography: Geographic level (us, state, county, place, tract, cousub)
            state: State FIPS code (* for all states)
            year: ACS year (2022 is most recent 5-year estimate)

        Returns:
            pandas DataFrame with requested data (all values as strings, as
            returned by the API). Empty DataFrame if the API returned no rows.

        Raises:
            ValueError: if ``geography`` is not supported.
            httpx.HTTPStatusError: if the API responds with an error status.

        Example:
            # Get median household income for all counties
            df = await acs.download_acs_data_api("B19013", "county", "*")
        """
        # Validate before any network traffic so a typo fails fast instead of
        # silently downloading (and caching) county-level data.
        geo_params = self._build_geo_params(geography, state)

        if not self.api_key:
            logger.warning("No Census API key found. Get one at: https://api.census.gov/data/key_signup.html")
            logger.info("Without API key, you're limited to 500 requests/day")

        base_url = f"https://api.census.gov/data/{year}/acs/acs5"

        # group(<table>) requests every variable in the table
        params = {"get": f"group({table})", **geo_params}
        if self.api_key:
            params["key"] = self.api_key

        logger.info(f"Downloading ACS table {table} ({self.ACS_TABLES.get(table, 'Unknown')})...")
        logger.info(f"Geography: {geography}, State: {state}, Year: {year}")

        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                response = await client.get(base_url, params=params)
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                logger.error(f"API request failed: {e}")
                logger.error(f"Status: {e.response.status_code}")
                logger.error(f"Response: {e.response.text[:500]}")
                raise

        # An empty body (or empty JSON array) means "no matching rows"; parsing
        # it as a header row would otherwise raise.
        data = response.json() if response.content else []
        if not data:
            logger.warning(f"No data returned for table {table}")
            return pd.DataFrame()

        # First row is headers, rest is data
        df = pd.DataFrame(data[1:], columns=data[0])
        logger.success(f"Downloaded {len(df)} records for table {table}")

        # Cache the data
        cache_file = self._cache_path(table, geography, state, year)
        df.to_parquet(cache_file, index=False)
        logger.info(f"Cached to: {cache_file}")

        return df

    async def download_all_demographics(
        self,
        geography: str = "county",
        state: str = "*",
        year: int = 2022,
    ) -> Dict[str, pd.DataFrame]:
        """
        Download all key demographic tables for a geography level.

        This downloads the most important tables for civic engagement analysis:
        - Demographics (age, race, language)
        - Economics (income, poverty, employment)
        - Health insurance coverage
        - Education

        Failures are logged and skipped (best effort), so the result may
        contain fewer tables than requested.

        Args:
            geography: Geographic level (county, place, tract)
            state: State FIPS code (* for all states)
            year: ACS year (default 2022)

        Returns:
            Dictionary mapping table codes to DataFrames

        Example:
            # Get all demographic data for California counties
            dfs = await acs.download_all_demographics("county", "06")
        """
        key_tables = [
            "B01001",  # Age/Sex
            "B02001",  # Race
            "B03002",  # Hispanic origin
            "B19013",  # Median household income
            "B17001",  # Poverty
            "B27001",  # Health insurance
            "B15003",  # Education
        ]

        results = {}
        for index, table in enumerate(key_tables):
            if index:
                # Rate limiting - be nice to Census API. Applied between every
                # pair of requests, including after a failed one.
                await asyncio.sleep(1)
            try:
                results[table] = await self.download_acs_data_api(table, geography, state, year)
            except Exception as e:
                # Deliberate best effort: one bad table must not abort the batch.
                logger.error(f"Failed to download table {table}: {e}")

        logger.success(f"Downloaded {len(results)}/{len(key_tables)} tables")
        return results

    async def download_bulk_files(self, state: str = "ALL", year: int = 2022) -> Path:
        """
        Download bulk ACS summary files (ZIP archives).

        This is useful for downloading ALL ACS data at once.
        Warning: Files are LARGE (several GB per state).

        The archive is streamed to a temporary ``.part`` file and renamed only
        once complete, so an interrupted download is never reused as a cache hit.
        An existing archive that is not a valid ZIP is downloaded again.

        Args:
            state: State abbreviation (e.g., "CA", "TX") or "ALL" for all states
            year: ACS year (2022 is most recent). Only affects the local
                directory name; the download URL is ACS_BASE_URL.

        Returns:
            Path to extracted data directory

        Raises:
            httpx.HTTPStatusError: if the server responds with an error status.

        Note:
            - ALL states file is ~15 GB
            - Individual state files are 200-500 MB each
            - Consider using API for targeted data extraction instead
        """
        if year != self.ACS_BASE_URL_YEAR:
            logger.warning(
                f"ACS_BASE_URL points at the {self.ACS_BASE_URL_YEAR} release; "
                f"year={year} only changes the local output directory name"
            )

        if state == "ALL":
            filename = "All_Geographies_Not_Tracts_Block_Groups.zip"
        else:
            # NOTE(review): this name does not depend on ``state``, so every
            # single-state request fetches the same archive. Confirm the intended
            # per-state file name on the Census server.
            filename = f"{year}_5yr_Summary_FileTemplates.zip"

        url = f"{self.ACS_BASE_URL}/{filename}"
        output_dir = self.data_dir / f"acs_{year}_{state}"
        output_dir.mkdir(parents=True, exist_ok=True)

        zip_path = output_dir / filename

        # Check if already downloaded (and not a truncated/corrupt archive)
        if zip_path.exists():
            if zipfile.is_zipfile(zip_path):
                logger.info(f"Using cached file: {zip_path}")
                return output_dir
            logger.warning(f"Cached file {zip_path} is not a valid ZIP archive; downloading again")

        logger.warning(f"Downloading bulk ACS file: {filename}")
        logger.warning("This may be several GB and take 10-30 minutes...")

        part_path = zip_path.with_name(zip_path.name + ".part")
        async with httpx.AsyncClient(timeout=3600.0, follow_redirects=True) as client:  # 1 hour timeout
            async with client.stream("GET", url) as response:
                response.raise_for_status()

                total_size = int(response.headers.get("content-length", 0))
                downloaded = 0
                next_report_pct = 0.0

                with open(part_path, "wb") as f:
                    async for chunk in response.aiter_bytes(chunk_size=8192 * 1024):  # 8 MB chunks
                        f.write(chunk)
                        downloaded += len(chunk)

                        if total_size > 0:
                            pct = (downloaded / total_size) * 100
                            # Log about every 5% rather than once per chunk
                            # (a 15 GB file would otherwise log ~1900 lines).
                            if pct >= next_report_pct:
                                logger.info(f"Progress: {pct:.1f}% ({downloaded / 1e9:.2f} GB / {total_size / 1e9:.2f} GB)")
                                next_report_pct = pct + 5

        # Publish the archive only after the stream finished without error.
        part_path.replace(zip_path)
        logger.success(f"Downloaded: {zip_path}")

        # Extract ZIP
        logger.info("Extracting ZIP file...")
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(output_dir)

        logger.success(f"Extracted to: {output_dir}")
        return output_dir

    def get_cached_data(
        self,
        table: str,
        geography: str = "county",
        state: str = "*",
        year: int = 2022,
    ) -> Optional[pd.DataFrame]:
        """
        Load cached ACS data if available.

        Args:
            table: ACS table code
            geography: Geographic level
            state: State FIPS code ("*" for all states)
            year: ACS year the data was downloaded for (default 2022)

        Returns:
            DataFrame if cached, None otherwise
        """
        candidates = [self._cache_path(table, geography, state, year)]
        if state == "*":
            # Older caches may use a literal "*" in the file name.
            candidates.append(self.data_dir / f"{table}_{geography}_{state}_{year}.parquet")

        for cache_file in candidates:
            if cache_file.exists():
                logger.info(f"Loading cached data: {cache_file}")
                return pd.read_parquet(cache_file)
        return None

    def list_available_tables(self) -> None:
        """Print all available ACS tables, grouped by category."""
        print("\n📊 Available ACS Tables\n")
        print("=" * 80)

        categories = {
            "Demographics": ["B01001", "B02001", "B03002", "B05001", "B16001"],
            "Economics": ["B19013", "B17001", "B23025", "C24010"],
            "Housing": ["B25001", "B25003", "B25077", "B25064"],
            "Education": ["B15003", "B14001"],
            "Health Insurance": ["B27001", "B27010", "C27007"],
            "Other": ["B18101", "B21001"],
        }

        for category, tables in categories.items():
            print(f"\n{category}:")
            for table in tables:
                description = self.ACS_TABLES.get(table, "Unknown")
                print(f" {table}: {description}")

        print("\n" + "=" * 80)
        print(f"\nTotal: {len(self.ACS_TABLES)} tables available")
        print("\nFor complete table list, visit:")
        print("https://www.census.gov/programs-surveys/acs/technical-documentation/table-shells.html")
async def main():
    """Run an end-to-end demonstration of ACS ingestion.

    Prints the curated table catalogue, then performs three example
    downloads through the Census API and shows a short preview of each.
    """
    rule = "=" * 80

    def section(heading):
        # Frame an example heading between two horizontal rules.
        print("\n" + rule)
        print(heading)
        print(rule)

    # Storage location defaults to data/cache/acs. Alternatives:
    #   WSL (D drive):       ACSDataIngestion(data_dir=Path("/mnt/d/open-navigator-data/acs"))
    #   Windows (D drive):   ACSDataIngestion(data_dir=Path("D:/open-navigator-data/acs"))
    #   Linux/Mac external:  ACSDataIngestion(data_dir=Path("/mnt/external/open-navigator-data/acs"))
    ingestor = ACSDataIngestion()
    ingestor.list_available_tables()

    # Nationwide: median household income for every county
    section("Example 1: Download median household income for all counties")
    income = await ingestor.download_acs_data_api("B19013", geography="county", state="*")
    print(f"\nDownloaded {len(income)} counties")
    print(income.head())

    # A single state (FIPS 06 = California): health insurance coverage
    section("Example 2: Download health insurance data for California counties")
    insurance = await ingestor.download_acs_data_api("B27001", geography="county", state="06")
    print(f"\nDownloaded {len(insurance)} California counties")
    print(insurance.head())

    # The bundled set of key demographic tables
    section("Example 3: Download all key demographic tables")
    bundle = await ingestor.download_all_demographics(geography="county", state="06")
    print(f"\nDownloaded {len(bundle)} tables:")
    for code, frame in bundle.items():
        label = ingestor.ACS_TABLES.get(code, "Unknown")
        print(f" {code} ({label}): {len(frame)} records")
if __name__ == "__main__":
    # Script entry point: drive the async example workflow on a fresh event loop.
    asyncio.run(main())