# Hung-yi Lee Machine Learning - Homework 2 code (李宏毅机器学习_作业2代码)
#
# Section 1: Decision tree (决策树)
# -*- coding: utf-8 -*-
"""
@Author :Mart
@Time :2021/6/17 21:43
@version :Python3.7.4
@Software:pycharm2020.3.2
"""
import operator
from math import log
def createDataSet():
    """Return the toy data set used by the decision-tree demo.

    Returns:
        tuple: ``(dataSet, labels)``. Each row of ``dataSet`` holds two
        binary feature values followed by the class label (``'yes'`` or
        ``'no'``); ``labels`` names the two feature columns in order.
    """
    dataSet = [
        [1, 1, 'yes'],
        [1, 1, 'yes'],
        [1, 0, 'no'],
        [0, 1, 'no'],
        [0, 1, 'no'],
    ]
    labels = ['no surfacing', 'flippers']
    return dataSet, labels
def calcShannonEnt(dataSet):
    """Return the Shannon entropy, in bits, of the class labels in ``dataSet``.

    Args:
        dataSet: sequence of rows; the last element of each row is its class
            label.

    Returns:
        float: ``-sum(p * log2(p))`` over the label frequencies. An empty
        data set yields ``0.0`` because no label contributes a term.
    """
    numEntries = len(dataSet)
    labelCounts = {}
    for featVec in dataSet:
        currentLabel = featVec[-1]
        labelCounts[currentLabel] = labelCounts.get(currentLabel, 0) + 1
    shannonEnt = 0.0
    for count in labelCounts.values():
        prob = float(count) / numEntries
        shannonEnt -= prob * log(prob, 2)
    return shannonEnt
def splitDataSet(dataSet, index, value):
    """Return the rows whose feature ``index`` equals ``value``, minus that column.

    Args:
        dataSet: sequence of rows (feature values followed by a class label).
        index: position of the feature to test and remove.
        value: feature value a row must have to be kept.

    Returns:
        list: new rows built from the elements before and after ``index``;
        the rows of ``dataSet`` themselves are not modified.
    """
    return [
        featVec[:index] + featVec[index + 1:]
        for featVec in dataSet
        if featVec[index] == value
    ]
def chooseBestFeatureToSplit(dataSet):
    """Return the index of the feature with the largest information gain.

    Args:
        dataSet: non-empty sequence of rows; every element but the last is a
            feature value, the last is the class label.

    Returns:
        int: index of the best feature, or ``-1`` when no feature yields a
        strictly positive information gain.
    """
    numFeatures = len(dataSet[0]) - 1
    baseEntropy = calcShannonEnt(dataSet)
    bestInfoGain, bestFeature = 0.0, -1
    totalRows = float(len(dataSet))
    for i in range(numFeatures):
        uniqueVals = {example[i] for example in dataSet}
        # Entropy after the split: subset entropies weighted by subset size.
        newEntropy = 0.0
        for value in uniqueVals:
            subDataSet = splitDataSet(dataSet, i, value)
            prob = len(subDataSet) / totalRows
            newEntropy += prob * calcShannonEnt(subDataSet)
        infoGain = baseEntropy - newEntropy
        if infoGain > bestInfoGain:
            bestInfoGain = infoGain
            bestFeature = i
    return bestFeature
def majorityCnt(classList):
    """Return the most frequent label in ``classList``.

    Args:
        classList: non-empty sequence of hashable class labels.

    Returns:
        The label with the highest count. The sort is stable, so among tied
        labels the one that appears first in ``classList`` wins.

    Raises:
        IndexError: if ``classList`` is empty.
    """
    classCount = {}
    for vote in classList:
        classCount[vote] = classCount.get(vote, 0) + 1
    sortedClassCount = sorted(
        classCount.items(),
        key=operator.itemgetter(1),
        reverse=True,
    )
    return sortedClassCount[0][0]
