"""
Feature engineering module for biomass prediction.

This module extracts the 99 features needed by the StableResNet model.

Author: najahpokkiri
Date: 2025-05-19
"""
import numpy as np
import logging
from datetime import datetime

# Configure logger
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import optional dependencies but don't fail if not available
try:
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logger.warning("scikit-learn not available. PCA features will be approximated.")

try:
    from skimage.filters import sobel
    from skimage.feature import local_binary_pattern, graycomatrix, graycoprops
    SKIMAGE_AVAILABLE = True
except ImportError:
    SKIMAGE_AVAILABLE = False
    logger.warning("scikit-image not available. Texture features will be approximated.")

# Canonical feature names, shared by the extractors and by extract_all_features
# so the placeholder paths and the final feature ordering cannot drift apart.
_SPECTRAL_INDEX_NAMES = ('NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR')
_GLCM_PROPERTIES = ('contrast', 'dissimilarity', 'homogeneity', 'energy')
_TEXTURE_FEATURE_NAMES = ('Sobel_B7', 'LBP_B7') + tuple(
    f'GLCM_{prop}_B7' for prop in _GLCM_PROPERTIES
)
_SPATIAL_FEATURE_NAMES = ('Gradient_B7', 'NDVI_gradient')
_N_BAND_FEATURES = 59
_N_PCA_FEATURES = 25


def safe_divide(a, b, fill_value=0.0):
    """
    Element-wise division that never produces NaN/Inf.

    Parameters:
        a: Numerator (scalar or array-like).
        b: Denominator (scalar or array-like, broadcastable against ``a``).
        fill_value (float): Value used wherever |b| < 1e-10 or the quotient
            is not finite.

    Returns:
        float32 result with the broadcast shape of ``a`` and ``b``.
    """
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)

    # Handle NaN/Inf in inputs
    a = np.nan_to_num(a, nan=0.0, posinf=0.0, neginf=0.0)
    b = np.nan_to_num(b, nan=1e-10, posinf=1e10, neginf=-1e10)

    # Broadcast explicitly so mixed scalar/array operands work in both
    # positions (mask indexing on mismatched shapes used to fail).
    a, b = np.broadcast_arrays(a, b)

    valid = np.abs(b) >= 1e-10
    result = np.full(a.shape, fill_value, dtype=np.float32)
    # Positions where `valid` is False keep the pre-filled fill_value.
    np.divide(a, b, out=result, where=valid)

    return np.nan_to_num(result, nan=fill_value, posinf=fill_value, neginf=fill_value)


def calculate_spectral_indices(satellite_data):
    """
    Calculate spectral indices from satellite bands.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).

    Returns:
        dict: Maps each of NDVI, EVI, SAVI, MSAVI2, NDWI, NDMI and NBR to a
        (height, width) array. Indices whose source bands are missing (or
        whose computation failed) are zero-filled.
    """
    indices = {}
    n_bands = satellite_data.shape[0]
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    def safe_get_band(idx):
        """Return band `idx` as a float array, or None if it does not exist."""
        if idx >= n_bands:
            return None
        band = satellite_data[idx]
        # Integer imagery (e.g. uint16) would wrap around in `nir - red`;
        # promote to float before any arithmetic. Float input is untouched.
        if not np.issubdtype(band.dtype, np.floating):
            band = band.astype(np.float32)
        return band

    # Sentinel-2 bands (assuming standard band order)
    # B2(blue), B3(green), B4(red), B8(nir), B11(swir1), B12(swir2)
    # NOTE(review): the positions below depend on how the input stack was
    # built - adjust indices based on your data.
    try:
        blue = safe_get_band(1)
        green = safe_get_band(2)
        red = safe_get_band(3)
        nir = safe_get_band(7)
        swir1 = safe_get_band(9)
        swir2 = safe_get_band(10)

        if red is not None and nir is not None:
            # NDVI (Normalized Difference Vegetation Index)
            indices['NDVI'] = safe_divide(nir - red, nir + red)

            if blue is not None:
                # EVI (Enhanced Vegetation Index)
                indices['EVI'] = 2.5 * safe_divide(
                    nir - red, nir + 6 * red - 7.5 * blue + 1
                )

            # SAVI (Soil Adjusted Vegetation Index)
            indices['SAVI'] = 1.5 * safe_divide(nir - red, nir + red + 0.5)

            # MSAVI2 (Modified Soil Adjusted Vegetation Index)
            # A negative discriminant yields NaN, which is zeroed during the
            # clean-up below; silence the accompanying RuntimeWarning.
            with np.errstate(invalid='ignore'):
                indices['MSAVI2'] = 0.5 * (
                    2 * nir + 1 - np.sqrt((2 * nir + 1) ** 2 - 8 * (nir - red))
                )

        if green is not None and nir is not None:
            # NDWI (Normalized Difference Water Index)
            indices['NDWI'] = safe_divide(green - nir, green + nir)

        if swir1 is not None and nir is not None:
            # NDMI (Normalized Difference Moisture Index)
            indices['NDMI'] = safe_divide(nir - swir1, nir + swir1)

        if swir2 is not None and nir is not None:
            # NBR (Normalized Burn Ratio)
            indices['NBR'] = safe_divide(nir - swir2, nir + swir2)

    except Exception as e:
        logger.warning(f"Error calculating spectral indices: {e}")

    # Clean up None values and NaNs
    indices = {k: np.nan_to_num(v, nan=0.0) for k, v in indices.items() if v is not None}

    # Ensure we have all required indices by providing defaults
    for name in _SPECTRAL_INDEX_NAMES:
        if name not in indices:
            indices[name] = np.zeros((height, width), dtype=np.float32)

    return indices


