Spaces:
Sleeping
Sleeping
| """Lightweight in-memory catalog for loading CSV data into named tables.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Callable, Dict, Iterable, List, Optional | |
| import logging | |
| from .sources import DataSource, default_sources | |
| from .utils import ( | |
| canonicalize_name, | |
| centers_for_faculty, | |
| extract_leadership_names, | |
| extract_advisor_names, | |
| generate_name_variants, | |
| tokenize_name, | |
| ) | |
@dataclass
class EntityRecords:
    """Container for a structured dataset plus a key-based lookup index.

    The class is a dataclass: instances are created with keyword arguments
    (``name``, ``records``, ``key_field``, ``origin``) and each instance gets
    its own private ``_index`` dict, which is excluded from ``__init__`` and
    ``repr``.
    """

    # Name the dataset is registered under.
    name: str
    # Rows of the dataset, one dict per record.
    records: List[Dict[str, Any]]
    # Field whose value keys the lookup index; ``None`` disables indexing.
    key_field: Optional[str] = None
    # Optional provenance label for the dataset.
    origin: Optional[str] = None
    # Normalized key -> row. Populated by ``build_index``.
    _index: Dict[str, Dict[str, Any]] = field(default_factory=dict, init=False, repr=False)

    def build_index(self, normalizer: Optional[Callable[[str], str]] = None) -> None:
        """Populate the lookup index from ``records``.

        Does nothing when ``key_field`` is ``None``. Rows whose key value is
        missing or falsy are skipped. When several rows normalize to the same
        key, the last one wins. Existing index entries are not cleared.

        Args:
            normalizer: Maps a raw key string to its index key. Defaults to
                ``canonicalize_name``.
        """
        if self.key_field is None:
            return
        normalize = normalizer or canonicalize_name
        for row in self.records:
            raw_key = row.get(self.key_field)
            if not raw_key:
                continue
            self._index[normalize(str(raw_key))] = row

    def get_by_key(
        self,
        value: str,
        normalizer: Optional[Callable[[str], str]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Return the indexed row matching ``value``, or ``None``.

        Args:
            value: Raw key to look up. Falsy values never match.
            normalizer: Must be the same normalizer the index was built with
                for lookups to hit. Defaults to ``canonicalize_name``.

        Returns:
            The matching row dict, or ``None`` when the dataset has no
            ``key_field``, ``value`` is falsy, or no row matches.
        """
        if not value or self.key_field is None:
            return None
        normalize = normalizer or canonicalize_name
        # Coerce to str exactly as build_index does, so non-string keys that
        # were indexed can also be looked up.
        return self._index.get(normalize(str(value)))