def createTree(dataSet, labels):
    """Recursively build a decision tree as nested dictionaries.

    Args:
        dataSet: non-empty sequence of rows (feature values followed by the
            class label).
        labels: feature names, one per feature column of ``dataSet``. The
            list is not modified.

    Returns:
        Either a class label (leaf) or a dict ``{feature_name: {value: subtree}}``.
    """
    classList = [example[-1] for example in dataSet]
    # Every row has the same class: nothing left to decide.
    if classList.count(classList[0]) == len(classList):
        return classList[0]
    # Only the class column remains: fall back to a majority vote.
    if len(dataSet[0]) == 1:
        return majorityCnt(classList)
    bestFeat = chooseBestFeatureToSplit(dataSet)
    # -1 means no feature has positive information gain. Using it as an index
    # would silently select the class-label column, so vote instead.
    if bestFeat == -1:
        return majorityCnt(classList)
    bestFeatLabel = labels[bestFeat]
    myTree = {bestFeatLabel: {}}
    # Build the reduced label list without mutating the caller's list.
    subLabels = labels[:bestFeat] + labels[bestFeat + 1:]
    uniqueVals = {example[bestFeat] for example in dataSet}
    for value in uniqueVals:
        # Recurse on the subset that takes this value for the chosen feature.
        myTree[bestFeatLabel][value] = createTree(
            splitDataSet(dataSet, bestFeat, value), subLabels[:])
    return myTree
def classify(inputTree, featLabels, testVec):
    """Classify ``testVec`` by walking a nested-dict decision tree.

    Args:
        inputTree: dict of the form ``{feature_name: {value: subtree_or_label}}``.
        featLabels: feature names; the position of a name gives the index of
            that feature in ``testVec``.
        testVec: feature values of the sample to classify.

    Returns:
        The class label stored at the leaf that is reached.

    Raises:
        KeyError: if ``testVec`` holds a feature value the tree has no branch for.
        ValueError: if the tree tests a feature name missing from ``featLabels``.
    """
    firstStr = next(iter(inputTree))
    secondDict = inputTree[firstStr]
    featIndex = featLabels.index(firstStr)
    valueOfFeat = secondDict[testVec[featIndex]]
    if isinstance(valueOfFeat, dict):
        # Inner node: keep descending.
        return classify(valueOfFeat, featLabels, testVec)
    return valueOfFeat
def fishTest():
    """Build a tree from the toy data set and print the class of ``[1, 1]``."""
    myDat, labels = createDataSet()
    # Give createTree its own copy so `labels` stays intact for classify,
    # whatever createTree does with the list it receives.
    myTree = createTree(myDat, list(labels))
    print(classify(myTree, labels, [1, 1]))


if __name__ == "__main__":
    fishTest()
"""
本质上就是算法数据结构里面的二叉树结构
"""
# Section 2: SVM
# -*- coding: utf-8 -*-
"""
@Author :Mart
@Time :2021/6/19 17:21
@version :Python3.7.4
@Software:pycharm2020.3.2
"""
from libsvm.commonutil import svm_read_problem
from libsvm.svmutil import svm_train, svm_predict, svm_save_model, svm_load_model
from libsvm import svm
import codecs
import os
import jieba
train_file = './data/cnews.train.txt'  # training data file name
test_file = './data/cnews.test.txt'  # test data file name
vocab = './data/cnews_dict.txt'  # dictionary
# Read the training file once to preview its first record.
with codecs.open(train_file, 'r', 'utf-8') as f:
    lines = f.readlines()
# print sample content: each line is "<label>\t<content>"
label, content = lines[0].strip('\r\n').split('\t')
print(content)
# print word segment results
segment = jieba.cut(content)
print('/'.join(segment))
# cut data
def process_line(idx, line):
    """Parse one ``"<label>\\t<content>"`` line and segment its content.

    Args:
        idx: zero-based line number, used only for progress output.
        line: raw line of the data file.

    Returns:
        tuple | None: ``(label, tokens)`` where ``tokens`` is the list
        produced by ``jieba.cut``, or ``None`` when the line does not consist
        of exactly two tab-separated fields.
    """
    data = tuple(line.strip('\r\n').split('\t'))
    if len(data) != 2:
        return None
    content_segged = list(jieba.cut(data[1]))
    # Progress report every 1000 lines.
    if idx % 1000 == 0:
        print('line number: {}'.format(idx))
    return (data[0], content_segged)
# data loading method
def load_data(file):
    """Load a UTF-8 data file and return its parsed records.

    Args:
        file: path of a file with one ``"<label>\\t<content>"`` record per line.

    Returns:
        list: the ``process_line`` result for every line, in file order, with
        the lines that ``process_line`` rejected (``None``) left out.
    """
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()
    data_records = [process_line(idx, line) for idx, line in enumerate(lines)]
    return [data for data in data_records if data is not None]
