Spaces:
Running
Running
| """ | |
| Feature engineering module for biomass prediction. | |
| This module extracts the 99 features needed by the StableResNet model. | |
| Author: najahpokkiri | |
| Date: 2025-05-19 | |
| """ | |
| import numpy as np | |
| import logging | |
| from datetime import datetime | |
# Configure logging for this module. NOTE: basicConfig() runs at import time,
# so importing this module configures the root logger if nothing else has.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Optional dependencies: each import is attempted once and the outcome is
# recorded in a module-level flag, so the feature functions can fall back to
# placeholder values instead of failing when a package is missing.
try:
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logger.warning("scikit-learn not available. PCA features will be approximated.")
try:
    from skimage.filters import sobel
    from skimage.feature import local_binary_pattern, graycomatrix, graycoprops
    SKIMAGE_AVAILABLE = True
except ImportError:
    SKIMAGE_AVAILABLE = False
    logger.warning("scikit-image not available. Texture features will be approximated.")
def safe_divide(a, b, fill_value=0.0):
    """Divide ``a`` by ``b`` element-wise without ever returning NaN or Inf.

    Both operands are converted to float32 and broadcast against each other,
    so scalars and arrays of different (but compatible) shapes can be mixed.

    Parameters:
        a: Numerator (scalar or array-like). NaN/Inf entries are treated as 0.
        b: Denominator (scalar or array-like). +Inf/-Inf entries are replaced
            by +1e10/-1e10; NaN entries are treated as an unusable denominator.
        fill_value (float): Value written wherever the denominator is NaN or
            has magnitude below 1e-10, and wherever the quotient is not finite.

    Returns:
        float32 result with the broadcast shape of ``a`` and ``b``.
    """
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)

    # Sanitise the inputs. A NaN denominator becomes 0 so that it is caught by
    # the near-zero test below and yields fill_value rather than a * 1e10.
    a = np.asarray(np.nan_to_num(a, nan=0.0, posinf=0.0, neginf=0.0))
    b = np.asarray(np.nan_to_num(b, nan=0.0, posinf=1e10, neginf=-1e10))

    usable = np.abs(b) >= 1e-10

    # Start from fill_value everywhere and only overwrite positions with a
    # usable denominator; np.divide handles the broadcasting for us.
    result = np.full(np.broadcast(a, b).shape, fill_value, dtype=np.float32)
    with np.errstate(divide="ignore", over="ignore", invalid="ignore"):
        np.divide(a, b, out=result, where=usable)

    # A float32 overflow in the quotient is also mapped to fill_value.
    return np.nan_to_num(result, nan=fill_value, posinf=fill_value, neginf=fill_value)
def calculate_spectral_indices(satellite_data):
    """Calculate spectral indices from satellite bands.

    Band positions are hard-coded: index 1 is used as blue, 2 as green,
    3 as red, 7 as NIR, 9 as SWIR1 and 10 as SWIR2.
    NOTE(review): this assumes a Sentinel-2-like band order in the input
    stack; confirm against the data actually fed to the model.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).

    Returns:
        dict: Maps 'NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI' and 'NBR'
        to 2-D arrays. An index whose bands are missing (or whose calculation
        failed) is returned as zeros, provided the image is non-empty.
    """
    indices = {}
    n_bands = satellite_data.shape[0]

    def get_float_band(idx):
        """Return band ``idx`` as floating point, or None if it is absent."""
        if idx >= n_bands:
            return None
        band = np.asarray(satellite_data[idx])
        # Integer imagery (e.g. uint16) must be promoted before subtraction,
        # otherwise `nir - red` wraps around whenever red > nir.
        if not np.issubdtype(band.dtype, np.floating):
            band = band.astype(np.float32)
        return band

    try:
        blue = get_float_band(1)
        green = get_float_band(2)
        red = get_float_band(3)
        nir = get_float_band(7)
        swir1 = get_float_band(9)
        swir2 = get_float_band(10)

        if red is not None and nir is not None:
            # NDVI (Normalized Difference Vegetation Index)
            indices['NDVI'] = safe_divide(nir - red, nir + red)
            if blue is not None:
                # EVI (Enhanced Vegetation Index)
                indices['EVI'] = 2.5 * safe_divide(nir - red, nir + 6 * red - 7.5 * blue + 1)
            # SAVI (Soil Adjusted Vegetation Index)
            indices['SAVI'] = 1.5 * safe_divide(nir - red, nir + red + 0.5)
            # MSAVI2 (Modified Soil Adjusted Vegetation Index). A negative
            # radicand yields NaN, which the clean-up step below turns into 0.
            with np.errstate(invalid='ignore'):
                indices['MSAVI2'] = 0.5 * (
                    2 * nir + 1 - np.sqrt((2 * nir + 1) ** 2 - 8 * (nir - red))
                )
        if green is not None and nir is not None:
            # NDWI (Normalized Difference Water Index)
            indices['NDWI'] = safe_divide(green - nir, green + nir)
        if swir1 is not None and nir is not None:
            # NDMI (Normalized Difference Moisture Index)
            indices['NDMI'] = safe_divide(nir - swir1, nir + swir1)
        if swir2 is not None and nir is not None:
            # NBR (Normalized Burn Ratio)
            indices['NBR'] = safe_divide(nir - swir2, nir + swir2)
    except Exception as e:
        # Best effort: anything not computed falls back to zeros below.
        logger.warning(f"Error calculating spectral indices: {e}")

    # Clean up None values and NaNs
    indices = {k: np.nan_to_num(v, nan=0.0) for k, v in indices.items() if v is not None}

    # Ensure every required index is present by providing zero defaults
    required_indices = ['NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR']
    height, width = satellite_data.shape[1], satellite_data.shape[2]
    if height > 0 and width > 0:
        for name in required_indices:
            if name not in indices:
                indices[name] = np.zeros((height, width), dtype=np.float32)
    return indices
