File size: 5,851 Bytes
4fcc331 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | """ Generic extraction of features, supporting VITO backend.
"""
from typing import Callable, Optional
import openeo
from openeo.rest import OpenEoApiError
from openeo_gfmap.backend import Backend, BackendContext
from openeo_gfmap.fetching import CollectionFetcher, FetchType, _log
from openeo_gfmap.fetching.commons import (
_load_collection,
convert_band_names,
rename_bands,
resample_reproject,
)
from openeo_gfmap.spatial import SpatialContext
from openeo_gfmap.temporal import TemporalContext
BASE_DEM_MAPPING = {"DEM": "COP-DEM"}
BASE_WEATHER_MAPPING = {
"dewpoint-temperature": "AGERA5-DEWTEMP",
"precipitation-flux": "AGERA5-PRECIP",
"solar-radiation-flux": "AGERA5-SOLRAD",
"temperature-max": "AGERA5-TMAX",
"temperature-mean": "AGERA5-TMEAN",
"temperature-min": "AGERA5-TMIN",
"vapour-pressure": "AGERA5-VAPOUR",
"wind-speed": "AGERA5-WIND",
}
AGERA5_STAC_MAPPING = {
"dewpoint_temperature_mean": "AGERA5-DEWTEMP",
"total_precipitation": "AGERA5-PRECIP",
"solar_radiation_flux": "AGERA5-SOLRAD",
"2m_temperature_max": "AGERA5-TMAX",
"2m_temperature_mean": "AGERA5-TMEAN",
"2m_temperature_min": "AGERA5-TMIN",
"vapour_pressure": "AGERA5-VAPOUR",
"wind_speed": "AGERA5-WIND",
}
KNOWN_UNTEMPORAL_COLLECTIONS = ["COPERNICUS_30"]
AGERA5_TERRASCOPE_STAC = "https://stac.openeo.vito.be/collections/agera5_daily"
def _get_generic_fetcher(
collection_name: str, fetch_type: FetchType, backend: Backend, is_stac: bool
) -> Callable:
band_mapping: Optional[dict] = None
if collection_name == "COPERNICUS_30":
band_mapping = BASE_DEM_MAPPING
elif collection_name == "AGERA5":
band_mapping = BASE_WEATHER_MAPPING
elif is_stac and (AGERA5_TERRASCOPE_STAC in collection_name):
band_mapping = AGERA5_STAC_MAPPING
def generic_default_fetcher(
connection: openeo.Connection,
spatial_extent: SpatialContext,
temporal_extent: TemporalContext,
bands: list,
**params,
) -> openeo.DataCube:
if band_mapping is not None:
bands = convert_band_names(bands, band_mapping)
if (collection_name in KNOWN_UNTEMPORAL_COLLECTIONS) and (
temporal_extent is not None
):
_log.warning(
"Ignoring the temporal extent provided by the user as the collection %s is known to be untemporal.",
collection_name,
)
temporal_extent = None
try:
cube = _load_collection(
connection,
bands,
collection_name,
spatial_extent,
temporal_extent,
fetch_type,
is_stac=is_stac,
**params,
)
except OpenEoApiError as e:
if "CollectionNotFound" in str(e):
raise ValueError(
f"Collection {collection_name} not found in the selected backend {backend.value}."
) from e
raise e
# # Apply if the collection is a GeoJSON Feature collection
# if isinstance(spatial_extent, GeoJSON):
# cube = cube.filter_spatial(spatial_extent)
return cube
return generic_default_fetcher
def _get_generic_processor(
collection_name: str, fetch_type: FetchType, is_stac: bool
) -> Callable:
"""Builds the preprocessing function from the collection name as it stored
in the target backend.
"""
band_mapping: Optional[dict] = None
if collection_name == "COPERNICUS_30":
band_mapping = BASE_DEM_MAPPING
elif collection_name == "AGERA5":
band_mapping = BASE_WEATHER_MAPPING
elif is_stac and (AGERA5_TERRASCOPE_STAC in collection_name):
band_mapping = AGERA5_STAC_MAPPING
def generic_default_processor(cube: openeo.DataCube, **params):
"""Default collection preprocessing method for generic datasets.
This method renames bands and removes the time dimension in case the
requested dataset is DEM
"""
if params.get("target_resolution", None) is not None:
cube = resample_reproject(
cube,
params.get("target_resolution", 10.0),
params.get("target_crs", None),
method=params.get("resampling_method", "near"),
)
if collection_name == "COPERNICUS_30":
cube = cube.min_time()
if band_mapping is not None:
cube = rename_bands(cube, band_mapping)
return cube
return generic_default_processor
def build_generic_extractor(
backend_context: BackendContext,
bands: list,
fetch_type: FetchType,
collection_name: str,
**params,
) -> CollectionFetcher:
"""Creates a generic extractor adapted to the given backend. Provides band mappings for known
collections, such as AGERA5 available on Terrascope/FED and Copernicus 30m DEM in all backends.
"""
fetcher = _get_generic_fetcher(
collection_name, fetch_type, backend_context.backend, False
)
preprocessor = _get_generic_processor(collection_name, fetch_type, False)
return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params)
def build_generic_extractor_stac(
backend_context: BackendContext,
bands: list,
fetch_type: FetchType,
collection_url: str,
**params,
) -> CollectionFetcher:
"""Creates a generic extractor adapted to the given backend. Currently only tested with VITO backend"""
fetcher = _get_generic_fetcher(
collection_url, fetch_type, backend_context.backend, True
)
preprocessor = _get_generic_processor(collection_url, fetch_type, True)
return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params)
|