# ANSI 256-color escape sequences used to colorize the terminal report.
RED = "\033[38;5;197m"    # poor (overfit) ratios / warnings
GOLD = "\033[38;5;185m"   # metric values
TEAL = "\033[38;5;50m"    # section boundaries and separators
GREEN = "\033[38;5;82m"   # labels and acceptable ratios
RESET = "\033[0m"         # restore the terminal's default color
|
|
| import pandas as pd |
| import numpy as np |
| from sklearn.model_selection import train_test_split, GridSearchCV, cross_validate |
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix |
| from AdvancedAnalytics.ReplaceImputeEncode import DT, ReplaceImputeEncode |
| from AdvancedAnalytics.Forest import forest_classifier |
|
|
def print_boundary(lbl, b_width=60):
    """Print *lbl* centered between two teal '=' rules of width *b_width*."""
    print("")
    # Width left over for the star padding on either side of the label;
    # the left margin absorbs the odd character when it does not split evenly.
    margin = b_width - len(lbl) - 2
    rmargin = margin // 2 if margin >= 0 else int(margin / 2)
    lmargin = margin - rmargin if margin >= 0 else rmargin
    print(f"{TEAL}", "=" * b_width, f"{RESET}")
    print(f"{GREEN}", "*" * lmargin, lbl, "*" * rmargin, f"{RESET}")
    print(f"{TEAL}", "=" * b_width, f"{RESET}")
|
|
def print_acc_ratio(scores, n):
    """Summarize cross_validate() results as misclassification statistics.

    Parameters
    ----------
    scores : dict
        Output of sklearn.model_selection.cross_validate(...,
        return_train_score=True); must contain the "train_score" and
        "test_score" arrays (accuracy per fold).
    n : int
        Total number of observations, used to report the equivalent
        train/validation case counts per fold.
    """
    n_folds = len(scores["train_score"])
    train_misc = 1.0 - scores["train_score"]
    train_smisc = 2.0 * train_misc.std()
    val_misc = 1.0 - scores["test_score"]
    val_smisc = 2.0 * val_misc.std()
    # Per-fold validation/train misclassification ratio. A perfect
    # training fit (train_misc == 0) makes the ratio infinite unless the
    # validation fit is also perfect, in which case it is taken as 1.0.
    ratio_misc = np.zeros(n_folds)
    for i in range(n_folds):
        if train_misc[i] > 0:
            ratio_misc[i] = val_misc[i] / train_misc[i]
        elif val_misc[i] > 0:
            ratio_misc[i] = np.inf
        else:
            ratio_misc[i] = 1.0
    try:
        # std() of an array containing inf yields nan; handled below.
        s_ratio = 2.0 * ratio_misc.std()
    except Exception:  # BUG FIX: was a bare except
        s_ratio = np.nan
    train_misc = train_misc.mean()
    val_misc = val_misc.mean()
    ratio = val_misc / train_misc if train_misc > 0 else np.inf
    print(f"{TEAL}\n")
    print(f"  ====== {n_folds:.0f}-Fold Cross Validation =======")
    print(f"  Train Avg. MISC..... {train_misc:.4f} +/-{train_smisc:.4f}")
    print(f"  Test  Avg. MISC..... {val_misc:.4f} +/-{val_smisc:.4f}")
    # BUG FIX: `s_ratio == np.nan` is always False (NaN never compares
    # equal to itself); use np.isnan/np.isinf to detect the degenerate case.
    if np.isnan(s_ratio) or np.isinf(s_ratio):
        print(f"  Mean  Misc  Ratio.... {ratio:.4f}")
    else:
        print(f"  Mean  Misc  Ratio.... {ratio:.4f} +/-{s_ratio:.4f}")
    print(" ", 39 * "=", f"{RESET}")
    n_v = n * (1.0 / n_folds)
    n_t = n - n_v
    print(f"Equivalent to {n_folds:.0f} splits each with " +
          f"{n_t:.0f}/{n_v:.0f} Cases")
|
|
def print_summary(train_acc, val_acc):
    """Print a colorized TRAIN/VALIDATION/RATIO table for accuracy and
    misclassification; ratios >= 1.2 are flagged in red (overfitting)."""
    train_misc = 1.0 - train_acc
    val_misc = 1.0 - val_acc
    # Accuracy ratio is train over validation; infinite when validation
    # accuracy is zero.
    ratio_acc = np.inf if not val_acc > 0 else train_acc / val_acc
    # Misclassification ratio is validation over train; the 0/0 case is
    # treated as a perfect 1.0.
    if train_misc > 0:
        ratio_misc = val_misc / train_misc
    else:
        ratio_misc = np.inf if val_misc > 0 else 1.0

    print(f"{GREEN}{'TRAIN':>28s} {'VALIDATION':>11s} {'RATIO':>7s}")
    color = GREEN if ratio_acc < 1.2 else RED
    print(f"{GREEN} {'ACCURACY':.<20s}{GOLD}{train_acc:>7.4f}",
          f" {val_acc:>7.4f} {color}{ratio_acc:>7.4f}{RESET}")

    color = GREEN if ratio_misc < 1.2 else RED
    print(f"{GREEN} {'MISCLASSIFICATION':.<20s}{GOLD}{train_misc:>7.4f}",
          f" {val_misc:>7.4f} {color}{ratio_misc:>7.4f}{RESET}")
    print(f"{TEAL}", "-" * 47, f"{RESET}")
