import sys

import numpy as np
from scipy.signal import find_peaks, savgol_filter

from CPR_Module.Common.logging_config import cpr_logger


class MetricsCalculator:
    """Rate and depth calculation from motion data with improved peak detection."""

    def __init__(self, shoulder_width_cm):
        """
        Args:
            shoulder_width_cm (float): Real-world shoulder width in cm, used to
                convert pixel distances to centimeters.
        """
        # Configuration parameters
        self.shoulder_width_cm = shoulder_width_cm

        # Parameters for cleaning the smoothed midpoints (median/MAD filter)
        self.removing_impulse_noise_window_size = 5
        self.removing_impulse_noise_threshold = 3.0

        # Per-chunk state (overwritten for each processed chunk)
        self.y_preprocessed = np.array([])
        self.peaks = np.array([])
        self.peaks_max = np.array([])
        self.peaks_min = np.array([])
        self.cm_px_ratio = None
        self.depth = None
        self.rate = None
        self.rate_and_depth_warnings = []

        # Accumulated state across all chunks
        self.chunks_y_preprocessed = []
        self.chunks_peaks = []
        self.chunks_depth = []
        self.chunks_rate = []
        self.weighted_depth = None
        self.weighted_rate = None
        self.chunks_start_and_end_indices = []
        self.chunks_rate_and_depth_warnings = []
        # Fix: these were previously set only in assign_chunk_data(), so
        # reading them before the first successful chunk raised AttributeError.
        self.current_chunk_start = None
        self.current_chunk_end = None

        # Validation thresholds for warnings (cm / compressions per minute)
        self.min_depth_threshold = 3.0   # cm
        self.max_depth_threshold = 6.0   # cm
        self.min_rate_threshold = 100.0  # cpm
        self.max_rate_threshold = 120.0  # cpm

    #^ ################# Validating #######################
    def validate_midpoints_and_frames_count_in_chunk(self, y_exact, chunk_start_frame_index, chunk_end_frame_index, sampling_interval_in_frames):
        """
        Validate that the number of midpoints matches the number of frames
        expected to have been sampled in the chunk.

        Args:
            y_exact (np.ndarray): The exact y-values of the midpoints.
            chunk_start_frame_index (int): The starting frame index of the chunk.
            chunk_end_frame_index (int): The ending frame index of the chunk (inclusive).
            sampling_interval_in_frames (int): The interval at which frames are sampled.

        Returns:
            np.ndarray: ``y_exact`` unchanged when the count matches; an empty
            array on a mismatch or on any unexpected error (the original
            docstring claimed a ValueError was raised, but this method never
            raises — it signals failure via the empty array).
        """
        try:
            start = chunk_start_frame_index
            end = chunk_end_frame_index
            interval = sampling_interval_in_frames

            # Closed-form count of frame indices in [start, end] that are
            # multiples of `interval` (i.e. the frames that were sampled).
            expected_samples = (end // interval) - ((start - 1) // interval)

            actual_y_exact_length = len(y_exact)
            if actual_y_exact_length != expected_samples:
                cpr_logger.info(f"\nERROR: Mismatch in expected and actual samples")
                cpr_logger.info(f"Expected: {expected_samples} samples (frames {start}-{end} @ every {interval} frames)")
                # Fix: corrected typos in the log message ("midoints ... recieived")
                cpr_logger.info(f"Actual: {actual_y_exact_length} midpoint points received")
                return np.array([])

            return y_exact
        except Exception as e:
            cpr_logger.error(f"\nCRITICAL VALIDATION ERROR: {str(e)}")
            return np.array([])

    #^ ################# Preprocessing #######################
    def _smooth_midpoints(self, midpoints):
        """
        Smooth the midpoint y-values using a Savitzky-Golay filter.

        Args:
            midpoints (np.ndarray): 2-D array of midpoints; column 1 holds the
                y-values to be smoothed.

        Returns:
            np.ndarray: The smoothed y-values. Falls back to the raw y-values
            when there are too few points or the filter fails, and to an empty
            array when the input is empty.
        """
        midpoints = np.asarray(midpoints)
        # Fix: an empty (1-D) array — e.g. the result of a failed validation —
        # previously raised IndexError on midpoints[:, 1]. Return empty so the
        # caller can skip the chunk gracefully.
        if midpoints.ndim < 2 or len(midpoints) == 0:
            return np.array([])

        if len(midpoints) > 5:  # Ensure enough data points for the filter
            try:
                return savgol_filter(
                    midpoints[:, 1],
                    window_length=3,
                    polyorder=2,
                    mode='nearest'
                )
            except Exception as e:
                cpr_logger.error(f"Smoothing error: {e}")
                return midpoints[:, 1]  # Fallback to original
        return midpoints[:, 1]  # Not enough points

    def _clean_midpoints(self, y_smooth):
        """
        Remove impulse noise from the smoothed y-values using a sliding-window
        median/MAD outlier test.

        Args:
            y_smooth (np.ndarray): The smoothed y-values.

        Returns:
            np.ndarray: The cleaned y-values (a copy; the input is not modified).
        """
        if len(y_smooth) < self.removing_impulse_noise_window_size:
            return y_smooth  # Not enough points for processing

        y_clean = np.array(y_smooth, dtype=float)  # Copy to avoid modifying original
        half_window = self.removing_impulse_noise_window_size // 2

        for i in range(len(y_smooth)):
            # Get local window (handle boundaries)
            start = max(0, i - half_window)
            end = min(len(y_smooth), i + half_window + 1)
            window = y_smooth[start:end]

            # Robust local statistics: median and scaled Median Absolute
            # Deviation (1.4826 makes MAD consistent with the std dev of a
            # normal distribution).
            med = np.median(window)
            mad = 1.4826 * np.median(np.abs(window - med))

            # Detect and replace outliers
            if abs(y_smooth[i] - med) > self.removing_impulse_noise_threshold * mad:
                # Replace with median of immediate neighbors (better than global median)
                left = y_smooth[max(0, i - 1)]
                right = y_smooth[min(len(y_smooth) - 1, i + 1)]
                y_clean[i] = np.median([left, right])

        return y_clean

    def preprocess_midpoints(self, midpoints):
        """
        Preprocess the midpoints by smoothing then impulse-noise cleaning.

        Sets:
            y_preprocessed (np.ndarray): The preprocessed y-values.

        Args:
            midpoints (np.ndarray): 2-D array of midpoints (y-values in column 1).

        Returns:
            bool: True if preprocessing produced at least one value, False otherwise.
        """
        y_smooth = self._smooth_midpoints(midpoints)
        y_clean = self._clean_midpoints(y_smooth)
        self.y_preprocessed = y_clean
        return len(self.y_preprocessed) > 0

    #^ ################# Processing #######################
    def detect_midpoints_peaks(self):
        """
        Detect peaks (maxima and minima) in the preprocessed y-values.

        Sets:
            peaks (np.ndarray): All detected peak indices, sorted.
            peaks_max (np.ndarray): Indices of the detected maxima.
            peaks_min (np.ndarray): Indices of the detected minima.

        Returns:
            bool: True if any peaks were detected, False otherwise.
        """
        if self.y_preprocessed.size == 0:
            cpr_logger.info("No smoothed values found for peak detection")
            return False
        try:
            # find_peaks requires distance >= 1. The original expression
            # min(1, len(...)) always evaluated to 1 for non-empty input
            # despite claiming to be "dynamic" — made explicit here.
            distance = 1

            # Detect max peaks with default prominence
            self.peaks_max, _ = find_peaks(self.y_preprocessed, distance=distance)

            # Detect min peaks on the inverted signal with a small minimum
            # prominence to suppress shallow troughs
            self.peaks_min, _ = find_peaks(
                -self.y_preprocessed,
                distance=distance,
                prominence=(0.5, None)  # Adjust based on your data's characteristics
            )

            self.peaks = np.sort(np.concatenate((self.peaks_max, self.peaks_min)))
            return len(self.peaks) > 0
        except Exception as e:
            cpr_logger.error(f"Peak detection error: {e}")
            return False

    def calculate_cm_px_ratio(self, shoulder_distances):
        """
        Calculate the cm-per-pixel ratio from measured shoulder distances.

        Sets:
            cm_px_ratio (float | None): The ratio of cm to pixels, or None when
                no distances are available.

        Args:
            shoulder_distances (list): List of shoulder distances in pixels.
        """
        if len(shoulder_distances) > 0:
            avg_shoulder_width_px = np.mean(shoulder_distances)
            self.cm_px_ratio = self.shoulder_width_cm / avg_shoulder_width_px
        else:
            self.cm_px_ratio = None
            cpr_logger.info("No shoulder distances available for cm/px ratio calculation")

    def calculate_rate_and_depth_for_chunk(self, original_fps, sampling_interval_in_frames=1):
        """
        Calculate compression rate and depth for the current chunk.

        Peak indices come from the downsampled signal, so timings must use the
        effective (reduced) FPS. Without the adjustment, a peak distance of 5
        downsampled frames at 30 fps would read as 5/30 = 0.167 s; with it, the
        same distance at an effective 10 fps correctly reads as 0.5 s.

        Sets:
            depth (float): The calculated depth in cm (0 when not computable).
            rate (float): The calculated rate in cpm (0 when not computable).

        Args:
            original_fps (float): The original frames per second of the video.
            sampling_interval_in_frames (int): Number of frames skipped between samples.
        """
        try:
            effective_fps = original_fps / sampling_interval_in_frames  # Correctly reduced FPS

            # Depth: mean absolute y-difference between consecutive peaks
            # (max/min interleaved), converted from pixels to cm
            depth = None
            if len(self.peaks) > 1:
                depth = np.mean(np.abs(np.diff(self.y_preprocessed[self.peaks]))) * self.cm_px_ratio

            # Rate: mean interval between consecutive maxima, converted to
            # compressions per minute using the effective FPS
            rate = None
            if len(self.peaks_max) > 1:
                peak_intervals = np.diff(self.peaks_max)  # Already in downsampled frames
                rate = (1 / (np.mean(peak_intervals) / effective_fps)) * 60  # Correct CPM

            # Handle cases with no valid data: zero both metrics and drop peaks
            if depth is None or rate is None:
                depth = 0
                rate = 0
                self.peaks = np.array([])

            self.depth = depth
            self.rate = rate
        except Exception as e:
            cpr_logger.error(f"Error calculating rate and depth: {e}")

    def assign_chunk_data(self, chunk_start_frame_index, chunk_end_frame_index):
        """
        Capture the current chunk's results for later aggregation.

        Sets:
            chunks_depth (list): List of depths for each chunk.
            chunks_rate (list): List of rates for each chunk.
            chunks_start_and_end_indices (list): List of (start, end) frame indices.
            chunks_y_preprocessed (list): List of preprocessed y-values per chunk.
            chunks_peaks (list): List of detected peaks per chunk.
            chunks_rate_and_depth_warnings (list): Warnings per chunk.

        Args:
            chunk_start_frame_index (int): The starting frame index of the chunk.
            chunk_end_frame_index (int): The ending frame index of the chunk.
        """
        self.chunks_depth.append(self.depth)
        self.chunks_rate.append(self.rate)
        self.chunks_start_and_end_indices.append((chunk_start_frame_index, chunk_end_frame_index))
        # Copies: the per-chunk buffers are overwritten on the next chunk
        self.chunks_y_preprocessed.append(self.y_preprocessed.copy())
        self.chunks_peaks.append(self.peaks.copy())
        self.current_chunk_start = chunk_start_frame_index
        self.current_chunk_end = chunk_end_frame_index
        self.chunks_rate_and_depth_warnings.append(self.rate_and_depth_warnings.copy())

    def calculate_rate_and_depth_for_all_chunk(self):
        """
        Calculate duration-weighted average rate and depth over all chunks.

        Sets:
            weighted_depth (float | None): The weighted average depth in cm.
            weighted_rate (float | None): The weighted average rate in cpm.
        """
        if not self.chunks_depth or not self.chunks_rate or not self.chunks_start_and_end_indices:
            cpr_logger.info("[WARNING] No chunk data available for averaging")
            return None
        if not (len(self.chunks_depth) == len(self.chunks_rate) == len(self.chunks_start_and_end_indices)):
            cpr_logger.info("[ERROR] Mismatched chunk data lists")
            return None

        total_weight = 0
        weighted_depth_sum = 0
        weighted_rate_sum = 0

        for depth, rate, (start, end) in zip(self.chunks_depth, self.chunks_rate, self.chunks_start_and_end_indices):
            # Weight each chunk by its frame-count duration (+1: inclusive range)
            chunk_duration = end - start + 1
            weighted_depth_sum += depth * chunk_duration
            weighted_rate_sum += rate * chunk_duration
            total_weight += chunk_duration

        if total_weight == 0:
            self.weighted_depth = None
            self.weighted_rate = None
            cpr_logger.info("[ERROR] No valid chunks for averaging")
        else:
            self.weighted_depth = weighted_depth_sum / total_weight
            self.weighted_rate = weighted_rate_sum / total_weight
            cpr_logger.info(f"[RESULTS] Weighted average depth: {self.weighted_depth:.1f} cm")
            cpr_logger.info(f"[RESULTS] Weighted average rate: {self.weighted_rate:.1f} cpm")

    #^ ################# Warnings #######################
    def _get_rate_and_depth_status(self):
        """Classify current depth/rate as "low", "normal", or "high".

        Zero values (the "no valid data" sentinel) are treated as normal so
        that empty chunks don't trigger spurious warnings.
        """
        depth_status = "normal"
        rate_status = "normal"

        if self.depth < self.min_depth_threshold and self.depth > 0:
            depth_status = "low"
        elif self.depth > self.max_depth_threshold:
            depth_status = "high"

        if self.rate < self.min_rate_threshold and self.rate > 0:
            rate_status = "low"
        elif self.rate > self.max_rate_threshold:
            rate_status = "high"

        return depth_status, rate_status

    def get_rate_and_depth_warnings(self):
        """Get performance warnings based on the current depth and rate.

        Sets:
            rate_and_depth_warnings (list): The warnings for the current chunk.

        Returns:
            list: Human-readable warning strings (possibly empty).
        """
        depth_status, rate_status = self._get_rate_and_depth_status()
        warnings = []

        if depth_status == "low":
            warnings.append("Depth too low!")
        elif depth_status == "high":
            warnings.append("Depth too high!")

        if rate_status == "low":
            warnings.append("Rate too slow!")
        elif rate_status == "high":
            warnings.append("Rate too fast!")

        self.rate_and_depth_warnings = warnings
        return warnings

    #^ ################# Handle Chunk #######################
    def handle_chunk(self, midpoints, chunk_start_frame_index, chunk_end_frame_index, fps, shoulder_distances, sampling_interval_in_frames):
        """
        Process one chunk of motion data: validate, preprocess, detect peaks,
        and calculate rate/depth metrics.

        Args:
            midpoints (np.ndarray): 2-D array of midpoints for the chunk.
            chunk_start_frame_index (int): The starting frame index of the chunk.
            chunk_end_frame_index (int): The ending frame index of the chunk.
            fps (float): The frames per second of the video.
            shoulder_distances (list): List of shoulder distances in pixels.
            sampling_interval_in_frames (int): Number of frames skipped between samples.

        Returns:
            bool: True if the chunk was processed successfully, False otherwise.
        """
        # Validation failure yields an empty array, which makes preprocessing fail below
        midpoints = self.validate_midpoints_and_frames_count_in_chunk(midpoints, chunk_start_frame_index, chunk_end_frame_index, sampling_interval_in_frames)

        # Fix: renamed misspelled local "preprocessing_reult"
        preprocessing_result = self.preprocess_midpoints(midpoints)
        if not preprocessing_result:
            cpr_logger.info("Preprocessing failed, skipping chunk")
            return False

        # Fix: the original called detect_midpoints_peaks() twice in a row,
        # running the whole peak detection redundantly. One call suffices.
        if not self.detect_midpoints_peaks():
            cpr_logger.info("Peak detection failed, skipping chunk")
            self.peaks = np.array([])
            self.peaks_max = np.array([])
            self.peaks_min = np.array([])
            self.depth = 0
            self.rate = 0
            return False

        self.calculate_cm_px_ratio(shoulder_distances)
        if self.cm_px_ratio is None:
            cpr_logger.info("cm/px ratio calculation failed, skipping chunk")
            self.depth = 0
            self.rate = 0
            return False

        self.calculate_rate_and_depth_for_chunk(fps, sampling_interval_in_frames)
        if self.depth is None or self.rate is None:
            cpr_logger.info("Rate and depth calculation failed, skipping chunk")
            return False
        else:
            cpr_logger.info(f"Chunk {chunk_start_frame_index}-{chunk_end_frame_index} - Depth: {self.depth:.1f} cm, Rate: {self.rate:.1f} cpm")

        self.get_rate_and_depth_warnings()
        self.assign_chunk_data(chunk_start_frame_index, chunk_end_frame_index)
        cpr_logger.info(f"Chunk {chunk_start_frame_index}-{chunk_end_frame_index} processed successfully")
        return True