|
|
""" |
|
|
GPU预加载数据加载器 |
|
|
GPU Preloaded Data Loader - 优化小数据集训练速度 |
|
|
|
|
|
通过一次性将所有数据加载到GPU,消除每个batch的CPU-GPU传输开销。 |
|
|
适用于可以完全放入GPU显存的小数据集。 |
|
|
""" |
|
|
|
|
|
import torch |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from typing import Union, Tuple, Optional, Dict, Any |
|
|
from pathlib import Path |
|
|
from loguru import logger |
|
|
|
|
|
|
|
|
class GPUPreloadDataLoader:
    """
    GPU-preloaded data loader.

    Loads the entire dataset into GPU memory up front and slices batches
    directly on the GPU, eliminating the per-batch CPU-GPU transfer cost.

    Pros:
        - Removes the data-transfer bottleneck (1-5% faster training,
          depending on the data type)
        - Tensor slicing on the GPU is very fast
        - Simplifies the training loop

    Cons:
        - Uses more GPU memory
        - No support for data augmentation
        - Not suitable for large datasets

    Suitable for:
        - Small datasets that fit entirely in GPU memory
        - Tabular data (CSV and other structured formats)
        - Scenarios that need no complex preprocessing
    """

    def __init__(
        self,
        data: Union[str, Path, np.ndarray, pd.DataFrame],
        batch_size: int = 4096,
        shuffle: bool = True,
        device: Optional[torch.device] = None,
        normalize_features: bool = True,
        normalize_labels: bool = False,
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        feature_cols: Optional[Union[slice, list]] = None,
        label_cols: Optional[Union[slice, list]] = None,
        feature_names: Optional[list] = None,
        label_names: Optional[list] = None
    ):
        """
        Initialize the GPU-preloaded data loader.

        Args:
            data: Path to a CSV file, a DataFrame, or a numpy array.
            batch_size: Batch size (large values such as 4096/8192 are fine).
            shuffle: Whether to reshuffle the data at the start of each epoch.
            device: Target device (defaults to CUDA when available).
            normalize_features: Whether to standardize the features.
            normalize_labels: Whether to standardize the labels.
            input_dim: Number of feature columns; used to derive the feature
                slice when explicit columns are not given.
            output_dim: Number of label columns; used to derive the label
                slice when explicit columns are not given.
            feature_cols: Feature columns as a slice or a list of names.
            label_cols: Label columns as a slice or a list of names
                (a list of names is recommended).
            feature_names: Feature column names (for CSV/DataFrame input).
            label_names: Label column names (for CSV/DataFrame input).

        Raises:
            ValueError: If the data type is unsupported, or column names are
                requested for a plain numpy array.
        """
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.normalize_features = normalize_features
        self.normalize_labels = normalize_labels

        # Default to CUDA when available so preloading actually pays off.
        if device is None:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = device

        # Resolve the feature/label column selection, in priority order:
        # explicit names > explicit cols > dims > "last column is the label".
        if feature_names is not None and label_names is not None:
            self.feature_cols = feature_names
            self.label_cols = label_names
            self.use_column_names = True
        elif feature_cols is not None and label_cols is not None:
            self.feature_cols = feature_cols
            self.label_cols = label_cols
            self.use_column_names = isinstance(feature_cols, list) and isinstance(label_cols, list)
        elif input_dim is not None and output_dim is not None:
            # First input_dim columns are features, last output_dim are labels.
            self.feature_cols = slice(0, input_dim)
            self.label_cols = slice(-output_dim, None)
            self.use_column_names = False
        else:
            # Fallback: every column except the last is a feature.
            self.feature_cols = slice(0, -1)
            self.label_cols = slice(-1, None)
            self.use_column_names = False

        features, labels = self._load_and_preprocess_data(data)

        # Coerce through float32 numpy arrays first so integer/object arrays
        # convert cleanly (torch.FloatTensor chokes on object dtype), then
        # move everything to the target device once.
        self.features = torch.as_tensor(np.asarray(features, dtype=np.float32)).to(self.device)
        self.labels = torch.as_tensor(np.asarray(labels, dtype=np.float32)).to(self.device)

        self.num_samples = self.features.size(0)
        self.num_batches = (self.num_samples + self.batch_size - 1) // self.batch_size

        # BUG FIX: pre-initialize iteration state so that calling __next__
        # before __iter__ raises StopIteration instead of AttributeError.
        self.current_batch = self.num_batches
        self.indices = torch.arange(self.num_samples, device=self.device)

        logger.info(f"GPU预加载数据加载器初始化完成:")
        logger.info(f"  样本数: {self.num_samples}")
        logger.info(f"  特征维度: {self.features.size(1)}")
        logger.info(f"  标签维度: {self.labels.size(1)}")
        logger.info(f"  批次大小: {self.batch_size}")
        logger.info(f"  批次数: {self.num_batches}")
        logger.info(f"  设备: {self.device}")
        logger.info(f"  显存占用: {self.features.element_size() * self.features.nelement() / 1024**2:.2f} MB (特征) + "
                    f"{self.labels.element_size() * self.labels.nelement() / 1024**2:.2f} MB (标签)")

    def _load_and_preprocess_data(
        self,
        data: Union[str, Path, np.ndarray, pd.DataFrame]
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Load the data, split it into features/labels, and normalize.

        Args:
            data: Path to a CSV file, a DataFrame, or a numpy array.

        Returns:
            Tuple of (features, labels) as numpy arrays; labels are always 2D.

        Raises:
            ValueError: If the data type is unsupported, or column names are
                requested for a plain numpy array.
        """
        if isinstance(data, (str, Path)):
            df = pd.read_csv(data)
        elif isinstance(data, pd.DataFrame):
            df = data
        elif isinstance(data, np.ndarray):
            if self.use_column_names:
                # A bare array carries no column names; fail loudly instead
                # of letting numpy raise an opaque indexing error.
                raise ValueError(
                    "numpy数组不支持列名选择,请使用feature_cols/label_cols切片或input_dim/output_dim"
                )
            df = None
        else:
            raise ValueError(f"不支持的数据类型: {type(data)}")

        if df is not None:
            if self.use_column_names:
                features = df[self.feature_cols].values
                labels = df[self.label_cols].values
                logger.info(f"使用列名选择: 特征列 {self.feature_cols}, 标签列 {self.label_cols}")
            else:
                data_array = df.values
                features = data_array[:, self.feature_cols]
                labels = data_array[:, self.label_cols]
                logger.info(f"使用索引切片: 特征列 {self.feature_cols}, 标签列 {self.label_cols}")
        else:
            # Raw numpy array: slice columns directly.
            features = data[:, self.feature_cols]
            labels = data[:, self.label_cols]
            logger.info(f"数据分割: 特征列 {self.feature_cols}, 标签列 {self.label_cols}")

        # Always hand back 2D labels so downstream .size(1) works.
        if labels.ndim == 1:
            labels = labels.reshape(-1, 1)

        logger.info(f"特征形状: {features.shape}, 标签形状: {labels.shape}")

        # BUG FIX: the original returned early for ndarray input and skipped
        # this normalization step entirely; every input type now goes through.
        if self.normalize_features:
            features = self._normalize(features, fit=True)
            # Keep per-role copies: fitting the labels below overwrites
            # self.mean/self.std, which would otherwise lose the feature stats.
            self.feature_mean, self.feature_std = self.mean, self.std

        if self.normalize_labels:
            labels = self._normalize(labels, fit=True)
            self.label_mean, self.label_std = self.mean, self.std

        return features, labels

    def _normalize(
        self,
        data: np.ndarray,
        fit: bool = True
    ) -> np.ndarray:
        """
        Standardize data to zero mean / unit variance per column.

        Args:
            data: 2D data array.
            fit: If True, (re)compute mean/std from this data; if False,
                reuse the statistics from the most recent fit.

        Returns:
            Standardized data array.

        Raises:
            RuntimeError: If fit=False is requested before any fit happened.
        """
        if fit:
            self.mean = np.mean(data, axis=0)
            self.std = np.std(data, axis=0)
            # Guard near-constant columns against division by ~0.
            self.std[self.std < 1e-8] = 1.0
        elif not hasattr(self, 'mean'):
            # BUG FIX: previously this path crashed with AttributeError.
            raise RuntimeError("_normalize(fit=False) called before statistics were fitted")

        return (data - self.mean) / self.std

    def __iter__(self):
        """
        Start a new epoch: reset the batch counter and rebuild the index
        permutation on the GPU.

        Returns:
            self
        """
        self.current_batch = 0

        if self.shuffle:
            # Generate the permutation directly on the device to avoid a
            # host-to-device transfer.
            self.indices = torch.randperm(self.num_samples, device=self.device)
        else:
            self.indices = torch.arange(self.num_samples, device=self.device)

        return self

    def __next__(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Fetch the next batch.

        Returns:
            (features, labels) tuple of GPU tensors; the final batch may be
            smaller than batch_size.

        Raises:
            StopIteration: When the epoch is exhausted.
        """
        if self.current_batch >= self.num_batches:
            raise StopIteration

        start_idx = self.current_batch * self.batch_size
        end_idx = min(start_idx + self.batch_size, self.num_samples)

        batch_indices = self.indices[start_idx:end_idx]

        # Fancy indexing on GPU tensors — no host round-trip.
        batch_features = self.features[batch_indices]
        batch_labels = self.labels[batch_indices]

        self.current_batch += 1

        return batch_features, batch_labels

    def __len__(self) -> int:
        """
        Return the number of batches per epoch.

        Returns:
            Number of batches.
        """
        return self.num_batches

    def to(self, device: torch.device) -> 'GPUPreloadDataLoader':
        """
        Move the preloaded data to the given device.

        Args:
            device: Target device.

        Returns:
            self
        """
        self.device = device
        self.features = self.features.to(device)
        self.labels = self.labels.to(device)
        logger.info(f"数据已移动到设备: {device}")
        return self
|
|
|
|
|
|
|
|
class GPUPreloadDataLoaderFactory:
    """
    Factory for GPU-preloaded data loaders.

    Creates training, validation, and test loaders that share one base
    configuration and default column selection, differing only in whether
    shuffling is enabled.
    """

    # Default column selections used when the config does not override them.
    DEFAULT_FEATURE_NAMES = [
        'user_pad_p', 'user_pad_a', 'user_pad_d',
        'vitality',
        'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d'
    ]
    DEFAULT_LABEL_NAMES = ['ai_delta_p', 'ai_delta_a', 'ai_delta_d']

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the factory.

        Args:
            config: Base configuration dict shared by all created loaders.
        """
        self.config = config or {}

    def _build(
        self,
        data_path: Union[str, Path],
        shuffle: bool,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Shared builder behind the train/val/test convenience methods.

        Merges the factory config with per-call overrides, fills in the
        default column names, and strips the dim keys that are superseded
        by column-name selection.
        """
        config = {**self.config, **kwargs}
        config['shuffle'] = shuffle

        config['feature_names'] = config.get('feature_names', self.DEFAULT_FEATURE_NAMES)
        config['label_names'] = config.get('label_names', self.DEFAULT_LABEL_NAMES)

        # Column names take precedence inside GPUPreloadDataLoader, so any
        # configured dims would be ignored anyway — drop them explicitly.
        config.pop('input_dim', None)
        config.pop('output_dim', None)

        return GPUPreloadDataLoader(
            data=data_path,
            **config
        )

    def create_train_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Create a training data loader (shuffling enabled).

        Args:
            data_path: Path to the data file.
            input_dim: Accepted for backward compatibility; ignored because
                column-name selection is used.
            output_dim: Accepted for backward compatibility; ignored.
            **kwargs: Extra arguments forwarded to GPUPreloadDataLoader.

        Returns:
            Training data loader.
        """
        return self._build(data_path, shuffle=True, **kwargs)

    def create_val_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Create a validation data loader (shuffling disabled).

        Args:
            data_path: Path to the data file.
            input_dim: Accepted for backward compatibility; ignored.
            output_dim: Accepted for backward compatibility; ignored.
            **kwargs: Extra arguments forwarded to GPUPreloadDataLoader.

        Returns:
            Validation data loader.
        """
        return self._build(data_path, shuffle=False, **kwargs)

    def create_test_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> GPUPreloadDataLoader:
        """
        Create a test data loader (shuffling disabled).

        Args:
            data_path: Path to the data file.
            input_dim: Accepted for backward compatibility; ignored.
            output_dim: Accepted for backward compatibility; ignored.
            **kwargs: Extra arguments forwarded to GPUPreloadDataLoader.

        Returns:
            Test data loader.
        """
        return self._build(data_path, shuffle=False, **kwargs)
|
|
|
|
|
|
|
|
def create_gpu_preload_loader(
    data_path: Union[str, Path],
    batch_size: int = 4096,
    shuffle: bool = True,
    device: Optional[torch.device] = None,
    **kwargs
) -> GPUPreloadDataLoader:
    """
    Convenience function for building a GPU-preloaded data loader.

    Args:
        data_path: Path to the data file.
        batch_size: Batch size.
        shuffle: Whether to shuffle the data each epoch.
        device: Target device.
        **kwargs: Additional arguments forwarded to GPUPreloadDataLoader.

    Returns:
        A GPUPreloadDataLoader instance.
    """
    loader_args = dict(kwargs)
    loader_args.update(
        data=data_path,
        batch_size=batch_size,
        shuffle=shuffle,
        device=device,
    )
    return GPUPreloadDataLoader(**loader_args)
|
|
|