""" 合成数据生成器实现 Synthetic data generator for emotion and physiological state data """ import numpy as np import pandas as pd from typing import Union, Tuple, Optional, Dict, Any, List from pathlib import Path import matplotlib.pyplot as plt import seaborn as sns from loguru import logger from scipy import stats import warnings class SyntheticDataGenerator: """ 合成数据生成器 Synthetic data generator for emotion and physiological state prediction 生成符合PAD情绪模型和生理状态变化的数据: - 输入:User PAD (3维) + Vitality (1维) + Current PAD (3维) = 7维 - 输出:ΔPAD (3维) = 3维 - 注意:ΔPressure 和 Confidence 不再生成,改为运行时计算 """ def __init__( self, num_samples: int = 1000, seed: Optional[int] = 42, config: Optional[Dict[str, Any]] = None ): """ 初始化合成数据生成器 Args: num_samples: 样本数量 seed: 随机种子 config: 配置字典 """ self.num_samples = num_samples self.seed = seed self.config = config or self._get_default_config() # 设置随机种子 if seed is not None: np.random.seed(seed) # 特征和标签列名(与 CSV 文件列名一致) self.feature_columns = [ 'user_pad_p', 'user_pad_a', 'user_pad_d', # User PAD (3维) 'vitality', # Vitality (1维) 'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d' # Current PAD (3维) ] self.label_columns = [ 'ai_delta_p', 'ai_delta_a', 'ai_delta_d' # ΔPAD (3维) # 注意:delta_pressure 和 confidence 不再作为标签 # - delta_pressure 通过 PAD 动态计算 # - confidence 通过 MC Dropout 动态计算 ] logger.info(f"Synthetic data generator initialized: {num_samples} samples") def _get_default_config(self) -> Dict[str, Any]: """获取默认配置""" return { # PAD值分布配置 'pad_distribution': { 'user_pad': { 'pleasure': {'mean': 0.0, 'std': 0.5, 'min': -1.0, 'max': 1.0}, 'arousal': {'mean': 0.0, 'std': 0.4, 'min': -1.0, 'max': 1.0}, 'dominance': {'mean': 0.1, 'std': 0.3, 'min': -1.0, 'max': 1.0} }, 'current_pad': { 'pleasure': {'mean': 0.0, 'std': 0.6, 'min': -1.0, 'max': 1.0}, 'arousal': {'mean': 0.0, 'std': 0.5, 'min': -1.0, 'max': 1.0}, 'dominance': {'mean': 0.1, 'std': 0.4, 'min': -1.0, 'max': 1.0} } }, # Vitality分布配置 'vitality_distribution': { 'mean': 50.0, 'std': 20.0, 'min': 0.0, 'max': 100.0 }, # ΔPAD分布配置 'delta_pad_distribution': { 'base_std': 0.1, 'influence_factor': 0.3, 'min': -0.5, 'max': 0.5 }, # ΔPressure分布配置 'delta_pressure_distribution': { 'base_std': 0.05, 'vitality_influence': 0.2, 'pad_influence': 0.15, 'min': -0.3, 'max': 0.3 }, # 置信度分布配置 'confidence_distribution': { 'base_mean': 0.7, 'base_std': 0.15, 'consistency_factor': 0.3, 'min': 0.0, 'max': 1.0 }, # 噪声配置 'noise': { 'enabled': True, 'feature_noise_std': 0.01, 'label_noise_std': 0.02 }, # 相关性配置 'correlations': { 'user_current_pad_correlation': 0.6, # User PAD与Current PAD的相关性 'vitality_pad_correlation': 0.3, # Vitality与PAD的相关性 'delta_consistency': 0.4 # Δ值的一致性 } } def generate_data( self, add_noise: bool = True, add_correlations: bool = True, return_dataframe: bool = False ) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[pd.DataFrame, pd.DataFrame]]: """ 生成合成数据 Args: add_noise: 是否添加噪声 add_correlations: 是否添加相关性 return_dataframe: 是否返回DataFrame格式 Returns: 特征数据和标签数据的元组 """ # 生成基础特征 user_pad = self._generate_user_pad() vitality = self._generate_vitality() current_pad = self._generate_current_pad(user_pad, vitality, add_correlations) # 组合特征 features = np.hstack([user_pad, vitality.reshape(-1, 1), current_pad]) # 生成标签(仅 ΔPAD 3维) delta_pad = self._generate_delta_pad(user_pad, current_pad, vitality, add_correlations) # 标签就是 ΔPAD(不再包含 delta_pressure 和 confidence) labels = delta_pad # 添加噪声 if add_noise and self.config['noise']['enabled']: features = self._add_feature_noise(features) labels = self._add_label_noise(labels) # 数据验证和修正 features = self._validate_and_fix_features(features) labels = self._validate_and_fix_labels(labels) # 转换格式 if return_dataframe: features_df = pd.DataFrame(features, columns=self.feature_columns) labels_df = pd.DataFrame(labels, columns=self.label_columns) return features_df, labels_df else: return features, labels def _generate_user_pad(self) -> np.ndarray: """生成User PAD数据""" config = self.config['pad_distribution']['user_pad'] user_pad = np.zeros((self.num_samples, 3)) # 生成每个维度的数据 for i, dimension in enumerate(['pleasure', 'arousal', 'dominance']): dim_config = config[dimension] # 使用截断正态分布生成数据 data = stats.truncnorm( (dim_config['min'] - dim_config['mean']) / dim_config['std'], (dim_config['max'] - dim_config['mean']) / dim_config['std'], loc=dim_config['mean'], scale=dim_config['std'] ).rvs(self.num_samples) user_pad[:, i] = data return user_pad def _generate_vitality(self) -> np.ndarray: """生成Vitality数据""" config = self.config['vitality_distribution'] # 使用Beta分布生成[0, 1]范围的数据,然后缩放到[0, 100] alpha = ((config['mean'] - config['min']) / (config['max'] - config['min'])) * 2 beta = 2 - alpha if alpha <= 0 or beta <= 0: # 如果参数无效,使用截断正态分布 vitality = stats.truncnorm( (config['min'] - config['mean']) / config['std'], (config['max'] - config['mean']) / config['std'], loc=config['mean'], scale=config['std'] ).rvs(self.num_samples) else: # 使用Beta分布 vitality = stats.beta.rvs(alpha, beta, size=self.num_samples) vitality = vitality * (config['max'] - config['min']) + config['min'] return vitality def _generate_current_pad( self, user_pad: np.ndarray, vitality: np.ndarray, add_correlations: bool ) -> np.ndarray: """生成Current PAD数据""" config = self.config['pad_distribution']['current_pad'] correlation = self.config['correlations']['user_current_pad_correlation'] current_pad = np.zeros((self.num_samples, 3)) for i, dimension in enumerate(['pleasure', 'arousal', 'dominance']): dim_config = config[dimension] # 生成基础数据 base_data = stats.truncnorm( (dim_config['min'] - dim_config['mean']) / dim_config['std'], (dim_config['max'] - dim_config['mean']) / dim_config['std'], loc=dim_config['mean'], scale=dim_config['std'] ).rvs(self.num_samples) if add_correlations: # 添加与User PAD的相关性 correlated_part = correlation * user_pad[:, i] independent_part = (1 - abs(correlation)) * base_data current_pad[:, i] = correlated_part + independent_part # 添加与Vitality的轻微相关性 vitality_correlation = self.config['correlations']['vitality_pad_correlation'] vitality_influence = vitality_correlation * (vitality - 50) / 50 * 0.1 current_pad[:, i] += vitality_influence else: current_pad[:, i] = base_data # 确保在有效范围内 current_pad[:, i] = np.clip(current_pad[:, i], -1.0, 1.0) return current_pad def _generate_delta_pad( self, user_pad: np.ndarray, current_pad: np.ndarray, vitality: np.ndarray, add_correlations: bool ) -> np.ndarray: """生成ΔPAD数据""" config = self.config['delta_pad_distribution'] delta_pad = np.zeros((self.num_samples, 3)) # 计算PAD差异(回归到均值的趋势) pad_difference = current_pad - user_pad for i in range(3): # 基础变化量(回归到均值) base_change = -pad_difference[:, i] * config['influence_factor'] # 添加随机变化 random_change = np.random.normal(0, config['base_std'], self.num_samples) if add_correlations: # 添加与Vitality的相关性(高活力时变化更大) vitality_factor = (vitality / 100) * 0.2 vitality_change = np.random.normal(0, vitality_factor) # 添加一致性(某些样本整体变化方向一致) consistency_factor = self.config['correlations']['delta_consistency'] if consistency_factor > 0: consistency_noise = np.random.normal(0, consistency_factor, self.num_samples) random_change += consistency_noise delta_pad[:, i] = base_change + random_change + vitality_change else: delta_pad[:, i] = base_change + random_change # 确保在合理范围内 delta_pad[:, i] = np.clip(delta_pad[:, i], config['min'], config['max']) return delta_pad def _generate_delta_pressure( self, vitality: np.ndarray, delta_pad: np.ndarray, add_correlations: bool ) -> np.ndarray: """生成ΔPressure数据""" config = self.config['delta_pressure_distribution'] # 基础压力变化 base_pressure = np.random.normal(0, config['base_std'], self.num_samples) if add_correlations: # 与Vitality的相关性(低活力时压力增加) vitality_stress = -(vitality - 50) / 50 * config['vitality_influence'] # 与PAD变化的相关性(负面情绪变化时压力增加) pad_stress = np.mean(delta_pad[:, :2], axis=1) * config['pad_influence'] # 主要考虑pleasure和arousal delta_pressure = base_pressure + vitality_stress + pad_stress else: delta_pressure = base_pressure # 确保在合理范围内 delta_pressure = np.clip(delta_pressure, config['min'], config['max']) return delta_pressure def _generate_confidence( self, features: np.ndarray, delta_pad: np.ndarray, delta_pressure: np.ndarray, add_correlations: bool ) -> np.ndarray: """生成置信度数据""" config = self.config['confidence_distribution'] # 基础置信度 base_confidence = np.random.normal( config['base_mean'], config['base_std'], self.num_samples ) if add_correlations: # 基于数据一致性的置信度调整 # PAD值差异越小,置信度越高 user_pad = features[:, :3] current_pad = features[:, 4:7] pad_diff = np.abs(current_pad - user_pad) consistency_score = 1.0 - np.mean(pad_diff, axis=1) # 变化量越大,置信度越低 change_magnitude = np.sqrt(np.sum(delta_pad**2, axis=1) + delta_pressure**2) change_factor = 1.0 - np.tanh(change_magnitude * 2) # 组合因素 consistency_factor = config['consistency_factor'] confidence = base_confidence + consistency_factor * consistency_score * 0.2 confidence += consistency_factor * change_factor * 0.1 else: confidence = base_confidence # 确保在[0, 1]范围内 confidence = np.clip(confidence, config['min'], config['max']) return confidence def _add_feature_noise(self, features: np.ndarray) -> np.ndarray: """为特征添加噪声""" noise_std = self.config['noise']['feature_noise_std'] noise = np.random.normal(0, noise_std, features.shape) # 为不同维度添加不同程度的噪声 noise[:, 3] *= 2 # Vitality的噪声稍大 return features + noise def _add_label_noise(self, labels: np.ndarray) -> np.ndarray: """为标签添加噪声""" noise_std = self.config['noise']['label_noise_std'] noise = np.random.normal(0, noise_std, labels.shape) # 为不同标签添加不同程度的噪声 noise[:, 4] *= 0.5 # 置信度的噪声较小 return labels + noise def _validate_and_fix_features(self, features: np.ndarray) -> np.ndarray: """验证和修正特征数据""" # PAD值限制在[-1, 1]范围内 pad_indices = [0, 1, 2, 4, 5, 6] features[:, pad_indices] = np.clip(features[:, pad_indices], -1.0, 1.0) # Vitality值限制在[0, 100]范围内 features[:, 3] = np.clip(features[:, 3], 0.0, 100.0) return features def _validate_and_fix_labels(self, labels: np.ndarray) -> np.ndarray: """验证和修正标签数据""" # ΔPAD限制在[-0.5, 0.5]范围内 labels[:, :3] = np.clip(labels[:, :3], -0.5, 0.5) # ΔPressure限制在[-0.3, 0.3]范围内 labels[:, 3] = np.clip(labels[:, 3], -0.3, 0.3) # Confidence限制在[0, 1]范围内 labels[:, 4] = np.clip(labels[:, 4], 0.0, 1.0) return labels def generate_dataset_with_patterns( self, patterns: List[str], pattern_weights: Optional[List[float]] = None ) -> Tuple[np.ndarray, np.ndarray]: """ 生成具有特定模式的数据 Args: patterns: 模式列表 ['stress', 'relaxation', 'excitement', 'calm'] pattern_weights: 模式权重列表 Returns: 特征数据和标签数据 """ if pattern_weights is None: pattern_weights = [1.0] * len(patterns) # 计算每个模式的样本数量 total_weight = sum(pattern_weights) pattern_samples = [ int(self.num_samples * weight / total_weight) for weight in pattern_weights ] # 调整以确保总样本数正确 pattern_samples[-1] = self.num_samples - sum(pattern_samples[:-1]) all_features = [] all_labels = [] for pattern, num_samples in zip(patterns, pattern_samples): if num_samples > 0: # 生成特定模式的数据 features, labels = self._generate_pattern_data(pattern, num_samples) all_features.append(features) all_labels.append(labels) # 合并所有数据 features = np.vstack(all_features) labels = np.vstack(all_labels) # 打乱数据 indices = np.random.permutation(len(features)) features = features[indices] labels = labels[indices] logger.info(f"Generated data with patterns: {patterns}") return features, labels def _generate_pattern_data(self, pattern: str, num_samples: int) -> Tuple[np.ndarray, np.ndarray]: """生成特定模式的数据""" # 临时修改生成器参数 original_samples = self.num_samples self.num_samples = num_samples # 根据模式调整参数 if pattern == 'stress': # 压力模式:低活力,负面情绪,压力增加 config = self.config.copy() config['vitality_distribution']['mean'] = 30.0 config['vitality_distribution']['std'] = 10.0 config['pad_distribution']['user_pad']['pleasure']['mean'] = -0.3 config['pad_distribution']['user_pad']['arousal']['mean'] = 0.2 config['delta_pressure_distribution']['base_std'] = 0.1 elif pattern == 'relaxation': # 放松模式:中高活力,正面情绪,压力减少 config = self.config.copy() config['vitality_distribution']['mean'] = 70.0 config['vitality_distribution']['std'] = 15.0 config['pad_distribution']['user_pad']['pleasure']['mean'] = 0.4 config['pad_distribution']['user_pad']['arousal']['mean'] = -0.2 config['delta_pressure_distribution']['base_std'] = 0.08 elif pattern == 'excitement': # 兴奋模式:高活力,高激活度 config = self.config.copy() config['vitality_distribution']['mean'] = 85.0 config['vitality_distribution']['std'] = 10.0 config['pad_distribution']['user_pad']['arousal']['mean'] = 0.6 config['pad_distribution']['current_pad']['arousal']['mean'] = 0.7 elif pattern == 'calm': # 平静模式:中等活力,低激活度 config = self.config.copy() config['vitality_distribution']['mean'] = 60.0 config['vitality_distribution']['std'] = 12.0 config['pad_distribution']['user_pad']['arousal']['mean'] = -0.4 config['pad_distribution']['current_pad']['arousal']['mean'] = -0.3 else: # 默认模式 config = self.config # 临时更新配置 original_config = self.config self.config = config # 生成数据 features, labels = self.generate_data(add_noise=True, add_correlations=True) # 恢复原始配置 self.config = original_config self.num_samples = original_samples return features, labels def save_data( self, features: np.ndarray, labels: np.ndarray, output_path: Union[str, Path], format: str = 'csv' ): """ 保存生成的数据 Args: features: 特征数据 labels: 标签数据 output_path: 输出路径 format: 文件格式 ('csv', 'parquet', 'json') """ output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) # 创建DataFrame features_df = pd.DataFrame(features, columns=self.feature_columns) labels_df = pd.DataFrame(labels, columns=self.label_columns) # 合并数据 combined_df = pd.concat([features_df, labels_df], axis=1) # 保存数据 if format.lower() == 'csv': combined_df.to_csv(output_path, index=False) elif format.lower() == 'parquet': combined_df.to_parquet(output_path, index=False) elif format.lower() == 'json': combined_df.to_json(output_path, orient='records', indent=2) else: raise ValueError(f"Unsupported format: {format}") logger.info(f"Data saved to {output_path}") def visualize_data_distribution( self, features: np.ndarray, labels: np.ndarray, save_path: Optional[Union[str, Path]] = None ): """ 可视化数据分布 Args: features: 特征数据 labels: 标签数据 save_path: 保存路径 """ features_df = pd.DataFrame(features, columns=self.feature_columns) labels_df = pd.DataFrame(labels, columns=self.label_columns) # 创建子图 fig, axes = plt.subplots(3, 4, figsize=(16, 12)) fig.suptitle('Synthetic Data Distribution', fontsize=16) # 特征分布 for i, col in enumerate(self.feature_columns): row, col_idx = i // 4, i % 4 axes[row, col_idx].hist(features_df[col], bins=30, alpha=0.7) axes[row, col_idx].set_title(f'Feature: {col}') axes[row, col_idx].set_xlabel('Value') axes[row, col_idx].set_ylabel('Frequency') # 标签分布(前3个) for i, col in enumerate(self.label_columns[:3]): row, col_idx = 2, i axes[row, col_idx].hist(labels_df[col], bins=30, alpha=0.7, color='orange') axes[row, col_idx].set_title(f'Label: {col}') axes[row, col_idx].set_xlabel('Value') axes[row, col_idx].set_ylabel('Frequency') # 最后一个子图显示标签分布 axes[2, 3].hist(labels_df['delta_pressure'], bins=30, alpha=0.7, color='orange') axes[2, 3].set_title('Label: delta_pressure') axes[2, 3].set_xlabel('Value') axes[2, 3].set_ylabel('Frequency') plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') logger.info(f"Visualization saved to {save_path}") plt.show() def get_data_statistics( self, features: np.ndarray, labels: np.ndarray ) -> Dict[str, Any]: """ 获取数据统计信息 Args: features: 特征数据 labels: 标签数据 Returns: 统计信息字典 """ features_df = pd.DataFrame(features, columns=self.feature_columns) labels_df = pd.DataFrame(labels, columns=self.label_columns) stats = { 'features': { 'mean': features_df.mean().to_dict(), 'std': features_df.std().to_dict(), 'min': features_df.min().to_dict(), 'max': features_df.max().to_dict(), 'median': features_df.median().to_dict() }, 'labels': { 'mean': labels_df.mean().to_dict(), 'std': labels_df.std().to_dict(), 'min': labels_df.min().to_dict(), 'max': labels_df.max().to_dict(), 'median': labels_df.median().to_dict() }, 'correlations': { 'feature_correlations': features_df.corr().to_dict(), 'label_correlations': labels_df.corr().to_dict() } } return stats # 便捷函数 def generate_synthetic_data( num_samples: int = 1000, seed: Optional[int] = 42, config: Optional[Dict[str, Any]] = None, **kwargs ) -> Tuple[np.ndarray, np.ndarray]: """ 生成合成数据的便捷函数 Args: num_samples: 样本数量 seed: 随机种子 config: 配置字典 **kwargs: 其他参数 Returns: 特征数据和标签数据 """ generator = SyntheticDataGenerator(num_samples, seed, config) return generator.generate_data(**kwargs) def create_synthetic_dataset( num_samples: int = 1000, output_path: Optional[Union[str, Path]] = None, format: str = 'csv', **kwargs ) -> Tuple[np.ndarray, np.ndarray]: """ 创建并保存合成数据集的便捷函数 Args: num_samples: 样本数量 output_path: 输出路径 format: 文件格式 **kwargs: 其他参数 Returns: 特征数据和标签数据 """ generator = SyntheticDataGenerator(num_samples) features, labels = generator.generate_data(**kwargs) if output_path: generator.save_data(features, labels, output_path, format) return features, labels