# patchtst-wavelet-sp500-research / wavelet_denoise.py
# tbukuai's picture
# Add wavelet denoising module
# bba1fd4 verified
"""
Wavelet Denoising for Financial Time Series
=============================================
DWT denoising with Daubechies db4 wavelet, soft-thresholding (Donoho-Johnstone).
Reference: arxiv:2408.12408 Section III-C
Usage:
    from wavelet_denoise import wavelet_denoise, preprocess_ohlcv

    # Single signal
    denoised_close = wavelet_denoise(df['Close'].values)

    # Full OHLCV DataFrame (denoised, then min-max scaled to [0, 1])
    df_denoised, scaler = preprocess_ohlcv(df)
"""
import numpy as np
import pywt
def wavelet_denoise(signal: np.ndarray, wavelet: str = 'db4', level: int = None) -> np.ndarray:
    """
    Denoise a 1D series with a DWT and soft-thresholded detail coefficients.

    From: arxiv:2408.12408 Section III-C (best configuration for S&P 500).

    Algorithm:
        1. Edge-pad the signal on the right up to the next power-of-2 length
        2. Decompose with the DWT (``wavelet``, ``level``)
        3. Estimate noise σ via MAD of the finest detail coefficients
        4. Compute the universal threshold (Donoho-Johnstone): σ * √(2·log(N)),
           where N is the padded length
        5. Soft-threshold all detail coefficients (keep approximation untouched)
        6. Reconstruct and trim to the original length

    Args:
        signal: 1D array-like of time series values
        wavelet: Wavelet name handed to PyWavelets (default: 'db4' — Daubechies 4)
        level: Decomposition level (default: maximum possible for the padded length)

    Returns:
        Denoised signal of the same length as the input. An empty input
        yields an empty float array.

    Raises:
        ValueError: if ``signal`` is not one-dimensional.
    """
    data = np.asarray(signal)
    if data.ndim != 1:
        raise ValueError(
            f"wavelet_denoise expects a 1D signal, got {data.ndim} dimension(s)"
        )

    n = data.shape[0]
    if n == 0:
        # Nothing to decompose; log2(0) would otherwise blow up below.
        return np.empty(0, dtype=float)

    # Next power of two >= n, computed with integer arithmetic so it cannot
    # be thrown off by floating-point rounding in log2.
    padded_len = 1 << (n - 1).bit_length()
    padded = np.pad(data, (0, padded_len - n), mode='edge')

    # Decompose
    if level is None:
        level = pywt.dwt_max_level(len(padded), wavelet)
    coeffs = pywt.wavedec(padded, wavelet, level=level)

    # coeffs[0] is the approximation; everything after it is detail,
    # ordered coarsest -> finest.
    approximation, details = coeffs[0], coeffs[1:]

    if details:
        # Noise sigma via Median Absolute Deviation (MAD) of the finest details
        sigma = np.median(np.abs(details[-1])) / 0.6745
        # Universal threshold (Donoho-Johnstone)
        threshold = sigma * np.sqrt(2 * np.log(len(padded)))
        # Soft-thresholding with a zero threshold is mathematically the
        # identity; skipping the call avoids any 0/0 evaluation on
        # exactly-zero coefficients (e.g. an all-zero series).
        if threshold > 0:
            details = [pywt.threshold(d, threshold, mode='soft') for d in details]

    # Reconstruct and trim padding
    denoised = pywt.waverec([approximation, *details], wavelet)
    return denoised[:n]
def preprocess_ohlcv(df, columns=None):
    """
    Apply wavelet denoising to each OHLCV column, then normalize to [0,1].

    Each selected column is denoised independently with ``wavelet_denoise``
    (default settings), after which a single ``MinMaxScaler`` is fitted on
    all selected columns together. The input DataFrame is not modified.

    NOTE: the scaler is fitted on every row of ``df``; callers that need a
    train/test separation should pass only the training rows here.

    Args:
        df: pandas DataFrame with OHLCV columns
        columns: column name, or iterable of column names, to process
            (default: ['Open','High','Low','Close','Volume'])

    Returns:
        denoised_df: copy of ``df`` with the selected columns denoised and normalized
        scaler: fitted MinMaxScaler (for inverse transform during evaluation)
    """
    # Imported lazily so the module can be used without scikit-learn when
    # only wavelet_denoise is needed.
    from sklearn.preprocessing import MinMaxScaler

    if columns is None:
        columns = ['Open', 'High', 'Low', 'Close', 'Volume']
    elif isinstance(columns, str):
        # A bare string would otherwise be iterated character by character.
        columns = [columns]
    else:
        # Materialize as a list: a tuple would be read by pandas as a single
        # (MultiIndex-style) key, and a generator would be exhausted by the
        # first loop below.
        columns = list(columns)

    denoised = df.copy()

    # Step 1: Wavelet denoise each column independently
    for col in columns:
        denoised[col] = wavelet_denoise(df[col].values)

    # Step 2: MinMax normalize to [0, 1]
    scaler = MinMaxScaler()
    denoised[columns] = scaler.fit_transform(denoised[columns])
    return denoised, scaler
if __name__ == "__main__":
    # Smoke test: denoise a noisy two-tone sine and report how much of the
    # injected Gaussian noise was removed, measured by MSE against the
    # noise-free reference.
    def _mse(estimate, reference):
        return np.mean((estimate - reference) ** 2)

    np.random.seed(42)  # fixed seed so the printed figures are reproducible
    grid = np.linspace(0, 4 * np.pi, 1000)
    reference = np.sin(grid) + 0.5 * np.sin(3 * grid)
    observed = reference + 0.3 * np.random.randn(len(grid))

    recovered = wavelet_denoise(observed)

    mse_noisy = _mse(observed, reference)
    mse_denoised = _mse(recovered, reference)

    for report_line in (
        f"MSE (noisy vs clean): {mse_noisy:.6f}",
        f"MSE (denoised vs clean): {mse_denoised:.6f}",
        f"Noise reduction: {(1 - mse_denoised / mse_noisy) * 100:.1f}%",
    ):
        print(report_line)