"""Spectral Dispersion Bandwidth (SDB) Tracks spectral spreading of wave energy and nonlinear harmonic energy transfer. SDB = Δf₉₅ / f_peak High Threat: SDB < 1.0 (narrow-band coherent bore) Reduced Threat: SDB > 3.5 (broad dispersed packet) """ import numpy as np from scipy import signal, fft class SpectralDispersionBandwidth: """ Spectral Dispersion Bandwidth - Parameter 5 of 7 Thresholds: CRITICAL: SDB < 1.0 (narrow-band coherent bore - HIGH THREAT) ALERT: 1.0 ≤ SDB < 2.5 (moderate bandwidth) MONITOR: 2.5 ≤ SDB < 3.5 (broad bandwidth) SAFE: SDB ≥ 3.5 (broad dispersed packet - REDUCED THREAT) Second harmonic onset at h/H₀ > 0.35 → F₂ > 15% """ def __init__(self, fs=1.0/60): # Default sampling: 1 minute self.fs = fs # Sampling frequency [Hz] self.thresholds = { 'critical': 1.0, 'alert': 2.5, 'monitor': 3.5, 'safe': 3.5 } def compute_spectrum(self, signal_data, window='hann'): """Compute power spectral density Args: signal_data: 1D array of sea surface elevation [m] window: window function type Returns: freqs: frequency array [Hz] psd: power spectral density """ # Apply window function if window == 'hann': win = np.hanning(len(signal_data)) elif window == 'hamming': win = np.hamming(len(signal_data)) else: win = np.ones(len(signal_data)) windowed = signal_data * win # Compute FFT n = len(windowed) fft_vals = fft.fft(windowed) fft_vals = fft_vals[:n//2] # Compute PSD psd = np.abs(fft_vals)**2 / (self.fs * n) psd[1:-1] *= 2 # Double for one-sided spectrum # Frequency array freqs = fft.fftfreq(n, 1/self.fs)[:n//2] return freqs, psd def find_spectral_peak(self, freqs, psd, f_min=0, f_max=0.01): """Find spectral peak in tsunami band (0.2-5 mHz) Args: freqs: frequency array [Hz] psd: power spectral density f_min, f_max: frequency range for peak search [Hz] Returns: f_peak: peak frequency [Hz] E_peak: peak energy density """ # Tsunami band: 0.2-5 mHz (T = 3-30 min) mask = (freqs >= f_min) & (freqs <= f_max) if not np.any(mask): return None, None band_freqs = freqs[mask] band_psd = psd[mask] peak_idx = np.argmax(band_psd) f_peak = band_freqs[peak_idx] E_peak = band_psd[peak_idx] return f_peak, E_peak def compute_bandwidth(self, freqs, psd, f_peak, threshold=0.95): """Compute frequency bandwidth containing threshold% of energy Args: freqs: frequency array [Hz] psd: power spectral density f_peak: peak frequency [Hz] threshold: energy threshold (default 0.95 for 95%) Returns: bandwidth: frequency bandwidth [Hz] """ # Compute cumulative energy total_energy = np.sum(psd) cum_energy = np.cumsum(psd) / total_energy # Find frequencies at threshold/2 and 1-threshold/2 f_low_idx = np.where(cum_energy >= (1 - threshold)/2)[0] f_high_idx = np.where(cum_energy >= (1 + threshold)/2)[0] if len(f_low_idx) > 0 and len(f_high_idx) > 0: f_low = freqs[f_low_idx[0]] f_high = freqs[f_high_idx[0]] bandwidth = f_high - f_low else: bandwidth = np.nan return bandwidth def compute_sdb(self, signal_data, fs=None): """Compute Spectral Dispersion Bandwidth SDB = Δf₉₅ / f_peak Args: signal_data: 1D array of sea surface elevation [m] fs: sampling frequency [Hz] (optional) Returns: dict with SDB value, status, and spectral components """ if fs is not None: self.fs = fs # Compute spectrum freqs, psd = self.compute_spectrum(signal_data) # Find spectral peak in tsunami band (0.2-5 mHz) f_peak, E_peak = self.find_spectral_peak(freqs, psd, 0.2e-3, 5e-3) if f_peak is None: return { 'value': None, 'status': 'UNKNOWN', 'error': 'No significant spectral peak found' } # Compute 95% bandwidth bandwidth = self.compute_bandwidth(freqs, psd, f_peak, 0.95) if np.isnan(bandwidth): return { 'value': None, 'status': 'UNKNOWN', 'error': 'Could not compute bandwidth' } sdb = bandwidth / f_peak # Compute harmonic energy fractions harmonic_fractions = self.compute_harmonic_fractions(freqs, psd, f_peak) return { 'value': float(sdb), 'status': self.get_status(sdb), 'peak_frequency_hz': float(f_peak), 'peak_period_min': float(1/(f_peak * 60)), 'bandwidth_hz': float(bandwidth), 'harmonic_fractions': harmonic_fractions, 'threat_level': self.get_threat_level(sdb) } def get_status(self, sdb): """Get alert status based on SDB value""" if sdb < self.thresholds['critical']: return 'CRITICAL' elif sdb < self.thresholds['alert']: return 'ALERT' elif sdb < self.thresholds['monitor']: return 'MONITOR' else: return 'SAFE' def get_threat_level(self, sdb): """Get descriptive threat level""" if sdb < 1.0: return "HIGH THREAT - Narrow-band coherent bore" elif sdb < 2.5: return "MODERATE THREAT - Moderate bandwidth" elif sdb < 3.5: return "LOW THREAT - Broad bandwidth" else: return "REDUCED THREAT - Broad dispersed packet" def compute_harmonic_fractions(self, freqs, psd, f_peak): """Compute energy fractions in harmonics Args: freqs: frequency array psd: power spectral density f_peak: fundamental frequency Returns: dict with harmonic fractions """ fractions = {} for harmonic in [1, 2, 3, 4]: f_harm = harmonic * f_peak # Find nearest frequency idx = np.argmin(np.abs(freqs - f_harm)) if idx < len(psd): fractions[f'H{harmonic}'] = float(psd[idx] / np.sum(psd)) # Check for nonlinear transfer (F₂ > 15%) if fractions.get('H2', 0) > 0.15: fractions['nonlinear_transfer'] = 'DETECTED' else: fractions['nonlinear_transfer'] = 'NOT_DETECTED' return fractions def validate_event(self, signal_data, expected_fpeak=None): """Validate SDB against expected values""" result = self.compute_sdb(signal_data) validation = { 'sdb': result, 'validation': {} } if expected_fpeak is not None: error = abs(result['peak_frequency_hz'] - expected_fpeak) / expected_fpeak * 100 validation['validation']['expected_fpeak'] = expected_fpeak validation['validation']['error_percent'] = error return validation