# Data-Science-Agent / src / tools / model_training.py
# Pulastya B
# fix: Fix module import paths for Render deployment
# 227cb22
"""
Model Training Tools
Tools for training machine learning models and generating reports.
"""
import polars as pl
import numpy as np
from typing import Dict, Any, List, Optional
from pathlib import Path
import sys
import os
import joblib
import json
import tempfile
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import artifact store
try:
from storage.helpers import save_model_with_store
ARTIFACT_STORE_AVAILABLE = True
except ImportError:
ARTIFACT_STORE_AVAILABLE = False
print("⚠️ Artifact store not available, using local paths")
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from xgboost import XGBClassifier, XGBRegressor
from lightgbm import LGBMClassifier, LGBMRegressor
from catboost import CatBoostClassifier, CatBoostRegressor
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
confusion_matrix, classification_report,
mean_squared_error, mean_absolute_error, r2_score
)
import shap
try:
from .visualization_engine import (
generate_model_performance_plots,
generate_feature_importance_plot
)
VISUALIZATION_AVAILABLE = True
except ImportError as e:
VISUALIZATION_AVAILABLE = False
print(f"⚠️ Visualization engine not available: {e}")
from ..utils.polars_helpers import (
load_dataframe,
get_numeric_columns,
split_features_target,
)
from ..utils.validation import (
validate_file_exists,
validate_file_format,
validate_dataframe,
validate_column_exists,
validate_target_column,
)
def _build_baseline_models(is_classification: bool, random_state: int) -> Dict[str, Any]:
    """Return the unfitted baseline estimators for the task, keyed by model name."""
    if is_classification:
        return {
            "logistic_regression": LogisticRegression(max_iter=1000, random_state=random_state),
            "random_forest": RandomForestClassifier(n_estimators=100, random_state=random_state, n_jobs=-1),
            "xgboost": XGBClassifier(n_estimators=100, random_state=random_state, n_jobs=-1),
            "lightgbm": LGBMClassifier(n_estimators=100, random_state=random_state, n_jobs=-1, verbose=-1),
            "catboost": CatBoostClassifier(iterations=100, random_state=random_state, verbose=0, allow_writing_files=False),
        }
    return {
        "ridge": Ridge(random_state=random_state),
        "lasso": Lasso(random_state=random_state),
        "random_forest": RandomForestRegressor(n_estimators=100, random_state=random_state, n_jobs=-1),
        "xgboost": XGBRegressor(n_estimators=100, random_state=random_state, n_jobs=-1),
        "lightgbm": LGBMRegressor(n_estimators=100, random_state=random_state, n_jobs=-1, verbose=-1),
        "catboost": CatBoostRegressor(iterations=100, random_state=random_state, verbose=0, allow_writing_files=False),
    }


def _compute_split_metrics(is_classification: bool, y_true, y_pred) -> Dict[str, float]:
    """Compute the metric dict for one data split (train or test)."""
    if is_classification:
        return {
            "accuracy": float(accuracy_score(y_true, y_pred)),
            "precision": float(precision_score(y_true, y_pred, average='weighted', zero_division=0)),
            "recall": float(recall_score(y_true, y_pred, average='weighted', zero_division=0)),
            "f1": float(f1_score(y_true, y_pred, average='weighted', zero_division=0)),
        }
    mse = float(mean_squared_error(y_true, y_pred))
    return {
        "mse": mse,
        "rmse": float(np.sqrt(mse)),
        "mae": float(mean_absolute_error(y_true, y_pred)),
        "r2": float(r2_score(y_true, y_pred)),
    }


def _save_model_bundle(model_name: str, bundle: Dict[str, Any],
                       is_classification: bool,
                       train_metrics: Dict[str, float],
                       test_metrics: Dict[str, float]) -> Any:
    """Persist a fitted model bundle and return whatever identifies its location.

    Uses the artifact store when it was importable, otherwise writes to
    ``./outputs/models/<model_name>.pkl`` with joblib.
    """
    if ARTIFACT_STORE_AVAILABLE:
        # Headline metric recorded in the store metadata for each task.
        headline = "accuracy" if is_classification else "r2"
        return save_model_with_store(
            model_data=bundle,
            filename=f"{model_name}.pkl",
            metadata={
                "model_name": model_name,
                "task_type": "classification" if is_classification else "regression",
                f"train_{headline}": train_metrics[headline],
                f"test_{headline}": test_metrics[headline],
                "features": bundle["feature_names"],
            },
        )

    model_path = f"./outputs/models/{model_name}.pkl"
    Path(model_path).parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(bundle, model_path)
    return model_path


