heboya8's picture
Upload folder using huggingface_hub
2eee82e verified
import os
import logging
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import pickle
from datetime import datetime, timezone, timedelta
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import mean_squared_error, mean_absolute_error
from typing import Dict, List
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from components.utils.file_utils import load_extract_config, get_parquet_file_names
from components.model.model_utils import build_model_from_config
from components.model.data_utils import create_data_loader
from components.utils.utils import parse_timezone
# Configure logging with +07:00 timezone
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S %Z'
)
logger = logging.getLogger(__name__)
def train_lstm_model(**kwargs) -> Dict:
"""Train an LSTM model for BTC/USDT forecasting and save model and scaler.
Args:
kwargs: Airflow task instance arguments.
Returns:
Dict: Training metadata including model path, scaler path, metrics, and dataset info.
"""
# Verify GPU availability
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
logger.warning("No GPU detected. Training on CPU, which may be slower.")
else:
logger.info(f"GPUs detected: {len(gpus)}. Using CUDA for training.")
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
cfg = load_extract_config('model_config.yml')
model_cfg = cfg['model']
train_cfg = cfg['training']
data_cfg = cfg['data']
out_cfg = cfg['output']
ver_cfg = cfg['versioning']
# Parse timezone from YAML
tz_offset_str = ver_cfg['timezone'] # '+07:00'
tz = parse_timezone(tz_offset_str)
# Get current time in the specified timezone
dt = datetime.now(tz)
dt_str = dt.strftime(ver_cfg['datetime_format']) + f"-({dt.strftime('%z')[:3]})"
model_path = os.path.join(out_cfg['checkpoints']['model_dir'], f"model_{dt_str}.h5")
scaler_path = os.path.join(out_cfg['checkpoints']['scaler_dir'], f"scaler_{dt_str}.pkl")
parquet_folder = load_extract_config('pipeline_config.yml')['paths']['parquet_folder']
# Ensure output directories exist
os.makedirs(os.path.dirname(model_path), exist_ok=True)
os.makedirs(os.path.dirname(scaler_path), exist_ok=True)
# Load data
file_names = get_parquet_file_names()
parquet_paths = [os.path.join(parquet_folder, el) for el in file_names]
all_df = pd.DataFrame()
used_files = []
for path, name in zip(parquet_paths, file_names):
if os.path.exists(path):
df = pd.read_parquet(path)
logger.info(f"Loaded {path} with {len(df)} rows")
all_df = pd.concat([all_df, df], ignore_index=True)
clean_name = name.replace(".parquet", "").replace(".csv", "")
used_files.append(clean_name)
else:
logger.warning(f"File not found: {path}")
if all_df.empty:
logger.error("No data loaded from Parquet files")
raise ValueError("No data loaded from Parquet files")
dataset_merge = " + ".join(used_files) if used_files else "none"
logger.info(f"Dataset merged: {dataset_merge}, total rows: {len(all_df)}")
# Scale data
scaler = MinMaxScaler()
prices = all_df['Close'].astype(float).values.reshape(-1, 1)
if prices.size <= data_cfg['seq_length']:
logger.error(f"Total data size {prices.size} is insufficient for seq_length {data_cfg['seq_length']}")
raise ValueError(f"Total data size {prices.size} is insufficient for seq_length {data_cfg['seq_length']}")
prices_scaled = scaler.fit_transform(prices)
# Create dataset with smaller batch size
seq_length = data_cfg['seq_length']
batch_size = train_cfg.get('batch_size', 64) # Default to 64 if not specified
if batch_size > 8192:
logger.warning(f"Batch size {batch_size} is large; reducing to 64 to avoid memory issues")
batch_size = 64
dataset = create_data_loader(parquet_paths, scaler, seq_length, batch_size)
# Calculate exact number of sequences
total_seqs = 0
for path in parquet_paths:
if os.path.exists(path):
df = pd.read_parquet(path, columns=['Close'])
seqs = max(0, len(df) - seq_length)
total_seqs += seqs
logger.info(f"File {path}: {len(df)} rows, {seqs} sequences")
if total_seqs == 0:
logger.error("Not enough sequences for training")
raise ValueError("Not enough sequences for training")
# Calculate steps for training, validation, and test
steps_total = (total_seqs + batch_size - 1) // batch_size
train_ratio = data_cfg.get('train_ratio', 0.7)
val_ratio = data_cfg.get('val_ratio', 0.2)
steps_train = max(1, int(steps_total * train_ratio))
steps_val = max(1, int(steps_total * val_ratio))
steps_test = max(1, steps_total - steps_train - steps_val)
logger.info(f"Dataset splits: total_steps={steps_total}, train={steps_train}, val={steps_val}, test={steps_test}")
# Save scaler
with open(scaler_path, 'wb') as f:
pickle.dump(scaler, f)
train_ds = dataset.take(steps_train)
val_ds = dataset.skip(steps_train).take(steps_val)
test_ds = dataset.skip(steps_train + steps_val)
# Build and train model
model = build_model_from_config(seq_length, cfg)
checkpoint_cb = keras.callbacks.ModelCheckpoint(
model_path, save_best_only=True, monitor='val_loss', verbose=0
)
early_stop = keras.callbacks.EarlyStopping(
monitor='val_loss', patience=train_cfg['patience'], restore_best_weights=True
)
# Log model summary
model.summary(print_fn=lambda x: logger.info(x))
# Train with exact steps_per_epoch
model.fit(
train_ds,
epochs=train_cfg['epochs'],
steps_per_epoch=steps_train,
validation_data=val_ds,
validation_steps=steps_val,
callbacks=[checkpoint_cb, early_stop],
verbose=2
)
# # Test evaluation
# y_true, y_pred = [], []
# for X, y in test_ds:
# pred = model.predict(X, verbose=0)
# y_true.append(y.numpy())
# y_pred.append(pred)
# y_true = np.concatenate(y_true)
# y_pred = np.concatenate(y_pred)
# y_true_orig = scaler.inverse_transform(y_true)
# y_pred_orig = scaler.inverse_transform(y_pred)
# test_rmse = np.sqrt(mean_squared_error(y_true_orig, y_pred_orig))
# test_mae = mean_absolute_error(y_true_orig, y_pred_orig)
# logger.info(f"Test RMSE: {test_rmse:.4f}, MAE: {test_mae:.4f}")
return {
'model_path': model_path,
'model_filename': os.path.basename(model_path),
'scaler_path': scaler_path,
'datetime': dt_str,
'dataset_merge': dataset_merge,
# 'test_rmse': float(test_rmse),
# 'test_mae': float(test_mae)
}
if __name__ == "__main__":
logger.info("Running standalone training test")
result = train_lstm_model()
print("Training completed successfully!")
print(result)
logger.info("Standalone training run completed")