Spark的三大数据结构-RDD并行度与分区
Spark的三大数据结构-RDD并行度与分区默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")val spa
Spark的三大数据结构-RDD并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
// TODO 创建RDD
val dataRDD: RDD[Int] =
sparkContext.makeRDD(
List(1,2,3,4),
4)
val fileRDD: RDD[String] =
sparkContext.textFile(
"input",
2)
fileRDD.collect().foreach(println)
// 资源关闭
sparkContext.stop()
一、数据源为集合(内存)时
1:分区的设定:
// TODO 准备环境
//"local[*]"代表本机的最大线程,如果不写,就是单线程
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
// TODO 创建RDD
// RDD的并行度 & 分区
// makeRDD方法可以传递第二个擦拭农户,这个参数表示分区的数量
// 第二个参数可以不传递,使用的就是默认值:defaultParallelism(默认并行度)
// spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
// 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
// spark.default.parallelism这个参数是可以配的sparkConf(saprk.default.parallelism",5)
val rdd = sparkContext.makeRDD(
List(1,2,3,4)l, 2
)
// 将处理的数据保存成分区文件
rdd.saveAsTextFile("output")
//资源关闭
sparkContext.stop()
说明:
- makeRDD方法可以传递第二个擦拭农户,这个参数表示分区的数量
- 第二个参数可以不传递,使用的就是默认值:defaultParallelism(默认并行度)
- spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
- 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
- spark.default.parallelism这个参数是可以配的sparkConf(saprk.default.parallelism",5)
2:分区数据的分配
加入有一个集合是List(1,2,3,4,5),List的length为5,想要分成3个区,那么每个区的数据该怎么分配?
设计到分配的问题,在底层就涉及到了分割方法,该方法包括了几个属性,分别是start,end,numSlices,length
start = ((i * length)) / numSlices).toInt
end = (((I + 1) * length) / numSlices).toInt
比如数据List(1,2,3,4,5),length = 5,numSlices = 3
那么分区数据分配就是:
分区 | 计算过程 | 分区数据 |
---|---|---|
0 | (((0 * 5) / 3) = 0,(((0 + 1) * 5) / 3) = 1) => (0,1) | 1 |
1 | (((1 * 5) / 3) = 1,(((2 + 1) * 5) / 3) = 3) => (1,3) | 2,3 |
2 | (((2 * 5) / 3) = 3,(((2 + 1) * 5) / 3) = 5) => (3,5) | 4,5 |
二、数据源为文件时
1:分区的设定
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
//TODO 创建RDD
// textFile可以将文件作为数据处理的数据源,默认也可以设置分区
// minPartitions:最小分区数量
// 分区数是根据 math.min(defaultParallelism, 2) 判定
// 如果不想使用默认的分区数量,就可以通过第二个参数指定分区数
// spark读取文件,底层使用的是hadoop的读取方式
val fileRDD: RDD[String] = sparkContext.textFile("input/test.txt",2)
// 将处理的数据保存成分区文件
rdd.saveAsTextFile("output")
//TODO 资源关闭
sparkContext.stop()
说明:
- textFile可以将文件作为数据处理的数据源,默认也可以设置分区
- minPartitions:最小分区数量
- 分区数是根据 math.min(defaultParallelism, 2) 判定
- 如果不想使用默认的分区数量,就可以通过第二个参数指定分区数
- spark读取文件,底层使用的是hadoop的读取方式
2:分区数据的分配
假设有一个文件,文件内容是
1
2
3
以上的这个文件的字节数为 7 ,因为回车占据2个字节,所以为 7 字节
那么如果在设置上的时候设置为两个分区,那么最后的分区结果是两个吗?而且每个分区的数据应该是什么样的呢?
分区数量的计算方式:
在spark的底层计算文件数据源的分区数据时,设计到的主要属性有:totalSize,goalsize
totalSize就是字节数,比如上述的文件字节数为7,那么这时的totalSize = 7
goalSize就是目标字节,就是每个分区应该计划存放多少字节,计算方式是 totalSize / 分区数,goalSize = 7 / 2 = 3(字节)
那么我们所要需要的分区数,应该是采用的是math.min(defaultParallelism, 2),
总共7个字节,每个分区设置的文件为3个字节,那么就需要 7 / 3 = 2 … 1,就需要2个分区但是剩下了一个字节,一个分区是3个字节,那么一个字节所占一个分区的1 / 3 = 0.33333,根据hadoop的分区1.1原则,如果超过了分区的0.1就需要增加一个分区,所以最后的分区应该是 7 / 3 = 2 … 1 + 1 = 3(分区)
最后所需要的分区为3个,尽管在设计的时候分区为2个,那么在最小分区数量的限制下,最后是需要三个分区
那么分区的数据是怎么分配的呢?
按照分区的预定存数大小字节,每个分区存储3个字节,那么就出现了
分区 | 数据范围 | 存储数据 |
---|---|---|
0 | [0,3] | 1_ _ 2 |
1 | [3,6] | _ _ 3 |
2 | [6,7] | 空 |
_ _ :代表回车的两个字节
说明:
数据范围三个字节,[0,3]属于前后都是闭合,也就是第一个文件实际读取的是四个字节,而不同分区读取数据时候不会出现重复读的情况,就是在0分区读取完索引3之后,尽管在1分区也应该读取这个索引,但是因为0分区已经读取过,则1分区不会再次重复读取,最后的结果就是2分区为空,即没有数据
更多推荐
所有评论(0)