Spaces:
Build error
Build error
| import datetime | |
| import json | |
| import os | |
| from itertools import repeat | |
| import ee | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import yaml | |
| from utils import duckdb_queries as dq | |
| from . import logging | |
# Service-account e-mail passed to ee.ServiceAccountCredentials when
# authenticating to Google Earth Engine.
GEE_SERVICE_ACCOUNT = "climatebase-july-2023@ee-geospatialml-aquarry.iam.gserviceaccount.com"
| class IndexGenerator: | |
| """ | |
| A class to generate indices and compute zonal means. | |
| Args: | |
| indices (string[], required): Array of index names to include in aggregate index generation. | |
| """ | |
| def __init__(self): | |
| # Authenticate to GEE & DuckDB | |
| self._authenticate_ee(GEE_SERVICE_ACCOUNT) | |
| self.roi = None | |
| self.project_name = None | |
| self.project_geometry = None | |
| self.project_centroid = None | |
| self.indices = None | |
| self.metric_name = None | |
| def set_metric(self, metric_name): | |
| # Use defined subset of indices | |
| indices_file = f'metrics/{metric_name.replace(" ", "_")}.yaml' | |
| self.indices = self._load_indices(indices_file) | |
| self.metric_name = metric_name | |
| def set_project(self, project_name): | |
| self.project_name = project_name | |
| self.project_geometry = dq.get_project_geometry(self.project_name) | |
| self.project_centroid = dq.get_project_centroid(self.project_name) | |
| # to-do: refactor to involve fewer transformations | |
| _polygon = json.dumps( | |
| json.loads(self.project_geometry[0][0])["features"][0]["geometry"] | |
| ) | |
| # to-do: don't use self.roi and instead pass patameter strategically | |
| self.roi = ee.Geometry.Polygon(json.loads(_polygon)["coordinates"]) | |
| def _cloudfree(self, gee_path, daterange): | |
| """ | |
| Internal method to generate a cloud-free composite. | |
| Args: | |
| gee_path (str): The path to the Google Earth Engine (GEE) image or image collection. | |
| Returns: | |
| ee.Image: The cloud-free composite clipped to the region of interest. | |
| """ | |
| # Load a raw Landsat ImageCollection for a single year. | |
| collection = ( | |
| ee.ImageCollection(gee_path).filterDate(*daterange).filterBounds(self.roi) | |
| ) | |
| # Create a cloud-free composite with custom parameters for cloud score threshold and percentile. | |
| composite_cloudfree = ee.Algorithms.Landsat.simpleComposite( | |
| **{"collection": collection, "percentile": 75, "cloudScoreRange": 5} | |
| ) | |
| return composite_cloudfree.clip(self.roi) | |
| def _load_indices(indices_file): | |
| # Read index configurations | |
| with open(indices_file, "r") as stream: | |
| try: | |
| return yaml.safe_load(stream) | |
| except yaml.YAMLError as e: | |
| logging.error(e) | |
| return None | |
| def generate_index(self, index_config, year): | |
| """ | |
| Generates an index based on the provided index configuration. | |
| Args: | |
| index_config (dict): Configuration for generating the index. | |
| Returns: | |
| ee.Image: The generated index clipped to the region of interest. | |
| """ | |
| # Calculate date range, assume 1 year | |
| start_date = str(datetime.date(year, 1, 1)) | |
| end_date = str(datetime.date(year, 12, 31)) | |
| daterange = [start_date, end_date] | |
| # Calculate index based on type | |
| logging.info( | |
| f"Generating index: {index_config['name']} of type {index_config['gee_type']}" | |
| ) | |
| match index_config["gee_type"]: | |
| case "image": | |
| dataset = ee.Image(index_config["gee_path"]).clip(self.roi) | |
| if index_config.get("select"): | |
| dataset = dataset.select(index_config["select"]) | |
| case "image_collection": | |
| dataset = ( | |
| ee.ImageCollection(index_config["gee_path"]) | |
| .filterBounds(self.roi) | |
| .map(lambda image: image.clip(self.roi)) | |
| .mean() | |
| ) | |
| if index_config.get("select"): | |
| dataset = dataset.select(index_config["select"]) | |
| case "feature_collection": | |
| dataset = ( | |
| ee.Image() | |
| .float() | |
| .paint( | |
| ee.FeatureCollection(index_config["gee_path"]), | |
| index_config["select"], | |
| ) | |
| .clip(self.roi) | |
| ) | |
| case "algebraic": | |
| image = self._cloudfree(index_config["gee_path"], daterange) | |
| # to-do: params should come from index_config | |
| dataset = image.normalizedDifference(["B4", "B3"]) | |
| case _: | |
| dataset = None | |
| if not dataset: | |
| raise Exception("Failed to generate dataset.") | |
| # Normalize to a range of [0, 1] | |
| min_val = 0 | |
| max_val = 1 | |
| if type(index_config['min'])==int or type(index_config['min']==float): | |
| min_val = index_config['min'] | |
| if str(index_config['max'])=='roi_area': | |
| max_val = self.roi.area().getInfo() # in m^2 | |
| elif type(index_config['max'])==int or type(index_config['max']==float): | |
| max_val = index_config['max'] | |
| dataset.subtract(min_val)\ | |
| .divide(max_val - min_val) | |
| logging.info(f"Generated index: {index_config['name']}") | |
| return dataset | |
| def zonal_mean_index(self, index_key, year): | |
| index_config = self.indices[index_key] | |
| dataset = self.generate_index(index_config, year) | |
| logging.info(f"Calculating zonal mean for {index_key}...") | |
| out = dataset.reduceRegion( | |
| **{ | |
| "reducer": ee.Reducer.mean(), | |
| "geometry": self.roi, | |
| "scale": 2000, # map scale | |
| "bestEffort": True, | |
| "maxPixels": 1e3, | |
| } | |
| ).getInfo() | |
| if index_config.get("bandname"): | |
| return out[index_config.get("bandname")] | |
| logging.info(f"Calculated zonal mean for {index_key}.") | |
| return out | |
| def generate_composite_index_df(self, year): | |
| data = { | |
| "metric": self.metric_name, | |
| "year": year, | |
| "centroid": "", | |
| "project_name": "", | |
| "value": list(map(self.zonal_mean_index, self.indices, repeat(year))), | |
| # to-do: calculate with duckdb; also, should be part of project table instead | |
| "area": self.roi.area().getInfo(), # m^2 | |
| "geojson": "", | |
| "coefficient": list(map(lambda x: self.indices[x]['coefficient'], self.indices)) | |
| } | |
| logging.info("data", data) | |
| df = pd.DataFrame(data) | |
| return df | |
| def _authenticate_ee(ee_service_account): | |
| """ | |
| Huggingface Spaces does not support secret files, therefore authenticate with an environment variable containing the JSON. | |
| """ | |
| logging.info("Authenticating to Google Earth Engine...") | |
| credentials = ee.ServiceAccountCredentials( | |
| ee_service_account, key_data=os.environ["ee_service_account"] | |
| ) | |
| ee.Initialize(credentials) | |
| logging.info("Authenticated to Google Earth Engine.") | |
| def _calculate_yearly_index(self, years): | |
| dfs = [] | |
| logging.info(years) | |
| # to-do: pararelize? | |
| for year in years: | |
| logging.info(year) | |
| df = self.generate_composite_index_df(year) | |
| dfs.append(df) | |
| # Concatenate all dataframes | |
| df_concat = pd.concat(dfs) | |
| df_concat["centroid"] = str(self.project_centroid) | |
| df_concat["project_name"] = self.project_name | |
| df_concat["geojson"] = str(self.project_geometry) | |
| return df_concat.round(2) | |
| # h/t: https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/12\ | |
| def _latlon_to_config(longitudes=None, latitudes=None): | |
| """Function documentation:\n | |
| Basic framework adopted from Krichardson under the following thread: | |
| https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/7 | |
| # NOTE: | |
| # THIS IS A TEMPORARY SOLUTION UNTIL THE DASH TEAM IMPLEMENTS DYNAMIC ZOOM | |
| # in their plotly-functions associated with mapbox, such as go.Densitymapbox() etc. | |
| Returns the appropriate zoom-level for these plotly-mapbox-graphics along with | |
| the center coordinate tuple of all provided coordinate tuples. | |
| """ | |
| # Check whether both latitudes and longitudes have been passed, | |
| # or if the list lenghts don't match | |
| if (latitudes is None or longitudes is None) or ( | |
| len(latitudes) != len(longitudes) | |
| ): | |
| # Otherwise, return the default values of 0 zoom and the coordinate origin as center point | |
| return 0, (0, 0) | |
| # Get the boundary-box | |
| b_box = {} | |
| b_box["height"] = latitudes.max() - latitudes.min() | |
| b_box["width"] = longitudes.max() - longitudes.min() | |
| b_box["center"] = (np.mean(longitudes), np.mean(latitudes)) | |
| # get the area of the bounding box in order to calculate a zoom-level | |
| area = b_box["height"] * b_box["width"] | |
| # * 1D-linear interpolation with numpy: | |
| # - Pass the area as the only x-value and not as a list, in order to return a scalar as well | |
| # - The x-points "xp" should be in parts in comparable order of magnitude of the given area | |
| # - The zpom-levels are adapted to the areas, i.e. start with the smallest area possible of 0 | |
| # which leads to the highest possible zoom value 20, and so forth decreasing with increasing areas | |
| # as these variables are antiproportional | |
| zoom = np.interp( | |
| x=area, | |
| xp=[0, 5**-10, 4**-10, 3**-10, 2**-10, 1**-10, 1**-5], | |
| fp=[20, 15, 14, 13, 12, 7, 5], | |
| ) | |
| # Finally, return the zoom level and the associated boundary-box center coordinates | |
| return zoom, b_box["center"] | |
| def show_project_map(self): | |
| features = json.loads(self.project_geometry[0][0].replace("'", '"'))["features"] | |
| geometry = features[0]["geometry"] | |
| longitudes = np.array(geometry["coordinates"])[0, :, 0] | |
| latitudes = np.array(geometry["coordinates"])[0, :, 1] | |
| zoom, bbox_center = self._latlon_to_config(longitudes, latitudes) | |
| fig = go.Figure( | |
| go.Scattermapbox( | |
| mode="markers", | |
| lon=[bbox_center[0]], | |
| lat=[bbox_center[1]], | |
| marker={"size": 20, "color": ["cyan"]}, | |
| ) | |
| ) | |
| fig.update_layout( | |
| mapbox={ | |
| "style": "satellite", | |
| "accesstoken":os.environ['MAPBOX_ACCESS_TOKEN'], | |
| "center": {"lon": bbox_center[0], "lat": bbox_center[1]}, | |
| "zoom": zoom, | |
| "layers": [ | |
| { | |
| "source": { | |
| "type": "FeatureCollection", | |
| "features": [{"type": "Feature", "geometry": geometry}], | |
| }, | |
| "type": "fill", | |
| "below": "traces", | |
| "color": "royalblue", | |
| "opacity": 0.5, | |
| } | |
| ], | |
| }, | |
| margin={"l": 0, "r": 0, "b": 0, "t": 0}, | |
| ) | |
| return fig | |
| def calculate_score(self, start_year, end_year): | |
| years = [] | |
| # Create `bioindicator` table IF NOT EXISTS. | |
| dq.get_or_create_bioindicator_table() | |
| for year in range(start_year, end_year+1): | |
| row_exists = dq.check_if_project_exists_for_year(self.project_name, year) | |
| if not row_exists: | |
| years.append(year) | |
| if len(years) > 0: | |
| df = self._calculate_yearly_index(years) | |
| # Write score table to `_temptable` | |
| dq.write_score_to_temptable(df) | |
| # UPSERT project record | |
| dq.upsert_project_record() | |
| logging.info("upserted records into motherduck") | |
| scores = dq.get_project_scores(self.project_name, start_year, end_year) | |
| scores.columns = scores.columns.str.replace('_', ' ').str.title() | |
| if 'Area' in scores.columns: | |
| scores['Area'] /= 1000**2 | |
| scores.rename(columns={'Area':'Area (km^2)'}, inplace=True) | |
| if 'Score' in scores.columns: | |
| scores['Score'] /= 1000**2 | |
| scores.rename(columns={'Score': 'Score (Area * Value)'}, inplace=True) | |
| # Round scores to 4 significant figures | |
| scores = scores.apply( | |
| lambda x: ['%.4g'%x_i for x_i in x] | |
| if pd.api.types.is_numeric_dtype(x) | |
| else x) | |
| return scores | |
| def get_metric_file(self): | |
| # Use defined subset of indices | |
| indices_file = f'metrics/{self.metric_name.replace(" ", "_")}.yaml' | |
| with open(indices_file, "r") as stream: | |
| return stream.read() |