#!/usr/bin/env python3
"""
BPS School Finder - Database & Vector Store Builder
====================================================
Builds from choice_tool_raw.json (1,027 schools: 111 BPS + 916 non-BPS):
  1. SQLite database (bps_schools.db) for hard filtering
  2. FAISS vector index + metadata JSON for soft filtering via RAG (BPS only)

Usage:
    python build_database.py               # Build both DB and vector store
    python build_database.py --db-only     # Build only SQLite
    python build_database.py --vector-only # Build only vector store

Requirements:
    pip install faiss-cpu sentence-transformers numpy
"""

import sqlite3
import json
import math
import re
import argparse
from contextlib import closing
from pathlib import Path
from typing import Optional, Tuple

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
RAW_JSON = Path(__file__).parent / "raw_data" / "choice_tool_raw.json"
DB_PATH = Path(__file__).parent / "bps_schools.db"
VECTOR_DIR = Path(__file__).parent / "vector_store"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"

# Kindergarten levels sort below grade 1: K0 -> -2, K1 -> -1, K2 -> 0.
GRADE_MAP = {"K0": -2, "K1": -1, "K2": 0, **{str(g): g for g in range(1, 13)}}

IS_BPS = "Boston Public School"

# "(K0 for special education)" suffix of a BPS grade span.
_SPED_RE = re.compile(r'\((\w+)\s+for special education\)', re.IGNORECASE)
# "K1 - 6" or "7-12".
_GRADE_RANGE_RE = re.compile(r'(\w+)\s*-\s*(\w+)')
# "15 months" / "15months" and "0.2yr" / "5yr".
_AGE_MONTHS_RE = re.compile(r'([\d.]+)\s*months?', re.IGNORECASE)
_AGE_YEARS_RE = re.compile(r'([\d.]+)\s*yr', re.IGNORECASE)
# "Xyr - Yyr", tolerating missing spaces around the dash.
_AGE_RANGE_RE = re.compile(r'(.+?)\s*-\s*(.+)')


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def load_raw_data() -> list:
    """Load and return the parsed contents of RAW_JSON (read as UTF-8)."""
    with open(RAW_JSON, encoding="utf-8") as f:
        return json.load(f)


def safe_float(val):
    """Convert to a finite float, or return None.

    Returns None for None, the empty string, values ``float()`` rejects,
    and non-finite results (NaN / infinity), which ``json.dump`` would
    otherwise write out as invalid JSON.
    """
    if val is None or val == "":
        return None
    try:
        result = float(val)
    except (ValueError, TypeError):
        return None
    return result if math.isfinite(result) else None


def strip_or_empty(val):
    """Return the stripped string form of val, or "" when val is None."""
    if val is None:
        return ""
    if isinstance(val, str):
        return val.strip()
    return str(val).strip()


def bool_yes(val) -> int:
    """1 if val is 'yes' (case-insensitive, surrounding space ignored), else 0."""
    return 1 if strip_or_empty(val).lower() == "yes" else 0


def bool_yes_no_null(val):
    """1 if yes, 0 if no, None for anything else (including empty)."""
    v = strip_or_empty(val).lower()
    if v == "yes":
        return 1
    if v == "no":
        return 0
    return None


def bool_nonempty(val) -> int:
    """1 if val is non-empty after stripping, 0 otherwise."""
    return 1 if strip_or_empty(val) else 0


def list_has_content(val) -> int:
    """1 if the list has at least one non-empty entry, 0 otherwise.

    Non-list values fall back to ``bool_nonempty``.
    """
    if not isinstance(val, list):
        return bool_nonempty(val)
    return 1 if any(strip_or_empty(v) for v in val) else 0


def join_list(val) -> str:
    """Join a list field with "; ", dropping empty entries.

    Non-list values are returned stripped.
    """
    if not isinstance(val, list):
        return strip_or_empty(val)
    items = [text for text in map(strip_or_empty, val) if text]
    return "; ".join(items)


def clean_text(val) -> str:
    """Strip text and treat the spreadsheet artifact '#VALUE!' as empty."""
    t = strip_or_empty(val)
    if t == "#VALUE!":
        return ""
    return t


