| """ |
| Wavelet Denoising for Financial Time Series |
| ============================================= |
| DWT denoising with Daubechies db4 wavelet, soft-thresholding (Donoho-Johnstone). |
| Reference: arxiv:2408.12408 Section III-C |
| |
| Usage: |
| from wavelet_denoise import wavelet_denoise, preprocess_ohlcv |
| |
| # Single signal |
| denoised_close = wavelet_denoise(df['Close'].values) |
| |
| # Full OHLCV DataFrame |
| df_denoised, scaler = preprocess_ohlcv(df) |
| """ |
|
|
| import numpy as np |
| import pywt |
|
|
|
|
def wavelet_denoise(signal: np.ndarray, wavelet: str = 'db4', level: int = None) -> np.ndarray:
    """
    Denoise a 1D signal with DWT soft-thresholding (universal threshold).

    From: arxiv:2408.12408 Section III-C (best configuration for S&P 500).

    Algorithm:
        1. Edge-pad the signal on the right to the next power-of-2 length
        2. Decompose with a multilevel DWT (``pywt.wavedec``)
        3. Estimate noise σ as the median absolute value of the finest
           detail coefficients divided by 0.6745
        4. Compute the universal threshold (Donoho-Johnstone): σ * √(2·log(N)),
           where N is the padded length
        5. Soft-threshold all detail coefficients (keep approximation untouched)
        6. Reconstruct and trim to original length

    Args:
        signal: 1D array-like of time series values
        wavelet: Wavelet name handed to PyWavelets (default: 'db4' — Daubechies 4)
        level: Decomposition level (default: ``pywt.dwt_max_level`` of the
            padded length)

    Returns:
        Denoised signal of same length as input. An empty input yields an
        empty float array.

    Raises:
        ValueError: If ``signal`` is not one-dimensional.
    """
    data = np.asarray(signal)
    if data.ndim != 1:
        raise ValueError(
            f"signal must be one-dimensional, got {data.ndim} dimension(s)"
        )

    n = data.shape[0]
    if n == 0:
        # Nothing to decompose; log2(0) would otherwise blow up the padding math.
        return data.astype(float)

    # Next power of two >= n, computed with integer arithmetic so that no
    # floating-point log2/ceil rounding can affect the padded length.
    padded_len = 1 << (n - 1).bit_length()
    padded = np.pad(data, (0, padded_len - n), mode='edge')

    if level is None:
        level = pywt.dwt_max_level(len(padded), wavelet)
    coeffs = pywt.wavedec(padded, wavelet, level=level)

    # Robust noise estimate from the finest-scale coefficients.
    sigma = np.median(np.abs(coeffs[-1])) / 0.6745

    # Universal threshold, based on the padded length.
    threshold = sigma * np.sqrt(2 * np.log(len(padded)))

    if threshold == 0:
        # Soft-thresholding by zero is a no-op. Skipping the call also avoids
        # the threshold/|coeff| division inside pywt's soft rule, which can
        # evaluate 0/0 and inject NaNs when coefficients are exactly zero
        # (e.g. an all-zero input column).
        coeffs_thresh = list(coeffs)
    else:
        # The approximation band (index 0) is kept untouched.
        coeffs_thresh = [coeffs[0]]
        coeffs_thresh.extend(
            pywt.threshold(detail, threshold, mode='soft')
            for detail in coeffs[1:]
        )

    denoised = pywt.waverec(coeffs_thresh, wavelet)
    # Drop the samples that were only added as padding.
    return denoised[:n]
|
|
|
|
def preprocess_ohlcv(df, columns=None):
    """
    Wavelet-denoise the selected columns of a DataFrame and min-max scale them.

    Each selected column is passed through ``wavelet_denoise`` on its own;
    afterwards all selected columns are scaled together by a single
    ``MinMaxScaler``. The input DataFrame is not modified.

    Args:
        df: pandas DataFrame containing the columns to process
        columns: list of column names to process
            (default: ['Open','High','Low','Close','Volume'])

    Returns:
        denoised_df: copy of ``df`` whose selected columns hold the denoised,
            scaled values
        scaler: the MinMaxScaler fitted on the denoised columns (for inverse
            transform during evaluation)
    """
    # Imported here so scikit-learn is only needed when this helper is used.
    from sklearn.preprocessing import MinMaxScaler

    selected = ['Open', 'High', 'Low', 'Close', 'Volume'] if columns is None else columns

    result = df.copy()

    # Always denoise from the untouched input frame, one column at a time.
    for name in selected:
        raw_values = df[name].values
        result[name] = wavelet_denoise(raw_values)

    # One scaler across all selected columns.
    scaler = MinMaxScaler()
    scaled_block = scaler.fit_transform(result[selected])
    result[selected] = scaled_block

    return result, scaler
|
|
|
|
if __name__ == "__main__":
    # Self-check on synthetic data: two superimposed sinusoids plus Gaussian
    # noise, comparing the error before and after denoising.
    np.random.seed(42)
    sample_count = 1000
    phase = np.linspace(0, 4 * np.pi, sample_count)
    reference = np.sin(phase) + 0.5 * np.sin(3 * phase)
    observed = reference + 0.3 * np.random.randn(sample_count)

    recovered = wavelet_denoise(observed)

    # Mean squared error against the noise-free reference.
    residual_before = observed - reference
    residual_after = recovered - reference
    mse_noisy = np.mean(residual_before ** 2)
    mse_denoised = np.mean(residual_after ** 2)

    print(f"MSE (noisy vs clean): {mse_noisy:.6f}")
    print(f"MSE (denoised vs clean): {mse_denoised:.6f}")
    print(f"Noise reduction: {(1 - mse_denoised / mse_noisy) * 100:.1f}%")
|
|