def extract_texture_features(satellite_data):
    """Extract texture features from one band of the satellite stack.

    The band at index ``min(7, n_bands - 1)`` is used. Six layers are
    produced: a Sobel edge map, a uniform local binary pattern, and four GLCM
    statistics (contrast, dissimilarity, homogeneity, energy). The GLCM
    statistics are computed once on a central patch of at most 128x128 pixels
    and broadcast as constant layers.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).

    Returns:
        dict: Maps each of the six feature names to a (height, width) array.
        All layers are zeros when scikit-image is unavailable or when the
        extraction fails.
    """
    texture_names = ['Sobel_B7', 'LBP_B7', 'GLCM_contrast_B7', 'GLCM_dissimilarity_B7',
                     'GLCM_homogeneity_B7', 'GLCM_energy_B7']
    glcm_props = ['contrast', 'dissimilarity', 'homogeneity', 'energy']
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    def zero_layer():
        """Return a fresh all-zero placeholder layer."""
        return np.zeros((height, width), dtype=np.float32)

    # If scikit-image is not available, return placeholders
    if not SKIMAGE_AVAILABLE:
        return {name: zero_layer() for name in texture_names}

    texture_features = {}
    try:
        b7_idx = min(7, satellite_data.shape[0] - 1)
        band = np.nan_to_num(satellite_data[b7_idx].copy(), nan=0.0)

        # 1. Sobel filter for edge detection
        texture_features['Sobel_B7'] = sobel(band)

        # 2. Local Binary Pattern on the band rescaled to 0-255 between its
        #    1st and 99th percentiles. A flat or empty band stays all zeros,
        #    so band_norm is always uint8 (graycomatrix rejects float images).
        band_norm = np.zeros_like(band, dtype=np.uint8)
        if band.size > 0:
            band_min, band_max = np.nanpercentile(band, [1, 99])
            if band_max > band_min:
                band_norm = np.clip(
                    (band - band_min) / (band_max - band_min + 1e-8) * 255, 0, 255
                ).astype(np.uint8)
        texture_features['LBP_B7'] = local_binary_pattern(band_norm, 8, 1, method='uniform')

        # 3. GLCM properties from a patch centred on the image
        sample_size = min(128, height, width)
        center_y, center_x = height // 2, width // 2
        offset = sample_size // 2
        y_start = max(0, center_y - offset)
        y_end = min(height, center_y + offset)
        x_start = max(0, center_x - offset)
        x_end = min(width, center_x + offset)
        patch = band_norm[y_start:y_end, x_start:x_end]

        if patch.size > 0:
            glcm = graycomatrix(patch, [1], [0], levels=256, symmetric=True, normed=True)
            for prop in glcm_props:
                key = f'GLCM_{prop}_B7'
                try:
                    value = float(graycoprops(glcm, prop)[0, 0])
                    texture_features[key] = np.full((height, width), value, dtype=np.float32)
                except Exception as e:
                    # One failing property should not discard the others.
                    logger.warning(f"Error calculating GLCM {prop}: {e}")
                    texture_features[key] = zero_layer()
        else:
            # Patch is empty: fall back to placeholder GLCM layers
            for prop in glcm_props:
                texture_features[f'GLCM_{prop}_B7'] = zero_layer()
    except Exception as e:
        logger.error(f"Error in texture feature extraction: {e}")
        # Provide placeholder features in case of error
        texture_features = {name: zero_layer() for name in texture_names}
    return texture_features