|
|
| |
# Step 1: load the developer-productivity dataset and show basic EDA.
data = pd.read_csv('../data/ai_dev_productivity.csv')

print("Dataset shape:", data.shape)
print("\nFirst 5 rows:")
print(data.head())
print("\nData types:")
print(data.dtypes)
print("\nSummary statistics:")
print(data.describe())
|
|
| |
lbl = "Step 2: ReplaceImputeEncode (RIE) Processing"
print_boundary(lbl)

# Attribute map for ReplaceImputeEncode: each entry declares the data
# type and its valid interval range or category set.
data_map = {
    "hours_coding":     [DT.Interval, (0, 12)],
    "coffee_intake_mg": [DT.Interval, (0, 1000)],
    "distractions":     [DT.Interval, (0, 10)],
    "sleep_hours":      [DT.Interval, (3, 12)],
    "commits":          [DT.Interval, (0, 15)],
    "bugs_reported":    [DT.Interval, (0, 9)],
    "ai_usage_hours":   [DT.Interval, (0, 8)],
    "cognitive_load":   [DT.Interval, (0, 10)],
    "complexity":       [DT.Nominal, ("low", "mid", "high")],
    "experience":       [DT.Nominal, (1, 2, 3)],
    "task_success":     [DT.Binary, (0, 1)],
}

target = "task_success"

# One-hot encode binary/nominal attributes; the target column is listed
# in no_impute so its values are never imputed.
rie = ReplaceImputeEncode(data_map=data_map, interval_scale=None,
                          no_impute=[target], binary_encoding="one-hot",
                          nominal_encoding="one-hot", drop=False,
                          display=True)

encoded_data = rie.fit_transform(data)
print(f"\nEncoded data shape: {encoded_data.shape[0]} cases and {encoded_data.shape[1]} columns")
|
|
| |
lbl = "Step 3: Random Forest Hyperparameter Optimization"
print_boundary(lbl)

y = encoded_data[target]
X = encoded_data.drop(target, axis=1)

# N = number of cases, K = number of (encoded) features.
N, K = X.shape

# Candidate max_features values: 'sqrt' plus 20%/33%/50% of the feature
# count (deduplicated, positive only) and None (use all features).
max_features_list = ['sqrt']
possible_ints = [int(K * 0.2), int(K * 0.333), int(K * 0.5)]
for f in possible_ints:
    if f > 0 and f not in max_features_list:
        max_features_list.append(f)
max_features_list.append(None)

# Minimum-leaf sizes scaled to the sample: ~0.5%, 1%, and 2% of N
# (deduplicated in case N is small enough that they collide).
min_leaf_base = int(max(1, N * 0.005))
leaf_list = sorted(set([min_leaf_base, min_leaf_base * 2, min_leaf_base * 4]))

param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [5, 10, 15, None],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': leaf_list,
    'max_features': max_features_list,
    'criterion': ['gini', 'entropy'],
    'bootstrap': [True]
}

# Size of the search: product of all hyperparameter list lengths.
total_combinations = 1
for param_list in param_grid.values():
    total_combinations *= len(param_list)
# BUG FIX: the GridSearchCV below runs 4-fold CV (cv=4), not 5-fold, so
# the fit count and the message must use 4 (the "5" here was stale).
total_fits = total_combinations * 4

print(f"Grid Search: {total_combinations} parameter combinations")
print(f"Using 4-fold CV requires {total_fits} total fits")
print("Optimizing for accuracy (minimizing misclassification)")
|
|
| |
# Exhaustive grid search with 4-fold CV, parallelized across all cores.
rf_grid = GridSearchCV(RandomForestClassifier(random_state=42),
                       param_grid=param_grid, cv=4, scoring='accuracy',
                       n_jobs=-1, verbose=1)

print("\nStarting grid search...")
rf_grid.fit(X, y)

# Report the winning hyperparameter combination.
print("\nBest parameters found:")
for param, value in rf_grid.best_params_.items():
    print(f"  {param}: {value}")

print(f"\nBest cross-validation accuracy: {rf_grid.best_score_:.4f}")
print(f"Best cross-validation misclassification: {1 - rf_grid.best_score_:.4f}")

# The refit best estimator is reused in Steps 6-8.
best_rf = rf_grid.best_estimator_
|
|
| |
lbl = "Step 4: Kitchen Sink Random Forest (Default Parameters)"
print_boundary(lbl)

