LIBRE / src /infrastructure /processing /scipy_signal_processor.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
Raw
History Blame Contribute Delete
5.56 kB
"""
infrastructure/processing/scipy_signal_processor.py
─────────────────────────────────────────────────────
ScipySignalProcessor — implements SignalProcessor using SciPy.
Pipeline (Template Method from SignalProcessor.process()):
1. filter_signal() — Butterworth bandpass (0.5–8 Hz, 4th order)
2. normalize() — Z-score normalisation (mean=0, std=1)
3. segment() — fixed 8-second windows (no overlap)
All constants are read from src/shared/constants.py — no magic numbers here.
"""
from __future__ import annotations
import numpy as np
from scipy import signal as scipy_signal
from src.domain.interfaces.services.signal_processor import SignalProcessor
from src.shared.constants import (
PPG_BANDPASS_HIGH,
PPG_BANDPASS_LOW,
PPG_FILTER_ORDER,
PPG_SEGMENT_DURATION_SEC,
)
from src.shared.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper under this module's name.
logger = get_logger(__name__)
class ScipySignalProcessor(SignalProcessor):
    """
    PPG signal preprocessor using SciPy's digital signal processing tools.

    Implements the three abstract steps of SignalProcessor:
        filter_signal → Butterworth bandpass IIR filter
        normalize     → Z-score (subtract mean, divide by std)
        segment       → Fixed-length windows (non-overlapping)
    """

    # Bounds applied to the Nyquist-normalised cut-offs handed to
    # ``scipy.signal.butter``, which requires 0 < Wn < 1 for digital filters.
    _MIN_NORMALISED_CUTOFF = 1e-4
    _MAX_NORMALISED_CUTOFF = 0.999
    # Minimum spacing enforced between the low and high normalised cut-offs.
    _MIN_BAND_WIDTH = 1e-4
    # Standard deviations below this are treated as "constant signal".
    _STD_EPSILON = 1e-8

    @staticmethod
    def _check_sampling_rate(sampling_rate: float, caller: str) -> None:
        """Raise ValueError unless ``sampling_rate`` is a finite, positive number."""
        if not (np.isfinite(sampling_rate) and sampling_rate > 0):
            raise ValueError(
                f"{caller}: sampling_rate must be a finite positive number, "
                f"got {sampling_rate!r}"
            )

    # ── Step 1: Filter ────────────────────────────────────────────────────────
    def filter_signal(
        self,
        signal: np.ndarray,
        sampling_rate: float,
    ) -> np.ndarray:
        """
        Apply a zero-phase Butterworth bandpass filter to the signal.

        The filter order and pass band come from ``PPG_FILTER_ORDER``,
        ``PPG_BANDPASS_LOW`` and ``PPG_BANDPASS_HIGH``. The band is intended
        to suppress baseline wander below the low cut-off and high-frequency
        noise above the high cut-off.

        Uses ``sosfiltfilt`` (forward-backward filtering) to avoid phase
        distortion in the filtered signal.

        Args:
            signal: 1-D raw PPG amplitude array.
            sampling_rate: Sampling rate in Hz; must be finite and > 0.

        Returns:
            Filtered 1-D NumPy array (same length as input).

        Raises:
            ValueError: If ``sampling_rate`` is not a finite positive number.
                SciPy's ``sosfiltfilt`` also raises ValueError when ``signal``
                is too short for its default edge padding.
        """
        self._check_sampling_rate(sampling_rate, "filter_signal()")

        nyquist = sampling_rate / 2.0
        requested_low = PPG_BANDPASS_LOW / nyquist
        requested_high = PPG_BANDPASS_HIGH / nyquist

        # Clamp to the open interval (0, 1) and keep low < high, otherwise
        # scipy.signal.butter raises ValueError.
        low = max(
            self._MIN_NORMALISED_CUTOFF,
            min(requested_low, self._MAX_NORMALISED_CUTOFF),
        )
        high = max(
            low + self._MIN_BAND_WIDTH,
            min(requested_high, self._MAX_NORMALISED_CUTOFF),
        )

        # Make the clamp visible: the debug message below reports the nominal
        # band, which is not what the filter uses once a cut-off was moved.
        if (low, high) != (requested_low, requested_high):
            logger.warning(
                "filter_signal() — requested band [%.2f, %.2f] Hz was clamped "
                "to [%.2f, %.2f] Hz (Nyquist = %.2f Hz)",
                PPG_BANDPASS_LOW,
                PPG_BANDPASS_HIGH,
                low * nyquist,
                high * nyquist,
                nyquist,
            )

        # Second-order sections are used instead of (b, a) coefficients.
        sos = scipy_signal.butter(
            N=PPG_FILTER_ORDER,
            Wn=[low, high],
            btype="bandpass",
            output="sos",
        )
        filtered = scipy_signal.sosfiltfilt(sos, signal)

        logger.debug(
            "filter_signal() — fs=%.1f Hz, band=[%.2f, %.2f] Hz",
            sampling_rate,
            PPG_BANDPASS_LOW,
            PPG_BANDPASS_HIGH,
        )
        return filtered

    # ── Step 2: Normalize ─────────────────────────────────────────────────────
    def normalize(self, signal: np.ndarray) -> np.ndarray:
        """
        Z-score normalise the signal to mean=0, std=1.

        Handles edge cases:
            • Empty signal → returned as an empty array (no statistics are
              computed, so NumPy emits no "empty slice" warnings).
            • All-zero or constant signal → return zeros (avoid division
              by zero).

        Args:
            signal: 1-D filtered PPG signal.

        Returns:
            Z-score normalised 1-D array.
        """
        if np.size(signal) == 0:
            logger.warning(
                "normalize() — empty signal. Returning empty array."
            )
            return np.zeros_like(signal)

        mean = np.mean(signal)
        std = np.std(signal)

        if std < self._STD_EPSILON:
            logger.warning(
                "normalize() — signal std ≈ 0 (constant signal). "
                "Returning zero array."
            )
            return np.zeros_like(signal)

        normalised = (signal - mean) / std
        logger.debug(
            "normalize() — mean=%.4f, std=%.4f → Z-score applied", mean, std
        )
        return normalised

    # ── Step 3: Segment ───────────────────────────────────────────────────────
    def segment(
        self,
        signal: np.ndarray,
        sampling_rate: float,
    ) -> np.ndarray:
        """
        Split the signal into non-overlapping windows of
        ``PPG_SEGMENT_DURATION_SEC`` seconds.

        Any trailing samples that don't fill a complete window are discarded.

        Args:
            signal: 1-D normalised PPG signal.
            sampling_rate: Sampling rate in Hz; must be finite and > 0.

        Returns:
            2-D NumPy array of shape ``(n_segments, window_size)``.
            Returns shape ``(0, window_size)`` if the signal is too short.

        Raises:
            ValueError: If ``sampling_rate`` is not a finite positive number,
                or is so low that a window would contain no samples.
        """
        self._check_sampling_rate(sampling_rate, "segment()")

        window_size = int(PPG_SEGMENT_DURATION_SEC * sampling_rate)
        # A zero-length window would make the floor division below raise
        # ZeroDivisionError; fail early with an actionable message instead.
        if window_size < 1:
            raise ValueError(
                "segment(): sampling_rate "
                f"{sampling_rate!r} Hz yields a window of {window_size} samples; "
                "at least 1 sample per window is required"
            )

        if len(signal) < window_size:
            logger.warning(
                "segment() — signal length %d < window_size %d. "
                "Returning empty segments array.",
                len(signal),
                window_size,
            )
            return np.empty((0, window_size), dtype=np.float64)

        n_segments = len(signal) // window_size
        # Drop the incomplete tail so the data reshapes into whole windows.
        trimmed = signal[: n_segments * window_size]
        segments = trimmed.reshape(n_segments, window_size)

        logger.debug(
            "segment() — %d segments of %d samples (%.1f s each)",
            n_segments,
            window_size,
            PPG_SEGMENT_DURATION_SEC,
        )
        return segments