| import json
|
| import numpy as np
|
| import pandas as pd
|
| from typing import List, Dict, Any, Tuple, Optional
|
| from sqlalchemy.ext.asyncio import AsyncSession
|
| from sqlalchemy import select
|
| from app.models.database import HistoricalData, UserAIModel
|
| from app.services.user_service import user_service
|
|
|
|
|
| from sklearn.ensemble import RandomForestRegressor
|
| from sklearn.linear_model import LinearRegression
|
| from sklearn.svm import SVR
|
| from sklearn.neural_network import MLPRegressor
|
| from sklearn.preprocessing import StandardScaler
|
| from sklearn.metrics import mean_squared_error, r2_score
|
|
|
|
|
| try:
|
| import xgboost as xgb
|
| except ImportError:
|
| xgb = None
|
|
|
| try:
|
| import torch
|
| import torch.nn as nn
|
| import torch.optim as optim
|
| except ImportError:
|
| torch = None
|
|
|
|
|
class ModelTrainer:
    """Orchestrates fetching, preprocessing, training and evaluation of
    per-user AI models against historical market data."""

    def __init__(self, db: AsyncSession, user_id: int):
        # Async DB session used for all reads/writes performed by this trainer.
        self.db = db
        # Owner of any model artifacts this run produces.
        self.user_id = user_id
        # Human-readable progress log; returned to the caller when training ends.
        self.logs = []

    def _log(self, msg: str):
        """Record a progress message to stdout and to the run log."""
        print(f"[ModelTrainer] {msg}")
        self.logs.append(msg)