class DataCatalog:
    """Registry of named entity datasets and the relationships between them.

    Mirrors Satyrn's Ring layer: each entity (faculty, staff, events...) is
    stored with metadata so analysis blueprints can retrieve and join
    information deterministically.
    """

    def __init__(self) -> None:
        # Registered datasets, keyed by entity name.
        self.entities: Dict[str, EntityRecords] = {}
        # Free-form metadata, keyed by metadata name.
        self.metadata: Dict[str, Dict[str, Any]] = {}
        # Registered relationships, keyed by relationship name.
        self.relationships: Dict[str, RelationshipDefinition] = {}

    def register_entity(
        self,
        name: str,
        records: Iterable[Dict[str, Any]],
        *,
        key_field: Optional[str] = None,
        origin: Optional[str] = None,
        normalizer: Optional[Callable[[str], str]] = None,
    ) -> None:
        """Store ``records`` under ``name`` and build its lookup index.

        A previously registered entity with the same name is replaced.

        Args:
            name: Entity name used for later retrieval.
            records: Rows of the dataset; materialized into a list.
            key_field: Field to index rows by, if any.
            origin: Optional provenance label.
            normalizer: Key normalizer forwarded to ``build_index``.
        """
        dataset = EntityRecords(
            name=name,
            records=list(records),
            key_field=key_field,
            origin=origin,
        )
        dataset.build_index(normalizer)
        self.entities[name] = dataset

    def get(self, name: str) -> EntityRecords:
        """Return the entity registered as ``name``.

        Raises:
            KeyError: If no entity with that name is registered.
        """
        try:
            return self.entities[name]
        except KeyError:
            raise KeyError(f"Unknown entity '{name}'") from None

    def try_get(self, name: str) -> Optional[EntityRecords]:
        """Return the entity registered as ``name``, or ``None`` if absent."""
        return self.entities.get(name)

    # Relationship handling -------------------------------------------------
    def register_relationship(self, relationship: RelationshipDefinition) -> None:
        """Register ``relationship`` under its ``name``, replacing any previous one."""
        self.relationships[relationship.name] = relationship

    def resolve_relationship(self, name: str, source_row: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Find the target rows that ``source_row`` links to via relationship ``name``.

        Matching strategies are tried in order, and the first one that yields
        any rows wins:

        1. Indexed lookup of each candidate produced by the relationship's
           ``variant_generator`` (or of the raw source value when there is
           no generator).
        2. Token-subset match against every target row, when
           ``allow_token_subset`` is set.
        3. The relationship's ``fallback_matcher``, when provided.

        Args:
            name: Name of a registered relationship.
            source_row: Row from the relationship's source entity.

        Returns:
            Matching target rows. Empty when the source field is missing or
            falsy, the target entity is not registered, or nothing matches.

        Raises:
            KeyError: If the relationship is not registered.
        """
        relationship = self.relationships.get(name)
        if not relationship:
            raise KeyError(f"Relationship '{name}' not registered.")

        source_value = source_row.get(relationship.from_field)
        if not source_value:
            return []

        target_entity = self.try_get(relationship.to_entity)
        if not target_entity:
            return []

        if relationship.variant_generator:
            candidates = relationship.variant_generator(source_value)
        else:
            candidates = [source_value]

        matches: List[Dict[str, Any]] = []
        # Several variants of one value can normalize to the same index key;
        # track row identity so each target row is reported only once.
        seen_row_ids = set()
        for candidate in candidates:
            target_row = target_entity.get_by_key(candidate, relationship.normalizer)
            if target_row and id(target_row) not in seen_row_ids:
                seen_row_ids.add(id(target_row))
                matches.append(target_row)
        if matches:
            return matches

        if relationship.allow_token_subset:
            source_tokens = tokenize_name(source_value)
            if source_tokens:
                for row in target_entity.records:
                    # ``or ""`` covers rows where the field is present but None.
                    dest_tokens = tokenize_name(row.get(relationship.to_field) or "")
                    if source_tokens.issubset(dest_tokens):
                        matches.append(row)
            if matches:
                return matches

        if relationship.fallback_matcher:
            extra = relationship.fallback_matcher(source_row, target_entity.records)
            if extra:
                matches.extend(extra)
        return matches
@dataclass
class RelationshipDefinition:
    """Declarative description of how rows of one entity link to another.

    The class is a dataclass so it can be built with keyword arguments and so
    the callable defaults below are stored as plain instance attributes
    instead of being bound as methods on attribute access.
    """

    # Name the relationship is registered and resolved under.
    name: str
    # Name of the entity the source rows come from.
    from_entity: str
    # Name of the entity whose rows are returned as matches.
    to_entity: str
    # Field of the source row holding the value to match on.
    from_field: str
    # Field of the target rows compared against during token-subset matching.
    to_field: str
    # Normalizer applied to candidate values for indexed lookups.
    normalizer: Callable[[str], str] = canonicalize_name
    # Expands the source value into lookup candidates; ``None`` means the raw
    # source value is the only candidate.
    variant_generator: Optional[Callable[[str], Iterable[str]]] = lambda value: generate_name_variants(value)
    # When true, fall back to matching target rows whose tokens contain all
    # tokens of the source value.
    allow_token_subset: bool = False
    # Last-resort matcher called with (source_row, target_records).
    fallback_matcher: Optional[Callable[[Dict[str, Any], List[Dict[str, Any]]], List[Dict[str, Any]]]] = None
def _advisees_for_faculty(
    source: Dict[str, Any],
    students: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return the student rows that list the faculty row ``source`` as an advisor."""
    # Canonicalize the faculty name once rather than once per student row.
    faculty_key = canonicalize_name(source.get("Name", ""))
    return [
        row
        for row in students
        if faculty_key in {
            canonicalize_name(name) for name in extract_advisor_names(row.get("Advisor(s)"))
        }
    ]


def _register_default_relationships(catalog: DataCatalog) -> None:
    """Register the common relationships between whichever entities are loaded."""
    if catalog.try_get("faculty") and catalog.try_get("faculty_offices"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_office",
                from_entity="faculty",
                to_entity="faculty_offices",
                from_field="Name",
                to_field="Assignee Name",
                allow_token_subset=True,
            )
        )
    if catalog.try_get("centers") and catalog.try_get("faculty"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="center_to_faculty_leads",
                from_entity="centers",
                to_entity="faculty",
                from_field="Leadership",
                to_field="Name",
                normalizer=canonicalize_name,
                variant_generator=lambda value: extract_leadership_names(value),
                allow_token_subset=False,
            )
        )
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_centers",
                from_entity="faculty",
                to_entity="centers",
                from_field="Name",
                to_field="Leadership",
                normalizer=canonicalize_name,
                variant_generator=None,
                allow_token_subset=False,
                fallback_matcher=centers_for_faculty,
            )
        )
    if catalog.try_get("students") and catalog.try_get("mudd_seating"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="student_to_mudd_seat",
                from_entity="students",
                to_entity="mudd_seating",
                from_field="Name",
                to_field="Student/Visitor",
                allow_token_subset=True,
            )
        )
    if catalog.try_get("students") and catalog.try_get("faculty"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="student_to_advisors",
                from_entity="students",
                to_entity="faculty",
                from_field="Advisor(s)",
                to_field="Name",
                normalizer=canonicalize_name,
                variant_generator=lambda value: extract_advisor_names(value),
                allow_token_subset=False,
            )
        )
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_advisees",
                from_entity="faculty",
                to_entity="students",
                from_field="Name",
                to_field="Advisor(s)",
                normalizer=canonicalize_name,
                variant_generator=None,
                allow_token_subset=False,
                fallback_matcher=_advisees_for_faculty,
            )
        )