# Load and segment the training file; each record is (label, token list).
train_data = load_data(train_file)
# Show the first training record with its tokens joined by '/'.
print('first training data: label {} segment {}'.format(
train_data[0][0], '/'.join(train_data[0][1])))
# Load and segment the test file the same way.
test_data = load_data(test_file)
# Show the first test record with its tokens joined by '/'.
print('first testing data: label {} segment {}'.format(
test_data[0][0], '/'.join(test_data[0][1])))
def build_vocab(train_data, thresh):
    """Build a word -> id vocabulary from segmented training records.

    Args:
        train_data: iterable of records whose element ``[1]`` is a list of
            tokens.
        thresh: words occurring ``thresh`` times or fewer are dropped.

    Returns:
        dict: ``'<UNK>'`` mapped to 0, then the kept words numbered from 1 in
        order of decreasing frequency (ties keep first-seen order).
    """
    vocab = {'<UNK>': 0}
    word_count = {}  # word frequency
    for data in train_data:
        for word in data[1]:
            word_count[word] = word_count.get(word, 0) + 1
    word_list = list(word_count.items())
    print('word list length: {}'.format(len(word_list)))
    # sorted by word frequency, most frequent first
    word_list.sort(key=lambda x: x[1], reverse=True)
    word_list_filtered = [word for word in word_list if word[1] > thresh]
    print('word list length after filtering: {}'.format(len(word_list_filtered)))
    # construct vocab: the next free id is the current vocabulary size
    for word in word_list_filtered:
        vocab[word[0]] = len(vocab)
    # vocab size is word list size +1 due to unk token
    print('vocab size: {}'.format(len(vocab)))
    return vocab
# vocab = build_vocab(train_data, 1)
def build_label_vocab(cate_file):
    """Read a category file into a ``name -> integer id`` mapping.

    Args:
        cate_file: path of a UTF-8 file with one ``"<name>\\t<id>"`` pair per
            line.

    Returns:
        dict: category name mapped to ``int(id)``.
    """
    label_vocab = {}
    with codecs.open(cate_file, 'r', 'utf-8') as f:
        for raw_line in f:
            stripped = raw_line.strip()
            # A blank line (e.g. a trailing newline) has no id field.
            if not stripped:
                continue
            fields = stripped.split('\t')
            label_vocab[fields[0]] = int(fields[1])
    return label_vocab
# Category name -> integer id; these ids become the class labels written to
# the SVM input files by construct_trainable_matrix.
label_vocab = build_label_vocab('./data/cnews.category.txt')
print('label vocab: {}'.format(label_vocab))
def construct_trainable_matrix(corpus, vocab, label_vocab, out_file):
    """Write ``corpus`` as bag-of-words records in LIBSVM sparse text format.

    Each output line is ``"<label_id> <index>:<count> ..."``, where ``index``
    is the vocabulary id plus one and ``count`` is how often that token occurs
    in the document. Tokens missing from ``vocab`` are counted under id 0.

    Args:
        corpus: iterable of ``(label, tokens)`` records.
        vocab: token -> integer id mapping.
        label_vocab: label -> integer id mapping.
        out_file: path of the file to write (overwritten).

    Raises:
        KeyError: if a record's label is not in ``label_vocab``.
    """
    records = []
    for idx, data in enumerate(corpus):
        if idx % 1000 == 0:
            print('process {} data'.format(idx))
        label = str(label_vocab[data[0]])  # label id
        token_dict = {}
        for token in data[1]:
            token_id = vocab.get(token, 0)
            token_dict[token_id] = token_dict.get(token_id, 0) + 1
        # The LIBSVM data format expects feature indices in ascending order.
        feature = [str(int(k) + 1) + ':' + str(v)
                   for k, v in sorted(token_dict.items())]
        feature_text = ' '.join(feature)
        records.append(label + ' ' + feature_text)
    with open(out_file, 'w') as f:
        f.write('\n'.join(records))
