Spaces:
Runtime error
Runtime error
| """ | |
| BPS School Finder - Database Query Interface | |
| ============================================== | |
| Runtime query class for the chatbot. Import this module: | |
| from database import BPSDatabase | |
| db = BPSDatabase() | |
| """ | |
| import sqlite3 | |
| import json | |
| import math | |
| from pathlib import Path | |
| # --------------------------------------------------------------------------- | |
| # Config | |
| # --------------------------------------------------------------------------- | |
| DB_PATH = Path(__file__).parent / "bps_schools.db" | |
| VECTOR_DIR = Path(__file__).parent / "vector_store" | |
| EMBEDDING_MODEL = "all-MiniLM-L6-v2" | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def haversine_miles(lat1, lon1, lat2, lon2): | |
| """Calculate distance in miles between two lat/lon points.""" | |
| R = 3958.8 | |
| dlat = math.radians(lat2 - lat1) | |
| dlon = math.radians(lon2 - lon1) | |
| a = (math.sin(dlat / 2) ** 2 + | |
| math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * | |
| math.sin(dlon / 2) ** 2) | |
| return R * 2 * math.asin(math.sqrt(a)) | |
| # --------------------------------------------------------------------------- | |
| # Query Interface | |
| # --------------------------------------------------------------------------- | |
| class BPSDatabase: | |
| """Query interface for the chatbot.""" | |
| def __init__(self, db_path=None, vector_dir=None): | |
| self.db_path = db_path or DB_PATH | |
| self.vector_dir = vector_dir or VECTOR_DIR | |
| self.conn = sqlite3.connect(str(self.db_path)) | |
| self.conn.row_factory = sqlite3.Row | |
| self.index = None | |
| self.documents = None | |
| self.metadata = None | |
| self.model = None | |
| self._load_vector_store() | |
| def _load_vector_store(self): | |
| """Load FAISS index and metadata.""" | |
| try: | |
| meta_path = Path(self.vector_dir) / "metadata.json" | |
| docs_path = Path(self.vector_dir) / "documents.json" | |
| index_path = Path(self.vector_dir) / "school_index.faiss" | |
| if meta_path.exists(): | |
| with open(meta_path) as f: | |
| self.metadata = json.load(f) | |
| with open(docs_path) as f: | |
| self.documents = json.load(f) | |
| if index_path.exists(): | |
| import faiss | |
| self.index = faiss.read_index(str(index_path)) | |
| from sentence_transformers import SentenceTransformer | |
| self.model = SentenceTransformer(EMBEDDING_MODEL) | |
| print(f"Vector store loaded: {self.index.ntotal} schools indexed") | |
| except Exception as e: | |
| print(f"Vector store not fully loaded: {e}") | |
| # --- Hard Filtering (SQL) --- | |
| def find_schools_by_grade(self, grade: int) -> list: | |
| """Find BPS schools that serve a specific grade level. | |
| Grade encoding: K0=-2, K1=-1, K2=0, 1-12 as integers.""" | |
| cur = self.conn.execute( | |
| "SELECT * FROM schools WHERE grade_min <= ? AND grade_max >= ? ORDER BY school", | |
| (grade, grade), | |
| ) | |
| return [dict(r) for r in cur.fetchall()] | |
| def find_schools_by_age(self, age_months: int) -> list: | |
| """Find non-BPS schools that serve a specific age in months.""" | |
| cur = self.conn.execute( | |
| "SELECT * FROM schools WHERE age_min_months <= ? AND age_max_months >= ? ORDER BY school", | |
| (age_months, age_months), | |
| ) | |
| return [dict(r) for r in cur.fetchall()] | |
| def find_schools_near(self, lat: float, lon: float, radius_miles: float = 1.0) -> list: | |
| """Find schools within radius_miles of a given lat/lon.""" | |
| lat_delta = radius_miles / 69.0 | |
| lon_delta = radius_miles / 53.0 | |
| cur = self.conn.execute( | |
| """SELECT * FROM schools | |
| WHERE latitude BETWEEN ? AND ? | |
| AND longitude BETWEEN ? AND ?""", | |
| (lat - lat_delta, lat + lat_delta, lon - lon_delta, lon + lon_delta), | |
| ) | |
| results = [] | |
| for row in cur.fetchall(): | |
| d = dict(row) | |
| if d["latitude"] and d["longitude"]: | |
| dist = haversine_miles(lat, lon, d["latitude"], d["longitude"]) | |
| if dist <= radius_miles: | |
| d["distance_miles"] = round(dist, 2) | |
| results.append(d) | |
| results.sort(key=lambda x: x["distance_miles"]) | |
| return results | |
| def find_schools_by_provider_type(self, provider_type: str) -> list: | |
| """Find schools by provider type (exact match).""" | |
| cur = self.conn.execute( | |
| "SELECT * FROM schools WHERE provider_type = ? ORDER BY school", | |
| (provider_type,), | |
| ) | |
| return [dict(r) for r in cur.fetchall()] | |
| def find_schools_by_filters(self, **kwargs) -> list: | |
| """ | |
| Combined AND filter for boolean/integer fields. | |
| Supported kwargs: UPK, ADA, accepts_ccfa, headstart, has_language_program, | |
| has_advanced_placement, has_international_baccalaureate, uniform, | |
| special_admission, surround_care, build_care, tuition. | |
| """ | |
| allowed = { | |
| "UPK", "ADA", "accepts_ccfa", "headstart", "has_language_program", | |
| "has_advanced_placement", "has_international_baccalaureate", "uniform", | |
| "special_admission", "surround_care", "build_care", "tuition", | |
| } | |
| query = "SELECT * FROM schools WHERE 1=1" | |
| params = [] | |
| for key, val in kwargs.items(): | |
| if key in allowed and val is not None: | |
| query += f" AND {key} = ?" | |
| params.append(val) | |
| cur = self.conn.execute(query + " ORDER BY school", params) | |
| return [dict(r) for r in cur.fetchall()] | |
| def hard_filter(self, grade: int = None, age_months: int = None, | |
| provider_type: str = None, lat: float = None, lon: float = None, | |
| radius_miles: float = 1.0, **boolean_filters) -> list: | |
| """ | |
| Combined hard filter. All specified conditions must match (AND logic). | |
| """ | |
| query = "SELECT * FROM schools WHERE 1=1" | |
| params = [] | |
| if grade is not None: | |
| query += " AND grade_min <= ? AND grade_max >= ?" | |
| params.extend([grade, grade]) | |
| if age_months is not None: | |
| query += " AND age_min_months <= ? AND age_max_months >= ?" | |
| params.extend([age_months, age_months]) | |
| if provider_type: | |
| query += " AND provider_type = ?" | |
| params.append(provider_type) | |
| allowed_bools = { | |
| "UPK", "ADA", "accepts_ccfa", "headstart", "has_language_program", | |
| "has_advanced_placement", "has_international_baccalaureate", "uniform", | |
| "special_admission", "surround_care", "build_care", "tuition", | |
| } | |
| for key, val in boolean_filters.items(): | |
| if key in allowed_bools and val is not None: | |
| query += f" AND {key} = ?" | |
| params.append(val) | |
| cur = self.conn.execute(query + " ORDER BY school", params) | |
| results = [dict(r) for r in cur.fetchall()] | |
| # Post-filter by distance if location provided | |
| if lat is not None and lon is not None: | |
| filtered = [] | |
| for r in results: | |
| if r["latitude"] and r["longitude"]: | |
| dist = haversine_miles(lat, lon, r["latitude"], r["longitude"]) | |
| if dist <= radius_miles: | |
| r["distance_miles"] = round(dist, 2) | |
| filtered.append(r) | |
| results = sorted(filtered, key=lambda x: x["distance_miles"]) | |
| return results | |
| # --- Soft Filtering (Vector Search / RAG) --- | |
| def semantic_search(self, query: str, top_k: int = 10, | |
| pre_filter_ids: set = None) -> list: | |
| """ | |
| Semantic search over BPS school descriptions. | |
| Args: | |
| query: Natural language query | |
| top_k: Number of results to return | |
| pre_filter_ids: If provided, only search within these school IDs | |
| """ | |
| if self.index is None or self.model is None: | |
| return self._keyword_search(query, top_k, pre_filter_ids) | |
| import numpy as np | |
| query_vec = self.model.encode([query], normalize_embeddings=True) | |
| query_vec = np.array(query_vec).astype("float32") | |
| search_k = min(top_k * 5, self.index.ntotal) if pre_filter_ids else top_k | |
| scores, indices = self.index.search(query_vec, search_k) | |
| results = [] | |
| for score, idx in zip(scores[0], indices[0]): | |
| if idx < 0: | |
| continue | |
| meta = self.metadata[idx] | |
| if pre_filter_ids and meta["id"] not in pre_filter_ids: | |
| continue | |
| results.append({ | |
| "id": meta["id"], | |
| "school": meta["school"], | |
| "score": float(score), | |
| "description": self.documents[idx], | |
| "metadata": meta, | |
| }) | |
| if len(results) >= top_k: | |
| break | |
| return results | |
| def _keyword_search(self, query: str, top_k: int = 10, | |
| pre_filter_ids: set = None) -> list: | |
| """Fallback keyword search when FAISS/embeddings not available.""" | |
| if not self.documents: | |
| return [] | |
| query_words = set(query.lower().split()) | |
| scored = [] | |
| for i, (doc, meta) in enumerate(zip(self.documents, self.metadata)): | |
| if pre_filter_ids and meta["id"] not in pre_filter_ids: | |
| continue | |
| doc_lower = doc.lower() | |
| score = sum(1 for w in query_words if w in doc_lower) | |
| if score > 0: | |
| scored.append({ | |
| "id": meta["id"], | |
| "school": meta["school"], | |
| "score": score / len(query_words), | |
| "description": doc, | |
| "metadata": meta, | |
| }) | |
| scored.sort(key=lambda x: x["score"], reverse=True) | |
| return scored[:top_k] | |
| # --- Combined Hard + Soft Filter --- | |
| def search(self, query: str = None, grade: int = None, provider_type: str = None, | |
| lat: float = None, lon: float = None, radius_miles: float = 1.0, | |
| top_k: int = 10, **filters) -> list: | |
| """ | |
| Full search pipeline: | |
| 1. Apply hard filters (grade, provider_type, location, booleans) | |
| 2. Within hard-filtered results, rank by semantic similarity to query | |
| """ | |
| hard_results = self.hard_filter( | |
| grade=grade, provider_type=provider_type, | |
| lat=lat, lon=lon, radius_miles=radius_miles, **filters, | |
| ) | |
| if not query or not query.strip(): | |
| return hard_results[:top_k] | |
| eligible_ids = set(r["id"] for r in hard_results) | |
| if not eligible_ids: | |
| return self.semantic_search(query, top_k) | |
| soft_results = self.semantic_search(query, top_k, eligible_ids) | |
| dist_map = {r["id"]: r.get("distance_miles") for r in hard_results} | |
| for r in soft_results: | |
| if dist_map.get(r["id"]) is not None: | |
| r["distance_miles"] = dist_map[r["id"]] | |
| return soft_results | |
| # --- Filter Based on Preferences --- | |
| def filter_based_on_preferences(self, school_ids: list, **preferences) -> dict: | |
| """ | |
| Filter eligible school IDs by preferences. Returns results split by | |
| BPS vs non-BPS, with BPS schools semantically ranked when a query is provided. | |
| Args: | |
| school_ids: List of school IDs (from find_eligible_schools). Required, cannot be empty. | |
| **preferences: Optional filters (query, top_k, ADA, UPK, has_language_program, etc.) | |
| Returns: | |
| Dict with bps_schools, non_bps_schools, bps_count, non_bps_count, notes. | |
| """ | |
| if not school_ids: | |
| return { | |
| "error": "school_ids is required and cannot be empty. Call find_eligible_schools first to get eligible school IDs.", | |
| "bps_schools": [], | |
| "non_bps_schools": [], | |
| "notes": [], | |
| } | |
| query = preferences.pop("query", None) | |
| top_k = preferences.pop("top_k", 10) | |
| lat = preferences.pop("lat", None) | |
| lon = preferences.pop("lon", None) | |
| radius_miles = preferences.pop("radius_miles", 1.0) | |
| provider_type = preferences.pop("provider_type", None) | |
| # Build SQL with ID constraint + optional filters | |
| placeholders = ",".join("?" for _ in school_ids) | |
| sql = f"SELECT * FROM schools WHERE id IN ({placeholders})" | |
| params = list(school_ids) | |
| if provider_type: | |
| sql += " AND provider_type = ?" | |
| params.append(provider_type) | |
| allowed_bools = { | |
| "UPK", "ADA", "accepts_ccfa", "headstart", "has_language_program", | |
| "has_advanced_placement", "has_international_baccalaureate", "uniform", | |
| "special_admission", "surround_care", "build_care", "tuition", | |
| } | |
| for key, val in preferences.items(): | |
| if key in allowed_bools and val is not None: | |
| sql += f" AND {key} = ?" | |
| params.append(val) | |
| cur = self.conn.execute(sql + " ORDER BY school", params) | |
| rows = [dict(r) for r in cur.fetchall()] | |
| # Post-filter by distance | |
| if lat is not None and lon is not None: | |
| filtered = [] | |
| for r in rows: | |
| if r["latitude"] and r["longitude"]: | |
| dist = haversine_miles(lat, lon, r["latitude"], r["longitude"]) | |
| if dist <= radius_miles: | |
| r["distance_miles"] = round(dist, 2) | |
| filtered.append(r) | |
| rows = sorted(filtered, key=lambda x: x["distance_miles"]) | |
| # Split BPS vs non-BPS | |
| bps_rows = [r for r in rows if r["provider_type"] == "Boston Public School"] | |
| non_bps_rows = [r for r in rows if r["provider_type"] != "Boston Public School"] | |
| bps_count = len(bps_rows) | |
| non_bps_count = len(non_bps_rows) | |
| notes = [] | |
| # Semantic ranking for BPS if query provided | |
| if query and bps_rows: | |
| bps_ids = set(str(r["id"]) for r in bps_rows) | |
| ranked = self.semantic_search(query, top_k=len(bps_rows), pre_filter_ids=bps_ids) | |
| ranked_ids = [r["id"] for r in ranked] | |
| score_map = {r["id"]: r["score"] for r in ranked} | |
| # Build ranked BPS list: ranked first, then unranked | |
| bps_by_id = {str(r["id"]): r for r in bps_rows} | |
| ranked_bps = [] | |
| for rid in ranked_ids: | |
| if str(rid) in bps_by_id: | |
| row = bps_by_id.pop(str(rid)) | |
| row["score"] = score_map[rid] | |
| ranked_bps.append(row) | |
| # Append any BPS schools not in FAISS results | |
| for row in sorted(bps_by_id.values(), key=lambda x: x["school"]): | |
| ranked_bps.append(row) | |
| bps_rows = ranked_bps | |
| if query and non_bps_count > 0: | |
| notes.append( | |
| f"Non-BPS schools cannot be ranked by their semantic preference " | |
| f"'{query}' because detailed program descriptions are not publicly " | |
| f"available for these providers, tell this to the user." | |
| ) | |
| if bps_count == 0 and non_bps_count == 0: | |
| notes.append( | |
| "No schools matched the given filters. Tell the user to broaden " | |
| "their criteria or to reach out to their eligible schools for more information." | |
| ) | |
| return { | |
| "bps_schools": bps_rows[:top_k], | |
| "non_bps_schools": non_bps_rows[:top_k], | |
| "bps_count": bps_count, | |
| "non_bps_count": non_bps_count, | |
| "notes": notes, | |
| } | |
| # --- Utility methods --- | |
| def get_school_detail(self, school_id: str) -> dict: | |
| """Get full school record + RAG description if available.""" | |
| cur = self.conn.execute("SELECT * FROM schools WHERE id = ?", (school_id,)) | |
| row = cur.fetchone() | |
| if row: | |
| result = dict(row) | |
| if self.metadata: | |
| for i, m in enumerate(self.metadata): | |
| if m["id"] == school_id: | |
| result["description"] = self.documents[i] | |
| break | |
| return result | |
| return None | |
| def get_all_provider_types(self) -> list: | |
| """Get distinct provider types.""" | |
| cur = self.conn.execute( | |
| "SELECT DISTINCT provider_type FROM schools ORDER BY provider_type" | |
| ) | |
| return [r[0] for r in cur.fetchall()] | |
| def close(self): | |
| self.conn.close() | |