def _zero_texture_features(height, width):
    """Return zero-filled placeholders for every texture feature."""
    return {
        name: np.zeros((height, width), dtype=np.float32)
        for name in _TEXTURE_FEATURE_NAMES
    }


def extract_texture_features(satellite_data):
    """
    Extract texture features from satellite data.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).

    Returns:
        dict: Sobel_B7, LBP_B7 and four GLCM_*_B7 maps, each (height, width).
        All maps are zero-filled when scikit-image is unavailable or when
        extraction fails.
    """
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # If scikit-image is not available, return placeholders
    if not SKIMAGE_AVAILABLE:
        return _zero_texture_features(height, width)

    texture_features = {}
    try:
        # Use NIR band (band 7) for texture features, falling back to the
        # last band when fewer than 8 bands are present.
        b7_idx = min(7, satellite_data.shape[0] - 1)
        band = np.nan_to_num(satellite_data[b7_idx].copy(), nan=0.0)

        # 1. Sobel filter for edge detection
        texture_features['Sobel_B7'] = sobel(band)

        # 2. Local Binary Pattern
        # Normalize band to 0-255 range for LBP; a flat (or empty) band maps
        # to all zeros. `band` holds no NaNs at this point.
        band_norm = np.zeros(band.shape, dtype=np.uint8)
        if band.size > 0:
            band_min, band_max = np.nanpercentile(band, [1, 99])
            if band_max > band_min:
                band_norm = np.clip(
                    (band - band_min) / (band_max - band_min + 1e-8) * 255, 0, 255
                ).astype(np.uint8)

        texture_features['LBP_B7'] = local_binary_pattern(band_norm, 8, 1, method='uniform')

        # 3. GLCM properties
        # Computed once on a central patch (at most 128x128) and broadcast
        # to the whole image as a constant map.
        sample_size = min(128, height, width)
        center_y, center_x = height // 2, width // 2
        offset = sample_size // 2
        y_start = max(0, center_y - offset)
        y_end = min(height, center_y + offset)
        x_start = max(0, center_x - offset)
        x_end = min(width, center_x + offset)
        patch = band_norm[y_start:y_end, x_start:x_end]

        if patch.size > 0:
            glcm = graycomatrix(patch, [1], [0], levels=256, symmetric=True, normed=True)
            for prop in _GLCM_PROPERTIES:
                key = f'GLCM_{prop}_B7'
                try:
                    value = float(graycoprops(glcm, prop)[0, 0])
                    texture_features[key] = np.full((height, width), value, dtype=np.float32)
                except Exception as e:
                    # Best effort per property: fall back to zeros but leave
                    # a trace instead of swallowing the error silently.
                    logger.warning(f"Error calculating GLCM {prop}: {e}")
                    texture_features[key] = np.zeros((height, width), dtype=np.float32)
        else:
            # Create placeholder GLCM features if patch is invalid
            for prop in _GLCM_PROPERTIES:
                texture_features[f'GLCM_{prop}_B7'] = np.zeros((height, width), dtype=np.float32)

    except Exception as e:
        logger.error(f"Error in texture feature extraction: {e}")
        # Provide placeholder features in case of error
        texture_features = _zero_texture_features(height, width)

    return texture_features


