# Source: dr_jones / Decision_Tree / BinaryDecisionTree_Template.py
# Uploaded by anly656 (commit 572987c, verified)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Updated on Oct 2, 2025
@purpose: Decision Tree Example for Interval Targets
@data: Fracking Oil Production in Texas, n=4752 with 13 features (2 Nominal)
@author: eJones
@email: eJones@tamu.edu
"""
# ANSI 256-color escape codes used to colorize console output.
# NOTE(review): these are raw ANSI sequences, not colorama calls; colorama is
# only required to make them render on legacy Windows terminals.
RED = "\033[38;5;197m"
GOLD = "\033[38;5;185m"
TEAL = "\033[38;5;50m"
GREEN = "\033[38;5;82m"
RESET = "\033[0m"  # restore the terminal's default color
import pandas as pd
import numpy as np
from AdvancedAnalytics.ReplaceImputeEncode import DT, ReplaceImputeEncode
from AdvancedAnalytics.Tree import tree_regressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split, cross_validate
from sklearn.metrics import mean_squared_error
from copy import deepcopy
# Data map for ReplaceImputeEncode: attribute name -> [data type, valid values].
# DT.Interval entries carry a (low, high) plausibility range; DT.Nominal
# entries carry the tuple of legal category codes. The ranges presumably drive
# outlier replacement/imputation inside RIE — confirm against the
# AdvancedAnalytics documentation.
data_map = {
    "Log_Cum_Production": [DT.Interval, (8, 15)],  # target (see Step 2 below)
    "Log_Proppant_LB": [DT.Interval, (6, 18)],
    "Log_Carbonate": [DT.Interval, (-4, 4)],
    "Log_Frac_Fluid_GL": [DT.Interval, (7, 18)],
    "Log_GrossPerforatedInterval": [DT.Interval, (4, 9)],
    "Log_LowerPerforation_xy": [DT.Interval, (8, 10)],
    "Log_UpperPerforation_xy": [DT.Interval, (8, 10)],
    "Log_TotalDepth": [DT.Interval, (8, 10)],
    "N_Stages": [DT.Interval, (2, 14)],
    "X_Well": [DT.Interval, (-100, -95)],  # well coordinate; range looks like Texas longitude
    "Y_Well": [DT.Interval, (30, 35)],     # well coordinate; range looks like Texas latitude
    "Operator": [DT.Nominal, tuple(range(1, 29))],  # 28 operator codes
    "County": [DT.Nominal, tuple(range(1, 15))]     # 14 county codes
}
def print_boundary(lbl, b_width=60):
    """Print *lbl* centered between '*' padding inside a banner of '=' rules.

    Uses the module-level ANSI color constants (TEAL, GREEN, RESET).

    Parameters
    ----------
    lbl : str
        Label to display in the banner.
    b_width : int, optional
        Total banner width in characters (default 60; previously hard-coded).
    """
    print("")
    # Characters left over for '*' padding after the label and the two
    # separator spaces print() inserts. Clamp at 0 so a label longer than
    # the banner no longer produces negative repeat counts.
    margin = max(0, b_width - len(lbl) - 2)
    rmargin = margin // 2
    lmargin = margin - rmargin  # odd margins give the extra '*' to the left
    print(f"{TEAL}", "="*b_width, f"{RESET}")
    print(f"{GREEN}", lmargin*"*", lbl, rmargin*"*"+f"{RESET}")
    print(f"{TEAL}", "="*b_width, f"{RESET}")
# Pretty-print the data map: one colorized row per attribute showing its
# name, declared type, and valid-value specification, then a summary count
# of the columns that are neither ID nor Ignore.
print(f"{GOLD}")
print(15*"=", "DATA MAP", 15*"=")
width = max(len(name) for name in data_map) + 1
ignored = 0
for col, (dt_type, valid_values) in data_map.items():
    if dt_type.name in ("ID", "Ignore"):
        ignored += 1
    print(f" {TEAL}{col:.<{width}s} {GOLD}{dt_type.name:9s}{GREEN}{valid_values}")
print(f"{GOLD} === Data Map has{RED}", len(data_map) - ignored,
      f"{GOLD}attribute columns", 3*"=", f"{RESET}")
# ---- Step 1: load the fracking oil-production data with pandas ----
lbl = "Step 1: Read Data"
print_boundary(lbl)
df = pd.read_csv("../data/OilProduction.csv")
n_obs, n_attr = df.shape
print(f"Read {n_obs} observations with {n_attr} attributes\n")
# ---- Step 2: ReplaceImputeEncode (RIE) preprocessing ----
lbl = "Step 2: ReplaceImputeEncode (RIE) Processing"
print_boundary(lbl)
target = "Log_Cum_Production"
print(f"{GOLD}")
# Apply ReplaceImputeEncode preprocessing.
# NOTE(review): the original comments on the two `drop=` arguments were
# swapped relative to the code; they are corrected below. The later printout
# confirms the drop=True frame is the one that excludes one-hot columns.
rie = ReplaceImputeEncode(data_map=data_map,
                          interval_scale=None,  # No standardization of interval features
                          no_impute=[target],  # Do not impute target variable
                          binary_encoding="one-hot",
                          nominal_encoding="one-hot",
                          drop=False,  # Keep every one-hot column
                          display=True)
# Transform the data
encoded_df = rie.fit_transform(df)
# Create version that drops one column from each encoded nominal set,
# for stepwise analysis
rie = ReplaceImputeEncode(data_map=data_map,
                          interval_scale=None,
                          no_impute=[target],
                          binary_encoding="one-hot",
                          nominal_encoding="one-hot",
                          drop=True,  # Drop last one-hot column of each nominal set
                          display=False)
encoded_drp_df = rie.fit_transform(df)
print(f"{RESET}")
# Report the shapes of both encoded frames.
print(f"\n{RED}encoded_drp_df{RESET}:",
      f"{encoded_drp_df.shape[0]} cases and",
      f"{encoded_drp_df.shape[1]} columns,\n",
      " including targets, excludes last one-hot columns.")
print(f"\n{RED}encoded_df {RESET}:",
      f"{encoded_df.shape[0]} cases and",
      f"{encoded_df.shape[1]} columns,\n",
      " including targets.")
print(f"{RESET}")
#***************************************************************************
#*************** Decision Tree Hyperparameter Optimization *****************
# (The original banner said "Logistic Regression" — stale copy/paste; this
#  section builds the grid for a DecisionTreeRegressor.)
lbl = " STEP 3: Decision Tree Hyperparameter Optimization"
print_boundary(lbl)
y = encoded_df[target]
X = encoded_df.drop(target, axis=1)
# Dynamic hyperparameter ranges based on Dr. Jones' rule of thumb:
#   1. Min Samples Leaf = 0.5% of N
#   2. Max Depth = 3 to K (number of predictors)
N = X.shape[0]  # number of observations
K = X.shape[1]  # number of encoded predictors
# Candidate leaf sizes: 0.5% of N plus two larger multiples (at least 1).
min_leaf_base = int(max(1, N * 0.005))
candidate_leafs = [min_leaf_base, min_leaf_base*2, min_leaf_base*5]
# Deduplicate and sort in case a small N makes the multiples collide.
candidate_leafs = sorted(set(candidate_leafs))
# Candidate depths: a spread of roughly 8 values between 3 and K.
if K > 3:
    step = max(1, (K - 3) // 8)  # aim for roughly 8 steps
    candidate_depths = list(range(3, K + 1, step))
    # Ensure K is included if not reached by the step size.
    if candidate_depths[-1] != K:
        candidate_depths.append(K)
    # Add None for unlimited depth (risky, but sometimes useful to see).
    candidate_depths.append(None)
else:
    candidate_depths = [2, 3, None]
best_metric = np.inf  # tracks lowest validation ASE seen so far
metric = 'neg_mean_squared_error'  # sklearn scoring name; equals -ASE
# 70/30 train/validation split with a fixed seed for reproducibility.
# (Removed the unused duplicate `n = X.shape[0]`; N already holds it.)
Xt, Xv, yt, yv = train_test_split(X, y, train_size=0.7, random_state=31415)
""" Hyperparameter Optimization: grid search over depth x leaf size,
keeping the tree with the lowest validation ASE. """
for depth in candidate_depths:
    for leaf in candidate_leafs:
        # Keep min_samples_split consistent with the chosen leaf size.
        model = DecisionTreeRegressor(max_depth=depth,
                                      min_samples_split=2 * leaf,
                                      min_samples_leaf=leaf,
                                      random_state=31415).fit(Xt, yt)
        train_ase = mean_squared_error(yt, model.predict(Xt))
        val_ase = mean_squared_error(yv, model.predict(Xv))
        ratio = val_ase / train_ase
        # Flag apparent overfitting (validation/train >= 1.2) in red.
        color = RED if ratio >= 1.2 else TEAL
        print(f"{TEAL}")
        print("Maximum Depth=", f"{GOLD}{depth}{TEAL}",
              "Min Leaf Size=", f"{GOLD}{leaf}{TEAL}")
        print(f"Train ASE:{train_ase:7.4f} Validation ASE:{RED}{val_ase:7.4f}",
              f"{TEAL}Ratio:{color}{ratio:7.4f}{RESET}")
        if val_ase < best_metric:
            best_metric = val_ase
            best_depth = depth
            best_leaf = leaf
            best_ratio = ratio
            best_tree = deepcopy(model)
print(f"{GOLD}")
tree_regressor.display_split_metrics(best_tree, Xt, yt, Xv, yv)
color = RED if best_ratio >= 1.2 else TEAL
print(f"\nOverfitting Ratio Val_ase/Train_ase: {color}{best_ratio:7.4f}{TEAL}")
tree_regressor.display_importance(best_tree, X.columns, top=10, plot=True)
""" Validation using K-Fold Cross-Validation: evaluate the winning
hyperparameters at every fold count K=2..10 and keep the best mean ASE. """
lbl = " STEP 4: Decision Tree K-Fold Cross Validation"
print_boundary(lbl)
best_metric = np.inf
best_split = 2*best_leaf  # loop-invariant; hoisted out of the fold loop
for k in range(2, 11):
    dt = DecisionTreeRegressor(max_depth=best_depth,
                               min_samples_split=best_split,
                               min_samples_leaf=best_leaf,
                               random_state=31415)
    # cross_validate fits internal clones; `dt` itself is never fitted here.
    scores = cross_validate(dt, X, y,
                            scoring=metric,
                            cv=k, return_train_score=True)
    print(f"\n{GOLD}Decision Tree K-Fold CV with K={k}")
    print("{:.<18s}{:>6s}{:>13s}".format("Metric", "Mean", "Std. Dev."))
    mean = -scores["test_score"].mean()  # negate: sklearn reports -ASE
    std = scores["test_score"].std()
    print("{:.<18s}{:>7.4f}{:>10.4f}".format("ASE", mean, std))
    if mean < best_metric:
        best_fold = k
        best_metric = mean
        best_std = std
        train_mean = -scores["train_score"].mean()
        train_std = scores["train_score"].std()
        best_ratio = best_metric/train_mean
        # NOTE(review): this deepcopies an UNFITTED estimator — best_tree
        # only carries the hyperparameters, not a fitted model.
        best_tree = deepcopy(dt)
print(f"{TEAL}")
if best_ratio >= 1.2:
    color = RED
else:
    color = TEAL
print("Maximum Depth=", f"{GOLD}{best_depth}{TEAL}",
      "Min Leaf Size=", f"{GOLD}{best_leaf}{TEAL}",
      "Best Fold=", f"{GOLD}{best_fold}{TEAL}")
# BUG FIX: the original printed train_ase/val_ase/ratio left over from the
# last iteration of the Step-3 grid search. Report the best CV fold's
# statistics instead.
print(f"Train ASE:{train_mean:7.4f} Validation ASE:{RED}{best_metric:7.4f}",
      f"{TEAL}Ratio:{color}{best_ratio:7.4f}{RESET}")
# Refit the winning hyperparameters on ALL of the data, then report final
# metrics and the top-10 feature importances.
final_tree = DecisionTreeRegressor(max_depth=best_depth,
                                   min_samples_leaf=best_leaf,
                                   min_samples_split=2 * best_leaf,
                                   random_state=31415).fit(X, y)
print(f"{GOLD}")
tree_regressor.display_metrics(final_tree, X, y)
tree_regressor.display_importance(final_tree, X.columns, top=10)
print(f"{RESET}")