|
|
""" |
|
|
数据集类实现 |
|
|
Dataset implementation for emotion and physiological state data |
|
|
""" |
|
|
|
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from torch.utils.data import Dataset |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from typing import Union, Tuple, Optional, List, Dict, Any |
|
|
from pathlib import Path |
|
|
import logging |
|
|
from loguru import logger |
|
|
|
|
|
class EmotionDataset(Dataset):
    """Dataset for emotion and physiological state change prediction.

    Input features (10 dims):
        - User PAD: pleasure, arousal, dominance (3)
        - Vitality: physiological vitality value (1)
        - Current PAD: current pleasure, arousal, dominance state (3)
        - PAD difference: user PAD minus current PAD, computed on the fly (3)

    Output labels (3 dims):
        - ΔPAD: change in the PAD state (3)

    Notes:
        - ΔPressure is no longer a prediction target; it is derived
          dynamically from the PAD change instead.
        - Confidence is computed dynamically via MC Dropout.
    """

    def __init__(
        self,
        data: Union[np.ndarray, pd.DataFrame, str, Path],
        labels: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        feature_columns: Optional[List[str]] = None,
        label_columns: Optional[List[str]] = None,
        normalize_features: bool = True,
        normalize_labels: bool = False,
        feature_scaler: Optional[Dict[str, Any]] = None,
        label_scaler: Optional[Dict[str, Any]] = None,
        validation_mode: bool = False
    ):
        """Initialize the dataset.

        Args:
            data: input data; an array, a DataFrame, or a file path.
            labels: label data; None when ``data`` already contains labels.
            feature_columns: names of the feature columns.
            label_columns: names of the label columns.
            normalize_features: whether to standardize the features.
            normalize_labels: whether to standardize the labels.
            feature_scaler: pre-computed feature scaling parameters.
            label_scaler: pre-computed label scaling parameters.
            validation_mode: whether this dataset is used for validation.
        """
        self.normalize_features = normalize_features
        self.normalize_labels = normalize_labels
        self.validation_mode = validation_mode

        # Column names used when the caller does not supply any.
        self.default_feature_columns = [
            'user_pad_p', 'user_pad_a', 'user_pad_d',
            'vitality',
            'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d'
        ]
        self.default_label_columns = [
            'ai_delta_p', 'ai_delta_a', 'ai_delta_d'
        ]

        # Load features/labels, then the optional comparison-only labels.
        self.features, self.labels = self._load_data(
            data, labels, feature_columns, label_columns
        )
        self.extra_labels = self._load_extra_labels(data)

        self._validate_data()

        # Reuse caller-provided scalers, otherwise fit them on this data.
        self.feature_scaler = feature_scaler or self._create_feature_scaler()
        self.label_scaler = label_scaler or self._create_label_scaler()

        if self.normalize_features:
            self.features = self._normalize_features(self.features)

        if self.normalize_labels and self.labels is not None:
            self.labels = self._normalize_labels(self.labels)

        logger.info(f"Dataset initialized: {len(self)} samples")
        logger.info(f"Features shape: {self.features.shape}")
        if self.labels is not None:
            logger.info(f"Labels shape: {self.labels.shape}")