def calculate_spatial_features(satellite_data, indices):
    """Compute gradient-magnitude layers used as spatial context features.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).
        indices (dict): Spectral indices; the 'NDVI' entry is used when
            present, otherwise an all-zero layer stands in for it.

    Returns:
        dict: 'Gradient_B7' (gradient magnitude of the band at index
        ``min(7, n_bands - 1)``) and 'NDVI_gradient'. A layer whose
        computation fails is replaced by zeros.
    """
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    def gradient_magnitude(layer):
        # np.gradient returns the derivative along rows first, then columns.
        d_rows, d_cols = np.gradient(layer)
        return np.sqrt(d_cols**2 + d_rows**2)

    # Source band for the first feature, with NaNs neutralised.
    band_index = min(7, satellite_data.shape[0] - 1)
    nir_layer = np.nan_to_num(satellite_data[band_index].copy(), nan=0.0)

    spatial_features = {}

    # 1. Gradient of the selected band
    try:
        spatial_features['Gradient_B7'] = gradient_magnitude(nir_layer)
    except Exception as e:
        logger.warning(f"Error calculating band gradient: {e}")
        spatial_features['Gradient_B7'] = np.zeros((height, width), dtype=np.float32)

    # 2. Gradient of NDVI
    try:
        ndvi_layer = indices.get('NDVI', np.zeros((height, width), dtype=np.float32))
        ndvi_layer = np.nan_to_num(ndvi_layer, nan=0.0)
        spatial_features['NDVI_gradient'] = gradient_magnitude(ndvi_layer)
    except Exception as e:
        logger.warning(f"Error calculating NDVI gradient: {e}")
        spatial_features['NDVI_gradient'] = np.zeros((height, width), dtype=np.float32)

    return spatial_features
def calculate_pca_features(satellite_data, n_components=25):
    """Calculate PCA component layers from the satellite bands.

    Pixels are standardised per band and projected with a PCA that is fitted
    on this image's own finite pixels. When fewer than ``n_components``
    components can be computed, the remaining layers are zero-padded.
    NOTE(review): the projection is fitted per call, so component values
    depend on the image being processed; confirm this matches how the
    model's training features were produced.

    Parameters:
        satellite_data (ndarray): Array indexed as (bands, height, width).
        n_components (int): Number of 'PCA_xx' layers to return.

    Returns:
        dict: Maps 'PCA_01' ... 'PCA_<n_components>' to (height, width)
        arrays. Pixels with a non-finite value in any band get 0 in every
        component. If scikit-learn is unavailable the first layers are the
        raw bands and the rest zeros; on failure all layers are zeros.
    """
    height, width = satellite_data.shape[1], satellite_data.shape[2]
    n_bands = satellite_data.shape[0]

    def zero_components():
        """Return an all-zero placeholder for every requested component."""
        return {
            f'PCA_{i:02d}': np.zeros((height, width), dtype=np.float32)
            for i in range(1, n_components + 1)
        }

    # If scikit-learn is not available, approximate with raw bands / zeros
    if not SKLEARN_AVAILABLE:
        pca_features = {}
        for i in range(1, n_components + 1):
            if i <= n_bands:
                # Use band values directly for the first components
                pca_features[f'PCA_{i:02d}'] = satellite_data[i - 1]
            else:
                pca_features[f'PCA_{i:02d}'] = np.zeros((height, width), dtype=np.float32)
        return pca_features

    try:
        # Reshape for PCA (pixels x bands)
        bands_reshaped = satellite_data.reshape(n_bands, -1).T

        # Keep only pixels that are finite in every band. Testing for NaN
        # alone would let +/-Inf through to StandardScaler, which rejects it
        # and would turn every PCA feature into zeros.
        valid_mask = np.all(np.isfinite(bands_reshaped), axis=1)
        bands_clean = bands_reshaped[valid_mask]
        if len(bands_clean) == 0:
            logger.warning("No valid data for PCA calculation")
            return zero_components()

        # Standardize valid data
        bands_scaled = StandardScaler().fit_transform(bands_clean)

        # PCA cannot return more components than min(n_samples, n_features)
        pca = PCA(n_components=min(n_components, bands_scaled.shape[1], bands_scaled.shape[0]))
        pca_result = pca.fit_transform(bands_scaled)

        # Zero-pad up to the requested number of components if needed
        actual_components = pca_result.shape[1]
        if actual_components < n_components:
            logger.warning(f"Only {actual_components} PCA components calculated, padding to {n_components}")
            padding = np.zeros((pca_result.shape[0], n_components - actual_components))
            pca_result = np.hstack([pca_result, padding])

        # Map back to the original pixel grid; invalid pixels stay at zero
        pca_all = np.zeros((bands_reshaped.shape[0], n_components))
        pca_all[valid_mask] = pca_result
        pca_spatial = pca_all.reshape(height, width, n_components)

        pca_features = {
            f'PCA_{i:02d}': pca_spatial[:, :, i - 1]
            for i in range(1, n_components + 1)
        }

        # Log PCA explained variance
        if hasattr(pca, 'explained_variance_ratio_'):
            logger.info(f"PCA explained variance: {pca.explained_variance_ratio_.sum():.3f}")
    except Exception as e:
        logger.error(f"Error calculating PCA features: {e}")
        # Create placeholder PCA features
        pca_features = zero_components()
    return pca_features
