Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Census Bureau Data Ingestion | |
| Downloads and processes Census of Governments data to create master list | |
| of all U.S. local government jurisdictions. | |
| Data Sources: | |
| - Census Bureau Government Integrated Directory (GID) | |
| - Individual State Descriptions | |
| - FIPS codes for standardized identification | |
| """ | |
| import asyncio | |
| import csv | |
| import io | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| from pathlib import Path | |
| import httpx | |
| from loguru import logger | |
| try: | |
| from pyspark.sql import SparkSession, DataFrame | |
| from pyspark.sql.types import StructType, StructField, StringType, IntegerType | |
| from pyspark.sql.functions import lit | |
| PYSPARK_AVAILABLE = True | |
| except ImportError: | |
| PYSPARK_AVAILABLE = False | |
| SparkSession = None | |
| DataFrame = None | |
| from config import settings | |
| class CensusGovernmentIngestion: | |
| """Ingest Census Bureau government entity data.""" | |
| # Census Bureau API endpoint | |
| CENSUS_API_BASE = "https://api.census.gov/data" | |
| # Census Gazetteer Files 2024 - Actual individual jurisdiction listings with names, FIPS, coordinates, population | |
| # These provide complete listings of all government entities, not summary statistics | |
| GID_URLS = { | |
| # All 3,144 counties with names, FIPS codes, lat/lon, land area, water area | |
| "counties": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_counties_national.zip", | |
| # All 19,502+ incorporated places (cities, towns, villages, boroughs) | |
| "municipalities": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_place_national.zip", | |
| # All 36,011+ county subdivisions (townships, boroughs, census county divisions, unorganized territories) | |
| "townships": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_cousubs_national.zip", | |
| # Elementary school districts | |
| "school_districts_elem": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_elsd_national.zip", | |
| # Secondary school districts | |
| "school_districts_sec": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_scsd_national.zip", | |
| # Unified school districts | |
| "school_districts_unified": "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/2024_Gazetteer/2024_Gaz_unsd_national.zip", | |
| } | |
| # Set to False to use real Census data | |
| USE_MOCK_DATA = False | |
| def __init__(self, spark: Optional[SparkSession] = None): | |
| """Initialize ingestion with Spark session.""" | |
| if not PYSPARK_AVAILABLE: | |
| logger.warning("PySpark not available - using mock data mode") | |
| self.spark = None | |
| else: | |
| self.spark = spark or SparkSession.builder.appName("CensusIngestion").getOrCreate() | |
| self.cache_dir = Path("data/cache/census") | |
| self.cache_dir.mkdir(parents=True, exist_ok=True) | |
| def _create_mock_data(self, jurisdiction_type: str) -> List[Dict[str, Any]]: | |
| """ | |
| Create mock jurisdiction data for local testing. | |
| This allows the system to run without downloading Census data. | |
| """ | |
| logger.info(f"Using mock data for {jurisdiction_type}") | |
| mock_data = { | |
| "counties": [ | |
| {"name": "Los Angeles County", "state": "CA", "state_code": "06", "fips": "06037", "population": "10014009"}, | |
| {"name": "Cook County", "state": "IL", "state_code": "17", "fips": "17031", "population": "5275541"}, | |
| {"name": "Harris County", "state": "TX", "state_code": "48", "fips": "48201", "population": "4731145"}, | |
| {"name": "Maricopa County", "state": "AZ", "state_code": "04", "fips": "04013", "population": "4485414"}, | |
| {"name": "San Diego County", "state": "CA", "state_code": "06", "fips": "06073", "population": "3286069"}, | |
| {"name": "Orange County", "state": "CA", "state_code": "06", "fips": "06059", "population": "3167809"}, | |
| {"name": "Miami-Dade County", "state": "FL", "state_code": "12", "fips": "12086", "population": "2716940"}, | |
| {"name": "Dallas County", "state": "TX", "state_code": "48", "fips": "48113", "population": "2647787"}, | |
| {"name": "Kings County", "state": "NY", "state_code": "36", "fips": "36047", "population": "2559903"}, | |
| {"name": "Riverside County", "state": "CA", "state_code": "06", "fips": "06065", "population": "2470546"}, | |
| ], | |
| "municipalities": [ | |
| {"name": "New York City", "state": "NY", "state_code": "36", "fips": "3651000", "population": "8336817"}, | |
| {"name": "Los Angeles", "state": "CA", "state_code": "06", "fips": "0644000", "population": "3979576"}, | |
| {"name": "Chicago", "state": "IL", "state_code": "17", "fips": "1714000", "population": "2746388"}, | |
| {"name": "Houston", "state": "TX", "state_code": "48", "fips": "4835000", "population": "2314157"}, | |
| {"name": "Phoenix", "state": "AZ", "state_code": "04", "fips": "0455000", "population": "1680992"}, | |
| {"name": "Philadelphia", "state": "PA", "state_code": "42", "fips": "4260000", "population": "1584064"}, | |
| {"name": "San Antonio", "state": "TX", "state_code": "48", "fips": "4865000", "population": "1547253"}, | |
| {"name": "San Diego", "state": "CA", "state_code": "06", "fips": "0666000", "population": "1423851"}, | |
| {"name": "Dallas", "state": "TX", "state_code": "48", "fips": "4819000", "population": "1343573"}, | |
| {"name": "San Jose", "state": "CA", "state_code": "06", "fips": "0668000", "population": "1013240"}, | |
| ], | |
| "townships": [ | |
| {"name": "Bloomfield Township", "state": "MI", "state_code": "26", "fips": "2609320", "population": "44253"}, | |
| {"name": "Canton Township", "state": "MI", "state_code": "26", "fips": "2613960", "population": "98659"}, | |
| {"name": "Wayne Township", "state": "IN", "state_code": "18", "fips": "1883718", "population": "97878"}, | |
| ], | |
| "school_districts": [ | |
| {"name": "Los Angeles Unified School District", "state": "CA", "state_code": "06", "fips": "0600001", "population": "600000"}, | |
| {"name": "Chicago Public Schools", "state": "IL", "state_code": "17", "fips": "1700002", "population": "340000"}, | |
| {"name": "Miami-Dade County Public Schools", "state": "FL", "state_code": "12", "fips": "1200003", "population": "330000"}, | |
| ], | |
| "special_districts": [ | |
| {"name": "Metropolitan Water District", "state": "CA", "state_code": "06", "fips": "06SD001", "population": "19000000"}, | |
| {"name": "Port Authority of NY & NJ", "state": "NY", "state_code": "36", "fips": "36SD002", "population": "21000000"}, | |
| ] | |
| } | |
| return mock_data.get(jurisdiction_type, []) | |
| async def download_census_data(self, jurisdiction_type: str) -> Path: | |
| """ | |
| Download Census Gazetteer data for a jurisdiction type. | |
| Census Gazetteer files are tab-delimited text files inside ZIP archives. | |
| These contain actual jurisdiction listings with names, FIPS codes, coordinates, and population. | |
| Args: | |
| jurisdiction_type: One of 'counties', 'municipalities', 'townships', | |
| 'school_districts', 'census_places' | |
| Returns: | |
| Path to extracted CSV file | |
| """ | |
| if jurisdiction_type not in self.GID_URLS: | |
| raise ValueError(f"Invalid jurisdiction type: {jurisdiction_type}") | |
| url = self.GID_URLS[jurisdiction_type] | |
| cache_file = self.cache_dir / f"{jurisdiction_type}_{datetime.now().strftime('%Y%m%d')}.csv" | |
| # Use cached file if exists and is recent (< 7 days old) | |
| if cache_file.exists() and (datetime.now().timestamp() - cache_file.stat().st_mtime) < 604800: | |
| logger.info(f"Using cached {jurisdiction_type} data from {cache_file}") | |
| return cache_file | |
| logger.info(f"Downloading {jurisdiction_type} data from Census Bureau...") | |
| logger.info(f"URL: {url}") | |
| logger.info(f"This may take 2-5 minutes for large files...") | |
| # Increase timeout for large Census files (some are 100MB+) | |
| async with httpx.AsyncClient(timeout=300.0, follow_redirects=True) as client: | |
| try: | |
| response = await client.get(url) | |
| response.raise_for_status() | |
| # Save ZIP file | |
| zip_file = self.cache_dir / f"{jurisdiction_type}_temp.zip" | |
| zip_file.write_bytes(response.content) | |
| logger.success(f"Downloaded {len(response.content)} bytes") | |
| # Extract ZIP file | |
| import zipfile | |
| with zipfile.ZipFile(zip_file, 'r') as zip_ref: | |
| # Extract all files | |
| extract_dir = self.cache_dir / f"{jurisdiction_type}_extracted" | |
| extract_dir.mkdir(exist_ok=True) | |
| zip_ref.extractall(extract_dir) | |
| # Find tab-delimited text file (.txt) for Gazetteer files | |
| # or CSV/Excel for school districts | |
| txt_files = list(extract_dir.glob("*.txt")) | |
| csv_files = list(extract_dir.glob("*.csv")) | |
| excel_files = list(extract_dir.glob("*.xlsx")) + list(extract_dir.glob("*.xls")) | |
| data_file = None | |
| if txt_files: | |
| # Gazetteer files (tab-delimited) | |
| data_file = txt_files[0] | |
| logger.info(f"Found Gazetteer file: {data_file.name}") | |
| # Convert tab-delimited to CSV using pandas | |
| import pandas as pd | |
| df = pd.read_csv(data_file, sep='\t', encoding='latin-1', low_memory=False) | |
| df.to_csv(cache_file, index=False) | |
| logger.success(f"Converted to CSV: {cache_file}") | |
| elif csv_files: | |
| # Already CSV (some sources) | |
| data_file = csv_files[0] | |
| logger.info(f"Found CSV file: {data_file.name}") | |
| import shutil | |
| shutil.copy(data_file, cache_file) | |
| logger.success(f"Copied to cache: {cache_file}") | |
| elif excel_files: | |
| # Excel files (school districts) | |
| data_file = excel_files[0] | |
| logger.info(f"Found Excel file: {data_file.name}") | |
| import pandas as pd | |
| df = pd.read_excel(data_file, engine='openpyxl') | |
| df.to_csv(cache_file, index=False) | |
| logger.success(f"Converted to CSV: {cache_file}") | |
| else: | |
| raise FileNotFoundError(f"No data file found in ZIP for {jurisdiction_type}") | |
| # Clean up | |
| zip_file.unlink() | |
| import shutil | |
| shutil.rmtree(extract_dir) | |
| return cache_file | |
| except httpx.TimeoutException as e: | |
| logger.error(f"Timeout downloading {jurisdiction_type} data after 5 minutes") | |
| logger.error(f"URL: {url}") | |
| logger.warning(f"Census server may be slow or file is very large. Try again later or skip {jurisdiction_type}.") | |
| raise | |
| except httpx.HTTPError as e: | |
| logger.error(f"HTTP error downloading {jurisdiction_type} data: {e}") | |
| logger.error(f"URL that failed: {url}") | |
| logger.warning(f"Check if Census Bureau website is accessible or file exists.") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error processing {jurisdiction_type} data: {e}") | |
| logger.error(f"File: {url}") | |
| raise | |
| def parse_csv_to_dataframe(self, csv_path: Path, jurisdiction_type: str) -> DataFrame: | |
| """ | |
| Parse Census Gazetteer CSV into Spark DataFrame. | |
| Gazetteer files contain actual jurisdiction listings with names, FIPS codes, | |
| coordinates, land area, and population (when available). | |
| Args: | |
| csv_path: Path to CSV file | |
| jurisdiction_type: Type of jurisdiction | |
| Returns: | |
| Spark DataFrame with standardized columns | |
| """ | |
| logger.info(f"Parsing {csv_path} into DataFrame...") | |
| # Read CSV with Spark | |
| df = self.spark.read.csv( | |
| str(csv_path), | |
| header=True, | |
| inferSchema=True | |
| ) | |
| # Clean column names (Delta Lake doesn't allow spaces or special chars) | |
| for col_name in df.columns: | |
| clean_name = col_name.replace(" ", "_").replace("(", "").replace(")", "").replace(",", "_").replace(".", "_") | |
| if clean_name != col_name: | |
| df = df.withColumnRenamed(col_name, clean_name) | |
| # Add standardized metadata columns | |
| df = df.withColumn("jurisdiction_type", lit(jurisdiction_type)) | |
| df = df.withColumn("ingestion_date", lit(datetime.now().isoformat())) | |
| # Gazetteer files have columns like: | |
| # - USPS (state abbreviation) | |
| # - GEOID (FIPS code) | |
| # - NAME (jurisdiction name) | |
| # - INTPTLAT, INTPTLONG (coordinates) | |
| # - ALAND, AWATER (land and water area in sq meters) | |
| # - ALAND_SQMI, AWATER_SQMI (in square miles) | |
| # Add aliases for common columns (if they exist) | |
| if "USPS" in df.columns: | |
| df = df.withColumnRenamed("USPS", "state_code") | |
| if "GEOID" in df.columns: | |
| df = df.withColumnRenamed("GEOID", "fips_code") | |
| if "NAME" in df.columns: | |
| df = df.withColumnRenamed("NAME", "name") | |
| if "INTPTLAT" in df.columns: | |
| df = df.withColumnRenamed("INTPTLAT", "latitude") | |
| if "INTPTLONG" in df.columns: | |
| df = df.withColumnRenamed("INTPTLONG", "longitude") | |
| logger.success(f"Parsed {df.count()} records from {csv_path}") | |
| return df | |
| async def ingest_all_jurisdictions(self, skip_school_districts: bool = False) -> Dict[str, DataFrame]: | |
| """ | |
| Download and parse all jurisdiction types. | |
| Args: | |
| skip_school_districts: If True, skip school districts (they're large and optional) | |
| Returns: | |
| Dictionary mapping jurisdiction type to DataFrame | |
| """ | |
| logger.info("Starting full Census data ingestion...") | |
| dataframes = {} | |
| jurisdiction_types = list(self.GID_URLS.keys()) | |
| # Optionally skip school districts (they're very large files and optional for core functionality) | |
| if skip_school_districts: | |
| jurisdiction_types = [jt for jt in jurisdiction_types if not jt.startswith('school_districts')] | |
| logger.info("Skipping school districts (use skip_school_districts=False to include)") | |
| for jurisdiction_type in jurisdiction_types: | |
| try: | |
| # Download | |
| csv_path = await self.download_census_data(jurisdiction_type) | |
| # Parse | |
| df = self.parse_csv_to_dataframe(csv_path, jurisdiction_type) | |
| dataframes[jurisdiction_type] = df | |
| except Exception as e: | |
| logger.error(f"Failed to ingest {jurisdiction_type}: {e}") | |
| # School districts are optional - warn but continue | |
| if jurisdiction_type.startswith('school_districts'): | |
| logger.warning(f"Skipping {jurisdiction_type} (optional). Counties, municipalities, and townships are sufficient for most use cases.") | |
| continue | |
| logger.success(f"Ingested {len(dataframes)} jurisdiction types") | |
| return dataframes | |
| def write_to_bronze_layer(self, dataframes: Dict[str, DataFrame]): | |
| """ | |
| Write jurisdiction data to Delta Lake Bronze layer. | |
| Bronze layer stores raw Census Gazetteer data with individual jurisdiction listings. | |
| Each jurisdiction has name, FIPS code, state, coordinates, and area information. | |
| Args: | |
| dataframes: Dictionary of jurisdiction DataFrames | |
| """ | |
| logger.info("Writing jurisdiction data to Bronze layer...") | |
| bronze_path = f"{settings.delta_lake_path}/bronze/jurisdictions" | |
| # Write individual tables | |
| for jurisdiction_type, df in dataframes.items(): | |
| table_path = f"{bronze_path}/{jurisdiction_type}" | |
| # Write without partitioning for Bronze (raw data) | |
| df.write \ | |
| .format("delta") \ | |
| .mode("overwrite") \ | |
| .save(table_path) | |
| logger.success(f"Wrote {jurisdiction_type} to {table_path}") | |
| # Note: Each jurisdiction type is stored separately in Bronze layer | |
| # Gazetteer files have similar schemas (name, FIPS, state, coordinates) | |
| # Silver layer will create a unified view with standardized columns | |
| total_records = sum(df.count() for df in dataframes.values()) | |
| logger.success(f"Bronze layer complete: {len(dataframes)} tables with {total_records:,} total records") | |
| return { | |
| "total_records": total_records, | |
| "tables": len(dataframes) | |
| } | |
| async def main(): | |
| """Run Census ingestion pipeline.""" | |
| ingestion = CensusGovernmentIngestion() | |
| # Download and parse all data | |
| # Skip school districts by default (very large files, optional for core functionality) | |
| # Set skip_school_districts=False if you specifically need school district data | |
| dataframes = await ingestion.ingest_all_jurisdictions(skip_school_districts=True) | |
| # Write to Delta Lake | |
| ingestion.write_to_bronze_layer(dataframes) | |
| # Print summary statistics | |
| for jtype, df in dataframes.items(): | |
| count = df.count() | |
| states = df.select("state_fips").distinct().count() | |
| print(f"{jtype}: {count:,} jurisdictions across {states} states") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |