|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from sklearn.base import BaseEstimator, TransformerMixin |
|
|
from scipy.fftpack import fft |
|
|
from scipy.signal import welch |
|
|
import pywt |
|
|
from _config import config |
|
|
|
|
|
class ExtractFeatures(BaseEstimator, TransformerMixin): |
|
|
def __init__(self, window_length, window_step_size, data_frequency, selected_domains=None, include_magnitude=False, features_label_columns=None): |
|
|
self.window_length = window_length |
|
|
self.window_step_size = window_step_size |
|
|
self.data_frequency = data_frequency |
|
|
self.selected_domains = selected_domains |
|
|
self.include_magnitude = include_magnitude |
|
|
self.features_label_columns = features_label_columns |
|
|
|
|
|
def fit(self, X, y=None): |
|
|
return self |
|
|
|
|
|
def transform(self, X): |
|
|
features_list = [] |
|
|
|
|
|
if 'groupid' in X.columns: |
|
|
for groupid in X['groupid'].unique(): |
|
|
temp = X[X['groupid'] == groupid] |
|
|
temp_ex = temp[['accel_time', 'x', 'y', 'z']].copy() |
|
|
windows = self._window_data(temp_ex[['x', 'y', 'z']]) |
|
|
|
|
|
for window in windows: |
|
|
features = self._extract_features_from_window(window) |
|
|
features['groupid'] = groupid |
|
|
|
|
|
|
|
|
for label in self.features_label_columns: |
|
|
features[label] = temp[label].iloc[0] |
|
|
|
|
|
features_list.append(pd.DataFrame([features])) |
|
|
|
|
|
else: |
|
|
windows = self._window_data(X[['x', 'y', 'z']]) |
|
|
for window in windows: |
|
|
features = self._extract_features_from_window(window) |
|
|
features_list.append(pd.DataFrame([features])) |
|
|
|
|
|
all_features = pd.concat(features_list, ignore_index=True) |
|
|
|
|
|
|
|
|
window_length_str = str(self.window_length) |
|
|
window_step_size_str = str(self.window_step_size) |
|
|
if self.selected_domains is None: |
|
|
domain_str = "all_features" |
|
|
else: |
|
|
domain_str = "_".join(self.selected_domains) |
|
|
file_name = f"features_window_{window_length_str}_step_{window_step_size_str}_{domain_str}.csv" |
|
|
all_features.to_csv(file_name, index=False) |
|
|
|
|
|
print("All features extracted successfully.") |
|
|
return all_features |
|
|
|
|
|
|
|
|
def _calculate_magnitude(self, window): |
|
|
return np.sqrt(window[:, 0]**2 + window[:, 1]**2 + window[:, 2]**2) |
|
|
|
|
|
def _window_data(self, data): |
|
|
window_samples = int(self.window_length * self.data_frequency) |
|
|
step_samples = int(self.window_step_size * self.data_frequency) |
|
|
windows = [data[i:i + window_samples] for i in range(0, len(data) - window_samples + 1, step_samples)] |
|
|
return np.array(windows) |
|
|
|
|
|
def _extract_features_from_window(self, window): |
|
|
all_features = {} |
|
|
|
|
|
if self.selected_domains is None or 'time_domain' in self.selected_domains: |
|
|
all_features.update(self._extract_time_domain_features(window)) |
|
|
|
|
|
if self.selected_domains is None or 'spatial' in self.selected_domains: |
|
|
all_features.update(self._extract_spatial_features(window)) |
|
|
|
|
|
if self.selected_domains is None or 'frequency' in self.selected_domains: |
|
|
all_features.update(self._extract_frequency_domain_features(window)) |
|
|
|
|
|
if self.selected_domains is None or 'statistical' in self.selected_domains: |
|
|
all_features.update(self._extract_statistical_features(window)) |
|
|
|
|
|
if self.selected_domains is None or 'wavelet' in self.selected_domains: |
|
|
all_features.update(self._extract_wavelet_features(window)) |
|
|
|
|
|
return all_features |
|
|
|
|
|
def _extract_time_domain_features(self, window): |
|
|
features = { |
|
|
'mean_x': np.mean(window[:, 0]), |
|
|
'mean_y': np.mean(window[:, 1]), |
|
|
'mean_z': np.mean(window[:, 2]), |
|
|
'std_x': np.std(window[:, 0]), |
|
|
'std_y': np.std(window[:, 1]), |
|
|
'std_z': np.std(window[:, 2]), |
|
|
'variance_x': np.var(window[:, 0]), |
|
|
'variance_y': np.var(window[:, 1]), |
|
|
'variance_z': np.var(window[:, 2]), |
|
|
'rms_x': np.sqrt(np.mean(window[:, 0]**2)), |
|
|
'rms_y': np.sqrt(np.mean(window[:, 1]**2)), |
|
|
'rms_z': np.sqrt(np.mean(window[:, 2]**2)), |
|
|
'max_x': np.max(window[:, 0]), |
|
|
'max_y': np.max(window[:, 1]), |
|
|
'max_z': np.max(window[:, 2]), |
|
|
'min_x': np.min(window[:, 0]), |
|
|
'min_y': np.min(window[:, 1]), |
|
|
'min_z': np.min(window[:, 2]), |
|
|
'peak_to_peak_x': np.ptp(window[:, 0]), |
|
|
'peak_to_peak_y': np.ptp(window[:, 1]), |
|
|
'peak_to_peak_z': np.ptp(window[:, 2]), |
|
|
'skewness_x': pd.Series(window[:, 0]).skew(), |
|
|
'skewness_y': pd.Series(window[:, 1]).skew(), |
|
|
'skewness_z': pd.Series(window[:, 2]).skew(), |
|
|
'kurtosis_x': pd.Series(window[:, 0]).kurt(), |
|
|
'kurtosis_y': pd.Series(window[:, 1]).kurt(), |
|
|
'kurtosis_z': pd.Series(window[:, 2]).kurt(), |
|
|
'zero_crossing_rate_x': np.sum(np.diff(np.sign(window[:, 0])) != 0), |
|
|
'zero_crossing_rate_y': np.sum(np.diff(np.sign(window[:, 1])) != 0), |
|
|
'zero_crossing_rate_z': np.sum(np.diff(np.sign(window[:, 2])) != 0), |
|
|
'sma' : np.sum(np.abs(window[:, 0])) + np.sum(np.abs(window[:, 1])) + np.sum(np.abs(window[:, 2])), |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if self.include_magnitude: |
|
|
magnitude = self._calculate_magnitude(window) |
|
|
features['mean_magnitude'] = np.mean(magnitude) |
|
|
features['std_magnitude'] = np.std(magnitude) |
|
|
features['variance_magnitude'] = np.var(magnitude) |
|
|
features['rms_magnitude'] = np.sqrt(np.mean(magnitude**2)) |
|
|
features['max_magnitude'] = np.max(magnitude) |
|
|
features['min_magnitude'] = np.min(magnitude) |
|
|
features['peak_to_peak_magnitude'] = np.ptp(magnitude) |
|
|
features['skewness_magnitude'] = pd.Series(magnitude).skew() |
|
|
features['kurtosis_magnitude'] = pd.Series(magnitude).kurt() |
|
|
features['zero_crossing_rate_magnitude'] = np.sum(np.diff(np.sign(magnitude)) != 0) |
|
|
|
|
|
|
|
|
return features |
|
|
|
|
|
|
|
|
def _extract_spatial_features(self, window): |
|
|
features = {} |
|
|
|
|
|
|
|
|
magnitude = self._calculate_magnitude(window) |
|
|
features['euclidean_norm'] = np.mean(magnitude) |
|
|
|
|
|
|
|
|
pitch = np.arctan2(window[:, 1], np.sqrt(window[:, 0]**2 + window[:, 2]**2)) * (180 / np.pi) |
|
|
roll = np.arctan2(window[:, 0], np.sqrt(window[:, 1]**2 + window[:, 2]**2)) * (180 / np.pi) |
|
|
features['mean_pitch'] = np.mean(pitch) |
|
|
features['mean_roll'] = np.mean(roll) |
|
|
|
|
|
|
|
|
features['correlation_xy'] = np.corrcoef(window[:, 0], window[:, 1])[0, 1] |
|
|
features['correlation_xz'] = np.corrcoef(window[:, 0], window[:, 2])[0, 1] |
|
|
features['correlation_yz'] = np.corrcoef(window[:, 1], window[:, 2])[0, 1] |
|
|
|
|
|
|
|
|
return features |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_frequency_domain_features(self, window): |
|
|
n = len(window) |
|
|
freq_values = np.fft.fftfreq(n, d=1/self.data_frequency)[:n // 2] |
|
|
fft_values = fft(window, axis=0) |
|
|
fft_magnitude = np.abs(fft_values)[:n // 2] |
|
|
|
|
|
features = {} |
|
|
|
|
|
|
|
|
def spectral_entropy(signal): |
|
|
psd = np.square(signal) |
|
|
psd_norm = psd / np.sum(psd) |
|
|
return -np.sum(psd_norm * np.log(psd_norm + 1e-10)) |
|
|
|
|
|
for i, axis in enumerate(['x', 'y', 'z']): |
|
|
|
|
|
dominant_frequency = freq_values[np.argmax(fft_magnitude[:, i])] |
|
|
features[f'dominant_frequency_{axis}'] = dominant_frequency |
|
|
|
|
|
|
|
|
entropy = spectral_entropy(fft_magnitude[:, i]) |
|
|
features[f'spectral_entropy_{axis}'] = entropy |
|
|
|
|
|
|
|
|
f, psd_values = welch(window[:, i], fs=self.data_frequency, nperseg=n) |
|
|
features[f'psd_mean_{axis}'] = np.mean(psd_values) |
|
|
features[f'energy_{axis}'] = np.sum(psd_values**2) |
|
|
|
|
|
|
|
|
cumulative_energy = np.cumsum(psd_values) |
|
|
total_energy = cumulative_energy[-1] |
|
|
low_cutoff_idx = np.argmax(cumulative_energy > 0.1 * total_energy) |
|
|
high_cutoff_idx = np.argmax(cumulative_energy > 0.9 * total_energy) |
|
|
bandwidth = f[high_cutoff_idx] - f[low_cutoff_idx] |
|
|
features[f'bandwidth_{axis}'] = bandwidth |
|
|
|
|
|
|
|
|
spectral_centroid = np.sum(f * psd_values) / np.sum(psd_values) |
|
|
features[f'spectral_centroid_{axis}'] = spectral_centroid |
|
|
|
|
|
if self.include_magnitude: |
|
|
|
|
|
magnitude = self._calculate_magnitude(window) |
|
|
fft_magnitude_mag = np.abs(fft(magnitude))[:n // 2] |
|
|
|
|
|
|
|
|
features['dominant_frequency_magnitude'] = freq_values[np.argmax(fft_magnitude_mag)] |
|
|
|
|
|
|
|
|
features['spectral_entropy_magnitude'] = spectral_entropy(fft_magnitude_mag) |
|
|
|
|
|
|
|
|
f, psd_values_mag = welch(magnitude, fs=self.data_frequency, nperseg=n) |
|
|
features['psd_mean_magnitude'] = np.mean(psd_values_mag) |
|
|
features['energy_magnitude'] = np.sum(psd_values_mag**2) |
|
|
|
|
|
|
|
|
cumulative_energy_mag = np.cumsum(psd_values_mag) |
|
|
total_energy_mag = cumulative_energy_mag[-1] |
|
|
low_cutoff_idx_mag = np.argmax(cumulative_energy_mag > 0.1 * total_energy_mag) |
|
|
high_cutoff_idx_mag = np.argmax(cumulative_energy_mag > 0.9 * total_energy_mag) |
|
|
bandwidth_mag = f[high_cutoff_idx_mag] - f[low_cutoff_idx_mag] |
|
|
features['bandwidth_magnitude'] = bandwidth_mag |
|
|
|
|
|
|
|
|
features['spectral_centroid_magnitude'] = np.sum(f * psd_values_mag) / np.sum(psd_values_mag) |
|
|
|
|
|
|
|
|
return features |
|
|
|
|
|
|
|
|
def _extract_statistical_features(self, window): |
|
|
features = { |
|
|
'25th_percentile_x': np.percentile(window[:, 0], 25), |
|
|
'25th_percentile_y': np.percentile(window[:, 1], 25), |
|
|
'25th_percentile_z': np.percentile(window[:, 2], 25), |
|
|
'75th_percentile_x': np.percentile(window[:, 0], 75), |
|
|
'75th_percentile_y': np.percentile(window[:, 1], 75), |
|
|
'75th_percentile_z': np.percentile(window[:, 2], 75), |
|
|
} |
|
|
|
|
|
if self.include_magnitude: |
|
|
magnitude = self._calculate_magnitude(window) |
|
|
features['25th_percentile_magnitude'] = np.percentile(magnitude, 25) |
|
|
features['75th_percentile_magnitude'] = np.percentile(magnitude, 75) |
|
|
|
|
|
|
|
|
return features |
|
|
|
|
|
def _extract_wavelet_features(self, window, wavelet='db1'): |
|
|
coeffs = pywt.wavedec(window, wavelet, axis=0, level=3) |
|
|
features = { |
|
|
'wavelet_energy_approx_x': np.sum(coeffs[0][:, 0]**2), |
|
|
'wavelet_energy_approx_y': np.sum(coeffs[0][:, 1]**2), |
|
|
'wavelet_energy_approx_z': np.sum(coeffs[0][:, 2]**2), |
|
|
} |
|
|
|
|
|
if self.include_magnitude: |
|
|
magnitude = self._calculate_magnitude(window) |
|
|
coeffs_magnitude = pywt.wavedec(magnitude, wavelet, level=3) |
|
|
features['wavelet_energy_approx_magnitude'] = np.sum(coeffs_magnitude[0]**2) |
|
|
|
|
|
|
|
|
return features |