#!/usr/bin/env python3 """ 推理教程 Inference Tutorial for Emotion and Physiological State Prediction Model 这个脚本演示了如何使用训练好的模型进行推理预测: 1. 单样本推理 2. 批量推理 3. 不同输入格式处理 4. 结果解释和可视化 5. 性能优化和基准测试 运行方式: python inference_tutorial.py """ import sys import os from pathlib import Path import numpy as np import pandas as pd import torch import json import time from typing import Dict, Any, List, Union, Tuple import matplotlib.pyplot as plt import seaborn as sns # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from src.data.synthetic_generator import SyntheticDataGenerator from src.models.pad_predictor import PADPredictor from src.data.preprocessor import DataPreprocessor from src.utils.inference_engine import create_inference_engine, InferenceEngine from src.utils.logger import setup_logger def main(): """主函数""" print("=" * 80) print("情绪与生理状态变化预测模型 - 推理教程") print("Emotion and Physiological State Prediction Model - Inference Tutorial") print("=" * 80) # 设置日志 setup_logger(level='INFO') # 创建输出目录 output_dir = Path(project_root) / "examples" / "inference_outputs" output_dir.mkdir(exist_ok=True) # 1. 准备模型和数据 print("\n1. 准备模型和数据") print("-" * 50) model_path, preprocessor_path = prepare_model_and_data(output_dir) # 2. 创建推理引擎 print("\n2. 创建推理引擎") print("-" * 50) engine = create_inference_engine( model_path=model_path, preprocessor_path=preprocessor_path, device='auto' ) # 3. 单样本推理 print("\n3. 单样本推理") print("-" * 50) demonstrate_single_inference(engine, output_dir) # 4. 批量推理 print("\n4. 批量推理") print("-" * 50) demonstrate_batch_inference(engine, output_dir) # 5. 不同输入格式处理 print("\n5. 不同输入格式处理") print("-" * 50) demonstrate_different_input_formats(engine, output_dir) # 6. 结果解释和可视化 print("\n6. 结果解释和可视化") print("-" * 50) demonstrate_result_interpretation(engine, output_dir) # 7. 性能优化和基准测试 print("\n7. 性能优化和基准测试") print("-" * 50) demonstrate_performance_optimization(engine, output_dir) # 8. 实际应用场景演示 print("\n8. 实际应用场景演示") print("-" * 50) demonstrate_real_world_scenarios(engine, output_dir) print("\n" + "=" * 80) print("推理教程完成!") print("Inference Tutorial Completed!") print("=" * 80) def prepare_model_and_data(output_dir: Path) -> Tuple[str, str]: """准备模型和数据""" print(" - 检查是否存在预训练模型...") # 检查快速开始教程中生成的模型 model_path = Path(project_root) / "examples" / "models" / "quick_start_model.pth" preprocessor_path = Path(project_root) / "examples" / "models" / "quick_start_preprocessor.pkl" if not model_path.exists() or not preprocessor_path.exists(): print(" - 未找到预训练模型,创建简单模型用于演示...") # 生成训练数据 generator = SyntheticDataGenerator(num_samples=500, seed=42) features, labels = generator.generate_data() # 创建和训练简单模型 preprocessor = DataPreprocessor() preprocessor.fit(features, labels) processed_features, processed_labels = preprocessor.transform(features, labels) # 创建模型 model = PADPredictor(input_dim=7, output_dim=5, hidden_dims=[64, 32]) # 简单训练 from torch.utils.data import DataLoader, TensorDataset dataset = TensorDataset( torch.FloatTensor(processed_features), torch.FloatTensor(processed_labels) ) train_loader = DataLoader(dataset, batch_size=32, shuffle=True) optimizer = torch.optim.Adam(model.parameters(), lr=0.001) criterion = torch.nn.MSELoss() model.train() for epoch in range(10): for batch_features, batch_labels in train_loader: optimizer.zero_grad() outputs = model(batch_features) loss = criterion(outputs, batch_labels) loss.backward() optimizer.step() # 保存模型和预处理器 output_dir.mkdir(parents=True, exist_ok=True) model_path = output_dir / "demo_model.pth" preprocessor_path = output_dir / "demo_preprocessor.pkl" model.save_model(str(model_path)) preprocessor.save(str(preprocessor_path)) print(f" - 演示模型已保存到: {model_path}") print(f" - 使用模型: {model_path}") print(f" - 使用预处理器: {preprocessor_path}") return str(model_path), str(preprocessor_path) def demonstrate_single_inference(engine: InferenceEngine, output_dir: Path): """演示单样本推理""" print(" - 演示不同情绪状态的单样本推理...") # 定义不同情绪状态的样本 samples = [ { 'name': '高兴状态', 'data': [0.8, 0.4, 0.6, 90.0, 0.7, 0.3, 0.5], 'description': '用户感到高兴,活力水平高' }, { 'name': '压力状态', 'data': [-0.6, 0.7, -0.3, 35.0, -0.5, 0.8, -0.2], 'description': '用户感到压力,激活度高但支配感低' }, { 'name': '平静状态', 'data': [0.1, -0.4, 0.2, 65.0, 0.0, -0.3, 0.1], 'description': '用户处于平静状态,激活度较低' }, { 'name': '兴奋状态', 'data': [0.6, 0.9, 0.4, 85.0, 0.5, 0.8, 0.3], 'description': '用户感到兴奋,激活度很高' }, { 'name': '疲劳状态', 'data': [-0.2, -0.7, -0.4, 25.0, -0.1, -0.6, -0.3], 'description': '用户感到疲劳,活力水平低' } ] results = [] for sample in samples: print(f"\n {sample['name']}:") print(f" 描述: {sample['description']}") print(f" 输入: User PAD=[{sample['data'][0]:.1f}, {sample['data'][1]:.1f}, {sample['data'][2]:.1f}], " f"Vitality={sample['data'][3]:.0f}, Current PAD=[{sample['data'][4]:.1f}, {sample['data'][5]:.1f}, {sample['data'][6]:.1f}]") # 进行推理 start_time = time.time() result = engine.predict(sample['data']) inference_time = (time.time() - start_time) * 1000 print(f" 推理时间: {inference_time:.2f}ms") print(f" 预测结果:") print(f" ΔPAD: [{result['delta_pad'][0]:.3f}, {result['delta_pad'][1]:.3f}, {result['delta_pad'][2]:.3f}]") print(f" ΔPressure: {result['delta_pressure']:.3f}") print(f" Confidence: {result['confidence']:.3f}") # 解释结果 interpretation = interpret_prediction(result) print(f" 解释: {interpretation}") # 保存结果 result_data = { 'name': sample['name'], 'description': sample['description'], 'input': sample['data'], 'prediction': result, 'inference_time_ms': inference_time, 'interpretation': interpretation } results.append(result_data) # 保存结果 results_path = output_dir / 'single_inference_results.json' with open(results_path, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) print(f"\n - 单样本推理结果已保存到: {results_path}") def demonstrate_batch_inference(engine: InferenceEngine, output_dir: Path): """演示批量推理""" print(" - 生成批量测试数据...") # 生成批量测试数据 generator = SyntheticDataGenerator(num_samples=100, seed=123) features, labels = generator.generate_data() print(f" - 批量大小: {len(features)}") # 批量推理 print(" - 执行批量推理...") start_time = time.time() batch_results = engine.predict_batch(features.tolist()) total_time = time.time() - start_time print(f" - 批量推理完成:") print(f" 总时间: {total_time:.3f}秒") print(f" 平均每样本时间: {total_time/len(features)*1000:.2f}ms") print(f" 吞吐量: {len(features)/total_time:.2f} 样本/秒") # 分析批量结果 analyze_batch_results(batch_results, labels, output_dir) # 保存批量结果 batch_data = { 'batch_size': len(features), 'total_time_seconds': total_time, 'avg_time_per_sample_ms': total_time/len(features)*1000, 'throughput_samples_per_second': len(features)/total_time, 'results': batch_results } batch_path = output_dir / 'batch_inference_results.json' with open(batch_path, 'w', encoding='utf-8') as f: json.dump(batch_data, f, indent=2, ensure_ascii=False) print(f" - 批量推理结果已保存到: {batch_path}") def analyze_batch_results(predictions: List[Dict], true_labels: np.ndarray, output_dir: Path): """分析批量推理结果""" print(" - 分析批量推理结果...") # 提取预测值 pred_delta_pad = np.array([p['delta_pad'] for p in predictions]) pred_delta_pressure = np.array([p['delta_pressure'] for p in predictions]) pred_confidence = np.array([p['confidence'] for p in predictions]) # 提取真实值 true_delta_pad = true_labels[:, :3] true_delta_pressure = true_labels[:, 3] true_confidence = true_labels[:, 4] # 计算误差指标 pad_mae = np.mean(np.abs(pred_delta_pad - true_delta_pad), axis=0) pressure_mae = np.mean(np.abs(pred_delta_pressure - true_delta_pressure)) confidence_mae = np.mean(np.abs(pred_confidence - true_confidence)) print(f" ΔPAD MAE: [{pad_mae[0]:.4f}, {pad_mae[1]:.4f}, {pad_mae[2]:.4f}]") print(f" ΔPressure MAE: {pressure_mae:.4f}") print(f" Confidence MAE: {confidence_mae:.4f}") # 可视化预测分布 visualize_prediction_distributions( pred_delta_pad, pred_delta_pressure, pred_confidence, true_delta_pad, true_delta_pressure, true_confidence, output_dir ) def visualize_prediction_distributions(pred_delta_pad, pred_delta_pressure, pred_confidence, true_delta_pad, true_delta_pressure, true_confidence, output_dir: Path): """可视化预测分布""" fig, axes = plt.subplots(2, 3, figsize=(15, 10)) fig.suptitle('预测值与真实值分布对比', fontsize=16) labels = ['ΔPleasure', 'ΔArousal', 'ΔDominance'] # ΔPAD分布对比 for i in range(3): row, col = 0, i # 真实值分布 axes[row, col].hist(true_delta_pad[:, i], bins=20, alpha=0.7, label='真实值', color='blue', density=True) # 预测值分布 axes[row, col].hist(pred_delta_pad[:, i], bins=20, alpha=0.7, label='预测值', color='red', density=True) axes[row, col].set_title(f'{labels[i]}') axes[row, col].set_xlabel('值') axes[row, col].set_ylabel('密度') axes[row, col].legend() axes[row, col].grid(True, alpha=0.3) # ΔPressure分布对比 axes[1, 0].hist(true_delta_pressure, bins=20, alpha=0.7, label='真实值', color='blue', density=True) axes[1, 0].hist(pred_delta_pressure, bins=20, alpha=0.7, label='预测值', color='red', density=True) axes[1, 0].set_title('ΔPressure') axes[1, 0].set_xlabel('值') axes[1, 0].set_ylabel('密度') axes[1, 0].legend() axes[1, 0].grid(True, alpha=0.3) # Confidence分布对比 axes[1, 1].hist(true_confidence, bins=20, alpha=0.7, label='真实值', color='blue', density=True) axes[1, 1].hist(pred_confidence, bins=20, alpha=0.7, label='预测值', color='red', density=True) axes[1, 1].set_title('Confidence') axes[1, 1].set_xlabel('值') axes[1, 1].set_ylabel('密度') axes[1, 1].legend() axes[1, 1].grid(True, alpha=0.3) # 隐藏最后一个子图 axes[1, 2].set_visible(False) plt.tight_layout() plt.savefig(output_dir / 'prediction_distributions.png', dpi=300, bbox_inches='tight') plt.close() print(f" - 预测分布图已保存到: {output_dir / 'prediction_distributions.png'}") def demonstrate_different_input_formats(engine: InferenceEngine, output_dir: Path): """演示不同输入格式处理""" print(" - 演示不同输入格式的处理...") # 1. 列表格式输入 print(" 1. 列表格式输入:") list_input = [0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1] result1 = engine.predict(list_input) print(f" 输入: {list_input}") print(f" 预测: ΔPAD={result1['delta_pad']}, Confidence={result1['confidence']:.3f}") # 2. NumPy数组格式输入 print(" 2. NumPy数组格式输入:") np_input = np.array([0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1]) result2 = engine.predict(np_input) print(f" 输入: {np_input}") print(f" 预测: ΔPAD={result2['delta_pad']}, Confidence={result2['confidence']:.3f}") # 3. 字典格式输入 print(" 3. 字典格式输入:") dict_input = { 'user_pleasure': 0.5, 'user_arousal': 0.3, 'user_dominance': -0.2, 'vitality': 75.0, 'current_pleasure': 0.1, 'current_arousal': 0.4, 'current_dominance': -0.1 } # 转换为列表格式 dict_to_list = [ dict_input['user_pleasure'], dict_input['user_arousal'], dict_input['user_dominance'], dict_input['vitality'], dict_input['current_pleasure'], dict_input['current_arousal'], dict_input['current_dominance'] ] result3 = engine.predict(dict_to_list) print(f" 输入: {dict_input}") print(f" 预测: ΔPAD={result3['delta_pad']}, Confidence={result3['confidence']:.3f}") # 4. 从JSON文件读取输入 print(" 4. 从JSON文件读取输入:") json_data = { "samples": [ [0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1], [-0.3, 0.6, 0.2, 45.0, -0.1, 0.7, 0.1] ] } json_path = output_dir / 'test_input.json' with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, indent=2, ensure_ascii=False) # 从文件读取并预测 with open(json_path, 'r', encoding='utf-8') as f: loaded_data = json.load(f) for i, sample in enumerate(loaded_data['samples']): result = engine.predict(sample) print(f" 样本{i+1}预测: ΔPAD={result['delta_pad']}, Confidence={result['confidence']:.3f}") # 5. 从CSV文件读取输入 print(" 5. 从CSV文件读取输入:") csv_data = pd.DataFrame([ [0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1], [-0.3, 0.6, 0.2, 45.0, -0.1, 0.7, 0.1], [0.8, -0.4, 0.6, 90.0, 0.7, -0.3, 0.5] ], columns=['user_pleasure', 'user_arousal', 'user_dominance', 'vitality', 'current_pleasure', 'current_arousal', 'current_dominance']) csv_path = output_dir / 'test_input.csv' csv_data.to_csv(csv_path, index=False) # 从CSV读取并预测 loaded_csv = pd.read_csv(csv_path) csv_results = engine.predict_batch(loaded_csv.values.tolist()) for i, result in enumerate(csv_results): print(f" CSV样本{i+1}预测: ΔPAD={result['delta_pad']}, Confidence={result['confidence']:.3f}") print(f" - 测试文件已保存到: {output_dir}") def demonstrate_result_interpretation(engine: InferenceEngine, output_dir: Path): """演示结果解释""" print(" - 演示详细的结果解释...") # 生成不同场景的样本 scenarios = [ { 'name': '积极变化', 'input': [0.2, 0.1, 0.0, 60.0, 0.5, 0.3, 0.2], 'expected': '情绪向积极方向发展' }, { 'name': '消极变化', 'input': [0.5, 0.3, 0.2, 70.0, 0.1, -0.2, -0.1], 'expected': '情绪向消极方向发展' }, { 'name': '稳定状态', 'input': [0.3, 0.2, 0.1, 65.0, 0.35, 0.25, 0.15], 'expected': '情绪状态相对稳定' } ] interpretation_results = [] for scenario in scenarios: print(f"\n 场景: {scenario['name']}") print(f" 预期: {scenario['expected']}") # 预测 result = engine.predict(scenario['input']) # 详细解释 detailed_interpretation = detailed_interpret_prediction( scenario['input'], result ) print(f" 详细解释:") for line in detailed_interpretation.split('\n'): if line.strip(): print(f" {line}") # 保存解释结果 interpretation_results.append({ 'scenario': scenario['name'], 'input': scenario['input'], 'expected': scenario['expected'], 'prediction': result, 'detailed_interpretation': detailed_interpretation }) # 保存解释结果 interpretation_path = output_dir / 'result_interpretations.json' with open(interpretation_path, 'w', encoding='utf-8') as f: json.dump(interpretation_results, f, indent=2, ensure_ascii=False) print(f"\n - 结果解释已保存到: {interpretation_path}") def detailed_interpret_prediction(input_data: List[float], result: Dict[str, Any]) -> str: """详细解释预测结果""" user_pad = input_data[:3] vitality = input_data[3] current_pad = input_data[4:] delta_pad = result['delta_pad'] delta_pressure = result['delta_pressure'] confidence = result['confidence'] interpretations = [] # 当前状态分析 interpretations.append("当前状态分析:") # PAD状态分析 if user_pad[0] > 0.3: interpretations.append(f" - 用户当前情绪偏积极 (Pleasure: {user_pad[0]:.2f})") elif user_pad[0] < -0.3: interpretations.append(f" - 用户当前情绪偏消极 (Pleasure: {user_pad[0]:.2f})") else: interpretations.append(f" - 用户当前情绪中性 (Pleasure: {user_pad[0]:.2f})") if user_pad[1] > 0.3: interpretations.append(f" - 用户当前激活度较高 (Arousal: {user_pad[1]:.2f})") elif user_pad[1] < -0.3: interpretations.append(f" - 用户当前激活度较低 (Arousal: {user_pad[1]:.2f})") else: interpretations.append(f" - 用户当前激活度中等 (Arousal: {user_pad[1]:.2f})") if vitality > 70: interpretations.append(f" - 用户当前活力水平高 (Vitality: {vitality:.0f})") elif vitality < 40: interpretations.append(f" - 用户当前活力水平低 (Vitality: {vitality:.0f})") else: interpretations.append(f" - 用户当前活力水平中等 (Vitality: {vitality:.0f})") # 变化趋势分析 interpretations.append("\n变化趋势分析:") # PAD变化分析 if abs(delta_pad[0]) > 0.05: direction = "增加" if delta_pad[0] > 0 else "减少" interpretations.append(f" - 快乐度预计{direction} {abs(delta_pad[0]):.3f}") if abs(delta_pad[1]) > 0.05: direction = "增加" if delta_pad[1] > 0 else "减少" interpretations.append(f" - 激活度预计{direction} {abs(delta_pad[1]):.3f}") if abs(delta_pad[2]) > 0.05: direction = "增加" if delta_pad[2] > 0 else "减少" interpretations.append(f" - 支配度预计{direction} {abs(delta_pad[2]):.3f}") # 压力变化分析 if abs(delta_pressure) > 0.03: direction = "增加" if delta_pressure > 0 else "减少" interpretations.append(f" - 压力水平预计{direction} {abs(delta_pressure):.3f}") # 预测置信度 interpretations.append(f"\n预测置信度: {confidence:.3f}") if confidence > 0.8: interpretations.append(" - 高置信度预测,结果可靠性强") elif confidence > 0.6: interpretations.append(" - 中等置信度预测,结果较为可靠") else: interpretations.append(" - 低置信度预测,结果不确定性较高") return '\n'.join(interpretations) def demonstrate_performance_optimization(engine: InferenceEngine, output_dir: Path): """演示性能优化""" print(" - 演示性能优化技术...") # 1. 不同批次大小的性能测试 print(" 1. 不同批次大小的性能测试:") batch_sizes = [1, 8, 16, 32, 64, 128] # 生成测试数据 generator = SyntheticDataGenerator(num_samples=1000, seed=456) test_features, _ = generator.generate_data() batch_performance = [] for batch_size in batch_sizes: start_time = time.time() # 分批处理 for i in range(0, len(test_features), batch_size): batch = test_features[i:i+batch_size].tolist() if len(batch) < batch_size: continue # 跳过不完整的批次 engine.predict_batch(batch) total_time = time.time() - start_time throughput = len(test_features) / total_time batch_performance.append({ 'batch_size': batch_size, 'total_time': total_time, 'throughput': throughput }) print(f" 批次大小 {batch_size:3d}: {total_time:.3f}s, {throughput:.2f} 样本/秒") # 找到最佳批次大小 best_batch = max(batch_performance, key=lambda x: x['throughput']) print(f" 最佳批次大小: {best_batch['batch_size']} ({best_batch['throughput']:.2f} 样本/秒)") # 2. 预热效果测试 print("\n 2. 预热效果测试:") # 测试无预热的性能 cold_times = [] for _ in range(10): start_time = time.time() engine.predict([0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1]) cold_times.append(time.time() - start_time) # 预热 for _ in range(5): engine.predict([0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1]) # 测试预热后的性能 warm_times = [] for _ in range(10): start_time = time.time() engine.predict([0.5, 0.3, -0.2, 75.0, 0.1, 0.4, -0.1]) warm_times.append(time.time() - start_time) avg_cold_time = np.mean(cold_times) * 1000 avg_warm_time = np.mean(warm_times) * 1000 improvement = (avg_cold_time - avg_warm_time) / avg_cold_time * 100 print(f" 冷启动平均时间: {avg_cold_time:.2f}ms") print(f" 预热后平均时间: {avg_warm_time:.2f}ms") print(f" 性能提升: {improvement:.1f}%") # 3. 完整基准测试 print("\n 3. 完整基准测试:") benchmark_stats = engine.benchmark(num_samples=500, batch_size=32) print(f" 总样本数: {benchmark_stats['total_samples']}") print(f" 总时间: {benchmark_stats['total_time']:.3f}s") print(f" 吞吐量: {benchmark_stats['throughput']:.2f} 样本/秒") print(f" 平均延迟: {benchmark_stats['avg_latency']:.2f}ms") print(f" P95延迟: {benchmark_stats['p95_latency']:.2f}ms") print(f" P99延迟: {benchmark_stats['p99_latency']:.2f}ms") # 保存性能测试结果 performance_results = { 'batch_performance': batch_performance, 'warmup_performance': { 'cold_avg_time_ms': avg_cold_time, 'warm_avg_time_ms': avg_warm_time, 'improvement_percent': improvement }, 'benchmark_stats': benchmark_stats } performance_path = output_dir / 'performance_optimization.json' with open(performance_path, 'w', encoding='utf-8') as f: json.dump(performance_results, f, indent=2, ensure_ascii=False) print(f"\n - 性能优化结果已保存到: {performance_path}") def demonstrate_real_world_scenarios(engine: InferenceEngine, output_dir: Path): """演示实际应用场景""" print(" - 演示实际应用场景...") scenarios = [ { 'name': '健康管理应用', 'description': '监测用户的情绪和压力状态变化', 'samples': [ { 'situation': '早晨起床', 'input': [0.2, -0.3, 0.1, 60.0, 0.1, -0.2, 0.0], 'context': '用户刚起床,活力中等' }, { 'situation': '工作压力', 'input': [-0.2, 0.6, -0.1, 45.0, -0.4, 0.7, -0.2], 'context': '用户面临工作压力' }, { 'situation': '运动后', 'input': [0.6, 0.4, 0.3, 85.0, 0.7, 0.5, 0.4], 'context': '用户刚完成运动' } ] }, { 'name': '教育应用', 'description': '监测学生的学习状态和压力水平', 'samples': [ { 'situation': '专注学习', 'input': [0.3, 0.2, 0.4, 70.0, 0.4, 0.3, 0.5], 'context': '学生正在专注学习' }, { 'situation': '考试焦虑', 'input': [-0.4, 0.8, -0.3, 55.0, -0.5, 0.9, -0.4], 'context': '学生面临考试焦虑' }, { 'situation': '课后放松', 'input': [0.5, -0.2, 0.2, 75.0, 0.6, -0.1, 0.3], 'context': '学生课后放松状态' } ] }, { 'name': '智能家居', 'description': '根据用户情绪状态调整环境', 'samples': [ { 'situation': '回家放松', 'input': [0.4, -0.4, 0.2, 65.0, 0.5, -0.3, 0.3], 'context': '用户下班回家需要放松' }, { 'situation': '聚会准备', 'input': [0.7, 0.3, 0.5, 80.0, 0.8, 0.4, 0.6], 'context': '用户准备参加聚会' }, { 'situation': '睡前准备', 'input': [0.1, -0.6, 0.0, 40.0, 0.0, -0.7, -0.1], 'context': '用户准备睡觉' } ] } ] scenario_results = [] for scenario in scenarios: print(f"\n 场景: {scenario['name']}") print(f" 描述: {scenario['description']}") scenario_data = { 'name': scenario['name'], 'description': scenario['description'], 'samples': [] } for sample in scenario['samples']: print(f"\n 情况: {sample['situation']}") print(f" 背景: {sample['context']}") print(f" 输入: User PAD=[{sample['input'][0]:.1f}, {sample['input'][1]:.1f}, {sample['input'][2]:.1f}], " f"Vitality={sample['input'][3]:.0f}, Current PAD=[{sample['input'][4]:.1f}, {sample['input'][5]:.1f}, {sample['input'][6]:.1f}]") # 预测 result = engine.predict(sample['input']) print(f" 预测结果:") print(f" ΔPAD: [{result['delta_pad'][0]:.3f}, {result['delta_pad'][1]:.3f}, {result['delta_pad'][2]:.3f}]") print(f" ΔPressure: {result['delta_pressure']:.3f}") print(f" Confidence: {result['confidence']:.3f}") # 应用建议 suggestions = generate_application_suggestions(scenario['name'], sample['situation'], result) print(f" 应用建议: {suggestions}") # 保存样本结果 sample_data = { 'situation': sample['situation'], 'context': sample['context'], 'input': sample['input'], 'prediction': result, 'suggestions': suggestions } scenario_data['samples'].append(sample_data) scenario_results.append(scenario_data) # 保存场景结果 scenarios_path = output_dir / 'real_world_scenarios.json' with open(scenarios_path, 'w', encoding='utf-8') as f: json.dump(scenario_results, f, indent=2, ensure_ascii=False) print(f"\n - 实际应用场景结果已保存到: {scenarios_path}") def generate_application_suggestions(scenario_name: str, situation: str, result: Dict[str, Any]) -> str: """根据场景和预测结果生成应用建议""" delta_pad = result['delta_pad'] delta_pressure = result['delta_pressure'] confidence = result['confidence'] suggestions = [] if scenario_name == '健康管理应用': if delta_pressure > 0.05: suggestions.append("建议进行放松练习,如深呼吸或冥想") elif delta_pressure < -0.05: suggestions.append("压力水平良好,继续保持当前状态") if delta_pad[1] > 0.1: suggestions.append("激活度较高,建议适当休息") elif delta_pad[1] < -0.1: ("激活度较低,建议进行轻度运动") elif scenario_name == '教育应用': if delta_pressure > 0.08: suggestions.append("学习压力较大,建议安排休息时间") elif delta_pad[0] < -0.1: suggestions.append("情绪偏消极,建议进行积极引导") elif delta_pad[1] > 0.15: suggestions.append("激活度过高,可能影响专注力") elif scenario_name == '智能家居': if delta_pad[0] > 0.1: suggestions.append("情绪积极,可以播放欢快音乐") elif delta_pad[0] < -0.1: suggestions.append("情绪消极,建议调节灯光和音乐") elif delta_pad[1] < -0.2: suggestions.append("激活度低,建议调暗灯光准备休息") elif delta_pad[1] > 0.2: suggestions.append("激活度高,适合社交活动") # 基于置信度的建议 if confidence < 0.6: suggestions.append("预测置信度较低,建议收集更多数据") if not suggestions: suggestions.append("状态稳定,保持当前环境设置") return ";".join(suggestions) def interpret_prediction(result: Dict[str, Any]) -> str: """简单解释预测结果""" delta_pad = result['delta_pad'] delta_pressure = result['delta_pressure'] confidence = result['confidence'] interpretations = [] # 主要变化趋势 if abs(delta_pad[0]) > 0.05: if delta_pad[0] > 0: interpretations.append("情绪趋向积极") else: interpretations.append("情绪趋向消极") if abs(delta_pressure) > 0.03: if delta_pressure > 0: interpretations.append("压力增加") else: interpretations.append("压力缓解") if confidence > 0.8: interpretations.append("高置信度") elif confidence < 0.6: interpretations.append("低置信度") return ",".join(interpretations) if interpretations else "状态相对稳定" if __name__ == "__main__": main() suggestions.append