import numpy as np


class Node:
    """One node of a regression tree.

    A node is treated as a leaf when ``value`` is not ``None``; otherwise it
    is a decision node that routes a sample to ``left`` when
    ``sample[feature_index] <= threshold`` and to ``right`` otherwise.
    """

    def __init__(self, feature_index=None, threshold=None, left=None,
                 right=None, var_red=None, value=None):
        # Fields used by decision nodes.
        self.feature_index = feature_index  # column index tested at this node
        self.threshold = threshold          # split point for that column
        self.left = left                    # subtree for feature <= threshold
        self.right = right                  # subtree for feature > threshold
        self.var_red = var_red              # variance reduction of the split

        # Field used by leaf nodes: the prediction returned at this leaf.
        self.value = value


class DecisionTreeRegressor:
    """Regression tree grown greedily by maximising variance reduction.

    Parameters
    ----------
    min_samples_split : int
        A node is only considered for splitting when it holds at least this
        many samples.
    max_depth : int
        A node is only considered for splitting while its depth (root = 0)
        is ``<= max_depth``. Because the comparison is inclusive, a node at
        depth ``max_depth`` may still split, producing leaves at depth
        ``max_depth + 1``.
    """

    def __init__(self, min_samples_split, max_depth):
        self.root = None  # set by fit()
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth

    def _check_is_fitted(self):
        """Raise a descriptive error when no tree has been built yet."""
        if self.root is None:
            # AttributeError is what the unguarded ``None.value`` access used
            # to raise, so existing handlers keep working.
            raise AttributeError(
                "This DecisionTreeRegressor is not fitted yet; "
                "call fit(X, Y) first."
            )

    def build_tree(self, dataset, curr_depth=0):
        """Recursively build the (sub)tree for ``dataset``.

        Parameters
        ----------
        dataset : np.ndarray
            2-D array whose last column is the target and whose remaining
            columns are the features.
        curr_depth : int
            Depth of the node being built (the root is 0).

        Returns
        -------
        Node
            A decision node when a split with strictly positive variance
            reduction exists and the stopping rules allow it, else a leaf.
        """
        features, targets = dataset[:, :-1], dataset[:, -1]
        num_samples, num_features = np.shape(features)

        may_split = (num_samples >= self.min_samples_split
                     and curr_depth <= self.max_depth)
        if may_split:
            best_split = self.get_best_split(dataset, num_samples,
                                             num_features)
            # An empty dict means no threshold separated the rows; a
            # non-positive reduction means splitting would not help.
            if "var_red" in best_split and best_split["var_red"] > 0:
                left_subtree = self.build_tree(best_split["dataset_left"],
                                               curr_depth + 1)
                right_subtree = self.build_tree(best_split["dataset_right"],
                                                curr_depth + 1)
                return Node(best_split["feature_index"],
                            best_split["threshold"],
                            left_subtree, right_subtree,
                            best_split["var_red"])

        return Node(value=self.calculate_leaf_value(targets))

    def variance_reduction(self, parent, l_child, r_child):
        """Return var(parent) minus the size-weighted variance of the children."""
        weight_l = len(l_child) / len(parent)
        weight_r = len(r_child) / len(parent)
        children_var = weight_l * np.var(l_child) + weight_r * np.var(r_child)
        return np.var(parent) - children_var

    def calculate_leaf_value(self, Y):
        """Return the leaf prediction: the mean of the targets ``Y``."""
        return np.mean(Y)

    def split(self, dataset, feature_index, threshold):
        """Partition the rows of ``dataset`` on one feature.

        Returns
        -------
        (np.ndarray, np.ndarray)
            Rows with ``feature <= threshold`` and rows with
            ``feature > threshold``. Both results are 2-D even when empty.
        """
        column = dataset[:, feature_index]
        # Two explicit comparisons (rather than negating one mask) so that a
        # row whose feature is NaN lands on neither side.
        dataset_left = dataset[column <= threshold]
        dataset_right = dataset[column > threshold]
        return dataset_left, dataset_right

    def get_best_split(self, dataset, num_samples, num_features):
        """Search every feature/threshold pair for the best split.

        Every unique value of every feature column is tried as a threshold.
        ``num_samples`` is accepted for interface compatibility and is not
        used by the search.

        Returns
        -------
        dict
            Keys ``feature_index``, ``threshold``, ``dataset_left``,
            ``dataset_right`` and ``var_red`` for the best split found, or
            an empty dict when no threshold yields two non-empty sides.
        """
        best_split = {}
        max_var_red = -float("inf")
        targets = dataset[:, -1]  # identical for every candidate split

        for feature_index in range(num_features):
            possible_thresholds = np.unique(dataset[:, feature_index])
            for threshold in possible_thresholds:
                dataset_left, dataset_right = self.split(
                    dataset, feature_index, threshold)
                if len(dataset_left) == 0 or len(dataset_right) == 0:
                    continue

                curr_var_red = self.variance_reduction(
                    targets, dataset_left[:, -1], dataset_right[:, -1])
                # Strict '>' keeps the first of several equally good splits.
                if curr_var_red > max_var_red:
                    best_split["feature_index"] = feature_index
                    best_split["threshold"] = threshold
                    best_split["dataset_left"] = dataset_left
                    best_split["dataset_right"] = dataset_right
                    best_split["var_red"] = curr_var_red
                    max_var_red = curr_var_red

        return best_split

    def print_tree(self, tree=None, indent=" "):
        """Print the tree to stdout, starting at ``tree`` (default: the root).

        Raises
        ------
        AttributeError
            If ``tree`` is omitted and the model has not been fitted.
        """
        if tree is None:
            self._check_is_fitted()
            tree = self.root

        if tree.value is not None:
            print(tree.value)
            return

        print("X_" + str(tree.feature_index), "<=", tree.threshold, "?",
              tree.var_red)
        # The indent string is doubled at each level.
        print("%sleft:" % (indent), end="")
        self.print_tree(tree.left, indent + indent)
        print("%sright:" % (indent), end="")
        self.print_tree(tree.right, indent + indent)

    def fit(self, X, Y, min_samples_split=None, max_depth=None):
        """Build the tree from features ``X`` and targets ``Y``.

        ``min_samples_split`` and ``max_depth``, when given, overwrite the
        values stored on the instance before the tree is built.
        """
        if min_samples_split is not None:
            self.min_samples_split = min_samples_split
        if max_depth is not None:
            self.max_depth = max_depth

        # Targets become the last column, the layout build_tree() expects.
        dataset = np.column_stack((X, Y))
        self.root = self.build_tree(dataset)

    def make_prediction(self, x, tree):
        """Return the leaf value reached by sample ``x`` starting at ``tree``."""
        # Identity test: a leaf value of 0.0 is still a valid leaf.
        while tree.value is None:
            if x[tree.feature_index] <= tree.threshold:
                tree = tree.left
            else:
                tree = tree.right
        return tree.value

    def predict(self, X):
        """Return a list with one prediction per row of ``X``.

        Raises
        ------
        AttributeError
            If the model has not been fitted.
        """
        self._check_is_fitted()
        return [self.make_prediction(x, self.root) for x in X]