""" NCES School District Data Ingestion Downloads and processes National Center for Education Statistics (NCES) Common Core of Data (CCD) to get comprehensive school district information. Data Source: https://nces.ed.gov/ccd/ Primary Dataset: Local Education Agency (School District) Universe Survey This provides: - School district names and locations - Physical addresses and phone numbers - NCES IDs for standardized identification - Enrollment and demographic data """ import asyncio import csv import zipfile from typing import List, Dict, Any, Optional from datetime import datetime from pathlib import Path try: import httpx except ImportError: httpx = None from loguru import logger try: from pyspark.sql import SparkSession, DataFrame from pyspark.sql.types import StructType, StructField, StringType, IntegerType from pyspark.sql.functions import col, trim, lower, regexp_replace PYSPARK_AVAILABLE = True except ImportError: PYSPARK_AVAILABLE = False SparkSession = None DataFrame = None from config import settings class NCESSchoolDistrictIngestion: """Ingest NCES Common Core of Data for school districts.""" # NCES provides CSV/Text files for the Local Education Agency Universe Survey # Updated annually - using 2023-24 school year data NCES_CCD_URL = "https://nces.ed.gov/ccd/data/zip/ccd_lea_052_2324_l_1a_083023.csv" # Alternative: Directory of school districts with contact info NCES_DIRECTORY_URL = "https://nces.ed.gov/ccd/data/zip/ccd_lea_directory_2324.csv" def __init__(self, spark: Optional[SparkSession] = None): """Initialize ingestion with Spark session.""" self.spark = spark or SparkSession.builder.appName("NCESIngestion").getOrCreate() self.cache_dir = Path("data/cache/nces") self.cache_dir.mkdir(parents=True, exist_ok=True) async def download_nces_data(self) -> Path: """ Download NCES school district data. Returns: Path to downloaded CSV file """ cache_file = self.cache_dir / "nces_school_districts.csv" # Cache for 30 days (NCES data updates annually) if cache_file.exists(): age_days = (datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime)).days if age_days < 30: logger.info(f"Using cached NCES data (age: {age_days} days)") return cache_file logger.info("Downloading NCES school district data...") async with httpx.AsyncClient(timeout=60.0) as client: try: # Try primary directory file first (has website URLs) response = await client.get(self.NCES_DIRECTORY_URL) response.raise_for_status() cache_file.write_bytes(response.content) logger.info(f"Downloaded NCES data to {cache_file}") return cache_file except Exception as e: logger.error(f"Failed to download NCES directory: {e}") # Fall back to universe survey file try: response = await client.get(self.NCES_CCD_URL) response.raise_for_status() cache_file.write_bytes(response.content) logger.info(f"Downloaded NCES universe data to {cache_file}") return cache_file except Exception as e2: logger.error(f"Failed to download NCES universe data: {e2}") raise def parse_csv_to_dataframe(self, csv_path: Path) -> DataFrame: """ Parse NCES CSV into standardized DataFrame. Args: csv_path: Path to NCES CSV file Returns: Spark DataFrame with standardized schema """ # Define schema for NCES data schema = StructType([ StructField("nces_id", StringType(), False), StructField("district_name", StringType(), False), StructField("state", StringType(), False), StructField("state_fips", StringType(), True), StructField("county_name", StringType(), True), StructField("street_address", StringType(), True), StructField("city", StringType(), True), StructField("zip", StringType(), True), StructField("phone", StringType(), True), StructField("website", StringType(), True), # Some NCES files include this! StructField("enrollment", IntegerType(), True), StructField("district_type", StringType(), True), # Regular, Charter, etc. ]) # Read raw CSV raw_df = self.spark.read.csv( str(csv_path), header=True, inferSchema=False ) # Map NCES column names to our schema # NCES uses: LEAID, LEA_NAME, STATE_ABBR, LSTREET1, LCITY, LZIP, PHONE, WEBSITE mapped_df = raw_df.select( col("LEAID").alias("nces_id"), col("LEA_NAME").alias("district_name"), col("STATE_ABBR").alias("state"), col("ST_FIPS").alias("state_fips"), col("COUNTY_NAME").alias("county_name"), col("LSTREET1").alias("street_address"), col("LCITY").alias("city"), col("LZIP").alias("zip"), col("PHONE").alias("phone"), col("WEBSITE").alias("website") if "WEBSITE" in raw_df.columns else col("LEAID").cast("string").alias("website"), # Placeholder if no website column col("ENROLLMENT").cast("int").alias("enrollment") if "ENROLLMENT" in raw_df.columns else col("LEAID").cast("int").alias("enrollment"), col("TYPE").alias("district_type") if "TYPE" in raw_df.columns else col("LEAID").cast("string").alias("district_type"), ) # Clean and standardize cleaned_df = mapped_df \ .withColumn("district_name", trim(col("district_name"))) \ .withColumn("state", trim(col("state"))) \ .withColumn("website", trim(lower(col("website")))) \ .withColumn("website", regexp_replace(col("website"), r"^https?://", "")) \ .withColumn("website", regexp_replace(col("website"), r"/$", "")) \ .filter(col("district_name").isNotNull()) logger.info(f"Parsed {cleaned_df.count()} school districts from NCES data") return cleaned_df def write_to_bronze_layer(self, df: DataFrame) -> None: """ Write NCES data to Bronze layer in Delta Lake. Args: df: NCES school district DataFrame """ output_path = f"{settings.delta_lake_path}/bronze/nces_school_districts" df.write \ .format("delta") \ .mode("overwrite") \ .partitionBy("state") \ .option("overwriteSchema", "true") \ .save(output_path) logger.info(f"Wrote NCES data to {output_path}") async def ingest_school_districts(self) -> DataFrame: """ Complete ingestion pipeline for NCES school district data. Returns: DataFrame with school district information """ # Download data csv_path = await self.download_nces_data() # Parse to DataFrame df = self.parse_csv_to_dataframe(csv_path) # Write to Bronze layer self.write_to_bronze_layer(df) return df async def main(): """Test NCES ingestion.""" ingestion = NCESSchoolDistrictIngestion() df = await ingestion.ingest_school_districts() print("\nšŸ“Š NCES School District Sample:") df.show(20, truncate=False) print("\nšŸ“ˆ Statistics by State:") df.groupBy("state").count().orderBy(col("count").desc()).show(10) if __name__ == "__main__": asyncio.run(main())