|
|
|
|
|
def _load_data( |
|
|
self, |
|
|
data: Union[np.ndarray, pd.DataFrame, str, Path], |
|
|
labels: Optional[Union[np.ndarray, pd.DataFrame]], |
|
|
feature_columns: Optional[List[str]], |
|
|
label_columns: Optional[List[str]] |
|
|
) -> Tuple[np.ndarray, Optional[np.ndarray]]: |
|
|
""" |
|
|
加载数据 |
|
|
|
|
|
Args: |
|
|
data: 输入数据 |
|
|
labels: 标签数据 |
|
|
feature_columns: 特征列名 |
|
|
label_columns: 标签列名 |
|
|
|
|
|
Returns: |
|
|
features和labels的元组 |
|
|
""" |
|
|
|
|
|
if isinstance(data, (str, Path)): |
|
|
data_path = Path(data) |
|
|
if data_path.suffix.lower() in ['.csv', '.tsv']: |
|
|
df = pd.read_csv(data_path, encoding='utf-8') |
|
|
elif data_path.suffix.lower() in ['.json']: |
|
|
df = pd.read_json(data_path) |
|
|
elif data_path.suffix.lower() in ['.pkl', '.pickle']: |
|
|
df = pd.read_pickle(data_path) |
|
|
else: |
|
|
raise ValueError(f"Unsupported file format: {data_path.suffix}") |
|
|
elif isinstance(data, pd.DataFrame): |
|
|
df = data.copy() |
|
|
elif isinstance(data, np.ndarray): |
|
|
|
|
|
if labels is None and data.shape[1] == 12: |
|
|
feature_cols = feature_columns or self.default_feature_columns |
|
|
label_cols = label_columns or self.default_label_columns |
|
|
df = pd.DataFrame(data, columns=feature_cols + label_cols) |
|
|
labels = df[label_cols].values |
|
|
df = df[feature_cols] |
|
|
else: |
|
|
df = pd.DataFrame(data, columns=feature_columns or self.default_feature_columns) |
|
|
else: |
|
|
raise ValueError(f"Unsupported data type: {type(data)}") |
|
|
|
|
|
|
|
|
if labels is None: |
|
|
|
|
|
if label_columns: |
|
|
labels_df = df[label_columns] |
|
|
|
|
|
feature_cols = feature_columns or self.default_feature_columns |
|
|
features_df = df[feature_cols] |
|
|
else: |
|
|
|
|
|
label_cols = [col for col in self.default_label_columns if col in df.columns] |
|
|
if label_cols: |
|
|
labels_df = df[label_cols] |
|
|
|
|
|
feature_cols = [col for col in self.default_feature_columns if col in df.columns] |
|
|
features_df = df[feature_cols] |
|
|
else: |
|
|
labels_df = None |
|
|
|
|
|
feature_cols = [col for col in self.default_feature_columns if col in df.columns] |
|
|
features_df = df[feature_cols] if feature_cols else df |
|
|
else: |
|
|
|
|
|
feature_cols = [col for col in (feature_columns or self.default_feature_columns) if col in df.columns] |
|
|
features_df = df[feature_cols] if feature_cols else df |
|
|
if isinstance(labels, pd.DataFrame): |
|
|
labels_df = labels.values |
|
|
else: |
|
|
labels_df = labels |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
features_array = features_df.values |
|
|
enhanced_features = np.zeros((features_array.shape[0], 10)) |
|
|
|
|
|
|
|
|
enhanced_features[:, :7] = features_array |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
enhanced_features[:, 7] = features_array[:, 0] - features_array[:, 4] |
|
|
enhanced_features[:, 8] = features_array[:, 1] - features_array[:, 5] |
|
|
enhanced_features[:, 9] = features_array[:, 2] - features_array[:, 6] |
|
|
|
|
|
|
|
|
return enhanced_features, labels_df.values if labels_df is not None else None |
|
|
|
|
|
def _load_extra_labels(self, data: Union[np.ndarray, pd.DataFrame, str, Path]) -> Optional[np.ndarray]: |
|
|
""" |
|
|
加载额外的标签列(不用于训练,仅用于验证对比) |
|
|
|
|
|
Args: |
|
|
data: 输入数据 |
|
|
|
|
|
Returns: |
|
|
额外标签数组(delta_pressure 列) |
|
|
""" |
|
|
|
|
|
if isinstance(data, (str, Path)): |
|
|
data_path = Path(data) |
|
|
if data_path.suffix.lower() in ['.csv', '.tsv']: |
|
|
df = pd.read_csv(data_path, encoding='utf-8') |
|
|
elif data_path.suffix.lower() in ['.json']: |
|
|
df = pd.read_json(data_path) |
|
|
elif data_path.suffix.lower() in ['.pkl', '.pickle']: |
|
|
df = pd.read_pickle(data_path) |
|
|
else: |
|
|
return None |
|
|
elif isinstance(data, pd.DataFrame): |
|
|
df = data.copy() |
|
|
else: |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
if 'delta_pressure' in df.columns: |
|
|
return df['delta_pressure'].values.reshape(-1, 1) |
|
|
return None |
|
|
|
|
|
def _validate_data(self): |
|
|
"""验证数据格式和范围""" |
|
|
|
|
|
if self.features.shape[1] != 10: |
|
|
raise ValueError(f"Expected 10 feature dimensions, got {self.features.shape[1]}") |
|
|
|
|
|
|
|
|
if self.labels is not None and self.labels.shape[1] != 3: |
|
|
raise ValueError(f"Expected 3 label dimensions, got {self.labels.shape[1]}") |
|
|
|
|
|
|
|
|
self._check_feature_ranges() |
|
|
if self.labels is not None: |
|
|
self._check_label_ranges() |
|
|
|
|
|
|
|
|
if np.isnan(self.features).any(): |
|
|
logger.warning("Found NaN values in features") |
|
|
|
|
|
if self.labels is not None and np.isnan(self.labels).any(): |
|
|
logger.warning("Found NaN values in labels") |
|
|
|
|
|
|
|
|
if np.isinf(self.features).any(): |
|
|
raise ValueError("Found infinite values in features") |
|
|
|
|
|
if self.labels is not None and np.isinf(self.labels).any(): |
|
|
raise ValueError("Found infinite values in labels") |
|
|
|
|
|
def _check_feature_ranges(self): |
|
|
"""检查特征值的合理范围""" |
|
|
|
|
|
pad_indices = [0, 1, 2, 4, 5, 6] |
|
|
pad_values = self.features[:, pad_indices] |
|
|
|
|
|
if not np.all((pad_values >= -1.5) & (pad_values <= 1.5)): |
|
|
logger.warning("Some PAD values are outside the expected range [-1, 1]") |
|
|
|
|
|
|
|
|
vitality_values = self.features[:, 3] |
|
|
if not np.all((vitality_values >= -10) & (vitality_values <= 110)): |
|
|
logger.warning("Some vitality values are outside the expected range [0, 100]") |
|
|
|
|
|
|
|
|
diff_indices = [7, 8, 9] |
|
|
diff_values = self.features[:, diff_indices] |
|
|
if not np.all((diff_values >= -2.5) & (diff_values <= 2.5)): |
|
|
logger.warning("Some PAD difference values are outside the expected range [-2, 2]") |
|
|
|
|
|
def _check_label_ranges(self): |
|
|
"""检查标签值的合理范围""" |
|
|
|
|
|
if self.labels is not None and self.labels.shape[1] >= 3: |
|
|
delta_pad_values = self.labels[:, :3] |
|
|
|
|
|
if not np.all((delta_pad_values >= -1.0) & (delta_pad_values <= 1.0)): |
|
|
logger.warning("Some ΔPAD values are outside the expected range [-1, 1]") |
|
|
|
|
|
def _create_feature_scaler(self) -> Dict[str, Any]: |
|
|
"""创建特征标准化参数""" |
|
|
scaler = {} |
|
|
|
|
|
|
|
|
pad_indices = [0, 1, 2, 4, 5, 6] |
|
|
pad_values = self.features[:, pad_indices] |
|
|
scaler['pad_mean'] = np.mean(pad_values, axis=0) |
|
|
scaler['pad_std'] = np.std(pad_values, axis=0) |
|
|
scaler['pad_std'] = np.where(scaler['pad_std'] == 0, 1, scaler['pad_std']) |
|
|
|
|
|
|
|
|
vitality_values = self.features[:, 3] |
|
|
scaler['vitality_mean'] = np.mean(vitality_values) |
|
|
scaler['vitality_std'] = np.std(vitality_values) |
|
|
scaler['vitality_std'] = scaler['vitality_std'] if scaler['vitality_std'] > 0 else 1 |
|
|
|
|
|
|
|
|
diff_indices = [7, 8, 9] |
|
|
diff_values = self.features[:, diff_indices] |
|
|
scaler['diff_mean'] = np.mean(diff_values, axis=0) |
|
|
scaler['diff_std'] = np.std(diff_values, axis=0) |
|
|
scaler['diff_std'] = np.where(scaler['diff_std'] == 0, 1, scaler['diff_std']) |
|
|
|
|
|
return scaler |
|
|
|
|
|
def _create_label_scaler(self) -> Dict[str, Any]: |
|
|
"""创建标签标准化参数""" |
|
|
if self.labels is None: |
|
|
return {} |
|
|
|
|
|
scaler = {} |
|
|
|
|
|
|
|
|
delta_pad_indices = [0, 1, 2] |
|
|
delta_pad_values = self.labels[:, delta_pad_indices] |
|
|
scaler['delta_pad_mean'] = np.mean(delta_pad_values, axis=0) |
|
|
scaler['delta_pad_std'] = np.std(delta_pad_values, axis=0) |
|
|
scaler['delta_pad_std'] = np.where(scaler['delta_pad_std'] == 0, 1, scaler['delta_pad_std']) |
|
|
|
|
|
return scaler |
|
|
|
|
|
def _normalize_features(self, features: np.ndarray) -> np.ndarray: |
|
|
"""标准化特征""" |
|
|
normalized = features.copy() |
|
|
|
|
|
|
|
|
pad_indices = [0, 1, 2, 4, 5, 6] |
|
|
normalized[:, pad_indices] = ( |
|
|
features[:, pad_indices] - self.feature_scaler['pad_mean'] |
|
|
) / self.feature_scaler['pad_std'] |
|
|
|
|
|
|
|
|
normalized[:, 3] = ( |
|
|
features[:, 3] - self.feature_scaler['vitality_mean'] |
|
|
) / self.feature_scaler['vitality_std'] |
|
|
|
|
|
|
|
|
diff_indices = [7, 8, 9] |
|
|
|
|
|
|
|
|
|
|
|
normalized[:, diff_indices] = features[:, diff_indices] |
|
|
return normalized |
|
|
|
|
|
def _normalize_labels(self, labels: np.ndarray) -> np.ndarray: |
|
|
"""标准化标签""" |
|
|
normalized = labels.copy() |
|
|
|
|
|
|
|
|
delta_pad_indices = [0, 1, 2] |
|
|
normalized[:, delta_pad_indices] = ( |
|
|
labels[:, delta_pad_indices] - self.label_scaler['delta_pad_mean'] |
|
|
) / self.label_scaler['delta_pad_std'] |
|
|
|
|
|
return normalized |
|
|
|
|
|
def denormalize_features(self, features: np.ndarray) -> np.ndarray: |
|
|
"""反标准化特征""" |
|
|
denormalized = features.copy() |
|
|
|
|
|
|
|
|
pad_indices = [0, 1, 2, 4, 5, 6] |
|
|
denormalized[:, pad_indices] = ( |
|
|
features[:, pad_indices] * self.feature_scaler['pad_std'] + |
|
|
self.feature_scaler['pad_mean'] |
|
|
) |
|
|
|
|
|
|
|
|
denormalized[:, 3] = ( |
|
|
features[:, 3] * self.feature_scaler['vitality_std'] + |
|
|
self.feature_scaler['vitality_mean'] |
|
|
) |
|
|
|
|
|
|
|
|
diff_indices = [7, 8, 9] |
|
|
denormalized[:, diff_indices] = ( |
|
|
features[:, diff_indices] * self.feature_scaler['diff_std'] + |
|
|
self.feature_scaler['diff_mean'] |
|
|
) |
|
|
|
|
|
return denormalized |
|
|
|
|
|
def denormalize_labels(self, labels: np.ndarray) -> np.ndarray: |
|
|
"""反标准化标签""" |
|
|
denormalized = labels.copy() |
|
|
|
|
|
|
|
|
delta_pad_indices = [0, 1, 2] |
|
|
denormalized[:, delta_pad_indices] = ( |
|
|
labels[:, delta_pad_indices] * self.label_scaler['delta_pad_std'] + |
|
|
self.label_scaler['delta_pad_mean'] |
|
|
) |
|
|
|
|
|
return denormalized |
|
|
|
|
|
def __len__(self) -> int: |
|
|
"""返回数据集大小""" |
|
|
return len(self.features) |
|
|
|
|
|
def __getitem__(self, idx: int) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: |
|
|
""" |
|
|
获取单个样本 |
|
|
|
|
|
Args: |
|
|
idx: 样本索引 |
|
|
|
|
|
Returns: |
|
|
特征张量和标签张量的元组 |
|
|
""" |
|
|
features = torch.FloatTensor(self.features[idx]) |
|
|
|
|
|
if self.labels is not None: |
|
|
labels = torch.FloatTensor(self.labels[idx]) |
|
|
return features, labels |
|
|
else: |
|
|
return features |
|
|
|
|
|
def get_feature_statistics(self) -> Dict[str, Any]: |
|
|
"""获取特征统计信息""" |
|
|
stats = {} |
|
|
|
|
|
|
|
|
stats['overall'] = { |
|
|
'mean': np.mean(self.features, axis=0), |
|
|
'std': np.std(self.features, axis=0), |
|
|
'min': np.min(self.features, axis=0), |
|
|
'max': np.max(self.features, axis=0) |
|
|
} |
|
|
|
|
|
|
|
|
pad_indices = [0, 1, 2, 4, 5, 6] |
|
|
pad_features = self.features[:, pad_indices] |
|
|
stats['pad_features'] = { |
|
|
'mean': np.mean(pad_features), |
|
|
'std': np.std(pad_features), |
|
|
'min': np.min(pad_features), |
|
|
'max': np.max(pad_features) |
|
|
} |
|
|
|
|
|
|
|
|
vitality_features = self.features[:, 3] |
|
|
stats['vitality'] = { |
|
|
'mean': np.mean(vitality_features), |
|
|
'std': np.std(vitality_features), |
|
|
'min': np.min(vitality_features), |
|
|
'max': np.max(vitality_features) |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def get_label_statistics(self) -> Optional[Dict[str, Any]]: |
|
|
"""获取标签统计信息""" |
|
|
if self.labels is None: |
|
|
return None |
|
|
|
|
|
stats = {} |
|
|
|
|
|
|
|
|
stats['overall'] = { |
|
|
'mean': np.mean(self.labels, axis=0), |
|
|
'std': np.std(self.labels, axis=0), |
|
|
'min': np.min(self.labels, axis=0), |
|
|
'max': np.max(self.labels, axis=0) |
|
|
} |
|
|
|
|
|
|
|
|
delta_pad_indices = [0, 1, 2] |
|
|
delta_pad_labels = self.labels[:, delta_pad_indices] |
|
|
stats['delta_pad'] = { |
|
|
'mean': np.mean(delta_pad_labels), |
|
|
'std': np.std(delta_pad_labels), |
|
|
'min': np.min(delta_pad_labels), |
|
|
'max': np.max(delta_pad_labels) |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def save_scalers(self, path: Union[str, Path]): |
|
|
"""保存标准化参数""" |
|
|
import json |
|
|
|
|
|
|
|
|
def convert_numpy(obj): |
|
|
if isinstance(obj, np.ndarray): |
|
|
return obj.tolist() |
|
|
elif isinstance(obj, np.generic): |
|
|
return obj.item() |
|
|
return obj |
|
|
|
|
|
scalers = { |
|
|
'feature_scaler': self.feature_scaler, |
|
|
'label_scaler': self.label_scaler |
|
|
} |
|
|
|
|
|
|
|
|
def recursive_convert(obj): |
|
|
if isinstance(obj, dict): |
|
|
return {k: recursive_convert(v) for k, v in obj.items()} |
|
|
elif isinstance(obj, list): |
|
|
return [recursive_convert(v) for v in obj] |
|
|
else: |
|
|
return convert_numpy(obj) |
|
|
|
|
|
scalers = recursive_convert(scalers) |
|
|
|
|
|
with open(path, 'w') as f: |
|
|
json.dump(scalers, f, indent=2) |
|
|
|
|
|
logger.info(f"Scalers saved to {path}") |
|
|
|
|
|
@classmethod |
|
|
def load_scalers(cls, path: Union[str, Path]) -> Tuple[Dict[str, Any], Dict[str, Any]]: |
|
|
"""加载标准化参数""" |
|
|
import json |
|
|
|
|
|
with open(path, 'r') as f: |
|
|
scalers = json.load(f) |
|
|
|
|
|
logger.info(f"Scalers loaded from {path}") |
|
|
return scalers['feature_scaler'], scalers['label_scaler'] |