实际使用java来编写和使用spark的几个例子
一、使用Java语言开发sparkstreaming完成WordCountpackage Test;import org.apache.spark.SparkConf;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.ap
·
参考致谢:
一、使用Java语言开发sparkstreaming完成WordCount
package Test;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
public class Test31 {
/**
* 使用Java语言开发sparkstreaming完成WordCount
*/
public static void main(String[] args) throws InterruptedException {
//0.TODO 准备环境
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkDemo").setMaster("local[*]");
// JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// jsc.setLogLevel("WARN");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
//1.TODO 加载数据
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node1", 9999);
//2.TODO 处理数据-WordCount
JavaPairDStream<String, Integer> result = lines.flatMap(line -> Arrays.asList(line.split(",", -1)).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
//3.TODO 输出结果
result.print();
//4.TODO 启动并等待停止
jssc.start();
jssc.awaitTermination();
//nc -lk 999
}
}
二、使用Java语言开发SqarkSQL完成WordCount
package TTest;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import java.util.Arrays;
public class TTest32 {
/**
* 使用Java语言开发sparkstreaming完成WordCount
*/
public static void main(String[] args) throws InterruptedException {
//0.TODO 准备环境
SparkSession spark = SparkSession.builder().appName("JavaSparkDemo").master("local[*]").getOrCreate();
spark.sparkContext().setLogLevel("Warn");
//1.TODO 加载数据
Dataset<String> ds = spark.read().textFile("");
//2.TODO 处理数据-WordCount
Dataset<String> wordsDS = ds.flatMap((String line) -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING());
//TODO====SQL
wordsDS.createOrReplaceGlobalTempView("t_word");
String sql="select value,count(*) as counts"+
"from t_word"+
"group by value"+
"order by counts desc";
spark.sql(sql).show();
//TODO ====DSL
/* Dataset<Row> temp = wordsDS.groupBy("value")
.count();
temp.orderBy(temp.col("count").desc()).show();*/
wordsDS.groupBy("value")
.count()
.orderBy(col("count").desc()).show();
//3.TODO 输出结果
//4.TODO 启动并等待停止
spark.stop();
//nc -lk 999
}
}
三、使用Java语言开发SparkMlLib-线性回归算法-房价预测案例
package TTest;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
public class TTest34 {
/**
* 使用Java语言开发SparkMlLib-线性回归算法-房价预测案例
*/
public static void main(String[] args) throws InterruptedException, StreamingQueryException {
//0.TODO 准备环境
SparkSession spark = SparkSession.builder()
.appName("JavaSparkDemo")
.master("local[*]")
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
//TODO 1.加载数据
Dataset<Row> homedata = spark.read()
.format("csv")
.option("sep", "|")//指定分隔符
.option("header", "true")//是否有表头
.option("inferSchema", "true")//是否自动推断约束
.load("TTest/homeprice.data");
homedata.printSchema();
homedata.show();
//TODO 2.特征处理
//特征选择
Dataset<Row> featuredDF = homedata.select("sqFt", "age", "ares", "price");
//特征向量化
new VectorAssembler
//TODO 3.数据集划分0.8训练集/0.2测试集
Dataset<Row>[] arr=vectorDF.randomSplit(new double[]{0.8,0.2},100);
Dataset<Row> trainSet=arr[0];
Dataset<Row> testSet=arr[1];
//TODO 4.使用训练集训练线性回归模型
LinearRegressionModel model=new LinearRegression()
.setFeaturesCol("features")//设置特征列
.setLabelCol("price")//设置标签列(数据中已经标记好的原来的价格)
.setPredictionCol("predict_price")//设置预测列(后续做预测时预测的价格)
.setMaxIter(10)//最大迭代次数
.fit(trainSet);//使用训练集进行训练
//TODO 5.使用测试集队模型进行测试
Dataset<Row> testResult=model.transform(testSet);
testResult.show();
//TODO 6.计算误差rmse均方误差
double rmse=RegressionEvaluator evaluator=new RegressionEvaluator()//创建误差评估器
.setMetricName("rmse")//设置要计算的误差名称,均方根误差(sum({y-y')^2)/n)^0.5
.setLabelCol("price")//设置真实值是哪一列
.setPredictionCol("predict_price")//设置预测值是哪一列
.evaluator.evaluate(testResult);//对数据中的真实值和预测值进行误差计算
System.out.println("rmse为:"+rmse);
//TODO 7.模型保存(save)方便后续使用(load)
//model.save(path);//后续使用
//LinearRegressionModel lmodel=LinearRegressionModel.load(path);//后续使用
//TODO 8.关闭资源
spark.stop();
}
}
更多推荐
所有评论(0)