open-navigator / scripts /datasources /nces /nces_ingestion.py
jcbowyer's picture
Clean HuggingFace deployment without binary files
61d29fc
"""
NCES School District Data Ingestion
Downloads and processes National Center for Education Statistics (NCES)
Common Core of Data (CCD) to get comprehensive school district information.
Data Source: https://nces.ed.gov/ccd/
Primary Dataset: Local Education Agency (School District) Universe Survey
This provides:
- School district names and locations
- Physical addresses and phone numbers
- NCES IDs for standardized identification
- Enrollment and demographic data
"""
import asyncio
import csv
import zipfile
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
try:
import httpx
except ImportError:
httpx = None
from loguru import logger
try:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, trim, lower, regexp_replace
PYSPARK_AVAILABLE = True
except ImportError:
PYSPARK_AVAILABLE = False
SparkSession = None
DataFrame = None
from config import settings
class NCESSchoolDistrictIngestion:
"""Ingest NCES Common Core of Data for school districts."""
# NCES provides CSV/Text files for the Local Education Agency Universe Survey
# Updated annually - using 2023-24 school year data
NCES_CCD_URL = "https://nces.ed.gov/ccd/data/zip/ccd_lea_052_2324_l_1a_083023.csv"
# Alternative: Directory of school districts with contact info
NCES_DIRECTORY_URL = "https://nces.ed.gov/ccd/data/zip/ccd_lea_directory_2324.csv"
def __init__(self, spark: Optional[SparkSession] = None):
"""Initialize ingestion with Spark session."""
self.spark = spark or SparkSession.builder.appName("NCESIngestion").getOrCreate()
self.cache_dir = Path("data/cache/nces")
self.cache_dir.mkdir(parents=True, exist_ok=True)
async def download_nces_data(self) -> Path:
"""
Download NCES school district data.
Returns:
Path to downloaded CSV file
"""
cache_file = self.cache_dir / "nces_school_districts.csv"
# Cache for 30 days (NCES data updates annually)
if cache_file.exists():
age_days = (datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime)).days
if age_days < 30:
logger.info(f"Using cached NCES data (age: {age_days} days)")
return cache_file
logger.info("Downloading NCES school district data...")
async with httpx.AsyncClient(timeout=60.0) as client:
try:
# Try primary directory file first (has website URLs)
response = await client.get(self.NCES_DIRECTORY_URL)
response.raise_for_status()
cache_file.write_bytes(response.content)
logger.info(f"Downloaded NCES data to {cache_file}")
return cache_file
except Exception as e:
logger.error(f"Failed to download NCES directory: {e}")
# Fall back to universe survey file
try:
response = await client.get(self.NCES_CCD_URL)
response.raise_for_status()
cache_file.write_bytes(response.content)
logger.info(f"Downloaded NCES universe data to {cache_file}")
return cache_file
except Exception as e2:
logger.error(f"Failed to download NCES universe data: {e2}")
raise
def parse_csv_to_dataframe(self, csv_path: Path) -> DataFrame:
"""
Parse NCES CSV into standardized DataFrame.
Args:
csv_path: Path to NCES CSV file
Returns:
Spark DataFrame with standardized schema
"""
# Define schema for NCES data
schema = StructType([
StructField("nces_id", StringType(), False),
StructField("district_name", StringType(), False),
StructField("state", StringType(), False),
StructField("state_fips", StringType(), True),
StructField("county_name", StringType(), True),
StructField("street_address", StringType(), True),
StructField("city", StringType(), True),
StructField("zip", StringType(), True),
StructField("phone", StringType(), True),
StructField("website", StringType(), True), # Some NCES files include this!
StructField("enrollment", IntegerType(), True),
StructField("district_type", StringType(), True), # Regular, Charter, etc.
])
# Read raw CSV
raw_df = self.spark.read.csv(
str(csv_path),
header=True,
inferSchema=False
)
# Map NCES column names to our schema
# NCES uses: LEAID, LEA_NAME, STATE_ABBR, LSTREET1, LCITY, LZIP, PHONE, WEBSITE
mapped_df = raw_df.select(
col("LEAID").alias("nces_id"),
col("LEA_NAME").alias("district_name"),
col("STATE_ABBR").alias("state"),
col("ST_FIPS").alias("state_fips"),
col("COUNTY_NAME").alias("county_name"),
col("LSTREET1").alias("street_address"),
col("LCITY").alias("city"),
col("LZIP").alias("zip"),
col("PHONE").alias("phone"),
col("WEBSITE").alias("website") if "WEBSITE" in raw_df.columns else col("LEAID").cast("string").alias("website"), # Placeholder if no website column
col("ENROLLMENT").cast("int").alias("enrollment") if "ENROLLMENT" in raw_df.columns else col("LEAID").cast("int").alias("enrollment"),
col("TYPE").alias("district_type") if "TYPE" in raw_df.columns else col("LEAID").cast("string").alias("district_type"),
)
# Clean and standardize
cleaned_df = mapped_df \
.withColumn("district_name", trim(col("district_name"))) \
.withColumn("state", trim(col("state"))) \
.withColumn("website", trim(lower(col("website")))) \
.withColumn("website", regexp_replace(col("website"), r"^https?://", "")) \
.withColumn("website", regexp_replace(col("website"), r"/$", "")) \
.filter(col("district_name").isNotNull())
logger.info(f"Parsed {cleaned_df.count()} school districts from NCES data")
return cleaned_df
def write_to_bronze_layer(self, df: DataFrame) -> None:
"""
Write NCES data to Bronze layer in Delta Lake.
Args:
df: NCES school district DataFrame
"""
output_path = f"{settings.delta_lake_path}/bronze/nces_school_districts"
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("state") \
.option("overwriteSchema", "true") \
.save(output_path)
logger.info(f"Wrote NCES data to {output_path}")
async def ingest_school_districts(self) -> DataFrame:
"""
Complete ingestion pipeline for NCES school district data.
Returns:
DataFrame with school district information
"""
# Download data
csv_path = await self.download_nces_data()
# Parse to DataFrame
df = self.parse_csv_to_dataframe(csv_path)
# Write to Bronze layer
self.write_to_bronze_layer(df)
return df
async def main():
"""Test NCES ingestion."""
ingestion = NCESSchoolDistrictIngestion()
df = await ingestion.ingest_school_districts()
print("\n📊 NCES School District Sample:")
df.show(20, truncate=False)
print("\n📈 Statistics by State:")
df.groupBy("state").count().orderBy(col("count").desc()).show(10)
if __name__ == "__main__":
asyncio.run(main())