"""Lightweight in-memory catalog for loading CSV data into named tables."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional
import logging
from .sources import DataSource, default_sources
from .utils import (
canonicalize_name,
centers_for_faculty,
extract_leadership_names,
extract_advisor_names,
generate_name_variants,
tokenize_name,
)
@dataclass
class EntityRecords:
    """Container for a structured dataset plus a key-based lookup index.

    The index maps the normalized value of ``key_field`` to the row that
    holds it, so rows can be fetched by key without scanning ``records``.
    """

    # Name of the dataset.
    name: str
    # Rows of the dataset, one dict per record.
    records: List[Dict[str, Any]]
    # Column whose value identifies a row; ``None`` disables indexing.
    key_field: Optional[str] = None
    # Free-form provenance label (may be ``None``).
    origin: Optional[str] = None
    # Normalized key -> row. Populated by ``build_index``.
    _index: Dict[str, Dict[str, Any]] = field(default_factory=dict, init=False, repr=False)

    def build_index(self, normalizer: Optional[Callable[[str], str]] = None) -> None:
        """(Re)build the lookup index from ``records``.

        Args:
            normalizer: Function applied to each stringified key before it
                is stored. Falls back to ``canonicalize_name`` when omitted.

        Rows whose key is missing or falsy are skipped. When two rows
        normalize to the same key, the later row wins. Does nothing when
        ``key_field`` is ``None``.
        """
        if self.key_field is None:
            return
        normalize = normalizer or canonicalize_name
        # Build into a fresh dict so that a rebuild (different normalizer,
        # changed records) never leaves stale keys from a previous pass.
        rebuilt: Dict[str, Dict[str, Any]] = {}
        for row in self.records:
            key = row.get(self.key_field)
            if not key:
                continue
            rebuilt[normalize(str(key))] = row
        self._index = rebuilt

    def get_by_key(self, value: str, normalizer: Optional[Callable[[str], str]] = None) -> Optional[Dict[str, Any]]:
        """Return the row indexed under ``value``, or ``None`` if absent.

        Args:
            value: Key to look up. It is stringified and normalized the same
                way ``build_index`` treats stored keys.
            normalizer: Normalization function; should match the one used to
                build the index. Falls back to ``canonicalize_name``.

        Returns:
            The matching row, or ``None`` when ``value`` is falsy, no
            ``key_field`` is configured, or no row matches.
        """
        if not value or self.key_field is None:
            return None
        normalize = normalizer or canonicalize_name
        # Mirror build_index, which indexes str(key): without str() here a
        # non-string key could be indexed but never found again.
        return self._index.get(normalize(str(value)))
class DataCatalog:
    """
    Registry that mirrors Satyrn's Ring layer.

    Each entity (faculty, staff, events...) is stored with metadata so
    analysis blueprints can retrieve and join information deterministically.
    """

    def __init__(self) -> None:
        """Create an empty catalog with no entities, metadata or relationships."""
        # Entity name -> dataset registered under that name.
        self.entities: Dict[str, EntityRecords] = {}
        # Free-form metadata, keyed by whatever the loaders provide.
        self.metadata: Dict[str, Dict[str, Any]] = {}
        # Relationship name -> definition used by resolve_relationship.
        self.relationships: Dict[str, RelationshipDefinition] = {}

    def register_entity(
        self,
        name: str,
        records: Iterable[Dict[str, Any]],
        *,
        key_field: Optional[str] = None,
        origin: Optional[str] = None,
        normalizer: Optional[Callable[[str], str]] = None,
    ) -> None:
        """Store ``records`` under ``name`` and build their key index.

        Args:
            name: Entity name; an existing entity with this name is replaced.
            records: Rows of the dataset; materialized into a list.
            key_field: Column used to index rows, or ``None`` for no index.
            origin: Optional provenance label kept on the dataset.
            normalizer: Key normalizer forwarded to ``build_index``.
        """
        dataset = EntityRecords(
            name=name,
            records=list(records),
            key_field=key_field,
            origin=origin,
        )
        dataset.build_index(normalizer)
        self.entities[name] = dataset

    def get(self, name: str) -> EntityRecords:
        """Return the entity registered as ``name``.

        Raises:
            KeyError: If no entity with that name is registered.
        """
        if name not in self.entities:
            raise KeyError(f"Unknown entity '{name}'")
        return self.entities[name]

    def try_get(self, name: str) -> Optional[EntityRecords]:
        """Return the entity registered as ``name``, or ``None`` if unknown."""
        return self.entities.get(name)

    # Relationship handling -------------------------------------------------
    def register_relationship(self, relationship: RelationshipDefinition) -> None:
        """Register ``relationship`` under its own ``name``, replacing any prior one."""
        self.relationships[relationship.name] = relationship

    def resolve_relationship(self, name: str, source_row: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Find the target rows that ``source_row`` links to via relationship ``name``.

        Matching proceeds in stages and stops at the first stage that yields
        results:

        1. Key lookup of every candidate produced by the relationship's
           ``variant_generator`` (or the raw source value when there is none).
        2. If ``allow_token_subset`` is set, target rows whose ``to_field``
           tokens contain all tokens of the source value.
        3. The relationship's ``fallback_matcher``, if any.

        Args:
            name: Name of a registered relationship.
            source_row: Row of the source entity.

        Returns:
            Matching target rows; empty when the source field is empty, the
            target entity is not registered, or nothing matches.

        Raises:
            KeyError: If the relationship is not registered.
        """
        relationship = self.relationships.get(name)
        if relationship is None:
            raise KeyError(f"Relationship '{name}' not registered.")

        source_value = source_row.get(relationship.from_field)
        if not source_value:
            return []

        target_entity = self.try_get(relationship.to_entity)
        if target_entity is None:
            return []

        if relationship.variant_generator:
            candidates = relationship.variant_generator(source_value)
        else:
            candidates = [source_value]

        matches: List[Dict[str, Any]] = []
        # Several variants can normalize to the same key and therefore hit
        # the same row; track identities so each row is reported only once.
        seen_rows = set()
        for candidate in candidates:
            target_row = target_entity.get_by_key(candidate, relationship.normalizer)
            if target_row and id(target_row) not in seen_rows:
                seen_rows.add(id(target_row))
                matches.append(target_row)
        if matches:
            return matches

        if relationship.allow_token_subset:
            source_tokens = tokenize_name(source_value)
            if source_tokens:
                for row in target_entity.records:
                    # `or ""` covers rows where the field is present but None;
                    # a .get default only covers a missing key.
                    dest_tokens = tokenize_name(row.get(relationship.to_field) or "")
                    if source_tokens.issubset(dest_tokens):
                        matches.append(row)
                if matches:
                    return matches

        if relationship.fallback_matcher:
            extra = relationship.fallback_matcher(source_row, target_entity.records)
            if extra:
                matches.extend(extra)
        return matches
