import json
import re
from pathlib import Path
from typing import Dict

import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import xarray as xr
from shapely.geometry import Point
from shapely.ops import transform
from shapely.wkt import loads


def _load_field_mapping(consortium, consortium_folder_name: str, sensors_df: pd.DataFrame) -> Dict:
    """
    Load field geometries from geojson files in
    data/01_raw/{consortium_folder_name}/fields_shapes.

    For "consortium0" the file stem is expected to be {field_name}_irri and the
    "_irri" suffix is stripped; for every other consortium the full stem is the
    field name. A sensor is mapped to a field when field_name (lowercase)
    appears inside datastream_name.lower(). If a sensor matches several field
    files, the last one seen wins.

    Args:
        consortium: Consortium identifier; selects the field-name extraction rule.
        consortium_folder_name: Sub-folder of data/01_raw holding fields_shapes.
        sensors_df: DataFrame with a "datastream_name" column.

    Returns:
        {datastream_name: geometry_wkt} for every matched sensor.

    Raises:
        FileNotFoundError: If the fields_shapes folder does not exist.
    """
    base_dir = Path(f"data/01_raw/{consortium_folder_name}/fields_shapes")
    if not base_dir.exists():
        raise FileNotFoundError(f"Folder not found: {base_dir}")

    result = {}

    # lowercase names once, for case-insensitive substring matching
    sensor_names = sensors_df["datastream_name"].tolist()
    sensor_names_lower = [s.lower() for s in sensor_names]

    # iterate over all geojson field files
    for geojson_path in base_dir.glob("*.geojson"):
        stem = geojson_path.stem.lower()

        # different extraction rules per consortium
        if consortium == "consortium0":
            field_name = stem.replace("_irri", "")
        elif consortium == "consortium1":
            field_name = stem  # use full stem
        else:
            field_name = stem  # fallback

        gdf = gpd.read_file(geojson_path)
        if gdf.empty:
            continue
        # only the first feature of each file is used
        geom_wkt = gdf.geometry.iloc[0].wkt

        for sensor_name, sensor_lower in zip(sensor_names, sensor_names_lower):
            if field_name in sensor_lower:  # works for both naming schemes
                result[sensor_name] = geom_wkt

    return result


def _infer_dataset_crs(ds: xr.Dataset) -> str:
    """Infer CRS from dataset coordinates.

    Precedence: an explicit ``crs`` attribute on the dataset, then a
    coordinate-range heuristic (spans < 1 are assumed to be degrees),
    else UTM zone 32N as the project default.
    """
    # check for common CRS attributes
    if 'crs' in ds.attrs:
        return ds.attrs['crs']

    # check coordinate ranges to infer CRS
    x_range = float(ds.x.max() - ds.x.min())
    y_range = float(ds.y.max() - ds.y.min())

    # if ranges are small (< 1), likely lat/lon (WGS84)
    if x_range < 1 and y_range < 1:
        return "EPSG:4326"

    # else, assume UTM zone 32N (common for Northern Italy/Trento area)
    return "EPSG:32632"


# todo: instead of closest maybe change to be contained/within the pixel only.
def _get_closest_pixel_indices(ds: xr.Dataset, lat: float, lon: float,
                               transformer: pyproj.Transformer) -> tuple:
    """Get (y_idx, x_idx) of the pixel whose center is closest to (lat, lon).

    The transformer is expected to map (lon, lat) into the dataset's CRS.
    """
    # transform coordinates into the dataset CRS
    x_proj, y_proj = transformer.transform(lon, lat)

    # nearest pixel center along each axis independently
    x_idx = int(np.argmin(np.abs(ds.x.values - x_proj)))
    y_idx = int(np.argmin(np.abs(ds.y.values - y_proj)))
    return y_idx, x_idx


def _get_field_pixel_indices(ds: xr.Dataset, field_geom,
                             transformer: pyproj.Transformer) -> tuple:
    """Get indices of all pixels whose centers fall within the field boundary.

    Args:
        ds: Dataset with 1-D ``x`` and ``y`` coordinate arrays.
        field_geom: Field geometry as a WKT string (lon/lat order).
        transformer: Maps geometry coordinates into the dataset CRS.

    Returns:
        (y_indices, x_indices) lists of matching pixel indices.
    """
    # parse WKT and project the field geometry into the dataset CRS
    field_geom = loads(field_geom)
    field_proj = transform(transformer.transform, field_geom)

    y_coords = ds.y.values
    x_coords = ds.x.values

    # cheap bounding-box prune: only pixels inside the field's envelope can
    # possibly be contained, so restrict the exact point-in-polygon test to
    # those candidates instead of scanning the full grid
    minx, miny, maxx, maxy = field_proj.bounds
    cand_y = np.nonzero((y_coords >= miny) & (y_coords <= maxy))[0]
    cand_x = np.nonzero((x_coords >= minx) & (x_coords <= maxx))[0]

    y_indices = []
    x_indices = []
    for i in cand_y:
        for j in cand_x:
            # strict containment: pixels whose center lies on the boundary
            # are excluded, matching shapely's contains() semantics
            if field_proj.contains(Point(x_coords[j], y_coords[i])):
                y_indices.append(int(i))
                x_indices.append(int(j))

    return y_indices, x_indices


