# Source: MCP_indicators/src/cube_resolver.py
# Commit bad6218 (Qdonnars): feat: Implement MCP Server for Indicateurs Territoriaux API
"""Cube resolution logic for mapping indicator IDs to data cubes.

The API uses a specific naming convention:

- Data cubes: {thematique}_{maille} (e.g., conso_enaf_com, surface_bio_dpt)
- Measures: {cube_name}.id_{indicator_id} (e.g., conso_enaf_com.id_611)
- Geographic dimensions: geocode_{maille}, libelle_{maille}

This module provides logic to find the correct cube for a given indicator
and geographic level by parsing the /meta endpoint.
"""
from typing import Any
from .models import MAILLE_SUFFIX_MAP, GEO_DIMENSION_PATTERNS, CubeInfo
class CubeResolver:
    """Resolves indicator IDs to their corresponding data cubes.

    The resolver keeps the ``cubes`` list of a /meta response and derives
    in-memory lookup tables from it, so that a cube can be found from an
    indicator ID and a geographic level (maille) without re-scanning the
    metadata.
    """

    # Cubes that are skipped when building the indicator mappings.
    _METADATA_CUBES = ("indicateur_metadata", "indicateur_x_source_metadata")

    # Supported mailles, ordered from finest to coarsest.
    _MAILLE_ORDER = ("commune", "epci", "departement", "region")

    def __init__(self) -> None:
        """Initialize an empty resolver; call ``load_from_meta`` to fill it."""
        # Raw cube entries taken from the last /meta response.
        self._cubes_meta: list[dict[str, Any]] = []
        # Mapping: indicator_id -> {maille -> cube_name}
        self._indicator_cube_map: dict[int, dict[str, str]] = {}
        # Mapping: cube_name -> CubeInfo
        self._cube_info: dict[str, CubeInfo] = {}
        # Every indicator ID registered from a data cube.
        self._known_indicator_ids: set[int] = set()
        self._initialized = False

    @property
    def is_initialized(self) -> bool:
        """Return True once ``load_from_meta`` has completed at least once."""
        return self._initialized

    def load_from_meta(self, meta_response: dict[str, Any]) -> None:
        """Load and parse cube metadata from a /meta response.

        Any previously loaded mappings are discarded and rebuilt.

        Args:
            meta_response: The response from /cubejs-api/v1/meta. A missing
                or null ``cubes`` entry is treated as an empty list.
        """
        # ``or []`` also covers an explicit ``"cubes": null`` payload, which
        # ``.get("cubes", [])`` would pass through as None.
        self._cubes_meta = meta_response.get("cubes") or []
        self._build_mappings()
        self._initialized = True

    def _build_mappings(self) -> None:
        """Rebuild every internal lookup table from ``self._cubes_meta``."""
        self._indicator_cube_map.clear()
        self._cube_info.clear()
        self._known_indicator_ids.clear()

        for cube in self._cubes_meta:
            cube_name = cube.get("name", "")
            if cube_name in self._METADATA_CUBES:
                continue

            # A cube without any recognised geographic dimension cannot be
            # queried by maille, so it is ignored.
            available_mailles = self._detect_all_mailles(cube)
            if not available_mailles:
                continue

            indicator_ids = self._extract_indicator_ids(cube)
            if not indicator_ids:
                continue

            # ``available_mailles`` is sorted finest-first, so index 0 is the
            # cube's primary (finest) maille.
            self._cube_info[cube_name] = CubeInfo(
                name=cube_name,
                maille=available_mailles[0],
                indicator_ids=indicator_ids,
            )

            self._known_indicator_ids.update(indicator_ids)
            for ind_id in indicator_ids:
                cube_by_maille = self._indicator_cube_map.setdefault(ind_id, {})
                for maille in available_mailles:
                    # The first cube registered for an (indicator, maille)
                    # pair wins; later cubes in /meta order never override it.
                    cube_by_maille.setdefault(maille, cube_name)

    def _detect_all_mailles(self, cube: dict[str, Any]) -> list[str]:
        """Detect ALL geographic levels (mailles) available in a cube.

        A maille is considered available when its ``geocode`` pattern from
        ``GEO_DIMENSION_PATTERNS`` occurs as a substring of at least one of
        the cube's dimension names (dimension names are prefixed with the
        cube name, hence the substring match).

        Args:
            cube: Cube metadata from /meta

        Returns:
            List of available mailles, sorted from finest to coarsest
            (commune, epci, departement, region)
        """
        dim_names = [d.get("name") or "" for d in cube.get("dimensions") or []]

        available = []
        for maille in self._MAILLE_ORDER:
            geocode_dim = GEO_DIMENSION_PATTERNS.get(maille, {}).get("geocode", "")
            # An empty pattern is a substring of every name and would mark
            # the maille as available for any cube, so skip it.
            if not geocode_dim:
                continue
            if any(geocode_dim in dim_name for dim_name in dim_names):
                available.append(maille)
        return available

    def _detect_maille(self, cube: dict[str, Any]) -> str | None:
        """Detect the finest geographic level (maille) of a cube.

        Args:
            cube: Cube metadata from /meta

        Returns:
            The finest maille name, or None when no maille is detected
        """
        mailles = self._detect_all_mailles(cube)
        return mailles[0] if mailles else None

    def _extract_indicator_ids(self, cube: dict[str, Any]) -> list[int]:
        """Extract indicator IDs from cube measures.

        Measures follow the pattern ``{cube_name}.id_{indicator_id}``,
        optionally followed by a ``_``- or ``.``-separated suffix. Measures
        that do not match, or whose ID part is not an integer, are ignored.

        Args:
            cube: Cube metadata from /meta

        Returns:
            Unique indicator IDs found in the cube's measures, in order of
            first appearance
        """
        seen: set[int] = set()
        indicator_ids: list[int] = []
        for measure in cube.get("measures") or []:
            measure_name = measure.get("name") or ""
            if ".id_" not in measure_name:
                continue

            # Keep what follows the last ".id_", then drop any suffix.
            id_part = measure_name.split(".id_")[-1]
            id_str = id_part.split("_")[0].split(".")[0]
            try:
                indicator_id = int(id_str)
            except ValueError:
                continue

            # Several measures can share one ID once suffixes are stripped;
            # report each ID only once.
            if indicator_id not in seen:
                seen.add(indicator_id)
                indicator_ids.append(indicator_id)
        return indicator_ids

    def find_cube_for_indicator(
        self,
        indicator_id: int,
        maille: str,
    ) -> str | None:
        """Find the data cube for a given indicator and geographic level.

        Args:
            indicator_id: The indicator ID to look up
            maille: The geographic level ('commune', 'epci', 'departement',
                'region'); matched case-insensitively

        Returns:
            The cube name if found, None otherwise (including when the
            resolver has not been initialized yet)
        """
        if not self._initialized:
            return None
        cube_by_maille = self._indicator_cube_map.get(indicator_id, {})
        return cube_by_maille.get(maille.lower())

    def get_measure_name(self, cube_name: str, indicator_id: int) -> str:
        """Get the full measure name for an indicator in a cube.

        Args:
            cube_name: The cube name
            indicator_id: The indicator ID

        Returns:
            The full measure name (e.g., 'conso_enaf_com.id_611')
        """
        return f"{cube_name}.id_{indicator_id}"

    def get_dimension_name(self, cube_name: str, dimension: str) -> str:
        """Get the full dimension name for a cube.

        Args:
            cube_name: The cube name
            dimension: The dimension name (e.g., 'geocode_region')

        Returns:
            The full dimension name (e.g., 'conso_enaf_com.geocode_region')
        """
        return f"{cube_name}.{dimension}"

    def get_available_mailles(self, indicator_id: int) -> list[str]:
        """Get the available geographic levels for an indicator.

        Args:
            indicator_id: The indicator ID

        Returns:
            List of available mailles in registration order; empty when the
            indicator is unknown
        """
        return list(self._indicator_cube_map.get(indicator_id, {}))

    def get_cube_info(self, cube_name: str) -> CubeInfo | None:
        """Get information about a cube.

        Args:
            cube_name: The cube name

        Returns:
            CubeInfo if found, None otherwise
        """
        return self._cube_info.get(cube_name)

    def is_indicator_known(self, indicator_id: int) -> bool:
        """Check if an indicator ID exists in any cube.

        Args:
            indicator_id: The indicator ID to check

        Returns:
            True if the indicator was registered from at least one cube
        """
        return indicator_id in self._known_indicator_ids

    def list_all_cubes(self) -> list[CubeInfo]:
        """List all data cubes with their metadata.

        Returns:
            List of CubeInfo objects
        """
        return list(self._cube_info.values())

    def get_cubes_for_indicator(self, indicator_id: int) -> dict[str, str]:
        """Get all cubes containing a given indicator.

        Args:
            indicator_id: The indicator ID

        Returns:
            Dict mapping maille to cube_name; a copy, so callers may mutate
            it without affecting the resolver
        """
        return self._indicator_cube_map.get(indicator_id, {}).copy()
# Module-wide shared resolver; stays None until get_resolver() is first called.
_resolver_instance: CubeResolver | None = None


def get_resolver() -> CubeResolver:
    """Return the shared CubeResolver, creating it on first use.

    Returns:
        The single CubeResolver instance held by this module
    """
    global _resolver_instance
    # Fast path: the instance already exists.
    if _resolver_instance is not None:
        return _resolver_instance
    _resolver_instance = CubeResolver()
    return _resolver_instance