大数据——SparkSQL练习题
现有学生成绩表、学生信息表、老师信息表和课程信息表，根据要求得到相应的结果。
现有学生成绩表、学生信息表、老师信息表和课程信息表，请根据各练习的要求，用 Spark SQL（Scala DataFrame API）查询得到相应的结果。
import java.io.{FileWriter, PrintWriter}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
/**
 * Spark SQL practice exercises on four small comma-separated tables:
 * courses, teachers, students and scores.
 *
 * Each exercise prints its task description, runs one DataFrame query,
 * prints the result with `show()` and then prints a separator line.
 */
object Practice extends Serializable {
  /** A row of course.txt: course id, course name, id of the teacher giving the course. */
  case class Course(c_courseId: Int, c_course: String, c_teacherId: Int)

  /** A row of teacher.txt: teacher id and teacher name. */
  case class Teacher(t_teacherId: Int, t_teacherName: String)

  /** A row of student.txt: student id, name, birthday (kept as raw text) and gender. */
  case class Student(stu_stuId: Int, stu_stuName: String, stu_birthday: String, stu_gender: String)

  /** A row of score.txt: student id, course id and score. */
  case class Score(s_stuId: Int, s_courseId: Int, s_score: Float)

  /**
   * A (student id, course id) pair: the format of the optional scoreSrc.txt
   * dimension file that lists every student/course combination.
   * Parsed by [[StrToAny.toScoreSrc]]; `main` does not read that file.
   */
  case class Stu_course(_stuId: Int, _courseId: Int)

  /** Input directory used when no program argument is given. */
  private val DefaultDataDir = "file:///D:\\Study\\13_spark\\spark_sql\\file"

  /** Line printed after every exercise's output. */
  private val Separator = "========================================================================"

  /** Implicit parsing helpers turning one comma-separated text line into a row case class. */
  implicit class StrToAny(v: String) {
    // Fields are trimmed so that spaces around commas do not break toInt/toFloat.
    val ps: Array[String] = v.split(",").map(_.trim)

    def toCourse: Course = Course(ps(0).toInt, ps(1), ps(2).toInt)

    def toTeacher: Teacher = Teacher(ps(0).toInt, ps(1))

    def toStudent: Student = Student(ps(0).toInt, ps(1), ps(2), ps(3))

    def toScore: Score = Score(ps(0).toInt, ps(1).toInt, ps(2).toFloat)

    def toScoreSrc: Stu_course = Stu_course(ps(0).toInt, ps(1).toInt)
  }

  /**
   * Entry point: starts a local Spark session, runs every exercise and stops the session.
   *
   * @param args optional; `args(0)` is the directory URI (e.g. `file:///data/practice`)
   *             containing course.txt, teacher.txt, student.txt and score.txt.
   *             Defaults to the original hard-coded Windows directory.
   */
  def main(args: Array[String]): Unit = {
    val dataDir = args.headOption.getOrElse(DefaultDataDir)
    val conf = new SparkConf().setMaster("local[*]").setAppName("practice")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // spark.stop() also stops the underlying SparkContext; do it even if a query fails.
    try runExercises(spark, dataDir)
    finally spark.stop()
  }

  /** Prints `title`, runs one exercise query, then prints the section separator. */
  private def exercise(title: String)(body: => Unit): Unit = {
    println(title)
    body
    println(Separator)
  }

  /** Loads the four tables from `dataDir` and runs the exercises in order. */
  private def runExercises(spark: SparkSession, dataDir: String): Unit = {
    import spark.implicits._
    val sc = spark.sparkContext

    // Keep the separator style of the directory (the default path uses Windows backslashes).
    val sep = if (dataDir.contains("\\")) "\\" else "/"
    def path(file: String): String = dataDir.stripSuffix(sep) + sep + file

    val courseFrame: DataFrame = spark.createDataFrame(sc.textFile(path("course.txt")).map(_.toCourse))
    val scoreSrcFrame: DataFrame = spark.createDataFrame(sc.textFile(path("score.txt")).map(_.toScore))
    val studentFrame: DataFrame = spark.createDataFrame(sc.textFile(path("student.txt")).map(_.toStudent))
    val teacherFrame: DataFrame = spark.createDataFrame(sc.textFile(path("teacher.txt")).map(_.toTeacher))

    // Exercise 1: students whose course-01 score is higher than their course-02 score.
    exercise("查询\" 01 \"课程比\" 02 \"课程成绩高的学生的信息及课程分数(没有成绩不做比较)") {
      // Inner joins on the raw scores drop students missing either course
      // ("no score, no comparison"), while a real score of 0 still takes part.
      val course01 = scoreSrcFrame.filter($"s_courseId" === 1)
        .select($"s_stuId", $"s_score".as("score01"))
      val course02 = scoreSrcFrame.filter($"s_courseId" === 2)
        .select($"s_stuId".as("_s_stuId"), $"s_score".as("score02"))
      course01.join(course02, $"s_stuId" === $"_s_stuId", "inner")
        .filter($"score01".gt($"score02"))
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_birthday", $"stu_gender", $"score01", $"score02")
        .show()
    }

    // Exercise 2: students whose average score is >= 60 (missing scores are not averaged).
    exercise("查询平均成绩大于等于 60 分的同学的学生编号和学生姓名和平均成绩(无成绩不参与平均值计算)") {
      scoreSrcFrame.groupBy($"s_stuId").agg(avg($"s_score").as("avgScore"))
        .filter($"avgScore".geq(60)) // ">=" as the task requires; ">" dropped averages of exactly 60
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"avgScore")
        .orderBy(desc("avgScore"))
        .show()
    }

