Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
File size: 8,059 Bytes
61d29fc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | """
NCES School District Data Ingestion
Downloads and processes National Center for Education Statistics (NCES)
Common Core of Data (CCD) to get comprehensive school district information.
Data Source: https://nces.ed.gov/ccd/
Primary Dataset: Local Education Agency (School District) Universe Survey
This provides:
- School district names and locations
- Physical addresses and phone numbers
- NCES IDs for standardized identification
- Enrollment and demographic data
"""
import asyncio
import csv
import zipfile
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
try:
import httpx
except ImportError:
httpx = None
from loguru import logger
try:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, trim, lower, regexp_replace
PYSPARK_AVAILABLE = True
except ImportError:
PYSPARK_AVAILABLE = False
SparkSession = None
DataFrame = None
from config import settings
class NCESSchoolDistrictIngestion:
"""Ingest NCES Common Core of Data for school districts."""
# NCES provides CSV/Text files for the Local Education Agency Universe Survey
# Updated annually - using 2023-24 school year data
NCES_CCD_URL = "https://nces.ed.gov/ccd/data/zip/ccd_lea_052_2324_l_1a_083023.csv"
# Alternative: Directory of school districts with contact info
NCES_DIRECTORY_URL = "https://nces.ed.gov/ccd/data/zip/ccd_lea_directory_2324.csv"
def __init__(self, spark: Optional[SparkSession] = None):
"""Initialize ingestion with Spark session."""
self.spark = spark or SparkSession.builder.appName("NCESIngestion").getOrCreate()
self.cache_dir = Path("data/cache/nces")
self.cache_dir.mkdir(parents=True, exist_ok=True)
async def download_nces_data(self) -> Path:
"""
Download NCES school district data.
Returns:
Path to downloaded CSV file
"""
cache_file = self.cache_dir / "nces_school_districts.csv"
# Cache for 30 days (NCES data updates annually)
if cache_file.exists():
age_days = (datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime)).days
if age_days < 30:
logger.info(f"Using cached NCES data (age: {age_days} days)")
return cache_file
logger.info("Downloading NCES school district data...")
async with httpx.AsyncClient(timeout=60.0) as client:
try:
# Try primary directory file first (has website URLs)
response = await client.get(self.NCES_DIRECTORY_URL)
response.raise_for_status()
cache_file.write_bytes(response.content)
logger.info(f"Downloaded NCES data to {cache_file}")
return cache_file
except Exception as e:
logger.error(f"Failed to download NCES directory: {e}")
# Fall back to universe survey file
try:
response = await client.get(self.NCES_CCD_URL)
response.raise_for_status()
cache_file.write_bytes(response.content)
logger.info(f"Downloaded NCES universe data to {cache_file}")
return cache_file
except Exception as e2:
logger.error(f"Failed to download NCES universe data: {e2}")
raise
def parse_csv_to_dataframe(self, csv_path: Path) -> DataFrame:
"""
Parse NCES CSV into standardized DataFrame.
Args:
csv_path: Path to NCES CSV file
Returns:
Spark DataFrame with standardized schema
"""
# Define schema for NCES data
schema = StructType([
StructField("nces_id", StringType(), False),
StructField("district_name", StringType(), False),
StructField("state", StringType(), False),
StructField("state_fips", StringType(), True),
StructField("county_name", StringType(), True),
StructField("street_address", StringType(), True),
StructField("city", StringType(), True),
StructField("zip", StringType(), True),
StructField("phone", StringType(), True),
StructField("website", StringType(), True), # Some NCES files include this!
StructField("enrollment", IntegerType(), True),
StructField("district_type", StringType(), True), # Regular, Charter, etc.
])
# Read raw CSV
raw_df = self.spark.read.csv(
str(csv_path),
header=True,
inferSchema=False
)
# Map NCES column names to our schema
# NCES uses: LEAID, LEA_NAME, STATE_ABBR, LSTREET1, LCITY, LZIP, PHONE, WEBSITE
mapped_df = raw_df.select(
col("LEAID").alias("nces_id"),
col("LEA_NAME").alias("district_name"),
col("STATE_ABBR").alias("state"),
col("ST_FIPS").alias("state_fips"),
col("COUNTY_NAME").alias("county_name"),
col("LSTREET1").alias("street_address"),
col("LCITY").alias("city"),
col("LZIP").alias("zip"),
col("PHONE").alias("phone"),
col("WEBSITE").alias("website") if "WEBSITE" in raw_df.columns else col("LEAID").cast("string").alias("website"), # Placeholder if no website column
col("ENROLLMENT").cast("int").alias("enrollment") if "ENROLLMENT" in raw_df.columns else col("LEAID").cast("int").alias("enrollment"),
col("TYPE").alias("district_type") if "TYPE" in raw_df.columns else col("LEAID").cast("string").alias("district_type"),
)
# Clean and standardize
cleaned_df = mapped_df \
.withColumn("district_name", trim(col("district_name"))) \
.withColumn("state", trim(col("state"))) \
.withColumn("website", trim(lower(col("website")))) \
.withColumn("website", regexp_replace(col("website"), r"^https?://", "")) \
.withColumn("website", regexp_replace(col("website"), r"/$", "")) \
.filter(col("district_name").isNotNull())
logger.info(f"Parsed {cleaned_df.count()} school districts from NCES data")
return cleaned_df
def write_to_bronze_layer(self, df: DataFrame) -> None:
"""
Write NCES data to Bronze layer in Delta Lake.
Args:
df: NCES school district DataFrame
"""
output_path = f"{settings.delta_lake_path}/bronze/nces_school_districts"
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("state") \
.option("overwriteSchema", "true") \
.save(output_path)
logger.info(f"Wrote NCES data to {output_path}")
async def ingest_school_districts(self) -> DataFrame:
"""
Complete ingestion pipeline for NCES school district data.
Returns:
DataFrame with school district information
"""
# Download data
csv_path = await self.download_nces_data()
# Parse to DataFrame
df = self.parse_csv_to_dataframe(csv_path)
# Write to Bronze layer
self.write_to_bronze_layer(df)
return df
async def main():
"""Test NCES ingestion."""
ingestion = NCESSchoolDistrictIngestion()
df = await ingestion.ingest_school_districts()
print("\n📊 NCES School District Sample:")
df.show(20, truncate=False)
print("\n📈 Statistics by State:")
df.groupBy("state").count().orderBy(col("count").desc()).show(10)
if __name__ == "__main__":
asyncio.run(main())
|