@dataclass
class RelationshipDefinition:
    """Declarative description of a link from one entity's rows to another's.

    A catalog resolves a relationship by reading ``from_field`` on the source
    row, expanding it into candidate keys, and looking those up in the
    ``to_entity`` dataset.
    """

    # Key under which a catalog stores this relationship.
    name: str
    # Name of the entity the source rows belong to.
    from_entity: str
    # Name of the entity that is searched for matches.
    to_entity: str
    # Column read from the source row.
    from_field: str
    # Column on target rows used for token-subset comparison.
    to_field: str
    # Applied to candidate keys during lookup in the target entity.
    normalizer: Callable[[str], str] = field(default=canonicalize_name)
    # Expands the source value into candidate keys; ``None`` means the raw
    # source value is the only candidate.
    variant_generator: Optional[Callable[[str], Iterable[str]]] = field(
        default=lambda value: generate_name_variants(value)
    )
    # When true, a failed key lookup falls back to token-subset matching.
    allow_token_subset: bool = field(default=False)
    # Last-resort matcher called with (source_row, target_records).
    fallback_matcher: Optional[
        Callable[[Dict[str, Any], List[Dict[str, Any]]], List[Dict[str, Any]]]
    ] = field(default=None)
def _merge_metadata(target: Dict[str, Any], incoming: Dict[str, Any]) -> None:
    """Merge ``incoming`` metadata into ``target``; dict values merge key-wise."""
    for key, value in incoming.items():
        existing = target.get(key)
        if isinstance(existing, dict) and isinstance(value, dict):
            existing.update(value)
        elif isinstance(value, dict):
            # Store a copy: later merges update this dict in place and must
            # not leak into the metadata object owned by the source result.
            target[key] = dict(value)
        else:
            target[key] = value


