Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
"""Spectral Dispersion Bandwidth (SDB)
Tracks spectral spreading of wave energy and nonlinear harmonic energy transfer.
SDB = Δf₉₅ / f_peak
High Threat: SDB < 1.0 (narrow-band coherent bore)
Reduced Threat: SDB > 3.5 (broad dispersed packet)
"""
import numpy as np
from scipy import signal, fft
class SpectralDispersionBandwidth:
"""
Spectral Dispersion Bandwidth - Parameter 5 of 7
Thresholds:
CRITICAL: SDB < 1.0 (narrow-band coherent bore - HIGH THREAT)
ALERT: 1.0 ≤ SDB < 2.5 (moderate bandwidth)
MONITOR: 2.5 ≤ SDB < 3.5 (broad bandwidth)
SAFE: SDB ≥ 3.5 (broad dispersed packet - REDUCED THREAT)
Second harmonic onset at h/H₀ > 0.35 → F₂ > 15%
"""
def __init__(self, fs=1.0/60): # Default sampling: 1 minute
self.fs = fs # Sampling frequency [Hz]
self.thresholds = {
'critical': 1.0,
'alert': 2.5,
'monitor': 3.5,
'safe': 3.5
}
def compute_spectrum(self, signal_data, window='hann'):
"""Compute power spectral density
Args:
signal_data: 1D array of sea surface elevation [m]
window: window function type
Returns:
freqs: frequency array [Hz]
psd: power spectral density
"""
# Apply window function
if window == 'hann':
win = np.hanning(len(signal_data))
elif window == 'hamming':
win = np.hamming(len(signal_data))
else:
win = np.ones(len(signal_data))
windowed = signal_data * win
# Compute FFT
n = len(windowed)
fft_vals = fft.fft(windowed)
fft_vals = fft_vals[:n//2]
# Compute PSD
psd = np.abs(fft_vals)**2 / (self.fs * n)
psd[1:-1] *= 2 # Double for one-sided spectrum
# Frequency array
freqs = fft.fftfreq(n, 1/self.fs)[:n//2]
return freqs, psd
def find_spectral_peak(self, freqs, psd, f_min=0, f_max=0.01):
"""Find spectral peak in tsunami band (0.2-5 mHz)
Args:
freqs: frequency array [Hz]
psd: power spectral density
f_min, f_max: frequency range for peak search [Hz]
Returns:
f_peak: peak frequency [Hz]
E_peak: peak energy density
"""
# Tsunami band: 0.2-5 mHz (T = 3-30 min)
mask = (freqs >= f_min) & (freqs <= f_max)
if not np.any(mask):
return None, None
band_freqs = freqs[mask]
band_psd = psd[mask]
peak_idx = np.argmax(band_psd)
f_peak = band_freqs[peak_idx]
E_peak = band_psd[peak_idx]
return f_peak, E_peak
def compute_bandwidth(self, freqs, psd, f_peak, threshold=0.95):
"""Compute frequency bandwidth containing threshold% of energy
Args:
freqs: frequency array [Hz]
psd: power spectral density
f_peak: peak frequency [Hz]
threshold: energy threshold (default 0.95 for 95%)
Returns:
bandwidth: frequency bandwidth [Hz]
"""
# Compute cumulative energy
total_energy = np.sum(psd)
cum_energy = np.cumsum(psd) / total_energy
# Find frequencies at threshold/2 and 1-threshold/2
f_low_idx = np.where(cum_energy >= (1 - threshold)/2)[0]
f_high_idx = np.where(cum_energy >= (1 + threshold)/2)[0]
if len(f_low_idx) > 0 and len(f_high_idx) > 0:
f_low = freqs[f_low_idx[0]]
f_high = freqs[f_high_idx[0]]
bandwidth = f_high - f_low
else:
bandwidth = np.nan
return bandwidth
def compute_sdb(self, signal_data, fs=None):
"""Compute Spectral Dispersion Bandwidth
SDB = Δf₉₅ / f_peak
Args:
signal_data: 1D array of sea surface elevation [m]
fs: sampling frequency [Hz] (optional)
Returns:
dict with SDB value, status, and spectral components
"""
if fs is not None:
self.fs = fs
# Compute spectrum
freqs, psd = self.compute_spectrum(signal_data)
# Find spectral peak in tsunami band (0.2-5 mHz)
f_peak, E_peak = self.find_spectral_peak(freqs, psd, 0.2e-3, 5e-3)
if f_peak is None:
return {
'value': None,
'status': 'UNKNOWN',
'error': 'No significant spectral peak found'
}
# Compute 95% bandwidth
bandwidth = self.compute_bandwidth(freqs, psd, f_peak, 0.95)
if np.isnan(bandwidth):
return {
'value': None,
'status': 'UNKNOWN',
'error': 'Could not compute bandwidth'
}
sdb = bandwidth / f_peak
# Compute harmonic energy fractions
harmonic_fractions = self.compute_harmonic_fractions(freqs, psd, f_peak)
return {
'value': float(sdb),
'status': self.get_status(sdb),
'peak_frequency_hz': float(f_peak),
'peak_period_min': float(1/(f_peak * 60)),
'bandwidth_hz': float(bandwidth),
'harmonic_fractions': harmonic_fractions,
'threat_level': self.get_threat_level(sdb)
}
def get_status(self, sdb):
"""Get alert status based on SDB value"""
if sdb < self.thresholds['critical']:
return 'CRITICAL'
elif sdb < self.thresholds['alert']:
return 'ALERT'
elif sdb < self.thresholds['monitor']:
return 'MONITOR'
else:
return 'SAFE'
def get_threat_level(self, sdb):
"""Get descriptive threat level"""
if sdb < 1.0:
return "HIGH THREAT - Narrow-band coherent bore"
elif sdb < 2.5:
return "MODERATE THREAT - Moderate bandwidth"
elif sdb < 3.5:
return "LOW THREAT - Broad bandwidth"
else:
return "REDUCED THREAT - Broad dispersed packet"
def compute_harmonic_fractions(self, freqs, psd, f_peak):
"""Compute energy fractions in harmonics
Args:
freqs: frequency array
psd: power spectral density
f_peak: fundamental frequency
Returns:
dict with harmonic fractions
"""
fractions = {}
for harmonic in [1, 2, 3, 4]:
f_harm = harmonic * f_peak
# Find nearest frequency
idx = np.argmin(np.abs(freqs - f_harm))
if idx < len(psd):
fractions[f'H{harmonic}'] = float(psd[idx] / np.sum(psd))
# Check for nonlinear transfer (F₂ > 15%)
if fractions.get('H2', 0) > 0.15:
fractions['nonlinear_transfer'] = 'DETECTED'
else:
fractions['nonlinear_transfer'] = 'NOT_DETECTED'
return fractions
def validate_event(self, signal_data, expected_fpeak=None):
"""Validate SDB against expected values"""
result = self.compute_sdb(signal_data)
validation = {
'sdb': result,
'validation': {}
}
if expected_fpeak is not None:
error = abs(result['peak_frequency_hz'] - expected_fpeak) / expected_fpeak * 100
validation['validation']['expected_fpeak'] = expected_fpeak
validation['validation']['error_percent'] = error
return validation