| | """ Generic extraction of features, supporting VITO backend. |
| | """ |
| |
|
| | from typing import Callable, Optional |
| |
|
| | import openeo |
| | from openeo.rest import OpenEoApiError |
| |
|
| | from openeo_gfmap.backend import Backend, BackendContext |
| | from openeo_gfmap.fetching import CollectionFetcher, FetchType, _log |
| | from openeo_gfmap.fetching.commons import ( |
| | _load_collection, |
| | convert_band_names, |
| | rename_bands, |
| | resample_reproject, |
| | ) |
| | from openeo_gfmap.spatial import SpatialContext |
| | from openeo_gfmap.temporal import TemporalContext |
| |
|
| | BASE_DEM_MAPPING = {"DEM": "COP-DEM"} |
| | BASE_WEATHER_MAPPING = { |
| | "dewpoint-temperature": "AGERA5-DEWTEMP", |
| | "precipitation-flux": "AGERA5-PRECIP", |
| | "solar-radiation-flux": "AGERA5-SOLRAD", |
| | "temperature-max": "AGERA5-TMAX", |
| | "temperature-mean": "AGERA5-TMEAN", |
| | "temperature-min": "AGERA5-TMIN", |
| | "vapour-pressure": "AGERA5-VAPOUR", |
| | "wind-speed": "AGERA5-WIND", |
| | } |
| | AGERA5_STAC_MAPPING = { |
| | "dewpoint_temperature_mean": "AGERA5-DEWTEMP", |
| | "total_precipitation": "AGERA5-PRECIP", |
| | "solar_radiation_flux": "AGERA5-SOLRAD", |
| | "2m_temperature_max": "AGERA5-TMAX", |
| | "2m_temperature_mean": "AGERA5-TMEAN", |
| | "2m_temperature_min": "AGERA5-TMIN", |
| | "vapour_pressure": "AGERA5-VAPOUR", |
| | "wind_speed": "AGERA5-WIND", |
| | } |
| | KNOWN_UNTEMPORAL_COLLECTIONS = ["COPERNICUS_30"] |
| |
|
| | AGERA5_TERRASCOPE_STAC = "https://stac.openeo.vito.be/collections/agera5_daily" |
| |
|
| |
|
| | def _get_generic_fetcher( |
| | collection_name: str, fetch_type: FetchType, backend: Backend, is_stac: bool |
| | ) -> Callable: |
| | band_mapping: Optional[dict] = None |
| |
|
| | if collection_name == "COPERNICUS_30": |
| | band_mapping = BASE_DEM_MAPPING |
| | elif collection_name == "AGERA5": |
| | band_mapping = BASE_WEATHER_MAPPING |
| | elif is_stac and (AGERA5_TERRASCOPE_STAC in collection_name): |
| | band_mapping = AGERA5_STAC_MAPPING |
| |
|
| | def generic_default_fetcher( |
| | connection: openeo.Connection, |
| | spatial_extent: SpatialContext, |
| | temporal_extent: TemporalContext, |
| | bands: list, |
| | **params, |
| | ) -> openeo.DataCube: |
| | if band_mapping is not None: |
| | bands = convert_band_names(bands, band_mapping) |
| |
|
| | if (collection_name in KNOWN_UNTEMPORAL_COLLECTIONS) and ( |
| | temporal_extent is not None |
| | ): |
| | _log.warning( |
| | "Ignoring the temporal extent provided by the user as the collection %s is known to be untemporal.", |
| | collection_name, |
| | ) |
| | temporal_extent = None |
| |
|
| | try: |
| | cube = _load_collection( |
| | connection, |
| | bands, |
| | collection_name, |
| | spatial_extent, |
| | temporal_extent, |
| | fetch_type, |
| | is_stac=is_stac, |
| | **params, |
| | ) |
| | except OpenEoApiError as e: |
| | if "CollectionNotFound" in str(e): |
| | raise ValueError( |
| | f"Collection {collection_name} not found in the selected backend {backend.value}." |
| | ) from e |
| | raise e |
| |
|
| | |
| | |
| | |
| |
|
| | return cube |
| |
|
| | return generic_default_fetcher |
| |
|
| |
|
| | def _get_generic_processor( |
| | collection_name: str, fetch_type: FetchType, is_stac: bool |
| | ) -> Callable: |
| | """Builds the preprocessing function from the collection name as it stored |
| | in the target backend. |
| | """ |
| | band_mapping: Optional[dict] = None |
| | if collection_name == "COPERNICUS_30": |
| | band_mapping = BASE_DEM_MAPPING |
| | elif collection_name == "AGERA5": |
| | band_mapping = BASE_WEATHER_MAPPING |
| | elif is_stac and (AGERA5_TERRASCOPE_STAC in collection_name): |
| | band_mapping = AGERA5_STAC_MAPPING |
| |
|
| | def generic_default_processor(cube: openeo.DataCube, **params): |
| | """Default collection preprocessing method for generic datasets. |
| | This method renames bands and removes the time dimension in case the |
| | requested dataset is DEM |
| | """ |
| | if params.get("target_resolution", None) is not None: |
| | cube = resample_reproject( |
| | cube, |
| | params.get("target_resolution", 10.0), |
| | params.get("target_crs", None), |
| | method=params.get("resampling_method", "near"), |
| | ) |
| |
|
| | if collection_name == "COPERNICUS_30": |
| | cube = cube.min_time() |
| |
|
| | if band_mapping is not None: |
| | cube = rename_bands(cube, band_mapping) |
| |
|
| | return cube |
| |
|
| | return generic_default_processor |
| |
|
| |
|
| | def build_generic_extractor( |
| | backend_context: BackendContext, |
| | bands: list, |
| | fetch_type: FetchType, |
| | collection_name: str, |
| | **params, |
| | ) -> CollectionFetcher: |
| | """Creates a generic extractor adapted to the given backend. Provides band mappings for known |
| | collections, such as AGERA5 available on Terrascope/FED and Copernicus 30m DEM in all backends. |
| | """ |
| | fetcher = _get_generic_fetcher( |
| | collection_name, fetch_type, backend_context.backend, False |
| | ) |
| | preprocessor = _get_generic_processor(collection_name, fetch_type, False) |
| |
|
| | return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params) |
| |
|
| |
|
| | def build_generic_extractor_stac( |
| | backend_context: BackendContext, |
| | bands: list, |
| | fetch_type: FetchType, |
| | collection_url: str, |
| | **params, |
| | ) -> CollectionFetcher: |
| | """Creates a generic extractor adapted to the given backend. Currently only tested with VITO backend""" |
| | fetcher = _get_generic_fetcher( |
| | collection_url, fetch_type, backend_context.backend, True |
| | ) |
| | preprocessor = _get_generic_processor(collection_url, fetch_type, True) |
| |
|
| | return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params) |
| |
|