def _advisees_for_faculty(source: Dict[str, Any], students: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the student rows whose advisor list names the faculty row ``source``."""
    # Canonicalize the faculty name once instead of once per student row.
    faculty_key = canonicalize_name(source.get("Name", ""))
    return [
        row
        for row in students
        if faculty_key in {canonicalize_name(name) for name in extract_advisor_names(row.get("Advisor(s)"))}
    ]


def _register_default_relationships(catalog: DataCatalog) -> None:
    """Register the standard relationships for whichever entities are loaded."""
    if catalog.try_get("faculty") and catalog.try_get("faculty_offices"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_office",
                from_entity="faculty",
                to_entity="faculty_offices",
                from_field="Name",
                to_field="Assignee Name",
                allow_token_subset=True,
            )
        )

    if catalog.try_get("centers") and catalog.try_get("faculty"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="center_to_faculty_leads",
                from_entity="centers",
                to_entity="faculty",
                from_field="Leadership",
                to_field="Name",
                normalizer=canonicalize_name,
                variant_generator=extract_leadership_names,
                allow_token_subset=False,
            )
        )
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_centers",
                from_entity="faculty",
                to_entity="centers",
                from_field="Name",
                to_field="Leadership",
                normalizer=canonicalize_name,
                # No key lookup variants: matching is delegated to the fallback.
                variant_generator=None,
                allow_token_subset=False,
                fallback_matcher=centers_for_faculty,
            )
        )

    if catalog.try_get("students") and catalog.try_get("mudd_seating"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="student_to_mudd_seat",
                from_entity="students",
                to_entity="mudd_seating",
                from_field="Name",
                to_field="Student/Visitor",
                allow_token_subset=True,
            )
        )

    if catalog.try_get("students") and catalog.try_get("faculty"):
        catalog.register_relationship(
            RelationshipDefinition(
                name="student_to_advisors",
                from_entity="students",
                to_entity="faculty",
                from_field="Advisor(s)",
                to_field="Name",
                normalizer=canonicalize_name,
                variant_generator=extract_advisor_names,
                allow_token_subset=False,
            )
        )
        catalog.register_relationship(
            RelationshipDefinition(
                name="faculty_to_advisees",
                from_entity="faculty",
                to_entity="students",
                from_field="Name",
                to_field="Advisor(s)",
                normalizer=canonicalize_name,
                variant_generator=None,
                allow_token_subset=False,
                fallback_matcher=_advisees_for_faculty,
            )
        )


def load_default_catalog(
    base_dir: Path | str = "Archive",
    *,
    sources: Optional[Iterable[DataSource]] = None,
) -> DataCatalog:
    """
    Load the project CSVs into a catalog with sensible defaults.

    This mirrors the datasets referenced in the exploratory notebook,
    giving downstream code a single call to bootstrap the backend.

    Args:
        base_dir: Directory handed to ``default_sources`` when ``sources``
            is not given. Unused when ``sources`` is supplied.
        sources: Data sources to load instead of the defaults (e.g., to add
            TA office hours). When supplied, this list is used as-is; the
            defaults are not loaded in addition.

    Returns:
        A catalog holding every non-empty entity produced by the sources,
        the merged metadata, and the relationships whose entities are all
        present.
    """
    base_path = Path(base_dir)
    catalog = DataCatalog()
    logger = logging.getLogger(__name__)

    if sources is not None:
        source_list = list(sources)
    else:
        source_list = default_sources(base_path, name_normalizer=canonicalize_name)

    for source in source_list:
        result = source.load()
        for entity in result.entities:
            if not entity.records:
                continue
            catalog.register_entity(
                entity.name,
                entity.records,
                key_field=entity.key_field,
                origin=entity.origin,
                normalizer=entity.normalizer,
            )
            # Log which file provided this entity so we can trace provenance.
            logger.info(
                "Loaded entity '%s' with %d records from %s",
                entity.name,
                len(entity.records),
                entity.origin or "(unknown)",
            )
        _merge_metadata(catalog.metadata, result.metadata)

    # Register common relationships between entities to mirror Satyrn's Ring metadata.
    _register_default_relationships(catalog)
    return catalog
|