# cyberforge/trainer.py
"""
Advanced Cybersecurity Model Trainer
Comprehensive training module for security ML models
"""
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.ensemble import (
    RandomForestClassifier,
    GradientBoostingClassifier,
    ExtraTreesClassifier,
    VotingClassifier,
    StackingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score
from sklearn.feature_selection import SelectKBest, mutual_info_classif
import joblib
import json
from datetime import datetime
from pathlib import Path
import logging
from typing import Dict, List, Any, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')  # silence library warnings (e.g. sklearn convergence) during batch training
logger = logging.getLogger(__name__)
class CyberSecurityNeuralNet(nn.Module):
"""Deep Neural Network for Cybersecurity Classification"""
def __init__(self, input_size: int, hidden_sizes: List[int], num_classes: int, dropout: float = 0.3):
super().__init__()
layers = []
prev_size = input_size
for hidden_size in hidden_sizes:
layers.extend([
nn.Linear(prev_size, hidden_size),
nn.BatchNorm1d(hidden_size),
nn.ReLU(),
nn.Dropout(dropout)
])
prev_size = hidden_size
layers.append(nn.Linear(prev_size, num_classes))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
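

def _demo_forward_pass() -> None:
    """Illustrative sketch (not invoked by the platform): build the network for a
    hypothetical 50-feature, 2-class problem and run one forward pass on random
    data. All sizes here are placeholder assumptions, not prescribed values."""
    net = CyberSecurityNeuralNet(input_size=50, hidden_sizes=[128, 64], num_classes=2)
    net.eval()  # use running BatchNorm statistics so any batch size works
    with torch.no_grad():
        logits = net(torch.randn(8, 50))  # raw class scores, shape (8, 2)
    print(logits.shape)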
class AdvancedSecurityTrainer:
"""Advanced trainer for cybersecurity models with multiple algorithms"""
def __init__(self, models_dir: str = "./trained_models"):
self.models_dir = Path(models_dir)
        self.models_dir.mkdir(parents=True, exist_ok=True)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.trained_models = {}
self.training_history = []
def preprocess_security_data(
self,
df: pd.DataFrame,
target_col: str,
feature_selection: bool = True,
n_features: int = 50
) -> Tuple[np.ndarray, np.ndarray, StandardScaler, LabelEncoder, List[str]]:
"""Preprocess security data with advanced feature engineering"""
# Separate features and target
X = df.drop(columns=[target_col])
y = df[target_col]
# Store original feature names
feature_names = list(X.columns)
# Handle categorical features
categorical_cols = X.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
le = LabelEncoder()
X[col] = le.fit_transform(X[col].astype(str))
# Handle missing values
X = X.fillna(X.median())
        # Encode the target; fitting the encoder unconditionally works for both
        # string and numeric labels and keeps the saved encoder usable at inference
        label_encoder = LabelEncoder()
        y = label_encoder.fit_transform(y)
# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Feature selection
if feature_selection and X_scaled.shape[1] > n_features:
selector = SelectKBest(mutual_info_classif, k=min(n_features, X_scaled.shape[1]))
X_scaled = selector.fit_transform(X_scaled, y)
selected_indices = selector.get_support(indices=True)
feature_names = [feature_names[i] for i in selected_indices]
return X_scaled, y, scaler, label_encoder, feature_names
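    # Usage sketch (illustrative; the frame, column names, and target are hypothetical):
    #   X, y, scaler, le, names = trainer.preprocess_security_data(flows_df, "label")
    # The returned arrays feed directly into the train_* methods below.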
def train_ensemble_model(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
model_name: str = "ensemble"
) -> Tuple[Any, Dict[str, float]]:
"""Train an ensemble of classifiers"""
logger.info("Training ensemble model...")
# Base estimators
estimators = [
('rf', RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
('gb', GradientBoostingClassifier(n_estimators=100, random_state=42)),
('et', ExtraTreesClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
]
# Voting classifier
voting_clf = VotingClassifier(estimators=estimators, voting='soft')
voting_clf.fit(X_train, y_train)
# Evaluate
y_pred = voting_clf.predict(X_test)
y_proba = voting_clf.predict_proba(X_test)
metrics = self._calculate_metrics(y_test, y_pred, y_proba)
# Save model
model_path = self.models_dir / f"{model_name}_ensemble.pkl"
joblib.dump(voting_clf, model_path)
logger.info(f"Ensemble model trained with accuracy: {metrics['accuracy']:.4f}")
return voting_clf, metrics
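    # Usage sketch (illustrative; assumes arrays produced by preprocess_security_data):
    #   model, metrics = trainer.train_ensemble_model(X_tr, y_tr, X_te, y_te, "ids")
    # train_stacking_model below takes the same arguments.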
def train_stacking_model(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
model_name: str = "stacking"
) -> Tuple[Any, Dict[str, float]]:
"""Train a stacking classifier"""
logger.info("Training stacking model...")
# Base estimators
estimators = [
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('gb', GradientBoostingClassifier(n_estimators=50, random_state=42)),
('svm', SVC(probability=True, random_state=42)),
]
# Stacking classifier with logistic regression meta-learner
stacking_clf = StackingClassifier(
estimators=estimators,
final_estimator=LogisticRegression(random_state=42),
cv=3
)
stacking_clf.fit(X_train, y_train)
# Evaluate
y_pred = stacking_clf.predict(X_test)
y_proba = stacking_clf.predict_proba(X_test)
metrics = self._calculate_metrics(y_test, y_pred, y_proba)
# Save model
model_path = self.models_dir / f"{model_name}_stacking.pkl"
joblib.dump(stacking_clf, model_path)
logger.info(f"Stacking model trained with accuracy: {metrics['accuracy']:.4f}")
return stacking_clf, metrics
def train_neural_network(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
hidden_sizes: List[int] = [256, 128, 64],
epochs: int = 100,
batch_size: int = 32,
learning_rate: float = 0.001,
model_name: str = "neural_net"
) -> Tuple[nn.Module, Dict[str, float]]:
"""Train a deep neural network"""
logger.info(f"Training neural network on {self.device}...")
# Convert to tensors
X_train_tensor = torch.FloatTensor(X_train).to(self.device)
y_train_tensor = torch.LongTensor(y_train).to(self.device)
X_test_tensor = torch.FloatTensor(X_test).to(self.device)
y_test_tensor = torch.LongTensor(y_test).to(self.device)
# Create data loader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Initialize model
num_classes = len(np.unique(y_train))
model = CyberSecurityNeuralNet(
input_size=X_train.shape[1],
hidden_sizes=hidden_sizes,
num_classes=num_classes
).to(self.device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10)
# Training loop
best_accuracy = 0
for epoch in range(epochs):
model.train()
total_loss = 0
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
# Validation
model.eval()
with torch.no_grad():
test_outputs = model(X_test_tensor)
test_loss = criterion(test_outputs, y_test_tensor)
_, predicted = torch.max(test_outputs, 1)
accuracy = (predicted == y_test_tensor).float().mean().item()
scheduler.step(test_loss)
if accuracy > best_accuracy:
best_accuracy = accuracy
torch.save(model.state_dict(), self.models_dir / f"{model_name}_nn_best.pt")
if (epoch + 1) % 20 == 0:
logger.info(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}, Accuracy: {accuracy:.4f}")
        # Final evaluation: restore the best checkpoint rather than the last epoch's weights
        best_path = self.models_dir / f"{model_name}_nn_best.pt"
        if best_path.exists():
            model.load_state_dict(torch.load(best_path, map_location=self.device))
        model.eval()
with torch.no_grad():
outputs = model(X_test_tensor)
_, y_pred = torch.max(outputs, 1)
y_pred = y_pred.cpu().numpy()
y_proba = torch.softmax(outputs, dim=1).cpu().numpy()
metrics = self._calculate_metrics(y_test, y_pred, y_proba)
logger.info(f"Neural network trained with accuracy: {metrics['accuracy']:.4f}")
return model, metrics
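    # Usage sketch (illustrative; these hyperparameters are placeholders):
    #   model, metrics = trainer.train_neural_network(
    #       X_tr, y_tr, X_te, y_te, hidden_sizes=[128, 64], epochs=50
    #   )
    # Training moves to GPU automatically when torch.cuda.is_available() is True.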
def train_all_models(
self,
df: pd.DataFrame,
target_col: str,
model_name: str,
test_size: float = 0.2
) -> Dict[str, Any]:
"""Train all available model types and return best performing"""
logger.info(f"Starting comprehensive training for {model_name}...")
# Preprocess data
X, y, scaler, label_encoder, feature_names = self.preprocess_security_data(df, target_col)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42, stratify=y
)
results = {}
# Train individual models
models_to_train = [
("random_forest", RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
("gradient_boosting", GradientBoostingClassifier(n_estimators=100, random_state=42)),
("extra_trees", ExtraTreesClassifier(n_estimators=100, random_state=42, n_jobs=-1)),
("logistic_regression", LogisticRegression(random_state=42, max_iter=1000)),
("mlp", MLPClassifier(hidden_layer_sizes=(128, 64), random_state=42, max_iter=500)),
]
for name, model in models_to_train:
try:
logger.info(f"Training {name}...")
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None
metrics = self._calculate_metrics(y_test, y_pred, y_proba)
results[name] = {
"model": model,
"metrics": metrics
}
# Save model
model_path = self.models_dir / f"{model_name}_{name}.pkl"
joblib.dump(model, model_path)
except Exception as e:
logger.error(f"Failed to train {name}: {e}")
results[name] = {"error": str(e)}
# Train ensemble
try:
ensemble_model, ensemble_metrics = self.train_ensemble_model(
X_train, y_train, X_test, y_test, model_name
)
results["ensemble"] = {
"model": ensemble_model,
"metrics": ensemble_metrics
}
except Exception as e:
logger.error(f"Failed to train ensemble: {e}")
# Train stacking
try:
stacking_model, stacking_metrics = self.train_stacking_model(
X_train, y_train, X_test, y_test, model_name
)
results["stacking"] = {
"model": stacking_model,
"metrics": stacking_metrics
}
except Exception as e:
logger.error(f"Failed to train stacking: {e}")
# Find best model
best_model_name = None
best_accuracy = 0
for name, result in results.items():
if "metrics" in result and result["metrics"]["accuracy"] > best_accuracy:
best_accuracy = result["metrics"]["accuracy"]
best_model_name = name
# Save preprocessing artifacts
joblib.dump(scaler, self.models_dir / f"{model_name}_scaler.pkl")
joblib.dump(label_encoder, self.models_dir / f"{model_name}_label_encoder.pkl")
# Save metadata
metadata = {
"model_name": model_name,
"target_column": target_col,
"feature_names": feature_names,
"num_features": len(feature_names),
"num_samples": len(df),
"num_classes": len(np.unique(y)),
"best_model": best_model_name,
"best_accuracy": best_accuracy,
"all_results": {
name: result.get("metrics", {"error": result.get("error")})
for name, result in results.items()
},
"created_at": datetime.now().isoformat()
}
with open(self.models_dir / f"{model_name}_metadata.json", 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Training complete. Best model: {best_model_name} with accuracy: {best_accuracy:.4f}")
return {
"results": results,
"metadata": metadata,
"scaler": scaler,
"label_encoder": label_encoder,
"feature_names": feature_names
}
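    # For an end-to-end usage sketch of train_all_models on synthetic data, see
    # the __main__ block at the bottom of this file.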
def _calculate_metrics(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
y_proba: Optional[np.ndarray] = None
) -> Dict[str, float]:
"""Calculate comprehensive metrics"""
metrics = {
"accuracy": float(accuracy_score(y_true, y_pred)),
"f1_weighted": float(f1_score(y_true, y_pred, average='weighted')),
"f1_macro": float(f1_score(y_true, y_pred, average='macro')),
}
# ROC AUC for binary or multi-class
if y_proba is not None:
try:
if len(np.unique(y_true)) == 2:
metrics["roc_auc"] = float(roc_auc_score(y_true, y_proba[:, 1]))
else:
metrics["roc_auc"] = float(roc_auc_score(y_true, y_proba, multi_class='ovr'))
            except Exception as e:
                logger.warning(f"Could not compute ROC AUC: {e}")
return metrics
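

def _demo_preprocessing() -> None:
    """Illustrative sketch: exercise preprocess_security_data on a tiny synthetic
    frame. The columns, values, and sizes are invented for the demo."""
    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        "bytes_sent": rng.integers(0, 10_000, size=200),
        "protocol": rng.choice(["tcp", "udp", "icmp"], size=200),
        "label": rng.choice(["benign", "malicious"], size=200),
    })
    trainer = AdvancedSecurityTrainer()
    X, y, _, _, names = trainer.preprocess_security_data(df, "label", feature_selection=False)
    print(X.shape, np.unique(y), names)  # (200, 2) [0 1] ['bytes_sent', 'protocol']
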
# Convenience function for Gradio interface
def train_comprehensive_model(
file_path: str,
target_column: str,
model_name: str,
test_size: float = 0.2
) -> Dict[str, Any]:
"""Train comprehensive models from file path"""
# Load dataset
if file_path.endswith('.csv'):
df = pd.read_csv(file_path)
elif file_path.endswith('.json'):
df = pd.read_json(file_path)
elif file_path.endswith('.parquet'):
df = pd.read_parquet(file_path)
else:
raise ValueError(f"Unsupported file format: {file_path}")
# Initialize trainer
trainer = AdvancedSecurityTrainer()
# Train all models
results = trainer.train_all_models(df, target_column, model_name, test_size)
return results
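

if __name__ == "__main__":
    # Minimal end-to-end sketch on synthetic data; a real run would instead point
    # train_comprehensive_model at a CSV/JSON/Parquet dataset. Labels here are
    # random, so the reported accuracies are only a smoke test of the pipeline.
    logging.basicConfig(level=logging.INFO)
    rng = np.random.default_rng(42)
    n = 500
    demo_df = pd.DataFrame({
        "pkt_rate": rng.normal(100, 25, size=n),
        "conn_duration": rng.exponential(2.0, size=n),
        "protocol": rng.choice(["tcp", "udp"], size=n),
        "label": rng.choice(["benign", "attack"], size=n),
    })
    trainer = AdvancedSecurityTrainer(models_dir="./demo_models")
    summary = trainer.train_all_models(demo_df, target_col="label", model_name="demo")
    print("Best model:", summary["metadata"]["best_model"])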