tsunami / src /tsuwave /signals /tidal_remove.py
Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
"""Tidal removal using harmonic analysis"""
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class HarmonicAnalysis:
    """Least-squares harmonic analysis for tidal prediction.

    A record is modelled as a constant mean level plus one cosine per tidal
    constituent.  ``fit`` estimates an amplitude and a phase for every
    constituent; ``predict`` evaluates the harmonic part of that model at
    arbitrary times.

    Attributes:
        constituents: Names (keys of ``CONSTITUENTS``) included in the model.
        amplitudes: Fitted amplitude per constituent, in the units of the data.
        phases: Fitted phase per constituent in radians, measured from
            ``reference_time``.
        mean_level: Fitted constant offset of the record (0.0 before fitting).
        reference_time: Time origin of the fitted phases, i.e. the first
            sample passed to ``fit``; ``None`` until a fit succeeds.
        fitted: ``True`` once ``fit`` has succeeded.
    """

    # Major tidal constituents; 'speed' is the angular speed in degrees/hour.
    CONSTITUENTS = {
        'M2': {'speed': 28.984104, 'name': 'Principal lunar semidiurnal'},
        'S2': {'speed': 30.000000, 'name': 'Principal solar semidiurnal'},
        'N2': {'speed': 28.439730, 'name': 'Larger lunar elliptic semidiurnal'},
        'K1': {'speed': 15.041069, 'name': 'Lunisolar diurnal'},
        'O1': {'speed': 13.943036, 'name': 'Lunar diurnal'},
        'P1': {'speed': 14.958931, 'name': 'Solar diurnal'},
        'K2': {'speed': 30.082137, 'name': 'Lunisolar semidiurnal'},
        'M4': {'speed': 57.968208, 'name': 'Shallow water overtide of principal lunar'},
        'MS4': {'speed': 58.984104, 'name': 'Shallow water quarter diurnal'},
    }

    def __init__(self, constituents: Optional[List[str]] = None):
        """Create an unfitted model.

        Args:
            constituents: Constituent names to fit.  Defaults to the five
                constituents M2, S2, N2, K1 and O1.
        """
        if constituents is None:
            self.constituents = ['M2', 'S2', 'N2', 'K1', 'O1']
        else:
            self.constituents = constituents
        self.amplitudes = {}
        self.phases = {}
        self.mean_level = 0.0
        self.reference_time = None
        self.fitted = False

    @staticmethod
    def _elapsed_hours(times: List[datetime], origin: datetime) -> np.ndarray:
        """Return the offset of every entry of ``times`` from ``origin`` in hours."""
        return np.array([(t - origin).total_seconds() / 3600 for t in times])

    def _phase_angle(self, const: str, hours: np.ndarray) -> np.ndarray:
        """Return the argument of constituent ``const`` in radians at ``hours``."""
        speed = self.CONSTITUENTS[const]['speed']  # degrees per hour
        return 2 * np.pi * speed * hours / 360  # convert to radians

    def fit(self, times: List[datetime], data: np.ndarray) -> None:
        """Fit the mean level and the harmonic constituents to ``data``.

        On success the amplitudes, phases, mean level and reference time are
        stored on the instance and ``fitted`` becomes ``True``.  If the
        least-squares solve itself fails, the error is logged and ``fitted``
        is set to ``False`` instead of raising.

        Args:
            times: Sample times, one per entry of ``data``.
            data: Observed values.

        Raises:
            ValueError: If the record is empty or ``times`` and ``data``
                differ in length.
            KeyError: If a configured constituent is not in ``CONSTITUENTS``.
        """
        n = len(data)
        if n == 0:
            raise ValueError("Cannot fit tidal constituents to an empty record")
        if len(times) != n:
            # Without this check a length-1 ``times`` would silently broadcast.
            raise ValueError(
                f"times and data must have the same length "
                f"(got {len(times)} and {n})"
            )

        # Phases are measured from the first sample; predict() reuses this origin.
        origin = times[0]
        hours = self._elapsed_hours(times, origin)

        # Design matrix: column 0 is the constant (mean level) term, followed
        # by a cos/sin pair per constituent.  Without the constant column a
        # non-zero mean leaks into the harmonic coefficients.
        design = np.ones((n, 2 * len(self.constituents) + 1))
        for i, const in enumerate(self.constituents):
            angle = self._phase_angle(const, hours)
            design[:, 2 * i + 1] = np.cos(angle)
            design[:, 2 * i + 2] = np.sin(angle)

        # Solve least squares.  Deliberately best-effort: a failed solve is
        # reported through the log and the ``fitted`` flag, not an exception.
        try:
            coeffs = np.linalg.lstsq(design, data, rcond=None)[0]
        except Exception as e:
            logger.error(f"Tidal fit failed: {e}")
            self.fitted = False
            return

        # a*cos(x) + b*sin(x) == amplitude * cos(x - phase)
        self.mean_level = coeffs[0]
        for i, const in enumerate(self.constituents):
            a = coeffs[2 * i + 1]
            b = coeffs[2 * i + 2]
            self.amplitudes[const] = np.sqrt(a**2 + b**2)
            self.phases[const] = np.arctan2(b, a)
        self.reference_time = origin
        self.fitted = True
        logger.info(f"Fitted {len(self.constituents)} tidal constituents")

    def predict(self, times: List[datetime]) -> np.ndarray:
        """Predict the harmonic tidal elevation at ``times``.

        Only the oscillating part of the model is returned; ``mean_level`` is
        not added.  Times are measured from the reference time stored by
        ``fit``, so predictions stay in phase for any requested span.

        Args:
            times: Times at which to evaluate the tide (may be empty).

        Returns:
            Array with one predicted value per entry of ``times``.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Model not fitted. Call fit() first.")

        prediction = np.zeros(len(times))
        if len(times) == 0:
            return prediction

        # Fall back to the first requested time only when no fit epoch is
        # available (e.g. state that was populated by hand).
        origin = getattr(self, 'reference_time', None)
        if origin is None:
            origin = times[0]
        hours = self._elapsed_hours(times, origin)

        for const in self.constituents:
            angle = self._phase_angle(const, hours)
            prediction += self.amplitudes[const] * np.cos(angle - self.phases[const])
        return prediction

    def get_residual(self, times: List[datetime], data: np.ndarray) -> np.ndarray:
        """Return ``data`` minus the predicted harmonic tide at ``times``."""
        tide = self.predict(times)
        return data - tide
class TidalRemover:
    """Remove tidal components from sea level data.

    Wraps ``HarmonicAnalysis`` with detrending, a rolling-window variant,
    simple threshold-based anomaly detection and an optional tsunami band
    filter.
    """

    def __init__(self, analysis_window: int = 30):  # 30 days default
        """Create a remover.

        Args:
            analysis_window: Analysis window in days.  Stored on the instance;
                the methods of this class do not read it.
        """
        self.analysis_window = analysis_window
        self.harmonic = HarmonicAnalysis()
        self.last_fit_time = None

    @staticmethod
    def _find_anomalies(series: np.ndarray, threshold: float) -> List[int]:
        """Return indices whose deviation from the mean exceeds ``threshold`` std."""
        values = np.asarray(series)
        deviation = np.abs(values - np.mean(values))
        return np.where(deviation > threshold * np.std(values))[0].tolist()

    def remove_tides(self, times: List[datetime], data: np.ndarray) -> np.ndarray:
        """Remove the fitted harmonic tide and any linear trend from ``data``.

        Args:
            times: Sample times, one per entry of ``data``.
            data: Observed sea level values.

        Returns:
            The detided series with a least-squares straight line (fitted
            against the sample index) subtracted.

        Raises:
            ValueError: If the harmonic fit did not succeed.
        """
        # Fit harmonic model
        self.harmonic.fit(times, data)

        # Predict and remove tides
        tide = self.harmonic.predict(times)
        residual = data - tide

        # Detrend (remove any remaining linear trend)
        samples = np.arange(len(residual))
        trend = np.polyfit(samples, residual, 1)
        detrended = residual - np.polyval(trend, samples)

        logger.info(f"Removed tides: {np.std(tide):.2f} m tide, "
                    f"{np.std(residual):.2f} m residual")
        return detrended

    def rolling_remove(self, times: List[datetime], data: np.ndarray,
                       window_days: int = 30) -> np.ndarray:
        """Rolling window tidal removal for long time series.

        For every sample a fresh harmonic model is fitted to the samples lying
        within ``window_days`` centred on it, and the predicted tide at that
        sample is subtracted.  A model is refitted per sample, so the cost
        grows with both the record length and the window size.

        Args:
            times: Sample times, one per entry of ``data``.
            data: Observed sea level values.
            window_days: Full width of the centred window in days.

        Returns:
            Array of detided values.  Samples whose window holds 24 or fewer
            points, or whose fit fails, are passed through unchanged.
        """
        values = np.asarray(data)
        n = len(times)
        result = np.zeros(n)

        # Convert to timestamps
        timestamps = np.array([t.timestamp() for t in times])
        window_seconds = window_days * 24 * 3600

        for i in range(n):
            # Find data in window
            window_start = timestamps[i] - window_seconds / 2
            window_end = timestamps[i] + window_seconds / 2
            mask = (timestamps >= window_start) & (timestamps <= window_end)
            window_indices = np.where(mask)[0]

            if len(window_indices) <= 24:  # too few samples for a stable fit
                result[i] = values[i]  # fallback to original
                continue

            window_times = [times[j] for j in window_indices]
            harmonic = HarmonicAnalysis()
            harmonic.fit(window_times, values[window_indices])
            if not harmonic.fitted:
                # Same fallback as a short window instead of aborting the loop.
                result[i] = values[i]
                continue

            # Predict over the very span that was fitted so the phase origin
            # is guaranteed to match, then pick out the centre sample.
            # window_indices is ascending and always contains i.
            tide = harmonic.predict(window_times)
            position = np.searchsorted(window_indices, i)
            result[i] = values[i] - tide[position]

        return result

    def detect_tsunami(self, times: List[datetime], data: np.ndarray,
                       threshold: float = 3.0) -> Tuple[np.ndarray, List[int]]:
        """Detect tsunami candidates by removing tides and finding anomalies.

        Args:
            times: Sample times, one per entry of ``data``.
            data: Observed sea level values.
            threshold: Number of standard deviations from the residual mean
                beyond which a sample counts as an anomaly.

        Returns:
            Tuple of the detided residual and the list of anomalous indices.
        """
        # Remove tides
        residual = self.remove_tides(times, data)

        # Find anomalies (potential tsunami arrivals)
        anomalies = self._find_anomalies(residual, threshold)

        logger.info(f"Detected {len(anomalies)} potential tsunami arrivals")
        return residual, anomalies

    def filter_tsunami_band(self, data: np.ndarray, fs: float) -> np.ndarray:
        """Apply the tsunami band filter after tidal removal.

        Args:
            data: Series to filter.
            fs: Passed through to ``TsunamiBandFilter.process`` (presumably
                the sampling frequency; confirm against that class).

        Returns:
            Whatever ``TsunamiBandFilter.process`` returns for ``data``.
        """
        from .bandpass import TsunamiBandFilter

        band_filter = TsunamiBandFilter()  # avoid shadowing the builtin filter()
        return band_filter.process(data, fs)

    def process(self, times: List[datetime], data: np.ndarray,
                fs: float) -> Dict:
        """Complete processing: remove tides, band filter, detect arrivals.

        Args:
            times: Sample times, one per entry of ``data``.
            data: Observed sea level values.
            fs: Forwarded to ``filter_tsunami_band``.

        Returns:
            Dict with keys ``'original'``, ``'detided'``, ``'filtered'``,
            ``'arrival_indices'`` and ``'arrival_times'``.
        """
        # Remove tides
        detided = self.remove_tides(times, data)

        # Apply tsunami band filter
        filtered = self.filter_tsunami_band(detided, fs)

        # Detect arrivals directly on the filtered series.  It is already
        # detided, so fitting and removing tides a second time is not wanted.
        # 3.0 is the same default threshold detect_tsunami() uses.
        arrivals = self._find_anomalies(filtered, 3.0)
        logger.info(f"Detected {len(arrivals)} potential tsunami arrivals")

        return {
            'original': data,
            'detided': detided,
            'filtered': filtered,
            'arrival_indices': arrivals,
            'arrival_times': [times[i] for i in arrivals]
        }