Spark Big Data: Spark Streaming Output Operations
Everything else is the same as with transformation operations; you only need to add an output step at the end: use saveAsTextFiles to save the results as text files, or use foreachRDD with JDBC to write them to a MySQL database. The complete stateful word-count example below demonstrates both.
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // Define the state update function: add this batch's counts to the running total
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    // Set the log4j log level (helper class from the Spark streaming examples)
    StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    // Set a checkpoint directory; updateStateByKey needs checkpointing for fault tolerance
    sc.checkpoint("home/ziyu_bigdata/quick_learn_spark/checkpoint")
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    // Print the results
    stateDstream.print()
    // Save the running word counts as text files; each batch is written to a
    // new directory whose name starts with this prefix
    stateDstream.saveAsTextFiles("home/ziyu_bigdata/quick_learn_spark/DstreamOutput/output.txt")
    // Write the results to a MySQL database
    stateDstream.foreachRDD(rdd => {
      def func(records: Iterator[(String, Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values(?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim) // key: the word
            stmt.setInt(2, p._2)         // value: the count
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
            stmt = null
          }
          if (conn != null) {
            conn.close()
            conn = null
          }
        }
      }
      // Repartition the RDD to 3 partitions and process each new partition with
      // func, so that each partition opens its own database connection
      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })
    sc.start()
    sc.awaitTermination()
  }
}
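To test the program, first create the target table in MySQL (a database named spark with a table wordcount holding a word column and a count column, matching the insert statement above), start a text source with netcat on port 9999 (nc -lk 9999), run the program, and type words into the netcat terminal; the accumulated counts are printed, saved to files, and inserted into MySQL every 5 seconds.

One thing worth noting about the JDBC code above: it calls prepareStatement once per record. A common refinement is to prepare the statement once per partition and send the inserts as a single JDBC batch. The sketch below illustrates that pattern; it is not part of the original program, and it assumes the same URL, user, password, and wordcount table as above. The function name saveBatchToMySQL is made up for this example.

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

def saveBatchToMySQL(rdd: RDD[(String, Int)]): Unit = {
  rdd.foreachPartition { records =>
    // One connection per partition, created on the executor
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/spark", "root", "hadoop")
    try {
      // One prepared statement per partition, reused for every record
      val stmt = conn.prepareStatement(
        "insert into wordcount(word,count) values(?,?)")
      records.foreach { case (word, count) =>
        stmt.setString(1, word.trim)
        stmt.setInt(2, count)
        stmt.addBatch() // queue the insert instead of executing it immediately
      }
      stmt.executeBatch() // send all queued inserts in one round trip
      stmt.close()
    } finally {
      conn.close()
    }
  }
}

Wired in as stateDstream.foreachRDD(saveBatchToMySQL _), this replaces the func definition above while writing the same rows.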