def load_default_catalog(
    base_dir: Path | str = "Archive",
    *,
    sources: Optional[Iterable[DataSource]] = None,
) -> DataCatalog:
    """Load the project CSVs into a catalog with sensible defaults.

    This mirrors the datasets referenced in the exploratory notebook,
    giving downstream code a single call to bootstrap the backend.

    Args:
        base_dir: Directory passed to ``default_sources`` when ``sources``
            is not given. Ignored when ``sources`` is supplied.
        sources: Custom data sources that replace the defaults (e.g., to
            add TA office hours). Each source's ``load()`` result must
            expose ``entities`` and ``metadata``.

    Returns:
        A catalog holding every non-empty entity from the sources, their
        merged metadata, and the relationships whose two entities were
        both loaded.
    """
    base_path = Path(base_dir)
    catalog = DataCatalog()
    logger = logging.getLogger(__name__)

    if sources is not None:
        source_list = list(sources)
    else:
        source_list = default_sources(base_path, name_normalizer=canonicalize_name)

    for source in source_list:
        result = source.load()
        for entity in result.entities:
            if not entity.records:
                continue
            catalog.register_entity(
                entity.name,
                entity.records,
                key_field=entity.key_field,
                origin=entity.origin,
                normalizer=entity.normalizer,
            )
            # Log which file provided this entity so we can trace provenance.
            logger.info(
                "Loaded entity '%s' with %d records from %s",
                entity.name,
                len(entity.records),
                entity.origin or "(unknown)",
            )
        for key, value in result.metadata.items():
            existing = catalog.metadata.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                existing.update(value)
            elif isinstance(value, dict):
                # Store a shallow copy so later merges into the catalog do
                # not mutate the dict owned by the source's load result.
                catalog.metadata[key] = value.copy()
            else:
                catalog.metadata[key] = value

    # Register common relationships between entities to mirror Satyrn's Ring metadata.
    _register_default_relationships(catalog)
    return catalog