Spark的三大数据结构-RDD并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。

// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") 
val sparkContext = new SparkContext(sparkConf) 

// TODO 创建RDD
val dataRDD: RDD[Int] = 
    sparkContext.makeRDD( 
        List(1,2,3,4), 
        4) 
val fileRDD: RDD[String] = 
    sparkContext.textFile( 
        "input", 
        2) 
fileRDD.collect().foreach(println) 

// 资源关闭
sparkContext.stop() 

一、数据源为集合(内存)时

1:分区的设定:

// TODO 准备环境
//"local[*]"代表本机的最大线程,如果不写,就是单线程
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") 
val sparkContext = new SparkContext(sparkConf) 

// TODO 创建RDD
// RDD的并行度 & 分区
// makeRDD方法可以传递第二个擦拭农户,这个参数表示分区的数量
// 第二个参数可以不传递,使用的就是默认值:defaultParallelism(默认并行度)
// spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
// 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
// spark.default.parallelism这个参数是可以配的sparkConf(saprk.default.parallelism",5)
val rdd = sparkContext.makeRDD( 
    List(1,2,3,4)l, 2
) 

// 将处理的数据保存成分区文件
rdd.saveAsTextFile("output")

//资源关闭
sparkContext.stop() 

说明:

  1. makeRDD方法可以传递第二个擦拭农户,这个参数表示分区的数量
  2. 第二个参数可以不传递,使用的就是默认值:defaultParallelism(默认并行度)
  3. spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
  4. 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
  5. spark.default.parallelism这个参数是可以配的sparkConf(saprk.default.parallelism",5)

2:分区数据的分配

加入有一个集合是List(1,2,3,4,5),List的length为5,想要分成3个区,那么每个区的数据该怎么分配?

设计到分配的问题,在底层就涉及到了分割方法,该方法包括了几个属性,分别是start,end,numSlices,length

start = ((i * length)) / numSlices).toInt

end = (((I + 1) * length) / numSlices).toInt

比如数据List(1,2,3,4,5),length = 5,numSlices = 3

那么分区数据分配就是:

分区 计算过程 分区数据
0 (((0 * 5) / 3) = 0,(((0 + 1) * 5) / 3) = 1) => (0,1) 1
1 (((1 * 5) / 3) = 1,(((2 + 1) * 5) / 3) = 3) => (1,3) 2,3
2 (((2 * 5) / 3) = 3,(((2 + 1) * 5) / 3) = 5) => (3,5) 4,5

二、数据源为文件时

1:分区的设定

// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") 
val sparkContext = new SparkContext(sparkConf) 

//TODO 创建RDD
// textFile可以将文件作为数据处理的数据源,默认也可以设置分区
// minPartitions:最小分区数量
// 分区数是根据 math.min(defaultParallelism, 2) 判定
// 如果不想使用默认的分区数量,就可以通过第二个参数指定分区数
// spark读取文件,底层使用的是hadoop的读取方式
val fileRDD: RDD[String] = sparkContext.textFile("input/test.txt",2) 

// 将处理的数据保存成分区文件
rdd.saveAsTextFile("output")

//TODO 资源关闭
sparkContext.stop() 

说明:

  1. textFile可以将文件作为数据处理的数据源,默认也可以设置分区
  2. minPartitions:最小分区数量
  3. 分区数是根据 math.min(defaultParallelism, 2) 判定
  4. 如果不想使用默认的分区数量,就可以通过第二个参数指定分区数
  5. spark读取文件,底层使用的是hadoop的读取方式

2:分区数据的分配

假设有一个文件,文件内容是

1
2
3

以上的这个文件的字节数为 7 ,因为回车占据2个字节,所以为 7 字节

那么如果在设置上的时候设置为两个分区,那么最后的分区结果是两个吗?而且每个分区的数据应该是什么样的呢?

分区数量的计算方式:

在spark的底层计算文件数据源的分区数据时,设计到的主要属性有:totalSize,goalsize

totalSize就是字节数,比如上述的文件字节数为7,那么这时的totalSize = 7

goalSize就是目标字节,就是每个分区应该计划存放多少字节,计算方式是 totalSize / 分区数,goalSize = 7 / 2 = 3(字节)

那么我们所要需要的分区数,应该是采用的是math.min(defaultParallelism, 2),

总共7个字节,每个分区设置的文件为3个字节,那么就需要 7 / 3 = 2 … 1,就需要2个分区但是剩下了一个字节,一个分区是3个字节,那么一个字节所占一个分区的1 / 3 = 0.33333,根据hadoop的分区1.1原则,如果超过了分区的0.1就需要增加一个分区,所以最后的分区应该是 7 / 3 = 2 … 1 + 1 = 3(分区)

最后所需要的分区为3个,尽管在设计的时候分区为2个,那么在最小分区数量的限制下,最后是需要三个分区

那么分区的数据是怎么分配的呢?

按照分区的预定存数大小字节,每个分区存储3个字节,那么就出现了

分区 数据范围 存储数据
0 [0,3] 1_ _ 2
1 [3,6] _ _ 3
2 [6,7]

_ _ :代表回车的两个字节

说明:

数据范围三个字节,[0,3]属于前后都是闭合,也就是第一个文件实际读取的是四个字节,而不同分区读取数据时候不会出现重复读的情况,就是在0分区读取完索引3之后,尽管在1分区也应该读取这个索引,但是因为0分区已经读取过,则1分区不会再次重复读取,最后的结果就是2分区为空,即没有数据

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