File size: 8,160 Bytes
64ab846 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 | import xarray as xr
import numpy as np
from shapely.geometry import Point
from shapely.ops import transform
import pyproj
from typing import Dict
from pathlib import Path
import re
from shapely.wkt import loads
import json
import pandas as pd
import geopandas as gpd
def _load_field_mapping(consortium, consortium_folder_name: str, sensors_df: pd.DataFrame) -> Dict:
    """
    Load field geometries from geojson files in data/01_raw/{consortium_folder_name}/fields_shapes.

    For consortium0 the files are named {field_name}_irri.geojson (the "_irri"
    suffix is stripped); for every other consortium the file stem is used as the
    field name directly. A sensor is mapped to a field when field_name
    (lowercase) appears inside datastream_name.lower().

    Args:
        consortium: consortium identifier selecting the field-name extraction rule.
        consortium_folder_name: folder under data/01_raw holding fields_shapes.
        sensors_df: DataFrame with a "datastream_name" column.

    Returns:
        {sensor_name: geometry_wkt} for every sensor whose name contains a field name.

    Raises:
        FileNotFoundError: if the fields_shapes folder does not exist.
    """
    base_dir = Path(f"data/01_raw/{consortium_folder_name}/fields_shapes")
    if not base_dir.exists():
        raise FileNotFoundError(f"Folder not found: {base_dir}")
    result = {}
    # keep original names as result keys; lowercase copies for matching
    sensor_names = sensors_df["datastream_name"].tolist()
    sensor_names_lower = [s.lower() for s in sensor_names]
    # iterate over all geojson field files
    for geojson_path in base_dir.glob("*.geojson"):
        stem = geojson_path.stem.lower()
        # per-consortium field-name extraction rule
        if consortium == "consortium0":
            field_name = stem.replace("_irri", "")
        else:
            # consortium1 and any other consortium: use the full stem
            field_name = stem
        gdf = gpd.read_file(geojson_path)
        if gdf.empty:
            # skip files that contain no feature at all
            continue
        # only the first feature's geometry is used
        geom_wkt = gdf.geometry.iloc[0].wkt
        # case-insensitive substring match against every sensor name
        for sensor_name, sensor_lower in zip(sensor_names, sensor_names_lower):
            if field_name in sensor_lower:
                result[sensor_name] = geom_wkt
    return result
def _infer_dataset_crs(ds: xr.Dataset) -> str:
"""Infer CRS from dataset coordinates."""
# check for common CRS attributes
if 'crs' in ds.attrs:
return ds.attrs['crs']
# check coordinate ranges to infer CRS
x_range = float(ds.x.max() - ds.x.min())
y_range = float(ds.y.max() - ds.y.min())
# if ranges are small (< 1), likely lat/lon (WGS84)
if x_range < 1 and y_range < 1:
return "EPSG:4326"
# else, assume UTM zone 32N (common for Northern Italy/Trento area)
return "EPSG:32632"
# todo: instead of closest maybe change to be contained/within the pixel only.
def _get_closest_pixel_indices(ds: xr.Dataset, lat: float, lon: float,
transformer: pyproj.Transformer) -> tuple:
"""Get indices of closest pixel to given location."""
# transform coordinates
x_proj, y_proj = transformer.transform(lon, lat)
# find nearest pixel indices
x_idx = int(np.argmin(np.abs(ds.x.values - x_proj)))
y_idx = int(np.argmin(np.abs(ds.y.values - y_proj)))
return y_idx, x_idx
def _get_field_pixel_indices(ds: "xr.Dataset",
                             field_geom,
                             transformer: "pyproj.Transformer") -> tuple:
    """
    Return the indices of all dataset pixels whose centers fall inside the field.

    Args:
        ds: dataset with 1-D x/y coordinate arrays giving pixel centers.
        field_geom: field boundary as a WKT string, expressed in the
            transformer's source CRS.
        transformer: pyproj transformer from the field CRS to the dataset CRS.

    Returns:
        Tuple (y_indices, x_indices) of parallel lists; empty lists when no
        pixel center lies inside the field.
    """
    # parse the WKT string into a shapely geometry
    field_geom = loads(field_geom)
    # reproject the field boundary into the dataset CRS
    field_proj = transform(transformer.transform, field_geom)
    y_coords = ds.y.values
    x_coords = ds.x.values
    y_indices = []
    x_indices = []
    # brute-force point-in-polygon test on every pixel center.
    # NOTE: contains() is strict — centers exactly on the boundary are excluded.
    for i, y in enumerate(y_coords):
        for j, x in enumerate(x_coords):
            if field_proj.contains(Point(x, y)):
                y_indices.append(i)
                x_indices.append(j)
    return y_indices, x_indices
