Spaces:
Sleeping
Sleeping
| # schema_utils.py | |
| from typing import Dict, Set | |
| from schema_map import schema | |
def fieldset_for(collection: str) -> Set[str]:
    """Return the set of field names known for *collection*.

    The result holds every key of the collection's schema entry that does
    not start with ``"$"``, plus every alias target (the values of the
    entry's ``"$aliases"`` mapping).

    An unknown collection, or one whose schema entry is ``None`` or empty,
    yields an empty set.
    """
    # ``or {}`` rather than a ``.get`` default: the default only applies when
    # the key is missing, whereas an entry that is present but ``None`` would
    # otherwise fail on iteration. ``$aliases`` is guarded the same way below.
    col = schema.get(collection) or {}
    fields = {k for k in col if not k.startswith("$")}
    # Alias targets count as fields even when they are not declared as keys.
    aliases = col.get("$aliases") or {}
    fields.update(aliases.values())
    return fields
def alias_map_for(collection: str) -> Dict[str, str]:
    """Return the ``"$aliases"`` mapping of *collection*'s schema entry.

    An empty dict is returned when the collection is unknown, when its
    schema entry is ``None`` or empty, or when ``"$aliases"`` is missing or
    falsy. A non-empty mapping is returned as stored in ``schema`` (not
    copied).
    """
    # ``or {}`` also covers an entry that exists but is ``None``; a ``.get``
    # default would only apply when the key is absent altogether.
    col = schema.get(collection) or {}
    return col.get("$aliases") or {}
def resolve_field(collection: str, field: str) -> str:
    """Map *field* to its canonical name within *collection*.

    If *field* is a key of the collection's alias map, the mapped value is
    returned; otherwise *field* is returned unchanged.
    """
    return alias_map_for(collection).get(field, field)
def has_field(collection: str, field: str) -> bool:
    """Tell whether *field*, after alias resolution, belongs to *collection*.

    The name is first passed through ``resolve_field`` and the result is then
    looked up in the set produced by ``fieldset_for``.
    """
    canonical = resolve_field(collection, field)
    known_fields = fieldset_for(collection)
    return canonical in known_fields