|
|
""" |
|
|
推理引擎实现 |
|
|
Inference Engine for emotion and physiological state prediction |
|
|
|
|
|
该模块实现了高效的推理引擎,支持模型加载、数据预处理、推理执行和结果后处理。 |
|
|
""" |
|
|
|
|
|
import torch |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from typing import Union, Dict, Any, Optional, List, Tuple |
|
|
from pathlib import Path |
|
|
import time |
|
|
import json |
|
|
import logging |
|
|
from dataclasses import dataclass |
|
|
|
|
|
from ..models.pad_predictor import PADPredictor |
|
|
from ..data.preprocessor import DataPreprocessor |
|
|
from .mc_dropout import MCDropoutConfidence, create_mc_dropout_confidence |
|
|
from .stress_calculator import StressCalculator, get_default_calculator |
|
|
|
|
|
|
|
|
@dataclass
class InferenceResult:
    """
    Result of a single inference call.

    Produced by ``InferenceEngine.postprocess_output``; ``inference_time`` is
    filled in afterwards by the engine.
    """

    # Predicted PAD deltas; indexed as (n_samples, 3) by to_dataframe, a single
    # flat (3,) sample is also accepted.
    delta_pad: np.ndarray
    # Stress change derived from delta_pad by the stress calculator.
    delta_pressure: np.ndarray
    # Per-sample confidence (all ones when MC Dropout was not used).
    confidence: np.ndarray
    # Unrounded model output, or None when the engine is configured to omit it.
    raw_output: Optional[np.ndarray]
    # Seconds spent on preprocessing + forward pass.
    inference_time: float
    # Number of samples in this result.
    batch_size: int

    @staticmethod
    def _as_list(value: Any) -> Any:
        """Convert an array-like to nested Python lists; pass ``None`` through."""
        return None if value is None else np.asarray(value).tolist()

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the result to a JSON-serialisable dictionary.

        Array fields become nested lists. ``raw_output`` is ``None`` when the
        engine did not keep the raw model output.
        """
        return {
            'delta_pad': self._as_list(self.delta_pad),
            'delta_pressure': self._as_list(self.delta_pressure),
            'confidence': self._as_list(self.confidence),
            # raw_output is None unless output.include_raw_output is enabled;
            # calling .tolist() on it unconditionally used to crash.
            'raw_output': self._as_list(self.raw_output),
            'inference_time': self.inference_time,
            'batch_size': self.batch_size,
        }

    def to_dataframe(self) -> pd.DataFrame:
        """
        Convert the result to a DataFrame with one row per sample.

        Columns: delta_pleasure, delta_arousal, delta_dominance,
        delta_pressure, confidence.
        """
        # Treat a single flat sample as a one-row batch.
        pad = np.atleast_2d(self.delta_pad)
        data = {
            'delta_pleasure': pad[:, 0],
            'delta_arousal': pad[:, 1],
            'delta_dominance': pad[:, 2],
            'delta_pressure': np.asarray(self.delta_pressure).flatten(),
            'confidence': np.asarray(self.confidence).flatten(),
        }
        return pd.DataFrame(data)
|
|
|
|
|
|
|
|
class InferenceEngine:
    """
    Inference engine for PAD-delta / stress prediction.

    Responsibilities:
    - load and manage the trained model and an optional preprocessor
    - validate raw 7-feature input and expand it to the 10 model features
    - run inference, optionally with MC Dropout confidence estimation
    - post-process, time and persist results
    - pick CPU or CUDA automatically
    """

    # Number of raw input features expected by validate_input().
    _NUM_RAW_FEATURES = 7
    # Slack allowed beyond the configured ranges before a value is reported.
    _PAD_RANGE_TOLERANCE = 0.5
    _VITALITY_RANGE_TOLERANCE = 10

    def __init__(self,
                 model_path: Optional[str] = None,
                 preprocessor_path: Optional[str] = None,
                 device: Optional[str] = None,
                 config: Optional[Dict[str, Any]] = None):
        """
        Initialise the inference engine.

        Args:
            model_path: Path of a saved model; loaded immediately when given.
            preprocessor_path: Path of a saved preprocessor; loaded immediately when given.
            device: Compute device ('auto', 'cpu', 'cuda', 'cuda:N'); None behaves like 'auto'.
            config: Configuration overrides. They are deep-merged over
                ``_get_default_config()``, so a partial config such as
                ``{'mc_dropout': {'enabled': True}}`` is valid.
        """
        self.logger = logging.getLogger(__name__)

        # Merge over defaults: the rest of the class indexes sections directly
        # (self.config['validation'], ['output'], ['inference']) and a partial
        # user config used to raise KeyError.
        self.config = self._build_config(config)

        self.device = self._setup_device(device)

        self.model = None
        self.preprocessor = None
        # Must exist before load_model(), which configures it via _setup_mc_dropout().
        self.mc_dropout_calculator = None
        self.stress_calculator = get_default_calculator()
        self.inference_stats = self._empty_stats()

        if model_path:
            self.load_model(model_path)
        if preprocessor_path:
            self.load_preprocessor(preprocessor_path)

        self.logger.info(f"InferenceEngine initialized on device: {self.device}")

    def _get_default_config(self) -> Dict[str, Any]:
        """Return a fresh copy of the default configuration."""
        return {
            'inference': {
                'batch_size': 32,
                'max_batch_size': 1024,
                'precision': 'float32',
                'enable_profiling': False
            },
            'mc_dropout': {
                'enabled': False,
                'n_samples': 30
            },
            'validation': {
                'check_input_ranges': True,
                'strict_mode': False,
                'pad_ranges': {'min': -1.0, 'max': 1.0},
                'vitality_ranges': {'min': 0.0, 'max': 100.0}
            },
            'output': {
                'include_confidence': True,
                'include_raw_output': False,
                'round_decimals': 6
            }
        }

    def _build_config(self, overrides: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the default configuration with ``overrides`` deep-merged on top."""
        return self._deep_merge(self._get_default_config(), overrides or {})

    @staticmethod
    def _deep_merge(base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively merge ``overrides`` into a shallow copy of ``base``; overrides win."""
        merged = dict(base)
        for key, value in overrides.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = InferenceEngine._deep_merge(merged[key], value)
            else:
                merged[key] = value
        return merged

    @staticmethod
    def _empty_stats() -> Dict[str, Any]:
        """Return a zeroed performance-statistics dictionary."""
        return {
            'total_inferences': 0,
            'total_time': 0.0,
            'avg_time': 0.0,
            'min_time': float('inf'),
            'max_time': 0.0
        }

    def _setup_device(self, device: Optional[str]) -> str:
        """
        Resolve the compute device.

        'auto' or None picks CUDA when available, otherwise CPU. Any CUDA
        device (including indexed ones like 'cuda:1') falls back to CPU with a
        warning when CUDA is unavailable.
        """
        if device is None or device == 'auto':
            if torch.cuda.is_available():
                device = 'cuda'
                self.logger.info(f"GPU detected: {torch.cuda.get_device_name()}")
            else:
                device = 'cpu'
                self.logger.info("GPU not available, using CPU")

        if str(device).startswith('cuda') and not torch.cuda.is_available():
            self.logger.warning("CUDA requested but not available, falling back to CPU")
            device = 'cpu'

        return device

    def load_model(self, model_path: str) -> None:
        """
        Load a trained model, switch it to eval mode and set up MC Dropout.

        Args:
            model_path: Path of the saved model.

        Raises:
            Whatever ``PADPredictor.load_model`` raises; the error is logged first.
        """
        try:
            self.model = PADPredictor.load_model(model_path, self.device)
            self.model.eval()
            self.logger.info(f"Model loaded from {model_path}")

            model_info = self.model.get_model_info()
            self.logger.info(f"Model info: {model_info}")

            self._setup_mc_dropout()

        except Exception as e:
            self.logger.error(f"Failed to load model from {model_path}: {e}")
            raise

    def load_preprocessor(self, preprocessor_path: str) -> None:
        """
        Load a fitted preprocessor.

        Args:
            preprocessor_path: Path of the saved preprocessor.

        Raises:
            Whatever ``DataPreprocessor.load_preprocessor`` raises; the error is logged first.
        """
        try:
            self.preprocessor = DataPreprocessor.load_preprocessor(preprocessor_path)
            self.logger.info(f"Preprocessor loaded from {preprocessor_path}")
        except Exception as e:
            self.logger.error(f"Failed to load preprocessor from {preprocessor_path}: {e}")
            raise

    def _setup_mc_dropout(self) -> None:
        """Create the MC Dropout confidence calculator if enabled in the config, else clear it."""
        mc_config = self.config.get('mc_dropout', {})

        if mc_config.get('enabled', False):
            n_samples = mc_config.get('n_samples', 30)
            self.mc_dropout_calculator = create_mc_dropout_confidence(
                self.model,
                n_samples=n_samples
            )
            self.logger.info(f"MC Dropout置信度计算器已启用 (n_samples={n_samples})")
        else:
            self.mc_dropout_calculator = None

    def validate_input(self, data: Union[np.ndarray, pd.DataFrame, List]) -> np.ndarray:
        """
        Validate raw input and expand it to the model's 10-feature layout.

        Args:
            data: Samples with 7 raw features each, as a list, array-like or
                DataFrame. A single sample may be passed as a flat sequence.

        Returns:
            Array of shape (n_samples, 10): the 7 raw features followed by the
            3 user-minus-AI PAD difference features.

        Raises:
            ValueError: if the input is not 1-D/2-D, does not have 7 features,
                or (in strict mode) contains out-of-range values.
        """
        if isinstance(data, pd.DataFrame):
            array = data.to_numpy()
        else:
            # asarray accepts lists, tuples and other array-likes alike.
            array = np.asarray(data)

        if array.ndim == 1:
            array = array.reshape(1, -1)
        elif array.ndim != 2:
            raise ValueError(f"Input data should be 1D or 2D, got {array.ndim}D")

        if array.shape[1] != self._NUM_RAW_FEATURES:
            raise ValueError(f"Expected {self._NUM_RAW_FEATURES} input features, got {array.shape[1]}")

        if self.config['validation']['check_input_ranges']:
            self._check_input_ranges(array)

        return self._enhance_features(array)

    def _check_input_ranges(self, data: np.ndarray) -> None:
        """
        Check raw features against the configured ranges (plus a tolerance).

        Columns 0-2 and 4-6 are checked against ``pad_ranges``. Column 3 is
        checked against ``vitality_ranges``. Violations raise ValueError in
        strict mode and are logged as warnings otherwise.
        """
        validation_config = self.config['validation']

        pad_ranges = validation_config['pad_ranges']
        pad_features = data[:, [0, 1, 2, 4, 5, 6]]
        pad_out_of_range = np.sum(
            (pad_features < pad_ranges['min'] - self._PAD_RANGE_TOLERANCE)
            | (pad_features > pad_ranges['max'] + self._PAD_RANGE_TOLERANCE)
        )
        self._report_out_of_range(pad_out_of_range, "PAD")

        vitality_ranges = validation_config['vitality_ranges']
        vitality_values = data[:, 3]
        vitality_out_of_range = np.sum(
            (vitality_values < vitality_ranges['min'] - self._VITALITY_RANGE_TOLERANCE)
            | (vitality_values > vitality_ranges['max'] + self._VITALITY_RANGE_TOLERANCE)
        )
        self._report_out_of_range(vitality_out_of_range, "vitality")

    def _report_out_of_range(self, count: int, label: str) -> None:
        """Raise (strict mode) or warn about ``count`` out-of-range values of kind ``label``."""
        if count <= 0:
            return
        message = f"Found {count} {label} values outside expected range"
        if self.config['validation']['strict_mode']:
            raise ValueError(message)
        self.logger.warning(message)

    def _enhance_features(self, data: np.ndarray) -> np.ndarray:
        """
        Append PAD difference features to the 7 raw features.

        Layout of the result:
        - columns 0-6: raw features (user_pad_p, user_pad_a, user_pad_d,
          vitality, ai_current_pad_p, ai_current_pad_a, ai_current_pad_d)
        - columns 7-9: user_p - ai_p, user_a - ai_a, user_d - ai_d

        Args:
            data: Raw (n_samples, 7) feature array.

        Returns:
            (n_samples, 10) array with the same dtype as ``data``.
        """
        pad_diff = data[:, 0:3] - data[:, 4:7]
        return np.concatenate([data, pad_diff], axis=1)

    def preprocess_data(self, data: np.ndarray) -> torch.Tensor:
        """
        Apply the preprocessor (if loaded) and move the data to the compute device.

        Args:
            data: Validated (n_samples, 10) feature array.

        Returns:
            float32 tensor on ``self.device``.
        """
        if self.preprocessor is not None:
            # transform() returns a pair; only the transformed features are used.
            processed_data, _ = self.preprocessor.transform(data)
        else:
            processed_data = data.astype(np.float32)

        return torch.FloatTensor(processed_data).to(self.device)

    def postprocess_output(
        self,
        output: torch.Tensor,
        confidence: Optional[np.ndarray] = None,
        mc_info: Optional[Dict[str, Any]] = None
    ) -> 'InferenceResult':
        """
        Turn raw model output into an ``InferenceResult``.

        Args:
            output: Model output (ΔPAD).
            confidence: Optional confidence values (from MC Dropout). When
                omitted, every sample gets confidence 1.
            mc_info: MC Dropout details; accepted but currently not stored.

        Returns:
            Rounded result with ``inference_time`` set to 0.0 (the caller fills it in).
        """
        output_np = output.detach().cpu().numpy()
        delta_pad = output_np
        delta_pressure = self.stress_calculator.compute_stress_change(delta_pad)

        if confidence is None:
            confidence = np.ones((len(output_np), 1), dtype=np.float32)

        decimals = self.config['output']['round_decimals']

        return InferenceResult(
            delta_pad=np.round(delta_pad, decimals),
            delta_pressure=np.round(delta_pressure, decimals),
            confidence=np.round(confidence, decimals),
            raw_output=output_np if self.config['output']['include_raw_output'] else None,
            inference_time=0.0,
            batch_size=len(output_np)
        )

    def _require_model(self) -> None:
        """Raise ValueError when no model has been loaded yet."""
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model() first.")

    def _forward(self,
                 processed_data: torch.Tensor,
                 use_mc_dropout: Optional[bool]) -> Tuple[torch.Tensor, Optional[np.ndarray], Optional[Dict[str, Any]]]:
        """
        Run the model on a preprocessed batch.

        Args:
            processed_data: Output of ``preprocess_data``.
            use_mc_dropout: True/False to force MC Dropout on/off; None uses the config.

        Returns:
            (output tensor, confidence or None, MC Dropout info or None).
        """
        if use_mc_dropout is None:
            use_mc_dropout = self.config.get('mc_dropout', {}).get('enabled', False)

        if use_mc_dropout:
            if self.mc_dropout_calculator is None:
                # An explicit per-call request used to be ignored silently when
                # MC Dropout was disabled in the config; honour it by building
                # the calculator on demand.
                n_samples = self.config.get('mc_dropout', {}).get('n_samples', 30)
                self.mc_dropout_calculator = create_mc_dropout_confidence(
                    self.model,
                    n_samples=n_samples
                )
                self.logger.info(f"MC Dropout calculator created on demand (n_samples={n_samples})")

            predictions, confidence, mc_info = self.mc_dropout_calculator.predict_with_confidence(
                processed_data
            )
            # postprocess_output copies to CPU anyway, so no device round-trip is needed.
            return torch.as_tensor(predictions), confidence, mc_info

        with torch.no_grad():
            output = self.model(processed_data)
        return output, None, None

    def _synchronize_device(self) -> None:
        """Wait for queued CUDA work so timings cover the computation, not just kernel launch."""
        if torch.device(self.device).type == 'cuda':
            torch.cuda.synchronize(self.device)

    def _predict_validated(self,
                           input_data: np.ndarray,
                           use_mc_dropout: Optional[bool] = None) -> 'InferenceResult':
        """
        Run preprocessing, inference and post-processing on already-validated data.

        Args:
            input_data: (n_samples, 10) array as returned by ``validate_input``.
            use_mc_dropout: See ``predict``.

        Returns:
            Result with ``inference_time`` set; performance stats are updated.
        """
        start_time = time.perf_counter()
        processed_data = self.preprocess_data(input_data)
        output, confidence, mc_info = self._forward(processed_data, use_mc_dropout)
        self._synchronize_device()
        inference_time = time.perf_counter() - start_time

        result = self.postprocess_output(output, confidence=confidence, mc_info=mc_info)
        result.inference_time = inference_time
        self._update_stats(inference_time)
        return result

    def predict(self,
                data: Union[np.ndarray, pd.DataFrame, List],
                return_time: bool = True,
                use_mc_dropout: Optional[bool] = None) -> Union['InferenceResult', Tuple['InferenceResult', float]]:
        """
        Run inference on raw input.

        Args:
            data: Raw 7-feature samples (see ``validate_input``).
            return_time: If True, return ``(result, inference_time)``.
            use_mc_dropout: Whether to estimate confidence with MC Dropout;
                None uses the config setting.

        Returns:
            The result, or ``(result, seconds)`` when ``return_time`` is True.

        Raises:
            ValueError: if no model is loaded or the input is invalid.
        """
        self._require_model()
        input_data = self.validate_input(data)
        result = self._predict_validated(input_data, use_mc_dropout)

        if return_time:
            return result, result.inference_time
        return result

    def predict_batch(self,
                      data: Union[np.ndarray, pd.DataFrame, List],
                      batch_size: Optional[int] = None) -> List['InferenceResult']:
        """
        Run inference in fixed-size chunks.

        Args:
            data: Raw 7-feature samples (see ``validate_input``).
            batch_size: Chunk size; defaults to ``inference.batch_size`` and is
                capped at ``inference.max_batch_size``.

        Returns:
            One result per chunk, in input order.

        Raises:
            ValueError: if no model is loaded, the input is invalid, or the
                effective batch size is below 1.
        """
        self._require_model()

        # Validate once. The enhanced 10-column chunks must not go back through
        # predict(), which would re-validate them against the 7-feature layout.
        input_data = self.validate_input(data)

        if batch_size is None:
            batch_size = self.config['inference']['batch_size']
        batch_size = min(batch_size, self.config['inference']['max_batch_size'])
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        total_samples = len(input_data)
        results = [
            self._predict_validated(input_data[start:start + batch_size])
            for start in range(0, total_samples, batch_size)
        ]

        self.logger.info(f"Batch inference completed: {total_samples} samples in {len(results)} batches")
        return results

    def _update_stats(self, inference_time: float) -> None:
        """Fold one timing measurement (seconds) into the running statistics."""
        stats = self.inference_stats
        stats['total_inferences'] += 1
        stats['total_time'] += inference_time
        stats['avg_time'] = stats['total_time'] / stats['total_inferences']
        stats['min_time'] = min(stats['min_time'], inference_time)
        stats['max_time'] = max(stats['max_time'], inference_time)

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return a copy of the performance statistics."""
        return self.inference_stats.copy()

    def reset_stats(self) -> None:
        """Reset the performance statistics."""
        self.inference_stats = self._empty_stats()

    def save_results(self,
                     results: Union['InferenceResult', List['InferenceResult']],
                     output_path: str,
                     format: str = 'json') -> None:
        """
        Save inference results to disk.

        Args:
            results: One result or a list of results.
            output_path: Destination file.
            format: 'json' (list of ``to_dict()`` records) or 'csv' (one row
                per sample plus an ``inference_time`` column); case-insensitive.

        Raises:
            ValueError: for an unsupported format.
        """
        if isinstance(results, InferenceResult):
            results = [results]

        fmt = format.lower()
        if fmt == 'json':
            payload = [result.to_dict() for result in results]
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)

        elif fmt == 'csv':
            frames = []
            for result in results:
                frame = result.to_dataframe()
                frame['inference_time'] = result.inference_time
                frames.append(frame)

            # pd.concat raises on an empty list; write an empty table instead.
            combined_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
            combined_df.to_csv(output_path, index=False)

        else:
            raise ValueError(f"Unsupported format: {format}")

        self.logger.info(f"Results saved to {output_path}")

    def get_model_info(self) -> Dict[str, Any]:
        """Return model metadata plus device, preprocessor and performance information."""
        if self.model is None:
            return {"status": "No model loaded"}

        # Copy so the model's own info dict is not mutated.
        info = dict(self.model.get_model_info())
        info.update({
            'device': self.device,
            'preprocessor_loaded': self.preprocessor is not None,
            'performance_stats': self.get_performance_stats()
        })
        return info

    def _generate_benchmark_data(self, num_samples: int) -> np.ndarray:
        """Draw random raw inputs that lie inside the configured validation ranges."""
        validation_config = self.config['validation']
        pad_ranges = validation_config['pad_ranges']
        vitality_ranges = validation_config['vitality_ranges']

        data = np.random.uniform(pad_ranges['min'], pad_ranges['max'],
                                 size=(num_samples, self._NUM_RAW_FEATURES))
        data[:, 3] = np.random.uniform(vitality_ranges['min'], vitality_ranges['max'],
                                       size=num_samples)
        return data.astype(np.float32)

    def benchmark(self,
                  num_samples: int = 1000,
                  batch_size: int = 32) -> Dict[str, Any]:
        """
        Run a throughput/latency benchmark on random in-range data.

        Performance statistics are reset first.

        Args:
            num_samples: Number of random samples.
            batch_size: Requested chunk size for ``predict_batch``.

        Returns:
            The performance statistics plus total_samples, batch_size,
            total_time (wall clock, s), throughput (samples/s) and
            avg/p95/p99 per-batch latency in milliseconds.

        Raises:
            ValueError: if no model is loaded or ``num_samples`` < 1.
        """
        self._require_model()
        if num_samples < 1:
            raise ValueError(f"num_samples must be a positive integer, got {num_samples}")

        # Samples are drawn inside the configured ranges. Unbounded randn data
        # triggered range warnings, and strict mode rejected it outright.
        test_data = self._generate_benchmark_data(num_samples)

        self.reset_stats()

        start_time = time.perf_counter()
        results = self.predict_batch(test_data, batch_size)
        total_time = time.perf_counter() - start_time

        latencies_ms = np.array([r.inference_time for r in results]) * 1000

        stats = self.get_performance_stats()
        stats.update({
            'total_samples': num_samples,
            'batch_size': batch_size,
            'total_time': total_time,
            'throughput': num_samples / total_time if total_time > 0 else float('inf'),
            'avg_latency': stats['avg_time'] * 1000,
            'p95_latency': np.percentile(latencies_ms, 95),
            'p99_latency': np.percentile(latencies_ms, 99)
        })

        return stats
|
|
|
|
|
|
|
|
def create_inference_engine(model_path: Optional[str] = None,
                            preprocessor_path: Optional[str] = None,
                            device: Optional[str] = None,
                            config: Optional[Dict[str, Any]] = None) -> InferenceEngine:
    """
    Build an ``InferenceEngine``; a thin factory around its constructor.

    Args:
        model_path: Path of a saved model to load, or None.
        preprocessor_path: Path of a saved preprocessor to load, or None.
        device: Compute device passed through to the engine.
        config: Configuration dictionary passed through to the engine.

    Returns:
        The newly constructed engine.
    """
    engine_kwargs = {
        'model_path': model_path,
        'preprocessor_path': preprocessor_path,
        'device': device,
        'config': config,
    }
    return InferenceEngine(**engine_kwargs)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test: build an engine without a model and report its setup.
    logging.basicConfig(level=logging.INFO)

    demo_engine = InferenceEngine()
    sample_inputs = np.random.randn(10, 7).astype(np.float32)

    report_lines = (
        "推理引擎测试:",
        f"测试数据形状: {sample_inputs.shape}",
        f"设备: {demo_engine.device}",
    )
    for line in report_lines:
        print(line)

    print("推理引擎创建成功!")