File size: 25,734 Bytes
0a6452f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 |
"""
合成数据生成器实现
Synthetic data generator for emotion and physiological state data
"""
import numpy as np
import pandas as pd
from typing import Union, Tuple, Optional, Dict, Any, List
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from loguru import logger
from scipy import stats
import warnings
class SyntheticDataGenerator:
"""
合成数据生成器
Synthetic data generator for emotion and physiological state prediction
生成符合PAD情绪模型和生理状态变化的数据:
- 输入:User PAD (3维) + Vitality (1维) + Current PAD (3维) = 7维
- 输出:ΔPAD (3维) = 3维
- 注意:ΔPressure 和 Confidence 不再生成,改为运行时计算
"""
def __init__(
self,
num_samples: int = 1000,
seed: Optional[int] = 42,
config: Optional[Dict[str, Any]] = None
):
"""
初始化合成数据生成器
Args:
num_samples: 样本数量
seed: 随机种子
config: 配置字典
"""
self.num_samples = num_samples
self.seed = seed
self.config = config or self._get_default_config()
# 设置随机种子
if seed is not None:
np.random.seed(seed)
# 特征和标签列名(与 CSV 文件列名一致)
self.feature_columns = [
'user_pad_p', 'user_pad_a', 'user_pad_d', # User PAD (3维)
'vitality', # Vitality (1维)
'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d' # Current PAD (3维)
]
self.label_columns = [
'ai_delta_p', 'ai_delta_a', 'ai_delta_d' # ΔPAD (3维)
# 注意:delta_pressure 和 confidence 不再作为标签
# - delta_pressure 通过 PAD 动态计算
# - confidence 通过 MC Dropout 动态计算
]
logger.info(f"Synthetic data generator initialized: {num_samples} samples")
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置"""
return {
# PAD值分布配置
'pad_distribution': {
'user_pad': {
'pleasure': {'mean': 0.0, 'std': 0.5, 'min': -1.0, 'max': 1.0},
'arousal': {'mean': 0.0, 'std': 0.4, 'min': -1.0, 'max': 1.0},
'dominance': {'mean': 0.1, 'std': 0.3, 'min': -1.0, 'max': 1.0}
},
'current_pad': {
'pleasure': {'mean': 0.0, 'std': 0.6, 'min': -1.0, 'max': 1.0},
'arousal': {'mean': 0.0, 'std': 0.5, 'min': -1.0, 'max': 1.0},
'dominance': {'mean': 0.1, 'std': 0.4, 'min': -1.0, 'max': 1.0}
}
},
# Vitality分布配置
'vitality_distribution': {
'mean': 50.0,
'std': 20.0,
'min': 0.0,
'max': 100.0
},
# ΔPAD分布配置
'delta_pad_distribution': {
'base_std': 0.1,
'influence_factor': 0.3,
'min': -0.5,
'max': 0.5
},
# ΔPressure分布配置
'delta_pressure_distribution': {
'base_std': 0.05,
'vitality_influence': 0.2,
'pad_influence': 0.15,
'min': -0.3,
'max': 0.3
},
# 置信度分布配置
'confidence_distribution': {
'base_mean': 0.7,
'base_std': 0.15,
'consistency_factor': 0.3,
'min': 0.0,
'max': 1.0
},
# 噪声配置
'noise': {
'enabled': True,
'feature_noise_std': 0.01,
'label_noise_std': 0.02
},
# 相关性配置
'correlations': {
'user_current_pad_correlation': 0.6, # User PAD与Current PAD的相关性
'vitality_pad_correlation': 0.3, # Vitality与PAD的相关性
'delta_consistency': 0.4 # Δ值的一致性
}
}
def generate_data(
self,
add_noise: bool = True,
add_correlations: bool = True,
return_dataframe: bool = False
) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[pd.DataFrame, pd.DataFrame]]:
"""
生成合成数据
Args:
add_noise: 是否添加噪声
add_correlations: 是否添加相关性
return_dataframe: 是否返回DataFrame格式
Returns:
特征数据和标签数据的元组
"""
# 生成基础特征
user_pad = self._generate_user_pad()
vitality = self._generate_vitality()
current_pad = self._generate_current_pad(user_pad, vitality, add_correlations)
# 组合特征
features = np.hstack([user_pad, vitality.reshape(-1, 1), current_pad])
# 生成标签(仅 ΔPAD 3维)
delta_pad = self._generate_delta_pad(user_pad, current_pad, vitality, add_correlations)
# 标签就是 ΔPAD(不再包含 delta_pressure 和 confidence)
labels = delta_pad
# 添加噪声
if add_noise and self.config['noise']['enabled']:
features = self._add_feature_noise(features)
labels = self._add_label_noise(labels)
# 数据验证和修正
features = self._validate_and_fix_features(features)
labels = self._validate_and_fix_labels(labels)
# 转换格式
if return_dataframe:
features_df = pd.DataFrame(features, columns=self.feature_columns)
labels_df = pd.DataFrame(labels, columns=self.label_columns)
return features_df, labels_df
else:
return features, labels
def _generate_user_pad(self) -> np.ndarray:
"""生成User PAD数据"""
config = self.config['pad_distribution']['user_pad']
user_pad = np.zeros((self.num_samples, 3))
# 生成每个维度的数据
for i, dimension in enumerate(['pleasure', 'arousal', 'dominance']):
dim_config = config[dimension]
# 使用截断正态分布生成数据
data = stats.truncnorm(
(dim_config['min'] - dim_config['mean']) / dim_config['std'],
(dim_config['max'] - dim_config['mean']) / dim_config['std'],
loc=dim_config['mean'],
scale=dim_config['std']
).rvs(self.num_samples)
user_pad[:, i] = data
return user_pad
def _generate_vitality(self) -> np.ndarray:
"""生成Vitality数据"""
config = self.config['vitality_distribution']
# 使用Beta分布生成[0, 1]范围的数据,然后缩放到[0, 100]
alpha = ((config['mean'] - config['min']) / (config['max'] - config['min'])) * 2
beta = 2 - alpha
if alpha <= 0 or beta <= 0:
# 如果参数无效,使用截断正态分布
vitality = stats.truncnorm(
(config['min'] - config['mean']) / config['std'],
(config['max'] - config['mean']) / config['std'],
loc=config['mean'],
scale=config['std']
).rvs(self.num_samples)
else:
# 使用Beta分布
vitality = stats.beta.rvs(alpha, beta, size=self.num_samples)
vitality = vitality * (config['max'] - config['min']) + config['min']
return vitality
def _generate_current_pad(
self,
user_pad: np.ndarray,
vitality: np.ndarray,
add_correlations: bool
) -> np.ndarray:
"""生成Current PAD数据"""
config = self.config['pad_distribution']['current_pad']
correlation = self.config['correlations']['user_current_pad_correlation']
current_pad = np.zeros((self.num_samples, 3))
for i, dimension in enumerate(['pleasure', 'arousal', 'dominance']):
dim_config = config[dimension]
# 生成基础数据
base_data = stats.truncnorm(
(dim_config['min'] - dim_config['mean']) / dim_config['std'],
(dim_config['max'] - dim_config['mean']) / dim_config['std'],
loc=dim_config['mean'],
scale=dim_config['std']
).rvs(self.num_samples)
if add_correlations:
# 添加与User PAD的相关性
correlated_part = correlation * user_pad[:, i]
independent_part = (1 - abs(correlation)) * base_data
current_pad[:, i] = correlated_part + independent_part
# 添加与Vitality的轻微相关性
vitality_correlation = self.config['correlations']['vitality_pad_correlation']
vitality_influence = vitality_correlation * (vitality - 50) / 50 * 0.1
current_pad[:, i] += vitality_influence
else:
current_pad[:, i] = base_data
# 确保在有效范围内
current_pad[:, i] = np.clip(current_pad[:, i], -1.0, 1.0)
return current_pad
def _generate_delta_pad(
self,
user_pad: np.ndarray,
current_pad: np.ndarray,
vitality: np.ndarray,
add_correlations: bool
) -> np.ndarray:
"""生成ΔPAD数据"""
config = self.config['delta_pad_distribution']
delta_pad = np.zeros((self.num_samples, 3))
# 计算PAD差异(回归到均值的趋势)
pad_difference = current_pad - user_pad
for i in range(3):
# 基础变化量(回归到均值)
base_change = -pad_difference[:, i] * config['influence_factor']
# 添加随机变化
random_change = np.random.normal(0, config['base_std'], self.num_samples)
if add_correlations:
# 添加与Vitality的相关性(高活力时变化更大)
vitality_factor = (vitality / 100) * 0.2
vitality_change = np.random.normal(0, vitality_factor)
# 添加一致性(某些样本整体变化方向一致)
consistency_factor = self.config['correlations']['delta_consistency']
if consistency_factor > 0:
consistency_noise = np.random.normal(0, consistency_factor, self.num_samples)
random_change += consistency_noise
delta_pad[:, i] = base_change + random_change + vitality_change
else:
delta_pad[:, i] = base_change + random_change
# 确保在合理范围内
delta_pad[:, i] = np.clip(delta_pad[:, i], config['min'], config['max'])
return delta_pad
def _generate_delta_pressure(
self,
vitality: np.ndarray,
delta_pad: np.ndarray,
add_correlations: bool
) -> np.ndarray:
"""生成ΔPressure数据"""
config = self.config['delta_pressure_distribution']
# 基础压力变化
base_pressure = np.random.normal(0, config['base_std'], self.num_samples)
if add_correlations:
# 与Vitality的相关性(低活力时压力增加)
vitality_stress = -(vitality - 50) / 50 * config['vitality_influence']
# 与PAD变化的相关性(负面情绪变化时压力增加)
pad_stress = np.mean(delta_pad[:, :2], axis=1) * config['pad_influence'] # 主要考虑pleasure和arousal
delta_pressure = base_pressure + vitality_stress + pad_stress
else:
delta_pressure = base_pressure
# 确保在合理范围内
delta_pressure = np.clip(delta_pressure, config['min'], config['max'])
return delta_pressure
def _generate_confidence(
self,
features: np.ndarray,
delta_pad: np.ndarray,
delta_pressure: np.ndarray,
add_correlations: bool
) -> np.ndarray:
"""生成置信度数据"""
config = self.config['confidence_distribution']
# 基础置信度
base_confidence = np.random.normal(
config['base_mean'],
config['base_std'],
self.num_samples
)
if add_correlations:
# 基于数据一致性的置信度调整
# PAD值差异越小,置信度越高
user_pad = features[:, :3]
current_pad = features[:, 4:7]
pad_diff = np.abs(current_pad - user_pad)
consistency_score = 1.0 - np.mean(pad_diff, axis=1)
# 变化量越大,置信度越低
change_magnitude = np.sqrt(np.sum(delta_pad**2, axis=1) + delta_pressure**2)
change_factor = 1.0 - np.tanh(change_magnitude * 2)
# 组合因素
consistency_factor = config['consistency_factor']
confidence = base_confidence + consistency_factor * consistency_score * 0.2
confidence += consistency_factor * change_factor * 0.1
else:
confidence = base_confidence
# 确保在[0, 1]范围内
confidence = np.clip(confidence, config['min'], config['max'])
return confidence
def _add_feature_noise(self, features: np.ndarray) -> np.ndarray:
"""为特征添加噪声"""
noise_std = self.config['noise']['feature_noise_std']
noise = np.random.normal(0, noise_std, features.shape)
# 为不同维度添加不同程度的噪声
noise[:, 3] *= 2 # Vitality的噪声稍大
return features + noise
def _add_label_noise(self, labels: np.ndarray) -> np.ndarray:
"""为标签添加噪声"""
noise_std = self.config['noise']['label_noise_std']
noise = np.random.normal(0, noise_std, labels.shape)
# 为不同标签添加不同程度的噪声
noise[:, 4] *= 0.5 # 置信度的噪声较小
return labels + noise
def _validate_and_fix_features(self, features: np.ndarray) -> np.ndarray:
"""验证和修正特征数据"""
# PAD值限制在[-1, 1]范围内
pad_indices = [0, 1, 2, 4, 5, 6]
features[:, pad_indices] = np.clip(features[:, pad_indices], -1.0, 1.0)
# Vitality值限制在[0, 100]范围内
features[:, 3] = np.clip(features[:, 3], 0.0, 100.0)
return features
def _validate_and_fix_labels(self, labels: np.ndarray) -> np.ndarray:
"""验证和修正标签数据"""
# ΔPAD限制在[-0.5, 0.5]范围内
labels[:, :3] = np.clip(labels[:, :3], -0.5, 0.5)
# ΔPressure限制在[-0.3, 0.3]范围内
labels[:, 3] = np.clip(labels[:, 3], -0.3, 0.3)
# Confidence限制在[0, 1]范围内
labels[:, 4] = np.clip(labels[:, 4], 0.0, 1.0)
return labels
def generate_dataset_with_patterns(
self,
patterns: List[str],
pattern_weights: Optional[List[float]] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""
生成具有特定模式的数据
Args:
patterns: 模式列表 ['stress', 'relaxation', 'excitement', 'calm']
pattern_weights: 模式权重列表
Returns:
特征数据和标签数据
"""
if pattern_weights is None:
pattern_weights = [1.0] * len(patterns)
# 计算每个模式的样本数量
total_weight = sum(pattern_weights)
pattern_samples = [
int(self.num_samples * weight / total_weight)
for weight in pattern_weights
]
# 调整以确保总样本数正确
pattern_samples[-1] = self.num_samples - sum(pattern_samples[:-1])
all_features = []
all_labels = []
for pattern, num_samples in zip(patterns, pattern_samples):
if num_samples > 0:
# 生成特定模式的数据
features, labels = self._generate_pattern_data(pattern, num_samples)
all_features.append(features)
all_labels.append(labels)
# 合并所有数据
features = np.vstack(all_features)
labels = np.vstack(all_labels)
# 打乱数据
indices = np.random.permutation(len(features))
features = features[indices]
labels = labels[indices]
logger.info(f"Generated data with patterns: {patterns}")
return features, labels
def _generate_pattern_data(self, pattern: str, num_samples: int) -> Tuple[np.ndarray, np.ndarray]:
"""生成特定模式的数据"""
# 临时修改生成器参数
original_samples = self.num_samples
self.num_samples = num_samples
# 根据模式调整参数
if pattern == 'stress':
# 压力模式:低活力,负面情绪,压力增加
config = self.config.copy()
config['vitality_distribution']['mean'] = 30.0
config['vitality_distribution']['std'] = 10.0
config['pad_distribution']['user_pad']['pleasure']['mean'] = -0.3
config['pad_distribution']['user_pad']['arousal']['mean'] = 0.2
config['delta_pressure_distribution']['base_std'] = 0.1
elif pattern == 'relaxation':
# 放松模式:中高活力,正面情绪,压力减少
config = self.config.copy()
config['vitality_distribution']['mean'] = 70.0
config['vitality_distribution']['std'] = 15.0
config['pad_distribution']['user_pad']['pleasure']['mean'] = 0.4
config['pad_distribution']['user_pad']['arousal']['mean'] = -0.2
config['delta_pressure_distribution']['base_std'] = 0.08
elif pattern == 'excitement':
# 兴奋模式:高活力,高激活度
config = self.config.copy()
config['vitality_distribution']['mean'] = 85.0
config['vitality_distribution']['std'] = 10.0
config['pad_distribution']['user_pad']['arousal']['mean'] = 0.6
config['pad_distribution']['current_pad']['arousal']['mean'] = 0.7
elif pattern == 'calm':
# 平静模式:中等活力,低激活度
config = self.config.copy()
config['vitality_distribution']['mean'] = 60.0
config['vitality_distribution']['std'] = 12.0
config['pad_distribution']['user_pad']['arousal']['mean'] = -0.4
config['pad_distribution']['current_pad']['arousal']['mean'] = -0.3
else:
# 默认模式
config = self.config
# 临时更新配置
original_config = self.config
self.config = config
# 生成数据
features, labels = self.generate_data(add_noise=True, add_correlations=True)
# 恢复原始配置
self.config = original_config
self.num_samples = original_samples
return features, labels
def save_data(
self,
features: np.ndarray,
labels: np.ndarray,
output_path: Union[str, Path],
format: str = 'csv'
):
"""
保存生成的数据
Args:
features: 特征数据
labels: 标签数据
output_path: 输出路径
format: 文件格式 ('csv', 'parquet', 'json')
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
# 创建DataFrame
features_df = pd.DataFrame(features, columns=self.feature_columns)
labels_df = pd.DataFrame(labels, columns=self.label_columns)
# 合并数据
combined_df = pd.concat([features_df, labels_df], axis=1)
# 保存数据
if format.lower() == 'csv':
combined_df.to_csv(output_path, index=False)
elif format.lower() == 'parquet':
combined_df.to_parquet(output_path, index=False)
elif format.lower() == 'json':
combined_df.to_json(output_path, orient='records', indent=2)
else:
raise ValueError(f"Unsupported format: {format}")
logger.info(f"Data saved to {output_path}")
def visualize_data_distribution(
self,
features: np.ndarray,
labels: np.ndarray,
save_path: Optional[Union[str, Path]] = None
):
"""
可视化数据分布
Args:
features: 特征数据
labels: 标签数据
save_path: 保存路径
"""
features_df = pd.DataFrame(features, columns=self.feature_columns)
labels_df = pd.DataFrame(labels, columns=self.label_columns)
# 创建子图
fig, axes = plt.subplots(3, 4, figsize=(16, 12))
fig.suptitle('Synthetic Data Distribution', fontsize=16)
# 特征分布
for i, col in enumerate(self.feature_columns):
row, col_idx = i // 4, i % 4
axes[row, col_idx].hist(features_df[col], bins=30, alpha=0.7)
axes[row, col_idx].set_title(f'Feature: {col}')
axes[row, col_idx].set_xlabel('Value')
axes[row, col_idx].set_ylabel('Frequency')
# 标签分布(前3个)
for i, col in enumerate(self.label_columns[:3]):
row, col_idx = 2, i
axes[row, col_idx].hist(labels_df[col], bins=30, alpha=0.7, color='orange')
axes[row, col_idx].set_title(f'Label: {col}')
axes[row, col_idx].set_xlabel('Value')
axes[row, col_idx].set_ylabel('Frequency')
# 最后一个子图显示标签分布
axes[2, 3].hist(labels_df['delta_pressure'], bins=30, alpha=0.7, color='orange')
axes[2, 3].set_title('Label: delta_pressure')
axes[2, 3].set_xlabel('Value')
axes[2, 3].set_ylabel('Frequency')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
logger.info(f"Visualization saved to {save_path}")
plt.show()
def get_data_statistics(
self,
features: np.ndarray,
labels: np.ndarray
) -> Dict[str, Any]:
"""
获取数据统计信息
Args:
features: 特征数据
labels: 标签数据
Returns:
统计信息字典
"""
features_df = pd.DataFrame(features, columns=self.feature_columns)
labels_df = pd.DataFrame(labels, columns=self.label_columns)
stats = {
'features': {
'mean': features_df.mean().to_dict(),
'std': features_df.std().to_dict(),
'min': features_df.min().to_dict(),
'max': features_df.max().to_dict(),
'median': features_df.median().to_dict()
},
'labels': {
'mean': labels_df.mean().to_dict(),
'std': labels_df.std().to_dict(),
'min': labels_df.min().to_dict(),
'max': labels_df.max().to_dict(),
'median': labels_df.median().to_dict()
},
'correlations': {
'feature_correlations': features_df.corr().to_dict(),
'label_correlations': labels_df.corr().to_dict()
}
}
return stats
# 便捷函数
def generate_synthetic_data(
num_samples: int = 1000,
seed: Optional[int] = 42,
config: Optional[Dict[str, Any]] = None,
**kwargs
) -> Tuple[np.ndarray, np.ndarray]:
"""
生成合成数据的便捷函数
Args:
num_samples: 样本数量
seed: 随机种子
config: 配置字典
**kwargs: 其他参数
Returns:
特征数据和标签数据
"""
generator = SyntheticDataGenerator(num_samples, seed, config)
return generator.generate_data(**kwargs)
def create_synthetic_dataset(
num_samples: int = 1000,
output_path: Optional[Union[str, Path]] = None,
format: str = 'csv',
**kwargs
) -> Tuple[np.ndarray, np.ndarray]:
"""
创建并保存合成数据集的便捷函数
Args:
num_samples: 样本数量
output_path: 输出路径
format: 文件格式
**kwargs: 其他参数
Returns:
特征数据和标签数据
"""
generator = SyntheticDataGenerator(num_samples)
features, labels = generator.generate_data(**kwargs)
if output_path:
generator.save_data(features, labels, output_path, format)
return features, labels |