import os
import logging

import tensorflow as tf
import pandas as pd
import pyarrow.parquet as pq
from sklearn.preprocessing import MinMaxScaler

# Configure module-level logging
logger = logging.getLogger(__name__)

def create_data_loader(parquet_paths: list, scaler: MinMaxScaler, seq_length: int, batch_size: int) -> tf.data.Dataset:
"""Create a tf.data.Dataset from Parquet files for LSTM training or evaluation.
Args:
parquet_paths (list): List of paths to Parquet files.
scaler (MinMaxScaler): Scaler fitted on the data.
seq_length (int): Length of input sequences.
batch_size (int): Batch size for the dataset.
Returns:
tf.data.Dataset: Dataset yielding (sequence, target) pairs with shapes (batch_size, seq_length, 1) and (batch_size, 1).
Raises:
ValueError: If inputs are invalid or no valid data is found.
"""
    if not parquet_paths:
        logger.error("No parquet paths provided")
        raise ValueError("parquet_paths cannot be empty")
    if not isinstance(scaler, MinMaxScaler):
        logger.error("Invalid scaler provided")
        raise ValueError("scaler must be an instance of MinMaxScaler")
    if not isinstance(seq_length, int) or seq_length <= 0:
        logger.error(f"Invalid seq_length: {seq_length}")
        raise ValueError("seq_length must be a positive integer")
    if not isinstance(batch_size, int) or batch_size <= 0:
        logger.error(f"Invalid batch_size: {batch_size}")
        raise ValueError("batch_size must be a positive integer")
    total_sequences = 0

    def _scaled_generator():
        nonlocal total_sequences
        for path in parquet_paths:
            if not os.path.exists(path):
                logger.warning(f"Parquet file not found, skipping: {path}")
                continue
            try:
                file_size = os.path.getsize(path) / (1024 * 1024)  # Size in MB
                if file_size < 100:  # Load small files into memory in one read
                    df = pd.read_parquet(path, columns=['Close'])
                    logger.debug(f"Loaded {path} into memory, size: {file_size:.2f} MB")
                    if 'Close' not in df.columns or df['Close'].isna().any():
                        logger.warning(f"Invalid or missing 'Close' column in {path}")
                        continue
                    prices = df['Close'].astype('float32').values.reshape(-1, 1)
                    if prices.size <= seq_length:
                        logger.warning(f"File {path} has {prices.size} rows, insufficient for seq_length {seq_length}")
                        continue
                    scaled = scaler.transform(prices)
                    # Slide a window of seq_length over the series; the value
                    # immediately after each window is the prediction target.
                    for j in range(len(scaled) - seq_length):
                        total_sequences += 1
                        yield scaled[j:j + seq_length], scaled[j + seq_length]
                else:  # Stream large files in batches to bound memory use
                    parquet_file = pq.ParquetFile(path)
                    for batch in parquet_file.iter_batches(batch_size=10_000, columns=['Close']):
                        chunk = batch.to_pandas()
                        if 'Close' not in chunk.columns or chunk['Close'].isna().any():
                            logger.warning(f"Invalid or missing 'Close' column in {path}")
                            continue
                        prices = chunk['Close'].astype('float32').values.reshape(-1, 1)
                        if prices.size <= seq_length:
                            logger.warning(f"Batch from {path} has {prices.size} rows, insufficient for seq_length {seq_length}")
                            continue
                        scaled = scaler.transform(prices)
                        logger.debug(f"Processing batch from {path}, scaled shape: {scaled.shape}")
                        # Note: windows are built per batch, so sequences that
                        # would span a 10,000-row batch boundary are skipped.
                        for j in range(len(scaled) - seq_length):
                            total_sequences += 1
                            yield scaled[j:j + seq_length], scaled[j + seq_length]
            except Exception as e:
                logger.error(f"Error processing parquet file {path}: {e}")
                continue
        # The generator is lazy, so this check only fires once iteration has
        # exhausted every path without yielding a single sequence.
        if total_sequences == 0:
            logger.error("No valid sequences generated from any Parquet file")
            raise ValueError("No valid sequences generated from any Parquet file")

    dataset = tf.data.Dataset.from_generator(
        _scaled_generator,
        output_signature=(
            tf.TensorSpec(shape=(seq_length, 1), dtype=tf.float32),
            tf.TensorSpec(shape=(1,), dtype=tf.float32),
        ),
    ).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    # total_sequences is only known after the generator has been consumed,
    # so it is deliberately not reported here.
    logger.info(f"Created data loader with seq_length={seq_length}, batch_size={batch_size}")
    return dataset
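
# Usage sketch (illustrative only): the glob pattern, data layout, and
# hyperparameters below are assumptions, not part of this module. The scaler
# must be fitted on the training prices beforehand, since create_data_loader
# only ever calls scaler.transform().
if __name__ == "__main__":
    import glob

    logging.basicConfig(level=logging.INFO)
    train_paths = sorted(glob.glob("data/train/*.parquet"))  # hypothetical layout

    # Fit the scaler on the concatenated training series before building the dataset.
    close = pd.concat(
        pd.read_parquet(p, columns=['Close']) for p in train_paths
    )['Close'].astype('float32').values.reshape(-1, 1)
    scaler = MinMaxScaler().fit(close)

    train_ds = create_data_loader(train_paths, scaler, seq_length=60, batch_size=32)
    for sequences, targets in train_ds.take(1):
        print(sequences.shape, targets.shape)  # e.g. (32, 60, 1) (32, 1)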