""" Comprehensive vegetation and terrain analysis module for RehabWatch. Performs multi-index analysis including: - Vegetation indices (NDVI, SAVI, EVI) - Soil/water indices (BSI, NDWI, NDMI, NBR) - Terrain analysis (slope, aspect, erosion risk) - Land cover classification and change - Rehabilitation metrics and scoring """ import numpy as np import xarray as xr from datetime import datetime, timedelta from typing import Dict, Any, Optional, Tuple, List from .stac_utils import ( get_sentinel_composite, calculate_ndvi, calculate_savi, calculate_evi, calculate_ndwi, calculate_ndmi, calculate_bsi, calculate_nbr, calculate_all_indices, calculate_vegetation_heterogeneity, get_dem_data, calculate_slope, calculate_aspect, calculate_terrain_ruggedness, calculate_erosion_risk, get_land_cover, get_worldcover, calculate_land_cover_change, calculate_vegetation_cover_percent, calculate_bare_ground_percent, search_sentinel2, create_reference_bbox, get_bbox_center, LULC_CLASSES, WORLDCOVER_CLASSES ) def analyze_vegetation_change( bbox: Tuple[float, float, float, float], date_before: str, date_after: str, window_days: int = 15, cloud_threshold: int = 25 ) -> Dict[str, Any]: """ Analyze vegetation change between two dates using multiple indices. Args: bbox: Bounding box (min_lon, min_lat, max_lon, max_lat) date_before: Start date (YYYY-MM-DD) date_after: End date (YYYY-MM-DD) window_days: Days before/after each date for composite (default 15) cloud_threshold: Maximum cloud cover percentage Returns: Dict containing composites, indices, and statistics """ # Parse dates and create windows before_dt = datetime.strptime(date_before, '%Y-%m-%d') after_dt = datetime.strptime(date_after, '%Y-%m-%d') before_start = (before_dt - timedelta(days=window_days)).strftime('%Y-%m-%d') before_end = (before_dt + timedelta(days=window_days)).strftime('%Y-%m-%d') after_start = (after_dt - timedelta(days=window_days)).strftime('%Y-%m-%d') after_end = (after_dt + timedelta(days=window_days)).strftime('%Y-%m-%d') # Get cloud-free composites composite_before = get_sentinel_composite( bbox, before_start, before_end, cloud_threshold ) composite_after = get_sentinel_composite( bbox, after_start, after_end, cloud_threshold ) # Calculate all indices for both periods indices_before = calculate_all_indices(composite_before) indices_after = calculate_all_indices(composite_after) # Calculate changes for each index index_changes = {} for key in indices_before: index_changes[key] = indices_after[key] - indices_before[key] # Calculate vegetation heterogeneity (proxy for diversity) heterogeneity_before = calculate_vegetation_heterogeneity(indices_before['ndvi']) heterogeneity_after = calculate_vegetation_heterogeneity(indices_after['ndvi']) # Calculate comprehensive statistics stats = calculate_statistics( indices_before, indices_after, index_changes, bbox ) return { 'composite_before': composite_before, 'composite_after': composite_after, 'indices_before': indices_before, 'indices_after': indices_after, 'index_changes': index_changes, 'ndvi_before': indices_before['ndvi'], 'ndvi_after': indices_after['ndvi'], 'ndvi_change': index_changes['ndvi'], 'heterogeneity_before': heterogeneity_before, 'heterogeneity_after': heterogeneity_after, 'stats': stats, 'date_before': date_before, 'date_after': date_after, 'bbox': bbox } def analyze_terrain( bbox: Tuple[float, float, float, float], bsi: Optional[xr.DataArray] = None ) -> Dict[str, Any]: """ Analyze terrain characteristics including slope, aspect, and erosion risk. Args: bbox: Bounding box (min_lon, min_lat, max_lon, max_lat) bsi: Bare Soil Index array (optional, for erosion risk) Returns: Dict containing terrain data and statistics """ try: # Get DEM data dem = get_dem_data(bbox) # Calculate terrain derivatives slope = calculate_slope(dem) aspect = calculate_aspect(dem) ruggedness = calculate_terrain_ruggedness(dem) # Calculate erosion risk if BSI provided erosion_risk = None if bsi is not None: # Resample BSI to match DEM resolution if needed erosion_risk = calculate_erosion_risk(slope, bsi) # Calculate statistics terrain_stats = { 'elevation_min': float(np.nanmin(dem.values)), 'elevation_max': float(np.nanmax(dem.values)), 'elevation_mean': float(np.nanmean(dem.values)), 'slope_mean': float(np.nanmean(slope.values)), 'slope_max': float(np.nanmax(slope.values)), 'ruggedness_mean': float(np.nanmean(ruggedness.values)), } # Slope classification flat_pixels = np.sum(slope.values < 5) gentle_pixels = np.sum((slope.values >= 5) & (slope.values < 15)) moderate_pixels = np.sum((slope.values >= 15) & (slope.values < 30)) steep_pixels = np.sum(slope.values >= 30) total_pixels = slope.size terrain_stats['percent_flat'] = round((flat_pixels / total_pixels) * 100, 1) terrain_stats['percent_gentle'] = round((gentle_pixels / total_pixels) * 100, 1) terrain_stats['percent_moderate'] = round((moderate_pixels / total_pixels) * 100, 1) terrain_stats['percent_steep'] = round((steep_pixels / total_pixels) * 100, 1) if erosion_risk is not None: terrain_stats['erosion_risk_mean'] = float(np.nanmean(erosion_risk.values)) high_risk = np.sum(erosion_risk.values > 0.6) terrain_stats['percent_high_erosion_risk'] = round((high_risk / total_pixels) * 100, 1) return { 'dem': dem, 'slope': slope, 'aspect': aspect, 'ruggedness': ruggedness, 'erosion_risk': erosion_risk, 'stats': terrain_stats } except Exception as e: return { 'error': str(e), 'stats': {} } def analyze_land_cover( bbox: Tuple[float, float, float, float], year_before: int, year_after: int ) -> Dict[str, Any]: """ Analyze land cover and its changes between two years. Args: bbox: Bounding box year_before: Earlier year (2017-2023) year_after: Later year (2017-2023) Returns: Dict containing land cover data and statistics """ try: # Get land cover for both years lulc_before = get_land_cover(bbox, year_before) lulc_after = get_land_cover(bbox, year_after) # Calculate change statistics change_stats = calculate_land_cover_change(lulc_before, lulc_after) # Calculate vegetation and bare ground percentages veg_cover_before = calculate_vegetation_cover_percent(lulc_before) veg_cover_after = calculate_vegetation_cover_percent(lulc_after) bare_before = calculate_bare_ground_percent(lulc_before) bare_after = calculate_bare_ground_percent(lulc_after) land_cover_stats = { 'vegetation_cover_before': round(veg_cover_before, 1), 'vegetation_cover_after': round(veg_cover_after, 1), 'vegetation_cover_change': round(veg_cover_after - veg_cover_before, 1), 'bare_ground_before': round(bare_before, 1), 'bare_ground_after': round(bare_after, 1), 'bare_ground_change': round(bare_after - bare_before, 1), 'year_before': year_before, 'year_after': year_after, 'class_changes': change_stats['changes'] } return { 'lulc_before': lulc_before, 'lulc_after': lulc_after, 'stats': land_cover_stats, 'classes': LULC_CLASSES } except Exception as e: return { 'error': str(e), 'stats': {} } def calculate_statistics( indices_before: Dict[str, xr.DataArray], indices_after: Dict[str, xr.DataArray], index_changes: Dict[str, xr.DataArray], bbox: Tuple[float, float, float, float] ) -> Dict[str, float]: """ Calculate comprehensive vegetation and soil statistics. Args: indices_before: Dict of index arrays at start date indices_after: Dict of index arrays at end date index_changes: Dict of index change arrays bbox: Bounding box for area calculation Returns: Dict with comprehensive statistics """ stats = {} # Get NDVI arrays ndvi_before = indices_before['ndvi'] ndvi_after = indices_after['ndvi'] ndvi_change = index_changes['ndvi'] # Get valid (non-NaN) data valid_before = ndvi_before.values[~np.isnan(ndvi_before.values)] valid_after = ndvi_after.values[~np.isnan(ndvi_after.values)] valid_change = ndvi_change.values[~np.isnan(ndvi_change.values)] # NDVI statistics stats['ndvi_before_mean'] = round(float(np.nanmean(valid_before)), 4) if len(valid_before) > 0 else 0 stats['ndvi_after_mean'] = round(float(np.nanmean(valid_after)), 4) if len(valid_after) > 0 else 0 stats['ndvi_change_mean'] = round(float(np.nanmean(valid_change)), 4) if len(valid_change) > 0 else 0 stats['ndvi_change_std'] = round(float(np.nanstd(valid_change)), 4) if len(valid_change) > 0 else 0 # Calculate percent change if stats['ndvi_before_mean'] > 0: stats['percent_change'] = round(((stats['ndvi_after_mean'] - stats['ndvi_before_mean']) / stats['ndvi_before_mean']) * 100, 2) else: stats['percent_change'] = 0 # All other indices - before/after means for idx_name in ['savi', 'evi', 'ndwi', 'ndmi', 'bsi', 'nbr']: if idx_name in indices_before and idx_name in indices_after: before_vals = indices_before[idx_name].values after_vals = indices_after[idx_name].values valid_b = before_vals[~np.isnan(before_vals)] valid_a = after_vals[~np.isnan(after_vals)] stats[f'{idx_name}_before_mean'] = round(float(np.nanmean(valid_b)), 4) if len(valid_b) > 0 else 0 stats[f'{idx_name}_after_mean'] = round(float(np.nanmean(valid_a)), 4) if len(valid_a) > 0 else 0 stats[f'{idx_name}_change'] = round(stats[f'{idx_name}_after_mean'] - stats[f'{idx_name}_before_mean'], 4) # Area calculations (improved, degraded, stable) pixel_area_ha = (10 * 10) / 10000 # 0.01 ha per pixel total_pixels = len(valid_change) improved_pixels = np.sum(valid_change > 0.05) degraded_pixels = np.sum(valid_change < -0.05) stable_pixels = np.sum((valid_change >= -0.05) & (valid_change <= 0.05)) stats['area_improved_ha'] = round(float(improved_pixels * pixel_area_ha), 2) stats['area_degraded_ha'] = round(float(degraded_pixels * pixel_area_ha), 2) stats['area_stable_ha'] = round(float(stable_pixels * pixel_area_ha), 2) stats['total_area_ha'] = round(float(total_pixels * pixel_area_ha), 2) # Calculate percentages if total_pixels > 0: stats['percent_improved'] = round((improved_pixels / total_pixels) * 100, 2) stats['percent_degraded'] = round((degraded_pixels / total_pixels) * 100, 2) stats['percent_stable'] = round((stable_pixels / total_pixels) * 100, 2) else: stats['percent_improved'] = 0 stats['percent_degraded'] = 0 stats['percent_stable'] = 0 # Water presence (from NDWI) if 'ndwi' in indices_after: ndwi_vals = indices_after['ndwi'].values valid_ndwi = ndwi_vals[~np.isnan(ndwi_vals)] water_pixels = np.sum(valid_ndwi > 0) stats['percent_water'] = round((water_pixels / len(valid_ndwi)) * 100, 2) if len(valid_ndwi) > 0 else 0 # Bare soil extent (from BSI) if 'bsi' in indices_after: bsi_vals = indices_after['bsi'].values valid_bsi = bsi_vals[~np.isnan(bsi_vals)] bare_pixels = np.sum(valid_bsi > 0.1) stats['percent_bare_soil'] = round((bare_pixels / len(valid_bsi)) * 100, 2) if len(valid_bsi) > 0 else 0 # Moisture stress (from NDMI) if 'ndmi' in indices_after: ndmi_vals = indices_after['ndmi'].values valid_ndmi = ndmi_vals[~np.isnan(ndmi_vals)] stressed_pixels = np.sum(valid_ndmi < 0) stats['percent_moisture_stressed'] = round((stressed_pixels / len(valid_ndmi)) * 100, 2) if len(valid_ndmi) > 0 else 0 # Vegetation health classification if len(valid_after) > 0: sparse = np.sum((valid_after > 0) & (valid_after <= 0.2)) low = np.sum((valid_after > 0.2) & (valid_after <= 0.4)) moderate = np.sum((valid_after > 0.4) & (valid_after <= 0.6)) dense = np.sum(valid_after > 0.6) stats['percent_sparse_veg'] = round((sparse / len(valid_after)) * 100, 2) stats['percent_low_veg'] = round((low / len(valid_after)) * 100, 2) stats['percent_moderate_veg'] = round((moderate / len(valid_after)) * 100, 2) stats['percent_dense_veg'] = round((dense / len(valid_after)) * 100, 2) return stats def calculate_reference_ndvi( bbox: Tuple[float, float, float, float], date: str, window_days: int = 15, cloud_threshold: int = 25, buffer_deg: float = 0.01 ) -> float: """ Calculate mean NDVI for reference area (buffer around site). """ dt = datetime.strptime(date, '%Y-%m-%d') start = (dt - timedelta(days=window_days)).strftime('%Y-%m-%d') end = (dt + timedelta(days=window_days)).strftime('%Y-%m-%d') ref_bbox = create_reference_bbox(bbox, buffer_deg) try: composite = get_sentinel_composite(ref_bbox, start, end, cloud_threshold) ndvi = calculate_ndvi(composite) valid_ndvi = ndvi.values[~np.isnan(ndvi.values)] return float(np.nanmean(valid_ndvi)) if len(valid_ndvi) > 0 else 0 except Exception: return 0 def calculate_rehab_score(site_ndvi: float, reference_ndvi: float) -> int: """ Calculate rehabilitation score (0-100). The score represents how close the site's vegetation is to the reference (undisturbed) area. """ if reference_ndvi <= 0: return 0 score = (site_ndvi / reference_ndvi) * 100 return min(100, max(0, round(score))) def calculate_comprehensive_rehab_score( stats: Dict[str, float], terrain_stats: Optional[Dict[str, float]] = None, land_cover_stats: Optional[Dict[str, float]] = None, reference_ndvi: float = 0.5 ) -> Dict[str, Any]: """ Calculate comprehensive rehabilitation score using multiple metrics. Returns: Dict with component scores and overall score """ scores = {} # Vegetation score (based on NDVI) site_ndvi = stats.get('ndvi_after_mean', 0) scores['vegetation_score'] = min(100, max(0, round((site_ndvi / reference_ndvi) * 100))) # Improvement score (based on change) improvement = stats.get('percent_improved', 0) degradation = stats.get('percent_degraded', 0) scores['improvement_score'] = min(100, max(0, round(50 + improvement - degradation))) # Soil stability score (based on BSI - lower is better) bare_soil = stats.get('percent_bare_soil', 50) scores['soil_stability_score'] = min(100, max(0, round(100 - bare_soil))) # Moisture score (based on NDMI) moisture_stressed = stats.get('percent_moisture_stressed', 50) scores['moisture_score'] = min(100, max(0, round(100 - moisture_stressed))) # Terrain score (if available) if terrain_stats: erosion_risk = terrain_stats.get('percent_high_erosion_risk', 50) scores['terrain_score'] = min(100, max(0, round(100 - erosion_risk))) # Land cover score (if available) if land_cover_stats: veg_cover = land_cover_stats.get('vegetation_cover_after', 0) scores['land_cover_score'] = min(100, max(0, round(veg_cover))) # Calculate weighted overall score weights = { 'vegetation_score': 0.30, 'improvement_score': 0.25, 'soil_stability_score': 0.20, 'moisture_score': 0.10, 'terrain_score': 0.10, 'land_cover_score': 0.05 } total_weight = 0 weighted_sum = 0 for key, weight in weights.items(): if key in scores: weighted_sum += scores[key] * weight total_weight += weight scores['overall_score'] = round(weighted_sum / total_weight) if total_weight > 0 else 0 return scores def generate_interpretation( stats: Dict[str, float], rehab_score: int, terrain_stats: Optional[Dict] = None, land_cover_stats: Optional[Dict] = None ) -> str: """ Generate comprehensive plain-language interpretation of the analysis results. """ interpretation_parts = [] # Vegetation change interpretation change = stats.get('percent_change', 0) if change > 10: change_text = f"Vegetation cover has significantly improved by {change:.1f}%" elif change > 0: change_text = f"Vegetation cover has moderately improved by {change:.1f}%" elif change > -10: change_text = f"Vegetation cover has slightly declined by {abs(change):.1f}%" else: change_text = f"Vegetation cover has significantly declined by {abs(change):.1f}%" interpretation_parts.append(change_text + " over the analysis period.") # Area breakdown if stats.get('percent_improved', 0) > stats.get('percent_degraded', 0): area_text = (f"Approximately {stats['percent_improved']:.0f}% of the site " f"({stats['area_improved_ha']:.1f} ha) shows vegetation improvement, " f"while {stats['percent_degraded']:.0f}% ({stats['area_degraded_ha']:.1f} ha) " "shows decline.") else: area_text = (f"Approximately {stats['percent_degraded']:.0f}% of the site " f"({stats['area_degraded_ha']:.1f} ha) shows vegetation decline, " f"while {stats['percent_improved']:.0f}% ({stats['area_improved_ha']:.1f} ha) " "shows improvement.") interpretation_parts.append(area_text) # Soil and moisture conditions bare_soil = stats.get('percent_bare_soil', 0) moisture_stress = stats.get('percent_moisture_stressed', 0) if bare_soil > 30: interpretation_parts.append(f"Bare soil covers {bare_soil:.0f}% of the area, indicating potential erosion risk.") elif bare_soil > 10: interpretation_parts.append(f"Moderate bare soil exposure ({bare_soil:.0f}%) is present.") if moisture_stress > 50: interpretation_parts.append(f"Significant moisture stress detected in {moisture_stress:.0f}% of vegetation.") # Water presence water = stats.get('percent_water', 0) if water > 5: interpretation_parts.append(f"Water bodies or saturated areas cover {water:.0f}% of the site.") # Terrain interpretation if terrain_stats: steep = terrain_stats.get('percent_steep', 0) erosion = terrain_stats.get('percent_high_erosion_risk', 0) if steep > 20: interpretation_parts.append(f"The terrain includes {steep:.0f}% steep slopes (>30 degrees).") if erosion > 30: interpretation_parts.append(f"High erosion risk identified in {erosion:.0f}% of the area.") # Land cover interpretation if land_cover_stats: veg_change = land_cover_stats.get('vegetation_cover_change', 0) bare_change = land_cover_stats.get('bare_ground_change', 0) if veg_change > 5: interpretation_parts.append(f"Land cover analysis shows {veg_change:.0f}% increase in vegetated area.") elif veg_change < -5: interpretation_parts.append(f"Land cover analysis shows {abs(veg_change):.0f}% decrease in vegetated area.") if bare_change < -5: interpretation_parts.append(f"Bare ground has decreased by {abs(bare_change):.0f}%.") # Rehabilitation score interpretation if rehab_score >= 80: rehab_text = (f"The site has achieved {rehab_score}% of reference vegetation conditions, " "indicating excellent rehabilitation progress.") elif rehab_score >= 60: rehab_text = (f"The site has achieved {rehab_score}% of reference vegetation conditions, " "indicating good rehabilitation progress.") elif rehab_score >= 40: rehab_text = (f"The site has achieved {rehab_score}% of reference vegetation conditions, " "indicating moderate rehabilitation progress.") elif rehab_score >= 20: rehab_text = (f"The site has achieved {rehab_score}% of reference vegetation conditions, " "indicating early-stage rehabilitation.") else: rehab_text = (f"The site has achieved {rehab_score}% of reference vegetation conditions, " "indicating limited rehabilitation progress to date.") interpretation_parts.append(rehab_text) return " ".join(interpretation_parts) def get_monthly_ndvi_timeseries( bbox: Tuple[float, float, float, float], start_year: int, end_year: int, cloud_threshold: int = 30 ) -> List[Dict[str, Any]]: """ Get monthly NDVI time series for a bounding box. """ results = [] for year in range(start_year, end_year + 1): for month in range(1, 13): now = datetime.now() if year > now.year or (year == now.year and month > now.month): continue start_date = f"{year}-{month:02d}-01" if month == 12: end_date = f"{year}-12-31" else: next_month = datetime(year, month + 1, 1) end_of_month = next_month - timedelta(days=1) end_date = end_of_month.strftime('%Y-%m-%d') try: items = search_sentinel2(bbox, start_date, end_date, cloud_threshold) if len(items) > 0: composite = get_sentinel_composite( bbox, start_date, end_date, cloud_threshold ) ndvi = calculate_ndvi(composite) valid_ndvi = ndvi.values[~np.isnan(ndvi.values)] if len(valid_ndvi) > 0: mean_ndvi = float(np.nanmean(valid_ndvi)) results.append({ 'date': f"{year}-{month:02d}-15", 'ndvi': mean_ndvi }) except Exception: continue return sorted(results, key=lambda x: x['date']) def get_multi_index_timeseries( bbox: Tuple[float, float, float, float], start_year: int, end_year: int, cloud_threshold: int = 30 ) -> List[Dict[str, Any]]: """ Get monthly time series for multiple indices. """ results = [] for year in range(start_year, end_year + 1): for month in range(1, 13): now = datetime.now() if year > now.year or (year == now.year and month > now.month): continue start_date = f"{year}-{month:02d}-01" if month == 12: end_date = f"{year}-12-31" else: next_month = datetime(year, month + 1, 1) end_of_month = next_month - timedelta(days=1) end_date = end_of_month.strftime('%Y-%m-%d') try: items = search_sentinel2(bbox, start_date, end_date, cloud_threshold) if len(items) > 0: composite = get_sentinel_composite( bbox, start_date, end_date, cloud_threshold ) indices = calculate_all_indices(composite) record = {'date': f"{year}-{month:02d}-15"} for idx_name, idx_data in indices.items(): valid_vals = idx_data.values[~np.isnan(idx_data.values)] if len(valid_vals) > 0: record[idx_name] = float(np.nanmean(valid_vals)) if len(record) > 1: results.append(record) except Exception: continue return sorted(results, key=lambda x: x['date']) def calculate_seasonal_stability(timeseries: List[Dict[str, Any]]) -> Dict[str, float]: """ Calculate seasonal stability metrics from time series data. Lower variance indicates more stable ecosystem function. """ if len(timeseries) < 4: return {} ndvi_values = [r.get('ndvi', 0) for r in timeseries if 'ndvi' in r] if len(ndvi_values) < 4: return {} return { 'ndvi_mean': round(float(np.mean(ndvi_values)), 4), 'ndvi_std': round(float(np.std(ndvi_values)), 4), 'ndvi_cv': round(float(np.std(ndvi_values) / np.mean(ndvi_values)) * 100, 2), 'ndvi_min': round(float(np.min(ndvi_values)), 4), 'ndvi_max': round(float(np.max(ndvi_values)), 4), 'ndvi_range': round(float(np.max(ndvi_values) - np.min(ndvi_values)), 4) } def ndvi_to_image_array(ndvi: xr.DataArray) -> np.ndarray: """Convert NDVI xarray to a colored numpy array for visualization.""" import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap colors = ['#8B4513', '#D2B48C', '#FFFF00', '#90EE90', '#228B22', '#006400'] cmap = LinearSegmentedColormap.from_list('ndvi', colors) ndvi_normalized = (ndvi.values - (-0.1)) / (0.8 - (-0.1)) ndvi_normalized = np.clip(ndvi_normalized, 0, 1) rgba = cmap(ndvi_normalized) rgb = (rgba[:, :, :3] * 255).astype(np.uint8) return rgb def change_to_image_array(change: xr.DataArray) -> np.ndarray: """Convert NDVI change xarray to a colored numpy array for visualization.""" import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap colors = ['#B71C1C', '#EF9A9A', '#FFFFFF', '#A5D6A7', '#1B5E20'] cmap = LinearSegmentedColormap.from_list('change', colors) change_normalized = (change.values - (-0.3)) / (0.3 - (-0.3)) change_normalized = np.clip(change_normalized, 0, 1) rgba = cmap(change_normalized) rgb = (rgba[:, :, :3] * 255).astype(np.uint8) return rgb def index_to_image_array( data: xr.DataArray, colormap: str = 'viridis', vmin: float = -1, vmax: float = 1 ) -> np.ndarray: """Convert any index xarray to a colored numpy array.""" import matplotlib.pyplot as plt cmap = plt.get_cmap(colormap) data_normalized = (data.values - vmin) / (vmax - vmin) data_normalized = np.clip(data_normalized, 0, 1) rgba = cmap(data_normalized) rgb = (rgba[:, :, :3] * 255).astype(np.uint8) return rgb