def __swap_coordinates(geometry):
    """
    Swap x and y coordinates in a WKT geometry string for POLYGON and MULTIPOLYGON.

    Handles both outer rings and inner rings (holes) in polygons.

    Parameters:
    - geometry (str): The WKT geometry string.

    Returns:
    - str: The geometry string with swapped coordinates.

    Raises:
    - ValueError: for unsupported geometry types or when the swapped string
      cannot be parsed back into a geometry.
    """
    geometry = geometry.strip()
    geometry_type = geometry.split("(")[0].strip()
    # one coordinate number: signed integer, decimal, or leading-dot decimal.
    # The previous pattern ([-+]?\d*\.\d+|\d+) dropped the sign on negative
    # integer coordinates, silently corrupting the swapped geometry.
    number = r"[-+]?(?:\d+(?:\.\d+)?|\.\d+)"
    pair_pattern = re.compile(rf"({number})\s+({number})")

    def swap_coordinate_pairs(coord_string):
        """Helper function to swap every 'x y' pair in a ring's coordinate string."""
        coordinates = pair_pattern.findall(coord_string)
        return ",".join(f"{y} {x}" for x, y in coordinates)
    if geometry_type == 'POLYGON':
        # handle single polygon with possible inner rings
        rings = re.findall(r"\(([-+\d\s.,]+)\)", geometry)
        swapped_rings = [swap_coordinate_pairs(ring) for ring in rings]
        swapped_geometry = f"POLYGON (({'),('.join(swapped_rings)}))"
    elif geometry_type == 'MULTIPOLYGON':
        # split the geometry into individual polygons
        polygons = re.findall(r"\(\(([-+\d\s.,()]+?)\)\)", geometry)
        swapped_polygons = []
        for polygon in polygons:
            # split polygon into rings (first is outer, rest are inner holes)
            rings = polygon.split('),(')
            # swap coordinates for each ring
            swapped_rings = [swap_coordinate_pairs(ring) for ring in rings]
            # reconstruct the polygon with all its rings
            if len(rings) > 1:
                swapped_polygons.append(f"(({'),('.join(swapped_rings)}))")
            else:
                swapped_polygons.append(f"(({swapped_rings[0]}))")
        # re-build the MULTIPOLYGON
        swapped_geometry = f"MULTIPOLYGON ({','.join(swapped_polygons)})"
    else:
        raise ValueError(f"Unsupported geometry type: {geometry_type}")
    # validate the swapped geometry parses; warn (but do not fail) when the
    # resulting shape is self-intersecting or otherwise invalid
    try:
        swapped_geom = loads(swapped_geometry)
        if not swapped_geom.is_valid:
            print(f"Warning: Invalid geometry after swap: {swapped_geom.wkt}")
    except Exception as e:
        raise ValueError(f"Invalid geometry after swapping coordinates: {str(e)}")
    return swapped_geometry
# this function is just to get what fields are sensors part of
# todo: this implementation proves there are problems with current shapes in the services. No full mapping exists!
def get_fields_4_sensors(fields_json, locations_df):
    """
    Map each device location to the field polygons that contain it.

    Args:
        fields_json: path to a JSON file holding a list of dicts, each with
            "field_id" and "feature" (a WKT geometry whose coordinates are
            stored in swapped order and must be flipped before parsing).
        locations_df: DataFrame with "datastream_id" and point coordinates in
            columns "x" and "y" (also crossed — see the points_from_xy call).

    Returns:
        locations_df with an extra "field_id" column: the list of field IDs
        whose polygon contains the device point (empty list if none).
    """
    # --- Load JSON file ---
    with open(fields_json, "r") as f:
        fields_data = json.load(f)
    # --- Build list of field geometries (swapping stored coordinate order) ---
    field_geoms = []
    field_ids = []
    for fdict in fields_data:
        geom = loads(__swap_coordinates(fdict["feature"]))
        field_geoms.append(geom)
        field_ids.append(fdict["field_id"])
    # create fields gdf
    fields_gdf = gpd.GeoDataFrame({"field_id": field_ids, "geometry": field_geoms}, crs="EPSG:4326")
    # convert devices to a gdf, crossing x and y (columns are stored swapped)
    devices_gdf = gpd.GeoDataFrame(
        locations_df,
        geometry=gpd.points_from_xy(locations_df.y, locations_df.x),
        crs="EPSG:4326"
    )
    # spatial join: keep every device, attach the field(s) containing its point
    joined = gpd.sjoin(devices_gdf, fields_gdf, how="left", predicate="within")
    # aggregate field IDs per device, preserving locations_df row order
    device_to_fields = (
        joined.groupby("datastream_id")["field_id"]
        .apply(lambda x: list(x.dropna().unique()))
        .reindex(locations_df["datastream_id"])
        .reset_index()
    )
    # merge the aggregated field lists back onto the original devices df
    return locations_df.merge(device_to_fields, on="datastream_id")
|