"""
GPU Preloaded Data Loader: speeds up training on small datasets.

Loads the entire dataset onto the GPU once, which eliminates the CPU-to-GPU
transfer overhead of every batch. Intended for small datasets that fit
entirely in GPU memory.
"""

import torch
import numpy as np
import pandas as pd
from typing import Union, Tuple, Optional, Dict, Any
from pathlib import Path
from loguru import logger


class GPUPreloadDataLoader:
    """
    GPU preloaded data loader.

    Loads all data into GPU memory at once and slices batches on the GPU,
    avoiding a CPU-to-GPU transfer for every batch.

    Pros:
    - Removes the data-transfer bottleneck; training is 1-5% faster
      (depending on the data type)
    - Tensor slicing on the GPU is very fast
    - Simplifies the training loop

    Cons:
    - Uses more GPU memory
    - Does not support data augmentation
    - Not suitable for large datasets

    Use cases:
    - Small datasets (that fit entirely in GPU memory)
    - Tabular data (CSV and other structured data)
    - Pipelines that need no complex data preprocessing
    """

    def __init__(
        self,
        data: Union[str, Path, np.ndarray, pd.DataFrame],
        batch_size: int = 4096,
        shuffle: bool = True,
        device: Optional[torch.device] = None,
        normalize_features: bool = True,
        normalize_labels: bool = False,
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        feature_cols: Optional[Union[slice, list]] = None,
        label_cols: Optional[Union[slice, list]] = None,
        feature_names: Optional[list] = None,
        label_names: Optional[list] = None
    ):
        """
        Initialize the GPU preloaded data loader.

        Column selection priority: feature_names/label_names, then
        feature_cols/label_cols, then input_dim/output_dim, then the default
        (last column is the label, all other columns are features).

        Args:
            data: Path to a CSV file, a DataFrame, or a NumPy array
            batch_size: Batch size (can be set larger, e.g. 4096 or 8192)
            shuffle: Whether to shuffle the data at the start of each epoch
            device: Target device (defaults to CUDA if available, else CPU)
            normalize_features: Whether to standardize the features
            normalize_labels: Whether to standardize the labels
            input_dim: Feature dimension; together with output_dim, the first
                input_dim columns are used as features
            output_dim: Label dimension; together with input_dim, the last
                output_dim columns are used as labels
            feature_cols: Slice or list of column names for the features
            label_cols: Slice or list of column names for the labels
                (a list of column names is recommended)
            feature_names: List of feature column names (selects columns from the CSV)
            label_names: List of label column names (selects columns from the CSV)
        """
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.normalize_features = normalize_features
        self.normalize_labels = normalize_labels
        # Fitted standardization statistics, stored per kind so that label
        # statistics never overwrite feature statistics:
        # {'features': (mean, std), 'labels': (mean, std)}
        self.norm_stats: Dict[str, Tuple[np.ndarray, np.ndarray]] = {}

        # Determine the target device
        if device is None:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = device

        # Determine the column ranges; column names take priority (safer)
        if feature_names is not None and label_names is not None:
            self.feature_cols = feature_names
            self.label_cols = label_names
            self.use_column_names = True
        elif feature_cols is not None and label_cols is not None:
            self.feature_cols = feature_cols
            self.label_cols = label_cols
            self.use_column_names = isinstance(feature_cols, list) and isinstance(label_cols, list)
        elif input_dim is not None and output_dim is not None:
            # Derive the column ranges from input_dim and output_dim.
            # Assumed layout: the first input_dim columns are features and the
            # last output_dim columns are labels.
            # Note: this assumption may not hold; column names are recommended.
            self.feature_cols = slice(0, input_dim)
            self.label_cols = slice(-output_dim, None)
            self.use_column_names = False
        else:
            # Default: the last column is the label, all others are features
            self.feature_cols = slice(0, -1)
            self.label_cols = slice(-1, None)
            self.use_column_names = False

        # Load and preprocess the data
        features, labels = self._load_and_preprocess_data(data)

        # Convert to float32 tensors on the target device
        self.features = torch.as_tensor(features, dtype=torch.float32, device=self.device)
        self.labels = torch.as_tensor(labels, dtype=torch.float32, device=self.device)

        self.num_samples = self.features.size(0)
        self.num_batches = (self.num_samples + self.batch_size - 1) // self.batch_size

        feature_mb = self.features.element_size() * self.features.nelement() / 1024**2
        label_mb = self.labels.element_size() * self.labels.nelement() / 1024**2
        logger.info("GPU preload data loader initialized:")
        logger.info(f"  Samples: {self.num_samples}")
        logger.info(f"  Feature dim: {self.features.size(1)}")
        logger.info(f"  Label dim: {self.labels.size(1)}")
        logger.info(f"  Batch size: {self.batch_size}")
        logger.info(f"  Batches: {self.num_batches}")
        logger.info(f"  Device: {self.device}")
        logger.info(f"  Memory usage: {feature_mb:.2f} MB (features) + {label_mb:.2f} MB (labels)")

    def _load_and_preprocess_data(
        self,
        data: Union[str, Path, np.ndarray, pd.DataFrame]
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Load and preprocess the data.

        Args:
            data: Path to a CSV file, a DataFrame, or a NumPy array

        Returns:
            Tuple of (features, labels) as float32 arrays; labels are always 2D
        """
        if isinstance(data, np.ndarray):
            # NumPy arrays are always split by position
            features = data[:, self.feature_cols]
            labels = data[:, self.label_cols]
            logger.info(f"Splitting by index: features {self.feature_cols}, labels {self.label_cols}")
        else:
            if isinstance(data, (str, Path)):
                # Load from a CSV file
                df = pd.read_csv(data)
            elif isinstance(data, pd.DataFrame):
                df = data
            else:
                raise ValueError(f"Unsupported data type: {type(data)}")

            if self.use_column_names:
                # Select by column name (safer)
                features = df[self.feature_cols].to_numpy()
                labels = df[self.label_cols].to_numpy()
                logger.info(f"Selecting by column name: features {self.feature_cols}, labels {self.label_cols}")
            else:
                # Select by positional index slice
                data_array = df.to_numpy()
                features = data_array[:, self.feature_cols]
                labels = data_array[:, self.label_cols]
                logger.info(f"Selecting by index slice: features {self.feature_cols}, labels {self.label_cols}")

        # Convert to float32 (fails early if a selected column is non-numeric)
        features = np.asarray(features, dtype=np.float32)
        labels = np.asarray(labels, dtype=np.float32)

        # Make sure labels are a 2D array
        if labels.ndim == 1:
            labels = labels.reshape(-1, 1)

        logger.info(f"Feature shape: {features.shape}, label shape: {labels.shape}")

        # Standardize (applies to every input type, including NumPy arrays)
        if self.normalize_features:
            features = self._normalize(features, kind='features')
        if self.normalize_labels:
            labels = self._normalize(labels, kind='labels')

        return features, labels

    def _normalize(
        self,
        data: np.ndarray,
        kind: str = 'features',
        fit: bool = True
    ) -> np.ndarray:
        """
        Standardize data to zero mean and unit variance per column.

        Args:
            data: Data array of shape (num_samples, num_columns)
            kind: Which statistics to store/use, 'features' or 'labels'
            fit: Whether to fit the mean and std on ``data``; if False, the
                previously fitted statistics for ``kind`` are used

        Returns:
            Standardized data
        """
        if fit:
            # Compute the mean and (population) standard deviation
            mean = np.mean(data, axis=0)
            std = np.std(data, axis=0)
            # Avoid division by zero for constant columns
            std[std < 1e-8] = 1.0
            self.norm_stats[kind] = (mean, std)

        mean, std = self.norm_stats[kind]
        return (data - mean) / std

    def __iter__(self):
        """
        Start a new epoch.

        Returns:
            The iterator object (self)
        """
        self.current_batch = 0

        # Generate the sample order for this epoch
        if self.shuffle:
            # Generate the random permutation directly on the device
            self.indices = torch.randperm(self.num_samples, device=self.device)
        else:
            self.indices = torch.arange(self.num_samples, device=self.device)

        return self

    def __next__(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Get the next batch.

        Returns:
            A (features, labels) tuple

        Raises:
            StopIteration: When the epoch is finished
        """
        if self.current_batch >= self.num_batches:
            raise StopIteration

        # Index range of the current batch (the last batch may be smaller)
        start_idx = self.current_batch * self.batch_size
        end_idx = min(start_idx + self.batch_size, self.num_samples)

        # Sample indices of the current batch
        batch_indices = self.indices[start_idx:end_idx]

        # Gather the batch on the device
        batch_features = self.features[batch_indices]
        batch_labels = self.labels[batch_indices]

        self.current_batch += 1

        return batch_features, batch_labels

    def __len__(self) -> int:
        """
        Return the number of batches per epoch.

        Returns:
            Number of batches
        """
        return self.num_batches

    def to(self, device: torch.device) -> 'GPUPreloadDataLoader':
        """
        Move the data to the given device.

        Args:
            device: Target device

        Returns:
            self
        """
        self.device = device
        self.features = self.features.to(device)
        self.labels = self.labels.to(device)
        # Keep an in-progress epoch's index tensor on the same device
        if hasattr(self, 'indices'):
            self.indices = self.indices.to(device)
        logger.info(f"Data moved to device: {device}")
        return self


class GPUPreloadDataLoaderFactory:
    """
    Factory for GPU preloaded data loaders.

    Creates the GPU preloaded data loaders for training, validation, and
    testing. Unless feature_names/label_names are passed, the default column
    names below are used.

    Note: every loader fits its own normalization statistics, so validation
    and test data are standardized with their own mean/std rather than with
    the training set's.
    """

    # Default column names (take priority over input_dim/output_dim)
    DEFAULT_FEATURE_NAMES = [
        'user_pad_p', 'user_pad_a', 'user_pad_d',
        'vitality',
        'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d'
    ]
    DEFAULT_LABEL_NAMES = ['ai_delta_p', 'ai_delta_a', 'ai_delta_d']

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the factory.

        Args:
            config: Configuration dictionary passed to every loader
        """
        self.config = config or {}

    def _create_loader(
        self,
        data_path: Union[str, Path],
        shuffle: bool,
        overrides: Dict[str, Any]
    ) -> GPUPreloadDataLoader:
        """
        Shared implementation of the create_*_loader methods.

        Args:
            data_path: Data path
            shuffle: Whether to shuffle (overrides any value in config/overrides)
            overrides: Extra arguments, merged over the factory config

        Returns:
            Data loader
        """
        config = {**self.config, **overrides}
        config['shuffle'] = shuffle

        # Select columns by name instead of by index slice (safer)
        config['feature_names'] = config.get('feature_names', list(self.DEFAULT_FEATURE_NAMES))
        config['label_names'] = config.get('label_names', list(self.DEFAULT_LABEL_NAMES))

        # input_dim and output_dim are no longer used
        config.pop('input_dim', None)
        config.pop('output_dim', None)

        return GPUPreloadDataLoader(data=data_path, **config)

    def create_train_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Create the training data loader (data is shuffled every epoch).

        Args:
            data_path: Data path
            input_dim: Input feature dimension (unused; column names are used instead)
            output_dim: Output label dimension (unused; column names are used instead)
            **kwargs: Extra arguments

        Returns:
            Training data loader
        """
        return self._create_loader(data_path, True, kwargs)

    def create_val_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Create the validation data loader (data is not shuffled).

        Args:
            data_path: Data path
            input_dim: Input feature dimension (unused; column names are used instead)
            output_dim: Output label dimension (unused; column names are used instead)
            **kwargs: Extra arguments

        Returns:
            Validation data loader
        """
        return self._create_loader(data_path, False, kwargs)

    def create_test_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Create the test data loader (data is not shuffled).

        Args:
            data_path: Data path
            input_dim: Input feature dimension (unused; column names are used instead)
            output_dim: Output label dimension (unused; column names are used instead)
            **kwargs: Extra arguments

        Returns:
            Test data loader
        """
        return self._create_loader(data_path, False, kwargs)


def create_gpu_preload_loader(
    data_path: Union[str, Path],
    batch_size: int = 4096,
    shuffle: bool = True,
    device: Optional[torch.device] = None,
    **kwargs
) -> GPUPreloadDataLoader:
    """
    Convenience function for creating a GPU preloaded data loader.

    Args:
        data_path: Data path
        batch_size: Batch size
        shuffle: Whether to shuffle the data
        device: Target device
        **kwargs: Other arguments passed to GPUPreloadDataLoader

    Returns:
        GPU preloaded data loader instance
    """
    return GPUPreloadDataLoader(
        data=data_path,
        batch_size=batch_size,
        shuffle=shuffle,
        device=device,
        **kwargs
    )
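

# ---------------------------------------------------------------------------
# Sketch: reusing training-set normalization statistics.
#
# Each loader fits its own mean/std, so validation and test data end up
# standardized with statistics that differ from the training set's. Assuming
# you want training statistics everywhere, one option is to standardize the
# DataFrames yourself and create the loaders with normalize_features=False.
# fit_column_stats and apply_column_stats are hypothetical helper names for
# illustration, not part of the loader's existing API. They mirror the
# loader's standardization: population std (ddof=0, like np.std) and the same
# 1e-8 zero-variance guard.
# ---------------------------------------------------------------------------
def fit_column_stats(df: pd.DataFrame, columns: list) -> Tuple[pd.Series, pd.Series]:
    """Compute per-column mean and population std on (training) data."""
    mean = df[columns].mean()
    std = df[columns].std(ddof=0)
    # Constant columns get std 1.0 so they are only shifted, never divided by zero
    std = std.where(std >= 1e-8, 1.0)
    return mean, std


def apply_column_stats(
    df: pd.DataFrame, columns: list, mean: pd.Series, std: pd.Series
) -> pd.DataFrame:
    """Return a copy of df with `columns` standardized using precomputed stats."""
    out = df.copy()
    # mean/std are indexed by column name, so the arithmetic aligns per column
    out[columns] = (out[columns] - mean) / std
    return out


# Usage sketch (file names are placeholders):
#   cols = [...]  # the feature column names the loaders will use
#   train_df = pd.read_csv("train.csv")
#   mean, std = fit_column_stats(train_df, cols)
#   val_df = apply_column_stats(pd.read_csv("val.csv"), cols, mean, std)
#   val_loader = GPUPreloadDataLoader(val_df, shuffle=False, normalize_features=False,
#                                     feature_names=cols, label_names=[...])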