# Build the vocabulary from the training set, dropping words seen only once.
# Note: this rebinds `vocab`, which earlier held the dictionary file path.
vocab = build_vocab(train_data, 1)
# Alternative (disabled): load a pre-built vocabulary file instead.
# vocab = [word.strip() for word in open('./data/cnews.vocab.txt','r',encoding='utf-8').readlines()]
# Write the training and test sets as SVM input files.
construct_trainable_matrix(
train_data,
vocab,
label_vocab,
'./data/train.svm.txt')
construct_trainable_matrix(
test_data,
vocab,
label_vocab,
'./data/test.svm.txt')
# train svm
train_label, train_feature = svm_read_problem('./data/train.svm.txt')
print(train_label[0], train_feature[0])
# NOTE(review): per the LIBSVM README, '-s 0' is C-SVC, '-t 0' a linear
# kernel, '-c' the cost, '-g' gamma and '-e' the stopping tolerance; gamma
# presumably has no effect with a linear kernel - confirm.
model = svm_train(train_label, train_feature, '-s 0 -c 5 -t 0 -g 0.5 -e 0.1')
# predict
test_label, test_feature = svm_read_problem('./data/test.svm.txt')
print(test_label[0], test_feature[0])
p_labs, p_acc, p_vals = svm_predict(test_label, test_feature, model)
# NOTE(review): LIBSVM documents p_acc as a tuple (accuracy, MSE, squared
# correlation coefficient), so this presumably prints all three - confirm.
print('accuracy: {}'.format(p_acc))
# Section 3: Income analysis - logistic regression model (收入分析逻辑回归模型)
# -*- coding: utf-8 -*-
"""
@Author :Mart
@Time :2021/6/19 20:36
@version :Python3.7.4
@Software:pycharm2020.3.2
"""
"""
https://blog.csdn.net/weixin_49272172/article/details/115261791
"""
# 导入相关库
import numpy as np
import matplotlib.pyplot as plt
# 添加文件路径
X_train_fpath = './data/X_train'
Y_train_fpath = './data/Y_train'
X_test_fpath = './data/X_test'
predict_fpath = './predict_{}.csv'  # output file for test-set predictions
# Load the data. Each file is CSV: the first line is skipped as a header and
# the first column of every row is dropped.
with open(X_train_fpath) as f:
    next(f)
    X_train = np.array([line.strip('\n').split(',')[1:]
                        for line in f], dtype=float)
with open(Y_train_fpath) as f:
    next(f)
    # Only the second column (the label) is kept.
    Y_train = np.array([line.strip('\n').split(',')[1]
                        for line in f], dtype=float)
with open(X_test_fpath) as f:
    next(f)
    X_test = np.array([line.strip('\n').split(',')[1:]
                       for line in f], dtype=float)
# 一般的方法,但是这种方法计算比较慢
# X_mean = np.mean(X_train,axis=0) # 每个特征的均值
# X_std = np.std(X_train,axis=0) # 每个特征的标准差
# n = X_train.shape[0]
# m = X_train.shape[1]
# X = np.full_like(X_train,fill_value=0)
# for i in range(n):
# for j in range(m):
# X[i,j] = (X_train[i,j] - X_mean[j]) / (X_std[j]+1e-6) # 归一化数据
# X
# 归一化
def _normalize(X, train=True, specified_column=None, X_mean=None, X_std=None):
    """Standardise selected columns of ``X`` in place.

    The mean and standard deviation computed on the training data are
    returned so they can be reused when processing other data.

    Args:
        X: 2-D float array to process; it is modified in place.
        train: ``True`` to compute the statistics from ``X``; ``False`` to
            use the supplied ``X_mean`` and ``X_std``.
        specified_column: indexes of the columns to normalise. ``None``
            normalises every column.
        X_mean: per-column mean, required when ``train`` is ``False``.
        X_std: per-column standard deviation, required when ``train`` is
            ``False``.

    Returns:
        tuple: ``(X, X_mean, X_std)`` - the same array object, now
        normalised, and the statistics that were applied (shape ``(1, k)``
        when computed here).
    """
    if specified_column is None:
        specified_column = np.arange(X.shape[1])
    if train:
        X_mean = np.mean(X[:, specified_column], 0).reshape(1, -1)
        X_std = np.std(X[:, specified_column], 0).reshape(1, -1)
    # The 1e-8 keeps constant columns (std == 0) from dividing by zero.
    X[:, specified_column] = (X[:, specified_column] - X_mean) / (X_std + 1e-8)
    return X, X_mean, X_std
