"""MNIST decision-tree classifier with pre-pruning; includes feature
dimensionality reduction (downsampling of 28x28 images) and evaluation.

Read_File module:
https://blog.csdn.net/wangdiedang/article/details/125335812?spm=1001.2014.3001.5502
"""

import math
import numpy as np
from sklearn.model_selection import train_test_split
import Read_File as RF
import time

def featureExtraction(img, dim, num):
    """Downsample a binary image to a dim x dim 0/1 feature map.

    Splits *img* into dim*dim tiles of num x num pixels each and marks
    an output cell 1 when the tile's pixel sum exceeds a threshold.

    img: 2-D array (assumed 28x28 with 0/1 pixels — confirm at caller)
    dim: side length of the output feature map
    num: tile side length (callers pass 28 // dim)
    returns: (dim, dim) float array containing only 0.0 and 1.0
    """
    res = np.empty((dim, dim))
    # threshold is loop-invariant: hoist it out of the double loop
    threshold = max((28 // dim - 1), 1)
    for i in range(dim):
        for j in range(dim):
            # pixel count inside the (i, j) tile
            block_sum = img[num * i:num * (i + 1), num * j:num * (j + 1)].sum()
            res[i, j] = 1 if block_sum > threshold else 0
    return res


def Extraction2AllImgs(imgs, dim):
    """Reduce every 28x28 image in *imgs* to a dim x dim feature map.

    imgs: array of shape (n, 28, 28)
    dim:  target side length
    returns: float array of shape (n, dim, dim)
    """
    res = np.empty((imgs.shape[0], dim, dim))
    # tile side length covered by each output cell
    num = 28 // dim
    for k, img in enumerate(imgs):
        # fix: use the enumerated image directly instead of re-indexing imgs[k]
        res[k] = featureExtraction(img, dim, num)
    return res


def read_data(dim=7):
    """Load the raw data via Read_File and downsample images to dim x dim.

    Returns (train_imgs, train_labels, test_imgs, test_labels); images
    keep their original 28x28 shape when dim >= 28.
    """
    train_x, train_y, test_x, test_y = RF.read_main()
    # only downsample when an actual reduction was requested
    if dim < 28:
        train_x = Extraction2AllImgs(train_x, dim)
        test_x = Extraction2AllImgs(test_x, dim)
    return train_x, train_y, test_x, test_y


# Count the elements of a numpy array equal to a given value.
def equalNums(x, y):
    """Return how many entries of *x* equal *y*; 0 when x is None."""
    if x is None:
        return 0
    return int(np.count_nonzero(x == y))


# Shannon entropy (base 2) of a value sequence.
def singleEntropy(x):
    """Return the base-2 entropy of the empirical distribution of *x*."""
    x = np.asarray(x)
    # frequency of each distinct value
    _, counts = np.unique(x, return_counts=True)
    probs = counts / x.size
    return float(-(probs * np.log2(probs)).sum())


# Conditional entropy H(y | feature): entropy of y within each feature
# value's partition, weighted by the partition's relative size.
def conditionnalEntropy(feature, y):
    """Return the conditional entropy of *y* given *feature*."""
    feature = np.asarray(feature)
    y = np.asarray(y)
    total = feature.size
    ent = 0.0
    for value in set(feature):
        # boolean mask selecting the samples where feature == value
        mask = feature == value
        weight = np.count_nonzero(mask) / total
        ent += weight * singleEntropy(y[mask])
    return ent


# Information gain of splitting y by feature.
def infoGain(feature, y):
    """Return H(y) - H(y | feature)."""
    base = singleEntropy(y)
    return base - conditionnalEntropy(feature, y)


# Information gain ratio (C4.5 criterion).
def infoGainRatio(feature, y):
    """Return infoGain / H(feature); 0 when the feature has zero entropy."""
    denom = singleEntropy(feature)
    if denom == 0:
        return 0
    return infoGain(feature, y) / denom


# Feature selection: pick the column with the highest gain ratio.
def bestFeature(data, labels):
    """Return the index of the column of *data* with the largest
    information gain ratio with respect to *labels*."""
    data = np.asarray(data)
    labels = np.asarray(labels)
    best_score = 0
    best_idx = -1
    # data.shape[1] is the number of candidate features (columns)
    for idx in range(data.shape[1]):
        score = infoGainRatio(data[:, idx], labels)
        # '>=' deliberately lets a later feature with an equal score win
        if score >= best_score:
            best_score, best_idx = score, idx
    return best_idx


# Partition data/labels by the values of one feature column; the column
# itself is removed from the returned data subsets.
def splitFeatureData(data, labels, feature):
    """Split *data* and *labels* on column *feature*.

    Returns two dicts keyed by the distinct feature values: the data
    rows (with the feature column deleted) and the matching labels.
    """
    data = np.asarray(data)
    labels = np.asarray(labels)
    # the column we split on
    col = data[:, feature]
    # remaining columns for the child nodes
    remaining = np.delete(data, feature, axis=1)
    values = set(col)
    dataSet = {v: remaining[col == v] for v in values}
    labelSet = {v: labels[col == v] for v in values}
    return dataSet, labelSet


# Majority vote when no features are left to split on.
def voteLabel(labels):
    """Return the most frequent label in *labels*.

    Ties break on set iteration order (first candidate reaching the
    maximal count), matching list.index(max(...)) semantics.
    """
    candidates = list(set(labels))
    arr = np.asarray(labels)
    # occurrence count per candidate label
    counts = [np.count_nonzero(arr == lab) for lab in candidates]
    return candidates[counts.index(max(counts))]


# Build a decision tree with pre-pruning.
def createTreePrePruning(dataTrain, labelTrain, dataTest, labelTest, names):
    """Recursively build a decision tree, pruning splits that hurt
    accuracy on the held-out (test) split.

    dataTrain, labelTrain: training samples and labels
    dataTest, labelTest:   evaluation samples/labels; dataTest may be
                           None, in which case no pruning check runs
    names: feature-name array, shrinks as features are consumed
    Returns either a nested dict {featureName: {featureValue: subtree}}
    or a single label value for a leaf.
    """
    trainData = np.asarray(dataTrain)
    labelTrain = np.asarray(labelTrain)
    # NOTE(review): testData is computed but never used below — the raw
    # dataTest is what gets passed to splitFeatureData.
    testData = np.asarray(dataTest)
    labelTest = np.asarray(labelTest)
    names = np.asarray(names)
    # all training labels identical -> pure leaf
    if len(set(labelTrain)) == 1:
        return labelTrain[0]
        # no features left to split on -> majority-vote leaf
    elif trainData.size == 0:
        return voteLabel(labelTrain)
    # otherwise select the feature with the best gain ratio
    bestFeat = bestFeature(dataTrain, labelTrain)
    # display name of the chosen feature
    bestFeatName = names[bestFeat]
    # drop the consumed feature name for the recursive calls
    names = np.delete(names, [bestFeat])
    # partition the training data on the chosen feature
    dataTrainSet, labelTrainSet = splitFeatureData(dataTrain, labelTrain, bestFeat)

    # --- pre-pruning evaluation ---
    # majority label before splitting (the would-be leaf prediction)
    labelTrainLabelPre = voteLabel(labelTrain)
    # measure accuracy with and without the split on the test data
    if dataTest is not None:
        dataTestSet, labelTestSet = splitFeatureData(dataTest, labelTest, bestFeat)
        # pre-split accuracy: majority label against all test labels
        labelTestRatioPre = equalNums(labelTest, labelTrainLabelPre) / labelTest.size
        # post-split: count test samples matched by each branch's
        # majority label (branches absent from the test split add 0,
        # because equalNums(None, ...) returns 0)
        labelTrainEqNumPost = 0
        for val in labelTrainSet.keys():
            labelTrainEqNumPost += equalNums(labelTestSet.get(val), voteLabel(labelTrainSet.get(val)))
        # post-split accuracy
        labelTestRatioPost = labelTrainEqNumPost / labelTest.size

        # no evaluation data: cannot judge the split, stop at a leaf
    if dataTest is None:
        return labelTrainLabelPre
        # prune: the split lowers test accuracy, collapse to a leaf
    elif labelTestRatioPost < labelTestRatioPre:
        return labelTrainLabelPre
    else:
        # keep the split: create a subtree node keyed by the feature name
        decisionTree = {bestFeatName: {}}
        # recurse into the data subset of every feature value
        for featValue in dataTrainSet.keys():
            # NOTE(review): dataTestSet.get(featValue) can be None for
            # branches unseen in the test split; the recursive call then
            # takes its "dataTest is None" path and skips pruning there.
            decisionTree[bestFeatName][featValue] = createTreePrePruning(dataTrainSet.get(featValue),
                                                                         labelTrainSet.get(featValue),
                                                                         dataTestSet.get(featValue),
                                                                         labelTestSet.get(featValue),
                                                                         names)
    return decisionTree


# 返回预测结果
def testResult(test_imgs, allTree):
    row, col = np.asarray(test_imgs).shape
    new_test_imgs = np.asarray(test_imgs).reshape(row * col)
    tree = allTree
    while str(type(tree)) == "<class 'dict'>":
        stri = list(tree.keys())[0]
        num = int(stri[2:]) - 1
        if tree[stri].get(new_test_imgs[num]) is None:
            tree=tree[stri][1 - new_test_imgs[num]]
        else:
            tree = tree[stri][new_test_imgs[num]]
    return tree


def main(train_imgs, train_labels, test_imgs, test_labels):
    """Train the pre-pruned decision tree and print test accuracy.

    NOTE(review): reads the module-level globals ``dim`` and
    ``old_time`` set by the __main__ block — confirm before reuse.
    """
    n, h, w = np.array(train_imgs).shape
    flat_train = np.array(train_imgs).reshape(n, h * w)

    # one feature name per flattened pixel, 1-based
    feat_names = ['节点' + str(i + 1) for i in range(h * w)]

    # hold out 30% of the training data as the pruning/validation split
    x_train, x_val, y_train, y_val = train_test_split(flat_train, train_labels, test_size=0.3, random_state=42)
    # build the decision tree
    tree = createTreePrePruning(x_train, y_train, x_val, y_val, feat_names)
    print(tree)

    correct = 0
    total = 0
    for idx in range(test_imgs.shape[0]):
        pred = testResult(test_imgs[idx], tree)
        total += 1
        print(pred, test_labels[idx])
        if pred == test_labels[idx]:
            correct += 1
    current_time = time.time()
    print("--------------------------------------------------------------")
    print("样本尺寸(%d, %d)" % (dim, dim))
    print('训练集样本数:%d,模型测试正确数:%d,模型测试正确率: %.3f' % (total, correct, correct / total * 100) + "%")
    print("运行时间为" + str(current_time - old_time) + "s")
    print("--------------------------------------------------------------")


if __name__ == '__main__':
    # side length of the downsampled feature map (28x28 -> dim x dim);
    # read as a global by main() for the report printout
    dim = 14
    # start timestamp; also read as a global by main()
    old_time = time.time()
    train_imgs, train_labels, test_imgs, test_labels = read_data(dim)
    main(train_imgs, train_labels, test_imgs, test_labels)

# NOTE: web-scrape page footer removed — it was not part of the program:
# "Logo / 技术共进,成长同行——讯飞AI开发者社区 / 更多推荐"