# 14c_chatbot / data / database.py
# csong03
# Initial Space upload with LFS-tracked binaries
# 9e118e4
"""
BPS School Finder - Database Query Interface
==============================================
Runtime query class for the chatbot. Import this module:

    from database import BPSDatabase
    db = BPSDatabase()
"""
import sqlite3
import json
import math
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Default locations are siblings of this module file, so they do not depend
# on the process's current working directory.
DB_PATH = Path(__file__).with_name("bps_schools.db")  # SQLite database file
VECTOR_DIR = Path(__file__).with_name("vector_store")  # vector-store directory
# Name handed to SentenceTransformer when the vector store is loaded.
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def haversine_miles(lat1, lon1, lat2, lon2):
    """Calculate distance in miles between two lat/lon points.

    Uses the haversine formula on a sphere of radius 3958.8 miles.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        Great-circle distance between the two points, in miles.
    """
    R = 3958.8  # Earth radius in miles
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(math.radians(lat1))
        * math.cos(math.radians(lat2))
        * math.sin(dlon / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt/asin raise ValueError; clamp it first.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.asin(math.sqrt(a))
# ---------------------------------------------------------------------------
# Query Interface
# ---------------------------------------------------------------------------
class BPSDatabase:
    """Query interface for the chatbot.

    Wraps a SQLite connection to the ``schools`` table ("hard" filters) and an
    optional FAISS index with sentence-transformer embeddings ("soft" semantic
    ranking). When the vector store cannot be loaded, semantic queries fall
    back to a keyword match over the stored description texts.

    School IDs are compared as strings whenever database rows are matched
    against vector-store metadata, so integer and string IDs interoperate.

    Instances can be used as context managers; the connection is closed on
    exit.
    """

    # Columns that may be used as equality filters. Filter names are spliced
    # into SQL as identifiers, so only names from this allow-list are used.
    _FILTER_COLUMNS = frozenset({
        "UPK", "ADA", "accepts_ccfa", "headstart", "has_language_program",
        "has_advanced_placement", "has_international_baccalaureate", "uniform",
        "special_admission", "surround_care", "build_care", "tuition",
    })
    # provider_type value that marks a row as a BPS school.
    _BPS_PROVIDER_TYPE = "Boston Public School"
    # Same sphere radius that haversine_miles uses.
    _EARTH_RADIUS_MILES = 3958.8
    # Slack added to the SQL bounding box so floating-point rounding cannot
    # exclude a point that the exact distance check would accept.
    _BOX_PAD_DEGREES = 1e-6

    def __init__(self, db_path=None, vector_dir=None):
        """Open the database and try to load the vector store.

        Args:
            db_path: Path of the SQLite file; defaults to DB_PATH.
            vector_dir: Directory holding the vector store; defaults to
                VECTOR_DIR.
        """
        self.db_path = db_path or DB_PATH
        self.vector_dir = vector_dir or VECTOR_DIR
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row
        self.index = None
        self.documents = None
        self.metadata = None
        self.model = None
        self._load_vector_store()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _load_vector_store(self):
        """Load FAISS index and metadata.

        Best effort: any failure is printed and leaves the affected
        attributes as None, in which case semantic_search falls back to
        keyword matching.
        """
        vector_dir = Path(self.vector_dir)
        meta_path = vector_dir / "metadata.json"
        docs_path = vector_dir / "documents.json"
        index_path = vector_dir / "school_index.faiss"
        try:
            if meta_path.exists():
                with open(meta_path, encoding="utf-8") as f:
                    metadata = json.load(f)
                with open(docs_path, encoding="utf-8") as f:
                    documents = json.load(f)
                # Assign both together so a missing documents.json cannot
                # leave metadata populated without matching texts.
                self.metadata, self.documents = metadata, documents
            if index_path.exists():
                import faiss
                self.index = faiss.read_index(str(index_path))
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer(EMBEDDING_MODEL)
                print(f"Vector store loaded: {self.index.ntotal} schools indexed")
        except Exception as e:
            print(f"Vector store not fully loaded: {e}")

    # --- Internal helpers ---

    def _query(self, sql, params=()):
        """Run a SELECT and return the rows as a list of plain dicts."""
        return [dict(row) for row in self.conn.execute(sql, params)]

    @classmethod
    def _filter_clauses(cls, filters):
        """Turn allow-listed, non-None filters into (sql_fragment, params)."""
        clauses = []
        params = []
        for column, value in filters.items():
            if column in cls._FILTER_COLUMNS and value is not None:
                clauses.append(f" AND {column} = ?")
                params.append(value)
        return "".join(clauses), params

    @staticmethod
    def _id_filter(pre_filter_ids):
        """Return the IDs as a set of strings, or None when no filter applies.

        An empty or None collection means "no filter".
        """
        if not pre_filter_ids:
            return None
        return {str(i) for i in pre_filter_ids}

    @classmethod
    def _bounding_box(cls, lat, radius_miles):
        """Return (lat_delta, lon_delta) in degrees enclosing the search circle."""
        angular = radius_miles / cls._EARTH_RADIUS_MILES
        lat_delta = math.degrees(angular)
        if abs(lat) + lat_delta >= 90.0:
            # The circle reaches a pole, so every longitude can be in range.
            lon_delta = 180.0
        else:
            ratio = math.sin(angular) / math.cos(math.radians(lat))
            ratio = max(-1.0, min(1.0, ratio))
            lon_delta = math.degrees(math.asin(ratio))
        return lat_delta + cls._BOX_PAD_DEGREES, lon_delta + cls._BOX_PAD_DEGREES

    @staticmethod
    def _within_radius(rows, lat, lon, radius_miles):
        """Keep rows within radius_miles of (lat, lon), nearest first.

        Each kept row gains a ``distance_miles`` key rounded to 2 decimals.
        Rows whose latitude or longitude is falsy (NULL, 0, empty) are
        dropped, as they carry no usable location.
        """
        nearby = []
        for row in rows:
            if row["latitude"] and row["longitude"]:
                dist = haversine_miles(lat, lon, row["latitude"], row["longitude"])
                if dist <= radius_miles:
                    row["distance_miles"] = round(dist, 2)
                    nearby.append(row)
        nearby.sort(key=lambda r: r["distance_miles"])
        return nearby

    # --- Hard Filtering (SQL) ---

    def find_schools_by_grade(self, grade: int) -> list:
        """Find BPS schools that serve a specific grade level.

        Grade encoding: K0=-2, K1=-1, K2=0, 1-12 as integers.
        Rows with NULL grade bounds never match.
        """
        return self._query(
            "SELECT * FROM schools WHERE grade_min <= ? AND grade_max >= ? ORDER BY school",
            (grade, grade),
        )

    def find_schools_by_age(self, age_months: int) -> list:
        """Find schools whose age range (in months) covers age_months.

        Rows with NULL age bounds never match.
        NOTE(review): presumably only non-BPS rows carry age bounds; the
        query itself does not filter on provider type.
        """
        return self._query(
            "SELECT * FROM schools WHERE age_min_months <= ? AND age_max_months >= ? ORDER BY school",
            (age_months, age_months),
        )

    def find_schools_near(self, lat: float, lon: float, radius_miles: float = 1.0) -> list:
        """Find schools within radius_miles of a given lat/lon, nearest first.

        A SQL bounding box narrows the candidates and the exact haversine
        distance decides membership. The box is sized from the latitude so
        it always contains the full circle. Wrap-around at the +/-180 degree
        meridian is not handled.
        """
        if radius_miles < 0:
            return []
        lat_delta, lon_delta = self._bounding_box(lat, radius_miles)
        rows = self._query(
            """SELECT * FROM schools
               WHERE latitude BETWEEN ? AND ?
               AND longitude BETWEEN ? AND ?""",
            (lat - lat_delta, lat + lat_delta, lon - lon_delta, lon + lon_delta),
        )
        return self._within_radius(rows, lat, lon, radius_miles)

    def find_schools_by_provider_type(self, provider_type: str) -> list:
        """Find schools by provider type (exact match)."""
        return self._query(
            "SELECT * FROM schools WHERE provider_type = ? ORDER BY school",
            (provider_type,),
        )

    def find_schools_by_filters(self, **kwargs) -> list:
        """
        Combined AND filter for boolean/integer fields.

        Supported kwargs: UPK, ADA, accepts_ccfa, headstart, has_language_program,
        has_advanced_placement, has_international_baccalaureate, uniform,
        special_admission, surround_care, build_care, tuition.
        Unsupported names and None values are ignored.
        """
        where, params = self._filter_clauses(kwargs)
        return self._query(
            "SELECT * FROM schools WHERE 1=1" + where + " ORDER BY school", params
        )

    def hard_filter(self, grade: int = None, age_months: int = None,
                    provider_type: str = None, lat: float = None, lon: float = None,
                    radius_miles: float = 1.0, **boolean_filters) -> list:
        """
        Combined hard filter. All specified conditions must match (AND logic).

        Args:
            grade: Grade the school must serve (see find_schools_by_grade).
            age_months: Age in months the school must serve.
            provider_type: Exact provider type; falsy values are ignored.
            lat, lon: When both are given, keep only schools within
                radius_miles, add ``distance_miles`` and sort nearest first.
            radius_miles: Radius used with lat/lon.
            **boolean_filters: Same names as find_schools_by_filters.

        Returns:
            List of row dicts, ordered by school name unless a location is
            given.
        """
        sql = "SELECT * FROM schools WHERE 1=1"
        params = []
        if grade is not None:
            sql += " AND grade_min <= ? AND grade_max >= ?"
            params.extend([grade, grade])
        if age_months is not None:
            sql += " AND age_min_months <= ? AND age_max_months >= ?"
            params.extend([age_months, age_months])
        if provider_type:
            sql += " AND provider_type = ?"
            params.append(provider_type)
        where, filter_params = self._filter_clauses(boolean_filters)
        rows = self._query(sql + where + " ORDER BY school", params + filter_params)
        # Post-filter by distance if location provided
        if lat is not None and lon is not None:
            rows = self._within_radius(rows, lat, lon, radius_miles)
        return rows

    # --- Soft Filtering (Vector Search / RAG) ---

    def semantic_search(self, query: str, top_k: int = 10,
                        pre_filter_ids: set = None) -> list:
        """
        Semantic search over BPS school descriptions.

        Falls back to keyword search when the index, model, metadata or
        documents are unavailable.

        Args:
            query: Natural language query
            top_k: Number of results to return
            pre_filter_ids: If provided, only search within these school IDs
                (compared as strings). Empty or None means no restriction.

        Returns:
            List of dicts with id, school, score, description and metadata,
            best match first.
        """
        if (self.index is None or self.model is None
                or not self.metadata or not self.documents):
            return self._keyword_search(query, top_k, pre_filter_ids)
        total = self.index.ntotal
        if top_k <= 0 or total <= 0:
            return []
        import numpy as np
        allowed = self._id_filter(pre_filter_ids)
        query_vec = self.model.encode([query], normalize_embeddings=True)
        query_vec = np.array(query_vec).astype("float32")
        # With a filter, rank the whole index: capping the candidate list
        # could drop eligible schools that rank below ineligible ones.
        search_k = total if allowed is not None else min(top_k, total)
        scores, indices = self.index.search(query_vec, search_k)
        known = min(len(self.metadata), len(self.documents))
        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            # Skip negative (no-result) slots and any hit without metadata.
            if idx < 0 or idx >= known:
                continue
            meta = self.metadata[idx]
            if allowed is not None and str(meta["id"]) not in allowed:
                continue
            results.append({
                "id": meta["id"],
                "school": meta["school"],
                "score": float(score),
                "description": self.documents[idx],
                "metadata": meta,
            })
            if len(results) >= top_k:
                break
        return results

    def _keyword_search(self, query: str, top_k: int = 10,
                        pre_filter_ids: set = None) -> list:
        """Fallback keyword search when FAISS/embeddings not available.

        The score is the fraction of distinct query words that occur as
        substrings of the lower-cased description; zero-score documents are
        omitted.
        """
        if not self.documents or not self.metadata:
            return []
        query_words = set(query.lower().split())
        if not query_words:
            return []
        allowed = self._id_filter(pre_filter_ids)
        scored = []
        for doc, meta in zip(self.documents, self.metadata):
            if allowed is not None and str(meta["id"]) not in allowed:
                continue
            doc_lower = doc.lower()
            hits = sum(1 for w in query_words if w in doc_lower)
            if hits > 0:
                scored.append({
                    "id": meta["id"],
                    "school": meta["school"],
                    "score": hits / len(query_words),
                    "description": doc,
                    "metadata": meta,
                })
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:top_k]

    # --- Combined Hard + Soft Filter ---

    def search(self, query: str = None, grade: int = None, provider_type: str = None,
               lat: float = None, lon: float = None, radius_miles: float = 1.0,
               top_k: int = 10, **filters) -> list:
        """
        Full search pipeline:
        1. Apply hard filters (grade, provider_type, location, booleans)
        2. Within hard-filtered results, rank by semantic similarity to query

        Extra keyword arguments are forwarded to hard_filter, so
        ``age_months`` may also be passed. Without a query the first top_k
        hard-filtered rows are returned. When the hard filters match
        nothing the result is empty; the filters are never relaxed.
        """
        hard_results = self.hard_filter(
            grade=grade, provider_type=provider_type,
            lat=lat, lon=lon, radius_miles=radius_miles, **filters,
        )
        if not query or not query.strip():
            return hard_results[:top_k]
        if not hard_results:
            return []
        eligible_ids = {r["id"] for r in hard_results}
        soft_results = self.semantic_search(query, top_k, eligible_ids)
        # Carry the distance computed by the hard filter onto ranked hits.
        dist_map = {str(r["id"]): r.get("distance_miles") for r in hard_results}
        for hit in soft_results:
            distance = dist_map.get(str(hit["id"]))
            if distance is not None:
                hit["distance_miles"] = distance
        return soft_results

    # --- Filter Based on Preferences ---

    def filter_based_on_preferences(self, school_ids: list, **preferences) -> dict:
        """
        Filter eligible school IDs by preferences. Returns results split by
        BPS vs non-BPS, with BPS schools semantically ranked when a query is provided.

        Args:
            school_ids: List of school IDs (from find_eligible_schools). Required, cannot be empty.
            **preferences: Optional filters (query, top_k, lat, lon,
                radius_miles, provider_type, ADA, UPK, has_language_program, etc.)

        Returns:
            Dict with bps_schools, non_bps_schools, bps_count, non_bps_count, notes.
            The counts are taken before truncation to top_k. When school_ids
            is empty the dict also carries an ``error`` message and zero
            counts.
        """
        school_ids = list(school_ids) if school_ids else []
        if not school_ids:
            return {
                "error": "school_ids is required and cannot be empty. Call find_eligible_schools first to get eligible school IDs.",
                "bps_schools": [],
                "non_bps_schools": [],
                "bps_count": 0,
                "non_bps_count": 0,
                "notes": [],
            }
        query = preferences.pop("query", None)
        top_k = preferences.pop("top_k", 10)
        lat = preferences.pop("lat", None)
        lon = preferences.pop("lon", None)
        radius_miles = preferences.pop("radius_miles", 1.0)
        provider_type = preferences.pop("provider_type", None)

        # Build SQL with ID constraint + optional filters
        placeholders = ",".join("?" for _ in school_ids)
        sql = f"SELECT * FROM schools WHERE id IN ({placeholders})"
        params = list(school_ids)
        if provider_type:
            sql += " AND provider_type = ?"
            params.append(provider_type)
        where, filter_params = self._filter_clauses(preferences)
        rows = self._query(sql + where + " ORDER BY school", params + filter_params)

        # Post-filter by distance
        if lat is not None and lon is not None:
            rows = self._within_radius(rows, lat, lon, radius_miles)

        # Split BPS vs non-BPS
        bps_rows = [r for r in rows if r["provider_type"] == self._BPS_PROVIDER_TYPE]
        non_bps_rows = [r for r in rows if r["provider_type"] != self._BPS_PROVIDER_TYPE]
        bps_count = len(bps_rows)
        non_bps_count = len(non_bps_rows)
        notes = []

        # Semantic ranking for BPS if query provided
        if query and bps_rows:
            bps_by_id = {str(r["id"]): r for r in bps_rows}
            ranked = self.semantic_search(
                query, top_k=len(bps_rows), pre_filter_ids=set(bps_by_id)
            )
            # Ranked schools first, in score order
            ranked_bps = []
            for hit in ranked:
                row = bps_by_id.pop(str(hit["id"]), None)
                if row is not None:
                    row["score"] = hit["score"]
                    ranked_bps.append(row)
            # Then any BPS schools the search did not return, by name
            ranked_bps.extend(sorted(bps_by_id.values(), key=lambda x: x["school"]))
            bps_rows = ranked_bps

        if query and non_bps_count > 0:
            notes.append(
                f"Non-BPS schools cannot be ranked by their semantic preference "
                f"'{query}' because detailed program descriptions are not publicly "
                f"available for these providers, tell this to the user."
            )
        if bps_count == 0 and non_bps_count == 0:
            notes.append(
                "No schools matched the given filters. Tell the user to broaden "
                "their criteria or to reach out to their eligible schools for more information."
            )
        return {
            "bps_schools": bps_rows[:top_k],
            "non_bps_schools": non_bps_rows[:top_k],
            "bps_count": bps_count,
            "non_bps_count": non_bps_count,
            "notes": notes,
        }

    # --- Utility methods ---

    def get_school_detail(self, school_id: str) -> dict:
        """Get full school record + RAG description if available.

        Returns:
            The row as a dict, with a ``description`` key added when the
            vector-store metadata has an entry for this ID, or None when no
            row has that ID.
        """
        row = self.conn.execute(
            "SELECT * FROM schools WHERE id = ?", (school_id,)
        ).fetchone()
        if row is None:
            return None
        result = dict(row)
        if self.metadata and self.documents:
            wanted = str(school_id)
            for doc, meta in zip(self.documents, self.metadata):
                if str(meta["id"]) == wanted:
                    result["description"] = doc
                    break
        return result

    def get_all_provider_types(self) -> list:
        """Get distinct provider types."""
        cur = self.conn.execute(
            "SELECT DISTINCT provider_type FROM schools ORDER BY provider_type"
        )
        return [r[0] for r in cur.fetchall()]

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()