# 分割训练集-验证集
def _train_dev_split(X, Y, dev_ratio=0.25):
    """Split ``X`` and ``Y`` into a training part and a development part.

    The split is positional (no shuffling): the first
    ``int(len(X) * (1 - dev_ratio))`` items form the training set and the
    rest the development set.

    Returns:
        tuple: ``(X_train, Y_train, X_dev, Y_dev)``.
    """
    train_size = int(len(X) * (1 - dev_ratio))
    return X[:train_size], Y[:train_size], X[train_size:], Y[train_size:]
# Normalise the training data, then apply the same mean/std to the test data.
# This runs before the train/dev split below, so the statistics are computed
# over all loaded training rows, including those that become the dev set.
X_train, X_mean, X_std = _normalize(X_train, train=True)
X_test, _, _ = _normalize(
X_test, train=False, specified_column=None, X_mean=X_mean, X_std=X_std)
# Hold out the last 10% of the training rows as the development set.
dev_ratio = 0.1
X_train, Y_train, X_dev, Y_dev = _train_dev_split(
X_train, Y_train, dev_ratio=dev_ratio)
# Record the set sizes and the feature dimension used by the code below.
train_size = X_train.shape[0]
dev_size = X_dev.shape[0]
test_size = X_test.shape[0]
data_dim = X_train.shape[1]
print('Size of training set: {}'.format(train_size))
print('Size of development set: {}'.format(dev_size))
print('Size of testing set: {}'.format(test_size))
print('Dimension of data: {}'.format(data_dim))
# 打乱数据顺序,重新为minibatch分配
def _shuffle(X, Y):
    """Shuffle two equal-length arrays with one shared random permutation.

    Args:
        X, Y: NumPy arrays indexable by an integer index array.

    Returns:
        tuple: ``(X[perm], Y[perm])`` - new arrays; the inputs are not
        modified. The permutation comes from NumPy's global random state.
    """
    randomize = np.arange(len(X))
    np.random.shuffle(randomize)
    return (X[randomize], Y[randomize])
# sigmoid函数
def _sigmoid(z):
    """Return the logistic sigmoid of ``z``, clipped to ``[1e-8, 1 - 1e-8]``.

    The clipping keeps the result strictly inside (0, 1), so taking the log
    of the output or of ``1 - output`` stays finite.
    """
    # For very negative z, exp(-z) overflows to inf; 1 / (1 + inf) is 0 and
    # is then clipped, so the overflow is harmless and its warning is muted.
    with np.errstate(over='ignore'):
        raw = 1 / (1.0 + np.exp(-z))
    return np.clip(raw, 1e-8, 1 - (1e-8))
# 向前传播然后利用sigmoid激活函数计算激活值
def _f(X, w, b):
    """Logistic regression function, parameterised by ``w`` and ``b``.

    Args:
        X: input data, shape ``[batch_size, data_dimension]``.
        w: weight vector, shape ``[data_dimension, ]``.
        b: bias, scalar.

    Returns:
        Predicted probability of each row of ``X`` being positively
        labelled, shape ``[batch_size, ]``.
    """
    return _sigmoid(np.matmul(X, w) + b)
# 预测
def _predict(X, w, b):
    """Return a 0/1 integer prediction for each row of ``X``.

    The probability from ``_f`` is rounded to the nearest integer.
    """
    # `np.int` was only an alias of the builtin `int` and has been removed
    # from NumPy (1.24+); the builtin gives the same dtype.
    return np.round(_f(X, w, b)).astype(int)
# 准确度
def _accuracy(Y_pred, Y_label):
    """Return the prediction accuracy.

    Computed as one minus the mean absolute difference between ``Y_pred``
    and ``Y_label``, which is the fraction of matches when both hold only
    0/1 values.
    """
    acc = 1 - np.mean(np.abs(Y_pred - Y_label))
    return acc
# 交叉熵损失函数
def _cross_entropy_loss(y_pred, Y_label):
    """Return the total (summed, not averaged) binary cross entropy.

    Args:
        y_pred: probabilistic predictions, float vector with values strictly
            between 0 and 1.
        Y_label: ground truth labels (0 or 1), same length as ``y_pred``.

    Returns:
        Cross entropy, scalar.
    """
    cross_entropy = -np.dot(Y_label, np.log(y_pred)) - \
        np.dot((1 - Y_label), np.log(1 - y_pred))
    return cross_entropy