# Baseline "kitchen sink" model: all sklearn defaults, fit on the full data.
print("Fitting kitchen sink random forest using entire dataset")
kitchen_sink_rf = RandomForestClassifier(random_state=42).fit(X, y)

# Evaluating on the training data itself — intentionally optimistic.
print("Evaluating kitchen sink random forest on full dataset:")
forest_classifier.display_metrics(kitchen_sink_rf, X, y)
print("Note: These metrics show overfitting since we trained and tested on the same data")
|
|
| |
lbl = "Step 5: 70/30 Holdout Validation of Kitchen Sink Random Forest"
print_boundary(lbl)

# Stratified 70/30 split preserves the target class balance in both halves.
X_train_ks, X_val_ks, y_train_ks, y_val_ks = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42)

# Refit the default model on the training portion only.
kitchen_sink_rf_cv = RandomForestClassifier(random_state=42).fit(
    X_train_ks, y_train_ks)

forest_classifier.display_split_metrics(kitchen_sink_rf_cv,
                                        X_train_ks, y_train_ks,
                                        X_val_ks, y_val_ks)

# Accuracy on each half for the colorized overfit-ratio summary.
train_pred_ks_cv = kitchen_sink_rf_cv.predict(X_train_ks)
val_pred_ks_cv = kitchen_sink_rf_cv.predict(X_val_ks)
train_acc_ks_cv = accuracy_score(y_train_ks, train_pred_ks_cv)
val_acc_ks_cv = accuracy_score(y_val_ks, val_pred_ks_cv)

lbl = "Kitchen Sink 70/30 Validation"
print_boundary(lbl, 47)
print_summary(train_acc_ks_cv, val_acc_ks_cv)
|
|
| |
lbl = "Step 6: Holdout Validation with Optimized Random Forest"
print_boundary(lbl)

# Identical stratified 70/30 split (same random_state) as Step 5 so the
# optimized and kitchen-sink models are compared on the same holdout.
X_train_opt, X_val_opt, y_train_opt, y_val_opt = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42)

# Refit the grid-search winner on the training portion only.
optimized_rf = best_rf.fit(X_train_opt, y_train_opt)

forest_classifier.display_split_metrics(optimized_rf,
                                        X_train_opt, y_train_opt,
                                        X_val_opt, y_val_opt)

print(f"{GOLD}\nTop 10 Feature Importance (from optimized model):")
forest_classifier.display_importance(optimized_rf, X.columns, top=10,
                                     plot=False)
print(f"{RESET}")
|
|
| |
lbl = "Step 7: K-Fold Cross-Validation with Optimized Model"
print_boundary(lbl)

# NOTE: cross_validate is already imported at the top of the file; the
# redundant mid-file re-import that used to live here has been removed.

n = X.shape[0]
# Initialize all three "best" trackers up front so the summary prints
# below cannot raise NameError even in the degenerate case where no
# fold count ever beats a validation accuracy of 0.
best_k = 2
best_train_acc = 0.0
best_val_acc = 0

print(f"Testing K-fold CV from 2-fold to 10-fold on {n} observations")
print("Finding optimal K that maximizes validation accuracy")

for k in range(2, 11):
    scores = cross_validate(
        best_rf, X, y, scoring='accuracy',
        cv=k, return_train_score=True
    )

    # Per-K misclassification report (mean +/- 2*std across folds).
    print_acc_ratio(scores, n)

    train_acc_cv = scores["train_score"].mean()
    val_acc_cv = scores["test_score"].mean()

    # Keep the fold count with the highest mean validation accuracy.
    if val_acc_cv > best_val_acc:
        best_k = k
        best_train_acc = train_acc_cv
        best_val_acc = val_acc_cv

print(f"\n{GOLD}Best K: {best_k}-fold{GOLD}")
print(f"Training Accuracy: {best_train_acc:.4f}")
print(f"Validation Accuracy: {best_val_acc:.4f}{RESET}")
|
|
| |
lbl = "Step 8: Final Model Summary"
print_boundary(lbl, 47)

print("Random Forest Model Validation Complete")
print("Best hyperparameters found via GridSearchCV with 4-fold CV")
print(f"Final validation using {best_k}-fold cross-validation")
# BUG FIX: this previously printed the literal string ".4f" (a stray
# format spec left behind by a broken f-string); report the best CV
# validation accuracy found in Step 7 instead.
print(f"Best cross-validation accuracy: {best_val_acc:.4f}")

# Final holdout metrics from the optimized model fit in Step 6.
final_train_acc = accuracy_score(y_train_opt, optimized_rf.predict(X_train_opt))
final_val_acc = accuracy_score(y_val_opt, optimized_rf.predict(X_val_opt))

lbl = "Final Optimized Model Performance Summary"
print_boundary(lbl, 47)
print_summary(final_train_acc, final_val_acc)
|