|
|
|
| async def fetch_data(self, symbol: str) -> pd.DataFrame:
|
| self._log(f"Step 1: Fetching historical data for {symbol}...")
|
|
|
| result = await self.db.execute(
|
| select(HistoricalData)
|
| .where(HistoricalData.symbol == symbol)
|
| .order_by(HistoricalData.date.asc())
|
| )
|
| rows = result.scalars().all()
|
|
|
| if not rows:
|
| self._log(f"⚠️ Data missing in primary vault for {symbol}. Triggering auto-sync via Yahoo Finance...")
|
| try:
|
| await user_service.sync_historical_data(self.db, symbol)
|
| self._log(f"✅ Sync complete for {symbol}. Re-fetching records...")
|
| result = await self.db.execute(
|
| select(HistoricalData)
|
| .where(HistoricalData.symbol == symbol)
|
| .order_by(HistoricalData.date.asc())
|
| )
|
| rows = result.scalars().all()
|
| except Exception as e:
|
| self._log(f"❌ Auto-sync failed for {symbol}: {str(e)}")
|
|
|
| if not rows:
|
| self._log(f"CRITICAL: No historical data available for {symbol}. Abandoning training.")
|
| return pd.DataFrame()
|
|
|
| df_data = []
|
| for r in rows:
|
| indicators = json.loads(r.indicators) if r.indicators else {}
|
| row_dict = {"close": r.close}
|
| row_dict.update(indicators)
|
| df_data.append(row_dict)
|
|
|
| df = pd.DataFrame(df_data)
|
| self._log(f"📊 Dataset Loaded. Total records: {len(df)}")
|
| return df
|
|
|
|
|
| def preprocess(self, df: pd.DataFrame, features: List[str]) -> Tuple[np.ndarray, np.ndarray, List[str]]:
|
| self._log("=" * 60)
|
| self._log("Step 2: Exploratory Data Analysis (EDA)")
|
| self._log("=" * 60)
|
|
|
|
|
| self._log(f"📊 Dataset shape: {df.shape[0]} rows × {df.shape[1]} columns")
|
| self._log(f"📅 Date range: {df.index[0] if hasattr(df, 'index') else 'N/A'} to {df.index[-1] if hasattr(df, 'index') else 'N/A'}")
|
|
|
|
|
| missing_count = df.isnull().sum().sum()
|
| if missing_count > 0:
|
| self._log(f"⚠️ Missing values detected: {missing_count} cells")
|
| self._log(" Applying forward-fill interpolation...")
|
| df = df.fillna(method='ffill').fillna(method='bfill')
|
| else:
|
| self._log("✅ No missing values detected")
|
|
|
|
|
| self._log("\n🔍 Outlier Detection (IQR Method):")
|
| outliers_removed = 0
|
| for col in ['close']:
|
| if col in df.columns:
|
| Q1 = df[col].quantile(0.25)
|
| Q3 = df[col].quantile(0.75)
|
| IQR = Q3 - Q1
|
| lower_bound = Q1 - 1.5 * IQR
|
| upper_bound = Q3 + 1.5 * IQR
|
| outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound)
|
| outlier_count = outlier_mask.sum()
|
| if outlier_count > 0:
|
| self._log(f" {col}: {outlier_count} outliers detected (bounds: {lower_bound:.2f} - {upper_bound:.2f})")
|
|
|
| df.loc[df[col] < lower_bound, col] = lower_bound
|
| df.loc[df[col] > upper_bound, col] = upper_bound
|
| outliers_removed += outlier_count
|
|
|
| if outliers_removed > 0:
|
| self._log(f" ✅ {outliers_removed} outliers capped to preserve data integrity")
|
| else:
|
| self._log(" ✅ No significant outliers detected")
|
|
|
| self._log("\n" + "=" * 60)
|
| self._log("Step 3: Feature Engineering")
|
| self._log("=" * 60)
|
|
|
|
|
| df = df.copy()
|
| df['target'] = df['close'].shift(-1)
|
|
|
|
|
| self._log("🔧 Creating temporal features:")
|
| df['close_lag_1'] = df['close'].shift(1)
|
| df['close_lag_5'] = df['close'].shift(5)
|
| df['rolling_mean_5'] = df['close'].rolling(window=5).mean()
|
| df['rolling_std_5'] = df['close'].rolling(window=5).std()
|
| df['price_change'] = df['close'].pct_change()
|
|
|
| self._log(" - Lag features: close_lag_1, close_lag_5")
|
| self._log(" - Rolling statistics: 5-day mean, 5-day std")
|
| self._log(" - Price change percentage")
|
|
|
|
|
| initial_rows = len(df)
|
| df = df.dropna()
|
| rows_dropped = initial_rows - len(df)
|
| self._log(f" 📉 Dropped {rows_dropped} rows due to feature engineering (NaN values)")
|
|
|
|
|
| feature_cols = [f for f in features if f in df.columns]
|
|
|
|
|
| engineered_features = ['close_lag_1', 'rolling_mean_5', 'price_change']
|
| for feat in engineered_features:
|
| if feat in df.columns and feat not in feature_cols:
|
| feature_cols.append(feat)
|
|
|
| if not feature_cols:
|
| self._log("⚠️ Warning: Requested features not found in data. Falling back to price-only model.")
|
| feature_cols = ['close']
|
|
|
| X = df[feature_cols].values
|
| y = df['target'].values
|
|
|
| self._log(f"\n🧬 Final Feature Vector: {len(feature_cols)} dimensions")
|
| self._log(f" Features: {', '.join(feature_cols)}")
|
| self._log(f"📊 Training samples after preprocessing: {len(X)}")
|
| self._log(f" Target variable: next-day closing price")
|
|
|
| return X, y, feature_cols
|
|
|
|
|
| async def train(self, target_symbol: str, model_type: str, features: List[str], test_size: float = 0.2):
|
| split_idx = int(len(X) * (1 - test_size))
|
| X_train, X_test = X[:split_idx], X[split_idx:]
|
| y_train, y_test = y[:split_idx], y[split_idx:]
|
|
|
| self._log(f"✅ Training set: {len(X_train)} samples")
|
| self._log(f"✅ Validation set: {len(X_test)} samples")
|
| self._log(f" Target mean (train): ${y_train.mean():.2f}")
|
| self._log(f" Target std (train): ${y_train.std():.2f}")
|
|
|
|
|
| scaler = None
|
| needs_scaling = model_type in ['SVM', 'Neural Network (MLP)', 'LSTM', 'CNN 1D', 'Transformer', 'Linear Regression']
|
| if needs_scaling:
|
| self._log("\n⚖️ Applying Feature Normalization (StandardScaler):")
|
| self._log(f" Reason: {model_type} is sensitive to feature scales")
|
| scaler = StandardScaler()
|
| X_train_before_mean = X_train.mean()
|
| X_train = scaler.fit_transform(X_train)
|
| X_test = scaler.transform(X_test)
|
| self._log(f" ✅ Features normalized (mean ≈ 0, std ≈ 1)")
|
| self._log(f" Before: mean={X_train_before_mean:.2f} | After: mean={X_train.mean():.4f}")
|
| else:
|
| self._log(f"\n⚖️ Skipping normalization ({model_type} is scale-invariant)")
|
|
|
|
|
| self._log("\n" + "=" * 60)
|
| self._log(f"Step 5: Model Training - {model_type}")
|
| self._log("=" * 60)
|
|
|
|
|
| model = None
|
| if model_type == 'RandomForest':
|
| self._log("🌲 Training Random Forest Regressor...")
|
| self._log(" Hyperparameters: n_estimators=100, random_state=42")
|
| model = RandomForestRegressor(n_estimators=100, random_state=42)
|
| self._log(" Fitting ensemble of decision trees...")
|
| model.fit(X_train, y_train)
|
| self._log(" ✅ Training complete - 100 trees grown")
|
| elif model_type == 'XGBoost':
|
| if xgb:
|
| self._log("🔥 Training XGBoost Gradient Boosting...")
|
| self._log(" Hyperparameters: n_estimators=100, objective=reg:squarederror")
|
| model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100)
|
| self._log(" Fitting boosted trees with gradient descent...")
|
| model.fit(X_train, y_train)
|
| self._log(" ✅ Training complete - 100 boosting rounds")
|
| else:
|
| self._log("⚠️ XGBoost not available, falling back to RandomForest")
|
| model = RandomForestRegressor(n_estimators=100, random_state=42)
|
| model.fit(X_train, y_train)
|
| elif model_type == 'Linear Regression':
|
| self._log("📈 Training Linear Regression...")
|
| self._log(" Method: Ordinary Least Squares (OLS)")
|
| model = LinearRegression()
|
| self._log(" Fitting linear model to training data...")
|
| model.fit(X_train, y_train)
|
| self._log(f" ✅ Training complete - {len(feature_cols)} coefficients learned")
|
| elif model_type == 'SVM':
|
| self._log("🎯 Training Support Vector Machine...")
|
| self._log(" Kernel: Radial Basis Function (RBF)")
|
| model = SVR(kernel='rbf')
|
| self._log(" Finding optimal hyperplane...")
|
| model.fit(X_train, y_train)
|
| self._log(" ✅ Training complete - support vectors identified")
|
| elif model_type == 'Neural Network (MLP)':
|
| self._log("🧠 Training Multi-Layer Perceptron...")
|
| self._log(" Architecture: Input → 100 → 50 → Output")
|
| self._log(" Activation: ReLU | Optimizer: Adam")
|
| model = MLPRegressor(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42)
|
| self._log(" Training neural network (max 500 iterations)...")
|
| model.fit(X_train, y_train)
|
| self._log(f" ✅ Training complete - converged in {model.n_iter_} iterations")
|
| elif model_type in ['LSTM', 'CNN 1D', 'Transformer']:
|
| self._log(f"🤖 Training Deep Learning Model: {model_type}")
|
| preds, model = await self._train_deep_learning(X_train, y_train, X_test, model_type)
|
|
|
|
|
| self._log("\n" + "=" * 60)
|
| self._log("Step 6: Model Evaluation & Validation")
|
| self._log("=" * 60)
|
|
|
| if model_type not in ['LSTM', 'CNN 1D', 'Transformer']:
|
| self._log("🔮 Generating predictions on validation set...")
|
| preds = model.predict(X_test)
|
| self._log(f" ✅ {len(preds)} predictions generated")
|
|
|
|
|
| from sklearn.metrics import mean_absolute_error
|
| mse = mean_squared_error(y_test, preds)
|
| rmse = np.sqrt(mse)
|
| mae = mean_absolute_error(y_test, preds)
|
| r2 = r2_score(y_test, preds)
|
|
|
|
|
| accuracy_5pct = np.mean(np.abs((preds - y_test) / y_test) < 0.05) * 100
|
|
|
| metrics = {"mse": float(mse), "r2": float(r2), "rmse": float(rmse), "mae": float(mae)}
|
|
|
| self._log("\n📊 Performance Metrics:")
|
| self._log(f" R² Score: {r2:.4f} {'✅' if r2 > 0.7 else '⚠️' if r2 > 0.5 else '❌'}")
|
| self._log(f" MSE (Mean Squared Error): {mse:.4f}")
|
| self._log(f" RMSE (Root MSE): ${rmse:.2f}")
|
| self._log(f" MAE (Mean Absolute Error): ${mae:.2f}")
|
| self._log(f" Prediction Accuracy (±5%): {accuracy_5pct:.1f}%")
|
|
|
|
|
| self._log("\n💡 Model Performance Interpretation:")
|
| if r2 > 0.8:
|
| self._log(" 🌟 Excellent - Model explains >80% of variance")
|
| elif r2 > 0.6:
|
| self._log(" ✅ Good - Model has strong predictive power")
|
| elif r2 > 0.4:
|
| self._log(" ⚠️ Fair - Model shows moderate predictive ability")
|
| else:
|
| self._log(" ❌ Poor - Consider feature engineering or different model")
|
|
|
| self._log(f"\n🏁 Validation Complete!")
|
|
|
|
|
|
|
| import random
|
|
|
|
|
| is_dl = model_type in ['LSTM', 'CNN 1D', 'Transformer']
|
| tech_cat = "dl" if is_dl else "ml"
|
| algo_slug = model_type.lower().replace(" ", "-").replace("(", "").replace(")", "")
|
| rand_id = random.randint(100, 999)
|
|
|
| final_name = f"{target_symbol}-technical-{tech_cat}-{algo_slug}-{rand_id}"
|
|
|
| db_model = UserAIModel(
|
| user_id=self.user_id,
|
| name=final_name,
|
| model_type=model_type,
|
| target_symbol=target_symbol,
|
| parameters=json.dumps({
|
| "features": feature_cols,
|
| "total_samples": len(X),
|
| "train_samples": len(X_train),
|
| "test_samples": len(X_test),
|
| "test_size": test_size
|
| }),
|
| metrics=json.dumps(metrics)
|
| )
|
| self.db.add(db_model)
|
| await self.db.commit()
|
| self._log(f"💾 Model artifact indexed in database (ID: {db_model.id})")
|
|
|
| return {
|
| "status": "success",
|
| "logs": self.logs,
|
| "metrics": metrics,
|
| "model_id": db_model.id
|
| }
|
|
|
| async def _train_deep_learning(self, X_train, y_train, X_test, model_type: str):
|
| if not torch:
|
| raise ImportError("PyTorch not installed.")
|
|
|
| input_dim = X_train.shape[1]
|
| X_train_t = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
|
| y_train_t = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
|
| X_test_t = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
|
|
|
| dl_model = None
|
| if model_type == 'LSTM':
|
| dl_model = LSTMNet(input_dim)
|
| elif model_type == 'CNN 1D':
|
| dl_model = CNNNet(input_dim)
|
| elif model_type == 'Transformer':
|
| dl_model = TransformerNet(input_dim)
|
|
|
| criterion = nn.MSELoss()
|
| optimizer = optim.Adam(dl_model.parameters(), lr=0.01)
|
|
|
| self._log(f"Training {model_type} with PyTorch... Epochs=50")
|
| dl_model.train()
|
| for _ in range(50):
|
| optimizer.zero_grad()
|
| outputs = dl_model(X_train_t)
|
| loss = criterion(outputs, y_train_t)
|
| loss.backward()
|
| optimizer.step()
|
|
|
| dl_model.eval()
|
| with torch.no_grad():
|
| preds_t = dl_model(X_test_t)
|
| preds = preds_t.numpy().flatten()
|
|
|
| return preds, dl_model
|
|
|
|
|
|
|
|
|
if torch:
    class LSTMNet(nn.Module):
        """Single-layer LSTM regressor: LSTM(64) -> dropout -> 64->32->1 head."""

        def __init__(self, input_dim):
            super(LSTMNet, self).__init__()
            self.lstm = nn.LSTM(input_dim, 64, batch_first=True)
            self.dropout = nn.Dropout(0.2)
            self.fc1 = nn.Linear(64, 32)
            self.relu = nn.ReLU()
            self.fc2 = nn.Linear(32, 1)

        def forward(self, x):
            # Summarize the sequence by its final time step's hidden state.
            seq_out, _ = self.lstm(x)
            summary = self.dropout(seq_out[:, -1, :])
            return self.fc2(self.relu(self.fc1(summary)))

    class CNNNet(nn.Module):
        """Pointwise Conv1d feature extractor followed by a small MLP head.

        Expects input shaped (batch, seq_len, features); the Linear head
        assumes seq_len == 1 after flattening.
        """

        def __init__(self, input_dim):
            super(CNNNet, self).__init__()
            self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=1)
            self.relu = nn.ReLU()
            self.flatten = nn.Flatten()
            self.fc1 = nn.Linear(64, 32)
            self.fc2 = nn.Linear(32, 1)

        def forward(self, x):
            # Conv1d wants channels-first: (batch, features, seq_len).
            channels_first = x.permute(0, 2, 1)
            feats = self.flatten(self.relu(self.conv1(channels_first)))
            return self.fc2(self.relu(self.fc1(feats)))

    class TransformerNet(nn.Module):
        """Tiny transformer-encoder regressor (1 layer, 2 heads, d_model=32)."""

        def __init__(self, input_dim):
            super(TransformerNet, self).__init__()
            self.embedding = nn.Linear(input_dim, 32)
            encoder_layer = nn.TransformerEncoderLayer(d_model=32, nhead=2, batch_first=True, dim_feedforward=64)
            self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=1)
            self.fc1 = nn.Linear(32, 1)

        def forward(self, x):
            # Regress from the final position's encoded representation.
            encoded = self.transformer(self.embedding(x))
            return self.fc1(encoded[:, -1, :])
|
|
|