"""Model training/evaluation pipeline for windowed time-series forecasting.

Provides financial-forecasting metrics, feature selection (RandomForest
importance or PCA), and `train_and_evaluate`, which trains a PyTorch
sequence model and reports regression + trading-oriented metrics.
"""
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    r2_score,
    precision_score,
    recall_score,
)
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
import logging
import torch.optim.lr_scheduler as lr_scheduler
from io import StringIO
import sys

# torchsummary is optional; `summary` stays None when it is not installed.
try:
    from torchsummary import summary
except Exception:
    summary = None

logging.basicConfig(
    level=logging.DEBUG,
    filename="/tmp/app_log.txt",
    filemode="a",
    format="%(asctime)s - %(levelname)s - %(message)s",
)


# ---------------- Utility metrics ----------------
def mean_absolute_percentage_error(y_true, y_pred):
    """Return MAPE in percent, skipping positions where y_true == 0.

    Returns NaN (with a warning) when every true value is zero, since the
    percentage error is undefined there.
    """
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    non_zero = np.abs(y_true) > 0
    if np.sum(non_zero) == 0:
        logging.warning("All true values are zero in MAPE calculation")
        return np.nan
    return np.mean(np.abs((y_true[non_zero] - y_pred[non_zero]) / y_true[non_zero])) * 100


def directional_accuracy(y_true, y_pred):
    """Fraction of steps where the predicted direction of change matches
    the true direction. NaN when fewer than two points are given."""
    true_diff = np.diff(y_true)
    pred_diff = np.diff(y_pred)
    if len(true_diff) == 0:
        logging.warning("Insufficient data for directional accuracy")
        return np.nan
    return np.mean(np.sign(true_diff) == np.sign(pred_diff))


def mase(y_true, y_pred, y_train):
    """Mean Absolute Scaled Error: forecast MAE divided by the MAE of the
    naive one-step (lag-1) forecast on the training series.

    Returns NaN when the naive MAE is zero (constant training series) or
    undefined (training series shorter than two points).
    """
    mae_val = mean_absolute_error(y_true, y_pred)
    naive_mae = (
        mean_absolute_error(y_train[1:], y_train[:-1]) if len(y_train) > 1 else np.nan
    )
    # Guard zero AND NaN: the original only checked == 0, so a too-short
    # training series silently produced NaN with no warning.
    if not np.isfinite(naive_mae) or naive_mae == 0:
        logging.warning("Naive MAE is zero or undefined in MASE calculation")
        return np.nan
    return mae_val / naive_mae


def compute_volatility(y_pred):
    """Annualized volatility of the predicted series' simple returns.

    NOTE(review): sqrt(252) assumes daily observations — confirm the data
    frequency matches.
    """
    returns = np.diff(y_pred) / y_pred[:-1]
    if len(returns) == 0:
        logging.warning("Insufficient data for volatility calculation")
        return np.nan
    return np.std(returns) * np.sqrt(252)


def compute_sharpe_ratio(y_pred, risk_free_rate=0.01):
    """Sharpe ratio of the predicted series' simple returns.

    NOTE(review): `risk_free_rate` is subtracted from the per-period mean
    return without de-annualizing — verify the intended rate convention.
    """
    returns = np.diff(y_pred) / y_pred[:-1]
    if len(returns) == 0:
        logging.warning("Insufficient data for Sharpe ratio calculation")
        return np.nan
    mean_return = np.mean(returns)
    std_return = np.std(returns)
    if std_return == 0:
        logging.warning("Standard deviation of returns is zero in Sharpe ratio")
        return np.nan
    return (mean_return - risk_free_rate) / std_return


def compute_precision_recall(y_true, y_pred):
    """Precision/recall of predicting an upward move (positive diff).

    Returns (NaN, NaN) when fewer than two points are available.
    """
    true_diff = np.sign(np.diff(y_true))
    pred_diff = np.sign(np.diff(y_pred))
    if len(true_diff) == 0:
        logging.warning("Insufficient data for precision/recall calculation")
        return np.nan, np.nan
    precision = precision_score(true_diff > 0, pred_diff > 0, zero_division=0)
    recall = recall_score(true_diff > 0, pred_diff > 0, zero_division=0)
    return precision, recall


