Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from sklearn.base import BaseEstimator, TransformerMixin | |
| from scipy.fftpack import fft | |
| from scipy.signal import welch | |
| import pywt | |
| #from _config import config | |
| class ExtractFeatures(BaseEstimator, TransformerMixin): | |
| def __init__(self, window_length, window_step_size, data_frequency, selected_domains=None, include_magnitude=False, features_label_columns=None): | |
| self.window_length = window_length | |
| self.window_step_size = window_step_size | |
| self.data_frequency = data_frequency | |
| self.selected_domains = selected_domains | |
| self.include_magnitude = include_magnitude | |
| self.features_label_columns = features_label_columns #if label_columns else ["arousal", "valence"] # Default to arousal and valence if not specified | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X): | |
| features_list = [] | |
| if 'groupid' in X.columns: # Check for groupid column | |
| for groupid in X['groupid'].unique(): # Iterate over unique group IDs | |
| temp = X[X['groupid'] == groupid] # Filter rows by group ID | |
| temp_ex = temp[['accel_time', 'x', 'y', 'z']].copy() # Keep only the necessary columns (accel_time can be removed if unused) | |
| windows = self._window_data(temp_ex[['x', 'y', 'z']]) # Create windows of data | |
| for window in windows: | |
| features = self._extract_features_from_window(window) # Extract features from each window | |
| features['groupid'] = groupid # Add groupid to the features | |
| # Dynamically add emotion labels to the features | |
| for label in self.features_label_columns: | |
| features[label] = temp[label].iloc[0] | |
| features_list.append(pd.DataFrame([features])) # Convert dictionary to DataFrame | |
| else: # In case there's no groupid, calculate features without it | |
| windows = self._window_data(X[['x', 'y', 'z']]) | |
| for window in windows: | |
| features = self._extract_features_from_window(window) | |
| features_list.append(pd.DataFrame([features])) | |
| all_features = pd.concat(features_list, ignore_index=True) | |
| # Export features to CSV | |
| window_length_str = str(self.window_length) | |
| window_step_size_str = str(self.window_step_size) | |
| if self.selected_domains is None: # All features calculated if domains are not selected | |
| domain_str = "all_features" | |
| else: | |
| domain_str = "_".join(self.selected_domains) | |
| file_name = f"features_window_{window_length_str}_step_{window_step_size_str}_{domain_str}.csv" | |
| all_features.to_csv(file_name, index=False) | |
| print("All features extracted successfully.") | |
| return all_features | |
| # Time Domain Features | |
| def _calculate_magnitude(self, window): | |
| return np.sqrt(window[:, 0]**2 + window[:, 1]**2 + window[:, 2]**2) | |
| def _window_data(self, data): # Function to create windows of the data | |
| window_samples = int(self.window_length * self.data_frequency) # Number of samples in each window 60sec * 25Hz = 1500 samples | |
| step_samples = int(self.window_step_size * self.data_frequency) # Number of samples to move the window | |
| windows = [data[i:i + window_samples] for i in range(0, len(data) - window_samples + 1, step_samples)] # Create windows | |
| return np.array(windows) | |
| def _extract_features_from_window(self, window): #DONE Mehrere domains gleichzeitig berechnen | |
| all_features = {} | |
| if self.selected_domains is None or 'time_domain' in self.selected_domains: | |
| all_features.update(self._extract_time_domain_features(window)) | |
| if self.selected_domains is None or 'spatial' in self.selected_domains: | |
| all_features.update(self._extract_spatial_features(window)) | |
| if self.selected_domains is None or 'frequency' in self.selected_domains: | |
| all_features.update(self._extract_frequency_domain_features(window)) | |
| if self.selected_domains is None or 'statistical' in self.selected_domains: | |
| all_features.update(self._extract_statistical_features(window)) | |
| if self.selected_domains is None or 'wavelet' in self.selected_domains: | |
| all_features.update(self._extract_wavelet_features(window)) | |
| return all_features | |
| def _extract_time_domain_features(self, window): | |
| features = { | |
| 'mean_x': np.mean(window[:, 0]), | |
| 'mean_y': np.mean(window[:, 1]), | |
| 'mean_z': np.mean(window[:, 2]), | |
| 'std_x': np.std(window[:, 0]), | |
| 'std_y': np.std(window[:, 1]), | |
| 'std_z': np.std(window[:, 2]), | |
| 'variance_x': np.var(window[:, 0]), | |
| 'variance_y': np.var(window[:, 1]), | |
| 'variance_z': np.var(window[:, 2]), | |
| 'rms_x': np.sqrt(np.mean(window[:, 0]**2)), | |
| 'rms_y': np.sqrt(np.mean(window[:, 1]**2)), | |
| 'rms_z': np.sqrt(np.mean(window[:, 2]**2)), | |
| 'max_x': np.max(window[:, 0]), | |
| 'max_y': np.max(window[:, 1]), | |
| 'max_z': np.max(window[:, 2]), | |
| 'min_x': np.min(window[:, 0]), | |
| 'min_y': np.min(window[:, 1]), | |
| 'min_z': np.min(window[:, 2]), | |
| 'peak_to_peak_x': np.ptp(window[:, 0]), | |
| 'peak_to_peak_y': np.ptp(window[:, 1]), | |
| 'peak_to_peak_z': np.ptp(window[:, 2]), | |
| 'skewness_x': pd.Series(window[:, 0]).skew(), | |
| 'skewness_y': pd.Series(window[:, 1]).skew(), | |
| 'skewness_z': pd.Series(window[:, 2]).skew(), | |
| 'kurtosis_x': pd.Series(window[:, 0]).kurt(), | |
| 'kurtosis_y': pd.Series(window[:, 1]).kurt(), | |
| 'kurtosis_z': pd.Series(window[:, 2]).kurt(), | |
| 'zero_crossing_rate_x': np.sum(np.diff(np.sign(window[:, 0])) != 0), | |
| 'zero_crossing_rate_y': np.sum(np.diff(np.sign(window[:, 1])) != 0), | |
| 'zero_crossing_rate_z': np.sum(np.diff(np.sign(window[:, 2])) != 0), | |
| 'sma' : np.sum(np.abs(window[:, 0])) + np.sum(np.abs(window[:, 1])) + np.sum(np.abs(window[:, 2])), #Signal Magnitude Area | |
| } | |
| # print(f"Time domain features extracted successfully.") | |
| # Additional features for Magnitude (xyz in one vector) | |
| if self.include_magnitude: | |
| magnitude = self._calculate_magnitude(window) | |
| features['mean_magnitude'] = np.mean(magnitude) | |
| features['std_magnitude'] = np.std(magnitude) | |
| features['variance_magnitude'] = np.var(magnitude) | |
| features['rms_magnitude'] = np.sqrt(np.mean(magnitude**2)) | |
| features['max_magnitude'] = np.max(magnitude) | |
| features['min_magnitude'] = np.min(magnitude) | |
| features['peak_to_peak_magnitude'] = np.ptp(magnitude) | |
| features['skewness_magnitude'] = pd.Series(magnitude).skew() | |
| features['kurtosis_magnitude'] = pd.Series(magnitude).kurt() | |
| features['zero_crossing_rate_magnitude'] = np.sum(np.diff(np.sign(magnitude)) != 0) | |
| # print(f"Additional time domain features for magnitude extracted successfully.") | |
| return features | |
| # Spatial Features | |
| def _extract_spatial_features(self, window): | |
| features = {} | |
| # Euclidean Norm (Magnitude) | |
| magnitude = self._calculate_magnitude(window) | |
| features['euclidean_norm'] = np.mean(magnitude) # or np.linalg.norm for each window | |
| # Tilt Angles (Pitch and Roll) | |
| pitch = np.arctan2(window[:, 1], np.sqrt(window[:, 0]**2 + window[:, 2]**2)) * (180 / np.pi) | |
| roll = np.arctan2(window[:, 0], np.sqrt(window[:, 1]**2 + window[:, 2]**2)) * (180 / np.pi) | |
| features['mean_pitch'] = np.mean(pitch) | |
| features['mean_roll'] = np.mean(roll) | |
| # Correlation between Axes | |
| features['correlation_xy'] = np.corrcoef(window[:, 0], window[:, 1])[0, 1] | |
| features['correlation_xz'] = np.corrcoef(window[:, 0], window[:, 2])[0, 1] | |
| features['correlation_yz'] = np.corrcoef(window[:, 1], window[:, 2])[0, 1] | |
| # print(f"Spatial features extracted successfully.") | |
| return features | |
| # Frequency Domain Features | |
| def _extract_frequency_domain_features(self, window): | |
| n = len(window) | |
| freq_values = np.fft.fftfreq(n, d=1/self.data_frequency)[:n // 2] | |
| fft_values = fft(window, axis=0) | |
| fft_magnitude = np.abs(fft_values)[:n // 2] | |
| features = {} | |
| # Spectral Entropy | |
| def spectral_entropy(signal): | |
| psd = np.square(signal) | |
| psd_norm = psd / np.sum(psd) | |
| return -np.sum(psd_norm * np.log(psd_norm + 1e-10)) | |
| for i, axis in enumerate(['x', 'y', 'z']): | |
| # Dominant Frequency | |
| dominant_frequency = freq_values[np.argmax(fft_magnitude[:, i])] | |
| features[f'dominant_frequency_{axis}'] = dominant_frequency | |
| # Spectral Entropy | |
| entropy = spectral_entropy(fft_magnitude[:, i]) | |
| features[f'spectral_entropy_{axis}'] = entropy | |
| # Power Spectral Density (PSD) and Energy | |
| f, psd_values = welch(window[:, i], fs=self.data_frequency, nperseg=n) | |
| features[f'psd_mean_{axis}'] = np.mean(psd_values) | |
| features[f'energy_{axis}'] = np.sum(psd_values**2) | |
| # Bandwidth (frequency range containing significant portion of the energy) | |
| cumulative_energy = np.cumsum(psd_values) | |
| total_energy = cumulative_energy[-1] | |
| low_cutoff_idx = np.argmax(cumulative_energy > 0.1 * total_energy) | |
| high_cutoff_idx = np.argmax(cumulative_energy > 0.9 * total_energy) | |
| bandwidth = f[high_cutoff_idx] - f[low_cutoff_idx] | |
| features[f'bandwidth_{axis}'] = bandwidth | |
| # Spectral Centroid (Center of mass of the spectrum) | |
| spectral_centroid = np.sum(f * psd_values) / np.sum(psd_values) | |
| features[f'spectral_centroid_{axis}'] = spectral_centroid | |
| if self.include_magnitude: | |
| # Magnitude-based Frequency Domain Features | |
| magnitude = self._calculate_magnitude(window) | |
| fft_magnitude_mag = np.abs(fft(magnitude))[:n // 2] | |
| # Dominant Frequency for Magnitude | |
| features['dominant_frequency_magnitude'] = freq_values[np.argmax(fft_magnitude_mag)] | |
| # Spectral Entropy for Magnitude | |
| features['spectral_entropy_magnitude'] = spectral_entropy(fft_magnitude_mag) | |
| # Power Spectral Density and Energy for Magnitude | |
| f, psd_values_mag = welch(magnitude, fs=self.data_frequency, nperseg=n) | |
| features['psd_mean_magnitude'] = np.mean(psd_values_mag) | |
| features['energy_magnitude'] = np.sum(psd_values_mag**2) | |
| # Bandwidth for Magnitude | |
| cumulative_energy_mag = np.cumsum(psd_values_mag) | |
| total_energy_mag = cumulative_energy_mag[-1] | |
| low_cutoff_idx_mag = np.argmax(cumulative_energy_mag > 0.1 * total_energy_mag) | |
| high_cutoff_idx_mag = np.argmax(cumulative_energy_mag > 0.9 * total_energy_mag) | |
| bandwidth_mag = f[high_cutoff_idx_mag] - f[low_cutoff_idx_mag] | |
| features['bandwidth_magnitude'] = bandwidth_mag | |
| # Spectral Centroid for Magnitude | |
| features['spectral_centroid_magnitude'] = np.sum(f * psd_values_mag) / np.sum(psd_values_mag) | |
| # print(f"Frequency domain features extracted successfully.") | |
| return features | |
| def _extract_statistical_features(self, window): | |
| features = { | |
| '25th_percentile_x': np.percentile(window[:, 0], 25), | |
| '25th_percentile_y': np.percentile(window[:, 1], 25), | |
| '25th_percentile_z': np.percentile(window[:, 2], 25), | |
| '75th_percentile_x': np.percentile(window[:, 0], 75), | |
| '75th_percentile_y': np.percentile(window[:, 1], 75), | |
| '75th_percentile_z': np.percentile(window[:, 2], 75), | |
| } | |
| if self.include_magnitude: | |
| magnitude = self._calculate_magnitude(window) | |
| features['25th_percentile_magnitude'] = np.percentile(magnitude, 25) | |
| features['75th_percentile_magnitude'] = np.percentile(magnitude, 75) | |
| # print(f"Statistical features extracted successfully.") | |
| return features | |
| def _extract_wavelet_features(self, window, wavelet='db1'): | |
| coeffs = pywt.wavedec(window, wavelet, axis=0, level=3) | |
| features = { | |
| 'wavelet_energy_approx_x': np.sum(coeffs[0][:, 0]**2), | |
| 'wavelet_energy_approx_y': np.sum(coeffs[0][:, 1]**2), | |
| 'wavelet_energy_approx_z': np.sum(coeffs[0][:, 2]**2), | |
| } | |
| if self.include_magnitude: | |
| magnitude = self._calculate_magnitude(window) | |
| coeffs_magnitude = pywt.wavedec(magnitude, wavelet, level=3) | |
| features['wavelet_energy_approx_magnitude'] = np.sum(coeffs_magnitude[0]**2) | |
| # print(f"Wavelet features extracted successfully.") | |
| return features |