Spaces:
Sleeping
Sleeping
| """Spectral Dispersion Bandwidth (SDB) | |
| Tracks spectral spreading of wave energy and nonlinear harmonic energy transfer. | |
| SDB = Δf₉₅ / f_peak | |
| High Threat: SDB < 1.0 (narrow-band coherent bore) | |
| Reduced Threat: SDB > 3.5 (broad dispersed packet) | |
| """ | |
| import numpy as np | |
| from scipy import signal, fft | |
class SpectralDispersionBandwidth:
    """Spectral Dispersion Bandwidth (SDB) - Parameter 5 of 7.

    SDB = Δf₉₅ / f_peak, i.e. the width of the frequency interval holding the
    central 95% of spectral energy, divided by the peak frequency found in
    the tsunami band.

    Thresholds:
        CRITICAL: SDB < 1.0         (narrow-band coherent bore - HIGH THREAT)
        ALERT:    1.0 ≤ SDB < 2.5   (moderate bandwidth)
        MONITOR:  2.5 ≤ SDB < 3.5   (broad bandwidth)
        SAFE:     SDB ≥ 3.5         (broad dispersed packet - REDUCED THREAT)

    Second harmonic onset at h/H₀ > 0.35 → F₂ > 15%
    """

    # Peak-search band used by compute_sdb: 0.2-5 mHz
    # (periods from roughly 83 min down to roughly 3.3 min).
    _BAND_MIN_HZ = 0.2e-3
    _BAND_MAX_HZ = 5e-3
    # Harmonic orders reported by compute_harmonic_fractions.
    _HARMONICS = (1, 2, 3, 4)
    # Second-harmonic energy fraction above which nonlinear transfer is flagged.
    _NONLINEAR_H2_FRACTION = 0.15

    def __init__(self, fs=1.0/60):  # Default sampling: one sample per minute
        """Create an analyser.

        Args:
            fs: sampling frequency [Hz]
        """
        self.fs = fs  # Sampling frequency [Hz]
        # Upper (exclusive) SDB bound of each status; anything at or above
        # 'monitor' is SAFE. 'safe' is kept for callers that read it.
        self.thresholds = {
            'critical': 1.0,
            'alert': 2.5,
            'monitor': 3.5,
            'safe': 3.5
        }

    def compute_spectrum(self, signal_data, window='hann'):
        """Compute a one-sided power spectral density.

        The mean of the record is NOT removed here; any offset shows up as
        energy at and near 0 Hz, so callers should de-mean / de-tide first.

        Args:
            signal_data: 1D array of sea surface elevation [m]
            window: 'hann', 'hamming', or anything else for no taper
        Returns:
            freqs: frequency array [Hz], n // 2 bins starting at 0 Hz
            psd: power spectral density, same length as freqs
                 (both empty when fewer than two samples are given)
        """
        samples = np.asarray(signal_data, dtype=float).ravel()
        n = samples.size
        if n < 2:
            # No positive-frequency bin exists; report an empty spectrum so
            # callers get an 'UNKNOWN' result instead of an FFT exception.
            return np.array([]), np.array([])

        if window == 'hann':
            win = np.hanning(n)
        elif window == 'hamming':
            win = np.hamming(n)
        else:
            win = np.ones(n)

        half = n // 2
        spectrum = fft.fft(samples * win)[:half]

        # Density scaling divides by fs * sum(win**2); for a rectangular
        # window this equals fs * n. Guard the degenerate all-zero window
        # (e.g. a 2-point Hann) to avoid 0/0.
        win_energy = float(np.sum(win ** 2))
        if win_energy == 0.0:
            win_energy = float(n)
        psd = np.abs(spectrum) ** 2 / (self.fs * win_energy)

        # The [:half] slice already excludes the Nyquist bin, so every bin
        # except DC has a discarded negative-frequency twin and is doubled.
        psd[1:] *= 2

        freqs = fft.fftfreq(n, 1 / self.fs)[:half]
        return freqs, psd

    def find_spectral_peak(self, freqs, psd, f_min=0, f_max=0.01):
        """Find the strongest spectral bin inside [f_min, f_max].

        The defaults search 0-0.01 Hz; compute_sdb narrows this to the
        0.2-5 mHz tsunami band.

        Args:
            freqs: frequency array [Hz]
            psd: power spectral density
            f_min, f_max: inclusive frequency range for peak search [Hz]
        Returns:
            f_peak: peak frequency [Hz], or None if no bin lies in the range
            E_peak: peak energy density, or None if no bin lies in the range
        """
        freqs = np.asarray(freqs)
        psd = np.asarray(psd)
        in_band = (freqs >= f_min) & (freqs <= f_max)
        if not np.any(in_band):
            return None, None

        band_freqs = freqs[in_band]
        band_psd = psd[in_band]
        peak_idx = np.argmax(band_psd)
        return band_freqs[peak_idx], band_psd[peak_idx]

    def compute_bandwidth(self, freqs, psd, f_peak, threshold=0.95):
        """Compute the frequency bandwidth containing `threshold` of the energy.

        The band runs between the cumulative-energy quantiles
        (1 - threshold) / 2 and (1 + threshold) / 2 (2.5% and 97.5% for the
        default), taken over the whole supplied spectrum.

        Args:
            freqs: frequency array [Hz]
            psd: power spectral density
            f_peak: peak frequency [Hz] (accepted for interface
                    compatibility; not used in the computation)
            threshold: energy fraction (default 0.95 for 95%)
        Returns:
            bandwidth: frequency bandwidth [Hz], or NaN when the spectrum is
                       empty, has no energy, or a quantile is never reached
        """
        freqs = np.asarray(freqs)
        psd = np.asarray(psd, dtype=float)
        if psd.size == 0:
            return np.nan

        running = np.cumsum(psd)
        # Normalising by the last cumulative value (rather than a separate
        # np.sum) guarantees the curve ends at exactly 1.0.
        total_energy = running[-1]
        if not np.isfinite(total_energy) or total_energy <= 0:
            return np.nan
        cum_energy = running / total_energy

        low_hits = np.flatnonzero(cum_energy >= (1 - threshold) / 2)
        high_hits = np.flatnonzero(cum_energy >= (1 + threshold) / 2)
        if low_hits.size == 0 or high_hits.size == 0:
            return np.nan
        return freqs[high_hits[0]] - freqs[low_hits[0]]

    def compute_sdb(self, signal_data, fs=None):
        """Compute Spectral Dispersion Bandwidth.

        SDB = Δf₉₅ / f_peak

        Args:
            signal_data: 1D array of sea surface elevation [m]
            fs: sampling frequency [Hz] (optional). When given it is also
                stored on the instance and used by subsequent calls.
        Returns:
            dict with SDB value, status, and spectral components; on failure
            a dict with 'value': None, 'status': 'UNKNOWN' and an 'error'
            message.
        """
        if fs is not None:
            self.fs = fs

        freqs, psd = self.compute_spectrum(signal_data)

        # Peak search is restricted to the tsunami band (0.2-5 mHz).
        f_peak, E_peak = self.find_spectral_peak(
            freqs, psd, self._BAND_MIN_HZ, self._BAND_MAX_HZ
        )
        if f_peak is None:
            return {
                'value': None,
                'status': 'UNKNOWN',
                'error': 'No significant spectral peak found'
            }

        # 95% energy bandwidth over the full spectrum.
        bandwidth = self.compute_bandwidth(freqs, psd, f_peak, 0.95)
        if np.isnan(bandwidth):
            return {
                'value': None,
                'status': 'UNKNOWN',
                'error': 'Could not compute bandwidth'
            }

        sdb = bandwidth / f_peak
        harmonic_fractions = self.compute_harmonic_fractions(freqs, psd, f_peak)

        return {
            'value': float(sdb),
            'status': self.get_status(sdb),
            'peak_frequency_hz': float(f_peak),
            'peak_period_min': float(1/(f_peak * 60)),
            'bandwidth_hz': float(bandwidth),
            'harmonic_fractions': harmonic_fractions,
            'threat_level': self.get_threat_level(sdb)
        }

    def get_status(self, sdb):
        """Get alert status based on SDB value.

        Returns 'UNKNOWN' for None or NaN so that a missing value is never
        reported as 'SAFE'.
        """
        if sdb is None or np.isnan(sdb):
            return 'UNKNOWN'
        if sdb < self.thresholds['critical']:
            return 'CRITICAL'
        if sdb < self.thresholds['alert']:
            return 'ALERT'
        if sdb < self.thresholds['monitor']:
            return 'MONITOR'
        return 'SAFE'

    def get_threat_level(self, sdb):
        """Get descriptive threat level.

        Uses the same configurable thresholds as get_status so the two
        classifications cannot disagree; None or NaN yields an UNKNOWN label.
        """
        if sdb is None or np.isnan(sdb):
            return "UNKNOWN - SDB unavailable"
        if sdb < self.thresholds['critical']:
            return "HIGH THREAT - Narrow-band coherent bore"
        if sdb < self.thresholds['alert']:
            return "MODERATE THREAT - Moderate bandwidth"
        if sdb < self.thresholds['monitor']:
            return "LOW THREAT - Broad bandwidth"
        return "REDUCED THREAT - Broad dispersed packet"

    def compute_harmonic_fractions(self, freqs, psd, f_peak):
        """Compute energy fractions in harmonics.

        For each harmonic order 1-4 the PSD value of the bin nearest to
        order * f_peak is divided by the summed PSD. A harmonic lying more
        than half a bin above the highest available frequency cannot be
        observed and is reported as 0.0 rather than being attributed to the
        last bin.

        Args:
            freqs: frequency array (assumed ascending and evenly spaced, as
                   returned by compute_spectrum)
            psd: power spectral density
            f_peak: fundamental frequency
        Returns:
            dict with 'H1'..'H4' fractions and 'nonlinear_transfer' set to
            'DETECTED' when H2 exceeds 15%, else 'NOT_DETECTED'
        """
        freqs = np.asarray(freqs, dtype=float)
        psd = np.asarray(psd, dtype=float)
        fractions = {}

        total_energy = float(np.sum(psd)) if psd.size else 0.0
        usable = freqs.size > 0 and np.isfinite(total_energy) and total_energy > 0
        if usable:
            bin_width = freqs[1] - freqs[0] if freqs.size > 1 else 0.0
            f_limit = freqs[-1] + 0.5 * bin_width

        for harmonic in self._HARMONICS:
            key = f'H{harmonic}'
            f_harm = harmonic * f_peak
            if not usable or f_harm > f_limit:
                fractions[key] = 0.0
                continue
            idx = np.argmin(np.abs(freqs - f_harm))
            fractions[key] = float(psd[idx] / total_energy)

        # Check for nonlinear transfer (F₂ > 15%)
        if fractions.get('H2', 0) > self._NONLINEAR_H2_FRACTION:
            fractions['nonlinear_transfer'] = 'DETECTED'
        else:
            fractions['nonlinear_transfer'] = 'NOT_DETECTED'
        return fractions

    def validate_event(self, signal_data, expected_fpeak=None):
        """Validate SDB against expected values.

        Args:
            signal_data: 1D array of sea surface elevation [m]
            expected_fpeak: expected peak frequency [Hz] (optional)
        Returns:
            dict with 'sdb' (the compute_sdb result) and 'validation'. When
            expected_fpeak is given, 'validation' holds 'expected_fpeak' and
            'error_percent'; if no comparison is possible (no peak found, or
            expected_fpeak is zero) 'error_percent' is None and an 'error'
            message explains why.
        """
        result = self.compute_sdb(signal_data)
        checks = {}

        if expected_fpeak is not None:
            checks['expected_fpeak'] = expected_fpeak
            measured = result.get('peak_frequency_hz')
            if measured is None:
                # compute_sdb returned its error dict; there is no peak to compare.
                checks['error_percent'] = None
                checks['error'] = result.get('error', 'Peak frequency unavailable')
            elif expected_fpeak == 0:
                checks['error_percent'] = None
                checks['error'] = 'expected_fpeak must be non-zero'
            else:
                checks['error_percent'] = (
                    abs(measured - expected_fpeak) / expected_fpeak * 100
                )

        return {
            'sdb': result,
            'validation': checks
        }