# ---------------- Feature selection ----------------
def select_features(df, features, target, selector_method, importance_threshold):
    """Return a subset of `features` chosen by the requested method.

    "RandomForest" keeps features whose importance >= importance_threshold;
    "PCA" keeps the first k features where k components explain 95% of the
    variance. Any failure or an empty selection falls back to all features.

    NOTE(review): the PCA branch maps component count back to the FIRST k
    original features — components do not correspond to individual columns,
    so this is a heuristic, preserved from the original implementation.
    """
    logging.info(
        f"Selecting features with method: {selector_method}, threshold: {importance_threshold}"
    )
    if selector_method == "RandomForest":
        try:
            X = df[features].dropna()
            y = df[target].loc[X.index]
            rf = RandomForestRegressor(n_estimators=100, random_state=42)
            rf.fit(X, y)
            importances = pd.Series(rf.feature_importances_, index=features)
            selected_features = importances[importances >= importance_threshold].index.tolist()
            logging.debug(
                f"RandomForest selected features: {selected_features}, importances: {importances.to_dict()}"
            )
            return selected_features if selected_features else features
        except Exception as e:
            logging.error(f"RandomForest feature selection failed: {str(e)}")
            return features
    elif selector_method == "PCA":
        try:
            X = df[features].dropna()
            scaler = MinMaxScaler()
            X_scaled = scaler.fit_transform(X)
            n_components = min(len(features), X_scaled.shape[0], 10)
            pca = PCA(n_components=n_components)
            pca.fit(X_scaled)
            explained_variance_ratio = pca.explained_variance_ratio_.cumsum()
            # Smallest k whose cumulative explained variance reaches 95%.
            n_selected = (
                sum(explained_variance_ratio < 0.95) + 1
                if any(explained_variance_ratio < 0.95)
                else n_components
            )
            selected_features = features[:n_selected]
            logging.debug(
                f"PCA selected features: {selected_features}, explained variance: {explained_variance_ratio.tolist()}"
            )
            return selected_features if selected_features else features
        except Exception as e:
            logging.error(f"PCA feature selection failed: {str(e)}")
            return features
    else:
        logging.warning(f"Unsupported selector_method: {selector_method}, using all features")
        return features


