# Source: mlstocks/backend/app/model_builder/trainer.py.backup
# Last deploy: github-actions[bot] — "Deploy to Hugging Face Space" (commit abf702c)
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Tuple, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.database import HistoricalData, UserAIModel
from app.services.user_service import user_service
# Machine Learning Imports
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
# Optional/Heavy Imports
try:
import xgboost as xgb
except ImportError:
xgb = None
try:
import torch
import torch.nn as nn
import torch.optim as optim
except ImportError:
torch = None
class ModelTrainer:
    """
    Handles the orchestration of data fetching, preprocessing,
    training, and evaluation of AI models.

    One instance serves a single (db session, user) pair.  Every pipeline
    step appends a human-readable progress message to ``self.logs`` so the
    caller can surface the complete training transcript to the user.
    """
    def __init__(self, db: AsyncSession, user_id: int):
        # Async SQLAlchemy session used for all reads and writes.
        self.db = db
        # Owner of any UserAIModel rows created by train().
        self.user_id = user_id
        # Chronological training transcript (also echoed to stdout by _log).
        self.logs: List[str] = []
def _log(self, msg: str):
print(f"[ModelTrainer] {msg}")
self.logs.append(msg)
async def fetch_data(self, symbol: str) -> pd.DataFrame:
self._log(f"Step 1: Fetching historical data for {symbol}...")
result = await self.db.execute(
select(HistoricalData)
.where(HistoricalData.symbol == symbol)
.order_by(HistoricalData.date.asc())
)
rows = result.scalars().all()
if not rows:
self._log(f"⚠️ Data missing in primary vault for {symbol}. Triggering auto-sync via Yahoo Finance...")
try:
await user_service.sync_historical_data(self.db, symbol)
self._log(f"✅ Sync complete for {symbol}. Re-fetching records...")
result = await self.db.execute(
select(HistoricalData)
.where(HistoricalData.symbol == symbol)
.order_by(HistoricalData.date.asc())
)
rows = result.scalars().all()
except Exception as e:
self._log(f"❌ Auto-sync failed for {symbol}: {str(e)}")
if not rows:
self._log(f"CRITICAL: No historical data available for {symbol}. Abandoning training.")
return pd.DataFrame()
df_data = []
for r in rows:
indicators = json.loads(r.indicators) if r.indicators else {}
row_dict = {"close": r.close}
row_dict.update(indicators)
df_data.append(row_dict)
df = pd.DataFrame(df_data)
self._log(f"📊 Dataset Loaded. Total records: {len(df)}")
return df
def preprocess(self, df: pd.DataFrame, features: List[str]) -> Tuple[np.ndarray, np.ndarray, List[str]]:
self._log("=" * 60)
self._log("Step 2: Exploratory Data Analysis (EDA)")
self._log("=" * 60)
# EDA: Basic statistics
self._log(f"📊 Dataset shape: {df.shape[0]} rows × {df.shape[1]} columns")
self._log(f"📅 Date range: {df.index[0] if hasattr(df, 'index') else 'N/A'} to {df.index[-1] if hasattr(df, 'index') else 'N/A'}")
# Check for missing values
missing_count = df.isnull().sum().sum()
if missing_count > 0:
self._log(f"⚠️ Missing values detected: {missing_count} cells")
self._log(" Applying forward-fill interpolation...")
df = df.fillna(method='ffill').fillna(method='bfill')
else:
self._log("✅ No missing values detected")
# Outlier detection using IQR method
self._log("\n🔍 Outlier Detection (IQR Method):")
outliers_removed = 0
for col in ['close']:
if col in df.columns:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound)
outlier_count = outlier_mask.sum()
if outlier_count > 0:
self._log(f" {col}: {outlier_count} outliers detected (bounds: {lower_bound:.2f} - {upper_bound:.2f})")
# Cap outliers instead of removing to preserve temporal sequence
df.loc[df[col] < lower_bound, col] = lower_bound
df.loc[df[col] > upper_bound, col] = upper_bound
outliers_removed += outlier_count
if outliers_removed > 0:
self._log(f" ✅ {outliers_removed} outliers capped to preserve data integrity")
else:
self._log(" ✅ No significant outliers detected")
self._log("\n" + "=" * 60)
self._log("Step 3: Feature Engineering")
self._log("=" * 60)
# Create target: Predict next day close
df = df.copy()
df['target'] = df['close'].shift(-1)
# Feature engineering: Add lag features and rolling statistics
self._log("🔧 Creating temporal features:")
df['close_lag_1'] = df['close'].shift(1)
df['close_lag_5'] = df['close'].shift(5)
df['rolling_mean_5'] = df['close'].rolling(window=5).mean()
df['rolling_std_5'] = df['close'].rolling(window=5).std()
df['price_change'] = df['close'].pct_change()
self._log(" - Lag features: close_lag_1, close_lag_5")
self._log(" - Rolling statistics: 5-day mean, 5-day std")
self._log(" - Price change percentage")
# Drop NaN values created by shifting and rolling
initial_rows = len(df)
df = df.dropna()
rows_dropped = initial_rows - len(df)
self._log(f" 📉 Dropped {rows_dropped} rows due to feature engineering (NaN values)")
# Select features
feature_cols = [f for f in features if f in df.columns]
# Add engineered features if not already in the list
engineered_features = ['close_lag_1', 'rolling_mean_5', 'price_change']
for feat in engineered_features:
if feat in df.columns and feat not in feature_cols:
feature_cols.append(feat)
if not feature_cols:
self._log("⚠️ Warning: Requested features not found in data. Falling back to price-only model.")
feature_cols = ['close']
X = df[feature_cols].values
y = df['target'].values
self._log(f"\n🧬 Final Feature Vector: {len(feature_cols)} dimensions")
self._log(f" Features: {', '.join(feature_cols)}")
self._log(f"📊 Training samples after preprocessing: {len(X)}")
self._log(f" Target variable: next-day closing price")
return X, y, feature_cols
async def train(self, target_symbol: str, model_type: str, features: List[str], test_size: float = 0.2):
split_idx = int(len(X) * (1 - test_size))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
self._log(f"✅ Training set: {len(X_train)} samples")
self._log(f"✅ Validation set: {len(X_test)} samples")
self._log(f" Target mean (train): ${y_train.mean():.2f}")
self._log(f" Target std (train): ${y_train.std():.2f}")
# Scaling
scaler = None
needs_scaling = model_type in ['SVM', 'Neural Network (MLP)', 'LSTM', 'CNN 1D', 'Transformer', 'Linear Regression']
if needs_scaling:
self._log("\n⚖️ Applying Feature Normalization (StandardScaler):")
self._log(f" Reason: {model_type} is sensitive to feature scales")
scaler = StandardScaler()
X_train_before_mean = X_train.mean()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
self._log(f" ✅ Features normalized (mean ≈ 0, std ≈ 1)")
self._log(f" Before: mean={X_train_before_mean:.2f} | After: mean={X_train.mean():.4f}")
else:
self._log(f"\n⚖️ Skipping normalization ({model_type} is scale-invariant)")
self._log("\n" + "=" * 60)
self._log(f"Step 5: Model Training - {model_type}")
self._log("=" * 60)
# Training
model = None
if model_type == 'RandomForest':
self._log("🌲 Training Random Forest Regressor...")
self._log(" Hyperparameters: n_estimators=100, random_state=42")
model = RandomForestRegressor(n_estimators=100, random_state=42)
self._log(" Fitting ensemble of decision trees...")
model.fit(X_train, y_train)
self._log(" ✅ Training complete - 100 trees grown")
elif model_type == 'XGBoost':
if xgb:
self._log("🔥 Training XGBoost Gradient Boosting...")
self._log(" Hyperparameters: n_estimators=100, objective=reg:squarederror")
model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100)
self._log(" Fitting boosted trees with gradient descent...")
model.fit(X_train, y_train)
self._log(" ✅ Training complete - 100 boosting rounds")
else:
self._log("⚠️ XGBoost not available, falling back to RandomForest")
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
elif model_type == 'Linear Regression':
self._log("📈 Training Linear Regression...")
self._log(" Method: Ordinary Least Squares (OLS)")
model = LinearRegression()
self._log(" Fitting linear model to training data...")
model.fit(X_train, y_train)
self._log(f" ✅ Training complete - {len(feature_cols)} coefficients learned")
elif model_type == 'SVM':
self._log("🎯 Training Support Vector Machine...")
self._log(" Kernel: Radial Basis Function (RBF)")
model = SVR(kernel='rbf')
self._log(" Finding optimal hyperplane...")
model.fit(X_train, y_train)
self._log(" ✅ Training complete - support vectors identified")
elif model_type == 'Neural Network (MLP)':
self._log("🧠 Training Multi-Layer Perceptron...")
self._log(" Architecture: Input → 100 → 50 → Output")
self._log(" Activation: ReLU | Optimizer: Adam")
model = MLPRegressor(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42)
self._log(" Training neural network (max 500 iterations)...")
model.fit(X_train, y_train)
self._log(f" ✅ Training complete - converged in {model.n_iter_} iterations")
elif model_type in ['LSTM', 'CNN 1D', 'Transformer']:
self._log(f"🤖 Training Deep Learning Model: {model_type}")
preds, model = await self._train_deep_learning(X_train, y_train, X_test, model_type)
# Evaluation
self._log("\n" + "=" * 60)
self._log("Step 6: Model Evaluation & Validation")
self._log("=" * 60)
if model_type not in ['LSTM', 'CNN 1D', 'Transformer']:
self._log("🔮 Generating predictions on validation set...")
preds = model.predict(X_test)
self._log(f" ✅ {len(preds)} predictions generated")
# Calculate comprehensive metrics
from sklearn.metrics import mean_absolute_error
mse = mean_squared_error(y_test, preds)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, preds)
r2 = r2_score(y_test, preds)
# Calculate prediction accuracy (within 5% of actual)
accuracy_5pct = np.mean(np.abs((preds - y_test) / y_test) < 0.05) * 100
metrics = {"mse": float(mse), "r2": float(r2), "rmse": float(rmse), "mae": float(mae)}
self._log("\n📊 Performance Metrics:")
self._log(f" R² Score: {r2:.4f} {'✅' if r2 > 0.7 else '⚠️' if r2 > 0.5 else '❌'}")
self._log(f" MSE (Mean Squared Error): {mse:.4f}")
self._log(f" RMSE (Root MSE): ${rmse:.2f}")
self._log(f" MAE (Mean Absolute Error): ${mae:.2f}")
self._log(f" Prediction Accuracy (±5%): {accuracy_5pct:.1f}%")
# Interpretation
self._log("\n💡 Model Performance Interpretation:")
if r2 > 0.8:
self._log(" 🌟 Excellent - Model explains >80% of variance")
elif r2 > 0.6:
self._log(" ✅ Good - Model has strong predictive power")
elif r2 > 0.4:
self._log(" ⚠️ Fair - Model shows moderate predictive ability")
else:
self._log(" ❌ Poor - Consider feature engineering or different model")
self._log(f"\n🏁 Validation Complete!")
# Saving
import random
# Naming Convention: [stock]-[fund/tech]-[ml/dl]-[algo]-[3digit]
is_dl = model_type in ['LSTM', 'CNN 1D', 'Transformer']
tech_cat = "dl" if is_dl else "ml"
algo_slug = model_type.lower().replace(" ", "-").replace("(", "").replace(")", "")
rand_id = random.randint(100, 999)
final_name = f"{target_symbol}-technical-{tech_cat}-{algo_slug}-{rand_id}"
db_model = UserAIModel(
user_id=self.user_id,
name=final_name,
model_type=model_type,
target_symbol=target_symbol,
parameters=json.dumps({
"features": feature_cols,
"total_samples": len(X),
"train_samples": len(X_train),
"test_samples": len(X_test),
"test_size": test_size
}),
metrics=json.dumps(metrics)
)
self.db.add(db_model)
await self.db.commit()
self._log(f"💾 Model artifact indexed in database (ID: {db_model.id})")
return {
"status": "success",
"logs": self.logs,
"metrics": metrics,
"model_id": db_model.id
}
async def _train_deep_learning(self, X_train, y_train, X_test, model_type: str):
if not torch:
raise ImportError("PyTorch not installed.")
input_dim = X_train.shape[1]
X_train_t = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
y_train_t = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
X_test_t = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
dl_model = None
if model_type == 'LSTM':
dl_model = LSTMNet(input_dim)
elif model_type == 'CNN 1D':
dl_model = CNNNet(input_dim)
elif model_type == 'Transformer':
dl_model = TransformerNet(input_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(dl_model.parameters(), lr=0.01)
self._log(f"Training {model_type} with PyTorch... Epochs=50")
dl_model.train()
for _ in range(50):
optimizer.zero_grad()
outputs = dl_model(X_train_t)
loss = criterion(outputs, y_train_t)
loss.backward()
optimizer.step()
dl_model.eval()
with torch.no_grad():
preds_t = dl_model(X_test_t)
preds = preds_t.numpy().flatten()
return preds, dl_model
# --- PyTorch Model Classes ---
# Defined only when torch imported successfully (see guarded import at top of file).
if torch:
    class LSTMNet(nn.Module):
        """Single-layer LSTM (64 hidden units) → dropout → 2-layer MLP head → scalar output."""
        def __init__(self, input_dim):
            super(LSTMNet, self).__init__()
            self.lstm = nn.LSTM(input_dim, 64, batch_first=True)
            self.dropout = nn.Dropout(0.2)
            self.fc1 = nn.Linear(64, 32)
            self.relu = nn.ReLU()
            self.fc2 = nn.Linear(32, 1)
        def forward(self, x):
            # x: (batch, seq, input_dim); keep only the final time step's hidden output.
            out, _ = self.lstm(x)
            out = out[:, -1, :]
            out = self.dropout(out)
            out = self.fc1(out)
            out = self.relu(out)
            out = self.fc2(out)
            return out
    class CNNNet(nn.Module):
        """Conv1d over the feature axis (kernel 1) → flatten → MLP head → scalar output.

        NOTE(review): the Linear(64, 32) after Flatten assumes seq_len == 1
        (the trainer unsqueezes dim 1); longer sequences would break it — confirm.
        """
        def __init__(self, input_dim):
            super(CNNNet, self).__init__()
            self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=1)
            self.relu = nn.ReLU()
            self.flatten = nn.Flatten()
            self.fc1 = nn.Linear(64, 32)
            self.fc2 = nn.Linear(32, 1)
        def forward(self, x):
            # Conv1d expects (batch, channels, length): move features to the channel dim.
            x = x.permute(0, 2, 1)
            out = self.conv1(x)
            out = self.relu(out)
            out = self.flatten(out)
            out = self.fc1(out)
            out = self.relu(out)
            out = self.fc2(out)
            return out
    class TransformerNet(nn.Module):
        """Linear embedding (→32) → 1-layer Transformer encoder (2 heads) → scalar output."""
        def __init__(self, input_dim):
            super(TransformerNet, self).__init__()
            self.embedding = nn.Linear(input_dim, 32)
            encoder_layer = nn.TransformerEncoderLayer(d_model=32, nhead=2, batch_first=True, dim_feedforward=64)
            self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=1)
            self.fc1 = nn.Linear(32, 1)
        def forward(self, x):
            out = self.embedding(x)
            out = self.transformer(out)
            # Use the representation of the last sequence position.
            out = out[:, -1, :]
            out = self.fc1(out)
            return out