14c_chatbot / data /build_database.py
csong03
Initial Space upload with LFS-tracked binaries
9e118e4
#!/usr/bin/env python3
"""
BPS School Finder - Database & Vector Store Builder
====================================================
Builds from choice_tool_raw.json (1,027 schools: 111 BPS + 916 non-BPS):
1. SQLite database (bps_schools.db) for hard filtering
2. FAISS vector index + metadata JSON for soft filtering via RAG (BPS only)
Usage:
python build_database.py # Build both DB and vector store
python build_database.py --db-only # Build only SQLite
python build_database.py --vector-only # Build only vector store
Requirements:
pip install faiss-cpu sentence-transformers numpy
"""
import sqlite3
import json
import math
import re
import argparse
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
RAW_JSON = Path(__file__).parent / "raw_data" / "choice_tool_raw.json"
DB_PATH = Path(__file__).parent / "bps_schools.db"
VECTOR_DIR = Path(__file__).parent / "vector_store"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
GRADE_MAP = {"K0": -2, "K1": -1, "K2": 0}
for i in range(1, 13):
GRADE_MAP[str(i)] = i
IS_BPS = "Boston Public School"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def load_raw_data() -> list:
with open(RAW_JSON, encoding="utf-8") as f:
return json.load(f)
def safe_float(val):
"""Convert to float or return None."""
if val is None or val == "":
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def strip_or_empty(val):
"""Return stripped string or empty string."""
if val is None:
return ""
if isinstance(val, str):
return val.strip()
return str(val).strip()
def bool_yes(val) -> int:
"""1 if val is 'Yes'/'yes', 0 otherwise."""
return 1 if strip_or_empty(val).lower() == "yes" else 0
def bool_yes_no_null(val):
"""1 if yes, 0 if no, None if empty."""
v = strip_or_empty(val).lower()
if v == "yes":
return 1
if v == "no":
return 0
return None
def bool_nonempty(val) -> int:
"""1 if val is non-empty string, 0 otherwise."""
return 1 if strip_or_empty(val) else 0
def list_has_content(val) -> int:
"""1 if list contains at least one non-empty string, 0 otherwise."""
if not isinstance(val, list):
return bool_nonempty(val)
return 1 if any(strip_or_empty(v) for v in val) else 0
def join_list(val) -> str:
"""Join a list field, filtering out empty strings."""
if not isinstance(val, list):
return strip_or_empty(val)
items = [strip_or_empty(v) for v in val if strip_or_empty(v)]
return "; ".join(items)
def clean_text(val) -> str:
"""Strip text and handle #VALUE! as empty."""
t = strip_or_empty(val)
if t == "#VALUE!":
return ""
return t
def haversine_miles(lat1, lon1, lat2, lon2):
"""Calculate distance in miles between two lat/lon points."""
R = 3958.8
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat / 2) ** 2 +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon / 2) ** 2)
return R * 2 * math.asin(math.sqrt(a))
# ---------------------------------------------------------------------------
# Grade / age parsing
# ---------------------------------------------------------------------------
def parse_grade_token(token: str) -> int:
"""Parse a single grade token like 'K0', 'K1', 'K2', '7' into an integer."""
token = token.strip().upper()
if token in GRADE_MAP:
return GRADE_MAP[token]
try:
return int(token)
except ValueError:
return None
def parse_bps_grade_span(grade_span: str):
"""
Parse BPS grade_span into (grade_min, grade_max, grade_min_sped).
Examples:
'K1 - 6; (K0 for special education)' -> (-1, 6, -2)
'K0 - 6' -> (-2, 6, None)
'7-12' -> (7, 12, None)
'K0, K2 - 6' -> (-2, 6, None)
'12' -> (12, 12, None)
"""
gs = strip_or_empty(grade_span)
if not gs:
return None, None, None
grade_min_sped = None
# Extract special education extension
sped_match = re.search(r'\((\w+)\s+for special education\)', gs)
if sped_match:
grade_min_sped = parse_grade_token(sped_match.group(1))
gs = gs[:sped_match.start()].strip().rstrip(";").strip()
# Handle "K0, K2 - 6" pattern: take min of first token and range start
if "," in gs:
parts = gs.split(",")
first_token = parse_grade_token(parts[0].strip())
# Parse the range part
range_part = parts[-1].strip()
if "-" in range_part:
lo_str, hi_str = range_part.split("-", 1)
lo = parse_grade_token(lo_str)
hi = parse_grade_token(hi_str)
grade_min = min(g for g in [first_token, lo] if g is not None)
return grade_min, hi, grade_min_sped
else:
tokens = [parse_grade_token(p.strip()) for p in parts]
tokens = [t for t in tokens if t is not None]
return min(tokens), max(tokens), grade_min_sped
# Handle range "K1 - 6" or "7-12"
# Normalize spaces around dash
range_match = re.match(r'(\w+)\s*-\s*(\w+)', gs)
if range_match:
lo = parse_grade_token(range_match.group(1))
hi = parse_grade_token(range_match.group(2))
return lo, hi, grade_min_sped
# Single grade "12"
g = parse_grade_token(gs)
if g is not None:
return g, g, grade_min_sped
return None, None, None
def parse_age_span(grade_span: str):
"""
Parse non-BPS grade_span into (age_min_months, age_max_months).
Examples:
'0yr - 5yr' -> (0, 60)
'15 months - 5yr' -> (15, 60)
'0.2yr - 5yr' -> (2, 60)
'5yr' -> (60, 60)
'3yr- 4yr' -> (36, 48)
'0yr - 2.9yr' -> (0, 35)
"""
gs = strip_or_empty(grade_span)
if not gs:
return None, None
def age_token_to_months(token: str) -> int:
token = token.strip()
# "15 months" or "15months"
m = re.match(r'([\d.]+)\s*months?', token, re.IGNORECASE)
if m:
return int(float(m.group(1)))
# "0.2yr" or "5yr" or "0yr"
m = re.match(r'([\d.]+)\s*yr', token, re.IGNORECASE)
if m:
val = float(m.group(1))
months = round(val * 12)
return months
return None
# Try range: "Xyr - Yyr" (with possibly missing space around dash)
range_match = re.match(r'(.+?)\s*-\s*(.+)', gs)
if range_match:
lo = age_token_to_months(range_match.group(1))
hi = age_token_to_months(range_match.group(2))
return lo, hi
# Single value: "5yr"
val = age_token_to_months(gs)
if val is not None:
return val, val
return None, None
# ---------------------------------------------------------------------------
# has_language_program logic
# ---------------------------------------------------------------------------
def compute_has_language_program(school: dict) -> int:
"""
BPS: 1 if language_programming_text is non-empty.
Non-BPS: 1 if ANY of dual_language (not "No"), language_programming_filter="Yes",
or language_programming_text non-empty.
"""
is_bps = school.get("provider_type") == IS_BPS
lpt = strip_or_empty(school.get("language_programming_text"))
if is_bps:
return 1 if lpt else 0
dl = strip_or_empty(school.get("dual_language"))
lpf = strip_or_empty(school.get("language_programming_filter"))
if lpt:
return 1
if dl and dl.lower() != "no":
return 1
if lpf.lower() == "yes":
return 1
return 0
# ---------------------------------------------------------------------------
# IB / AP text extraction
# ---------------------------------------------------------------------------
def extract_ib_text(val: str) -> str:
"""Return IB text description, empty string if just 'Yes' or empty."""
v = strip_or_empty(val)
if not v or v.lower() == "yes":
return ""
return v
def extract_ap_text(val: str) -> str:
v = strip_or_empty(val)
if not v or v.lower() == "yes":
return ""
return v
# ---------------------------------------------------------------------------
# 1. Build SQLite Database
# ---------------------------------------------------------------------------
SCHEMA = """
CREATE TABLE schools (
id TEXT PRIMARY KEY,
school TEXT NOT NULL,
dba TEXT,
address TEXT,
latitude REAL,
longitude REAL,
provider_type TEXT,
grade_min INTEGER,
grade_max INTEGER,
grade_min_sped INTEGER,
age_min_months INTEGER,
age_max_months INTEGER,
hours_of_operation TEXT,
phone_number TEXT,
email TEXT,
website TEXT,
surround_care INTEGER,
curriculum TEXT,
tuition INTEGER,
headstart INTEGER,
accepts_ccfa INTEGER,
has_language_program INTEGER,
has_international_baccalaureate INTEGER,
international_baccalaureate_text TEXT,
has_advanced_placement INTEGER,
advanced_placement_text TEXT,
uniform INTEGER,
UPK INTEGER,
ADA INTEGER,
special_admission INTEGER,
special_admission_link TEXT,
school_quality_framework TEXT,
state_report_card TEXT,
point_of_contact TEXT,
school_leader TEXT,
build_care INTEGER,
description TEXT
);
CREATE INDEX idx_provider_type ON schools(provider_type);
CREATE INDEX idx_grades ON schools(grade_min, grade_max);
CREATE INDEX idx_age ON schools(age_min_months, age_max_months);
CREATE INDEX idx_latlon ON schools(latitude, longitude);
"""
INSERT_SQL = """
INSERT INTO schools VALUES (
:id, :school, :dba, :address, :latitude, :longitude, :provider_type,
:grade_min, :grade_max, :grade_min_sped,
:age_min_months, :age_max_months,
:hours_of_operation, :phone_number, :email, :website,
:surround_care, :curriculum, :tuition, :headstart, :accepts_ccfa,
:has_language_program,
:has_international_baccalaureate, :international_baccalaureate_text,
:has_advanced_placement, :advanced_placement_text,
:uniform, :UPK, :ADA,
:special_admission, :special_admission_link,
:school_quality_framework, :state_report_card,
:point_of_contact, :school_leader, :build_care, :description
)
"""
def transform_school(s: dict) -> dict:
"""Transform a raw school record into a database row dict."""
is_bps = s.get("provider_type") == IS_BPS
# Grade / age parsing
grade_min = grade_max = grade_min_sped = None
age_min = age_max = None
if is_bps:
grade_min, grade_max, grade_min_sped = parse_bps_grade_span(s.get("grade_span", ""))
else:
age_min, age_max = parse_age_span(s.get("grade_span", ""))
ib_raw = strip_or_empty(s.get("international_baccalaureate"))
ap_raw = strip_or_empty(s.get("advanced_placement"))
return {
"id": strip_or_empty(s.get("id")),
"school": strip_or_empty(s.get("school")),
"dba": strip_or_empty(s.get("DBA")),
"address": strip_or_empty(s.get("address")),
"latitude": safe_float(s.get("latitude")),
"longitude": safe_float(s.get("longitude")),
"provider_type": strip_or_empty(s.get("provider_type")),
"grade_min": grade_min,
"grade_max": grade_max,
"grade_min_sped": grade_min_sped,
"age_min_months": age_min,
"age_max_months": age_max,
"hours_of_operation": strip_or_empty(s.get("hours_of_operation")),
"phone_number": strip_or_empty(s.get("phone_number")),
"email": strip_or_empty(s.get("email")),
"website": strip_or_empty(s.get("website")),
"surround_care": list_has_content(s.get("surround_care")),
"curriculum": strip_or_empty(s.get("curriculum")),
"tuition": bool_yes_no_null(s.get("tuition")),
"headstart": bool_yes(s.get("headstart")),
"accepts_ccfa": bool_yes_no_null(s.get("accepts_ccfa")),
"has_language_program": compute_has_language_program(s),
"has_international_baccalaureate": 1 if ib_raw else 0,
"international_baccalaureate_text": extract_ib_text(ib_raw),
"has_advanced_placement": 1 if ap_raw else 0,
"advanced_placement_text": extract_ap_text(ap_raw),
"uniform": bool_yes_no_null(s.get("uniform")),
"UPK": bool_yes(s.get("UPK")),
"ADA": bool_yes_no_null(s.get("ADA")),
"special_admission": bool_yes(s.get("special_admission_filter")),
"special_admission_link": strip_or_empty(s.get("special_admission_link")),
"school_quality_framework": strip_or_empty(s.get("school_quality_framework")),
"state_report_card": strip_or_empty(s.get("state_report_card")),
"point_of_contact": strip_or_empty(s.get("point_of_contact")),
"school_leader": strip_or_empty(s.get("school_leader")),
"build_care": bool_yes(s.get("BuildCare")),
"description": build_description(s) if is_bps else None,
}
def build_sqlite():
"""Build the SQLite database from choice_tool_raw.json."""
print("=" * 60)
print("Building SQLite Database")
print("=" * 60)
data = load_raw_data()
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
if DB_PATH.exists():
DB_PATH.unlink()
conn = sqlite3.connect(str(DB_PATH))
cur = conn.cursor()
cur.executescript(SCHEMA)
seen_ids = set()
for s in data:
sid = strip_or_empty(s.get("id"))
if sid in seen_ids:
continue
seen_ids.add(sid)
row = transform_school(s)
cur.execute(INSERT_SQL, row)
conn.commit()
# Summary
cur.execute("SELECT COUNT(*) FROM schools")
total = cur.fetchone()[0]
cur.execute("SELECT provider_type, COUNT(*) FROM schools GROUP BY provider_type ORDER BY COUNT(*) DESC")
type_counts = cur.fetchall()
cur.execute("SELECT COUNT(*) FROM schools WHERE grade_min IS NOT NULL")
graded = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM schools WHERE age_min_months IS NOT NULL")
aged = cur.fetchone()[0]
print(f"\nInserted {total} schools")
print(f" With grade_min/grade_max (BPS): {graded}")
print(f" With age_min/age_max (non-BPS): {aged}")
print(f"\nBy provider_type:")
for t, c in type_counts:
print(f" {t:45s}: {c}")
conn.close()
print(f"\nDatabase saved to: {DB_PATH}")
return total
# ---------------------------------------------------------------------------
# 2. Build Vector Store (BPS only)
# ---------------------------------------------------------------------------
def grade_span_human_readable(grade_min, grade_max, grade_min_sped):
"""Convert integer grades back to human-readable string."""
def g(v):
if v == -2: return "K0"
if v == -1: return "K1"
if v == 0: return "K2"
return str(v)
if grade_min is None or grade_max is None:
return "unknown grades"
base = f"{g(grade_min)} to {g(grade_max)}"
if grade_min_sped is not None:
base += f" ({g(grade_min_sped)} for special education)"
return base
def build_description(s: dict) -> str:
"""Build a RAG description for a BPS school."""
grade_min, grade_max, grade_min_sped = parse_bps_grade_span(s.get("grade_span", ""))
grades_hr = grade_span_human_readable(grade_min, grade_max, grade_min_sped)
parts = [
f"{strip_or_empty(s.get('school'))} is a Boston Public School located at "
f"{strip_or_empty(s.get('address'))}, serving grades {grades_hr}."
]
overview = clean_text(s.get("overview_mission_statement"))
if overview:
parts.append(overview)
unique = join_list(s.get("unique_features"))
if unique:
parts.append(unique)
field_sections = [
("specialized_education_programs", "Specialized education programs"),
("language_programming_text", "Language programs"),
("early_college_dual_enrollment", "Early college and dual enrollment"),
("CTE_Pathways_TXT", "Career and technical education pathways"),
("after_school_program", "After school"),
("before_school_program", "Before school"),
("extra_curriculars_text", "Extracurriculars"),
]
for field, label in field_sections:
val = clean_text(s.get(field))
if val:
parts.append(f"{label}: {val}")
# List fields
sports = join_list(s.get("sports"))
if sports:
parts.append(f"Sports: {sports}")
partners = join_list(s.get("partners"))
if partners:
parts.append(f"Partners: {partners}")
ada_desc = clean_text(s.get("ada_description"))
if ada_desc:
parts.append(f"Accessibility: {ada_desc}")
family = clean_text(s.get("family_engagement_opportunities"))
if family:
parts.append(f"Family engagement: {family}")
return "\n\n".join(parts)
def build_vector_store():
"""Build FAISS vector index and metadata from BPS school descriptions."""
print("\n" + "=" * 60)
print("Building Vector Store (BPS only)")
print("=" * 60)
VECTOR_DIR.mkdir(parents=True, exist_ok=True)
data = load_raw_data()
bps_schools = [s for s in data if s.get("provider_type") == IS_BPS]
documents = []
metadata = []
for s in bps_schools:
desc = build_description(s)
documents.append(desc)
grade_min, grade_max, _ = parse_bps_grade_span(s.get("grade_span", ""))
metadata.append({
"id": strip_or_empty(s.get("id")),
"school": strip_or_empty(s.get("school")),
"provider_type": IS_BPS,
"grade_min": grade_min,
"grade_max": grade_max,
"latitude": safe_float(s.get("latitude")),
"longitude": safe_float(s.get("longitude")),
})
print(f"Generated {len(documents)} BPS school descriptions")
# Save documents and metadata
docs_path = VECTOR_DIR / "documents.json"
meta_path = VECTOR_DIR / "metadata.json"
with open(docs_path, "w") as f:
json.dump(documents, f, indent=2)
with open(meta_path, "w") as f:
json.dump(metadata, f, indent=2)
print(f"Saved documents to: {docs_path}")
print(f"Saved metadata to: {meta_path}")
# Build FAISS index
try:
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
print(f"\nLoading embedding model: {EMBEDDING_MODEL}")
model = SentenceTransformer(EMBEDDING_MODEL)
print("Encoding school descriptions...")
embeddings = model.encode(documents, show_progress_bar=True, normalize_embeddings=True)
embeddings = np.array(embeddings).astype("float32")
print(f"Embedding shape: {embeddings.shape}")
dim = embeddings.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(embeddings)
index_path = VECTOR_DIR / "school_index.faiss"
faiss.write_index(index, str(index_path))
print(f"FAISS index saved to: {index_path}")
print(f"Index contains {index.ntotal} vectors of dimension {dim}")
except ImportError as e:
print(f"\nCould not build FAISS index: {e}")
print(" Documents and metadata saved. Install deps to build index:")
print(" pip install sentence-transformers faiss-cpu")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Build school database and vector store")
parser.add_argument("--db-only", action="store_true", help="Build only SQLite database")
parser.add_argument("--vector-only", action="store_true", help="Build only vector store")
args = parser.parse_args()
if args.db_only:
build_sqlite()
elif args.vector_only:
build_vector_store()
else:
build_sqlite()
build_vector_store()
# Quick test
print("\n" + "=" * 60)
print("Quick Test")
print("=" * 60)
from database import BPSDatabase
db = BPSDatabase()
print("\n--- BPS schools serving grade 9 ---")
results = db.hard_filter(grade=9)
for r in results[:5]:
print(f" {r['school']} (grades {r['grade_min']} to {r['grade_max']})")
print(f" ... {len(results)} total")
print("\n--- Non-BPS schools serving 3-year-olds (36 months) ---")
results = db.find_schools_by_age(36)
print(f" {len(results)} schools found")
for r in results[:3]:
print(f" {r['school']} ({r['provider_type']})")
print("\n--- Family Child Care + accepts CCFA ---")
results = db.hard_filter(provider_type="Family Child Care", accepts_ccfa=1)
print(f" {len(results)} schools found")
print("\n--- Schools within 1 mile of Copley Square ---")
results = db.find_schools_near(42.3496, -71.0778, 1.0)
for r in results[:5]:
print(f" {r['school']} ({r['provider_type']}) - {r['distance_miles']} mi")
print("\n--- Semantic search: 'arts and music programs' ---")
results = db.semantic_search("arts and music programs", top_k=5)
for r in results:
print(f" {r['school']} (score: {r['score']:.3f})")
db.close()
print("\nDone!")