File size: 6,350 Bytes
0af1504 dc7dd00 0af1504 dc7dd00 0af1504 dc7dd00 0af1504 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
""" Utility functions for Google Earth Engine data extraction and processing. """
from datetime import datetime, timedelta
import json
import os
import tempfile
import time
import ee
import geopandas as gpd
from shapely.geometry import Point
import pandas as pd
from indices import add_s2_indices, add_s1_indices
from variables import s2_bands, s1_bands
def check_inside_civ(lat: float, lon: float):
    """Report whether a coordinate lies within the Côte d'Ivoire boundary.

    The boundary geometries are read from ``data/CIV_0.json`` on every call.

    Args:
        lat: Latitude of the location to test.
        lon: Longitude of the location to test.

    Returns:
        The result of ``.any()`` over the per-feature containment test:
        truthy when at least one geometry in the file contains the point.
    """
    boundary = gpd.read_file("data/CIV_0.json")
    # Shapely points are (x, y), so longitude comes first.
    location = Point(lon, lat)
    containment = boundary.contains(location)
    return containment.any()
def initialize_ee():
    """Initialize the Google Earth Engine client.

    Credentials are resolved in this order:

    1. The ``EE_SERVICE_ACCOUNT`` and ``EE_SERVICE_KEY`` environment
       variables (service-account e-mail and JSON key content), as used on
       Hugging Face.
    2. A local service-account key file at ``secrets/gcp-sa-key.json``.
    3. The default credentials picked up by a bare ``ee.Initialize()``.

    Raises:
        RuntimeError: If any step of the initialization fails; the original
            exception is chained as the cause.
    """
    sa_email = os.getenv("EE_SERVICE_ACCOUNT")
    sa_key_json = os.getenv("EE_SERVICE_KEY")
    try:
        # Hugging Face
        if sa_email and sa_key_json:
            # Pass the key content directly instead of writing the secret to
            # a predictable file in the shared temp directory, where it would
            # be left behind with default permissions.
            creds = ee.ServiceAccountCredentials(sa_email, key_data=sa_key_json)
            ee.Initialize(creds)
            print(f"[INFO] GEE initialized with service account {sa_email}")
            return

        # Local
        local_key = "secrets/gcp-sa-key.json"
        if os.path.exists(local_key):
            # The key file is only parsed here to recover the account e-mail.
            with open(local_key, "r", encoding="utf-8") as f:
                key_info = json.load(f)
            creds = ee.ServiceAccountCredentials(key_info["client_email"], local_key)
            ee.Initialize(creds)
            print(f"[INFO] GEE initialized from {local_key}")
            return

        # Neither HF nor Local
        ee.Initialize()
        print("[INFO] GEE initialized")
    except Exception as e:
        raise RuntimeError(f"GEE initialization failed : {e}") from e
def mask_s2_clouds(image):
    """Mask cloud and cirrus pixels of a Sentinel-2 image using its QA60 band.

    Args:
        image: The Sentinel-2 image to mask; it must expose a ``QA60`` band.

    Returns:
        The image with flagged pixels masked and every band divided by
        10000, carrying over the acquisition time, cloud percentage and
        spacecraft name properties of the input.
    """
    qa_band = image.select('QA60')

    # QA60 bit 10 flags clouds, bit 11 flags cirrus.
    cloud_flag = 2 ** 10
    cirrus_flag = 2 ** 11

    # A pixel is kept only when both flags are zero (clear conditions).
    no_cloud = qa_band.bitwiseAnd(cloud_flag).eq(0)
    no_cirrus = qa_band.bitwiseAnd(cirrus_flag).eq(0)
    clear_sky = no_cloud.And(no_cirrus)

    scaled = image.updateMask(clear_sky).divide(10000)

    # Re-attach the listed metadata from the source image to the result.
    kept_properties = [
        "system:time_start",
        "system:time_end",
        "CLOUDY_PIXEL_PERCENTAGE",
        "SPACECRAFT_NAME",
    ]
    return scaled.copyProperties(source=image, properties=kept_properties)
def mask_edge(image):
    """Mask Sentinel-1 pixels whose value is below -30.0.

    Args:
        image: The Sentinel-1 image to clean up.

    Returns:
        The image with an updated mask: a pixel stays valid only if it was
        already unmasked and its value is not below -30.0.
    """
    not_edge = image.lt(-30.0).Not()
    still_valid = image.mask().And(not_edge)
    return image.updateMask(still_valid)
