Spark ML Pipeline机器学习流程回归分析
概述使用“Bike Sharing”数据集,使用Spark机器学习流程(ML Pipeline)回归分析,使用决策树回归分析,在不同情况(季节、月份、时间、假日、星期、工作日、天气、温度、体感温度、湿度、风速等)来预测每个小时的租用数量,并且使用训练验证与交叉验证找出最佳模型,提高预测准确度,最后使用GBT(Gradient-Boosted Tree)梯度提升决策树,进一步提高预测准确度。分为三个
概述
使用“Bike Sharing”数据集,使用Spark机器学习流程(ML Pipeline)回归分析,使用决策树回归分析,在不同情况(季节、月份、时间、假日、星期、工作日、天气、温度、体感温度、湿度、风速等)来预测每个小时的租用数量,并且使用训练验证与交叉验证找出最佳模型,提高预测准确度,最后使用GBT(Gradient-Boosted Tree)梯度提升决策树,进一步提高预测准确度。
分为三个阶段:
- 建立机器学习流程pipeline
包含3个阶段(stages),前2个阶段是数据处理,第3个阶段是DecisionTreeRegressor,机器学习决策树回归分析。
VectorAssembler:将所有的特征字段整合成Vector.
VectorIndexer:将不重复数值的数量小于等于maxCategories参数值对应的字段视为分类字段,否则视为数值字段。
DecisionTreeRegressor:决策树回归分析。 - 训练
“训练数据DataFrame”使用pipeline.fit()进行训练,系统会按照顺序执行每一个字段,最后产生pipelineModel模型,pipelineModel与pipeline类似,只是多了训练后建立的模型Model。 - 预测
“新数据DataFrame”使用pipelineModel.transform(),系统会按照顺序执行每一个字段,最后使用DecisionTree Regressor Model进行预测。预测完成后会产生“预测结构DataFrame”。
数据集下载:https://download.csdn.net/download/qq_41986872/12519365
书籍:Python Spark 2.0 Hadoop机器学习与大数据实战_林大贵(著)
实现过程
2.1 数据准备
导入数据,查看项数,查看所有字段:
hour_df=spark.read.format('csv')\
.option("header","true").load("hour.csv")
hour_df.count()
print(hour_df.columns)
可以看到一共17379数据。
舍弃不需要的字段:
hour_df=hour_df.drop("instant").drop("dteday")\
.drop("yr").drop("casual").drop("registered")
使用printSchema()查看导入数据的Schema,可以看到所有的字段都是String数据格式:
hour_df.printSchema()
导入相关模块,用来读取字段数据,将string转为double:
from pyspark.sql.functions import col
hour_df=hour_df.select([col(column).cast("double").alias(column)for column in hour_df.columns])
hour_df.printSchema()
查看前5条数据:
hour_df.show(5)
将数据以7:3的比例分成train_df(训练数据)与test_df(测试数据),并且.cache()暂存在内存中,以加快后续程序运行速度:
train_df,test_df=hour_df.randomSplit([0.7,0.3])
train_df.cache()
test_df.cache()
2.2 建立机器学习pipeline流程
使用VectorIndexer,在数据集中有月份(112)、星期(06)、小时(0~23)等字段。在决策树运算时,如果这些字段被视为分类字段,计算后准确率会比较高。
将设置maxCategories=24,月份(不重复数值的数量为12),星期(不重复数值的数量为7)、小时(不重复数值的数量为24)不重复数值的数量小于等于maxCategories,所以月份、星期、小时都会被决策树算法视为分类字段。
导入模块:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
创建特征字段List,hour_df除了最后一个字段是label,其他都是特征字段:
featuresCols=hour_df.columns[:-1]
print (featuresCols)
建立pipeline,并查看:
vectorAssembler = VectorAssembler(inputCols=featuresCols,outputCol="aFeatures")
vectorIndexer = VectorIndexer(inputCol="aFeatures",outputCol='features',maxCategories=24)
dt = DecisionTreeRegressor(labelCol='cnt',featuresCol='features')
dt_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer,dt])
dt_pipeline.getStages()
2.3 使用dt_pipeline进行数据处理与训练
使用dt_pipeline.fit进行训练,传入train_df训练数据,训练数据执行pipeline的所有字段vectorAssembler,vectorIndexer和dt,最后结构是dt_pipelineModel:
dt_pipelineModel=dt_pipeline.fit(train_df)
查看训练完的决策树模型:
dt_pipelineModel.stages[2]
print(dt_pipelineModel.stages[2].toDebugString[:500])
2.4 使用pipelineModel进行预测
使用transfrom方法传入test_df测试数据并进行预测:
predicted_df=dt_pipelineModel.transform(test_df)
查看新增的字段,查看预测结果:
print(predicted_df.columns)
predicted_df.select('season','mnth','hr','holiday','weekday','workingday',\
'weathersit','temp','atemp','hum','windspeed','cnt','prediction').show(10)
2.5 评估模型的准确率
导入RegressionEvaluator模块,创建RegressionEvaluator,传入labelCol、predictCol、metricName参数;
from pyspark.ml.evaluation import RegressionEvaluator
evaluator=RegressionEvaluator(labelCol='cnt',
predictionCol='prediction',metricName='rmse')
计算RMSE:
predicted_df=dt_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predicted_df)
rmse
2.6 使用crossValidation进行交叉验证找出最佳模型
导入模块,创建交叉验证的CrossValidator:
from pyspark.ml.tuning import CrossValidator
cv=CrossValidator(estimator=dt,evaluator=evaluator,
estimatorParamMaps=paramgrid,numFolds=3)
创建交叉验证的CrossValidator,建立交叉验证的cv_pipeline:
cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])
cv_pipelineModel=cv_pipeline.fit(train_df)
评估最佳模型RMSE:
predicted_df=cv_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predicted_df)
rmse
2.7 使用GBT Regression
GBT(Gradient-Boosted)或GBDT(Gradient-Boosted DecisionTree)梯度提升决策树与随机森林Random Forest都是集合很多决策树,不同的是训练的方式。Random Forest可以并行产生很多决策树,再整合了所有决策树的投票结果,将投票次数最多的类别作为最终的分类。GBT一次只产生一颗决策树,再根据前一个决策树的结果决定如何产生下一个决策树,所以无法并行处理。
导入GBTRegression模块,创建GBT Regression:
from pyspark.ml.regression import GBTRegressor
gbt=GBTRegressor(labelCol='cnt',featuresCol="features")
gbt_pipelines=Pipeline(stages=[vectorAssembler,vectorIndexer,gbt])
使用GBTRegression流程训练评估,RMSE越低,代表误差越小:
gbt_pipelineModel=gbt_pipelines.fit(train_df)
predicted_df=gbt_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predicted_df)
rmse
设置CrossValidator找出最佳模型:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml import Pipeline
paramgrid=ParamGridBuilder()\
.addGrid(gbt.maxDepth,[5,10])\
.addGrid(gbt.maxBins,[25,40])\
.addGrid(gbt.maxIter,[10,50])\
.build()
cv=CrossValidator(estimator=gbt,evaluator=evaluator,
estimatorParamMaps=paramgrid,numFolds=3)
cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])
传入train_df训练数据,执行交叉验证,查看最佳模型:
cv_pipelineModel=cv_pipeline.fit(train_df)
cvm=cv_pipelineModel.stages[2]
gbestModel=cvm.bestModel
print(gbestModel.toDebugString[:500])
使用最佳模型进行预测,计算最佳模型RMSE:
predicted_df=cv_pipelineModel.transform(test_df)
predicted_df.select('season','mnth','hr','holiday','weekday','workingday',\
'weathersit','temp','atemp','hum','windspeed','cnt','prediction').show(10)
evaluator=RegressionEvaluator(metricName="rmas",labelCol='cnt',predictionCol='prediction')
rmse=evaluator.evaluate(predicted_df)
rmse
结论
这里介绍了Spark机器学习流程(ML Pipeline)多元分类,包括建立pipeline机器学习流程,完成数据处理、训练模型、评估模型,并预测每一小时租借总数量,最后使用训练验证与交叉验证找出最佳模型,提高预测准确度。
更多推荐
所有评论(0)