Spaces:
Sleeping
Sleeping
| """Tidal removal using harmonic analysis""" | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional, Tuple | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class HarmonicAnalysis:
    """Least-squares harmonic analysis for tidal prediction.

    The model is a sum of cosines, one per selected constituent::

        h(t) = sum_c  A_c * cos(w_c * t - phi_c)

    where ``w_c`` is the constituent's angular speed and ``t`` is the time in
    hours since the first sample passed to :meth:`fit`. No constant (mean
    level) term is fitted, so any offset in the data stays in the residual.
    """

    # Major tidal constituents; 'speed' is the angular speed in degrees/hour.
    # NOTE(review): K1 is conventionally named "Lunisolar diurnal" and M4/MS4
    # are usually described as shallow-water "overtides"/compound tides; the
    # strings are left unchanged because other code may display or match them.
    CONSTITUENTS = {
        'M2': {'speed': 28.984104, 'name': 'Principal lunar semidiurnal'},
        'S2': {'speed': 30.000000, 'name': 'Principal solar semidiurnal'},
        'N2': {'speed': 28.439730, 'name': 'Larger lunar elliptic semidiurnal'},
        'K1': {'speed': 15.041069, 'name': 'Lunar diurnal'},
        'O1': {'speed': 13.943036, 'name': 'Lunar diurnal'},
        'P1': {'speed': 14.958931, 'name': 'Solar diurnal'},
        'K2': {'speed': 30.082137, 'name': 'Lunisolar semidiurnal'},
        'M4': {'speed': 57.968208, 'name': 'Shallow water override'},
        'MS4': {'speed': 58.984104, 'name': 'Shallow water override'},
    }

    def __init__(self, constituents: Optional[List[str]] = None):
        """Select the constituents to fit.

        Args:
            constituents: Keys of ``CONSTITUENTS`` to include. Defaults to
                ``['M2', 'S2', 'N2', 'K1', 'O1']`` when ``None``.
        """
        if constituents is None:
            self.constituents = ['M2', 'S2', 'N2', 'K1', 'O1']
        else:
            # Copy so later mutation of the caller's list cannot desynchronise
            # the fitted amplitudes/phases from the constituent list.
            self.constituents = list(constituents)
        self.amplitudes = {}
        self.phases = {}
        self.fitted = False
        # Reference epoch of the last successful fit; phases are only
        # meaningful relative to this instant.
        self._t0 = None

    def _angles(self, const: str, hours: np.ndarray) -> np.ndarray:
        """Return the phase angle (radians) of ``const`` at ``hours``."""
        speed = self.CONSTITUENTS[const]['speed']  # degrees per hour
        return 2 * np.pi * speed * hours / 360  # convert to radians

    def fit(self, times: List[datetime], data: np.ndarray):
        """Fit amplitude and phase of each constituent to ``data``.

        On success ``amplitudes``/``phases`` are populated and ``fitted`` is
        set to ``True``. If the least-squares solve raises, the error is
        logged, ``fitted`` is set to ``False`` and nothing is re-raised.

        Args:
            times: Sample times; ``times[0]`` becomes the reference epoch.
            data: Observed values, one per entry of ``times``.

        Raises:
            IndexError: If ``times`` is empty.
            KeyError: If a selected constituent is not in ``CONSTITUENTS``.
        """
        n = len(data)
        n_const = len(self.constituents)

        # Convert times to hours from start
        t0 = times[0]
        hours = np.array([(t - t0).total_seconds() / 3600 for t in times])

        # Design matrix: a (cos, sin) column pair per constituent.
        A = np.zeros((n, 2 * n_const))
        for i, const in enumerate(self.constituents):
            angle = self._angles(const, hours)
            A[:, 2 * i] = np.cos(angle)
            A[:, 2 * i + 1] = np.sin(angle)

        # Solve least squares
        try:
            x, _residuals, _rank, _sv = np.linalg.lstsq(A, data, rcond=None)

            # a*cos(wt) + b*sin(wt) == R*cos(wt - phi) with
            # R = hypot(a, b) and phi = atan2(b, a).
            for i, const in enumerate(self.constituents):
                a = x[2 * i]
                b = x[2 * i + 1]
                self.amplitudes[const] = np.sqrt(a**2 + b**2)
                self.phases[const] = np.arctan2(b, a)

            # Remember the epoch so predict() can reproduce the same phases
            # for arbitrary timestamps, not only for the fitted series.
            self._t0 = t0
            self.fitted = True
            logger.info(f"Fitted {len(self.constituents)} tidal constituents")
        except Exception as e:
            logger.error(f"Tidal fit failed: {e}")
            self.fitted = False

    def predict(self, times: List[datetime]) -> np.ndarray:
        """Predict tidal elevation at ``times``.

        Time is measured from the epoch of the last successful :meth:`fit`,
        so predictions are phase-consistent for any timestamps (sub-ranges,
        single samples, future times). ``times`` must be comparable with the
        fit timestamps (same naive/aware convention).

        Args:
            times: Timestamps to evaluate the model at.

        Returns:
            Array of predicted elevations, one per entry of ``times``.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Model not fitted. Call fit() first.")

        t0 = getattr(self, '_t0', None)
        if t0 is None:
            # Model state was populated without fit() (e.g. attributes set by
            # hand); keep the historical behaviour of starting at times[0].
            t0 = times[0]

        hours = np.array([(t - t0).total_seconds() / 3600 for t in times])
        prediction = np.zeros(len(times))
        for const in self.constituents:
            angle = self._angles(const, hours)
            prediction += self.amplitudes[const] * np.cos(angle - self.phases[const])
        return prediction

    def get_residual(self, times: List[datetime], data: np.ndarray) -> np.ndarray:
        """Return ``data`` minus the predicted tide at ``times``.

        Raises:
            ValueError: If the model has not been fitted.
        """
        return data - self.predict(times)
class TidalRemover:
    """Remove tidal components from sea level data."""

    def __init__(self, analysis_window: int = 30):  # 30 days default
        """Create a remover with a default-constituent harmonic model.

        Args:
            analysis_window: Analysis window in days. Stored on the instance;
                the methods of this class do not read it.
        """
        self.analysis_window = analysis_window
        self.harmonic = HarmonicAnalysis()
        # Stored for callers; never updated by the methods of this class.
        self.last_fit_time = None

    @staticmethod
    def _find_anomalies(series: np.ndarray, threshold: float) -> List[int]:
        """Return indices deviating from the mean by > threshold * std."""
        series = np.asarray(series)
        deviation = np.abs(series - np.mean(series))
        return np.flatnonzero(deviation > threshold * np.std(series)).tolist()

    def remove_tides(self, times: List[datetime], data: np.ndarray) -> np.ndarray:
        """Fit tides on the whole series, subtract them and detrend.

        Refits ``self.harmonic`` on every call.

        Args:
            times: Sample times.
            data: Sea level values, one per entry of ``times``.

        Returns:
            The residual with a straight line (fitted against the sample
            index, i.e. assuming evenly spaced samples) removed.

        Raises:
            ValueError: If the harmonic fit failed (raised by ``predict``).
        """
        # Fit harmonic model
        self.harmonic.fit(times, data)

        # Predict and remove tides
        tide = self.harmonic.predict(times)
        residual = data - tide

        # Detrend (remove any remaining linear trend)
        sample_idx = np.arange(len(residual))
        trend = np.polyfit(sample_idx, residual, 1)
        detrended = residual - np.polyval(trend, sample_idx)

        # The logged residual spread is measured before detrending.
        logger.info(f"Removed tides: {np.std(tide):.2f} m tide, "
                    f"{np.std(residual):.2f} m residual")
        return detrended

    def rolling_remove(self, times: List[datetime], data: np.ndarray,
                       window_days: int = 30) -> np.ndarray:
        """Rolling window tidal removal for long time series.

        For every sample a fresh harmonic model is fitted to the samples
        within ``window_days`` centred on it, and the modelled tide at that
        sample is subtracted. Samples whose window holds 24 or fewer points
        are returned unchanged.

        Args:
            times: Sample times.
            data: Sea level values, one per entry of ``times``.
            window_days: Full width of the centred window, in days.

        Returns:
            Array of de-tided values, same length as ``times``.
        """
        data = np.asarray(data)
        n = len(times)
        result = np.zeros(n)

        # Convert to timestamps
        timestamps = np.array([t.timestamp() for t in times])
        half_window = window_days * 24 * 3600 / 2

        for i in range(n):
            # Find data in window
            window_start = timestamps[i] - half_window
            window_end = timestamps[i] + half_window
            mask = (timestamps >= window_start) & (timestamps <= window_end)
            window_indices = np.where(mask)[0]

            if len(window_indices) > 24:  # need at least daily data
                window_times = [times[j] for j in window_indices]
                window_data = data[window_indices]

                # Fit and remove tides for this window
                harmonic = HarmonicAnalysis()
                harmonic.fit(window_times, window_data)

                # Predict over the same timestamps that were fitted and pick
                # sample i. Predicting on [times[i]] alone would evaluate the
                # model at its own time origin, i.e. return the tide at the
                # window start instead of at sample i.
                tide = harmonic.predict(window_times)
                # window_indices is ascending and always contains i.
                pos = int(np.searchsorted(window_indices, i))
                result[i] = data[i] - tide[pos]
            else:
                result[i] = data[i]  # fallback to original

        return result

    def detect_tsunami(self, times: List[datetime], data: np.ndarray,
                       threshold: float = 3.0) -> Tuple[np.ndarray, List[int]]:
        """Detect tsunami by removing tides and finding anomalies.

        Args:
            times: Sample times.
            data: Raw sea level values (tides are removed here).
            threshold: Number of standard deviations from the residual mean
                beyond which a sample is flagged.

        Returns:
            ``(residual, anomalies)``: the de-tided, detrended series and the
            indices of the flagged samples.
        """
        # Remove tides
        residual = self.remove_tides(times, data)

        # Find anomalies (potential tsunami arrivals)
        anomalies = self._find_anomalies(residual, threshold)

        logger.info(f"Detected {len(anomalies)} potential tsunami arrivals")
        return residual, anomalies

    def filter_tsunami_band(self, data: np.ndarray, fs: float) -> np.ndarray:
        """Apply tsunami band filter after tidal removal.

        Args:
            data: Series to filter.
            fs: Passed through to ``TsunamiBandFilter.process`` (presumably
                the sampling frequency; confirm against that class).
        """
        from .bandpass import TsunamiBandFilter

        band_filter = TsunamiBandFilter()  # avoid shadowing builtin filter()
        return band_filter.process(data, fs)

    def process(self, times: List[datetime], data: np.ndarray,
                fs: float) -> Dict:
        """Complete processing: remove tides + tsunami band filter.

        Returns:
            Dict with keys ``'original'``, ``'detided'``, ``'filtered'``,
            ``'arrival_indices'`` and ``'arrival_times'``.
        """
        # Remove tides
        detided = self.remove_tides(times, data)

        # Apply tsunami band filter
        filtered = self.filter_tsunami_band(detided, fs)

        # Detect arrivals directly on the filtered series. It is already
        # de-tided; sending it through detect_tsunami() would fit and remove
        # tides a second time and overwrite self.harmonic with that fit.
        arrivals = self._find_anomalies(filtered, 3.0)
        logger.info(f"Detected {len(arrivals)} potential tsunami arrivals")

        return {
            'original': data,
            'detided': detided,
            'filtered': filtered,
            'arrival_indices': arrivals,
            'arrival_times': [times[i] for i in arrivals]
        }