File size: 2,655 Bytes
b4123b8
dd1d7f5
b4123b8
 
 
dd1d7f5
b4123b8
 
 
 
 
 
dd1d7f5
b4123b8
c170961
dd1d7f5
 
 
b4123b8
 
 
dd1d7f5
c170961
b4123b8
 
 
 
dd1d7f5
c170961
b4123b8
 
 
dd1d7f5
 
b4123b8
 
 
 
dd1d7f5
b4123b8
 
 
 
 
 
 
 
dd1d7f5
b4123b8
 
dd1d7f5
 
b4123b8
 
 
 
 
 
 
 
dd1d7f5
b4123b8
 
 
 
 
 
 
 
 
dd1d7f5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Minimal vegetation index extraction (NDVI, GNDVI, SAVI only).
"""

import numpy as np
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)


class VegetationIndexExtractor:
    """Compute NDVI, GNDVI and SAVI rasters plus masked summary statistics.

    Instance attributes (all set in ``__init__``):
        epsilon: Constant added to the NDVI/GNDVI denominators.
        soil_factor: The ``L`` term used by the SAVI formula.
        index_formulas: Maps an index name to a callable that takes the
            bands listed for that name in ``index_bands``.
        index_bands: Maps an index name to the ordered ``spectral_stack``
            keys passed to its formula.
    """

    def __init__(self, epsilon: float = 1e-10, soil_factor: float = 0.5):
        """Store the formula constants and build the index lookup tables.

        Args:
            epsilon: Added to the NDVI and GNDVI denominators so that
                ``nir + red`` (or ``nir + green``) equal to zero does not
                divide by zero.
            soil_factor: Soil adjustment term of SAVI; it appears both in
                the denominator and in the ``1 + soil_factor`` gain.
        """
        self.epsilon = epsilon
        self.soil_factor = soil_factor

        # The lambdas read self.epsilon / self.soil_factor at call time, so
        # changing those attributes after construction affects later calls.
        self.index_formulas = {
            "NDVI": lambda nir, red: (nir - red) / (nir + red + self.epsilon),
            "GNDVI": lambda nir, green: (nir - green) / (nir + green + self.epsilon),
            "SAVI": lambda nir, red: ((nir - red) / (nir + red + self.soil_factor)) * (1.0 + self.soil_factor),
        }

        self.index_bands = {
            "NDVI": ["nir", "red"],
            "GNDVI": ["nir", "green"],
            "SAVI": ["nir", "red"],
        }

    @staticmethod
    def _as_float_band(arr: Any) -> np.ndarray:
        """Return *arr* as float64, dropping a trailing singleton axis of an ndarray."""
        # Squeeze only when the last axis really has length 1: an
        # unconditional squeeze(-1) raises for ordinary (H, W) rasters.
        if isinstance(arr, np.ndarray) and arr.ndim > 0 and arr.shape[-1] == 1:
            arr = arr.squeeze(-1)
        return np.asarray(arr, dtype=np.float64)

    @staticmethod
    def _summarize(masked_values: np.ndarray) -> Dict[str, float]:
        """Return mean/std over the non-NaN entries, or zeros when there are none."""
        valid_values = masked_values[~np.isnan(masked_values)]
        if valid_values.size == 0:
            return {'mean': 0.0, 'std': 0.0}
        return {
            'mean': float(np.mean(valid_values)),
            'std': float(np.std(valid_values)),
        }

    def compute_vegetation_indices(self, spectral_stack: Dict[str, np.ndarray],
                                  mask: np.ndarray) -> Dict[str, Dict[str, Any]]:
        """Compute NDVI, GNDVI and SAVI for the bands present in *spectral_stack*.

        Args:
            spectral_stack: Maps band names (``"nir"``, ``"red"``,
                ``"green"``) to arrays. An ndarray whose last axis has
                length 1 has that axis removed before the formula runs;
                other arrays are used with their shape unchanged.
            mask: Region of interest. It is cast to int32 and compared with
                ``> 0``, so fractional values below 1 truncate to 0 and are
                treated as outside the region. A mask with one extra
                trailing axis of length 1 relative to the index raster has
                that axis removed so it lines up with the squeezed bands.

        Returns:
            A dict keyed by index name. Each value holds ``'values'`` (the
            index raster with NaN wherever the mask is not positive) and
            ``'statistics'`` (``'mean'`` and ``'std'`` over the non-NaN
            pixels, both ``0.0`` when no such pixel exists). An index is
            omitted when one of its bands is missing from *spectral_stack*
            or when its computation raises; the latter is logged as an error.
        """
        indices = {}

        for index_name, formula in self.index_formulas.items():
            try:
                required_bands = self.index_bands[index_name]
                if any(band not in spectral_stack for band in required_bands):
                    continue

                band_data = [
                    self._as_float_band(spectral_stack[band])
                    for band in required_bands
                ]
                index_values = np.asarray(formula(*band_data), dtype=np.float64)

                binary_mask = np.asarray(mask).astype(np.int32) > 0
                # A (H, W, 1) mask would otherwise broadcast incorrectly (or
                # fail) against the squeezed (H, W) index raster.
                if (index_values.ndim > 0
                        and binary_mask.ndim == index_values.ndim + 1
                        and binary_mask.shape[-1] == 1):
                    binary_mask = binary_mask[..., 0]

                masked_values = np.where(binary_mask, index_values, np.nan)

                indices[index_name] = {
                    'values': masked_values,
                    'statistics': self._summarize(masked_values),
                }

            except Exception as exc:
                # Best effort per index: one failing index must not prevent
                # the remaining ones from being computed.
                logger.error("Failed to compute %s: %s", index_name, exc)

        return indices