# NU-KIOSK-API — backend/data/catalog.py
# Author: Monish BV
# Commit c2b7a7b: "Add kiosk-api: stripped backend for speech integration"
"""Lightweight in-memory catalog for loading CSV data into named tables."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional
import logging
from .sources import DataSource, default_sources
from .utils import (
canonicalize_name,
centers_for_faculty,
extract_leadership_names,
extract_advisor_names,
generate_name_variants,
tokenize_name,
)
@dataclass
class EntityRecords:
    """Container for a structured dataset plus helpful indices.

    Holds the raw ``records`` for one named entity (e.g. faculty, events)
    and, when ``key_field`` is set, a normalized-key index for O(1) lookups.
    """

    # Entity name the catalog registered this dataset under.
    name: str
    # Raw rows, each a field-name -> value mapping.
    records: List[Dict[str, Any]]
    # Column whose values key the lookup index; None disables indexing.
    key_field: Optional[str] = None
    # Provenance hint (e.g. the source file path), if known.
    origin: Optional[str] = None
    # Normalized key -> row; populated by build_index().
    _index: Dict[str, Dict[str, Any]] = field(default_factory=dict, init=False, repr=False)

    def build_index(self, normalizer: Optional[Callable[[str], str]] = None) -> None:
        """(Re)build the key index over ``records``.

        Args:
            normalizer: Key canonicalizer; defaults to ``canonicalize_name``.

        No-op when ``key_field`` is unset. Rows with a missing or empty key
        are skipped; on duplicate normalized keys the later row wins.
        """
        if self.key_field is None:
            return
        normalizer = normalizer or canonicalize_name
        # Start fresh so entries from a previous build (different normalizer,
        # or records that have since changed) cannot linger as stale hits.
        self._index.clear()
        for row in self.records:
            key = row.get(self.key_field)
            if not key:
                continue
            self._index[normalizer(str(key))] = row

    def get_by_key(self, value: str, normalizer: Optional[Callable[[str], str]] = None) -> Optional[Dict[str, Any]]:
        """Return the indexed row whose normalized key matches ``value``.

        Returns None when ``value`` is falsy, indexing is disabled, or no row
        matches. Pass the same ``normalizer`` that was used to build the index.
        """
        if not value or self.key_field is None:
            return None
        normalizer = normalizer or canonicalize_name
        return self._index.get(normalizer(value))
class DataCatalog:
    """
    Registry that mirrors Satyrn's Ring layer.

    Each entity (faculty, staff, events...) is stored with metadata so
    analysis blueprints can retrieve and join information deterministically.
    """

    def __init__(self) -> None:
        # Entity name -> dataset wrapper with its lookup index.
        self.entities: Dict[str, EntityRecords] = {}
        # Free-form metadata emitted by data sources, keyed by topic.
        self.metadata: Dict[str, Dict[str, Any]] = {}
        # Relationship name -> declarative join description.
        self.relationships: Dict[str, RelationshipDefinition] = {}

    def register_entity(
        self,
        name: str,
        records: Iterable[Dict[str, Any]],
        *,
        key_field: Optional[str] = None,
        origin: Optional[str] = None,
        normalizer: Optional[Callable[[str], str]] = None,
    ) -> None:
        """Wrap *records* in an EntityRecords, index it, and store it under *name*."""
        entry = EntityRecords(
            name=name,
            records=list(records),
            key_field=key_field,
            origin=origin,
        )
        entry.build_index(normalizer)
        self.entities[name] = entry

    def get(self, name: str) -> EntityRecords:
        """Return the entity registered as *name*; raise KeyError when unknown."""
        try:
            return self.entities[name]
        except KeyError:
            raise KeyError(f"Unknown entity '{name}'") from None

    def try_get(self, name: str) -> Optional[EntityRecords]:
        """Like :meth:`get`, but return None instead of raising."""
        return self.entities.get(name)

    # Relationship handling -------------------------------------------------
    def register_relationship(self, relationship: RelationshipDefinition) -> None:
        """Store *relationship* under its own name, replacing any previous entry."""
        self.relationships[relationship.name] = relationship

    def resolve_relationship(self, name: str, source_row: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Follow relationship *name* from *source_row* to rows of the target entity.

        Matching runs in up to three passes, stopping at the first pass that
        yields results: (1) indexed lookups over generated value variants,
        (2) an optional token-subset scan of the target records, and (3) an
        optional custom fallback matcher. Returns [] when the source value is
        empty/missing or the target entity is not registered.
        """
        rel = self.relationships.get(name)
        if rel is None:
            raise KeyError(f"Relationship '{name}' not registered.")
        value = source_row.get(rel.from_field)
        if not value:
            return []
        target = self.try_get(rel.to_entity)
        if not target:
            return []

        # Pass 1: exact index lookups, trying each generated variant in turn.
        variants = rel.variant_generator(value) if rel.variant_generator else [value]
        found = [hit for hit in (target.get_by_key(v, rel.normalizer) for v in variants) if hit]
        if found:
            return found

        # Pass 2: token-subset scan (e.g. "J. Smith" within "John Q. Smith").
        if rel.allow_token_subset:
            wanted = tokenize_name(value)
            if wanted:
                found = [
                    row
                    for row in target.records
                    if wanted.issubset(tokenize_name(row.get(rel.to_field, "")))
                ]
                if found:
                    return found

        # Pass 3: last-resort custom matcher, when the relationship defines one.
        if rel.fallback_matcher:
            extra = rel.fallback_matcher(source_row, target.records)
            if extra:
                found.extend(extra)
        return found
