# wk7cx.py - Random Forest Model Validation
#
# Pipeline: read data -> ReplaceImputeEncode preprocessing -> GridSearchCV
# hyperparameter optimization -> kitchen-sink vs. optimized model comparison.
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV, cross_validate
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from AdvancedAnalytics.ReplaceImputeEncode import DT, ReplaceImputeEncode
from AdvancedAnalytics.Forest import forest_classifier

# ANSI color codes - to print in color, the package colorama must be installed
RED = "\033[38;5;197m"
GOLD = "\033[38;5;185m"
TEAL = "\033[38;5;50m"
GREEN = "\033[38;5;82m"
RESET = "\033[0m"


def print_boundary(lbl, b_width=60):
    """Print lbl centered between '*' fills inside '=' rules of width b_width."""
    print("")
    margin = b_width - len(lbl) - 2
    lmargin = int(margin / 2)
    rmargin = lmargin
    if lmargin + rmargin < margin:  # odd margin: extra '*' goes on the left
        lmargin += 1
    print(f"{TEAL}", "=" * b_width, f"{RESET}")
    print(f"{GREEN}", lmargin * "*", lbl, rmargin * "*", f"{RESET}")
    print(f"{TEAL}", "=" * b_width, f"{RESET}")


def print_acc_ratio(scores, n):
    """Summarize K-fold CV misclassification for train/test and their ratio.

    scores : dict returned by sklearn cross_validate (return_train_score=True)
    n      : total number of observations, used to report per-split sizes
    """
    n_folds = len(scores["train_score"])
    train_misc = (1.0 - scores["train_score"])
    train_smisc = 2.0 * (1.0 - scores["train_score"]).std()
    val_misc = (1.0 - scores["test_score"])
    val_smisc = 2.0 * (1.0 - scores["test_score"]).std()
    # Per-fold ratio of validation to training misclassification; a fold with
    # zero training error but nonzero validation error is infinitely overfit.
    ratio_misc = np.zeros(n_folds)
    for i in range(n_folds):
        if train_misc[i] > 0:
            ratio_misc[i] = val_misc[i] / train_misc[i]
        elif val_misc[i] > 0:
            ratio_misc[i] = np.inf
        else:
            ratio_misc[i] = 1.0
    # FIX: replaced a bare try/except that masked all errors; .std() of an
    # array containing inf yields nan/inf rather than raising, so compute it
    # directly and test finiteness below.
    s_ratio = 2.0 * ratio_misc.std()
    train_misc = train_misc.mean()
    val_misc = val_misc.mean()
    ratio = val_misc / train_misc if train_misc > 0 else np.inf
    print(f"{TEAL}\n")
    print(f" ====== {n_folds:.0f}-Fold Cross Validation =======")
    print(f" Train Avg. MISC..... {train_misc:.4f} +/-{train_smisc:.4f}")
    print(f" Test Avg. MISC..... {val_misc:.4f} +/-{val_smisc:.4f}")
    # FIX: original tested `s_ratio == np.nan`, which is always False because
    # NaN never compares equal to anything; use np.isfinite instead.
    if not np.isfinite(s_ratio):
        print(f" Mean Misc Ratio..... {ratio:.4f}")
    else:
        print(f" Mean Misc Ratio..... {ratio:.4f} +/-{s_ratio:.4f}")
    print(" ", 39 * "=", f"{RESET}")
    n_v = n * (1.0 / n_folds)
    n_t = n - n_v
    print(f"Equivalent to {n_folds:.0f} splits each with "
          + f"{n_t:.0f}/{n_v:.0f} Cases")


def print_summary(train_acc, val_acc):
    """Print train/validation accuracy and misclassification with ratios.

    Ratios at or above 1.2 (overfitting heuristic) print in red, else green.
    """
    train_misc = 1.0 - train_acc
    val_misc = 1.0 - val_acc
    ratio_acc = train_acc / val_acc if val_acc > 0 else np.inf
    if train_misc > 0:
        ratio_misc = val_misc / train_misc
    elif val_misc > 0:
        ratio_misc = np.inf
    else:
        ratio_misc = 1.0
    # print accuracy and misclassification summary for train/validation
    print(f"{GREEN}{'TRAIN':>28s} {'VALIDATION':>11s} {'RATIO':>7s}")
    color = GREEN if ratio_acc < 1.2 else RED
    print(f"{GREEN} {'ACCURACY':.<20s}{GOLD}{train_acc:>7.4f}",
          f" {val_acc:>7.4f} {color}{ratio_acc:>7.4f}{RESET}")
    color = GREEN if ratio_misc < 1.2 else RED
    print(f"{GREEN} {'MISCLASSIFICATION':.<20s}{GOLD}{train_misc:>7.4f}",
          f" {val_misc:>7.4f} {color}{ratio_misc:>7.4f}{RESET}")
    print(f"{TEAL}", "-" * 47, f"{RESET}")