def train_and_evaluate(
    df, features, target, model_cls, horizon=1, hidden=64, layers=1, epochs=50,
    lr=0.001, beta1=0.9, beta2=0.999, weight_decay=0.01, dropout=0.2, window=30,
    test_split=0.2, selector_method="RandomForest", importance_threshold=0.0,
    scheduler_type="None", device='cpu', verbose=True
):
    """Train `model_cls` on windowed data from `df` and evaluate it.

    Args:
        df: Source DataFrame containing `features` and `target` columns.
        features: Candidate feature column names.
        target: Target column name.
        model_cls: Model class accepting (input_size, hidden_size,
            num_layers, output_size, dropout) keyword arguments.
        horizon: Forecast horizon (model output size).
        hidden, layers, dropout: Model hyperparameters.
        epochs, lr, beta1, beta2, weight_decay: Adam training settings.
        window: Lookback window length used by preprocessing.
        test_split: Fraction of samples held out (chronological tail).
        selector_method, importance_threshold: See `select_features`.
        scheduler_type: "ReduceLROnPlateau" or "None".
        device: torch device string.
        verbose: Kept for interface compatibility.

    Returns:
        dict with model, loss curves, metrics, forecasts, scalers and
        architecture info — or {"error": message} on any failure.
    """
    try:
        logging.info(
            f"Starting train_and_evaluate: model={model_cls.__name__}, features={len(features)}, "
            f"window={window}, horizon={horizon}, scheduler={scheduler_type}, "
            f"selector_method={selector_method}"
        )
        # Local import avoids a circular dependency with the package's data module.
        from .data import preprocess_data

        selected_features = select_features(
            df, features, target, selector_method, importance_threshold
        )
        logging.info(f"Selected features: {selected_features}")

        X, y, feature_scaler, target_scaler, updated_feature_cols, target_idx = preprocess_data(
            df, selected_features, target, window, horizon
        )
        if X.shape[0] < 10:
            logging.error(f"Insufficient data samples: {X.shape[0]}")
            return {"error": f"Insufficient data samples: {X.shape[0]}"}

        # Chronological split — no shuffling across the train/test boundary.
        train_size = int((1 - test_split) * len(X))
        X_train, X_test = X[:train_size], X[train_size:]
        y_train, y_test = y[:train_size], y[train_size:]
        logging.debug(f"Train size: {len(X_train)}, Test size: {len(X_test)}")

        train_dataset = TensorDataset(
            torch.tensor(X_train, dtype=torch.float32).to(device),
            torch.tensor(y_train, dtype=torch.float32).to(device),
        )
        test_dataset = TensorDataset(
            torch.tensor(X_test, dtype=torch.float32).to(device),
            torch.tensor(y_test, dtype=torch.float32).to(device),
        )
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

        input_size = X.shape[2]
        model = model_cls(
            input_size=input_size, hidden_size=hidden, num_layers=layers,
            output_size=horizon, dropout=dropout,
        ).to(device)
        logging.debug(
            f"Model initialized: {model_cls.__name__}, input_size={input_size}, "
            f"hidden={hidden}, layers={layers}"
        )

        optimizer = optim.Adam(
            model.parameters(), lr=lr, betas=(beta1, beta2), weight_decay=weight_decay
        )
        criterion = nn.MSELoss()
        scheduler = None
        if scheduler_type == "ReduceLROnPlateau":
            # `verbose` is deprecated/removed in recent torch releases; the
            # current LR is logged each epoch below instead.
            scheduler = lr_scheduler.ReduceLROnPlateau(
                optimizer, mode='min', factor=0.5, patience=10
            )
            logging.debug("Initialized ReduceLROnPlateau scheduler")
        elif scheduler_type != "None":
            logging.warning(f"Unsupported scheduler type: {scheduler_type}, using None")

        train_losses = []
        val_losses = []
        for epoch in range(epochs):
            # ---- Training pass ----
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                # Weight by batch size so the epoch loss is a true mean.
                train_loss += loss.item() * batch_X.size(0)
            train_loss /= len(train_loader.dataset)
            train_losses.append(train_loss)

            # ---- Validation pass ----
            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for batch_X, batch_y in test_loader:
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item() * batch_X.size(0)
            val_loss /= len(test_loader.dataset)
            val_losses.append(val_loss)

            if scheduler:
                scheduler.step(val_loss)
                current_lr = optimizer.param_groups[0]['lr']
                logging.debug(
                    f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, "
                    f"Val Loss: {val_loss:.6f}, LR: {current_lr}"
                )
            else:
                logging.debug(
                    f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, "
                    f"Val Loss: {val_loss:.6f}"
                )

        # ---------------- Evaluation ----------------
        model.eval()
        with torch.no_grad():
            X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
            y_pred_scaled = model(X_test_tensor).cpu().numpy()

        y_test_unscaled = target_scaler.inverse_transform(
            y_test.reshape(-1, horizon)
        ).flatten()
        y_pred_unscaled = target_scaler.inverse_transform(
            y_pred_scaled.reshape(-1, horizon)
        ).flatten()
        precision, recall = compute_precision_recall(y_test_unscaled, y_pred_unscaled)

        metrics = {
            "R2": float(r2_score(y_test_unscaled, y_pred_unscaled)),
            "MAPE": float(mean_absolute_percentage_error(y_test_unscaled, y_pred_unscaled)),
            "RMSE": float(np.sqrt(mean_squared_error(y_test_unscaled, y_pred_unscaled))),
            "MAE": float(mean_absolute_error(y_test_unscaled, y_pred_unscaled)),
            "DirAcc": float(directional_accuracy(y_test_unscaled, y_pred_unscaled)),
            "MASE": float(
                mase(
                    y_test_unscaled,
                    y_pred_unscaled,
                    target_scaler.inverse_transform(y_train.reshape(-1, horizon)).flatten(),
                )
            ),
            "Volatility": float(compute_volatility(y_pred_unscaled)),
            "Sharpe": float(compute_sharpe_ratio(y_pred_unscaled)),
            # float() passes NaN through unchanged, so no special-casing needed.
            "Precision": float(precision),
            "Recall": float(recall),
        }

        # Latest prediction: run the most recent window through the model.
        latest_data = torch.tensor(X[-1:], dtype=torch.float32).to(device)
        with torch.no_grad():
            latest_prediction_scaled = model(latest_data).cpu().numpy()
        latest_prediction = target_scaler.inverse_transform(
            latest_prediction_scaled.reshape(-1, horizon)
        ).flatten()

        result = {
            "model": model,
            "train_loss": train_losses,
            "val_loss": val_losses,
            "metrics": metrics,
            "actual": y_test_unscaled,
            "forecast": y_pred_unscaled,
            "latest_prediction": latest_prediction,
            "arch": {
                "input_size": input_size,
                "hidden": hidden,
                "layers": layers,
                "dropout": dropout,
                "window": window,
            },
            "scalers": {"feature_scaler": feature_scaler, "target_scaler": target_scaler},
            "features": updated_feature_cols,
        }
        logging.info("Training and evaluation completed successfully")
        return result
    except Exception as e:
        # logging.exception keeps the traceback, which the original error
        # string-only log discarded.
        logging.exception(f"Error in train_and_evaluate: {str(e)}")
        return {"error": str(e)}