File size: 7,076 Bytes
b0e88cf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | # EVOLVE-BLOCK-START
"""
Real-Time Adaptive Signal Processing Algorithm for Non-Stationary Time Series
This algorithm implements a sliding window approach to filter volatile, non-stationary
time series data while minimizing noise and preserving signal dynamics.
"""
import numpy as np
def adaptive_filter(x, window_size=20):
"""
Adaptive signal processing algorithm using sliding window approach.
Args:
x: Input signal (1D array of real-valued samples)
window_size: Size of the sliding window (W samples)
Returns:
y: Filtered output signal with length = len(x) - window_size + 1
"""
if len(x) < window_size:
raise ValueError(f"Input signal length ({len(x)}) must be >= window_size ({window_size})")
# Initialize output array
output_length = len(x) - window_size + 1
y = np.zeros(output_length)
# Simple moving average as baseline
for i in range(output_length):
window = x[i : i + window_size]
# Basic moving average filter
y[i] = np.mean(window)
return y
def enhanced_filter_with_trend_preservation(x, window_size=20):
"""
Enhanced version with trend preservation using weighted moving average.
Args:
x: Input signal (1D array of real-valued samples)
window_size: Size of the sliding window
Returns:
y: Filtered output signal
"""
if len(x) < window_size:
raise ValueError(f"Input signal length ({len(x)}) must be >= window_size ({window_size})")
output_length = len(x) - window_size + 1
y = np.zeros(output_length)
# Create weights that emphasize recent samples
weights = np.exp(np.linspace(-2, 0, window_size))
weights = weights / np.sum(weights)
for i in range(output_length):
window = x[i : i + window_size]
# Weighted moving average with exponential weights
y[i] = np.sum(window * weights)
return y
def process_signal(input_signal, window_size=20, algorithm_type="enhanced"):
"""
Main signal processing function that applies the selected algorithm.
Args:
input_signal: Input time series data
window_size: Window size for processing
algorithm_type: Type of algorithm to use ("basic" or "enhanced")
Returns:
Filtered signal
"""
if algorithm_type == "enhanced":
return enhanced_filter_with_trend_preservation(input_signal, window_size)
else:
return adaptive_filter(input_signal, window_size)
# EVOLVE-BLOCK-END
def generate_test_signal(length=1000, noise_level=0.3, seed=42):
"""
Generate synthetic test signal with known characteristics.
Args:
length: Length of the signal
noise_level: Standard deviation of noise to add
seed: Random seed for reproducibility
Returns:
Tuple of (noisy_signal, clean_signal)
"""
np.random.seed(seed)
t = np.linspace(0, 10, length)
# Create a complex signal with multiple components
clean_signal = (
2 * np.sin(2 * np.pi * 0.5 * t) # Low frequency component
+ 1.5 * np.sin(2 * np.pi * 2 * t) # Medium frequency component
+ 0.5 * np.sin(2 * np.pi * 5 * t) # Higher frequency component
+ 0.8 * np.exp(-t / 5) * np.sin(2 * np.pi * 1.5 * t) # Decaying oscillation
)
# Add non-stationary behavior
trend = 0.1 * t * np.sin(0.2 * t) # Slowly varying trend
clean_signal += trend
# Add random walk component for non-stationarity
random_walk = np.cumsum(np.random.randn(length) * 0.05)
clean_signal += random_walk
# Add noise
noise = np.random.normal(0, noise_level, length)
noisy_signal = clean_signal + noise
return noisy_signal, clean_signal
def run_signal_processing(noisy_signal=None, signal_length=1000, noise_level=0.3, window_size=20):
"""
Run the signal processing algorithm on a test signal.
Args:
noisy_signal: Input signal to filter (if provided, use this; otherwise generate)
signal_length: Length if generating signal (for backward compatibility)
noise_level: Noise level if generating signal (for backward compatibility)
window_size: Window size for processing
Returns:
Dictionary containing results and metrics
"""
# Use provided signal or generate test signal (for backward compatibility)
if noisy_signal is not None:
# Filter the provided signal
filtered_signal = process_signal(noisy_signal, window_size, "enhanced")
clean_signal = None # Not available when using provided signal
else:
# Generate test signal (for __main__ and backward compatibility)
noisy_signal, clean_signal = generate_test_signal(signal_length, noise_level)
filtered_signal = process_signal(noisy_signal, window_size, "enhanced")
# Calculate basic metrics (only if we have clean_signal from generation)
if len(filtered_signal) > 0 and clean_signal is not None:
# Align signals for comparison (account for processing delay)
delay = window_size - 1
aligned_clean = clean_signal[delay:]
aligned_noisy = noisy_signal[delay:]
# Ensure same length
min_length = min(len(filtered_signal), len(aligned_clean))
filtered_signal = filtered_signal[:min_length]
aligned_clean = aligned_clean[:min_length]
aligned_noisy = aligned_noisy[:min_length]
# Calculate correlation with clean signal
correlation = np.corrcoef(filtered_signal, aligned_clean)[0, 1] if min_length > 1 else 0
# Calculate noise reduction
noise_before = np.var(aligned_noisy - aligned_clean)
noise_after = np.var(filtered_signal - aligned_clean)
noise_reduction = (noise_before - noise_after) / noise_before if noise_before > 0 else 0
return {
"filtered_signal": filtered_signal,
"clean_signal": aligned_clean,
"noisy_signal": aligned_noisy,
"correlation": correlation,
"noise_reduction": noise_reduction,
"signal_length": min_length,
}
elif len(filtered_signal) > 0:
# When using provided signal (no clean_signal available), just return filtered signal
return {
"filtered_signal": filtered_signal,
"clean_signal": None,
"noisy_signal": None,
"correlation": 0,
"noise_reduction": 0,
"signal_length": len(filtered_signal),
}
else:
return {
"filtered_signal": [],
"clean_signal": [],
"noisy_signal": [],
"correlation": 0,
"noise_reduction": 0,
"signal_length": 0,
}
if __name__ == "__main__":
# Test the algorithm
results = run_signal_processing()
print("Signal processing completed!")
print(f"Correlation with clean signal: {results['correlation']:.3f}")
print(f"Noise reduction: {results['noise_reduction']:.3f}")
print(f"Processed signal length: {results['signal_length']}")
|