# 计算梯度值
def _gradient(X, Y_label, w, b):
    """Return the gradient of the cross entropy loss w.r.t. ``w`` and ``b``.

    Args:
        X: input batch, shape ``[batch_size, data_dimension]``.
        Y_label: ground truth labels for the batch.
        w: weight vector.
        b: bias.

    Returns:
        tuple: ``(w_grad, b_grad)``, summed over the batch.
    """
    y_pred = _f(X, w, b)
    pred_error = Y_label - y_pred
    # pred_error broadcasts across the rows of X.T (one row per feature);
    # summing along axis 1 accumulates over the samples of the batch.
    w_grad = -np.sum(pred_error * X.T, 1)
    b_grad = -np.sum(pred_error)
    return w_grad, b_grad
# 至此,模型已经建立完成,我们开始训练
# 将w和b初始化为0
# Initialise the weights and the bias to zero.
w = np.zeros((data_dim,))
b = np.zeros((1,))
# Hyper-parameters: number of epochs, mini-batch size, base learning rate.
max_iter = 100
batch_size = 128
learning_rate = 0.2
# Per-epoch loss and accuracy on the training and development sets.
train_loss = []
dev_loss = []
train_acc = []
dev_acc = []
# Counter that drives the learning-rate decay.
step = 1
# Training: mini-batch gradient descent.
for epoch in range(max_iter):
    # Reshuffle the training data at the start of every epoch.
    X_train, Y_train = _shuffle(X_train, Y_train)
    # Only full batches are used; a trailing partial batch is skipped.
    for idx in range(int(np.floor(train_size / batch_size))):
        X = X_train[idx * batch_size:(idx + 1) * batch_size]
        Y = Y_train[idx * batch_size:(idx + 1) * batch_size]
        # Compute the gradient on this batch.
        w_grad, b_grad = _gradient(X, Y, w, b)
        # Update w and b; the effective rate shrinks as 1 / sqrt(step).
        w = w - learning_rate / np.sqrt(step) * w_grad
        b = b - learning_rate / np.sqrt(step) * b_grad
        step = step + 2
    # The parameters are updated max_iter * floor(train_size / batch_size)
    # times in total.
    # Loss and accuracy on the training set after this epoch.
    y_train_pred = _f(X_train, w, b)
    Y_train_pred = np.round(y_train_pred)
    train_acc.append(_accuracy(Y_train_pred, Y_train))
    train_loss.append(_cross_entropy_loss(y_train_pred, Y_train) / train_size)
    # Loss and accuracy on the development set after this epoch.
    y_dev_pred = _f(X_dev, w, b)
    Y_dev_pred = np.round(y_dev_pred)
    dev_acc.append(_accuracy(Y_dev_pred, Y_dev))
    dev_loss.append(_cross_entropy_loss(y_dev_pred, Y_dev) / dev_size)
# Report the metrics of the final epoch.
print('Training loss: {}'.format(train_loss[-1]))
print('Development loss: {}'.format(dev_loss[-1]))
print('Training accuracy: {}'.format(train_acc[-1]))
print('Development accuracy: {}'.format(dev_acc[-1]))
# Loss curve: per-epoch training and development loss, saved to loss.png
# and then shown.
plt.plot(train_loss)
plt.plot(dev_loss)
plt.title('Loss Curve1')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'dev'])
plt.savefig('loss.png')
plt.show()
# Accuracy curve: per-epoch training and development accuracy, saved to
# acc.png and then shown.
plt.plot(train_acc)
plt.plot(dev_acc)
plt.title('Accuracy Curve1')
plt.ylabel('acc')
plt.xlabel('epoch')
plt.legend(['train', 'dev'])
plt.savefig('acc.png')
plt.show()
# Predict testing labels and write them as "id,label" CSV rows, where id is
# the zero-based row index in the test set.
predictions = _predict(X_test, w, b)
with open(predict_fpath.format('logistic'), 'w') as f:
    f.write('id,label\n')
    for i, label in enumerate(predictions):
        f.write('{},{}\n'.format(i, label))