    // Exercise 3: every student's id, name, number of courses taken and total score.
    exercise("查询所有同学的学生编号、学生姓名、选课总数、所有课程的总成绩(没成绩的显示为 null )") {
      scoreSrcFrame.groupBy($"s_stuId")
        .agg(count($"s_courseId").as("cnt"), sum($"s_score").as("sumScore"))
        // The right join keeps students without scores; their cnt/sumScore are null.
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "right")
        .select($"stu_stuId", $"stu_stuName", $"cnt", $"sumScore")
        .orderBy(desc("sumScore"))
        .show()
    }

    // Exercise 4: number of teachers whose surname is 李.
    exercise("查询「李」姓老师的数量") {
      // "%" matches any suffix; "李_" only matched names exactly two characters long.
      println(teacherFrame.filter($"t_teacherName".like("李%")).count())
    }

    // Exercise 5: students who took any course taught by 张三.
    exercise("查询学过「张三」老师授课的同学的信息") {
      teacherFrame.filter($"t_teacherName" === "张三")
        .join(courseFrame, $"c_teacherId" === $"t_teacherId", "inner")
        .join(scoreSrcFrame, $"c_courseId" === $"s_courseId", "inner")
        .join(studentFrame, $"stu_stuId" === $"s_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .distinct() // a student taking several of 张三's courses would otherwise repeat
        .show()
    }

    // Exercise 6: students who have not taken every course.
    exercise("查询没有学全所有课程的同学的信息") {
      val courseTotal = courseFrame.count()
      val takenPerStudent = scoreSrcFrame.groupBy($"s_stuId")
        .agg(countDistinct($"s_courseId").as("cnt"))
      // Counting real score rows (instead of looking for zero-filled scores) means a genuine
      // score of 0 is not mistaken for a missing course; the left join also reports
      // students with no scores at all (cnt is null).
      studentFrame.join(takenPerStudent, $"stu_stuId" === $"s_stuId", "left")
        .filter($"cnt".isNull or $"cnt".lt(courseTotal))
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .show()
    }

    // Exercise 7: students sharing at least one course with student 01.
    exercise("查询至少有一门课与学号为\" 01 \"的同学所学相同的同学的信息") {
      // Student 01's courses come from the raw scores: a zero-filled table would list
      // every course for 01 and match every student.
      val coursesOf01 = scoreSrcFrame.filter($"s_stuId" === 1).select($"s_courseId".as("courseId"))
      scoreSrcFrame.filter($"s_stuId" =!= 1)
        .join(coursesOf01, $"s_courseId" === $"courseId", "left_semi")
        .select($"s_stuId").distinct()
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .show()
    }

    // Exercise 8: other students whose course set is exactly student 01's course set.
    exercise("查询和\" 01 \"号的同学学习的课程 完全相同的其他同学的信息") {
      val coursesOf01: Array[Int] = scoreSrcFrame.filter($"s_stuId" === 1)
        .select($"s_courseId").distinct().collect().map(_.getInt(0))
      // Same set <=> the student takes as many distinct courses as 01 and all of them are
      // among 01's courses. Checking only the latter let students with extra courses through.
      scoreSrcFrame.filter($"s_stuId" =!= 1)
        .groupBy($"s_stuId")
        .agg(countDistinct($"s_courseId").as("total"),
          countDistinct(when($"s_courseId".isInCollection(coursesOf01), $"s_courseId")).as("matched"))
        .filter($"total" === coursesOf01.length and $"matched" === coursesOf01.length)
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .show()
    }

    // Bonus: pivot rows into one column per course (missing scores stay null).
    exercise("行转列:每个学生各课程成绩(缺失成绩显示为 null)") {
      scoreSrcFrame.groupBy("s_stuId").pivot($"s_courseId").sum("s_score").show()
    }

    // Exercise 9: students who took none of 张三's courses.
    exercise("查询没学过\"张三\"老师讲授的任一门课程的学生姓名") {
      val studentsOfZhang = teacherFrame.filter($"t_teacherName" === "张三")
        .join(courseFrame, $"t_teacherId" === $"c_teacherId", "inner")
        .join(scoreSrcFrame, $"c_courseId" === $"s_courseId", "inner")
        .select($"s_stuId")
      // An anti join keeps the work in Spark instead of collecting id arrays to the driver.
      studentFrame.join(studentsOfZhang, $"stu_stuId" === $"s_stuId", "left_anti")
        .select($"stu_stuId", $"stu_stuName")
        .show()
    }

    // Exercise 10: students failing two or more courses, with their average over those courses.
    exercise("查询两门及其以上不及格课程的同学的学号,姓名及其平均成绩") {
      scoreSrcFrame.filter($"s_score".lt(60))
        .groupBy($"s_stuId")
        .agg(count($"s_stuId").as("cnt"), avg($"s_score").as("avgScore"))
        .filter($"cnt".geq(2))
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"s_stuId", $"stu_stuName", $"avgScore")
        .show()
    }

    // Exercise 11: students scoring below 60 in course 01, highest score first.
    exercise("检索\" 01 \"课程分数小于 60,按分数降序排列的学生信息") {
      scoreSrcFrame.filter($"s_courseId" === 1 and $"s_score".lt(60))
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender", $"s_score")
        .orderBy(desc("s_score"))
        .show()
    }

    // Exercise 12: every student's "course:score" list and average, highest average first.
    exercise("按平均成绩从高到低显示所有学生的所有课程的成绩以及平均成绩") {
      scoreSrcFrame.groupBy($"s_stuId")
        .agg(avg($"s_score").as("avgScore"),
          collect_set(concat_ws(":", $"s_courseId", $"s_score")).as("score"))
        .select($"s_stuId", $"score", round($"avgScore", 2).as("avgScore"))
        .orderBy(desc("avgScore"))
        .show()
    }

    // Exercise 13: max, min and average score per course.
    exercise("查询各科成绩最高分、最低分和平均分") {
      scoreSrcFrame.groupBy($"s_courseId")
        .agg(max($"s_score").as("maxScore"),
          min($"s_score").as("minScore"),
          avg($"s_score").as("avgScore"))
        .select($"s_courseId", $"maxScore", $"minScore", $"avgScore")
        .orderBy($"s_courseId")
        .show()
    }

    // Exercise 14: enrolment per course, by count descending then course id ascending.
    exercise("要求输出课程号和选修人数,查询结果按人数降序排列,若人数相同,按课程号升序排列") {
      scoreSrcFrame.groupBy($"s_courseId")
        .agg(count($"s_courseId").as("cnt"))
        .orderBy(desc("cnt"), $"s_courseId")
        .show()
    }

    // Exercise 15: rank within each course; ties share a rank, ranks stay consecutive.
    exercise("按各科成绩进行排序,并显示排名, Score 重复时并列,排名连续") {
      scoreSrcFrame.select($"s_courseId", $"s_stuId", $"s_score",
        dense_rank().over(Window.partitionBy($"s_courseId").orderBy(desc("s_score"))).as("rank"))
        .show()
    }

    // Exercise 16: rank within each course; ties share a rank and leave gaps.
    exercise("按各科成绩进行排序,并显示排名, Score 重复时并列,排名重复有空缺") {
      scoreSrcFrame.select($"s_courseId", $"s_stuId", $"s_score",
        rank().over(Window.partitionBy($"s_courseId").orderBy(desc("s_score"))).as("rank"))
        .show()
    }

    // Exercise 17: total score per student, ranked with gaps on ties.
    exercise("查询学生的总成绩,并进行排名,总分重复时保留名次空缺") {
      scoreSrcFrame.groupBy($"s_stuId").agg(sum($"s_score").as("totalScore"))
        .select($"s_stuId", $"totalScore",
          rank().over(Window.orderBy(desc("totalScore"))).as("rank"))
        .show()
    }

    // Exercise 18: top three rows per course, ties ignored.
    exercise("查询各科成绩前三名的记录(不考虑并列)") {
      scoreSrcFrame.select($"s_courseId", $"s_stuId", $"s_score",
        row_number().over(Window.partitionBy($"s_courseId").orderBy($"s_score".desc)).as("rank"))
        .filter($"rank".leq(3))
        .show()
    }

    // Exercise 19: top three ranks per course, ties share a rank, no gaps.
    exercise("查询各科成绩前三名的记录(考虑并列,不考虑空缺)") {
      scoreSrcFrame.select($"s_courseId", $"s_stuId", $"s_score",
        dense_rank().over(Window.partitionBy($"s_courseId").orderBy($"s_score".desc)).as("rank"))
        .filter($"rank".leq(3))
        .show()
    }

    // Exercise 20: top three ranks per course, ties share a rank, with gaps.
    exercise("查询各科成绩前三名的记录(考虑并列,考虑空缺)") {
      scoreSrcFrame.select($"s_courseId", $"s_stuId", $"s_score",
        rank().over(Window.partitionBy($"s_courseId").orderBy($"s_score".desc)).as("rank"))
        .filter($"rank".leq(3))
        .show()
    }

    // Exercise 21: names and scores of students below 60 in the course named 数学.
    exercise("查询课程名称为「数学」,且分数低于 60 的学生姓名和分数") {
      // A join instead of collect().head: head threw when no course is named 数学
      // and silently ignored any additional course with that name.
      courseFrame.filter($"c_course" === "数学")
        .join(scoreSrcFrame, $"c_courseId" === $"s_courseId", "inner")
        .filter($"s_score".lt(60))
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuName", $"s_score")
        .orderBy(desc("s_score"))
        .show()
    }
  }
}
更多推荐
所有评论(0)