def extract_all_features(satellite_data):
    """
    Assemble the 99-column feature matrix expected by the model.

    Column order:
    - 59 original bands (zero-padded when the input has fewer bands)
    - 7 spectral indices
    - 6 texture features
    - 2 spatial features
    - 25 PCA components

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width)
    Returns:
        features_array (ndarray): float32 array of shape (valid_pixels, 99)
        valid_mask (ndarray): Boolean (height, width) mask of pixels that are
            finite in every band
        feature_names (list): List of the 99 feature names, in column order
    """
    start_time = datetime.now()
    logger.info("Extracting features for biomass prediction...")
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # A pixel is valid only if no band holds NaN or Inf there.
    valid_mask = np.all(np.isfinite(satellite_data), axis=0)
    valid_y, valid_x = np.where(valid_mask)
    n_valid = len(valid_y)
    logger.info(f"Found {n_valid} valid pixels out of {height*width}")

    # Derived feature groups, each a dict of name -> 2-D layer.
    logger.info("Calculating spectral indices...")
    indices = calculate_spectral_indices(satellite_data)
    logger.info("Extracting texture features...")
    texture_features = extract_texture_features(satellite_data)
    logger.info("Calculating spatial features...")
    spatial_features = calculate_spatial_features(satellite_data, indices)
    logger.info("Computing PCA components...")
    pca_features = calculate_pca_features(satellite_data)

    # Ordered column names: bands, indices, texture, spatial, PCA.
    band_names = [f'Band_{k:02d}' for k in range(1, 60)]
    feature_names = (
        band_names
        + ['NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR']
        + ['Sobel_B7', 'LBP_B7', 'GLCM_contrast_B7', 'GLCM_dissimilarity_B7',
           'GLCM_homogeneity_B7', 'GLCM_energy_B7']
        + ['Gradient_B7', 'NDVI_gradient']
        + [f'PCA_{k:02d}' for k in range(1, 26)]
    )

    # Collect every layer by name; bands the input lacks become zero layers.
    n_input_bands = satellite_data.shape[0]
    all_features = {}
    for position, band_name in enumerate(band_names):
        if position < n_input_bands:
            all_features[band_name] = satellite_data[position]
        else:
            all_features[band_name] = np.zeros((height, width), dtype=np.float32)
    for group in (indices, texture_features, spatial_features, pca_features):
        all_features.update(group)

    # Verify we have exactly 99 features
    assert len(feature_names) == 99, f"Expected 99 features, but got {len(feature_names)}"

    # Sample every layer at the valid pixels, one column per feature.
    feature_matrix = np.zeros((n_valid, len(feature_names)), dtype=np.float32)
    for column, name in enumerate(feature_names):
        if name not in all_features:
            logger.warning(f"Feature '{name}' not found, using zeros")
            feature_matrix[:, column] = 0.0
            continue
        feature_data = all_features[name]
        if feature_data.ndim == 2:
            feature_values = feature_data[valid_y, valid_x]
        else:
            # Non-2-D value: repeat it for every valid pixel.
            feature_values = np.full(n_valid, feature_data)
        feature_matrix[:, column] = np.nan_to_num(feature_values, nan=0.0)

    processing_time = (datetime.now() - start_time).total_seconds()
    logger.info(f"Successfully extracted {len(feature_names)} features for {n_valid} pixels in {processing_time:.2f} seconds")
    return feature_matrix, valid_mask, feature_names
| # Simple test function | |
def test_feature_extraction():
    """Smoke-test the extraction pipeline on random data.

    Builds a random 5-band, 100x100 float32 stack, runs
    extract_all_features on it and prints a short summary.

    Returns:
        bool: True if the pipeline ran without raising, False otherwise
        (in which case the error and its traceback are printed).
    """
    try:
        # Random sample input: 5 bands of 100x100 pixels
        satellite_data = np.random.random((5, 100, 100)).astype(np.float32)
        feature_matrix, valid_mask, feature_names = extract_all_features(satellite_data)

        # Summary of what came back
        print(f"Sample data shape: {satellite_data.shape}")
        print(f"Feature matrix shape: {feature_matrix.shape}")
        print(f"Number of feature names: {len(feature_names)}")
        print(f"Valid pixels: {np.sum(valid_mask)}")
    except Exception as e:
        import traceback
        print(f"Feature extraction test failed: {e}")
        traceback.print_exc()
        return False
    else:
        return True
if __name__ == "__main__":
    # Executed as a script (not imported): run the smoke test once.
    test_feature_extraction()