File size: 5,560 Bytes
e391a84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | """
infrastructure/processing/scipy_signal_processor.py
βββββββββββββββββββββββββββββββββββββββββββββββββββββ
ScipySignalProcessor β implements SignalProcessor using SciPy.
Pipeline (Template Method from SignalProcessor.process()):
1. filter_signal() β Butterworth bandpass (0.5β8 Hz, 4th order)
2. normalize() β Z-score normalisation (mean=0, std=1)
3. segment() β sliding 8-second windows (no overlap)
All constants are read from src/shared/constants.py β no magic numbers here.
"""
from __future__ import annotations
import numpy as np
from scipy import signal as scipy_signal
from src.domain.interfaces.services.signal_processor import SignalProcessor
from src.shared.constants import (
PPG_BANDPASS_HIGH,
PPG_BANDPASS_LOW,
PPG_FILTER_ORDER,
PPG_SEGMENT_DURATION_SEC,
)
from src.shared.logger import get_logger
logger = get_logger(__name__)
class ScipySignalProcessor(SignalProcessor):
"""
PPG signal preprocessor using SciPy's digital signal processing tools.
Implements the three abstract steps of SignalProcessor:
filter_signal β Butterworth bandpass IIR filter
normalize β Z-score (subtract mean, divide by std)
segment β Fixed-length windows (non-overlapping)
"""
# ββ Step 1: Filter ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def filter_signal(
self,
signal: np.ndarray,
sampling_rate: float,
) -> np.ndarray:
"""
Apply a 4th-order Butterworth bandpass filter (0.5β8 Hz).
Removes:
β’ Baseline wander (< 0.5 Hz β motion artefacts, breathing)
β’ High-frequency noise (> 8 Hz β electronics, EMI)
Uses ``sosfiltfilt`` (zero-phase, forward-backward) to avoid
phase distortion in the filtered signal.
Args:
signal: 1-D raw PPG amplitude array.
sampling_rate: Sampling rate in Hz.
Returns:
Filtered 1-D NumPy array (same length as input).
"""
nyquist = sampling_rate / 2.0
low = PPG_BANDPASS_LOW / nyquist
high = PPG_BANDPASS_HIGH / nyquist
# Clamp to valid range (< 1.0) to avoid ValueError from scipy
low = max(1e-4, min(low, 0.999))
high = max(low + 1e-4, min(high, 0.999))
sos = scipy_signal.butter(
N=PPG_FILTER_ORDER,
Wn=[low, high],
btype="bandpass",
output="sos",
)
filtered = scipy_signal.sosfiltfilt(sos, signal)
logger.debug(
"filter_signal() β fs=%.1f Hz, band=[%.2f, %.2f] Hz",
sampling_rate,
PPG_BANDPASS_LOW,
PPG_BANDPASS_HIGH,
)
return filtered
# ββ Step 2: Normalize βββββββββββββββββββββββββββββββββββββββββββββββββββββ
def normalize(self, signal: np.ndarray) -> np.ndarray:
"""
Z-score normalise the signal to mean=0, std=1.
Handles edge cases:
β’ All-zero or constant signal β return zeros (avoid division by zero).
Args:
signal: 1-D filtered PPG signal.
Returns:
Z-score normalised 1-D array.
"""
mean = np.mean(signal)
std = np.std(signal)
if std < 1e-8:
logger.warning(
"normalize() β signal std β 0 (constant signal). "
"Returning zero array."
)
return np.zeros_like(signal)
normalised = (signal - mean) / std
logger.debug(
"normalize() β mean=%.4f, std=%.4f β Z-score applied", mean, std
)
return normalised
# ββ Step 3: Segment βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def segment(
self,
signal: np.ndarray,
sampling_rate: float,
) -> np.ndarray:
"""
Split the signal into non-overlapping windows of PPG_SEGMENT_DURATION_SEC seconds.
Any trailing samples that don't fill a complete window are discarded.
Args:
signal: 1-D normalised PPG signal.
sampling_rate: Sampling rate in Hz.
Returns:
2-D NumPy array of shape ``(n_segments, window_size)``.
Returns shape ``(0, window_size)`` if the signal is too short.
"""
window_size = int(PPG_SEGMENT_DURATION_SEC * sampling_rate)
if len(signal) < window_size:
logger.warning(
"segment() β signal length %d < window_size %d. "
"Returning empty segments array.",
len(signal),
window_size,
)
return np.empty((0, window_size), dtype=np.float64)
n_segments = len(signal) // window_size
trimmed = signal[: n_segments * window_size]
segments = trimmed.reshape(n_segments, window_size)
logger.debug(
"segment() β %d segments of %d samples (%.1f s each)",
n_segments,
window_size,
PPG_SEGMENT_DURATION_SEC,
)
return segments
|