大数据基础之Spark——Spark分布式计算原理(Spark Stage原理、Spark Shuffle过程、RDD依赖关系、DAG工作原理、RDD优化、RDD分区设计、数据倾斜问题)
大数据基础项目——WordCount原理:分析:当数据发生Shuffle过程时,会划分成两个Stage一个Stage对应着三个Task一个分区对应着一个Task划分Stage的原因:数据本地化: - 移动计算,而不是移动数据 (移动数据不如移动计算) - 保证一个Stage内不会发生数据移动Spark Shuffle过程解析:在分区之间重新分配数据 - 父RDD中同一分区中的数据按照算子的要
·
大数据基础项目——WordCount原理:
分析:
- 当数据发生Shuffle过程时,会划分成两个Stage
- 一个Stage对应着三个Task
- 一个分区对应着一个Task
划分Stage的原因:
- 数据本地化:
- 移动计算,而不是移动数据 (移动数据不如移动计算)
- 保证一个Stage内不会发生数据移动
Spark Shuffle过程解析:
- 在分区之间重新分配数据
- 父RDD中同一分区中的数据按照算子的要求重新进入子RDD的不同分区中
- 中间结果写磁盘
- 由子RDD拉取数据,而不是父RDD推送数据
- 默认情况下,Shuffle不会改变分区数量
spark write:
shuffle wirte:在一个stage结束计算之后,为了下一个stage可以进行shuffle类的算子,而将每个task处理的数据按照key进行分区,即对相同的key进行hash算法,将相同的key写入到一个磁盘文件中,每一个磁盘文件都只属于reduce端的stage的一个task.在将数据写入磁盘之前,会先将数据写入到内存缓冲中,内存缓冲填满后,才会溢出到磁盘文件中。
spark read:
上一个stage开始,每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉去与buffer缓冲相同大小的数据,然后通过内存中的一个map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作,直到将所有的数据拉取完,并得到最终的结果.
RDD的依赖关系:
- Lineage:血统、遗传
- 保存了RDD的依赖关系
- RDD实现了基于Lineage的容错机制 - 依赖关系
- 宽依赖(一个父RDD的分区被子RDD的多个分区使用)
- 一对多
宽依赖算子:groupByKey、join、reduceByKey、sortByKey等
- 窄依赖(一个父RDD的分区被子RDD的一个分区使用)
- 一对一
- 多对一
窄依赖算子:map、flatMap、filter、distinct、union、join等
DAG(有向无环图)的工作原理:
- 根据RDD之间的依赖关系,形成一个DAG(有向无环图)
- DAGScheduler将DAG划分为多个Stage
- 划分依据:是否发生宽依赖(Shuffle)
- 划分规则:从后往前,遇到宽依赖切割为新的Stage
- 每个Stage由一组并行的Task组成
RDD优化:
RDD持久化
- RDD缓存机制:缓存数据到内存/磁盘,可大幅提升Spark应用性能
- cache=persist(MEMORY)
- persist - 缓存策略StorgeLevel
据底层源码记载一共有12种
- MEMORY_ONLY(默认)
- MEMORY_AND_DISK
- DISK_ONLY - 缓存应用场景
- 从文件加载数据后,因为重新读取文件成本较高
- 经过较多算子变换之后,重新计算成本较高
- 单个非常消耗资源的算子之后 - 使用注意事项
- cache()或persist()后不能再有其他算子
- cache()或persist()遇到Action算子完成后才生效
RDD共享变量
广播变量
- 广播变量:允许开发者将一个只读变量(Driver端)缓存到每个节点(Exector)上,而不是每个任务传递一个副本
val broadcastVal = sc.broadcast(Array(1,2,3)) //定义广播变量
broadcastVal.value //取值
注意:
- Driver端变量在每个Exector每个Task保存一个变量副本
- Driver端广播变量在每个Exector上只保存一个变量副本
累加器
- 只允许added操作,常用于实现计数
//设置初始值为0
val accum = sc.accumulator(0,"mytest")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
accum.value //10
RDD分区设计
- 分区大小限制为2GB
- 分区太少
- 不利于并发
- 更容易受数据倾斜的影响
- groupBy、reduceBy、sortBy等内存压力增大 - 分区过多
- Shuffle开销越大
- 创建任务开销越大 - 经验
- 每个分区大约128MB
- 如果分区小于但接近2000,则设置为大于2000
RDD数据倾斜
简单来说数据倾斜就是数据的key的分布不均造成一部分数据很多,一部分数据很少的局面,即数据分布不均。
解决方案:
- 对数据进行ETL预处理
- 过滤少数导致倾斜的key
- 提高Shuffle的并行度
- 两阶段聚合
- 将reduceJoin转为mapJoin
- 采样倾斜key并分拆join操作
- 使用随机前缀和扩容RDD进行join
更多推荐
所有评论(0)