# Read the data
data = pd.read_csv('../data/ai_dev_productivity.csv')

# Display basic information about the dataset
print("Dataset shape:", data.shape)
print("\nFirst 5 rows:")
print(data.head())
print("\nData types:")
print(data.dtypes)
print("\nSummary statistics:")
print(data.describe())

# Step 2: ReplaceImputeEncode (RIE) Processing
lbl = "Step 2: ReplaceImputeEncode (RIE) Processing"
print_boundary(lbl)

# Create data map based on data dictionary: attribute -> (type, valid range)
data_map = {
    "hours_coding": [DT.Interval, (0, 12)],
    "coffee_intake_mg": [DT.Interval, (0, 1000)],
    "distractions": [DT.Interval, (0, 10)],
    "sleep_hours": [DT.Interval, (3, 12)],
    "commits": [DT.Interval, (0, 15)],
    "bugs_reported": [DT.Interval, (0, 9)],
    "ai_usage_hours": [DT.Interval, (0, 8)],
    "cognitive_load": [DT.Interval, (0, 10)],
    "complexity": [DT.Nominal, ("low", "mid", "high")],
    "experience": [DT.Nominal, (1, 2, 3)],
    "task_success": [DT.Binary, (0, 1)]
}

# Set target variable
target = "task_success"

# Apply ReplaceImputeEncode preprocessing
rie = ReplaceImputeEncode(data_map=data_map,
                          interval_scale=None,       # No interval scaling
                          no_impute=[target],        # Do not impute target
                          binary_encoding="one-hot",
                          nominal_encoding="one-hot",
                          drop=False,                # Keep all columns
                          display=True)

# Transform the data
encoded_data = rie.fit_transform(data)
print(f"\nEncoded data shape: {encoded_data.shape[0]} cases and "
      f"{encoded_data.shape[1]} columns")

# Step 3: Random Forest Hyperparameter Optimization
lbl = "Step 3: Random Forest Hyperparameter Optimization"
print_boundary(lbl)

# Separate features and target
y = encoded_data[target]
X = encoded_data.drop(target, axis=1)

# Define hyperparameter grid for Random Forest
# Dynamic Grid Construction based on Dr. Jones' Rules
N, K = X.shape

# 1. max_features: Adaptive to number of columns (predictors)
max_features_list = ['sqrt']
# Also try specific counts if feasible: roughly K/5, K/3, K/2
possible_ints = [int(K * 0.2), int(K * 0.333), int(K * 0.5)]
for f in possible_ints:
    if f > 0 and f not in max_features_list:
        max_features_list.append(f)
max_features_list.append(None)  # Add None (all features) at the end

# 2. min_samples_leaf: Start at 0.5% of N
min_leaf_base = int(max(1, N * 0.005))
leaf_list = [min_leaf_base, min_leaf_base * 2, min_leaf_base * 4]
# Remove duplicates and sort
leaf_list = sorted(set(leaf_list))

param_grid = {
    'n_estimators': [50, 100, 200],  # Expanded
    # Depth is less critical in RF than single tree, but good to vary
    'max_depth': [5, 10, 15, None],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': leaf_list,
    'max_features': max_features_list,
    'criterion': ['gini', 'entropy'],
    'bootstrap': [True]
}

# FIX: original printed "5-fold CV" and multiplied fits by 5 while running
# GridSearchCV with cv=4; define the fold count once and use it everywhere.
cv_folds = 4

# Calculate total combinations
total_combinations = 1
for param_list in param_grid.values():
    total_combinations *= len(param_list)
total_fits = total_combinations * cv_folds

print(f"Grid Search: {total_combinations} parameter combinations")
print(f"Using {cv_folds}-fold CV requires {total_fits} total fits")
print("Optimizing for accuracy (minimizing misclassification)")

# Perform grid search with cross-validation
rf_grid = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    cv=cv_folds,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

print("\nStarting grid search...")
rf_grid.fit(X, y)

# Display best parameters and score
print("\nBest parameters found:")
for param, value in rf_grid.best_params_.items():
    print(f" {param}: {value}")
print(f"\nBest cross-validation accuracy: {rf_grid.best_score_:.4f}")
print(f"Best cross-validation misclassification: {1 - rf_grid.best_score_:.4f}")

# Get the best model
best_rf = rf_grid.best_estimator_

# Step 4: Kitchen Sink Random Forest Evaluation
lbl = "Step 4: Kitchen Sink Random Forest (Default Parameters)"
print_boundary(lbl)

