// 现有学生成绩表、学生信息表、老师信息表和课程信息表，根据要求得到相应的结果。

 

import java.io.{FileWriter, PrintWriter}

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Practice extends Serializable {
  /** A row of course.txt: `courseId,courseName,teacherId`. */
  case class Course(c_courseId:Int,c_course:String,c_teacherId:Int)
  /** A row of teacher.txt: `teacherId,teacherName`. */
  case class Teacher(t_teacherId:Int,t_teacherName:String)
  /** A row of student.txt: `studentId,name,birthday,gender` (birthday is kept as the raw text). */
  case class Student(stu_stuId:Int,stu_stuName:String,stu_birthday:String,stu_gender:String)
  /** A row of score.txt: `studentId,courseId,score`. */
  case class Score(s_stuId:Int,s_courseId:Int,s_score:Float)
  /** A `studentId,courseId` pair produced by `toScoreSrc`; `main` no longer reads it, kept for other callers. */
  case class Stu_course(_stuId:Int,_courseId:Int)

  /**
   * Implicit conversion that parses one comma-separated text line into the record case classes.
   * A line with too few fields, or a non-numeric id/score, makes the corresponding `toXxx` throw.
   */
  implicit class StrToAny(v:String){
    val ps = v.split(",")
    def toCourse = {
      Course(ps(0).toInt,ps(1),ps(2).toInt)
    }

    def toTeacher = {
      Teacher(ps(0).toInt,ps(1))
    }

    def toStudent = {
      Student(ps(0).toInt,ps(1),ps(2),ps(3))
    }

    def toScore = {
      Score(ps(0).toInt,ps(1).toInt,ps(2).toFloat)
    }

    def toScoreSrc = {
      Stu_course(ps(0).toInt,ps(1).toInt)
    }
  }

  /** Input directory used when no program argument is given. */
  private val DefaultDataDir = "file:///D:\\Study\\13_spark\\spark_sql\\file"

  /**
   * Appends file `name` to directory `dir`. No separator is added when `dir` already ends in one;
   * otherwise a backslash is used if `dir` already contains one (Windows-style default), else `/`.
   */
  private def dataFile(dir: String, name: String): String =
    if (dir.endsWith("/") || dir.endsWith("\\")) dir + name
    else if (dir.contains("\\")) s"$dir\\$name"
    else s"$dir/$name"

  /**
   * Loads the course, score, student and teacher tables and prints the result of each practice query.
   *
   * @param args optional; `args(0)` overrides the directory holding course.txt, score.txt,
   *             student.txt and teacher.txt (defaults to [[DefaultDataDir]])
   */
  def main(args: Array[String]): Unit = {
    val dataDir = args.headOption.getOrElse(DefaultDataDir)
    val rule = "========================================================================"

    val conf = new SparkConf().setMaster("local[*]").setAppName("practice")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      import spark.implicits._
      val sc = spark.sparkContext

      // Reads one input file as lines; blank lines (e.g. a trailing newline) are skipped
      // because the toXxx parsers would throw on them.
      def lines(name: String): RDD[String] =
        sc.textFile(dataFile(dataDir, name)).filter(_.trim.nonEmpty)

      val courseFrame: DataFrame = spark.createDataFrame(lines("course.txt").mapPartitions(_.map(_.toCourse)))
      val scoreFrame: DataFrame = spark.createDataFrame(lines("score.txt").mapPartitions(_.map(_.toScore)))
      val studentFrame: DataFrame = spark.createDataFrame(lines("student.txt").mapPartitions(_.map(_.toStudent)))
      val teacherFrame: DataFrame = spark.createDataFrame(lines("teacher.txt").mapPartitions(_.map(_.toTeacher)))

      // Exercise 1: students whose course-01 score is higher than their course-02 score.
      // Both sides come from the raw scores and are inner-joined, so a student missing
      // either score is simply not compared (a genuine 0 is still compared).
      println("查询\" 01 \"课程比\" 02 \"课程成绩高的学生的信息及课程分数(没有成绩不做比较)")
      scoreFrame.filter($"s_courseId" === 1)
        .select($"s_stuId", $"s_score".as("score_01"))
        .join(scoreFrame.filter($"s_courseId" === 2)
          .select($"s_stuId".as("_s_stuId"), $"s_score".as("score_02")),
          $"s_stuId" === $"_s_stuId", "inner")
        .filter($"score_01" > $"score_02")
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"score_01", $"score_02")
        .show()
      println(rule)

      // Exercise 2: id, name and average score of students averaging at least 60
      // (courses without a score are not part of the average).
      println("查询平均成绩大于等于 60 分的同学的学生编号和学生姓名和平均成绩(无成绩不参与平均值计算)")
      scoreFrame.groupBy($"s_stuId").agg(avg($"s_score").as("avgScore"))
        .filter($"avgScore" >= 60)
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"avgScore")
        .orderBy(desc("avgScore"))
        .show()
      println(rule)

      // Exercise 3: every student's id, name, number of courses taken and total score.
      // The right join keeps students without scores; their cnt/sumScore stay null.
      println("查询所有同学的学生编号、学生姓名、选课总数、所有课程的总成绩(没成绩的显示为 null )")
      scoreFrame.groupBy($"s_stuId")
        .agg(count($"s_courseId").as("cnt"), sum($"s_score").as("sumScore"))
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "right")
        .select($"stu_stuId", $"stu_stuName", $"cnt", $"sumScore")
        .orderBy(desc("sumScore"))
        .show()
      println(rule)

      // Exercise 4: number of teachers whose surname is 李 (any name length).
      println("查询「李」姓老师的数量")
      println(teacherFrame.filter($"t_teacherName".startsWith("李")).count())
      println(rule)

      // Exercise 5: students who took at least one course taught by 张三 (each listed once).
      println("查询学过「张三」老师授课的同学的信息")
      teacherFrame.filter($"t_teacherName" === "张三")
        .join(courseFrame, $"c_teacherId" === $"t_teacherId", "inner")
        .join(scoreFrame, $"c_courseId" === $"s_courseId", "inner")
        .join(studentFrame, $"stu_stuId" === $"s_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .distinct()
        .show()
      println(rule)

      // Exercise 6: students who have not taken every course. A course counts as taken when a
      // score row exists for it, so a genuine score of 0 is not mistaken for a missing course.
      println("查询没有学全所有课程的同学的信息")
      val totalCourses = courseFrame.select($"c_courseId").distinct().count()
      studentFrame
        .join(scoreFrame.groupBy($"s_stuId").agg(countDistinct($"s_courseId").as("courseCnt")),
          $"stu_stuId" === $"s_stuId", "left")
        .filter($"courseCnt".isNull || $"courseCnt" < totalCourses)
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .show()
      println(rule)

      // Exercise 7: other students sharing at least one course with student 01.
      println("查询至少有一门课与学号为\" 01 \"的同学所学相同的同学的信息")
      val stu1Courses = scoreFrame.filter($"s_stuId" === 1).select($"s_courseId".as("courseId"))
      scoreFrame.filter($"s_stuId" =!= 1)
        .join(stu1Courses, $"s_courseId" === $"courseId", "left_semi")
        .select($"s_stuId").distinct()
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .show()
      println(rule)

      // Exercise 8: other students whose set of courses is exactly student 01's set:
      // they took every one of 01's courses (sharedCnt) and nothing else (courseCnt).
      println("查询和\" 01 \"号的同学学习的课程 完全相同的其他同学的信息")
      val stu1CourseArr: Array[Int] = scoreFrame.filter($"s_stuId" === 1)
        .select($"s_courseId").distinct().collect().map(_.getInt(0))
      scoreFrame.filter($"s_stuId" =!= 1)
        .groupBy($"s_stuId")
        .agg(countDistinct($"s_courseId").as("courseCnt"),
          countDistinct(when($"s_courseId".isInCollection(stu1CourseArr), $"s_courseId")).as("sharedCnt"))
        .filter($"courseCnt" === stu1CourseArr.length && $"sharedCnt" === stu1CourseArr.length)
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender")
        .show()
      println(rule)

      // Bonus: pivot rows into columns (one column per course id); missing scores stay null.
      scoreFrame.groupBy("s_stuId").pivot($"s_courseId").sum("s_score").show()

      // Exercise 9: students who took none of the courses taught by 张三 (anti-join, no driver collect).
      println("查询没学过\"张三\"老师讲授的任一门课程的学生姓名")
      val zhangSanStudents = teacherFrame.filter($"t_teacherName" === "张三")
        .join(courseFrame, $"t_teacherId" === $"c_teacherId")
        .join(scoreFrame, $"c_courseId" === $"s_courseId")
        .select($"s_stuId")
      studentFrame.join(zhangSanStudents, $"stu_stuId" === $"s_stuId", "left_anti").show()
      println(rule)

      // Exercise 10: id, name and overall average score of students failing (< 60) two or more courses.
      println("查询两门及其以上不及格课程的同学的学号,姓名及其平均成绩")
      scoreFrame.groupBy($"s_stuId")
        .agg(count(when($"s_score" < 60, true)).as("failCnt"), avg($"s_score").as("avgScore"))
        .filter($"failCnt" >= 2)
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"s_stuId", $"stu_stuName", $"avgScore")
        .show()
      println(rule)

      // Exercise 11: students scoring below 60 in course 01, highest score first.
      println("检索\" 01 \"课程分数小于 60,按分数降序排列的学生信息")
      scoreFrame.filter($"s_courseId" === 1 && $"s_score" < 60)
        .join(studentFrame, $"s_stuId" === $"stu_stuId")
        .select($"stu_stuId", $"stu_stuName", $"stu_gender", $"s_score")
        .orderBy(desc("s_score"))
        .show()
      println(rule)

      // Exercise 12: each student's "course:score" entries and average score, best average first.
      println("按平均成绩从高到低显示所有学生的所有课程的成绩以及平均成绩")
      scoreFrame.groupBy($"s_stuId")
        .agg(avg($"s_score").as("avgScore"),
          collect_set(concat_ws(":", $"s_courseId", $"s_score")).as("score"))
        .select($"s_stuId", $"score", round($"avgScore", 2).as("avgScore"))
        .orderBy(desc("avgScore"))
        .show()
      println(rule)

      // Exercise 13: highest, lowest and average score per course.
      println("查询各科成绩最高分、最低分和平均分")
      scoreFrame.groupBy($"s_courseId")
        .agg(max($"s_score").as("maxScore"),
          min($"s_score").as("minScore"),
          avg($"s_score").as("avgScore"))
        .select($"s_courseId", $"maxScore", $"minScore", $"avgScore")
        .orderBy($"s_courseId")
        .show()
      println(rule)

      // Exercise 14: enrolment count per course; count descending, then course id ascending.
      println("要求输出课程号和选修人数,查询结果按人数降序排列,若人数相同,按课程号升序排列")
      scoreFrame.groupBy($"s_courseId")
        .agg(count($"s_courseId").as("cnt"))
        .orderBy(desc("cnt"), $"s_courseId")
        .show()
      println(rule)

      // Exercise 15: per-course ranking; ties share a rank, ranks have no gaps (dense_rank).
      println("按各科成绩进行排序,并显示排名, Score 重复时并列,排名连续")
      scoreFrame.select($"s_courseId", $"s_stuId", $"s_score",
        dense_rank().over(Window.partitionBy($"s_courseId").orderBy(desc("s_score"))).as("rank"))
        .show()
      println(rule)

      // Exercise 16: per-course ranking; ties share a rank, following ranks are skipped (rank).
      println("按各科成绩进行排序,并显示排名, Score 重复时并列,排名重复有空缺")
      scoreFrame.select($"s_courseId", $"s_stuId", $"s_score",
        rank().over(Window.partitionBy($"s_courseId").orderBy(desc("s_score"))).as("rank"))
        .show()
      println(rule)

      // Exercise 17: total score per student ranked across all students, gaps kept after ties.
      println("查询学生的总成绩,并进行排名,总分重复时保留名次空缺")
      scoreFrame.groupBy($"s_stuId").agg(sum($"s_score").as("totalScore"))
        .select($"s_stuId", $"totalScore",
          rank().over(Window.orderBy(desc("totalScore"))).as("rank"))
        .show()
      println(rule)

      // Exercise 18: top three rows per course, ties ignored (row_number).
      println("查询各科成绩前三名的记录(不考虑并列)")
      scoreFrame.select($"s_courseId", $"s_stuId", $"s_score",
        row_number().over(Window.partitionBy($"s_courseId").orderBy($"s_score".desc)).as("rank"))
        .filter($"rank" <= 3)
        .show()
      println(rule)

      // Exercise 19: top three per course, ties share a rank without gaps (dense_rank).
      println("查询各科成绩前三名的记录(考虑并列,不考虑空缺)")
      scoreFrame.select($"s_courseId", $"s_stuId", $"s_score",
        dense_rank().over(Window.partitionBy($"s_courseId").orderBy($"s_score".desc)).as("rank"))
        .filter($"rank" <= 3)
        .show()
      println(rule)

      // Exercise 20: top three per course, ties share a rank with gaps (rank).
      println("查询各科成绩前三名的记录(考虑并列,考虑空缺)")
      scoreFrame.select($"s_courseId", $"s_stuId", $"s_score",
        rank().over(Window.partitionBy($"s_courseId").orderBy($"s_score".desc)).as("rank"))
        .filter($"rank" <= 3)
        .show()
      println(rule)

      // Exercise 21: name and score of students below 60 in the course named 数学.
      // Joining on the course table (instead of collecting its id and calling .head) yields an
      // empty result rather than an exception when no such course exists.
      println("查询课程名称为「数学」,且分数低于 60 的学生姓名和分数")
      scoreFrame
        .join(courseFrame.filter($"c_course" === "数学"), $"s_courseId" === $"c_courseId", "inner")
        .filter($"s_score" < 60)
        .join(studentFrame, $"s_stuId" === $"stu_stuId", "inner")
        .select($"stu_stuName", $"s_score")
        .orderBy(desc("s_score"))
        .show()
      println(rule)
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext; runs even if a query fails.
      spark.stop()
    }
  }
}

// Logo
//
// 技术共进，成长同行——讯飞AI开发者社区
//
// 更多推荐