remisek's picture
Fix
4fcc331
""" Generic extraction of features, supporting VITO backend.
"""
from typing import Callable, Optional
import openeo
from openeo.rest import OpenEoApiError
from openeo_gfmap.backend import Backend, BackendContext
from openeo_gfmap.fetching import CollectionFetcher, FetchType, _log
from openeo_gfmap.fetching.commons import (
_load_collection,
convert_band_names,
rename_bands,
resample_reproject,
)
from openeo_gfmap.spatial import SpatialContext
from openeo_gfmap.temporal import TemporalContext
BASE_DEM_MAPPING = {"DEM": "COP-DEM"}
BASE_WEATHER_MAPPING = {
"dewpoint-temperature": "AGERA5-DEWTEMP",
"precipitation-flux": "AGERA5-PRECIP",
"solar-radiation-flux": "AGERA5-SOLRAD",
"temperature-max": "AGERA5-TMAX",
"temperature-mean": "AGERA5-TMEAN",
"temperature-min": "AGERA5-TMIN",
"vapour-pressure": "AGERA5-VAPOUR",
"wind-speed": "AGERA5-WIND",
}
AGERA5_STAC_MAPPING = {
"dewpoint_temperature_mean": "AGERA5-DEWTEMP",
"total_precipitation": "AGERA5-PRECIP",
"solar_radiation_flux": "AGERA5-SOLRAD",
"2m_temperature_max": "AGERA5-TMAX",
"2m_temperature_mean": "AGERA5-TMEAN",
"2m_temperature_min": "AGERA5-TMIN",
"vapour_pressure": "AGERA5-VAPOUR",
"wind_speed": "AGERA5-WIND",
}
KNOWN_UNTEMPORAL_COLLECTIONS = ["COPERNICUS_30"]
AGERA5_TERRASCOPE_STAC = "https://stac.openeo.vito.be/collections/agera5_daily"
def _get_generic_fetcher(
collection_name: str, fetch_type: FetchType, backend: Backend, is_stac: bool
) -> Callable:
band_mapping: Optional[dict] = None
if collection_name == "COPERNICUS_30":
band_mapping = BASE_DEM_MAPPING
elif collection_name == "AGERA5":
band_mapping = BASE_WEATHER_MAPPING
elif is_stac and (AGERA5_TERRASCOPE_STAC in collection_name):
band_mapping = AGERA5_STAC_MAPPING
def generic_default_fetcher(
connection: openeo.Connection,
spatial_extent: SpatialContext,
temporal_extent: TemporalContext,
bands: list,
**params,
) -> openeo.DataCube:
if band_mapping is not None:
bands = convert_band_names(bands, band_mapping)
if (collection_name in KNOWN_UNTEMPORAL_COLLECTIONS) and (
temporal_extent is not None
):
_log.warning(
"Ignoring the temporal extent provided by the user as the collection %s is known to be untemporal.",
collection_name,
)
temporal_extent = None
try:
cube = _load_collection(
connection,
bands,
collection_name,
spatial_extent,
temporal_extent,
fetch_type,
is_stac=is_stac,
**params,
)
except OpenEoApiError as e:
if "CollectionNotFound" in str(e):
raise ValueError(
f"Collection {collection_name} not found in the selected backend {backend.value}."
) from e
raise e
# # Apply if the collection is a GeoJSON Feature collection
# if isinstance(spatial_extent, GeoJSON):
# cube = cube.filter_spatial(spatial_extent)
return cube
return generic_default_fetcher
def _get_generic_processor(
collection_name: str, fetch_type: FetchType, is_stac: bool
) -> Callable:
"""Builds the preprocessing function from the collection name as it stored
in the target backend.
"""
band_mapping: Optional[dict] = None
if collection_name == "COPERNICUS_30":
band_mapping = BASE_DEM_MAPPING
elif collection_name == "AGERA5":
band_mapping = BASE_WEATHER_MAPPING
elif is_stac and (AGERA5_TERRASCOPE_STAC in collection_name):
band_mapping = AGERA5_STAC_MAPPING
def generic_default_processor(cube: openeo.DataCube, **params):
"""Default collection preprocessing method for generic datasets.
This method renames bands and removes the time dimension in case the
requested dataset is DEM
"""
if params.get("target_resolution", None) is not None:
cube = resample_reproject(
cube,
params.get("target_resolution", 10.0),
params.get("target_crs", None),
method=params.get("resampling_method", "near"),
)
if collection_name == "COPERNICUS_30":
cube = cube.min_time()
if band_mapping is not None:
cube = rename_bands(cube, band_mapping)
return cube
return generic_default_processor
def build_generic_extractor(
backend_context: BackendContext,
bands: list,
fetch_type: FetchType,
collection_name: str,
**params,
) -> CollectionFetcher:
"""Creates a generic extractor adapted to the given backend. Provides band mappings for known
collections, such as AGERA5 available on Terrascope/FED and Copernicus 30m DEM in all backends.
"""
fetcher = _get_generic_fetcher(
collection_name, fetch_type, backend_context.backend, False
)
preprocessor = _get_generic_processor(collection_name, fetch_type, False)
return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params)
def build_generic_extractor_stac(
backend_context: BackendContext,
bands: list,
fetch_type: FetchType,
collection_url: str,
**params,
) -> CollectionFetcher:
"""Creates a generic extractor adapted to the given backend. Currently only tested with VITO backend"""
fetcher = _get_generic_fetcher(
collection_url, fetch_type, backend_context.backend, True
)
preprocessor = _get_generic_processor(collection_url, fetch_type, True)
return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params)