【机器学习】决策树算法实现
文章目录数据准备ID3算法C4.5算法总结参考数据准备决策树是一种基本的分类与回归算法,因此使用的仍是原版十类的手写识别数据集;然而在算法中需要计算条件概率和相应的条件经验熵,为了简化条件概率形式且减少计算量(主要还是防止条件概率过小,导致后面连乘的时候出错),对输入特征进行二值化。这部分还是在代码中完成,就不提前做成新的数据集了。这里主要实现决策树生成算法,包含:使用信息增益作为...
数据准备
决策树是一种基本的分类与回归算法,因此使用的仍是原版十类的手写识别数据集;
然而在算法中需要计算条件概率和相应的条件经验熵,为了简化条件概率形式且减少计算量(主要还是防止条件概率过小,导致后面连乘的时候出错),对输入特征进行二值化。这部分还是在代码中完成,就不提前做成新的数据集了。
这里主要实现决策树生成算法,包含:
- 使用信息增益作为特征选择的ID3算法
- 使用信息增益比作为特征选择的C4.5算法
两种算法除了在节点分裂时选择的特征评价指标不同,在算法流程上完全相同;因此实现在同一份代码当中。
另外上述两种决策树的剪枝算法,以及使用平方误差最小化作为特征的CART回归树和使用基尼系数作为特征选择的CART分类树的生成、剪枝过程,暂时就不实现了。
ID3算法
C4.5算法
两种决策树的生成算法上面列的都比较清楚了,实现算法的代码如下:
# @Author: phd
# @Date: 2019/8/5
# @Site: github.com/phdsky
# @Description: NULL
import numpy as np
import pandas as pd
import math
import time
import logging
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import Binarizer
def log(func):
def wrapper(*args, **kwargs):
start_time = time.time()
ret = func(*args, **kwargs)
end_time = time.time()
logging.debug('%s() cost %s seconds' % (func.__name__, end_time - start_time))
return ret
return wrapper
def calc_accuracy(y_pred, y_truth):
assert len(y_pred) == len(y_truth)
n = len(y_pred)
hit_count = 0
for i in range(0, n):
if y_pred[i] == y_truth[i]:
hit_count += 1
print("Predicting accuracy %f" % (hit_count / n))
def empire_entropy(y_dict):
entropy = 0
y_sum = sum(y_dict.values())
for (k, v) in y_dict.items():
part = v / y_sum
entropy += part*np.log2(part) # math.log2(part)
return -entropy
class TreeNode(object):
def __init__(self, type=None, belong=None, index=None, subtree=None):
self.type = type # Internal or Leaf node type
self.belong = belong # Leaf: Belong to which class
self.index = index # Internal: Feature index
self.subtree = subtree # Internal: Subtree dict
class DecisionTree(object):
def __init__(self, algorithm, epsilon):
self.algorithm = algorithm
self.epsilon = epsilon
self.root = None
@log
def train(self, X_train, y_train):
feature_indices = list(range(0, len(X_train[0])))
self.root = self.build(X_train, y_train, feature_indices)
def build(self, X_set, y_set, indices):
assert(len(X_set) == len(y_set))
set_length = len(y_set)
y_dict = {}
for i in range(0, set_length):
if y_set[i] in y_dict.keys():
y_dict[y_set[i]] += 1
else:
y_dict[y_set[i]] = 1
# Step 1: If all samples belongs to one class, return the node
if len(y_dict) == 1:
return TreeNode(type='leaf', belong=y_set[0])
# Step 2: If indices is empty, vote for the max class node
if len(indices) == 0:
return TreeNode(type='leaf', belong=sorted(y_dict, key=lambda x: y_dict[x])[-1])
# Step 3: Calculate the information gain of all the feature indices
HD = empire_entropy(y_dict)
HD_A = []
H_A_D = []
for index in indices:
conditional_dict = {}
for i in range(0, set_length):
if X_set[i][index] in conditional_dict.keys():
if y_set[i] in conditional_dict[X_set[i][index]].keys():
conditional_dict[X_set[i][index]][y_set[i]] += 1
else:
conditional_dict[X_set[i][index]][y_set[i]] = 1
else:
conditional_dict[X_set[i][index]] = dict()
conditional_dict[X_set[i][index]][y_set[i]] = 1
conditional_empire_entropy = 0
feature_empire_entropy = 0
for key in conditional_dict.keys():
cond_dict = conditional_dict[key]
conditional = sum(cond_dict.values()) / set_length
conditional_empire_entropy += conditional * empire_entropy(cond_dict)
feature_empire_entropy -= conditional * np.log2(conditional)
HD_A.append(conditional_empire_entropy)
H_A_D.append(feature_empire_entropy)
g_HD_A = [HD - hd_a for hd_a in HD_A]
g_r_HD_A = []
for a, b in zip(g_HD_A, H_A_D):
if b == 0:
g_r_HD_A.append(a)
else:
g_r_HD_A.append(a / b)
max_g_HD_A = max(g_HD_A)
max_g_r_HD_A = max(g_r_HD_A)
best_feature_index = -1
if self.algorithm == "ID3":
# Step 4: If max gain lower than epsilon, vote for the max class node
if max_g_HD_A < self.epsilon:
return TreeNode(type='leaf', belong=sorted(y_dict, key=lambda x: y_dict[x])[-1])
# Step 5:
best_feature_index = indices[g_HD_A.index(max_g_HD_A)]
elif self.algorithm == "C4.5":
if max_g_r_HD_A < self.epsilon:
return TreeNode(type='leaf', belong=sorted(y_dict, key=lambda x: y_dict[x])[-1])
best_feature_index = indices[g_r_HD_A.index(max_g_r_HD_A)]
else:
print("WTF of %s algorithm?", self.algorithm)
# Build internal node using best_feature_index
node = TreeNode(type='internal', index=best_feature_index, subtree={})
new_subset = {}
for i in range(0, set_length):
if X_set[i][best_feature_index] in new_subset.keys():
new_subset[X_set[i][best_feature_index]].append(i)
else:
new_subset[X_set[i][best_feature_index]] = [i]
# Better to sort new_subset, subtree value range from value_min to value_max
new_subset = dict((k, new_subset[k]) for k in sorted(new_subset.keys()))
# Step 6:
# indices.remove(best_feature_index) # !!! FFFuck bug here
sub_indices = list(filter(lambda x: x != best_feature_index, indices))
for key in new_subset.keys():
subset_list = new_subset[key]
new_X_set = X_set[subset_list]
new_y_set = y_set[subset_list]
node.subtree[key] = self.build(new_X_set, new_y_set, sub_indices)
return node
@log
def predict(self, X_test):
n = len(X_test)
d = X_test.shape[1]
predict_label = np.full(n, -1)
for i in range(0, n):
to_predict = X_test[i]
node = self.root
while node.type != 'leaf':
node = node.subtree[to_predict[node.index]]
predict_label[i] = node.belong
# print("Sample %d predicted as %d" % (i, predict_label[i]))
return predict_label
def example_large():
mnist_data = pd.read_csv("../data/mnist.csv")
mnist_values = mnist_data.values
images = mnist_values[::, 1::]
labels = mnist_values[::, 0]
X_train, X_test, y_train, y_test = train_test_split(
images, labels, test_size=0.33, random_state=42
)
# Binary the image to avoid predict_probability gets 0
binarizer_train = Binarizer(threshold=127).fit(X_train)
X_train_binary = binarizer_train.transform(X_train)
binarizer_test = Binarizer(threshold=127).fit(X_test)
X_test_binary = binarizer_test.transform(X_test)
decision_tree = DecisionTree(algorithm="ID3", epsilon=0.01)
# decision_tree = DecisionTree(algorithm="C4.5", epsilon=0.01)
print("Decision tree building...")
decision_tree.train(X_train=X_train_binary, y_train=y_train)
print("Building complete...")
# Start predicting progress
print("Testing on %d samples..." % len(X_test))
y_predicted = decision_tree.predict(X_test=X_test_binary)
calc_accuracy(y_pred=y_predicted, y_truth=y_test)
def example_small():
X_train = np.asarray([[0, 0, 0, 0], [0, 0, 0, 1], [0, 1, 0, 1], [0, 1, 1, 0], [0, 0, 0, 0],
[1, 0, 0, 0], [1, 0, 0, 1], [1, 1, 1, 1], [1, 0, 1, 2], [1, 0, 1, 2],
[2, 0, 1, 2], [2, 0, 1, 1], [2, 1, 0, 1], [2, 1, 0, 2], [2, 0, 0, 0]])
y_train = np.asarray([0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0])
X_test = np.asarray([[0, 0, 0, 0], [0, 0, 0, 1], [0, 1, 0, 1], [0, 1, 1, 0], [0, 0, 0, 0],
[1, 0, 0, 0], [1, 0, 0, 1], [1, 1, 1, 1], [1, 0, 1, 2], [1, 0, 1, 2],
[2, 0, 1, 2], [2, 0, 1, 1], [2, 1, 0, 1], [2, 1, 0, 2], [2, 0, 0, 0]])
y_test = np.asarray([0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0])
decision_tree = DecisionTree(algorithm="ID3", epsilon=0.01)
# decision_tree = DecisionTree(type="C4.5")
print("Decision tree building...")
decision_tree.train(X_train=X_train, y_train=y_train)
print("Building complete...")
# Start predicting progress
print("Testing on %d samples..." % len(X_test))
y_predicted = decision_tree.predict(X_test=X_test)
calc_accuracy(y_pred=y_predicted, y_truth=y_test)
if __name__ == "__main__":
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# example_small()
example_large()
代码实现的过程中,由于在以下两个方面对Python——类的对象初始化,和函数传参的错误理解:参数是值传递的,实际上是引用传递;导致刚开始的时候出现几个Bug不能理解,主要体现如下:
- 在节点__init__(self, type=None, belong=None, index=None, subtree=None)函数中如果将subtree=None改为subtree={},意思是__init__函数就直接指定subtree是一个空字典(每次都是这个对象);而不是像代码中的每个节点在初始化的时候给一个空的字典(每次给一个独立对象);这样做的话会导致生成出来树的子节点一直循环递归引用。
- 代码在 Step 6 生成子树传参的时候,如果不按 sub_indices = list(filter(lambda x: x != best_feature_index, indices)) 生成一个新的特征列表往函数传参,而按 indices.remove(best_feature_index) 方式将remove后的特征列表传入函数;参数按引用方式传递,很明显递归会出错。
为了解决上述问题,拉了书上的数据做测试,第一个问题很容易解决;在第二个问题的时候,发现信息增益和书上的都一致,最后生成的决策树的结构也和书上相同,感觉在计算条件概率和熵上应该没有问题;于是拉了别人的实现,减小mnist训练数据规模进行一一比对,终于才发现问题出在函数传参上。
最终算法结果如下:
ID3
/Users/phd/Softwares/anaconda3/bin/python /Users/phd/Desktop/ML/decision_tree/decision_tree.py
Decision tree building...
Building complete...
DEBUG:root:train() cost 493.4455769062042 seconds
Testing on 13860 samples...
DEBUG:root:predict() cost 0.06820201873779297 seconds
Predicting accuracy 0.840548
Process finished with exit code 0
C4.5
/Users/phd/Softwares/anaconda3/bin/python /Users/phd/Desktop/ML/decision_tree/decision_tree.py
Decision tree building...
DEBUG:root:train() cost 2878.278247117996 seconds
DEBUG:root:predict() cost 0.4305226802825928 seconds
Building complete...
Testing on 13860 samples...
Predicting accuracy 0.831097
Process finished with exit code 0
两种算法的结果都还可以,只是C4.5运算量上去了花的时间也多了,结果还比ID3差点。。。
总结
参考
- 《统计学习方法》
更多推荐
所有评论(0)