@dataclass
class RelationshipDefinition:
    """Declarative description of a join between two registered entities.

    ``DataCatalog.resolve_relationship`` reads these attributes: the value of
    ``from_field`` on a source row is expanded into variants, normalized, and
    matched against ``to_field`` values on rows of ``to_entity``.
    """

    # Registry key used when registering/resolving this relationship.
    name: str
    # Entity names on either side of the join.
    from_entity: str
    to_entity: str
    # Row fields compared between source and target entities.
    from_field: str
    to_field: str
    # Canonicalizes values before index lookups.
    normalizer: Callable[[str], str] = canonicalize_name
    # Expands a source value into alternate spellings to try in order.
    variant_generator: Optional[Callable[[str], Iterable[str]]] = lambda value: generate_name_variants(value)
    # When True, resolution may fall back to token-subset matching over records.
    allow_token_subset: bool = False
    # Optional last-resort matcher: (source_row, target_records) -> matched rows.
    fallback_matcher: Optional[Callable[[Dict[str, Any], List[Dict[str, Any]]], List[Dict[str, Any]]]] = None
def load_default_catalog(
    base_dir: Path | str = "Archive",
    *,
    sources: Optional[Iterable[DataSource]] = None,
) -> DataCatalog:
    """
    Load the project CSVs into a catalog with sensible defaults.

    This mirrors the datasets referenced in the exploratory notebook,
    giving downstream code a single call to bootstrap the backend.
    A custom ``sources`` iterable can be supplied to extend or override
    the default data sources (e.g., to add TA office hours).
    """
    base_path = Path(base_dir)
    catalog = DataCatalog()
    logger = logging.getLogger(__name__)
    source_list = (
        list(sources)
        if sources is not None
        else default_sources(base_path, name_normalizer=canonicalize_name)
    )
    for source in source_list:
        result = source.load()
        for entity in result.entities:
            if not entity.records:
                # Skip empty datasets so lookups never hit hollow entities.
                continue
            catalog.register_entity(
                entity.name,
                entity.records,
                key_field=entity.key_field,
                origin=entity.origin,
                normalizer=entity.normalizer,
            )
            # Log which file provided this entity so we can trace provenance.
            # (Plain attribute access cannot raise here; `or` covers None.)
            logger.info(
                "Loaded entity '%s' with %d records from %s",
                entity.name,
                len(entity.records),
                entity.origin or "(unknown)",
            )
        _merge_source_metadata(catalog, result.metadata)
    # Register common relationships between entities to mirror Satyrn's Ring metadata.
    _register_default_relationships(catalog)
    return catalog


def _merge_source_metadata(catalog: DataCatalog, metadata: Dict[str, Any]) -> None:
    """Merge one source's metadata into the catalog: dicts merge in place, anything else overwrites."""
    for key, value in metadata.items():
        existing = catalog.metadata.get(key)
        if isinstance(existing, dict) and isinstance(value, dict):
            existing.update(value)
        else:
            catalog.metadata[key] = value


def _register_default_relationships(catalog: DataCatalog) -> None:
    """Register the standard entity joins, guarded so each only applies when both sides loaded."""
    if catalog.try_get("faculty") and catalog.try_get("faculty_offices"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_office",
                from_entity="faculty",
                to_entity="faculty_offices",
                from_field="Name",
                to_field="Assignee Name",
                allow_token_subset=True,
            )
        )
    if catalog.try_get("centers") and catalog.try_get("faculty"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="center_to_faculty_leads",
                from_entity="centers",
                to_entity="faculty",
                from_field="Leadership",
                to_field="Name",
                normalizer=canonicalize_name,
                variant_generator=lambda value: extract_leadership_names(value),
                allow_token_subset=False,
            )
        )
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_centers",
                from_entity="faculty",
                to_entity="centers",
                from_field="Name",
                to_field="Leadership",
                normalizer=canonicalize_name,
                variant_generator=None,
                allow_token_subset=False,
                fallback_matcher=centers_for_faculty,
            )
        )
    if catalog.try_get("students") and catalog.try_get("mudd_seating"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="student_to_mudd_seat",
                from_entity="students",
                to_entity="mudd_seating",
                from_field="Name",
                to_field="Student/Visitor",
                allow_token_subset=True,
            )
        )
    if catalog.try_get("students") and catalog.try_get("faculty"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="student_to_advisors",
                from_entity="students",
                to_entity="faculty",
                from_field="Advisor(s)",
                to_field="Name",
                normalizer=canonicalize_name,
                variant_generator=lambda value: extract_advisor_names(value),
                allow_token_subset=False,
            )
        )
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_advisees",
                from_entity="faculty",
                to_entity="students",
                from_field="Name",
                to_field="Advisor(s)",
                normalizer=canonicalize_name,
                variant_generator=None,
                allow_token_subset=False,
                # A faculty member advises a student when the faculty member's
                # canonical name appears among the student's parsed advisor names.
                fallback_matcher=lambda source, students: [
                    row
                    for row in students
                    if canonicalize_name(source.get("Name", ""))
                    in {canonicalize_name(n) for n in extract_advisor_names(row.get("Advisor(s)"))}
                ],
            )
        )