import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score, r2_score
from sklearn.base import RegressorMixin, ClassifierMixin
from scipy.stats import mode
Notes for myself about decision trees and how to implement them from scratch.
1. Introduction
Decision trees are a non-parametric supervised machine learning algorithm. They can be used both for classification and for regression tasks. A decision tree is essentially a data structure built from a given dataset: each node of the tree represents a particular partition of the data, and samples are recursively split into subtrees until a stopping criterion is met.
1.1. Fitting decision trees
As previously stated, decision trees split the data so that some metric is optimized. The question that arises now is “How do we find the best possible split?”.
Decision trees work by evaluating all possible splits (over every feature and every candidate split point) and selecting the best one according to a metric. For classification, we want the nodes to be as “pure” as possible. Here, the term “pure” means having a node completely composed of data that belongs to the same class.
In classification problems we use information gain, computed with either Gini impurity or entropy. In regression, Mean Squared Error is a good metric to use.
1.1.1. Information gain
Using Gini impurity or entropy alone would only measure the impurity of the current state of the data. This doesn’t tell us whether splitting on a particular feature is beneficial. To decide which feature to split on, we must evaluate how much improvement (reduction in impurity) a split provides. This improvement is quantified by information gain.
Information gain is the amount of information gained about a random variable or signal from observing another random variable. It tells us how important a given attribute of the feature vectors is. It is defined as
\[ IG = I(\text{parent}) - \sum_{i=1}^{k} \dfrac{N_i}{N} I(\text{child}_{i}). \]
- \(I\) is the impurity metric: entropy or Gini.
- \(\text{parent}\) is the parent node.
- \(\text{child}_{i}\) is the \(i\)-th child node.
- \(N\) is the total number of samples in the parent node.
- \(N_{i}\) is the number of samples in the \(i\)-th child node.
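As a quick sanity check of the formula (the numbers here are made up for illustration): suppose a parent node with \(N = 10\) samples and impurity \(I(\text{parent}) = 0.5\) is split into two children of 5 samples each, both with impurity \(0.32\). Then
\[ IG = 0.5 - \left( \tfrac{5}{10} \cdot 0.32 + \tfrac{5}{10} \cdot 0.32 \right) = 0.18, \]
so this split removes a bit more than a third of the parent impurity.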
1.1.2. Gini impurity index
Gini measures how often a randomly chosen element of a set would be incorrectly labeled if it were labeled randomly and independently according to the distribution of labels in the set.
\[ I_{G} = 1 - \sum_{i=1}^{n} p_{i}^2, \]
where \(p_{i}\) is the probability of the class \(i\) in the dataset.
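For instance, a node with two equally likely classes has \(I_{G} = 1 - (0.5^2 + 0.5^2) = 0.5\), while a pure node has \(I_{G} = 1 - 1^2 = 0\); these are exactly the two values reproduced by the small code check in section 2.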
1.1.3. Entropy
Entropy is, in a sense, a measure contrary to probability. An event with a 100% chance of occurring has entropy 0, since it does not add any information; it is “expected”. An event whose probability approaches 0% carries an arbitrarily large amount of information, since it is completely “unexpected”.
\[ I_{H} = -\sum_{i=1}^{n} p_{i} \cdot \log(p_{i}), \]
where \(p_{i}\) is the probability of the class \(i\) in the dataset.
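As a worked value: two equally likely classes give \(I_{H} = -2 \cdot 0.5 \ln(0.5) = \ln 2 \approx 0.693\), while a pure node gives \(I_{H} = 0\) (with the convention \(0 \log 0 = 0\)). The natural logarithm is used here to match the implementation below.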
1.2. Recursive splitting
After selecting a metric we must generate all possible splits for each feature. We can traverse all the features in the matrix \(X\) of independent variables and sort each column. We then compute the midpoint between any two adjacent values to obtain the split thresholds (we can also traverse the column directly).
For each threshold we compute the information gain and select the best split. The process is repeated recursively until a stopping criterion is met.
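A minimal sketch of this threshold generation for a single, made-up feature column (the helper implemented in section 2 computes the same midpoints with np.roll):

col = np.array([3.1, 1.2, 2.4, 5.0])     # one feature column (made-up values)
vals = np.sort(col)                      # [1.2, 2.4, 3.1, 5.0]
thresholds = (vals[:-1] + vals[1:]) / 2  # midpoints: [1.8, 2.75, 4.05]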
1.3. Regression trees
If we want to generate a tree for regression tasks we consider variance reduction instead of information gain. Regression trees measure how much the variance (or spread) of the target values is reduced by splitting the data at a particular threshold. The variance reduction is defined as
\[ \text{Variance reduction} = \text{Var}(y_{\text{parent}}) - ( \omega_{\text{left}} \cdot \text{Var}(y_{\text{left}}) + \omega_{\text{right}} \cdot \text{Var}(y_{\text{right}}) ). \]
Here,
- \(\text{Var}(y)\) is the variance of the target variable.
- \(\omega_{\text{left}}\) and \(\omega_{\text{right}}\) are the weights for the left and right child nodes based on the proportion of samples.
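A small numeric sketch of this computation (the target values are made up; the weights are just the sample proportions):

y_parent = np.array([1.0, 2.0, 8.0, 9.0])
y_left = np.array([1.0, 2.0])    # samples falling left of the threshold
y_right = np.array([8.0, 9.0])   # samples falling right of the threshold

w_left = len(y_left) / len(y_parent)
w_right = len(y_right) / len(y_parent)

variance_reduction = np.var(y_parent) - (w_left * np.var(y_left) + w_right * np.var(y_right))
# np.var(y_parent) is 12.5 and each child has variance 0.25,
# so the reduction for this split is 12.25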
2. Code
We first develop the code that implements the Gini and entropy formulas, for which we need a helper function to compute the class probabilities.
def get_class_probabilities(array):
    """Compute class probabilities.

    The probability is computed by counting the number of favorable outcomes
    divided by the total number of outcomes.

    Parameters
    ----------
    array : numpy.ndarray
        Array containing the labels.

    Returns
    -------
    probabilities : numpy.ndarray
        Array containing probabilities for each class.
    """
    _, label_counts = np.unique(array, return_counts=True)
    probabilities = label_counts / label_counts.sum()
    return probabilities
def gini_impurity_index(array):
    """Compute gini impurity index of a given array.

    Parameters
    ----------
    array : numpy.ndarray
        Array to get the Gini index from.

    Returns
    -------
    gini : float
        Gini impurity index value.
    """
    probabilities = get_class_probabilities(array)
    gini = 1 - (probabilities ** 2).sum()
    return gini
def entropy(array):
    """Compute Entropy of a given array.

    Parameters
    ----------
    array : numpy.ndarray
        Array to get the Entropy from.

    Returns
    -------
    h : float
        Entropy value.
    """
    probabilities = get_class_probabilities(array)
    h = (-probabilities * np.log(probabilities)).sum()
    return h
Notice that both metrics penalize heterogeneity:

het = np.array([1, 0])
hom = np.array([1, 1])

For a sequence composed of different numbers the Gini and entropy are greater than for sequences composed of the same number.
gini_impurity_index(het), gini_impurity_index(hom)
(0.5, 0.0)
entropy(het), entropy(hom)
(0.6931471805599453, 0.0)
We also need an extra helper function to compute the middle points between all adjacent values of a given array:
def get_avg_between_adjacent_values(array):
    """Get middle points between adjacent values.

    Computes the middle point between any 2 adjacent values of the array. See
    the example section for clarification.

    Parameters
    ----------
    array : numpy.ndarray
        Array to get middle points from.

    Returns
    -------
    middle_points : numpy.ndarray
        Middle points between all adjacent values of the array.

    Example
    -------
    >>> import numpy as np
    >>> vals = np.array([1, 2, 5, 9])
    >>> get_avg_between_adjacent_values(vals)
    array([1.5, 3.5, 7. ])
    """
    roll = np.roll(array, shift=1)
    middle_points = roll + (array - roll) / 2

    # we aren't interested in the middle point between the last and first values
    middle_points = middle_points[1:]
    return middle_points
As with previous posts, the code structure follows the Scikit-Learn API convention of fit-predict. There is a Node class that registers split information and a base tree class that is later extended to create classification and regression trees.
class Node:
    """Decision tree node.

    Parameters
    ----------
    split_threshold : float, optional, default=None
    feature_name : str, optional, default=None
    left_child : Node, optional, default=None
    right_child : Node, optional, default=None
    is_leaf : bool, optional, default=None
    node_value : float, optional, default=None
    """
    __slots__ = [
        "split_threshold",
        "feature_name",
        "is_leaf",
        "left_child",
        "right_child",
        "node_value"
    ]

    def __init__(
        self,
        split_threshold=None,
        feature_name=None,
        left_child=None,
        right_child=None,
        is_leaf=None,
        node_value=None,
    ):
        self.split_threshold = split_threshold
        self.feature_name = feature_name
        self.is_leaf = is_leaf
        self.left_child = left_child
        self.right_child = right_child
        self.node_value = node_value
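To make the structure concrete, here is a tiny hand-built tree using the class above (the threshold and leaf values are arbitrary): a root that splits feature 0 at 2.5 and sends samples to one of two leaves.

left_leaf = Node(is_leaf=True, node_value=0)
right_leaf = Node(is_leaf=True, node_value=1)
root = Node(
    feature_name=0,        # index of the feature column to split on
    split_threshold=2.5,   # samples with x[0] <= 2.5 go to the left child
    left_child=left_leaf,
    right_child=right_leaf,
)

The fitting code below builds exactly this kind of structure, only recursively and with thresholds chosen according to the split criterion.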
The base class contains all the important methods. We only need to extend it, override the get_node_value method (used to compute the predicted value for a sample), and set the criterion_function used in the information gain computation.
class DecisionTreeBase:
    """Decision tree base class.

    To build a decision tree, both for regression and classification, this
    class must be extended.

    Parameters
    ----------
    criterion_function : callable
        Criterion function used to evaluate a split.
    is_classifier : bool
        True for classification trees. False otherwise.
    min_samples_per_split : int, optional, default=2
        Minimum number of samples per split.
    max_depth : int, optional, default=100
        Maximum tree depth.
    """
    def __init__(
        self,
        criterion_function,
        is_classifier,
        min_samples_per_split=2,
        max_depth=100,
    ):
        self.criterion_function = criterion_function
        self.is_classifier = is_classifier
        self.min_samples_per_split = min_samples_per_split
        self.max_depth = max_depth
        self.tree = None

    def fit(self, X, y):
        """Build a decision tree from the training set (X, y).

        Parameters
        ----------
        X : numpy.ndarray
            Matrix of features.
        y : numpy.ndarray
            The target values.
        """
        tree = self.build_tree(X, y)
        self.tree = tree

    def predict(self, X):
        """Predict class or regression value for X.

        Parameters
        ----------
        X : numpy.ndarray
            Input samples.

        Returns
        -------
        y : numpy.ndarray
            Predictions for the given samples.
        """
        return np.array([self.traverse_tree(x, self.tree) for x in X]).flatten()

    def traverse_tree(self, x, node: Node):
        if node.node_value is not None:
            return node.node_value
        if x[node.feature_name] <= node.split_threshold:
            return self.traverse_tree(x, node.left_child)
        return self.traverse_tree(x, node.right_child)

    def build_tree(self, X, y, depth=0):
        """Build a tree by recursively splitting on different thresholds and
        features.

        Parameters
        ----------
        X : numpy.ndarray
            Matrix of features.
        y : numpy.ndarray
            The target values.
        depth : int, optional, default=0
            Depth of the tree.

        Returns
        -------
        Node
            Root node.
        """
        n_samples, n_features = X.shape

        # check if we have reached any stopping criteria
        depth_crit = depth >= self.max_depth
        min_samples_crit = n_samples < self.min_samples_per_split
        n_labels_cond = len(np.unique(y)) == 1

        if depth_crit or min_samples_crit or n_labels_cond:
            val = self.get_node_value(y)
            return Node(is_leaf=True, node_value=val)

        # define initial values for the threshold and feature
        best_gain = -1
        best_feat_col = None
        best_split_th = None

        for int_feat_col in range(n_features):
            col_i = X[:, int_feat_col]

            # sort values and compute split thresholds
            sorted_col_i = np.sort(col_i)
            split_thresholds = np.unique(
                get_avg_between_adjacent_values(sorted_col_i)
            )

            # now we traverse each split and select the best one
            for split_th in split_thresholds:
                # perform the split based on the threshold
                left_idx, right_idx = self.split(X[:, int_feat_col], split_th)
                if len(left_idx) == 0 or len(right_idx) == 0:
                    continue

                # compute information gain
                if self.is_classifier:
                    gain = self.information_gain(y, y[left_idx], y[right_idx])
                else:
                    # not strictly a gain, but we keep the name to avoid
                    # multiple names
                    gain = self.variance_reduction(y, y[left_idx], y[right_idx])

                if gain > best_gain:
                    best_gain = gain
                    best_split_th = split_th
                    best_feat_col = int_feat_col

        if best_gain == -1:
            leaf = self.get_node_value(y)
            return Node(node_value=leaf)

        # create subtrees
        left_idx, right_idx = self.split(X[:, best_feat_col], best_split_th)

        # recursively evaluate the subtrees
        left_subtree = self.build_tree(X[left_idx, :], y[left_idx], depth + 1)
        right_subtree = self.build_tree(X[right_idx, :], y[right_idx], depth + 1)

        return Node(
            feature_name=best_feat_col,
            split_threshold=best_split_th,
            left_child=left_subtree,
            right_child=right_subtree,
        )

    def split(self, feature_column, threshold):
        left_idx = np.where(feature_column <= threshold)[0]
        right_idx = np.where(feature_column > threshold)[0]
        return left_idx, right_idx

    def information_gain(self, parent, left, right):
        criterion_function = self.criterion_function

        weight_left = len(left) / len(parent)
        weight_right = len(right) / len(parent)
        info_gain = criterion_function(parent) - (
            weight_left * criterion_function(left) +
            weight_right * criterion_function(right)
        )
        return info_gain

    def variance_reduction(self, parent, left, right):
        weight_left = len(left) / len(parent)
        weight_right = len(right) / len(parent)
        return np.var(parent) - (weight_left * np.var(left) + weight_right * np.var(right))

    def get_node_value(self, array):
        raise NotImplementedError("Subclasses must override this method")
2.1. Classification trees
Classification trees use the information gain computed with Gini impurity or entropy. The mode function is used to get the most common label as the node value.
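As a small reminder of how scipy.stats.mode (imported at the top as mode) behaves: it returns a result object whose .mode attribute holds the most frequent value; the exact shape of that attribute depends on the SciPy version, and the .flatten() call in predict keeps the output one-dimensional either way.

labels = np.array([0, 1, 1, 1, 0])
mode(labels).mode  # most common label: 1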
class DecisionTreeClassifier(DecisionTreeBase, ClassifierMixin):
    """Decision tree classifier.

    Parameters
    ----------
    criterion : {"gini", "entropy"}, default="gini"
        Criterion used to compute the information gain.
    min_samples_per_split : int, optional, default=2
        Minimum samples per split.
    max_depth : int, optional, default=100
        Maximum depth of the tree.
    """
    def __init__(
        self, criterion="gini", min_samples_per_split=2, max_depth=100
    ):
        if criterion == "gini":
            criterion_function = gini_impurity_index
        elif criterion == "entropy":
            criterion_function = entropy
        else:
            raise ValueError(f"Unsupported criterion function: {criterion}")

        super().__init__(
            criterion_function=criterion_function,
            min_samples_per_split=min_samples_per_split,
            max_depth=max_depth,
            is_classifier=True,
        )

    def get_node_value(self, array):
        return mode(array).mode
We generate a synthetic classification toy dataset to test our model.
X, y = make_classification(n_samples=500, n_features=2, n_informative=2, n_redundant=0, n_classes=2, random_state=10)

fig, ax = plt.subplots(1, 1, figsize=(12, 6))

ax.scatter(X[:, 0], X[:, 1], c=y)
ax.grid(True, alpha=.3)
ax.set_title("Dataset", fontsize=24)
ax.set_xlabel("Feature $x_{1}$", fontsize=20)
ax.set_ylabel("Feature $x_{2}$", fontsize=20)
ax.tick_params(labelsize=20)
kf = KFold(n_splits=5)

roc_scores = []
for train_idx, test_idx in kf.split(X):
    model = DecisionTreeClassifier(criterion="entropy")

    X_train = X[train_idx]
    y_train = y[train_idx]

    X_test = X[test_idx]
    y_test = y[test_idx]

    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    score = roc_auc_score(y_true=y_test, y_score=y_pred)
    roc_scores.append(score)
We achieve a very good area under the ROC curve for this particular synthetic dataset.
np.array(roc_scores).mean()
0.9460805262132187
2.2. Regression trees
The regression tree implemented in this post uses the variance reduction criterion to find the optimal split.
class DecisionTreeRegressor(DecisionTreeBase, RegressorMixin):
    """Decision tree regressor.

    This decision tree uses variance reduction to find the optimal split.

    Parameters
    ----------
    min_samples_per_split : int, optional, default=2
        Minimum samples per split.
    max_depth : int, optional, default=100
        Maximum depth of the tree.
    """
    def __init__(
        self,
        min_samples_per_split=2,
        max_depth=100,
    ):
        super().__init__(
            criterion_function=None,
            min_samples_per_split=min_samples_per_split,
            max_depth=max_depth,
            is_classifier=False,
        )

    def get_node_value(self, array):
        return np.mean(array)
We generate an easy toy dataset using Scikit-Learn’s make_regression function to test our model.
X, y = make_regression(n_samples=500, n_features=1, n_informative=1, n_targets=1, noise=10, random_state=0)

fig, ax = plt.subplots(1, 1, figsize=(12, 6))

ax.scatter(X, y)
ax.grid(True, alpha=.3)
ax.set_title("Dataset", fontsize=24)
ax.set_xlabel("Feature $x_{1}$", fontsize=20)
ax.set_ylabel("Label", fontsize=20)
ax.tick_params(labelsize=20)
kf = KFold(n_splits=5)

r2_scores = []
for train_idx, test_idx in kf.split(X):
    model = DecisionTreeRegressor()

    X_train = X[train_idx]
    y_train = y[train_idx]

    X_test = X[test_idx]
    y_test = y[test_idx]

    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    score = r2_score(y_true=y_test, y_pred=y_pred)
    r2_scores.append(score)
The \(R^{2}\) score is very good considering the synthetically generated toy dataset.
np.array(r2_scores).mean()
0.9075705029040689
fig, ax = plt.subplots(1, 1, figsize=(12, 6))

ax.scatter(X_test, y_test)
ax.scatter(X_test, y_pred)
ax.grid(True, alpha=.3)

ax.legend(["Test data", "Predictions"], fontsize=20)
ax.set_title("Dataset", fontsize=24)
ax.set_xlabel("Feature $x_{1}$", fontsize=20)
ax.set_ylabel("Label", fontsize=20)
ax.tick_params(labelsize=20)
3. Strengths and Weaknesses
A summary of strengths and weaknesses of this model.
Pros
- Easy to interpret.
- Robust to outliers.
- Can deal with non-linear relationships by default.
Cons
- Can overfit easily to the data.
- High variance model.
- Computationally expensive due to its recursive nature.