# 打印一下数据前10项特征对应的权重
# ind = np.argsort(np.abs(w))[::-1]
# with open(X_test_fpath) as f:
# content = f.readline().strip('\n').split(',')
# features = np.array(content)
# for i in ind[0:10]:
# print(features[i], w[i])
# Section 4: Income analysis - generative model (收入分析生成模型)
# -*- coding: utf-8 -*-
"""
@Author :Mart
@Time :2021/6/19 20:36
@version :Python3.7.4
@Software:pycharm2020.3.2
"""
"""
https://blog.csdn.net/weixin_49272172/article/details/115261791
"""
import numpy as np
# Input data files and the output path template.
X_train_fpath = './data/X_train'
Y_train_fpath = './data/Y_train'
X_test_fpath = './data/X_test'
predict_fpath = './predict_{}.csv' # output file for test-set predictions
def _normalize(X, train=True, specified_column=None, X_mean=None, X_std=None):
    """Standardise selected columns of ``X`` in place.

    The mean and standard deviation computed on the training data are
    returned so they can be reused when processing other data.

    Args:
        X: 2-D float array to process; it is modified in place.
        train: ``True`` to compute the statistics from ``X``; ``False`` to
            use the supplied ``X_mean`` and ``X_std``.
        specified_column: indexes of the columns to normalise. ``None``
            normalises every column.
        X_mean: per-column mean, required when ``train`` is ``False``.
        X_std: per-column standard deviation, required when ``train`` is
            ``False``.

    Returns:
        tuple: ``(X, X_mean, X_std)`` - the same array object, now
        normalised, and the statistics that were applied (shape ``(1, k)``
        when computed here).
    """
    if specified_column is None:
        specified_column = np.arange(X.shape[1])
    if train:
        X_mean = np.mean(X[:, specified_column], 0).reshape(1, -1)
        X_std = np.std(X[:, specified_column], 0).reshape(1, -1)
    # The 1e-8 keeps constant columns (std == 0) from dividing by zero.
    X[:, specified_column] = (X[:, specified_column] - X_mean) / (X_std + 1e-8)
    return X, X_mean, X_std
# 分割训练集-验证集
def _train_dev_split(X, Y, dev_ratio=0.25):
    """Split ``X`` and ``Y`` into a training part and a development part.

    The split is positional (no shuffling): the first
    ``int(len(X) * (1 - dev_ratio))`` items form the training set and the
    rest the development set.

    Returns:
        tuple: ``(X_train, Y_train, X_dev, Y_dev)``.
    """
    train_size = int(len(X) * (1 - dev_ratio))
    return X[:train_size], Y[:train_size], X[train_size:], Y[train_size:]
# Parse csv files to numpy arrays. The first line of each file is skipped as
# a header and the first column of every row is dropped.
with open(X_train_fpath) as f:
    next(f)
    X_train = np.array([line.strip('\n').split(',')[1:]
                        for line in f], dtype=float)
with open(Y_train_fpath) as f:
    next(f)
    # Only the second column (the label) is kept.
    Y_train = np.array([line.strip('\n').split(',')[1]
                        for line in f], dtype=float)
with open(X_test_fpath) as f:
    next(f)
    X_test = np.array([line.strip('\n').split(',')[1:]
                       for line in f], dtype=float)
# Hold out the last 10% of the training rows as the development set.
dev_ratio = 0.1
X_train, Y_train, X_dev, Y_dev = _train_dev_split(
    X_train, Y_train, dev_ratio=dev_ratio)
train_size = X_train.shape[0]
dev_size = X_dev.shape[0]
test_size = X_test.shape[0]
data_dim = X_train.shape[1]
# Normalize training data, then apply the training statistics to the
# development and testing data. _normalize only touches the rows it is given,
# so the development rows must be scaled explicitly or they stay raw.
X_train, X_mean, X_std = _normalize(X_train, train=True)
X_dev, _, _ = _normalize(
    X_dev, train=False, specified_column=None, X_mean=X_mean, X_std=X_std)
X_test, _, _ = _normalize(
    X_test, train=False, specified_column=None, X_mean=X_mean, X_std=X_std)
# Split the training rows by class and compute the per-feature mean of each.
X_train_0 = X_train[Y_train == 0]
X_train_1 = X_train[Y_train == 1]
mean_0 = np.mean(X_train_0, axis=0)
mean_1 = np.mean(X_train_1, axis=0)
# Compute in-class covariance: the average outer product of the centred
# rows, written as one matrix product per class.
centered_0 = X_train_0 - mean_0
centered_1 = X_train_1 - mean_1
cov_0 = np.dot(centered_0.T, centered_0) / X_train_0.shape[0]
cov_1 = np.dot(centered_1.T, centered_1) / X_train_1.shape[0]
# Shared covariance is taken as a weighted average of individual in-class
# covariance.
cov = (cov_0 * X_train_0.shape[0] + cov_1 * X_train_1.shape[0]
       ) / (X_train_0.shape[0] + X_train_1.shape[0])