# First show the overfitting on full data
print("Fitting kitchen sink random forest using entire dataset")
kitchen_sink_rf = RandomForestClassifier(random_state=42)  # Default parameters
kitchen_sink_rf = kitchen_sink_rf.fit(X, y)

# Evaluate on training data (will show overfitting)
print("Evaluating kitchen sink random forest on full dataset:")
forest_classifier.display_metrics(kitchen_sink_rf, X, y) print("Note: These metrics show overfitting since we trained and tested on the same data") # Step 5: 70/30 Holdout Validation of Kitchen Sink Random Forest lbl = "Step 5: 70/30 Holdout Validation of Kitchen Sink Random Forest" print_boundary(lbl) # Split the data 70/30 X_train_ks, X_val_ks, y_train_ks, y_val_ks = train_test_split( X, y, test_size=0.3, stratify=y, random_state=42 ) # Fit kitchen sink random forest on training data only kitchen_sink_rf_cv = RandomForestClassifier(random_state=42) kitchen_sink_rf_cv = kitchen_sink_rf_cv.fit(X_train_ks, y_train_ks) # Evaluate using AdvancedAnalytics display_split_metrics forest_classifier.display_split_metrics(kitchen_sink_rf_cv, X_train_ks, y_train_ks, X_val_ks, y_val_ks) # Calculate and display accuracy ratio and misclassification ratio train_pred_ks_cv = kitchen_sink_rf_cv.predict(X_train_ks) val_pred_ks_cv = kitchen_sink_rf_cv.predict(X_val_ks) train_acc_ks_cv = accuracy_score(y_train_ks, train_pred_ks_cv) val_acc_ks_cv = accuracy_score(y_val_ks, val_pred_ks_cv) lbl = "Kitchen Sink 70/30 Validation" print_boundary(lbl, 47) print_summary(train_acc_ks_cv, val_acc_ks_cv) # Step 6: Holdout Validation with Optimized Random Forest lbl = "Step 6: Holdout Validation with Optimized Random Forest" print_boundary(lbl) # Split the data 70/30 (same split for consistency) X_train_opt, X_val_opt, y_train_opt, y_val_opt = train_test_split( X, y, test_size=0.3, stratify=y, random_state=42 ) # Train optimized model on training set optimized_rf = best_rf optimized_rf = optimized_rf.fit(X_train_opt, y_train_opt) # Evaluate using AdvancedAnalytics display_split_metrics forest_classifier.display_split_metrics(optimized_rf, X_train_opt, y_train_opt, X_val_opt, y_val_opt) # Display feature importance for the optimized model print(f"{GOLD}\nTop 10 Feature Importance (from optimized model):") forest_classifier.display_importance(optimized_rf, X.columns, top=10, plot=False) 
print(f"{RESET}") # Step 7: K-Fold Cross-Validation with Optimized Model lbl = "Step 7: K-Fold Cross-Validation with Optimized Model" print_boundary(lbl) from sklearn.model_selection import cross_validate n = X.shape[0] best_val_acc = 0 print(f"Testing K-fold CV from 2-fold to 10-fold on {n} observations") print("Finding optimal K that maximizes validation accuracy") for k in range(2, 11): # Test 2-fold through 10-fold CV scores = cross_validate( best_rf, X, y, scoring='accuracy', cv=k, return_train_score=True ) # Use the proper print_acc_ratio function from ai_dev_tree.py print_acc_ratio(scores, n) # Calculate metrics for finding best K train_acc_cv = scores["train_score"].mean() val_acc_cv = scores["test_score"].mean() if val_acc_cv > best_val_acc: best_k = k best_train_acc = train_acc_cv best_val_acc = val_acc_cv print(f"\n{GOLD}Best K: {best_k}-fold{GOLD}") print(f"Training Accuracy: {best_train_acc:.4f}") print(f"Validation Accuracy: {best_val_acc:.4f}{RESET}") # Step 8: Final Model Summary lbl = "Step 8: Final Model Summary" print_boundary(lbl, 47) print("Random Forest Model Validation Complete") print(f"Best hyperparameters found via GridSearchCV with 4-fold CV") print(f"Final validation using {best_k}-fold cross-validation") print(".4f") # Calculate final performance metrics from the optimized holdout validation final_train_acc = accuracy_score(y_train_opt, optimized_rf.predict(X_train_opt)) final_val_acc = accuracy_score(y_val_opt, optimized_rf.predict(X_val_opt)) lbl = "Final Optimized Model Performance Summary" print_boundary(lbl, 47) print_summary(final_train_acc, final_val_acc)