李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
02. Spark Shuffle过程介绍
Leefs
2021-06-29 AM
1476℃
0条
# 02. Spark Shuffle过程介绍 ### 一、Shuffle概念 **1.1 Shuffle简介** 有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为 Shuffle。 **1.2 MapReduce中的Shuffle** 在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。 详细过程可参考文章:.MapReduce介绍 ### 二、Spark的Shuffle机制 #### **2.1 Spark Shuffle发展史** - Spark 0.8及以前 Hash Based Shuffle - Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制 - Spark 0.9 引入ExternalAppendOnlyMap - Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle - Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle - Spark 1.4 引入Tungsten-Sort Based Shuffle - Spark 1.6 Tungsten-sort并入Sort Based Shuffle - Spark 2.0 Hash Based Shuffle退出历史舞台 从Spark 1.2开始将之前默认的`Hash Based Shuffle`替换成了`Sort Based Shuffle`。并在Spark 2.0直接舍弃了Hash Based Shuffle。大家可能会好奇Hash Based Shuffle到底存在怎样的缺陷导致很快被遗弃掉。 #### **2.2 Hash Shuffle** HashShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是合并的运行机制。 合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量。Hash shuffle是不具有排序的Shuffle。 **普通机制的Hash shuffle** ![02.Spark Shuffle过程介绍01.png](https://lilinchao.com/usr/uploads/2021/06/2195544173.png) **PS:** 这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。 **图解** 图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。 **1. shuffle write阶段** 主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey,groupByKey),而将每个task处理的数据按key进行“分区”。所谓“分区”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于reduce端的stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。 那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。 **2. shuffle read阶段** shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给Reduce端的stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。 shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。 **注意:** (1)buffer起到的是缓存作用,缓存能够加速写磁盘,提高计算的效率,buffer的默认大小32k。 分区器:根据hash/numRedcue取模决定数据由几个Reduce处理,也决定了写入几个buffer中 block file:磁盘小文件,从图中我们可以知道磁盘小文件的个数计算公式: block file=M*R (2)M为map task的数量,R为Reduce的数量,一般Reduce的数量等于buffer的数量,都是由分区器决定的 **3. Hash shuffle普通机制的问题** (1)Shuffle前在磁盘上会产生海量的小文件,建立通信和拉取数据的次数变多,此时会产生大量耗时低效的 IO 操作 (因为产生过多的小文件) (2)可能导致OOM,大量耗时低效的 IO 操作 ,导致写磁盘时的对象过多,读磁盘时候的对象也过多,这些对象存储在堆内存中,会导致堆内存不足,相应会导致频繁的GC,GC会导致OOM。由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。 **合并机制的Hash shuffle** 合并机制就是复用buffer,开启合并机制的配置是`spark.shuffle.consolidateFiles`。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用`HashShuffleManager`,那么都建议开启这个选项。 ![02.Spark Shuffle过程介绍02.png](https://lilinchao.com/usr/uploads/2021/06/2834189778.png) 这里还是有4个Tasks,数据类别还是分成3种类型,因为Hash算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。 **图解** 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现`shuffleFileGroup`的概念,每个`shuffleFileGroup`会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个`shuffleFileGroup`,并将数据写入对应的磁盘文件内。 Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的`shuffleFileGroup`,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的`HashShuffleManager`时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。 **注意:** (1)启动`HashShuffle`的合并机制`ConsolidatedShuffle`的配置:`spark.shuffle.consolidateFiles=true` (2)block file=Core*R Core为CPU的核数,R为Reduce的数量 **Hash shuffle合并机制的问题** 如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。 #### **2.3 Sort shuffle** `SortShuffleManager`的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于`spark.shuffle.sort.bypassMergeThreshold`参数的值时(默认为200),就会启用bypass机制。 **Sort shuffle的普通机制** ![02.Spark Shuffle过程介绍03.png](https://lilinchao.com/usr/uploads/2021/06/1314329474.png) **写入内存数据结构** 该图说明了普通的`SortShuffleManager`的原理。在该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是`reduceByKey`这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。 **注意:** shuffle中的定时器:定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式: `applyMemory=nowMenory*2-oldMemory` 申请的内存=当前的内存情况*2-上一次的内存情况 意思就是说内存数据结构的大小的动态变化,如果存储的数据超出内存数据结构的大小,将申请内存数据结构存储的数据*2-内存数据结构的设定值的内存大小空间。申请到了,内存数据结构的大小变大,内存不够,申请不到,则发生溢写 **排序** 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。 **溢写** 排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的`BufferedOutputStream`实现的。`BufferedOutputStream`是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。 **merge** 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。 `SortShuffleManager`由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。 **注意:** (1)block file= 2M 一个map task会产生一个索引文件和一个数据大文件 (2) m*r>2m(r>2) SortShuffle会使得磁盘小文件的个数再次的减少 **Sort shuffle的bypass机制** ![02.Spark Shuffle过程介绍04.png](https://lilinchao.com/usr/uploads/2021/06/3627736169.png) bypass运行机制的触发条件如下: (1)shuffle map task数量小于`spark.shuffle.sort.bypassMergeThreshold`参数的值。 (2)不是聚合类的shuffle算子(比如reduceByKey)。 此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。 该过程的磁盘写机制其实跟未经优化的`HashShuffleManager`是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的`HashShuffleManager`来说,shuffle read的性能会更好。 而该机制与普通`SortShuffleManager`运行机制的不同在于: (1)磁盘写机制不同; (2)不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。 ### 三、总结 + Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。 + shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle。 + HashShuffle又分为普通机制和合并机制,普通机制因为其会产生`M*R`个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到`Core*R`个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。 + SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于`spark.shuffle.sort.bypassMergeThreshold`参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能 + 在Spark 1.2以前,默认的shuffle计算引擎是`HashShuffleManager`,因为`HashShuffleManager`会产生大量的磁盘小文件而性能低下,在Spark 1.2以后的版本中,默认的`ShuffleManager`改成了`SortShuffleManager`。`SortShuffleManager`相较于`HashShuffleManager`来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。 *附:* https://www.cnblogs.com/itboys/p/9226479.html
标签:
Spark
,
Spark Core
,
Spark RDD
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1269.html
上一篇
01.MapReduce介绍
下一篇
03.Spark RDD简介
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
算法
Zookeeper
序列化和反序列化
DataWarehouse
Spark RDD
MyBatis
LeetCode刷题
Jquery
字符串
SpringCloud
NIO
Azkaban
容器深入研究
Livy
FileBeat
Ubuntu
SQL练习题
队列
DataX
Netty
高并发
并发编程
数学
SpringBoot
nginx
栈
JavaWeb
MyBatis-Plus
MyBatisX
Sentinel
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