# BinaryRandomForest_Template.py - Random Forest Model Validation
# ANSI color escape codes for colored console output (on Windows terminals,
# the colorama package may be needed for these escapes to render as colors)
RED = "\033[38;5;197m"
GOLD = "\033[38;5;185m"
TEAL = "\033[38;5;50m"
GREEN = "\033[38;5;82m"
RESET = "\033[0m"
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV, cross_validate
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from AdvancedAnalytics.ReplaceImputeEncode import DT, ReplaceImputeEncode
from AdvancedAnalytics.Forest import forest_classifier
def print_boundary(lbl, b_width=60):
print("")
margin = b_width - len(lbl) - 2
lmargin = int(margin/2)
rmargin = lmargin
if lmargin+rmargin < margin:
lmargin += 1
print(f"{TEAL}", "="*b_width, f"{RESET}")
print(f"{GREEN}", lmargin*"*", lbl, rmargin*"*", f"{RESET}")
print(f"{TEAL}", "="*b_width, f"{RESET}")
def print_acc_ratio(scores, n):
n_folds = len(scores["train_score"])
train_misc = (1.0 - scores["train_score"])
train_smisc = 2.0*(1.0 - scores["train_score"]).std()
val_misc = (1.0 - scores["test_score"])
val_smisc = 2.0*(1.0 - scores["test_score"]).std()
ratio_misc = np.zeros(n_folds)
for i in range(0, n_folds):
if train_misc[i]>0:
ratio_misc[i] = val_misc[i] / train_misc[i]
elif val_misc[i]>0:
ratio_misc[i] = np.inf
else:
ratio_misc[i] = 1.0
try:
s_ratio = 2.0*ratio_misc.std()
except:
s_ratio = np.nan
train_misc = train_misc.mean()
val_misc = val_misc.mean()
    ratio = val_misc/train_misc if train_misc > 0 else (np.inf if val_misc > 0 else 1.0)
print(f"{TEAL}\n")
print(f" ====== {n_folds:.0f}-Fold Cross Validation =======")
print(f" Train Avg. MISC..... {train_misc:.4f} +/-{train_smisc:.4f}")
print(f" Test Avg. MISC..... {val_misc:.4f} +/-{val_smisc:.4f}")
    if np.isnan(s_ratio) or np.isinf(s_ratio):
print(f" Mean Misc Ratio..... {ratio:.4f}")
else:
print(f" Mean Misc Ratio..... {ratio:.4f} +/-{s_ratio:.4f}")
print(" ", 39*"=", f"{RESET}")
n_v = n*(1.0/n_folds)
n_t = n - n_v
print(f"Equivalent to {n_folds:.0f} splits each with "+
f"{n_t:.0f}/{n_v:.0f} Cases")
def print_summary(train_acc, val_acc):
train_misc = 1.0 - train_acc
val_misc = 1.0 - val_acc
ratio_acc = train_acc / val_acc if val_acc>0 else np.inf
if train_misc>0:
ratio_misc = val_misc / train_misc
elif val_misc>0:
ratio_misc = np.inf
else:
ratio_misc = 1.0
#print accuracy and misclassification summary for train/validation
print(f"{GREEN}{'TRAIN':>28s} {'VALIDATION':>11s} {'RATIO':>7s}")
if ratio_acc < 1.2:
color = GREEN
else:
color = RED
print(f"{GREEN} {'ACCURACY':.<20s}{GOLD}{train_acc:>7.4f}",
f" {val_acc:>7.4f} {color}{ratio_acc:>7.4f}{RESET}")
if ratio_misc < 1.2:
color = GREEN
else:
color = RED
print(f"{GREEN} {'MISCLASSIFICATION':.<20s}{GOLD}{train_misc:>7.4f}",
f" {val_misc:>7.4f} {color}{ratio_misc:>7.4f}{RESET}")
print(f"{TEAL}","-"*47, f"{RESET}")
# Read the data
data = pd.read_csv('../data/ai_dev_productivity.csv')
# Display basic information about the dataset
print("Dataset shape:", data.shape)
print("\nFirst 5 rows:")
print(data.head())
print("\nData types:")
print(data.dtypes)
print("\nSummary statistics:")
print(data.describe())
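# Optional sanity check: count missing values per column; any imputation needed
# here is handled by ReplaceImputeEncode in Step 2.
print("\nMissing values per column:")
print(data.isnull().sum())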
# Step 2: ReplaceImputeEncode (RIE) Processing
lbl = "Step 2: ReplaceImputeEncode (RIE) Processing"
print_boundary(lbl)
# Create data map based on data dictionary
data_map = {
"hours_coding": [DT.Interval, (0, 12)],
"coffee_intake_mg": [DT.Interval, (0, 1000)],
"distractions": [DT.Interval, (0, 10)],
"sleep_hours": [DT.Interval, (3, 12)],
"commits": [DT.Interval, (0, 15)],
"bugs_reported": [DT.Interval, (0, 9)],
"ai_usage_hours": [DT.Interval, (0, 8)],
"cognitive_load": [DT.Interval, (0, 10)],
"complexity": [DT.Nominal, ("low", "mid", "high")],
"experience": [DT.Nominal, (1, 2, 3)],
"task_success": [DT.Binary, (0, 1)]
}
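# Each data_map entry pairs a column with its measurement type and, for Interval
# columns, a (low, high) range of plausible values; for Nominal/Binary columns,
# the tuple lists the allowed categories. Values outside these bounds are
# treated as outliers/missing and imputed by ReplaceImputeEncode.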
# Set target variable
target = "task_success"
# Apply ReplaceImputeEncode preprocessing
rie = ReplaceImputeEncode(data_map=data_map,
interval_scale=None, # No interval scaling
no_impute=[target], # Do not impute target
binary_encoding="one-hot",
nominal_encoding="one-hot",
drop=False, # Keep all columns
display=True)
# Transform the data
encoded_data = rie.fit_transform(data)
print(f"\nEncoded data shape: {encoded_data.shape[0]} cases and {encoded_data.shape[1]} columns")
# Step 3: Random Forest Hyperparameter Optimization
lbl = "Step 3: Random Forest Hyperparameter Optimization"
print_boundary(lbl)
# Separate features and target
y = encoded_data[target]
X = encoded_data.drop(target, axis=1)
# Define hyperparameter grid for Random Forest
# Dynamic Grid Construction based on Dr. Jones' Rules
N, K = X.shape
# 1. max_features: Adaptive to number of columns (predictors)
max_features_list = ['sqrt']
# Also try specific counts if feasible
possible_ints = [int(K * 0.2), int(K * 0.333), int(K * 0.5)] # Try k/5, k/3, k/2
for f in possible_ints:
if f > 0 and f not in max_features_list:
max_features_list.append(f)
max_features_list.append(None) # Add None (all features) at the end
# 2. min_samples_leaf: Start at 0.5% of N
min_leaf_base = int(max(1, N * 0.005))
leaf_list = [min_leaf_base, min_leaf_base*2, min_leaf_base*4]
# Remove duplicates and sort
leaf_list = sorted(list(set(leaf_list)))
param_grid = {
'n_estimators': [50, 100, 200], # Expanded
'max_depth': [5, 10, 15, None], # Depth is less critical in RF than single tree, but good to vary
'min_samples_split': [2, 5, 10],
'min_samples_leaf': leaf_list,
'max_features': max_features_list,
'criterion': ['gini', 'entropy'],
'bootstrap': [True]
}
# Calculate total combinations
total_combinations = 1
for param_list in param_grid.values():
total_combinations *= len(param_list)
total_fits = total_combinations * 4 # 4-fold CV, matching cv=4 below
print(f"Grid Search: {total_combinations} parameter combinations")
print(f"Using 4-fold CV requires {total_fits} total fits")
print("Optimizing for accuracy (minimizing misclassification)")
# Perform grid search with cross-validation
rf_grid = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid=param_grid,
cv=4,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
print("\nStarting grid search...")
rf_grid.fit(X, y)
# Display best parameters and score
print(f"\nBest parameters found:")
for param, value in rf_grid.best_params_.items():
print(f" {param}: {value}")
print(f"\nBest cross-validation accuracy: {rf_grid.best_score_:.4f}")
print(f"Best cross-validation misclassification: {1 - rf_grid.best_score_:.4f}")
# Get the best model
best_rf = rf_grid.best_estimator_
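# Note: with GridSearchCV's default refit=True, best_estimator_ has already been
# refit on the full X, y used above; Step 6 re-fits it on a 70% training split so
# the 30% holdout gives an honest validation estimate.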
# Step 4: Kitchen Sink Random Forest Evaluation
lbl = "Step 4: Kitchen Sink Random Forest (Default Parameters)"
print_boundary(lbl)
# First show the overfitting on full data
print("Fitting kitchen sink random forest using entire dataset")
kitchen_sink_rf = RandomForestClassifier(random_state=42) # Default parameters
kitchen_sink_rf = kitchen_sink_rf.fit(X, y)
# Evaluate on training data (will show overfitting)
print("Evaluating kitchen sink random forest on full dataset:")
forest_classifier.display_metrics(kitchen_sink_rf, X, y)
print("Note: These metrics show overfitting since we trained and tested on the same data")
# Step 5: 70/30 Holdout Validation of Kitchen Sink Random Forest
lbl = "Step 5: 70/30 Holdout Validation of Kitchen Sink Random Forest"
print_boundary(lbl)
# Split the data 70/30
X_train_ks, X_val_ks, y_train_ks, y_val_ks = train_test_split(
X, y, test_size=0.3, stratify=y, random_state=42
)
# Fit kitchen sink random forest on training data only
kitchen_sink_rf_cv = RandomForestClassifier(random_state=42)
kitchen_sink_rf_cv = kitchen_sink_rf_cv.fit(X_train_ks, y_train_ks)
# Evaluate using AdvancedAnalytics display_split_metrics
forest_classifier.display_split_metrics(kitchen_sink_rf_cv,
X_train_ks, y_train_ks,
X_val_ks, y_val_ks)
# Calculate and display accuracy ratio and misclassification ratio
train_pred_ks_cv = kitchen_sink_rf_cv.predict(X_train_ks)
val_pred_ks_cv = kitchen_sink_rf_cv.predict(X_val_ks)
train_acc_ks_cv = accuracy_score(y_train_ks, train_pred_ks_cv)
val_acc_ks_cv = accuracy_score(y_val_ks, val_pred_ks_cv)
lbl = "Kitchen Sink 70/30 Validation"
print_boundary(lbl, 47)
print_summary(train_acc_ks_cv, val_acc_ks_cv)
# Step 6: Holdout Validation with Optimized Random Forest
lbl = "Step 6: Holdout Validation with Optimized Random Forest"
print_boundary(lbl)
# Split the data 70/30 (same split for consistency)
X_train_opt, X_val_opt, y_train_opt, y_val_opt = train_test_split(
X, y, test_size=0.3, stratify=y, random_state=42
)
# Train optimized model on training set
optimized_rf = best_rf
optimized_rf = optimized_rf.fit(X_train_opt, y_train_opt)
# Evaluate using AdvancedAnalytics display_split_metrics
forest_classifier.display_split_metrics(optimized_rf,
X_train_opt, y_train_opt,
X_val_opt, y_val_opt)
# Display feature importance for the optimized model
print(f"{GOLD}\nTop 10 Feature Importance (from optimized model):")
forest_classifier.display_importance(optimized_rf, X.columns, top=10, plot=False)
print(f"{RESET}")
# Step 7: K-Fold Cross-Validation with Optimized Model
lbl = "Step 7: K-Fold Cross-Validation with Optimized Model"
print_boundary(lbl)
n = X.shape[0]
best_val_acc = 0
print(f"Testing K-fold CV from 2-fold to 10-fold on {n} observations")
print("Finding optimal K that maximizes validation accuracy")
for k in range(2, 11): # Test 2-fold through 10-fold CV
scores = cross_validate(
best_rf, X, y, scoring='accuracy',
cv=k, return_train_score=True
)
    # print_acc_ratio (defined above) reports per-fold misclassification and the validation/train ratio
print_acc_ratio(scores, n)
# Calculate metrics for finding best K
train_acc_cv = scores["train_score"].mean()
val_acc_cv = scores["test_score"].mean()
if val_acc_cv > best_val_acc:
best_k = k
best_train_acc = train_acc_cv
best_val_acc = val_acc_cv
print(f"\n{GOLD}Best K: {best_k}-fold{GOLD}")
print(f"Training Accuracy: {best_train_acc:.4f}")
print(f"Validation Accuracy: {best_val_acc:.4f}{RESET}")
# Step 8: Final Model Summary
lbl = "Step 8: Final Model Summary"
print_boundary(lbl, 47)
print("Random Forest Model Validation Complete")
print(f"Best hyperparameters found via GridSearchCV with 4-fold CV")
print(f"Final validation using {best_k}-fold cross-validation")
print(".4f")
# Calculate final performance metrics from the optimized holdout validation
final_train_acc = accuracy_score(y_train_opt, optimized_rf.predict(X_train_opt))
final_val_acc = accuracy_score(y_val_opt, optimized_rf.predict(X_val_opt))
lbl = "Final Optimized Model Performance Summary"
print_boundary(lbl, 47)
print_summary(final_train_acc, final_val_acc)
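# Optional wrap-up: confusion matrix and per-class report for the optimized model
# on the 30% holdout, using the sklearn metrics imported at the top of the script.
val_pred_opt = optimized_rf.predict(X_val_opt)
print(f"\n{GOLD}Holdout Confusion Matrix:{RESET}")
print(confusion_matrix(y_val_opt, val_pred_opt))
print(f"\n{GOLD}Holdout Classification Report:{RESET}")
print(classification_report(y_val_opt, val_pred_opt))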