Spark Streaming Output Operations

Everything else is the same as in the transformation examples; we only need to add the output step: saving the results to text files with saveAsTextFiles, and writing them to a MySQL database.
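For reference, saveAsTextFiles takes a prefix and an optional suffix, and writes each batch to its own directory named prefix-&lt;batch time in ms&gt;[.suffix]. A minimal sketch (the /tmp path here is just a placeholder):

// Each batch lands in its own directory, e.g.
//   /tmp/DstreamOutput/output-1590000000000.txt
stateDstream.saveAsTextFiles("/tmp/DstreamOutput/output", "txt")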

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.examples.streaming.StreamingExamples
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountStateful {
  def main(args: Array[String]): Unit = {
    // State update function: add this batch's counts to the running total for each word
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey needs a checkpoint directory, which also provides fault tolerance
    ssc.checkpoint("/home/ziyu_bigdata/quick_learn_spark/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

    // Print the results
    stateDstream.print()

    // Save the running word counts as text files; each batch goes to its own
    // directory named <prefix>-<batch time in ms>.txt
    stateDstream.saveAsTextFiles("/home/ziyu_bigdata/quick_learn_spark/DstreamOutput/output", "txt")

    // Write the results to a MySQL database
    stateDstream.foreachRDD(rdd => {

      // Runs on the executors: one connection and one prepared statement per
      // partition, reused for every record in that partition
      def func(records: Iterator[(String, Int)]): Unit = {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"
          conn = DriverManager.getConnection(url, user, password)
          stmt = conn.prepareStatement("insert into wordcount(word, count) values (?, ?)")
          records.foreach(p => {
            stmt.setString(1, p._1.trim)  // key: the word
            stmt.setInt(2, p._2)          // value: its running count
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
            stmt = null
          }
          if (conn != null) {
            conn.close()
            conn = null
          }
        }
      }

      // Repartition the RDD, then process each new partition with func
      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
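To try the program out, the wordcount table has to exist in the spark database first. A minimal sketch for creating it over the same JDBC settings used above (the table and column names match the insert statement; the credentials are the ones assumed in the example):

import java.sql.DriverManager

object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/spark", "root", "hadoop")
    try {
      val stmt = conn.createStatement()
      // matches the insert statement: wordcount(word, count)
      stmt.executeUpdate("create table if not exists wordcount(word varchar(100), count int)")
      stmt.close()
    } finally {
      conn.close()
    }
  }
}

With the table in place, start a socket source in another terminal with nc -lk 9999, run the program, and type words into the nc session; every 5-second batch then prints the running counts, saves them to text files, and inserts them into MySQL. Note that the connection is opened inside foreachPartition rather than once on the driver: the function runs on the executors, and JDBC connections are not serializable.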