def calculate_spatial_features(satellite_data, indices):
    """
    Calculate spatial context features like gradients.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).
        indices (dict): Spectral indices; only 'NDVI' is read (zeros are
            used when it is absent).

    Returns:
        dict: 'Gradient_B7' and 'NDVI_gradient' gradient-magnitude maps,
        zero-filled if the gradient cannot be computed.
    """
    spatial_features = {}
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # 1. Gradient of Band 7 (NIR), or of the last band if fewer exist
    b7_idx = min(7, satellite_data.shape[0] - 1)
    band = np.nan_to_num(satellite_data[b7_idx].copy(), nan=0.0)

    try:
        grad_y, grad_x = np.gradient(band)
        spatial_features['Gradient_B7'] = np.sqrt(grad_x ** 2 + grad_y ** 2)
    except Exception as e:
        logger.warning(f"Error calculating band gradient: {e}")
        spatial_features['Gradient_B7'] = np.zeros((height, width), dtype=np.float32)

    # 2. NDVI gradient
    try:
        ndvi = indices.get('NDVI', np.zeros((height, width), dtype=np.float32))
        ndvi = np.nan_to_num(ndvi, nan=0.0)
        grad_y, grad_x = np.gradient(ndvi)
        spatial_features['NDVI_gradient'] = np.sqrt(grad_x ** 2 + grad_y ** 2)
    except Exception as e:
        logger.warning(f"Error calculating NDVI gradient: {e}")
        spatial_features['NDVI_gradient'] = np.zeros((height, width), dtype=np.float32)

    return spatial_features


def _zero_pca_features(height, width, n_components):
    """Return zero-filled placeholders named PCA_01 .. PCA_<n_components>."""
    return {
        f'PCA_{i:02d}': np.zeros((height, width), dtype=np.float32)
        for i in range(1, n_components + 1)
    }


def calculate_pca_features(satellite_data, n_components=25):
    """
    Calculate PCA features from satellite bands.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).
        n_components (int): Number of PCA features to return.

    Returns:
        dict: 'PCA_01' .. 'PCA_<n_components>' maps of shape (height, width).
        Without scikit-learn the first components are the raw bands and the
        rest are zeros. Components beyond what PCA can produce are zero
        padded, and pixels with non-finite input are set to zero.
    """
    pca_features = {}
    height, width = satellite_data.shape[1], satellite_data.shape[2]
    n_bands = satellite_data.shape[0]

    # If scikit-learn is not available, return placeholders
    if not SKLEARN_AVAILABLE:
        for i in range(1, n_components + 1):
            if i <= n_bands:
                # Use band values directly for first components
                pca_features[f'PCA_{i:02d}'] = satellite_data[i - 1]
            else:
                # Create synthetic features for remaining components
                pca_features[f'PCA_{i:02d}'] = np.zeros((height, width), dtype=np.float32)
        return pca_features

    try:
        # Reshape for PCA (pixels x bands)
        bands_reshaped = satellite_data.reshape(n_bands, -1).T

        # Drop pixels with NaN *or* Inf in any band: StandardScaler rejects
        # infinities, which previously zeroed every PCA feature. This also
        # matches the validity rule used by extract_all_features.
        valid_mask = np.all(np.isfinite(bands_reshaped), axis=1)
        bands_clean = bands_reshaped[valid_mask]

        if len(bands_clean) == 0:
            logger.warning("No valid data for PCA calculation")
            return _zero_pca_features(height, width, n_components)

        # Standardize valid data
        bands_scaled = StandardScaler().fit_transform(bands_clean)

        # PCA cannot yield more components than min(samples, features)
        pca = PCA(n_components=min(n_components, bands_scaled.shape[1], bands_scaled.shape[0]))
        pca_result = pca.fit_transform(bands_scaled)

        # Extend to the requested number of components if needed
        actual_components = pca_result.shape[1]
        if actual_components < n_components:
            logger.warning(
                f"Only {actual_components} PCA components calculated, padding to {n_components}"
            )
            padding = np.zeros((pca_result.shape[0], n_components - actual_components))
            pca_result = np.hstack([pca_result, padding])

        # Map back to original pixels (invalid pixels stay zero)
        pca_all = np.zeros((bands_reshaped.shape[0], n_components))
        pca_all[valid_mask] = pca_result

        # Reshape to spatial dimensions
        pca_spatial = pca_all.reshape(height, width, n_components)

        # Store each component with the correct naming
        for i in range(1, n_components + 1):
            pca_features[f'PCA_{i:02d}'] = pca_spatial[:, :, i - 1]

        # Log PCA explained variance
        if hasattr(pca, 'explained_variance_ratio_'):
            logger.info(f"PCA explained variance: {pca.explained_variance_ratio_.sum():.3f}")

    except Exception as e:
        logger.error(f"Error calculating PCA features: {e}")
        # Create placeholder PCA features
        pca_features = _zero_pca_features(height, width, n_components)

    return pca_features