def haversine_miles(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in miles between two lat/lon points.

    Inputs are in degrees.
    """
    R = 3958.8  # sphere radius used for the result, in miles
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    # Rounding can push `a` a hair outside [0, 1] (e.g. near-antipodal
    # points), which would make asin raise a math domain error.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.asin(math.sqrt(a))


# ---------------------------------------------------------------------------
# Grade / age parsing
# ---------------------------------------------------------------------------
def parse_grade_token(token: str) -> Optional[int]:
    """Parse one grade token like 'K0', 'K1', 'K2', '7' into an integer.

    Returns None when the token is neither a GRADE_MAP key nor an integer.
    """
    key = strip_or_empty(token).upper()
    if key in GRADE_MAP:
        return GRADE_MAP[key]
    try:
        return int(key)
    except ValueError:
        return None


def parse_bps_grade_span(
    grade_span: str,
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """
    Parse BPS grade_span into (grade_min, grade_max, grade_min_sped).

    Examples:
        'K1 - 6; (K0 for special education)' -> (-1, 6, -2)
        'K0 - 6'                              -> (-2, 6, None)
        '7-12'                                -> (7, 12, None)
        'K0, K2 - 6'                          -> (-2, 6, None)
        '12'                                  -> (12, 12, None)

    Unparseable input yields None bounds instead of raising.
    """
    gs = strip_or_empty(grade_span)
    if not gs:
        return None, None, None

    grade_min_sped = None

    # Extract the special education extension, then drop it (and the
    # separating ';') from the span before parsing the main range.
    sped_match = _SPED_RE.search(gs)
    if sped_match:
        grade_min_sped = parse_grade_token(sped_match.group(1))
        gs = gs[:sped_match.start()].strip().rstrip(";").strip()

    # "K0, K2 - 6": the minimum is the lowest of the leading tokens and the
    # range start; the maximum is the range end.
    if "," in gs:
        parts = [p.strip() for p in gs.split(",")]
        if "-" in parts[-1]:
            lo_str, hi_str = parts[-1].split("-", 1)
            lows = [parse_grade_token(p) for p in parts[:-1]]
            lows.append(parse_grade_token(lo_str))
            lows = [g for g in lows if g is not None]
            # Guard: min() of an empty sequence raises ValueError.
            grade_min = min(lows) if lows else None
            return grade_min, parse_grade_token(hi_str), grade_min_sped
        grades = [g for g in map(parse_grade_token, parts) if g is not None]
        if not grades:
            return None, None, grade_min_sped
        return min(grades), max(grades), grade_min_sped

    # Range "K1 - 6" or "7-12".
    range_match = _GRADE_RANGE_RE.match(gs)
    if range_match:
        lo = parse_grade_token(range_match.group(1))
        hi = parse_grade_token(range_match.group(2))
        return lo, hi, grade_min_sped

    # Single grade "12".
    g = parse_grade_token(gs)
    if g is not None:
        return g, g, grade_min_sped

    return None, None, None


def _age_token_to_months(token: str) -> Optional[int]:
    """Convert one age token ('15 months', '0.2yr', '5yr') to whole months.

    Returns None when the token matches neither form or its number cannot
    be parsed (the pattern also matches a bare '.').
    """
    token = token.strip()
    try:
        m = _AGE_MONTHS_RE.match(token)
        if m:
            return int(float(m.group(1)))
        m = _AGE_YEARS_RE.match(token)
        if m:
            return round(float(m.group(1)) * 12)
    except ValueError:
        return None
    return None


def parse_age_span(grade_span: str) -> Tuple[Optional[int], Optional[int]]:
    """
    Parse non-BPS grade_span into (age_min_months, age_max_months).

    Examples:
        '0yr - 5yr'        -> (0, 60)
        '15 months - 5yr'  -> (15, 60)
        '0.2yr - 5yr'      -> (2, 60)
        '5yr'              -> (60, 60)
        '3yr- 4yr'         -> (36, 48)
        '0yr - 2.9yr'      -> (0, 35)
    """
    gs = strip_or_empty(grade_span)
    if not gs:
        return None, None

    # Range: "Xyr - Yyr" (with possibly missing space around the dash).
    range_match = _AGE_RANGE_RE.match(gs)
    if range_match:
        lo = _age_token_to_months(range_match.group(1))
        hi = _age_token_to_months(range_match.group(2))
        return lo, hi

    # Single value: "5yr".
    val = _age_token_to_months(gs)
    if val is not None:
        return val, val

    return None, None


# ---------------------------------------------------------------------------
# has_language_program logic
# ---------------------------------------------------------------------------
def compute_has_language_program(school: dict) -> int:
    """
    BPS: 1 if language_programming_text is non-empty.
    Non-BPS: 1 if ANY of dual_language (non-empty and not "No"),
             language_programming_filter="Yes", or
             language_programming_text non-empty.
    """
    is_bps = school.get("provider_type") == IS_BPS
    lpt = strip_or_empty(school.get("language_programming_text"))

    if is_bps:
        return 1 if lpt else 0

    dl = strip_or_empty(school.get("dual_language"))
    lpf = strip_or_empty(school.get("language_programming_filter"))

    if lpt:
        return 1
    if dl and dl.lower() != "no":
        return 1
    if lpf.lower() == "yes":
        return 1
    return 0


# ---------------------------------------------------------------------------
# IB / AP text extraction
# ---------------------------------------------------------------------------
def _text_unless_yes(val) -> str:
    """Return the stripped text, or "" when it is empty or just 'yes'."""
    v = strip_or_empty(val)
    if not v or v.lower() == "yes":
        return ""
    return v


def extract_ib_text(val: str) -> str:
    """Return IB text description, empty string if just 'Yes' or empty."""
    return _text_unless_yes(val)


def extract_ap_text(val: str) -> str:
    """Return AP text description, empty string if just 'Yes' or empty."""
    return _text_unless_yes(val)


# ---------------------------------------------------------------------------
# 1. Build SQLite Database
# ---------------------------------------------------------------------------
SCHEMA = """
CREATE TABLE schools (
    id TEXT PRIMARY KEY,
    school TEXT NOT NULL,
    dba TEXT,
    address TEXT,
    latitude REAL,
    longitude REAL,
    provider_type TEXT,
    grade_min INTEGER,
    grade_max INTEGER,
    grade_min_sped INTEGER,
    age_min_months INTEGER,
    age_max_months INTEGER,
    hours_of_operation TEXT,
    phone_number TEXT,
    email TEXT,
    website TEXT,
    surround_care INTEGER,
    curriculum TEXT,
    tuition INTEGER,
    headstart INTEGER,
    accepts_ccfa INTEGER,
    has_language_program INTEGER,
    has_international_baccalaureate INTEGER,
    international_baccalaureate_text TEXT,
    has_advanced_placement INTEGER,
    advanced_placement_text TEXT,
    uniform INTEGER,
    UPK INTEGER,
    ADA INTEGER,
    special_admission INTEGER,
    special_admission_link TEXT,
    school_quality_framework TEXT,
    state_report_card TEXT,
    point_of_contact TEXT,
    school_leader TEXT,
    build_care INTEGER,
    description TEXT
);

CREATE INDEX idx_provider_type ON schools(provider_type);
CREATE INDEX idx_grades ON schools(grade_min, grade_max);
CREATE INDEX idx_age ON schools(age_min_months, age_max_months);
CREATE INDEX idx_latlon ON schools(latitude, longitude);
"""

# Named placeholders match the keys of the dict built by transform_school().
INSERT_SQL = """
INSERT INTO schools VALUES (
    :id, :school, :dba, :address, :latitude, :longitude, :provider_type,
    :grade_min, :grade_max, :grade_min_sped,
    :age_min_months, :age_max_months,
    :hours_of_operation, :phone_number, :email, :website,
    :surround_care, :curriculum, :tuition, :headstart, :accepts_ccfa,
    :has_language_program,
    :has_international_baccalaureate, :international_baccalaureate_text,
    :has_advanced_placement, :advanced_placement_text,
    :uniform, :UPK, :ADA,
    :special_admission, :special_admission_link,
    :school_quality_framework, :state_report_card,
    :point_of_contact, :school_leader,
    :build_care, :description
)
"""


def transform_school(s: dict) -> dict:
    """Transform a raw school record into a database row dict.

    BPS records get grade_min / grade_max / grade_min_sped and a RAG
    description; all other records get age_min_months / age_max_months and
    a None description.
    """
    is_bps = s.get("provider_type") == IS_BPS

    # Grade / age parsing: only one of the two families is populated.
    grade_min = grade_max = grade_min_sped = None
    age_min = age_max = None

    if is_bps:
        grade_min, grade_max, grade_min_sped = parse_bps_grade_span(
            s.get("grade_span", "")
        )
    else:
        age_min, age_max = parse_age_span(s.get("grade_span", ""))

    # NOTE(review): any non-empty raw value sets the IB / AP flag, including
    # a literal "No" if the data ever contains one - confirm against the data.
    ib_raw = strip_or_empty(s.get("international_baccalaureate"))
    ap_raw = strip_or_empty(s.get("advanced_placement"))

    return {
        "id": strip_or_empty(s.get("id")),
        "school": strip_or_empty(s.get("school")),
        "dba": strip_or_empty(s.get("DBA")),
        "address": strip_or_empty(s.get("address")),
        "latitude": safe_float(s.get("latitude")),
        "longitude": safe_float(s.get("longitude")),
        "provider_type": strip_or_empty(s.get("provider_type")),
        "grade_min": grade_min,
        "grade_max": grade_max,
        "grade_min_sped": grade_min_sped,
        "age_min_months": age_min,
        "age_max_months": age_max,
        "hours_of_operation": strip_or_empty(s.get("hours_of_operation")),
        "phone_number": strip_or_empty(s.get("phone_number")),
        "email": strip_or_empty(s.get("email")),
        "website": strip_or_empty(s.get("website")),
        "surround_care": list_has_content(s.get("surround_care")),
        "curriculum": strip_or_empty(s.get("curriculum")),
        "tuition": bool_yes_no_null(s.get("tuition")),
        "headstart": bool_yes(s.get("headstart")),
        "accepts_ccfa": bool_yes_no_null(s.get("accepts_ccfa")),
        "has_language_program": compute_has_language_program(s),
        "has_international_baccalaureate": 1 if ib_raw else 0,
        "international_baccalaureate_text": extract_ib_text(ib_raw),
        "has_advanced_placement": 1 if ap_raw else 0,
        "advanced_placement_text": extract_ap_text(ap_raw),
        "uniform": bool_yes_no_null(s.get("uniform")),
        "UPK": bool_yes(s.get("UPK")),
        "ADA": bool_yes_no_null(s.get("ADA")),
        "special_admission": bool_yes(s.get("special_admission_filter")),
        "special_admission_link": strip_or_empty(s.get("special_admission_link")),
        "school_quality_framework": strip_or_empty(s.get("school_quality_framework")),
        "state_report_card": strip_or_empty(s.get("state_report_card")),
        "point_of_contact": strip_or_empty(s.get("point_of_contact")),
        "school_leader": strip_or_empty(s.get("school_leader")),
        "build_care": bool_yes(s.get("BuildCare")),
        "description": build_description(s) if is_bps else None,
    }


def build_sqlite():
    """Build the SQLite database from choice_tool_raw.json.

    Any existing file at DB_PATH is deleted first. Records whose stripped
    id was already seen are skipped. Prints a summary and returns the
    number of rows inserted.
    """
    print("=" * 60)
    print("Building SQLite Database")
    print("=" * 60)

    data = load_raw_data()

    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    if DB_PATH.exists():
        DB_PATH.unlink()

    # closing() guarantees the connection is released even if an insert
    # raises; sqlite3's own context manager only handles the transaction.
    with closing(sqlite3.connect(str(DB_PATH))) as conn:
        cur = conn.cursor()
        cur.executescript(SCHEMA)

        seen_ids = set()
        for s in data:
            sid = strip_or_empty(s.get("id"))
            if sid in seen_ids:
                continue  # id is the PRIMARY KEY; keep the first occurrence
            seen_ids.add(sid)
            cur.execute(INSERT_SQL, transform_school(s))

        conn.commit()

        # Summary
        cur.execute("SELECT COUNT(*) FROM schools")
        total = cur.fetchone()[0]

        cur.execute("SELECT provider_type, COUNT(*) FROM schools GROUP BY provider_type ORDER BY COUNT(*) DESC")
        type_counts = cur.fetchall()

        cur.execute("SELECT COUNT(*) FROM schools WHERE grade_min IS NOT NULL")
        graded = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM schools WHERE age_min_months IS NOT NULL")
        aged = cur.fetchone()[0]

        print(f"\nInserted {total} schools")
        print(f" With grade_min/grade_max (BPS): {graded}")
        print(f" With age_min/age_max (non-BPS): {aged}")
        print("\nBy provider_type:")
        for t, c in type_counts:
            print(f" {t:45s}: {c}")

    print(f"\nDatabase saved to: {DB_PATH}")
    return total


# ---------------------------------------------------------------------------
# 2. Build Vector Store (BPS only)
# ---------------------------------------------------------------------------
def grade_span_human_readable(grade_min, grade_max, grade_min_sped):
    """Convert integer grades back to a human-readable string.

    Returns "unknown grades" when either bound is None.
    """
    kinder_names = {-2: "K0", -1: "K1", 0: "K2"}

    def g(v):
        return kinder_names.get(v, str(v))

    if grade_min is None or grade_max is None:
        return "unknown grades"
    base = f"{g(grade_min)} to {g(grade_max)}"
    if grade_min_sped is not None:
        base += f" ({g(grade_min_sped)} for special education)"
    return base


def build_description(s: dict) -> str:
    """Build a RAG description for a BPS school.

    The description is a lead sentence (name, address, grades) followed by
    every non-empty descriptive field, separated by blank lines.
    """
    grade_min, grade_max, grade_min_sped = parse_bps_grade_span(s.get("grade_span", ""))
    grades_hr = grade_span_human_readable(grade_min, grade_max, grade_min_sped)

    parts = [
        f"{strip_or_empty(s.get('school'))} is a Boston Public School located at "
        f"{strip_or_empty(s.get('address'))}, serving grades {grades_hr}."
    ]

    overview = clean_text(s.get("overview_mission_statement"))
    if overview:
        parts.append(overview)

    unique = join_list(s.get("unique_features"))
    if unique:
        parts.append(unique)

    field_sections = [
        ("specialized_education_programs", "Specialized education programs"),
        ("language_programming_text", "Language programs"),
        ("early_college_dual_enrollment", "Early college and dual enrollment"),
        ("CTE_Pathways_TXT", "Career and technical education pathways"),
        ("after_school_program", "After school"),
        ("before_school_program", "Before school"),
        ("extra_curriculars_text", "Extracurriculars"),
    ]
    for field, label in field_sections:
        val = clean_text(s.get(field))
        if val:
            parts.append(f"{label}: {val}")

    # List fields
    sports = join_list(s.get("sports"))
    if sports:
        parts.append(f"Sports: {sports}")

    partners = join_list(s.get("partners"))
    if partners:
        parts.append(f"Partners: {partners}")

    ada_desc = clean_text(s.get("ada_description"))
    if ada_desc:
        parts.append(f"Accessibility: {ada_desc}")

    family = clean_text(s.get("family_engagement_opportunities"))
    if family:
        parts.append(f"Family engagement: {family}")

    return "\n\n".join(parts)


def build_vector_store():
    """Build FAISS vector index and metadata from BPS school descriptions.

    Always writes documents.json and metadata.json into VECTOR_DIR. The
    FAISS index is built only when numpy, faiss and sentence_transformers
    import successfully; otherwise an install hint is printed.
    """
    print("\n" + "=" * 60)
    print("Building Vector Store (BPS only)")
    print("=" * 60)

    VECTOR_DIR.mkdir(parents=True, exist_ok=True)

    data = load_raw_data()
    bps_schools = [s for s in data if s.get("provider_type") == IS_BPS]

    # documents[i] and metadata[i] describe the same school.
    documents = []
    metadata = []

    for s in bps_schools:
        documents.append(build_description(s))

        grade_min, grade_max, _ = parse_bps_grade_span(s.get("grade_span", ""))
        metadata.append({
            "id": strip_or_empty(s.get("id")),
            "school": strip_or_empty(s.get("school")),
            "provider_type": IS_BPS,
            "grade_min": grade_min,
            "grade_max": grade_max,
            "latitude": safe_float(s.get("latitude")),
            "longitude": safe_float(s.get("longitude")),
        })

    print(f"Generated {len(documents)} BPS school descriptions")

    # Save documents and metadata (explicit UTF-8, matching load_raw_data).
    docs_path = VECTOR_DIR / "documents.json"
    meta_path = VECTOR_DIR / "metadata.json"

    with open(docs_path, "w", encoding="utf-8") as f:
        json.dump(documents, f, indent=2)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)

    print(f"Saved documents to: {docs_path}")
    print(f"Saved metadata to: {meta_path}")

    # Build FAISS index
    try:
        import numpy as np
        import faiss
        from sentence_transformers import SentenceTransformer

        print(f"\nLoading embedding model: {EMBEDDING_MODEL}")
        model = SentenceTransformer(EMBEDDING_MODEL)

        print("Encoding school descriptions...")
        embeddings = model.encode(documents, show_progress_bar=True, normalize_embeddings=True)
        embeddings = np.array(embeddings).astype("float32")

        print(f"Embedding shape: {embeddings.shape}")

        # Inner-product index over embeddings requested as normalized.
        dim = embeddings.shape[1]
        index = faiss.IndexFlatIP(dim)
        index.add(embeddings)

        index_path = VECTOR_DIR / "school_index.faiss"
        faiss.write_index(index, str(index_path))

        print(f"FAISS index saved to: {index_path}")
        print(f"Index contains {index.ntotal} vectors of dimension {dim}")

    except ImportError as e:
        print(f"\nCould not build FAISS index: {e}")
        print(" Documents and metadata saved. Install deps to build index:")
        print(" pip install sentence-transformers faiss-cpu")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Parse CLI flags, run the requested builds, then run a quick smoke test."""
    parser = argparse.ArgumentParser(description="Build school database and vector store")
    parser.add_argument("--db-only", action="store_true", help="Build only SQLite database")
    parser.add_argument("--vector-only", action="store_true", help="Build only vector store")
    args = parser.parse_args()

    # When both flags are given, --db-only takes precedence.
    if args.db_only:
        build_sqlite()
    elif args.vector_only:
        build_vector_store()
    else:
        build_sqlite()
        build_vector_store()

    # Quick test
    # NOTE(review): this runs for every flag combination, including the
    # semantic search after --db-only; presumably it needs a vector store
    # from an earlier run - confirm against database.BPSDatabase.
    print("\n" + "=" * 60)
    print("Quick Test")
    print("=" * 60)

    from database import BPSDatabase

    db = BPSDatabase()
    try:
        print("\n--- BPS schools serving grade 9 ---")
        results = db.hard_filter(grade=9)
        for r in results[:5]:
            print(f" {r['school']} (grades {r['grade_min']} to {r['grade_max']})")
        print(f" ... {len(results)} total")

        print("\n--- Non-BPS schools serving 3-year-olds (36 months) ---")
        results = db.find_schools_by_age(36)
        print(f" {len(results)} schools found")
        for r in results[:3]:
            print(f" {r['school']} ({r['provider_type']})")

        print("\n--- Family Child Care + accepts CCFA ---")
        results = db.hard_filter(provider_type="Family Child Care", accepts_ccfa=1)
        print(f" {len(results)} schools found")

        print("\n--- Schools within 1 mile of Copley Square ---")
        results = db.find_schools_near(42.3496, -71.0778, 1.0)
        for r in results[:5]:
            print(f" {r['school']} ({r['provider_type']}) - {r['distance_miles']} mi")

        print("\n--- Semantic search: 'arts and music programs' ---")
        results = db.semantic_search("arts and music programs", top_k=5)
        for r in results:
            print(f" {r['school']} (score: {r['score']:.3f})")
    finally:
        db.close()

    print("\nDone!")


if __name__ == "__main__":
    main()