Spaces:
Running
Running
| import os | |
| import time | |
| import logging | |
| import argparse | |
| import numpy as np | |
| import pandas as pd | |
| import ee | |
| import pyarrow.parquet as pq | |
| import pyarrow as pa | |
| from datetime import datetime | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| def authenticate_gee(service_account_json): | |
| """Authenticate to GEE using a service account JSON key file.""" | |
| logging.info(f"Authenticating to GEE using {service_account_json}") | |
| try: | |
| credentials = ee.ServiceAccountCredentials( | |
| '', # Earth Engine will extract the email from the JSON | |
| service_account_json | |
| ) | |
| ee.Initialize(credentials) | |
| logging.info("GEE Initialization successful.") | |
| except Exception as e: | |
| logging.error(f"GEE Initialization failed: {e}") | |
| raise | |
| def optimize_dataframe(df): | |
| """Optimize pandas DataFrame memory usage.""" | |
| for col in df.columns: | |
| col_type = df[col].dtype | |
| if col_type == 'object': | |
| df[col] = df[col].astype('category') | |
| elif col_type == 'float64': | |
| df[col] = df[col].astype('float32') | |
| elif col_type == 'int64': | |
| df[col] = pd.to_numeric(df[col], downcast='integer') | |
| return df | |
| def execute_with_backoff(gee_task_func, max_retries=5, base_delay=2): | |
| """Execute a GEE task with exponential backoff to handle rate limits.""" | |
| retries = 0 | |
| while retries < max_retries: | |
| try: | |
| return gee_task_func() | |
| except ee.EEException as e: | |
| err_str = str(e).lower() | |
| if 'too many requests' in err_str or 'rate limited' in err_str or 'capacity' in err_str or 'timed out' in err_str: | |
| delay = base_delay * (2 ** retries) | |
| logging.warning(f"GEE API error: {e}. Retrying in {delay} seconds (Attempt {retries + 1}/{max_retries})") | |
| time.sleep(delay) | |
| retries += 1 | |
| else: | |
| raise e | |
| raise Exception("Max retries exceeded for GEE API.") | |
| def get_gee_features_for_chunk(df_chunk, year, month): | |
| """ | |
| Process a chunk of observations for a specific Year-Month block. | |
| Converts to ee.FeatureCollection and extracts variables using Earth Engine. | |
| """ | |
| # 1. Prepare features | |
| features = [] | |
| for idx, row in df_chunk.iterrows(): | |
| # Ensure we have valid coordinates | |
| if pd.isna(row['longitude']) or pd.isna(row['latitude']): | |
| continue | |
| # Parse date to millis since epoch for GEE filtering | |
| try: | |
| if isinstance(row['date'], str): | |
| dt = datetime.strptime(row['date'].split(' ')[0], "%Y-%m-%d") | |
| else: | |
| dt = pd.to_datetime(row['date']) | |
| date_ms = int(dt.timestamp() * 1000) | |
| except Exception: | |
| # Fallback if date is malformed | |
| dt = datetime(year, month, 1) | |
| date_ms = int(dt.timestamp() * 1000) | |
| geom = ee.Geometry.Point([row['longitude'], row['latitude']]) | |
| feat = ee.Feature(geom, { | |
| 'image_id': str(row['image_id']), | |
| 'date_ms': date_ms, | |
| 'year': year | |
| }) | |
| features.append(feat) | |
| if not features: | |
| return [] | |
| # Limit chunk size to avoid payload too large errors (GEE limit is ~10MB payload) | |
| # If the chunk is huge (e.g. >5000), we split it. Here we process the given chunk. | |
| fc = ee.FeatureCollection(features) | |
| # 2. Define the extraction function to map over the FeatureCollection | |
| # We use map() because ERA5 and MODIS are time-dependent per exact feature date. | |
| def extract_point_data(feature): | |
| # Feature Date | |
| f_date = ee.Date(feature.getNumber('date_ms')) | |
| f_year = feature.getNumber('year') | |
| geom = feature.geometry() | |
| # A. SRTM Elevation (Static) | |
| srtm = ee.Image('CGIAR/SRTM90_V4') | |
| elev_dict = srtm.reduceRegion(reducer=ee.Reducer.first(), geometry=geom, scale=90) | |
| # B. Copernicus Land Cover (Yearly) | |
| copernicus_ic = ee.ImageCollection('COPERNICUS/Landcover/100m/Proba-V-C3/Global') \ | |
| .filter(ee.Filter.calendarRange(f_year, f_year, 'year')) | |
| # Fallback to 2019 if year is beyond available data (2015-2019 available) | |
| lc_image = ee.Algorithms.If( | |
| copernicus_ic.size().gt(0), | |
| copernicus_ic.first(), | |
| ee.ImageCollection('COPERNICUS/Landcover/100m/Proba-V-C3/Global').filterDate('2019-01-01', '2019-12-31').first() | |
| ) | |
| lc_dict = ee.Image(lc_image).select('discrete_classification').reduceRegion(reducer=ee.Reducer.first(), geometry=geom, scale=100) | |
| # C. ERA5 Daily (Exact Date) | |
| era5_ic = ee.ImageCollection('ECMWF/ERA5/DAILY') \ | |
| .filterDate(f_date, f_date.advance(1, 'day')) | |
| # If available, sample mean_2m_air_temperature and total_precipitation | |
| era5_image = ee.Algorithms.If( | |
| era5_ic.size().gt(0), | |
| era5_ic.first(), | |
| ee.Image.constant(ee.Number(-9999)).rename('mean_2m_air_temperature').addBands(ee.Image.constant(ee.Number(-9999)).rename('total_precipitation')) | |
| ) | |
| era5_dict = ee.Image(era5_image).select(['mean_2m_air_temperature', 'total_precipitation']).reduceRegion(reducer=ee.Reducer.first(), geometry=geom, scale=27830) | |
| # D. MODIS NDVI (16-day composite, closest to date) | |
| modis_ic = ee.ImageCollection('MODIS/006/MOD13A2') \ | |
| .filterDate(f_date.advance(-16, 'day'), f_date.advance(16, 'day')) | |
| modis_image = ee.Algorithms.If( | |
| modis_ic.size().gt(0), | |
| modis_ic.first(), | |
| ee.Image.constant(ee.Number(-9999)).rename('NDVI') | |
| ) | |
| modis_dict = ee.Image(modis_image).select('NDVI').reduceRegion(reducer=ee.Reducer.first(), geometry=geom, scale=1000) | |
| # Combine all dictionaries into feature properties | |
| return feature.set(elev_dict).set(lc_dict).set(era5_dict).set(modis_dict) | |
| # Apply extraction | |
| extracted_fc = fc.map(extract_point_data) | |
| # 3. Retrieve results locally | |
| def fetch_data(): | |
| return extracted_fc.getInfo()['features'] | |
| raw_results = execute_with_backoff(fetch_data) | |
| # 4. Parse results back to a dictionary format suitable for pandas | |
| processed_data = [] | |
| for feat in raw_results: | |
| props = feat.get('properties', {}) | |
| processed_data.append({ | |
| 'image_id': props.get('image_id'), | |
| 'elevation': props.get('elevation'), | |
| 'landcover_class': props.get('discrete_classification'), | |
| 'temperature_2m': props.get('mean_2m_air_temperature'), | |
| 'total_precipitation': props.get('total_precipitation'), | |
| 'ndvi': props.get('NDVI') | |
| }) | |
| return processed_data | |
| def chunker(seq, size): | |
| """Yield successive chunks from a list/dataframe.""" | |
| for pos in range(0, len(seq), size): | |
| yield seq.iloc[pos:pos + size] if isinstance(seq, pd.DataFrame) else seq[pos:pos + size] | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Extract environmental features from GEE.") | |
| parser.add_argument('--input_parquet', type=str, required=True, help="Path to base metadata parquet.") | |
| parser.add_argument('--output_parquet', type=str, required=True, help="Path to save enriched parquet.") | |
| parser.add_argument('--service_account_json', type=str, required=True, help="Path to GEE JSON key.") | |
| parser.add_argument('--chunk_size', type=int, default=2500, help="Max features per GEE payload.") | |
| args = parser.parse_args() | |
| authenticate_gee(args.service_account_json) | |
| logging.info(f"Loading {args.input_parquet}") | |
| # Load metadata and optimize memory | |
| df = pd.read_parquet(args.input_parquet) | |
| df = optimize_dataframe(df) | |
| # Ensure date column is datetime to extract year and month | |
| if 'date' in df.columns and not pd.api.types.is_datetime64_any_dtype(df['date']): | |
| df['parsed_date'] = pd.to_datetime(df['date'], errors='coerce') | |
| elif 'date' in df.columns: | |
| df['parsed_date'] = df['date'] | |
| else: | |
| raise ValueError("No 'date' column found in parquet file.") | |
| df['year'] = df['parsed_date'].dt.year | |
| df['month'] = df['parsed_date'].dt.month | |
| # Prepare output tracking | |
| all_enriched_records = [] | |
| total_processed = 0 | |
| # Group by Year and Month as requested | |
| grouped = df.dropna(subset=['year', 'month']).groupby(['year', 'month']) | |
| total_groups = len(grouped) | |
| logging.info(f"Starting GEE extraction across {total_groups} Year-Month blocks.") | |
| for i, ((year, month), group_df) in enumerate(grouped): | |
| year = int(year) | |
| month = int(month) | |
| logging.info(f"Processing Block {i+1}/{total_groups}: {year}-{month:02d} ({len(group_df)} records)") | |
| # Sub-chunking to respect GEE payload limits (CRITICAL for millions of rows) | |
| for chunk_df in chunker(group_df, args.chunk_size): | |
| extracted_chunk = get_gee_features_for_chunk(chunk_df, year, month) | |
| all_enriched_records.extend(extracted_chunk) | |
| total_processed += len(extracted_chunk) | |
| if total_processed % 10000 == 0: | |
| logging.info(f" ...processed {total_processed} records so far.") | |
| logging.info("Extraction complete. Merging with base metadata.") | |
| # Convert extracted list of dicts to DataFrame | |
| df_extracted = pd.DataFrame(all_enriched_records) | |
| # Drop temp columns used for grouping | |
| df.drop(columns=['parsed_date', 'year', 'month'], inplace=True, errors='ignore') | |
| # Ensure image_id types match for merge | |
| df['image_id'] = df['image_id'].astype(str) | |
| df_extracted['image_id'] = df_extracted['image_id'].astype(str) | |
| # Merge based on image_id | |
| df_final = pd.merge(df, df_extracted, on='image_id', how='left') | |
| # Memory optimization before saving | |
| df_final = optimize_dataframe(df_final) | |
| logging.info(f"Saving enriched metadata to {args.output_parquet}") | |
| # Write in chunks if needed, but pyarrow handles large DF writes well | |
| df_final.to_parquet(args.output_parquet, engine='pyarrow', index=False) | |
| logging.info("Done.") | |
| if __name__ == "__main__": | |
| main() | |