"""
Data preprocessing class implementation.

Data preprocessor implementation for emotion and physiological state data.
"""
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from typing import Union, Tuple, Optional, Dict, Any, List |
|
|
from pathlib import Path |
|
|
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler |
|
|
from sklearn.impute import SimpleImputer, KNNImputer |
|
|
from sklearn.ensemble import IsolationForest |
|
|
from scipy import stats |
|
|
import warnings |
|
|
from loguru import logger |
|
|
|
|
|
class DataPreprocessor:
    """Preprocess emotion (PAD) and physiological state data.

    ``fit`` runs, in order: validation, missing-value imputation, outlier
    removal, statistics computation and scaler fitting.  ``transform`` runs
    validation, imputation and scaling only; it never drops rows, so its
    output has as many rows as its input.
    """

    # Substrings that mark a column as a pleasure/arousal/dominance value.
    _PAD_KEYWORDS = ('pleasure', 'arousal', 'dominance')
    # Suffixes used by the abbreviated label schema, e.g. ``ai_delta_p``.
    _PAD_AXIS_SUFFIXES = ('_p', '_a', '_d')
    # Strategies handled by sklearn's SimpleImputer.
    _SIMPLE_IMPUTE_STRATEGIES = ('mean', 'median', 'most_frequent')
    # Label scaler group -> scaling method used when the config has no entry.
    _LABEL_SCALER_DEFAULTS = {
        'delta_pad': 'standard',
        'delta_pressure': 'standard',
        'confidence': 'none',
    }
    # Feature scaler group -> (config key, method used when the key is absent).
    _FEATURE_SCALER_OPTIONS = {
        'pad': ('pad_features', 'standard'),
        'vitality': ('vitality_feature', 'min_max'),
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialise the preprocessor.

        Args:
            config: Configuration dictionary.  Sections and keys that are not
                supplied fall back to the values of ``_get_default_config``,
                so a partial config is accepted.  ``None`` or an empty dict
                selects the defaults.
        """
        defaults = self._get_default_config()
        self.config = self._merge_config(defaults, config) if config else defaults

        # Fitted sklearn scalers, keyed by column group name.
        self.feature_scalers = {}
        self.label_scalers = {}

        # Fitted imputers keyed by "<feature|label>_<strategy>".
        self.imputers = {}
        self.outlier_detector = None

        self.feature_columns = [
            'user_pad_p', 'user_pad_a', 'user_pad_d',
            'vitality',
            'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d',
        ]
        self.label_columns = [
            'ai_delta_p', 'ai_delta_a', 'ai_delta_d',
        ]

        # Per-column summary statistics filled in by fit().
        self.feature_stats = {}
        self.label_stats = {}

        logger.info("Data preprocessor initialized")

    def _get_default_config(self) -> Dict[str, Any]:
        """Return the default configuration dictionary."""
        return {
            'feature_scaling': {
                'method': 'standard',
                'pad_features': 'standard',
                'vitality_feature': 'min_max'
            },
            'label_scaling': {
                'method': 'standard',
                'delta_pad': 'standard'
            },
            'missing_values': {
                'strategy': 'mean',
                'fill_value': None,
                'knn_neighbors': 5
            },
            'outliers': {
                'method': 'isolation_forest',
                'contamination': 0.1,
                'z_threshold': 3.0,
                'iqr_factor': 1.5
            },
            'validation': {
                'check_ranges': True,
                'check_nan': True,
                'check_inf': True,
                'strict_mode': False
            },
            'pad_ranges': {
                'min': -1.0,
                'max': 1.0
            },
            'vitality_ranges': {
                'min': 0.0,
                'max': 100.0
            },
            'confidence_ranges': {
                'min': 0.0,
                'max': 1.0
            }
        }

    @staticmethod
    def _merge_config(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
        """Overlay ``overrides`` on ``defaults`` one section deep.

        Dict-valued sections are merged key by key; any other value replaces
        the default outright.  Neither input dictionary is modified.
        """
        merged = {
            key: (dict(value) if isinstance(value, dict) else value)
            for key, value in defaults.items()
        }
        for key, value in overrides.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key].update(value)
            else:
                merged[key] = value
        return merged

    # ------------------------------------------------------------------
    # Column classification helpers (single source of truth so that range
    # checks, scaler fitting, scaling and inverse scaling always agree).
    # ------------------------------------------------------------------

    @classmethod
    def _is_pad_feature(cls, column: Any) -> bool:
        """Return True if the feature column name denotes a PAD value."""
        name = str(column).lower()
        return 'pad' in name or any(keyword in name for keyword in cls._PAD_KEYWORDS)

    @classmethod
    def _is_delta_pad_label(cls, column: Any) -> bool:
        """Return True if the label column name denotes a delta-PAD value.

        The name must contain ``delta_`` and additionally either ``pad``, one
        of pleasure/arousal/dominance, or end in ``_p``/``_a``/``_d``.
        ``delta_pressure`` does not match and is scaled as its own group.
        """
        name = str(column).lower()
        if 'delta_' not in name:
            return False
        return (
            'pad' in name
            or any(keyword in name for keyword in cls._PAD_KEYWORDS)
            or name.endswith(cls._PAD_AXIS_SUFFIXES)
        )

    def _pad_feature_columns(self, columns) -> List[str]:
        """Return the PAD feature columns in their original order."""
        return [col for col in columns if self._is_pad_feature(col)]

    def _delta_pad_label_columns(self, columns) -> List[str]:
        """Return the delta-PAD label columns in their original order."""
        return [col for col in columns if self._is_delta_pad_label(col)]

    def _feature_scaler_groups(self, columns) -> Dict[str, List[str]]:
        """Map each feature scaler name to the columns it covers."""
        return {
            'pad': self._pad_feature_columns(columns),
            'vitality': ['vitality'] if 'vitality' in columns else [],
        }

    def _label_scaler_groups(self, columns) -> Dict[str, List[str]]:
        """Map each label scaler name to the columns it covers."""
        groups = {'delta_pad': self._delta_pad_label_columns(columns)}
        for name in ('delta_pressure', 'confidence'):
            groups[name] = [name] if name in columns else []
        return groups

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fit(
        self,
        features: Union[np.ndarray, pd.DataFrame],
        labels: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        feature_columns: Optional[List[str]] = None,
        label_columns: Optional[List[str]] = None
    ) -> 'DataPreprocessor':
        """Fit imputers, the outlier detector, statistics and scalers.

        Args:
            features: Feature data (2-D array or DataFrame).
            labels: Optional label data.
            feature_columns: Feature column names; defaults to
                ``self.feature_columns``.  When given, they are stored as the
                fitted schema so later validation compares against them.
            label_columns: Label column names; same rules as above.

        Returns:
            This instance, to allow chaining.
        """
        used_feature_columns = list(feature_columns or self.feature_columns)
        features = self._to_dataframe(features, used_feature_columns)

        if labels is not None:
            used_label_columns = list(label_columns or self.label_columns)
            labels = self._to_dataframe(labels, used_label_columns)
            self.label_columns = used_label_columns
        # Record the schema only after the conversion succeeded.
        self.feature_columns = used_feature_columns

        self._validate_data(features, labels, fit_mode=True)

        features_clean = self._handle_missing_values(features, fit_mode=True)
        if labels is not None:
            labels_clean = self._handle_missing_values(labels, fit_mode=True, is_label=True)
        else:
            labels_clean = None

        if self.config['outliers']['method'] != 'none':
            features_clean, labels_clean = self._detect_outliers(
                features_clean, labels_clean, fit_mode=True
            )

        self._compute_statistics(features_clean, labels_clean)
        self._fit_scalers(features_clean, labels_clean)

        logger.info("Preprocessor fitted successfully")
        return self

    def transform(
        self,
        features: Union[np.ndarray, pd.DataFrame],
        labels: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        feature_columns: Optional[List[str]] = None,
        label_columns: Optional[List[str]] = None
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """Validate, impute and scale data with the fitted components.

        Args:
            features: Feature data (2-D array or DataFrame).
            labels: Optional label data.
            feature_columns: Feature column names; defaults to
                ``self.feature_columns``.
            label_columns: Label column names; defaults to
                ``self.label_columns``.

        Returns:
            Tuple of (scaled features, scaled labels or None).
        """
        features = self._to_dataframe(features, feature_columns or self.feature_columns)
        if labels is not None:
            labels = self._to_dataframe(labels, label_columns or self.label_columns)

        self._validate_data(features, labels, fit_mode=False)

        features_clean = self._handle_missing_values(features, fit_mode=False)
        if labels is not None:
            labels_clean = self._handle_missing_values(labels, fit_mode=False, is_label=True)
        else:
            labels_clean = None

        features_scaled = self._scale_features(features_clean)
        if labels_clean is not None:
            labels_scaled = self._scale_labels(labels_clean)
        else:
            labels_scaled = None

        logger.info(f"Data transformed: {len(features_scaled)} samples")
        return features_scaled, labels_scaled

    def fit_transform(
        self,
        features: Union[np.ndarray, pd.DataFrame],
        labels: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        feature_columns: Optional[List[str]] = None,
        label_columns: Optional[List[str]] = None
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """Fit on the data, then transform the same data.

        Outliers are excluded only while fitting; the transformed output
        still contains every input row.

        Args:
            features: Feature data.
            labels: Optional label data.
            feature_columns: Feature column names.
            label_columns: Label column names.

        Returns:
            Tuple of (scaled features, scaled labels or None).
        """
        fitted = self.fit(features, labels, feature_columns, label_columns)
        return fitted.transform(features, labels, feature_columns, label_columns)

    def inverse_transform_labels(
        self,
        labels: Union[np.ndarray, pd.DataFrame],
        label_columns: Optional[List[str]] = None
    ) -> np.ndarray:
        """Map scaled labels back to their original scale.

        Args:
            labels: Scaled label data.
            label_columns: Label column names; defaults to
                ``self.label_columns``.

        Returns:
            Array of labels in the original scale.

        Raises:
            ValueError: If no label scaler has been fitted.
        """
        labels = self._to_dataframe(labels, label_columns or self.label_columns)

        if not self.label_scalers:
            raise ValueError("Label scalers not fitted. Call fit() first.")

        return self._inverse_scale_labels(labels)

    # ------------------------------------------------------------------
    # Conversion and validation
    # ------------------------------------------------------------------

    def _to_dataframe(
        self,
        data: Union[np.ndarray, pd.DataFrame],
        columns: List[str]
    ) -> pd.DataFrame:
        """Return ``data`` as a DataFrame restricted to ``columns``.

        A DataFrame is sub-selected and copied; a 2-D ndarray is wrapped with
        ``columns`` as its column names.

        Raises:
            ValueError: For unsupported types, non-2-D arrays, or an array
                whose column count differs from ``len(columns)``.
        """
        if isinstance(data, pd.DataFrame):
            return data[columns].copy()

        if isinstance(data, np.ndarray):
            # Guard before touching shape[1], which would raise IndexError on 1-D input.
            if data.ndim != 2:
                raise ValueError(f"Expected a 2D array, got {data.ndim}D")
            if data.shape[1] != len(columns):
                raise ValueError(f"Expected {len(columns)} columns, got {data.shape[1]}")
            return pd.DataFrame(data, columns=columns)

        raise ValueError(f"Unsupported data type: {type(data)}")

    def _validate_data(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame],
        fit_mode: bool = False
    ):
        """Check column counts, NaN, infinities and value ranges.

        The expected column counts come from ``self.feature_columns`` and
        ``self.label_columns`` rather than fixed numbers.  NaN values raise
        only in strict mode (otherwise a warning is logged); infinite values
        always raise when ``check_inf`` is enabled.

        Raises:
            ValueError: On a column-count mismatch, on infinite values, or on
                NaN values in strict mode.
        """
        validation_config = self.config['validation']

        expected_features = len(self.feature_columns)
        if features.shape[1] != expected_features:
            raise ValueError(
                f"Expected {expected_features} feature columns, got {features.shape[1]}"
            )

        expected_labels = len(self.label_columns)
        if labels is not None and labels.shape[1] != expected_labels:
            raise ValueError(
                f"Expected {expected_labels} label columns (ΔPAD), got {labels.shape[1]}"
            )

        if validation_config['check_nan']:
            strict = validation_config['strict_mode']
            if features.isnull().any().any():
                if strict:
                    raise ValueError("Found NaN values in features")
                logger.warning("Found NaN values in features")

            if labels is not None and labels.isnull().any().any():
                if strict:
                    raise ValueError("Found NaN values in labels")
                logger.warning("Found NaN values in labels")

        if validation_config['check_inf']:
            if np.isinf(features.values).any():
                raise ValueError("Found infinite values in features")
            if labels is not None and np.isinf(labels.values).any():
                raise ValueError("Found infinite values in labels")

        if validation_config['check_ranges']:
            self._check_data_ranges(features, labels, fit_mode)

    def _check_data_ranges(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame],
        fit_mode: bool
    ):
        """Log a warning for values far outside the configured ranges.

        A tolerance is added around each configured range (0.5 for PAD, 10
        for vitality, 0.1 for confidence).  Nothing is raised or modified.
        ``fit_mode`` is accepted for interface compatibility; the same
        warning is emitted in both modes.
        """
        pad_ranges = self.config['pad_ranges']
        vitality_ranges = self.config['vitality_ranges']
        confidence_ranges = self.config['confidence_ranges']

        for col in self._pad_feature_columns(features.columns):
            values = features[col].values
            out_of_range = np.sum((values < pad_ranges['min'] - 0.5) |
                                  (values > pad_ranges['max'] + 0.5))
            if out_of_range > 0:
                logger.warning(f"Found {out_of_range} PAD values outside expected range in column {col}")

        if 'vitality' in features.columns:
            vitality_values = features['vitality'].values
            out_of_range = np.sum((vitality_values < vitality_ranges['min'] - 10) |
                                  (vitality_values > vitality_ranges['max'] + 10))
            if out_of_range > 0:
                logger.warning(f"Found {out_of_range} vitality values outside expected range")

        if labels is not None and 'confidence' in labels.columns:
            confidence_values = labels['confidence'].values
            out_of_range = np.sum((confidence_values < confidence_ranges['min'] - 0.1) |
                                  (confidence_values > confidence_ranges['max'] + 0.1))
            if out_of_range > 0:
                logger.warning(f"Found {out_of_range} confidence values outside expected range")

    # ------------------------------------------------------------------
    # Missing values
    # ------------------------------------------------------------------

    def _handle_missing_values(
        self,
        data: pd.DataFrame,
        fit_mode: bool = False,
        is_label: bool = False
    ) -> pd.DataFrame:
        """Impute missing values according to the configured strategy.

        In fit mode the imputer is fitted even when ``data`` has no missing
        values, so that a later ``transform`` on data with NaN can still be
        imputed.  Labels always use the ``mean`` strategy.

        Returns:
            ``data`` itself when nothing had to be imputed, otherwise an
            imputed copy.

        Raises:
            ValueError: If imputation is required in transform mode but the
                matching imputer was never fitted.
        """
        has_missing = bool(data.isnull().any().any())
        if not has_missing and not fit_mode:
            return data

        missing_config = self.config['missing_values']
        strategy = 'mean' if is_label else missing_config['strategy']
        kind = 'label' if is_label else 'feature'

        imputer_key = None
        if strategy in self._SIMPLE_IMPUTE_STRATEGIES:
            imputer_key = f"{kind}_{strategy}"
            if fit_mode:
                self.imputers[imputer_key] = SimpleImputer(strategy=strategy)
        elif strategy == 'knn':
            imputer_key = f"{kind}_knn"
            if fit_mode:
                self.imputers[imputer_key] = KNNImputer(
                    n_neighbors=missing_config['knn_neighbors']
                )
        elif strategy != 'constant':
            # Unknown strategy: leave the data untouched, but say so instead
            # of reporting that missing values were handled.
            if has_missing:
                logger.warning(f"Unknown missing value strategy: {strategy}; data left unchanged")
            return data

        if fit_mode and imputer_key is not None:
            self.imputers[imputer_key].fit(data)

        if not has_missing:
            return data

        data_clean = data.copy()
        if strategy == 'constant':
            fill_value = missing_config['fill_value'] or 0
            data_clean = data_clean.fillna(fill_value)
        else:
            if imputer_key not in self.imputers:
                if strategy == 'knn':
                    raise ValueError("KNN imputer not fitted")
                raise ValueError(f"Imputer not fitted for strategy: {strategy}")
            data_clean[:] = self.imputers[imputer_key].transform(data_clean)

        logger.info(f"Handled missing values using strategy: {strategy}")
        return data_clean

    # ------------------------------------------------------------------
    # Outliers
    # ------------------------------------------------------------------

    def _detect_outliers(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame],
        fit_mode: bool = False
    ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """Remove outlier rows using the configured method.

        Raises:
            ValueError: If the configured method is not recognised.
        """
        method = self.config['outliers']['method']

        if method == 'none':
            return features, labels
        if method == 'isolation_forest':
            return self._detect_outliers_isolation_forest(features, labels, fit_mode)
        if method == 'z_score':
            return self._detect_outliers_z_score(features, labels)
        if method == 'iqr':
            return self._detect_outliers_iqr(features, labels)
        raise ValueError(f"Unknown outlier detection method: {method}")

    def _detect_outliers_isolation_forest(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame],
        fit_mode: bool = False
    ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """Remove rows that an Isolation Forest flags as outliers.

        In fit mode a new detector is created and fitted; otherwise the
        stored detector is used.

        Raises:
            ValueError: If called outside fit mode without a fitted detector.
        """
        contamination = self.config['outliers']['contamination']

        if fit_mode:
            self.outlier_detector = IsolationForest(
                contamination=contamination,
                random_state=42
            )
            outlier_labels = self.outlier_detector.fit_predict(features.values)
        else:
            if self.outlier_detector is None:
                raise ValueError("Outlier detector not fitted")
            outlier_labels = self.outlier_detector.predict(features.values)

        # The detector marks inliers with 1 and outliers with -1.
        normal_mask = outlier_labels == 1
        features_clean = features[normal_mask]
        labels_clean = labels[normal_mask] if labels is not None else None

        num_outliers = np.sum(outlier_labels == -1)
        logger.info(f"Detected and removed {num_outliers} outliers using Isolation Forest")

        return features_clean, labels_clean

    def _detect_outliers_z_score(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame]
    ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """Remove rows in which any feature's |z-score| reaches the threshold."""
        threshold = self.config['outliers']['z_threshold']

        z_scores = np.abs(stats.zscore(features.values))
        # A zero-variance column yields NaN z-scores.  "not (z >= t)" treats
        # NaN as normal, whereas "z < t" would be False for NaN and discard
        # every row of the data set.
        normal_mask = np.all(~(z_scores >= threshold), axis=1)

        features_clean = features[normal_mask]
        labels_clean = labels[normal_mask] if labels is not None else None

        num_outliers = np.sum(~normal_mask)
        logger.info(f"Detected and removed {num_outliers} outliers using Z-score")

        return features_clean, labels_clean

    def _detect_outliers_iqr(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame]
    ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """Remove rows with any feature outside [Q1 - k*IQR, Q3 + k*IQR]."""
        factor = self.config['outliers']['iqr_factor']

        q1 = features.quantile(0.25)
        q3 = features.quantile(0.75)
        iqr = q3 - q1

        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr

        normal_mask = ~((features < lower_bound) | (features > upper_bound)).any(axis=1)

        features_clean = features[normal_mask]
        labels_clean = labels[normal_mask] if labels is not None else None

        num_outliers = np.sum(~normal_mask)
        logger.info(f"Detected and removed {num_outliers} outliers using IQR method")

        return features_clean, labels_clean

    # ------------------------------------------------------------------
    # Statistics and scaling
    # ------------------------------------------------------------------

    @staticmethod
    def _summarize(frame: pd.DataFrame) -> Dict[str, Any]:
        """Return per-column summary statistics for ``frame``."""
        return {
            'mean': frame.mean(),
            'std': frame.std(),
            'min': frame.min(),
            'max': frame.max(),
            'median': frame.median(),
            'q25': frame.quantile(0.25),
            'q75': frame.quantile(0.75)
        }

    def _compute_statistics(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame]
    ):
        """Store summary statistics of the cleaned features (and labels)."""
        self.feature_stats = self._summarize(features)
        if labels is not None:
            self.label_stats = self._summarize(labels)

        logger.info("Statistics computed")

    def _fit_scaler_into(self, registry: Dict[str, Any], key: str, method: str, frame: pd.DataFrame):
        """Fit a scaler of type ``method`` on ``frame`` and store it under ``key``.

        Nothing is stored when ``method`` is ``'none'``.
        """
        if method == 'none':
            return
        scaler = self._create_scaler(method)
        scaler.fit(frame)
        registry[key] = scaler

    def _fit_scalers(
        self,
        features: pd.DataFrame,
        labels: Optional[pd.DataFrame]
    ):
        """Fit one scaler per column group present in the data.

        Feature scalers are rebuilt on every call; label scalers are rebuilt
        only when labels are supplied.
        """
        feature_config = self.config['feature_scaling']
        self.feature_scalers = {}
        for key, columns in self._feature_scaler_groups(features.columns).items():
            if not columns:
                continue
            option, fallback = self._FEATURE_SCALER_OPTIONS[key]
            method = feature_config.get(option, fallback)
            self._fit_scaler_into(self.feature_scalers, key, method, features[columns])

        if labels is not None:
            label_config = self.config['label_scaling']
            self.label_scalers = {}
            for key, columns in self._label_scaler_groups(labels.columns).items():
                if not columns:
                    continue
                method = label_config.get(key, self._LABEL_SCALER_DEFAULTS[key])
                self._fit_scaler_into(self.label_scalers, key, method, labels[columns])

        logger.info("Scalers fitted")

    def _create_scaler(self, method: str):
        """Return a new, unfitted scaler for ``method``.

        Raises:
            ValueError: If ``method`` is not standard, min_max or robust.
        """
        if method == 'standard':
            return StandardScaler()
        if method == 'min_max':
            return MinMaxScaler()
        if method == 'robust':
            return RobustScaler()
        raise ValueError(f"Unknown scaling method: {method}")

    @staticmethod
    def _apply_scalers(
        frame: pd.DataFrame,
        scalers: Dict[str, Any],
        groups: Dict[str, List[str]],
        inverse: bool = False
    ) -> np.ndarray:
        """Apply each fitted scaler to its column group and return an array.

        Groups with no columns or no fitted scaler are left untouched.
        """
        result = frame.copy()
        for key, columns in groups.items():
            scaler = scalers.get(key)
            if not columns or scaler is None:
                continue
            convert = scaler.inverse_transform if inverse else scaler.transform
            result[columns] = convert(frame[columns])
        return result.values

    def _scale_features(self, features: pd.DataFrame) -> np.ndarray:
        """Scale the feature columns with the fitted feature scalers."""
        groups = self._feature_scaler_groups(features.columns)
        return self._apply_scalers(features, self.feature_scalers, groups)

    def _scale_labels(self, labels: pd.DataFrame) -> np.ndarray:
        """Scale the label columns with the fitted label scalers."""
        groups = self._label_scaler_groups(labels.columns)
        return self._apply_scalers(labels, self.label_scalers, groups)

    def _inverse_scale_labels(self, labels: pd.DataFrame) -> np.ndarray:
        """Undo label scaling with the fitted label scalers."""
        groups = self._label_scaler_groups(labels.columns)
        return self._apply_scalers(labels, self.label_scalers, groups, inverse=True)

    def get_feature_statistics(self) -> Dict[str, Any]:
        """Return the feature statistics computed by fit()."""
        return self.feature_stats

    def get_label_statistics(self) -> Dict[str, Any]:
        """Return the label statistics computed by fit()."""
        return self.label_stats

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_preprocessor(self, path: Union[str, Path]):
        """Serialise the configuration and all fitted state to ``path`` with joblib."""
        import joblib

        preprocessor_data = {
            'config': self.config,
            'feature_scalers': self.feature_scalers,
            'label_scalers': self.label_scalers,
            'imputers': self.imputers,
            'outlier_detector': self.outlier_detector,
            'feature_stats': self.feature_stats,
            'label_stats': self.label_stats,
            'feature_columns': self.feature_columns,
            'label_columns': self.label_columns
        }

        joblib.dump(preprocessor_data, path)
        logger.info(f"Preprocessor saved to {path}")

    @classmethod
    def load_preprocessor(cls, path: Union[str, Path]) -> 'DataPreprocessor':
        """Rebuild a preprocessor from a file written by save_preprocessor().

        SECURITY: joblib.load unpickles the file and can execute arbitrary
        code embedded in it.  Only load files from a trusted source.
        """
        import joblib

        preprocessor_data = joblib.load(path)

        preprocessor = cls(preprocessor_data['config'])

        preprocessor.feature_scalers = preprocessor_data['feature_scalers']
        preprocessor.label_scalers = preprocessor_data['label_scalers']
        preprocessor.imputers = preprocessor_data['imputers']
        preprocessor.outlier_detector = preprocessor_data['outlier_detector']
        preprocessor.feature_stats = preprocessor_data['feature_stats']
        preprocessor.label_stats = preprocessor_data['label_stats']
        preprocessor.feature_columns = preprocessor_data['feature_columns']
        preprocessor.label_columns = preprocessor_data['label_columns']

        logger.info(f"Preprocessor loaded from {path}")
        return preprocessor
|
|
|
|
|
|
|
|
def create_preprocessor(config: Optional[Dict[str, Any]] = None) -> DataPreprocessor:
    """Build a data preprocessor.

    Args:
        config: Configuration dictionary, passed unchanged to the
            ``DataPreprocessor`` constructor.

    Returns:
        A new ``DataPreprocessor`` instance.
    """
    preprocessor = DataPreprocessor(config)
    return preprocessor