# Compute the weight vector w and the bias b.
# Compute inverse of covariance matrix.
# Since covariance matrix may be nearly singular, np.linalg.inv() may give a
# large numerical error, so the inverse is assembled from the SVD instead.
# NOTE(review): a singular value of exactly 0 would still divide by zero here.
u, s, v = np.linalg.svd(cov, full_matrices=False)
inv = np.matmul(v.T * 1 / s, u.T)
# Directly compute weights and bias
w = np.dot(inv, mean_0 - mean_1)
b = (-0.5) * np.dot(mean_0, np.dot(inv, mean_0)) + 0.5 * np.dot(
    mean_1, np.dot(inv, mean_1)) + np.log(
        float(X_train_0.shape[0]) / X_train_1.shape[0])
# 打乱数据顺序,重新为minibatch分配
def _shuffle(X, Y):
    """Shuffle two equal-length arrays with one shared random permutation.

    Args:
        X, Y: NumPy arrays indexable by an integer index array.

    Returns:
        tuple: ``(X[perm], Y[perm])`` - new arrays; the inputs are not
        modified. The permutation comes from NumPy's global random state.
    """
    randomize = np.arange(len(X))
    np.random.shuffle(randomize)
    return (X[randomize], Y[randomize])
# sigmoid函数
def _sigmoid(z):
    """Return the logistic sigmoid of ``z``, clipped to ``[1e-8, 1 - 1e-8]``.

    The clipping keeps the result strictly inside (0, 1), so taking the log
    of the output or of ``1 - output`` stays finite.
    """
    # For very negative z, exp(-z) overflows to inf; 1 / (1 + inf) is 0 and
    # is then clipped, so the overflow is harmless and its warning is muted.
    with np.errstate(over='ignore'):
        raw = 1 / (1.0 + np.exp(-z))
    return np.clip(raw, 1e-8, 1 - (1e-8))
# 向前传播然后利用sigmoid激活函数计算激活值
def _f(X, w, b):
    """Logistic regression function, parameterised by ``w`` and ``b``.

    Args:
        X: input data, shape ``[batch_size, data_dimension]``.
        w: weight vector, shape ``[data_dimension, ]``.
        b: bias, scalar.

    Returns:
        Predicted probability of each row of ``X`` being positively
        labelled, shape ``[batch_size, ]``.
    """
    return _sigmoid(np.matmul(X, w) + b)
# 预测
def _predict(X, w, b):
    """Return a 0/1 integer prediction for each row of ``X``.

    The probability from ``_f`` is rounded to the nearest integer.
    """
    # `np.int` was only an alias of the builtin `int` and has been removed
    # from NumPy (1.24+); the builtin gives the same dtype.
    return np.round(_f(X, w, b)).astype(int)
# 准确度
def _accuracy(Y_pred, Y_label):
    """Return the prediction accuracy.

    Computed as one minus the mean absolute difference between ``Y_pred``
    and ``Y_label``, which is the fraction of matches when both hold only
    0/1 values.
    """
    acc = 1 - np.mean(np.abs(Y_pred - Y_label))
    return acc
# Compute accuracy on training set.
# w is built from (mean_0 - mean_1), so _predict yields 1 for class 0;
# subtracting from 1 flips the result back to the actual class label.
Y_train_pred = 1 - _predict(X_train, w, b)
print('Training accuracy: {}'.format(_accuracy(Y_train_pred, Y_train)))
# Predict testing labels and write them as "id,label" CSV rows, where id is
# the zero-based row index in the test set.
predictions = 1 - _predict(X_test, w, b)
with open(predict_fpath.format('generative'), 'w') as f:
    f.write('id,label\n')
    for i, label in enumerate(predictions):
        f.write('{},{}\n'.format(i, label))
# Print the 10 features with the largest absolute weights (disabled)
# ind = np.argsort(np.abs(w))[::-1]
# with open(X_test_fpath) as f:
#     content = f.readline().strip('\n').split(',')
# features = np.array(content)
# for i in ind[0:10]:
#     print(features[i], w[i])
# (web page footer, not part of the code) 更多推荐 / 所有评论(0)