def __swap_coordinates(geometry):
    """
    Swap x and y coordinates in a WKT geometry string for both POLYGON and
    MULTIPOLYGON. Handles outer rings and inner rings (holes).

    Parameters:
    - geometry (str): The WKT geometry string.

    Returns:
    - str: The geometry string with swapped coordinates.

    Raises:
    - ValueError: For unsupported geometry types, or when the swapped string
      no longer parses as WKT.
    """
    geometry = geometry.strip()
    geometry_type = geometry.split("(")[0].strip()

    # NOTE: the previous pattern ([-+]?\d*\.\d+|\d+) only allowed a sign on
    # the decimal alternative, so signed integer coordinates (e.g. "-5 10")
    # were silently dropped, corrupting the ring. Both alternatives now take
    # an optional sign, and \s+ tolerates multiple separating spaces.
    pair_pattern = re.compile(r"([-+]?(?:\d*\.\d+|\d+))\s+([-+]?(?:\d*\.\d+|\d+))")

    def swap_coordinate_pairs(coord_string):
        """Helper: swap every 'x y' pair in a ring's coordinate string."""
        coordinates = pair_pattern.findall(coord_string)
        return ",".join(f"{y} {x}" for x, y in coordinates)

    if geometry_type == 'POLYGON':
        # handle single polygon with possible inner rings
        rings = re.findall(r"\(([-+\d\s.,]+)\)", geometry)
        swapped_rings = [swap_coordinate_pairs(ring) for ring in rings]
        swapped_geometry = f"POLYGON (({'),('.join(swapped_rings)}))"
    elif geometry_type == 'MULTIPOLYGON':
        # split the geometry into individual polygons
        polygons = re.findall(r"\(\(([-+\d\s.,()]+?)\)\)", geometry)
        swapped_polygons = []
        for polygon in polygons:
            # split polygon into rings (first is outer, rest are inner)
            rings = polygon.split('),(')
            # swap coordinates for each ring
            swapped_rings = [swap_coordinate_pairs(ring) for ring in rings]
            # reconstruct the polygon with all its rings
            if len(rings) > 1:
                # if there are inner rings, join them with ),(
                swapped_polygons.append(f"(({'),('.join(swapped_rings)}))")
            else:
                # if there's only the outer ring
                swapped_polygons.append(f"(({swapped_rings[0]}))")
        # re-build the MULTIPOLYGON
        swapped_geometry = f"MULTIPOLYGON ({','.join(swapped_polygons)})"
    else:
        raise ValueError(f"Unsupported geometry type: {geometry_type}")

    # validate that the swapped string round-trips through the WKT parser;
    # an invalid-but-parseable geometry only warns, a parse failure raises
    try:
        swapped_geom = loads(swapped_geometry)
        if not swapped_geom.is_valid:
            print(f"Warning: Invalid geometry after swap: {swapped_geom.wkt}")
    except Exception as e:
        raise ValueError(f"Invalid geometry after swapping coordinates: {str(e)}")

    return swapped_geometry


# this function is just to get what fields are sensors part of
# todo: this implementation proves there are problems with current shapes in the services. No full mapping exists!
def get_fields_4_sensors(fields_json, locations_df):
    """
    Map each device/sensor to the field polygon(s) it falls within.

    Args:
        fields_json: Path to a JSON file containing a list of dicts, each with
            a "field_id" and a "feature" WKT string (coordinates stored in
            swapped order, hence the __swap_coordinates call).
        locations_df: DataFrame with "datastream_id", "x" and "y" columns.

    Returns:
        A copy of locations_df with an extra "field_id" column holding the
        list of field IDs (possibly empty) each device falls within.
    """
    # load field definitions
    with open(fields_json, "r") as f:
        fields_data = json.load(f)

    # build field geometries; WKT is stored with coordinates swapped, so
    # un-swap before parsing
    field_geoms = []
    field_ids = []
    for fdict in fields_data:
        geom = loads(__swap_coordinates(fdict["feature"]))
        field_geoms.append(geom)
        field_ids.append(fdict["field_id"])

    fields_gdf = gpd.GeoDataFrame(
        {"field_id": field_ids, "geometry": field_geoms}, crs="EPSG:4326"
    )

    # device points are built from (y, x) — presumably the df stores lat in
    # "x" and lon in "y", matching the swapped field WKT. TODO(review): confirm
    # against the locations source.
    devices_gdf = gpd.GeoDataFrame(
        locations_df,
        geometry=gpd.points_from_xy(locations_df.y, locations_df.x),
        crs="EPSG:4326"
    )

    # spatial join: points within polygons
    joined = gpd.sjoin(devices_gdf, fields_gdf, how="left", predicate="within")

    # aggregate field IDs per device, keeping the original device order
    device_to_fields = (
        joined.groupby("datastream_id")["field_id"]
        .apply(lambda ids: list(ids.dropna().unique()))
        .reindex(locations_df["datastream_id"])
        .reset_index()
    )

    # merge back onto the original devices dataframe
    return locations_df.merge(device_to_fields, on="datastream_id")