#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @author: Edward R Jones @version 1.34 @copyright 2020 - Edward R Jones, all rights reserved. """ import sys import warnings from copy import deepcopy #Used to create sentiment word dictionary import numpy as np import pandas as pd from math import sqrt, log, pi import statsmodels.api as sm from sklearn.metrics import median_absolute_error from sklearn.metrics import accuracy_score, precision_score, recall_score from sklearn.metrics import mean_absolute_error, \ mean_squared_error, r2_score from sklearn.metrics import f1_score, confusion_matrix, \ classification_report class linreg(object): def display_coef(lr, X, y, col=None): if type(col)==type(None): try: col = X.columns except: raise RuntimeError(" Call to display_coef is Invalid.\n"+ " When X is not a pandas dataframe. Parameter col "+ "required.") if len(col)!=X.shape[1]: raise RuntimeError(" Call to display_coef is Invalid.\n"+\ " Number of Coefficient Names is not equal to the"+\ " Number of Columns in X") sys.exit() max_label = len('Intercept')+2 for i in range(len(col)): if len(col[i]) > max_label: max_label = len(col[i]) label_format = ("{:.<%i" %max_label)+"s}{:15.4f}" if type(lr) != sm.regression.linear_model.RegressionResultsWrapper: print("TYPE: ", type(lr)) print(label_format.format('Intercept', lr.intercept_)) for i in range(X.shape[1]): print(label_format.format(col[i], lr.coef_[i])) else: for i in range(X.shape[1]): print(label_format.format(col[i], lr.params[i])) def display_metrics(lr, X, y, w=None): predictions = lr.predict(X) n = X.shape[0] p = X.shape[1] # Notations uses Sheather's convention k = p+2 # need to count the estimated variance and intercept print("\nModel Metrics") print("{:.<23s}{:15d}".format('Observations', n)) print("{:.<23s}{:15d}".format('Coefficients', p+1)) print("{:.<23s}{:15d}".format('DF Error', X.shape[0]-X.shape[1]-1)) if type(w)==np.ndarray: R2 = r2_score(y, predictions, sample_weight=w) n = w.sum() else: R2 = r2_score(y, predictions) print("{:.<23s}{:15.4f}".format('R-Squared', R2)) adjr2 = 1.0-R2 adjr2 = ((n-1)/(n-p-1))*adjr2 adjr2 = 1.0 - adjr2 print("{:.<23s}{:15.4f}".format('Adj. R-Squared', adjr2)) if type(w)==np.ndarray: MAE = mean_absolute_error(y,predictions, sample_weight=w) else: MAE = mean_absolute_error(y,predictions) print("{:.<23s}{:15.4f}".format('Mean Absolute Error', MAE)) MAE = median_absolute_error(y,predictions) print("{:.<23s}{:15.4f}".format('Median Absolute Error', MAE)) if type(w)==np.ndarray: ASE = mean_squared_error(y,predictions, sample_weight=w) else: ASE = mean_squared_error(y,predictions) print("{:.<23s}{:15.4f}".format('Avg Squared Error', ASE)) print("{:.<23s}{:15.4f}".format('Square Root ASE', sqrt(ASE))) if ASE<1e-20: twoLL = -np.inf LL = twoLL else: twoLL = n*(log(2*pi) + 1.0 + log(ASE)) LL = twoLL/(-2.0) print("{:.<23s}{:15.4f}".format('Log(Likelihood)', LL)) AIC = twoLL + 2*k print("{:.<23s}{:15.4f}".format('AIC ', AIC)) if (n-k-1)>0: AICc = AIC + 2*k*(k+1)/(n-k-1) else: AICc = AIC + 2*k*(k+1) print("{:.<23s}{:15.4f}".format('AICc ', AICc)) BIC = twoLL + log(n)*k print("{:.<23s}{:15.4f}".format('BIC ', BIC)) def return_metrics(lr, X, y, w=None): metrics = [0, 0, 0, 0] predictions = lr.predict(X) n = X.shape[0] p = X.shape[1] # Notations uses Sheather's convention k = p+2 # need to count the estimated variance and intercept if type(w)==np.ndarray: R2 = r2_score(y, predictions, sample_weight=w) n = w.sum() else: R2 = r2_score(y, predictions) adjr2 = 1.0-R2 adjr2 = ((n-1)/(n-p-1))*adjr2 adjr2 = 1.0 - adjr2 metrics[0] = adjr2 if type(w)==np.ndarray: ASE = mean_squared_error(y,predictions, sample_weight=w) else: ASE = mean_squared_error(y,predictions) if ASE<1e-20: twoLL = -np.inf else: twoLL = n*(log(2*pi) + 1.0 + log(ASE)) AIC = twoLL + 2*k metrics[1] = AIC if (n-k-1)>0: AICc = AIC + 2*k*(k+1)/(n-k-1) else: AICc = AIC + 2*k*(k+1) metrics[2] = AICc BIC = twoLL + log(n)*k metrics[3] = BIC return metrics def display_split_metrics(lr, Xt, yt, Xv, yv, wt=None, wv=None): predict_t = lr.predict(Xt) predict_v = lr.predict(Xv) nt = Xt.shape[0] pt = Xt.shape[1] # Notations uses Sheather's convention kt = pt+2 # need to count the estimated variance and intercept nv = Xv.shape[0] pv = Xv.shape[1] # Notations uses Sheather's convention kv = pv+2 # need to count the estimated variance and intercept print("\n") print("{:.<23s}{:>15s}{:>15s}".format('Model Metrics', \ 'Training', 'Validation')) print("{:.<23s}{:15d}{:15d}".format('Observations', \ Xt.shape[0], Xv.shape[0])) print("{:.<23s}{:15d}{:15d}".format('Coefficients', \ Xt.shape[1]+1, Xv.shape[1]+1)) print("{:.<23s}{:15d}{:15d}".format('DF Error', \ Xt.shape[0]-Xt.shape[1]-1, Xv.shape[0]-Xv.shape[1]-1)) R2t = r2_score(yt, predict_t) R2v = r2_score(yv, predict_v) print("{:.<23s}{:15.4f}{:15.4f}".format('R-Squared', R2t, R2v)) adjr2t = 1.0-R2t adjr2t = ((nt-1)/(nt-pt-1))*adjr2t adjr2t = 1.0 - adjr2t adjr2v = 1.0-R2v adjr2v = ((nv-1)/(nv-pv-1))*adjr2v adjr2v = 1.0 - adjr2v print("{:.<23s}{:15.4f}{:15.4f}".format('Adj. R-Squared', \ adjr2t, adjr2v)) print("{:.<23s}{:15.4f}{:15.4f}".format('Mean Absolute Error', \ mean_absolute_error(yt,predict_t), \ mean_absolute_error(yv,predict_v))) print("{:.<23s}{:15.4f}{:15.4f}".format('Median Absolute Error', \ median_absolute_error(yt,predict_t), \ median_absolute_error(yv,predict_v))) ASEt = mean_squared_error(yt,predict_t) ASEv = mean_squared_error(yv,predict_v) print("{:.<23s}{:15.4f}{:15.4f}".format('Avg Squared Error', \ ASEt, ASEv)) print("{:.<23s}{:15.4f}{:15.4f}".format('Square Root ASE', \ sqrt(ASEt), sqrt(ASEv))) if ASEt<1e-20: twoLLt = -np.inf LLt = twoLLt else: twoLLt = nt*(log(2*pi) + 1.0 + log(ASEt)) LLt = twoLLt/(-2.0) if ASEv<1e-20: twoLLv = -np.inf LLv = twoLLv else: twoLLv = nv*(log(2*pi) + 1.0 + log(ASEv)) LLv = twoLLv/(-2.0) print("{:.<23s}{:15.4f}{:15.4f}".format('Log Likelihood', \ LLt, LLv)) AICt = twoLLt + 2*kt AICv = twoLLv + 2*kv print("{:.<23s}{:15.4f}{:15.4f}".format('AIC ', \ AICt, AICv)) if (nt-kt-1)>0: AICct = AICt + 2*kt*(kt+1)/(nt-kt-1) else: AICct = AICt + 2*kt*(kt+1) if (nv-kv-1)>0: AICcv = AICv + 2*kv*(kv+1)/(nv-kv-1) else: AICcv = AICv + 2*kv*(kv+1) print("{:.<23s}{:15.4f}{:15.4f}".format('AICc ', \ AICct, AICcv)) BICt = twoLLt + log(nt)*kt BICv = twoLLv + log(nv)*kv print("{:.<23s}{:15.4f}{:15.4f}".format('BIC ', \ BICt, BICv)) class logreg(object): def display_coef(lr, X, y, col=None): if type(col)==type(None): try: col = X.columns except: raise RuntimeError(" Call to display_coef is Invalid.\n"+ " When X is not a pandas dataframe. Parameter col "+ "required.") if len(col)!=X.shape[1]: raise RuntimeError(" Call to display_coef is Invalid.\n"+\ " Number of Coefficient Names is not equal to the"+\ " Number of Columns in X") sys.exit() max_label = len('Intercept')+2 for i in range(len(col)): if len(col[i]) > max_label: max_label = len(col[i]) label_format = ("{:.<%i" %max_label)+"s}{:15.4f}" if type(y) == np.ndarray: k = len(np.unique(y)) #numpy array else: k = len(lr.classes_) #pandas vector nx = X.shape[1] k2 = k if k <=2: k2 = 1 for j in range(k2): if k == 2: print("\nCoefficients:") else: print("\nCoefficients for Target Class", lr.classes_[j]) print(label_format.format('Intercept', lr.intercept_[j])) for i in range(nx): print(label_format.format(col[i], lr.coef_[j,i])) def display_confusion(conf_mat): if len(conf_mat) != 2: raise RuntimeError(" Call to display_confustion invalid"+\ " Argument is not a 2x2 Matrix.") sys.exit() TP = int(conf_mat[1][1]) TN = int(conf_mat[0][0]) FP = int(conf_mat[0][1]) FN = int(conf_mat[1][0]) n_neg = TN + FP n_pos = FN + TP n_pneg = TN + FN n_ppos = FP + TP n_obs = n_neg + n_pos print("\nModel Metrics") print("{:.<27s}{:10d}".format('Observations', n_obs)) acc = np.nan pre = np.nan tpr = np.nan tnr = np.nan f1 = np.nan misc = np.nan miscc = [np.nan, np.nan] if n_obs>0: acc = (TP+TN)/n_obs print("{:.<27s}{:10.4f}".format('Accuracy', acc)) if (TP+FP)>0: pre = TP/(TP+FP) print("{:.<27s}{:10.4f}".format('Precision', pre)) if (TP+FN)>0: tpr = TP/(TP+FN) print("{:.<27s}{:10.4f}".format('Sensitivity (Recall)', tpr)) if (TN+FP)>0: tnr = TN/(TN+FP) print("{:.<27s}{:10.4f}".format('Specificity (Selectivity)', tnr)) if (2*TP+FP+FN)>0: f1 = 2*TP/(2*TP + FP + FN) print("{:.<27s}{:10.4f}".format('F1-Score', f1)) if n_obs>0: misc = 100*(FN + FP)/n_obs print("{:.<27s}{:9.1f}{:s}".format(\ 'MISC (Misclassification)', misc, '%')) if n_neg>0 and n_pos>0: miscc = [100*conf_mat[0][1]/n_neg, 100*conf_mat[1][0]/n_pos] lrcc = [0, 1] for i in range(2): print("{:s}{:.<16.0f}{:>9.1f}{:<1s}".format(\ ' class ', lrcc[i], miscc[i], '%')) print("\n\n Confusion") print(" Matrix ", end="") for i in range(2): print("{:>7s}{:<3.0f}".format('Class ', lrcc[i]), end="") print("") for i in range(2): print("{:s}{:.<6.0f}".format('Class ', lrcc[i]), end="") for j in range(2): print("{:>10d}".format(int(conf_mat[i][j])), end="") print("") def display_metrics(lr, X, y): if len(lr.classes_) == 2: y_ = np.ravel(y) # necessary because yt is a df with row keys if type(y_[0])==str: classes_ = lr.classes_ else: classes_ = [str(int(lr.classes_[0])), str(int(lr.classes_[1]))] z = np.zeros(len(y_)) predictions = lr.predict(X) # get binary class predictions conf_mat = confusion_matrix(y_true=y, y_pred=predictions) tmisc = conf_mat[0][1]+conf_mat[1][0] misc = 100*(tmisc)/(len(y_)) for i in range(len(y_)): if y_[i] == 1: z[i] = 1 #probability = lr.predict_proba(X) # get binary probabilities try: probability = lr.predict_proba(X) except: probability = lr._predict_proba_lr(X) print("\nModel Metrics") print("{:.<27s}{:10d}".format('Observations', X.shape[0])) print("{:.<27s}{:10d}".format('Coefficients', X.shape[1]+1)) print("{:.<27s}{:10d}".format('DF Error', X.shape[0]-X.shape[1]-1)) if lr.n_iter_ == None: print("{:.<27s}{:>10s}".format('Iterations', 'None')) elif type(lr.n_iter_)==np.ndarray: print("{:.<27s}{:10d}".format('Iterations', lr.n_iter_[0])) else: print("{:.<27s}{:10d}".format('Iterations', lr.n_iter_)) print("{:.<27s}{:10.4f}".format('Mean Absolute Error', \ mean_absolute_error(z,probability[:, 1]))) print("{:.<27s}{:10.4f}".format('Avg Squared Error', \ mean_squared_error(z,probability[:, 1]))) acc = accuracy_score(y, predictions) print("{:.<27s}{:10.4f}".format('Accuracy', acc)) if type(y_[0]) == str: pre = precision_score(y, predictions, pos_label=classes_[1]) tpr = recall_score(y, predictions, pos_label=classes_[1]) tpr0 = recall_score(y, predictions, pos_label=classes_[0]) f1 = f1_score(y, predictions, pos_label=classes_[1]) else: pre = precision_score(y, predictions) tpr = recall_score(y, predictions) tpr0 = recall_score(y, predictions, pos_label=0) f1 = f1_score(y, predictions) print("{:.<27s}{:10.4f}".format('Precision', pre)) print("{:.<27s}{:10.4f}".format('Recall (Sensitivity)', tpr)) print("{:.<27s}{:10.4f}".format('Specificity', tpr0)) print("{:.<27s}{:10.4f}".format('F1-Score', f1)) print("{:.<27s}{:10d}".format(\ 'Total Misclassifications', tmisc)) print("{:.<27s}{:9.1f}{:s}".format(\ 'MISC (Misclassification)', misc, '%')) n_ = [conf_mat[0][0]+conf_mat[0][1], conf_mat[1][0]+conf_mat[1][1]] miscc = [100*conf_mat[0][1]/n_[0], 100*conf_mat[1][0]/n_[1]] for i in range(2): print("{:s}{:<16s}{:>9.1f}{:<1s}".format(\ ' class ', classes_[i], miscc[i], '%')) print("\n\n Confusion Class Class") print(" Matrix", end="") print("{:1s}{:>10s}{:>10s}".format(" ", classes_[0], classes_[1])) for i in range(2): print("{:s}{:.<6s}".format(' Class ', classes_[i]), end="") for j in range(2): print("{:>10d}".format(conf_mat[i][j]), end="") print("") print("") else: n_classes = len(lr.classes_) predict_ = lr.predict(X) try: prob_ = lr.predict_proba(X) #ver>=21 except: prob_ = lr._predict_proba_lr(X) ase_sum = 0 mase_sum = 0 misc_ = 0 misc = [] n_ = [] n_obs = y.shape[0] conf_mat = [] for i in range(n_classes): z = [] for j in range(n_classes): z.append(0) conf_mat.append(z) y_ = np.ravel(y) # necessary because yt is a df with row keys for i in range(n_classes): misc.append(0) n_.append(0) for i in range(n_obs): for j in range(n_classes): if y_[i] == lr.classes_[j]: ase_sum += (1-prob_[i,j])*(1-prob_[i,j]) mase_sum += 1-prob_[i,j] idx = j else: ase_sum += prob_[i,j]*prob_[i,j] mase_sum += prob_[i,j] for j in range(n_classes): if predict_[i] == lr.classes_[j]: conf_mat[idx][j] += 1 break n_[idx] += 1 if predict_[i] != y_[i]: misc_ += 1 misc[idx] += 1 tmisc = misc_ misc_ = 100*misc_/n_obs ase = ase_sum/(n_classes*n_obs) mase = mase_sum/(n_classes*n_obs) print("\nModel Metrics") print("{:.<27s}{:10d}".format('Observations', X.shape[0])) n_coef = len(lr.coef_)*(len(lr.coef_[0])+1) print("{:.<27s}{:10d}".format('Coefficients', n_coef)) print("{:.<27s}{:10d}".format('DF Error', X.shape[0]-n_coef)) print("{:.<27s}{:10d}".format('Iterations', lr.n_iter_.max())) print("{:.<27s}{:10.4f}".format('Avg Squared Error', ase)) print("{:.<27s}{:10.4f}".format('Root ASE', sqrt(ase))) print("{:.<27s}{:10.4f}".format('Mean Absolute Error', mase)) acc = accuracy_score(y_, predict_) print("{:.<27s}{:10.4f}".format('Accuracy', acc)) pre = precision_score(y_, predict_, average='macro') print("{:.<27s}{:10.4f}".format('Precision', pre)) tpr = recall_score(y_, predict_, average='macro') print("{:.<27s}{:10.4f}".format('Recall (Sensitivity)', tpr)) f1 = f1_score(y_,predict_, average='macro') print("{:.<27s}{:10.4f}".format('F1-Score', f1)) print("{:.<27s}{:10d}".format(\ 'Total Misclassifications', tmisc)) print("{:.<27s}{:9.1f}{:s}".format(\ 'MISC (Misclassification)', misc_, '%')) if type(lr.classes_[0]) == str: fstr = "{:s}{:.<16s}{:>9.1f}{:<1s}" else: fstr = "{:s}{:.<16.0f}{:>9.1f}{:<1s}" for i in range(n_classes): misc[i] = 100*misc[i]/n_[i] print(fstr.format(\ ' class ', lr.classes_[i], misc[i], '%')) print("\n\n Confusion") print(" Matrix ", end="") fstr1 = "{:>7s}{:<3.0f}" if type(lr.classes_[0]) == str: fstr2 = "{:.<15s}" else: fstr2 = "{:s}{:.<6.0f}" for i in range(n_classes): if type(lr.classes_[0]) == str: print(fstr1.format('Class ', i), end="") else: print(fstr1.format('Class ', lr.classes_[i]), end="") print("") for i in range(n_classes): if type(lr.classes_[0]) == str: print(fstr2.format(str(i)+" "+lr.classes_[i]), end="") else: print(fstr2.format('Class ', lr.classes_[i]), end="") for j in range(n_classes): print("{:>10d}".format(conf_mat[i][j]), end="") print("") print("") cr = classification_report(y, predict_, labels=lr.classes_) print("\n",cr) def display_split_metrics(lr, Xt, yt, Xv, yv, target_names=None): if len(lr.classes_) == 2: yt_= np.ravel(yt) yv_= np.ravel(yv) if type(yt_[0])==str: classes_ = lr.classes_ else: classes_ = [str(int(lr.classes_[0])), str(int(lr.classes_[1]))] zt = np.zeros(len(yt_)) zv = np.zeros(len(yv_)) #zt = deepcopy(yt) for i in range(len(yt)): if yt_[i] == 1: zt[i] = 1 for i in range(len(yv)): if yv_[i] == 1: zv[i] = 1 predict_t = lr.predict(Xt) predict_v = lr.predict(Xv) conf_matt = confusion_matrix(y_true=yt_, y_pred=predict_t) conf_matv = confusion_matrix(y_true=yv_, y_pred=predict_v) prob_t = lr._predict_proba_lr(Xt) prob_v = lr._predict_proba_lr(Xv) #prob_t = lr.predict_proba(Xt) #prob_v = lr.predict_proba(Xv) print("\n") print("{:.<23s}{:>15s}{:>15s}".format('Model Metrics', \ 'Training', 'Validation')) print("{:.<23s}{:15d}{:15d}".format('Observations', \ Xt.shape[0], Xv.shape[0])) n_coef = len(lr.coef_)*(len(lr.coef_[0])+1) print("{:.<23s}{:15d}{:15d}".format('Coefficients', \ n_coef, n_coef)) print("{:.<23s}{:15d}{:15d}".format('DF Error', \ Xt.shape[0]-n_coef, Xv.shape[0]-n_coef)) print("{:.<23s}{:15d}{:15d}".format('Iterations', \ lr.n_iter_.max(), lr.n_iter_.max())) print("{:.<23s}{:15.4f}{:15.4f}".format('Mean Absolute Error', \ mean_absolute_error(zt,prob_t[:,1]), \ mean_absolute_error(zv,prob_v[:,1]))) print("{:.<23s}{:15.4f}{:15.4f}".format('Avg Squared Error', \ mean_squared_error(zt,prob_t[:,1]), \ mean_squared_error(zv,prob_v[:,1]))) acct = accuracy_score(yt_, predict_t) accv = accuracy_score(yv_, predict_v) print("{:.<23s}{:15.4f}{:15.4f}".format('Accuracy', acct, accv)) if type(yt_[0])==str: pre_t = precision_score(yt, predict_t, pos_label=classes_[1]) tpr_t = recall_score(yt, predict_t, pos_label=classes_[1]) f1_t = f1_score(yt,predict_t, pos_label=classes_[1]) pre_v = precision_score(yv, predict_v, pos_label=classes_[1]) tpr_v = recall_score(yv, predict_v, pos_label=classes_[1]) f1_v = f1_score(yv,predict_v, pos_label=classes_[1]) tpr0_v = recall_score(yv, predict_v, pos_label=classes_[0]) tpr0_t = recall_score(yt, predict_t, pos_label=classes_[0]) else: pre_t = precision_score(yt, predict_t) tpr_t = recall_score(yt, predict_t) f1_t = f1_score(yt,predict_t) pre_v = precision_score(yv, predict_v) tpr_v = recall_score(yv, predict_v) f1_v = f1_score(yv,predict_v) tpr0_v = recall_score(yv, predict_v, pos_label=0) tpr0_t = recall_score(yt, predict_t, pos_label=0) print("{:.<27s}{:11.4f}{:15.4f}".format('Precision', pre_t, pre_v)) print("{:.<27s}{:11.4f}{:15.4f}".format('Recall (Sensitivity)', tpr_t, tpr_v)) print("{:.<27s}{:11.4f}{:15.4f}".format('Specificity', tpr0_t, tpr0_v)) print("{:.<27s}{:11.4f}{:15.4f}".format('F1-score', f1_t, f1_v)) misct_ = conf_matt[0][1]+conf_matt[1][0] miscv_ = conf_matv[0][1]+conf_matv[1][0] misct = 100*misct_/len(yt) miscv = 100*miscv_/len(yv) n_t = [conf_matt[0][0]+conf_matt[0][1], \ conf_matt[1][0]+conf_matt[1][1]] n_v = [conf_matv[0][0]+conf_matv[0][1], \ conf_matv[1][0]+conf_matv[1][1]] misc_ = [[0,0], [0,0]] misc_[0][0] = 100*conf_matt[0][1]/n_t[0] misc_[0][1] = 100*conf_matt[1][0]/n_t[1] misc_[1][0] = 100*conf_matv[0][1]/n_v[0] misc_[1][1] = 100*conf_matv[1][0]/n_v[1] print("{:.<27s}{:11d}{:15d}".format(\ 'Total Misclassifications', misct_, miscv_)) print("{:.<27s}{:10.1f}{:s}{:14.1f}{:s}".format(\ 'MISC (Misclassification)', misct, '%', miscv, '%')) for i in range(2): print("{:s}{:.<16s}{:>10.1f}{:<1s}{:>14.1f}{:<1s}".format( ' class ', classes_[i], misc_[0][i], '%', misc_[1][i], '%')) print("\n\nTraining Class Class") print("{:<21s}{:>10s}{:>10s}".format("Confusion Matrix", classes_[0], classes_[1]) ) for i in range(2): print("{:6s}{:.<15s}".format('Class ', classes_[i]), end="") for j in range(2): print("{:>10d}".format(conf_matt[i][j]), end="") print("") print("\n\nValidation Class Class") print("{:<21s}{:>10s}{:>10s}".format("Confusion Matrix", classes_[0], classes_[1]) ) for i in range(2): print("{:6s}{:.<15s}".format('Class ', classes_[i]), end="") for j in range(2): print("{:>10d}".format(conf_matv[i][j]), end="") print("") # In the binary case, the classification report is incorrect cr = classification_report(yv, predict_v, labels=lr.classes_) print("\n",cr) else: n_classes = len(lr.classes_) if n_classes < 2: raise RuntimeError(" Call to display_nominal_split_metrics"+\ " invalid.\n"+\ " This target does not appear to be nominal."+\ " The number of target classes is less than 2.") sys.exit() predict_t = lr.predict(Xt) predict_v = lr.predict(Xv) prob_t = lr._predict_proba_lr(Xt) prob_v = lr._predict_proba_lr(Xv) conf_mat_t = confusion_matrix(y_true=yt, y_pred=predict_t) conf_mat_v = confusion_matrix(y_true=yv, y_pred=predict_v) #prob_t = lr.predict_proba(Xt) #prob_v = lr.predict_proba(Xv) ase_sumt = 0 ase_sumv = 0 mase_sumt = 0 mase_sumv = 0 misc_t = 0 misc_v = 0 misct = [] miscv = [] n_t = [] n_v = [] nt_obs = yt.shape[0] nv_obs = yv.shape[0] conf_matt = [] conf_matv = [] for i in range(n_classes): conf_matt.append(np.zeros(n_classes)) conf_matv.append(np.zeros(n_classes)) y_t = np.ravel(yt) # necessary because yt is a df with row keys y_v = np.ravel(yv) # likewise for i in range(n_classes): misct.append(0) n_t.append(0) miscv.append(0) n_v.append(0) for i in range(nt_obs): for j in range(n_classes): if y_t[i] == lr.classes_[j]: ase_sumt += (1-prob_t[i,j])*(1-prob_t[i,j]) mase_sumt += 1-prob_t[i,j] idx = j else: ase_sumt += prob_t[i,j]*prob_t[i,j] mase_sumt += prob_t[i,j] for j in range(n_classes): if predict_t[i] == lr.classes_[j]: conf_matt[idx][j] += 1 break n_t[idx] += 1 if predict_t[i] != y_t[i]: misc_t += 1 misct[idx] += 1 for i in range(nv_obs): for j in range(n_classes): if y_v[i] == lr.classes_[j]: ase_sumv += (1-prob_v[i,j])*(1-prob_v[i,j]) mase_sumv += 1-prob_v[i,j] idx = j else: ase_sumv += prob_v[i,j]*prob_v[i,j] mase_sumv += prob_v[i,j] for j in range(n_classes): if predict_v[i] == lr.classes_[j]: conf_matv[idx][j] += 1 break n_v[idx] += 1 if predict_v[i] != y_v[i]: misc_v += 1 miscv[idx] += 1 misct_ = misc_t miscv_ = misc_v misc_t = 100*misc_t/nt_obs misc_v = 100*misc_v/nv_obs aset = ase_sumt/(n_classes*nt_obs) asev = ase_sumv/(n_classes*nv_obs) maset = mase_sumt/(n_classes*nt_obs) masev = mase_sumv/(n_classes*nv_obs) for i in range(n_classes): for j in range(n_classes): if conf_mat_t[i][j] != conf_matt[i][j] or \ conf_mat_v[i][j] != conf_matv[i][j]: raise RuntimeError("***SKLEARN CONFLICT!!. ") sys.exit() print("") print("{:.<27s}{:>11s}{:>13s}".format('Model Metrics', \ 'Training', 'Validation')) print("{:.<27s}{:10d}{:11d}".format('Observations', \ Xt.shape[0], Xv.shape[0])) n_coef = len(lr.coef_)*(len(lr.coef_[0])+1) print("{:.<27s}{:10d}{:11d}".format('Coefficients', \ n_coef, n_coef)) print("{:.<27s}{:10d}{:11d}".format('DF Error', \ Xt.shape[0]-n_coef, Xt.shape[0]-n_coef)) print("{:.<27s}{:10d}{:11d}".format('Iterations', \ lr.n_iter_.max(), lr.n_iter_.max())) print("{:.<27s}{:10.4f}{:11.4f}".format( 'ASE', aset, asev)) print("{:.<27s}{:10.4f}{:11.4f}".format(\ 'Root ASE', sqrt(aset), sqrt(asev))) print("{:.<27s}{:10.4f}{:11.4f}".format( 'Mean Absolute Error', maset, masev)) acct = accuracy_score(y_t, predict_t) accv = accuracy_score(y_v, predict_v) print("{:.<27s}{:10.4f}{:11.4f}".format('Accuracy', acct, accv)) print("{:.<27s}{:10.4f}{:11.4f}".format('Precision', \ precision_score(y_t,predict_t, average='macro'), \ precision_score(y_v,predict_v, average='macro'))) print("{:.<27s}{:10.4f}{:11.4f}".format('Recall (Sensitivity)', \ recall_score(y_t,predict_t, average='macro'), \ recall_score(y_v,predict_v, average='macro'))) print("{:.<27s}{:10.4f}{:11.4f}".format('F1-score', \ f1_score(y_t,predict_t, average='macro'), \ f1_score(y_v,predict_v, average='macro'))) print("{:.<27s}{:10d}{:11d}".format(\ 'Total Misclassifications', misct_, miscv_)) print("{:.<27s}{:9.1f}{:s}{:10.1f}{:s}".format(\ 'MISC (Misclassification)', misc_t, '%', misc_v, '%')) fstr0="{:s}{:.<16s}{:>9.1f}{:<1s}{:>10.1f}{:<1s}" fstr1 = "{:>7s}{:<3.0f}" if type(lr.classes_[0]) == str: fstr2 = "{:.<15s}" else: fstr2 = "{:s}{:.<6.0f}" classes_ = [] if type(lr.classes_[0])==str: classes_ = lr.classes_ else: for i in range(n_classes): classes_.append(str(int(lr.classes_[i]))) for i in range(n_classes): misct[i] = 100*misct[i]/n_t[i] miscv[i] = 100*miscv[i]/n_v[i] print(fstr0.format(\ ' class ', classes_[i], misct[i], '%', miscv[i], '%')) print("\n\nTraining") print("Confusion Matrix ", end="") for i in range(n_classes): if type(lr.classes_[0]) == str: print(fstr1.format('Class ', i), end="") else: print(fstr1.format('Class ', lr.classes_[i]), end="") print("") for i in range(n_classes): if type(lr.classes_[0]) == str: print(fstr2.format(str(i)+" "+ lr.classes_[i]), end="") else: print(fstr2.format('Class ', lr.classes_[i]), end="") for j in range(n_classes): print("{:>10d}".format(conf_mat_t[i][j]), end="") print("") print("") ct = classification_report(yt, predict_t, labels=target_names) print("\nTraining \nMetrics:\n",ct) print("\n\nValidation") print("Confusion Matrix ", end="") for i in range(n_classes): if type(lr.classes_[0]) == str: print(fstr1.format('Class ', i), end="") else: print(fstr1.format('Class ', lr.classes_[i]), end="") print("") for i in range(n_classes): if type(lr.classes_[0]) == str: print(fstr2.format(str(i)+" "+ lr.classes_[i]), end="") else: print(fstr2.format('Class ', lr.classes_[i]), end="") for j in range(n_classes): print("{:>10d}".format(conf_mat_v[i][j]), end="") print("") print("") cv = classification_report(yv, predict_v, labels=target_names) print("\nValidation \nMetrics:\n",cv) # ********************************************************************* class stepwise(object): def __init__(self, df, yname, reg, xnames=None, \ method="stepwise", crit_in=0.1, crit_out=0.1, \ x_force=None, verbose=False, deep=True): warnings.simplefilter(action="ignore", category=FutureWarning) if reg!="linear" and reg!="logistic": raise RuntimeError("***Call to stepwise invalid. "+\ "*** Reg must be set to 'linear' or 'logistic'.") sys.exit() if type(df)!= pd.DataFrame: #raise RuntimeError("***Call to stepwise invalid. "+\ #"*** DF Not DataFrame ***") pass if df.shape[0] < 2: raise RuntimeError("***Call to stepwise invalid. "+\ "*** Required Dataframe has less the 2 observations.") if type(yname)!= str: raise RuntimeError("***Call to stepwise invalid. "+\ "*** Parameter yname not a string name in DataFrame.") sys.exit() if not(yname in df.columns): raise RuntimeError("***Call to stepwise invalid. "+\ "*** Required parameter yname not in DataFrame.") sys.exit() if reg=='logistic': yvalues = df[yname].unique() if len(yvalues) != 2: raise RuntimeError("***Call to stepwise invalid. "+\ "*** The target is not binary.") sys.exit() if type(xnames)!= type(None): if not(all(item in df.columns for item in xnames)): raise RuntimeError("***Call to stepwise invalid. "+\ "*** xnames are not all in DataFrame.") sys.exit() if method!="stepwise" and method!="forward" and method!="backward": raise RuntimeError("***Call to stepwise invalid. "+\ "*** method is invalid.") sys.exit() if type(crit_in)==str: if crit_in!="AIC" and crit_in!="BIC": raise RuntimeError("***Call to stepwise invalid. "+\ "*** crit_in is invalid.") sys.exit() else: if type(crit_in)!=float: raise RuntimeError("***Call to stepwise invalid. "+\ "*** crit_in is invalid.") sys.exit() else: if crit_in>1.0 or crit_in<0.0: raise RuntimeError("***Call to stepwise invalid. "+\ "*** crit_in is invalid.") sys.exit() if type(crit_out)==str: if crit_out!="AIC" and crit_out!="BIC": raise RuntimeError("***Call to stepwise invalid. "+\ "*** crit_out is invalid.") sys.exit() else: if type(crit_out)!=float: raise RuntimeError("***Call to stepwise invalid. "+\ "*** crit_out is invalid.") sys.exit() else: if crit_out>1.0 or crit_out<0: raise RuntimeError("***Call to stepwise invalid. "+\ "*** crit_out is invalid.") sys.exit() if type(x_force)!=type(None) and \ not(all(item in df.columns for item in x_force)): raise RuntimeError("***Call to stepwise invalid. "+\ "*** x_force is invalid.") sys.exit() if deep==True: self.df_copy = deepcopy(df) else: self.df_copy = df # string - column name in df for y self.yname = yname # None or string = list of column names in df for X var. if type(xnames)!= type(None): self.xnames = xnames # list of strings (col names) else: self.xnames = list(set(df.columns)-set([yname])) # string - "stepwise", "backward" or "forward" self.method = method # string # string - "linear" or "logistic" self.reg = reg # string # string = "AIC" or "BIC", or p=[0,1] if type(crit_in)==str or type(crit_out)==str: warnings.warn("\n***Call to stepwise invalid: "+ \ " crit_in and crit_out must be a number between 0 and 1.") self.crit_in = 0.1 self.crit_out = 0.1 else: self.crit_in = crit_in # float self.crit_out = crit_out # float # [] of string = list of column names in df forced into model if type(x_force)!= type(None): self.x_force = x_force # list of strings (col names) else: self.x_force = [] # True or False, control display of steps selected self.verbose = verbose # initialized list of selected columns in df self.selected_ = [] return # ************************************************************************* def stepwise_(self): """ Linear Regression Stepwise Selection Author: Mahitha RAJENDRAN THANGADURAI """ initial_list = [] included = initial_list if self.crit_out self.crit_out: worst_feature = pvalues.idxmax() included.remove(worst_feature) changed=True if self.verbose: print('Remove {:30} with p-value {:.6}'. format(worst_feature,worst_pval)) if not changed: break return included # ************************************************************************** def forward_(self): """ Linear Regression Forward Stepwise Selection Author: SHAOFANG """ initial_list = [] included = list(initial_list) X = self.df_copy[self.xnames] y = self.df_copy[self.yname] warnings.filterwarnings("once", category=UserWarning) while True: changed=False excluded = list(set(X.columns)-set(included)) new_pval = pd.Series(index=excluded) if self.reg=="linear": for new_column in excluded: model = sm.OLS(y, \ sm.add_constant(pd.DataFrame(\ X[included+[new_column]]))) results = model.fit(disp=False) new_pval[new_column] = results.pvalues.loc[new_column] else: for new_column in excluded: Xc = sm.add_constant(pd.DataFrame(X[included+[new_column]])) model = sm.Logit(y, Xc) results = model.fit(disp=False) new_pval[new_column] = results.pvalues.loc[new_column] best_pval = new_pval.min() if best_pval < self.crit_in: best_feature = new_pval.idxmin() included.append(best_feature) changed=True if self.verbose: print('Add {:30} with p-value {:.6}'.format(best_feature,\ best_pval)) if not changed: break return included # ************************************************************************** def backward_(self): """ Linear Regression Backkward Stepwise Selection Author: Tara Gaddis """ included = list(self.xnames) X = self.df_copy[included] y = self.df_copy[self.yname] warnings.filterwarnings("once", category=UserWarning) while True: changed=False new_pval = pd.Series(index=included) if self.reg=="linear": model = sm.OLS(y, sm.add_constant(\ pd.DataFrame(X[included]))).fit() else: model = sm.Logit(y, sm.add_constant(\ pd.DataFrame(X[included]))).fit(disp=False) for new_column in included: new_pval[new_column] = model.pvalues.loc[new_column] worst_pval = new_pval.max() if worst_pval > self.crit_out: worst_feature = new_pval.idxmax() included.remove(worst_feature) changed=True if self.verbose: print('Remove {:30} with p-value {:.6}'.\ format(worst_feature, worst_pval)) if not changed: break return included # ************************************************************************** def fit_transform(self): if self.method=="stepwise": self.selected_ = self.stepwise_() else: if self.method=="forward": self.selected_ = self.forward_() else: self.selected_ = self.backward_() warnings.filterwarnings("always", category=UserWarning) return self.selected_ # **************************************************************************