def days_since_utc(utc_iso: str) -> int:
    """Compute the number of whole days elapsed since a UTC timestamp.

    Args:
        utc_iso: Timestamp in UTC formatted as ``YYYY-MM-DDTHH:MM:SS``; any
            characters after the first 19 (fractions, ``Z`` suffix, ...) are
            ignored.

    Returns:
        The number of complete 24-hour periods between the timestamp and
        now, rounded towards negative infinity.

    Raises:
        ValueError: If the first 19 characters do not match the format.
    """
    # Local import: calendar is not among the module-level imports.
    import calendar

    parsed = time.strptime(utc_iso[:19], "%Y-%m-%dT%H:%M:%S")
    # timegm reads the struct as UTC. mktime would read it as local time and
    # skew the result by the host's UTC offset.
    t_img = calendar.timegm(parsed)
    return int((time.time() - t_img) // 86400)
def lonlat_to_utm_epsg(lon: float, lat: float) -> int:
    """Return the EPSG code of the WGS84 UTM zone containing a location.

    Args:
        lon: Longitude in degrees. Values outside [-180, 180) are wrapped,
            so 180 maps to zone 1 just like -180.
        lat: Latitude in degrees; its sign selects the hemisphere.

    Returns:
        ``32600 + zone`` for the northern hemisphere (``lat >= 0``) or
        ``32700 + zone`` for the southern one, with ``zone`` in 1..60.
    """
    # Wrap before dividing so the zone can never exceed 60. Without this,
    # lon == 180 produced zone 61, which is not a UTM zone.
    zone = int(((lon + 180) % 360) // 6) + 1
    hemisphere_base = 32600 if lat >= 0 else 32700  # WGS84 UTM N / S
    return hemisphere_base + zone
def projected_xy(lon: float, lat: float):
    """Project a lon/lat location into its UTM zone.

    The transform is evaluated by Earth Engine, so this performs a blocking
    ``getInfo()`` call.

    Args:
        lon: Longitude of the location.
        lat: Latitude of the location.

    Returns:
        A ``(easting, northing)`` tuple of floats in the projection selected
        by ``lonlat_to_utm_epsg``.
    """
    utm_code = lonlat_to_utm_epsg(lon, lat)
    location = ee.Geometry.Point([lon, lat])
    utm_projection = ee.Projection(f"EPSG:{utm_code}")
    # The second argument to transform() is the maximum allowed error.
    projected = location.transform(utm_projection, 1)
    coords = ee.List(projected.coordinates()).getInfo()
    easting, northing = coords[0], coords[1]
    return float(easting), float(northing)
def extract_from_gee(lat: float, lon: float, radius_m: int = 30):
    """Extract Sentinel-1, Sentinel-2 and elevation features around a point.

    The most recent Sentinel-2 image (under 40% cloud cover, cloud-masked)
    and the most recent Sentinel-1 IW image with both VV and VH from the
    last 31 days are stacked with SRTM elevation, then averaged over the
    bounding box of a ``radius_m`` buffer around the point.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.
        radius_m: Buffer radius around the point used to build the region
            of interest.

    Returns:
        A ``(result, error)`` tuple. On success ``error`` is ``None`` and
        ``result`` is a dict with keys ``"X"`` (one-row DataFrame of
        features, or ``None`` when the reduction returned nothing),
        ``"cloud"``, ``"estimation_date"`` (``YYYY-MM-DD`` of the most
        recent acquisition used) and ``"ndvi_mean"``. When no Sentinel-2 or
        Sentinel-1 image is available, ``result`` is ``None`` and ``error``
        is ``{"error": <message>}``.
    """
    pt = ee.Geometry.Point([float(lon), float(lat)])
    roi = pt.buffer(radius_m).bounds()

    end_date = datetime.now()
    start_date = end_date - timedelta(days=31)
    start = start_date.strftime('%Y-%m-%d')
    end = end_date.strftime('%Y-%m-%d')

    time_key = "system:time_start"
    dem = ee.Image("USGS/SRTMGL1_003")

    s2 = (ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
          .filterBounds(roi)
          .filterDate(start, end)
          .filter(ee.Filter.lt("CLOUDY_PIXEL_PERCENTAGE", 40))
          .map(mask_s2_clouds)
          .select(s2_bands))

    # first() always hands back a client-side proxy object, never None, so
    # an empty collection has to be detected by asking the server its size.
    if s2.size().getInfo() == 0:
        return None, {"error": "No Sentinel-2 image available on ROI/time window."}

    img_s2 = s2.sort(time_key, False).first()
    cloud_cover = ee.Number(img_s2.get("CLOUDY_PIXEL_PERCENTAGE")).getInfo()
    acq_iso_s2 = ee.Date(img_s2.get(time_key)).format().getInfo()
    days_s2 = days_since_utc(acq_iso_s2)

    s1 = (ee.ImageCollection("COPERNICUS/S1_GRD")
          .filterBounds(roi)
          .filterDate(start, end)
          .filter(ee.Filter.eq('instrumentMode', 'IW'))
          .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
          .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VH'))
          .map(mask_edge)
          .select(s1_bands))

    # Same guard as for Sentinel-2: avoid an opaque server error further
    # down when no radar acquisition matches the filters.
    if s1.size().getInfo() == 0:
        return None, {"error": "No Sentinel-1 image available on ROI/time window."}

    img_s1 = s1.sort(time_key, False).first()
    acq_iso_s1 = ee.Date(img_s1.get(time_key)).format().getInfo()
    days_s1 = days_since_utc(acq_iso_s1)

    # Report the date of whichever acquisition is the most recent.
    if days_s1 > days_s2:
        estimation_date = acq_iso_s2
    else:
        estimation_date = acq_iso_s1

    # DEM, resampled onto the Sentinel-1 grid before stacking.
    s1_proj = img_s1.projection()
    elevation = dem.reproject(s1_proj).clip(roi)

    s1_s2_dem_image = (
        img_s1
        .addBands(img_s2)
        .addBands(elevation)
    )

    # Convert the image to a dictionary of per-band means over the ROI.
    image_dict = s1_s2_dem_image.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=roi,
        scale=10,
        maxPixels=1e8
    ).getInfo()

    if image_dict:
        init_dict = {k: image_dict.get(k) for k in (s2_bands + s1_bands + ['elevation'])}
        data = pd.DataFrame([init_dict])
        data["lon"] = lon
        data["lat"] = lat

        easting, northing = projected_xy(lon, lat)
        data["latitude_proj"] = northing
        data["longitude_proj"] = easting

        data = add_s2_indices(data)
        data = add_s1_indices(data)
        ndvi_mean = data["NDVI"].values[0] if "NDVI" in data else None
    else:
        data = None
        ndvi_mean = None

    return {
        "X": data,
        "cloud": float(cloud_cover),
        "estimation_date": estimation_date.split("T")[0],
        "ndvi_mean": ndvi_mean
    }, None
|