def extract_all_features(satellite_data):
    """
    Extract exactly 99 features needed by the model:
    - 59 original bands
    - 7 spectral indices
    - 6 texture features
    - 2 spatial features
    - 25 PCA components

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width)

    Returns:
        features_array (ndarray): Array of shape (valid_pixels, 99)
        valid_mask (ndarray): Boolean mask of valid pixels
        feature_names (list): List of 99 feature names
    """
    start_time = datetime.now()
    logger.info("Extracting features for biomass prediction...")

    n_bands = satellite_data.shape[0]
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # Create valid pixel mask (no NaN or Inf values in any band)
    valid_mask = np.all(np.isfinite(satellite_data), axis=0)
    valid_y, valid_x = np.where(valid_mask)
    n_valid = len(valid_y)
    logger.info(f"Found {n_valid} valid pixels out of {height*width}")

    # Generate all feature categories
    logger.info("Calculating spectral indices...")
    indices = calculate_spectral_indices(satellite_data)

    logger.info("Extracting texture features...")
    texture_features = extract_texture_features(satellite_data)

    logger.info("Calculating spatial features...")
    spatial_features = calculate_spatial_features(satellite_data, indices)

    logger.info("Computing PCA components...")
    pca_features = calculate_pca_features(satellite_data)

    # Ordered feature names: bands, indices, texture, spatial, PCA.
    # The column order of the output matrix follows this list.
    feature_names = [f'Band_{i:02d}' for i in range(1, _N_BAND_FEATURES + 1)]
    feature_names.extend(_SPECTRAL_INDEX_NAMES)
    feature_names.extend(_TEXTURE_FEATURE_NAMES)
    feature_names.extend(_SPATIAL_FEATURE_NAMES)
    feature_names.extend(f'PCA_{i:02d}' for i in range(1, _N_PCA_FEATURES + 1))

    # Create feature dictionary with all features
    all_features = {}

    # 1. Original bands (extra bands beyond the 59th are ignored)
    for i in range(min(n_bands, _N_BAND_FEATURES)):
        all_features[f'Band_{i+1:02d}'] = satellite_data[i]

    # Pad with zeros if we have fewer than 59 bands
    for i in range(n_bands, _N_BAND_FEATURES):
        all_features[f'Band_{i+1:02d}'] = np.zeros((height, width), dtype=np.float32)

    # 2. Add other feature categories
    all_features.update(indices)
    all_features.update(texture_features)
    all_features.update(spatial_features)
    all_features.update(pca_features)

    # Internal invariant: the name list above is built from fixed constants.
    assert len(feature_names) == 99, f"Expected 99 features, but got {len(feature_names)}"

    # Extract feature values for valid pixels
    feature_matrix = np.zeros((n_valid, len(feature_names)), dtype=np.float32)

    for i, name in enumerate(feature_names):
        if name in all_features:
            feature_data = all_features[name]
            if feature_data.ndim == 2:
                feature_values = feature_data[valid_y, valid_x]
            else:
                # Non-2D features are broadcast to every valid pixel
                feature_values = np.full(n_valid, feature_data)
            feature_matrix[:, i] = np.nan_to_num(feature_values, nan=0.0)
        else:
            logger.warning(f"Feature '{name}' not found, using zeros")
            feature_matrix[:, i] = 0.0

    end_time = datetime.now()
    processing_time = (end_time - start_time).total_seconds()
    logger.info(f"Successfully extracted {len(feature_names)} features for {n_valid} pixels in {processing_time:.2f} seconds")

    return feature_matrix, valid_mask, feature_names


# Simple test function
def test_feature_extraction():
    """Test the feature extraction pipeline with sample data.

    Returns:
        bool: True if extraction ran without raising, False otherwise.
    """
    try:
        # Create sample data (5 bands, 100x100 pixels)
        satellite_data = np.random.random((5, 100, 100)).astype(np.float32)

        # Extract features
        feature_matrix, valid_mask, feature_names = extract_all_features(satellite_data)

        # Print summary
        print(f"Sample data shape: {satellite_data.shape}")
        print(f"Feature matrix shape: {feature_matrix.shape}")
        print(f"Number of feature names: {len(feature_names)}")
        print(f"Valid pixels: {np.sum(valid_mask)}")

        return True
    except Exception as e:
        print(f"Feature extraction test failed: {e}")
        import traceback
        traceback.print_exc()
        return False


if __name__ == "__main__":
    # Run a simple test if this script is executed directly
    test_feature_extraction()