说明,感谢亮哥长期对我的帮助,此处多篇文章均为亮哥带我整理。以及参考诸多博主的文章。如果侵权,请及时指出,我会立马停止该行为;如有不足之处,还请大佬不吝指教,以期共同进步。

1. Spark Streaming

image

Spark Streaming是Spark Core的扩展应用,它具有可扩展,高吞吐量,对于流数据的可容错性等特点。可以监控来自Kafka,Flumn,HDFS。Kinesis,Twitter,ZeroMQ或者Scoket套接字的数据通过复杂的算法以及一系列的计算分析数据,并且可以将分析结果存入到HDFS文件系统,数据库以及前端页面中。
- 高可扩展性,可以运行在上百台机器上(Scales to hundreds of nodes)
- 低延迟,可以在秒级别上对数据进行处理(Achieves low latency)
- 高可容错性(Efficiently recover from failures)
- 能够集成并行计算程序,比如Spark Core(Integrates with batch and interactive processing)

1.1 Spark Streaming工作原理

对于Spark Core它的核心就是RDD,对于Spark Streaming来说,它的核心是DStream,DStream类似于RDD,它实质上一系列的RDD的集合,DStream可以按照秒数将数据流进行批量的划分。首先从接收到流数据之后,将其划分为多个batch,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展示等等

参考
参考

1.1 Spark Streaming如何读取Kafka中数据?

一种是利用接收器(receiver)和kafaka的高层API实现
一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)
Receiver 方式
Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

object KafkaRecciver {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://bigdata111:9000/checkpoint")
    //创建kafka对象   生产者 和消费者 
    //模式1 采取的是 receiver 方式  reciver 每次只能读取一条记录
    val topic = Map("mydemo2" -> 1)
    //直接读取的方式  由于kafka 是分布式消息系统需要依赖Zookeeper
    val data = KafkaUtils.createStream(ssc, "192.168.128.111:2181", "mygroup", topic, StorageLevel.MEMORY_AND_DISK)
    //数据累计计算
    val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
    //进行数据统计当前值加上之前的值
    var total = curVal.sum
    //最初的值应该是0
    var previous = preVal.getOrElse(0)
    //Some 代表最终的返回值
    Some(total+previous)
  }
   val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print()
   //启动ssc
   ssc.start()
   ssc.awaitTermination()
    
  }

Direct方式
这种新的直接方式,是在Spark1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

object KafkaDirector {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //构建conf ssc 对象
    val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(3))
    //设置数据检查点进行累计统计单词
    ssc.checkpoint("hdfs://192.168.128.111:9000/checkpoint")
    //kafka 需要Zookeeper  需要消费者组
    val topics = Set("mydemo2")
    //                                     broker的原信息                                  ip地址以及端口号
    val kafkaPrams = Map[String,String]("metadata.broker.list" -> "192.168.128.111:9092")
    //                                          数据的输入了类型    数据的解码类型
    val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaPrams, topics)
    val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
      //进行数据统计当前值加上之前的值
      var total = curVal.sum
      //最初的值应该是0
      var previous = preVal.getOrElse(0)
      //Some 代表最终的但会值
      Some(total+previous)
    }
    //统计结果
    val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print() 
    //启动程序
    ssc.start()
    ssc.awaitTermination()
    
  }

参考
参考
参考
参考
参考

1.2 Spark Streaming编写步骤

1. 通过创建输入DStream来定义输入源。
2. 通过对DStream应用转换操作和输出操作来丁意思流计算
3. 用streamingContext.start()来开始接收数据和处理流程
4. 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)
6. 可以通过streamingContext.stop()来手动结束流计算进程
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