def train_baseline_models(file_path: str, target_col: str,
                          task_type: str = "auto",
                          test_size: float = 0.2,
                          random_state: int = 42) -> Dict[str, Any]:
    """
    Train multiple baseline models and compare performance.

    Only numeric feature columns are used. Missing feature values are
    mean-imputed with an imputer fitted on the training split only, so no
    test-set statistics leak into training. Each fitted model is saved
    together with the imputer and the feature names.

    Args:
        file_path: Path to prepared dataset
        target_col: Name of target column
        task_type: 'classification', 'regression', or 'auto' (inferred via
            validate_target_column). Any value other than 'classification'
            is handled as regression.
        test_size: Proportion for test split
        random_state: Random seed

    Returns:
        Dictionary with training results and best model. Per-model entries
        hold "train_metrics", "test_metrics" and "model_path", or
        {"status": "error", "message": ...} when that model failed.
        "best_model" is chosen by test F1 (classification) or test R2
        (regression); its "name", "score" and "model_path" are None when no
        model trained successfully. If the dataset has no numeric features,
        a {"status": "error", "message": ...} dict is returned instead.
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, target_col)

    # Infer task type if auto
    if task_type == "auto":
        task_type = validate_target_column(df, target_col)
    is_classification = task_type == "classification"

    # Split features and target; only numeric columns are kept for X
    X, y = split_features_target(df, target_col)
    numeric_cols = get_numeric_columns(X)
    if len(numeric_cols) == 0:
        return {
            "status": "error",
            "message": "No numeric features found. Please encode categorical variables first."
        }

    X_np = X.select(numeric_cols).to_numpy()
    y_np = y.to_numpy()

    # Split BEFORE imputing so the imputer never sees test rows.
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X_np, y_np, test_size=test_size, random_state=random_state
    )

    # Handle missing values (simple imputation with the training mean)
    from sklearn.impute import SimpleImputer
    imputer = SimpleImputer(strategy='mean')
    X_train = imputer.fit_transform(X_train_raw)
    X_test = imputer.transform(X_test_raw)

    results = {
        "task_type": task_type,
        "n_features": X_train.shape[1],
        "n_samples": len(X_np),
        "train_size": len(X_train),
        "test_size": len(X_test),
        "feature_names": numeric_cols,
        "models": {}
    }

    # Train, evaluate and persist every baseline model
    models = _build_baseline_models(is_classification, random_state)
    for model_name, model in models.items():
        try:
            model.fit(X_train, y_train)

            train_metrics = _compute_split_metrics(is_classification, y_train, model.predict(X_train))
            test_metrics = _compute_split_metrics(is_classification, y_test, model.predict(X_test))
            results["models"][model_name] = {
                "train_metrics": train_metrics,
                "test_metrics": test_metrics,
            }

            bundle = {
                "model": model,
                "imputer": imputer,
                "feature_names": numeric_cols
            }
            results["models"][model_name]["model_path"] = _save_model_bundle(
                model_name, bundle, is_classification, train_metrics, test_metrics
            )
        except Exception as e:
            # Best effort: one failing model (fit or save) must not abort the rest.
            results["models"][model_name] = {
                "status": "error",
                "message": str(e)
            }

    # Determine best model
    ranking_metric = "f1" if is_classification else "r2"
    best_model_name = None
    best_score = -float('inf')
    for model_name, model_results in results["models"].items():
        if "test_metrics" not in model_results:
            continue
        score = model_results["test_metrics"][ranking_metric]
        if score > best_score:
            best_score = score
            best_model_name = model_name

    results["best_model"] = {
        "name": best_model_name,
        # None rather than -inf when nothing trained, so the result stays JSON-safe.
        "score": best_score if best_model_name else None,
        "model_path": results["models"][best_model_name]["model_path"] if best_model_name else None
    }

    # Generate visualizations for best model
    if VISUALIZATION_AVAILABLE and best_model_name:
        try:
            print(f"\n🎨 Generating visualizations for {best_model_name}...")

            # The fitted estimator is still in memory; no need to reload it.
            best_model = models[best_model_name]
            y_pred_test = best_model.predict(X_test)
            y_pred_proba = None
            if hasattr(best_model, "predict_proba") and task_type == "classification":
                y_pred_proba = best_model.predict_proba(X_test)

            # Generate model performance plots
            plot_dir = "./outputs/plots/model_performance"
            perf_plots = generate_model_performance_plots(
                y_true=y_test,
                y_pred=y_pred_test,
                y_pred_proba=y_pred_proba,
                task_type=task_type,
                model_name=best_model_name,
                output_dir=plot_dir
            )
            results["performance_plots"] = perf_plots["plot_paths"]

            # Generate feature importance plot if available
            if hasattr(best_model, "feature_importances_"):
                feature_importance = dict(zip(numeric_cols, best_model.feature_importances_))
                importance_plot = generate_feature_importance_plot(
                    feature_importances=feature_importance,
                    output_path=f"{plot_dir}/feature_importance_{best_model_name}.png"
                )
                results["feature_importance_plot"] = importance_plot

            print(f" ✓ Generated {len(perf_plots.get('plot_paths', []))} performance plots")
            results["visualization_generated"] = True
        except Exception as e:
            # Plots are optional; report the failure but still return the results.
            print(f" ⚠️ Could not generate visualizations: {str(e)}")
            results["visualization_generated"] = False
    else:
        results["visualization_generated"] = False

    return results
def _mean_abs_shap_per_feature(shap_values) -> np.ndarray:
    """Collapse SHAP output to one mean absolute value per feature.

    Handles a 2-D array (samples, features), a 3-D array
    (samples, features, classes), and the legacy list with one 2-D array per
    class. Multi-class output is averaged across classes as well as samples.
    """
    if isinstance(shap_values, list):
        # Legacy multi-class layout: stack to (samples, features, classes).
        magnitudes = np.stack([np.abs(np.asarray(v)) for v in shap_values], axis=-1)
    else:
        magnitudes = np.abs(np.asarray(shap_values))

    if magnitudes.ndim == 3:
        return magnitudes.mean(axis=(0, 2))
    return magnitudes.mean(axis=0)


def generate_model_report(model_path: str, test_data_path: str,
                          target_col: str, output_path: str) -> Dict[str, Any]:
    """
    Generate comprehensive model evaluation report.

    The model file must hold a dict with "model", "imputer" and
    "feature_names" keys. Models exposing ``predict_proba`` are reported as
    classifiers, all others as regressors.

    Args:
        model_path: Path to saved model file
        test_data_path: Path to test dataset
        target_col: Name of target column
        output_path: Path to save report JSON

    Returns:
        Dictionary with model report: metrics, plus confusion matrix and
        classification report for classifiers, the top 20 feature
        importances when the model exposes them, and the top 10 SHAP
        importances (or a "shap_error" message if SHAP failed). The same
        content is written to ``output_path``; the returned dict also
        carries "output_path".
    """
    # Validation
    validate_file_exists(model_path)
    validate_file_exists(test_data_path)

    # Load model.
    # SECURITY: joblib.load unpickles the file and can execute arbitrary code;
    # only call this with model files from a trusted source.
    model_data = joblib.load(model_path)
    model = model_data["model"]
    imputer = model_data["imputer"]
    feature_names = model_data["feature_names"]

    # Load test data
    df = load_dataframe(test_data_path)
    validate_dataframe(df)
    validate_column_exists(df, target_col)

    # Prepare features in the stored feature order, imputed like at training time
    X = df.select(feature_names)
    y = df[target_col].to_numpy()
    X_np = imputer.transform(X.to_numpy())

    # Predict
    y_pred = model.predict(X_np)

    # Determine task type
    task_type = "classification" if hasattr(model, "predict_proba") else "regression"

    report = {
        "model_path": model_path,
        "task_type": task_type,
        "n_features": len(feature_names),
        "n_samples": len(X_np)
    }

    # Calculate metrics
    if task_type == "classification":
        report["metrics"] = {
            "accuracy": float(accuracy_score(y, y_pred)),
            "precision": float(precision_score(y, y_pred, average='weighted', zero_division=0)),
            "recall": float(recall_score(y, y_pred, average='weighted', zero_division=0)),
            "f1": float(f1_score(y, y_pred, average='weighted', zero_division=0))
        }
        # Confusion matrix (as nested lists so it is JSON-serializable)
        report["confusion_matrix"] = confusion_matrix(y, y_pred).tolist()
        # Classification report
        report["classification_report"] = classification_report(
            y, y_pred, output_dict=True, zero_division=0
        )
    else:  # regression
        mse = float(mean_squared_error(y, y_pred))
        report["metrics"] = {
            "mse": mse,
            "rmse": float(np.sqrt(mse)),
            "mae": float(mean_absolute_error(y, y_pred)),
            "r2": float(r2_score(y, y_pred))
        }

    # Feature importance
    has_importances = hasattr(model, "feature_importances_")
    if has_importances:
        feature_importance = sorted(
            (
                {"feature": name, "importance": float(imp)}
                for name, imp in zip(feature_names, model.feature_importances_)
            ),
            key=lambda item: item["importance"],
            reverse=True,
        )
        report["feature_importance"] = feature_importance[:20]  # Top 20

    # SHAP values (top 10 features); best effort, failures are recorded in the report
    try:
        if has_importances:
            # Use TreeExplainer for tree-based models
            explainer = shap.TreeExplainer(model)
        else:
            # Use KernelExplainer for other models (sample for speed)
            sample_size = min(100, len(X_np))
            explainer = shap.KernelExplainer(
                model.predict,
                X_np[:sample_size]
            )

        shap_values = explainer.shap_values(X_np[:100])  # First 100 samples
        mean_shap = _mean_abs_shap_per_feature(shap_values)

        shap_importance = sorted(
            (
                {"feature": name, "shap_value": float(val)}
                for name, val in zip(feature_names, mean_shap)
            ),
            key=lambda item: item["shap_value"],
            reverse=True,
        )
        report["shap_feature_importance"] = shap_importance[:10]  # Top 10
    except Exception as e:
        report["shap_error"] = f"Could not compute SHAP values: {str(e)}"

    # Save report
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)

    report["output_path"] = output_path
    return report