# Chordia — src/data/gpu_preload_loader.py (first commit, 0a6452f)
"""
GPU预加载数据加载器
GPU Preloaded Data Loader - 优化小数据集训练速度
通过一次性将所有数据加载到GPU,消除每个batch的CPU-GPU传输开销。
适用于可以完全放入GPU显存的小数据集。
"""
import torch
import numpy as np
import pandas as pd
from typing import Union, Tuple, Optional, Dict, Any
from pathlib import Path
from loguru import logger
class GPUPreloadDataLoader:
    """
    GPU-preloaded data loader.

    Loads the whole dataset into GPU memory once and slices batches
    directly on the GPU, eliminating per-batch CPU-GPU transfers.

    Pros:
        - Removes the data-transfer bottleneck (roughly 1-5% faster
          training, depending on the data).
        - Tensor slicing on the GPU is very fast.
        - Simplifies the training loop.

    Cons:
        - Occupies more GPU memory.
        - No data-augmentation support.
        - Not suitable for large datasets.

    Intended scenarios:
        - Small datasets that fit entirely in GPU memory.
        - Tabular data (CSV and other structured formats).
        - Pipelines that need no complex preprocessing.
    """

    def __init__(
        self,
        data: Union[str, Path, np.ndarray, pd.DataFrame],
        batch_size: int = 4096,
        shuffle: bool = True,
        device: Optional[torch.device] = None,
        normalize_features: bool = True,
        normalize_labels: bool = False,
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        feature_cols: Optional[Union[slice, list]] = None,
        label_cols: Optional[Union[slice, list]] = None,
        feature_names: Optional[list] = None,
        label_names: Optional[list] = None
    ):
        """
        Initialize the GPU-preloaded data loader.

        Args:
            data: Path to a CSV file, a DataFrame, or a numpy array.
            batch_size: Batch size (can be large, e.g. 4096/8192).
            shuffle: Shuffle the data at the start of each epoch.
            device: Target device (defaults to CUDA when available).
            normalize_features: Standardize (z-score) the features.
            normalize_labels: Standardize (z-score) the labels.
            input_dim: Feature dimensionality; used to derive the feature
                column range when no explicit columns are given.
            output_dim: Label dimensionality; used to derive the label
                column range when no explicit columns are given.
            feature_cols: Slice or list of names selecting the features.
            label_cols: Slice or list of names selecting the labels
                (a list of names is the recommended form).
            feature_names: Feature column names (for CSV/DataFrame input).
            label_names: Label column names (for CSV/DataFrame input).
        """
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.normalize_features = normalize_features
        self.normalize_labels = normalize_labels

        # Resolve the target device.
        if device is None:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = device

        # Resolve which columns are features and which are labels.
        # Column names take priority: they are robust to column
        # reordering in the source file.
        if feature_names is not None and label_names is not None:
            self.feature_cols = feature_names
            self.label_cols = label_names
            self.use_column_names = True
        elif feature_cols is not None and label_cols is not None:
            self.feature_cols = feature_cols
            self.label_cols = label_cols
            self.use_column_names = isinstance(feature_cols, list) and isinstance(label_cols, list)
        elif input_dim is not None and output_dim is not None:
            # Positional fallback: first input_dim columns are features,
            # last output_dim columns are labels.  This assumption is
            # fragile -- prefer explicit column names.
            self.feature_cols = slice(0, input_dim)
            self.label_cols = slice(-output_dim, None)
            self.use_column_names = False
        else:
            # Default: last column is the label, the rest are features.
            self.feature_cols = slice(0, -1)
            self.label_cols = slice(-1, None)
            self.use_column_names = False

        # Load, split and (optionally) normalize the data.
        features, labels = self._load_and_preprocess_data(data)

        # Move everything to the target device once, up front.
        # torch.as_tensor replaces the legacy torch.FloatTensor API.
        self.features = torch.as_tensor(features, dtype=torch.float32).to(self.device)
        self.labels = torch.as_tensor(labels, dtype=torch.float32).to(self.device)
        self.num_samples = self.features.size(0)
        # Ceiling division: the last batch may be smaller.
        self.num_batches = (self.num_samples + self.batch_size - 1) // self.batch_size

        logger.info(f"GPU预加载数据加载器初始化完成:")
        logger.info(f" 样本数: {self.num_samples}")
        logger.info(f" 特征维度: {self.features.size(1)}")
        logger.info(f" 标签维度: {self.labels.size(1)}")
        logger.info(f" 批次大小: {self.batch_size}")
        logger.info(f" 批次数: {self.num_batches}")
        logger.info(f" 设备: {self.device}")
        logger.info(f" 显存占用: {self.features.element_size() * self.features.nelement() / 1024**2:.2f} MB (特征) + "
                    f"{self.labels.element_size() * self.labels.nelement() / 1024**2:.2f} MB (标签)")

    def _load_and_preprocess_data(
        self,
        data: Union[str, Path, np.ndarray, pd.DataFrame]
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Load the raw data, split it into features/labels and normalize.

        Args:
            data: Path to a CSV file, a DataFrame, or a numpy array.

        Returns:
            (features, labels) as numpy arrays; labels are always 2-D.

        Raises:
            ValueError: If ``data`` is of an unsupported type.
        """
        if isinstance(data, (str, Path)):
            # Load from a CSV file on disk.
            df = pd.read_csv(data)
        elif isinstance(data, pd.DataFrame):
            df = data
        elif isinstance(data, np.ndarray):
            df = None
        else:
            raise ValueError(f"不支持的数据类型: {type(data)}")

        if df is None:
            # numpy array: only positional slicing is possible.
            features = data[:, self.feature_cols]
            labels = data[:, self.label_cols]
            logger.info(f"数据分割: 特征列 {self.feature_cols}, 标签列 {self.label_cols}")
        elif self.use_column_names:
            # Select by column name (robust to column reordering).
            features = df[self.feature_cols].values
            labels = df[self.label_cols].values
            logger.info(f"使用列名选择: 特征列 {self.feature_cols}, 标签列 {self.label_cols}")
        else:
            # Select by positional slice.
            data_array = df.values
            features = data_array[:, self.feature_cols]
            labels = data_array[:, self.label_cols]
            logger.info(f"使用索引切片: 特征列 {self.feature_cols}, 标签列 {self.label_cols}")

        # Labels must be 2-D so downstream code can rely on .size(1).
        if labels.ndim == 1:
            labels = labels.reshape(-1, 1)
        logger.info(f"特征形状: {features.shape}, 标签形状: {labels.shape}")

        # Normalization now also applies to numpy-array input (the old
        # code returned early for ndarrays and silently skipped it).
        # Keep dedicated copies of the statistics: a subsequent label
        # fit would otherwise clobber the feature mean/std.
        if self.normalize_features:
            features = self._normalize(features, fit=True)
            self.feature_mean, self.feature_std = self.mean, self.std
        if self.normalize_labels:
            labels = self._normalize(labels, fit=True)
            self.label_mean, self.label_std = self.mean, self.std
        return features, labels

    def _normalize(
        self,
        data: np.ndarray,
        fit: bool = True
    ) -> np.ndarray:
        """
        Z-score normalize ``data`` column-wise.

        Args:
            data: 2-D data array.
            fit: When True, compute and store ``self.mean``/``self.std``
                from ``data``; when False, reuse the stored statistics
                (requires a prior ``fit=True`` call).

        Returns:
            The normalized array.
        """
        if fit:
            self.mean = np.mean(data, axis=0)
            self.std = np.std(data, axis=0)
            # Guard against division by (near-)zero on constant columns.
            self.std[self.std < 1e-8] = 1.0
        return (data - self.mean) / self.std

    def __iter__(self):
        """
        Reset the batch cursor and (re)generate the sample indices.

        Returns:
            self, acting as its own iterator.
        """
        self.current_batch = 0
        if self.shuffle:
            # Random permutation generated directly on the target device.
            self.indices = torch.randperm(self.num_samples, device=self.device)
        else:
            self.indices = torch.arange(self.num_samples, device=self.device)
        return self

    def __next__(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return the next ``(features, labels)`` batch.

        Raises:
            StopIteration: When every batch has been produced.
        """
        if self.current_batch >= self.num_batches:
            raise StopIteration
        # Index window for the current batch (last batch may be short).
        start_idx = self.current_batch * self.batch_size
        end_idx = min(start_idx + self.batch_size, self.num_samples)
        batch_indices = self.indices[start_idx:end_idx]
        # Fancy indexing happens entirely on the target device.
        batch_features = self.features[batch_indices]
        batch_labels = self.labels[batch_indices]
        self.current_batch += 1
        return batch_features, batch_labels

    def __len__(self) -> int:
        """Return the number of batches per epoch."""
        return self.num_batches

    def to(self, device: torch.device) -> 'GPUPreloadDataLoader':
        """
        Move the preloaded tensors to ``device``.

        Args:
            device: Target device.

        Returns:
            self, for call chaining.
        """
        self.device = device
        self.features = self.features.to(device)
        self.labels = self.labels.to(device)
        logger.info(f"数据已移动到设备: {device}")
        return self
class GPUPreloadDataLoaderFactory:
    """
    Factory for GPU-preloaded data loaders.

    Creates training, validation and test loaders that share a common
    base configuration and the project's default column names.
    """

    # Default Chordia PAD dataset columns, used whenever the caller does
    # not supply feature_names/label_names explicitly.
    DEFAULT_FEATURE_NAMES = [
        'user_pad_p', 'user_pad_a', 'user_pad_d',
        'vitality',
        'ai_current_pad_p', 'ai_current_pad_a', 'ai_current_pad_d'
    ]
    DEFAULT_LABEL_NAMES = ['ai_delta_p', 'ai_delta_a', 'ai_delta_d']

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the factory.

        Args:
            config: Base configuration merged into every loader;
                per-call kwargs override it.
        """
        self.config = config or {}

    def _build_loader(
        self,
        data_path: Union[str, Path],
        shuffle: bool,
        **kwargs
    ) -> 'GPUPreloadDataLoader':
        """
        Shared implementation behind the three create_* methods.

        Merges the base config with per-call kwargs, forces the shuffle
        flag, fills in the default column names, and drops the legacy
        input_dim/output_dim keys (explicit column names supersede them).
        """
        config = {**self.config, **kwargs}
        config['shuffle'] = shuffle
        # Select columns by name rather than by positional slice: it is
        # robust against column reordering in the CSV.
        config['feature_names'] = config.get('feature_names', list(self.DEFAULT_FEATURE_NAMES))
        config['label_names'] = config.get('label_names', list(self.DEFAULT_LABEL_NAMES))
        # input_dim/output_dim are no longer forwarded to the loader.
        config.pop('input_dim', None)
        config.pop('output_dim', None)
        return GPUPreloadDataLoader(
            data=data_path,
            **config
        )

    def create_train_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> 'GPUPreloadDataLoader':
        """
        Create a training data loader (data is shuffled every epoch).

        Args:
            data_path: Path to the training data.
            input_dim: Ignored; kept for backward compatibility.
            output_dim: Ignored; kept for backward compatibility.
            **kwargs: Extra loader options (override the base config).

        Returns:
            Training data loader.
        """
        return self._build_loader(data_path, shuffle=True, **kwargs)

    def create_val_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> 'GPUPreloadDataLoader':
        """
        Create a validation data loader (data is not shuffled).

        Args:
            data_path: Path to the validation data.
            input_dim: Ignored; kept for backward compatibility.
            output_dim: Ignored; kept for backward compatibility.
            **kwargs: Extra loader options (override the base config).

        Returns:
            Validation data loader.
        """
        return self._build_loader(data_path, shuffle=False, **kwargs)

    def create_test_loader(
        self,
        data_path: Union[str, Path],
        input_dim: Optional[int] = None,
        output_dim: Optional[int] = None,
        **kwargs
    ) -> 'GPUPreloadDataLoader':
        """
        Create a test data loader (data is not shuffled).

        Args:
            data_path: Path to the test data.
            input_dim: Ignored; kept for backward compatibility.
            output_dim: Ignored; kept for backward compatibility.
            **kwargs: Extra loader options (override the base config).

        Returns:
            Test data loader.
        """
        return self._build_loader(data_path, shuffle=False, **kwargs)
def create_gpu_preload_loader(
    data_path: Union[str, Path],
    batch_size: int = 4096,
    shuffle: bool = True,
    device: Optional[torch.device] = None,
    **kwargs
) -> GPUPreloadDataLoader:
    """
    Convenience function for building a :class:`GPUPreloadDataLoader`.

    Args:
        data_path: Path to the data file.
        batch_size: Number of samples per batch.
        shuffle: Whether to shuffle the data each epoch.
        device: Target device for the preloaded tensors.
        **kwargs: Additional options forwarded to the loader constructor.

    Returns:
        A configured GPUPreloadDataLoader instance.
    """
    loader_kwargs = dict(
        batch_size=batch_size,
        shuffle=shuffle,
        device=device,
        **kwargs,
    )
    return GPUPreloadDataLoader(data=data_path, **loader_kwargs)