李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
05.【转载】Spark RDD转换算子
Leefs
2021-06-29 AM
2188℃
0条
[TOC] ### 前言 ![05.Spark RDD转换算子.png](https://lilinchao.com/usr/uploads/2021/06/659710740.png) ## 转换算子 RDD 根据数据处理方式的不同,将算子整体上分为 Value 类型、双 Value 类型 和 Key-Value类型。 ### Value类型 #### map ```scala def map(f: T => U): RDD[U] ``` 说明:将RDD中类型为T的元素,**一对一地映射**为类型为U的元素,这里的转换可以是**类型的转换**,也可以是**值的转换** ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 值的转换 ==> List(2, 4, 6, 8) val mapRDD: RDD[Int] = rdd.map( _ * 2 ) mapRDD.collect().foreach(println) // 类型的转换 ==> List("1", "2", "3", "4") val mapRDD1: RDD[String] = rdd.map( _ + "" ) mapRDD1.collect().foreach(println) ``` + 技巧:当map转换复杂的数据类型时,通过 **模式匹配** 简洁表达 ```scala val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4))) val mapRdd: RDD[String] = rdd.map( (tuple: (String, Int)) => { tuple._1 + tuple._2 // "a1","b2"... } ) val mapRdd1: RDD[String] = rdd.map { //最外层是 { } case (str, num) => { str + num } } ``` #### mapPartitions ```scala def mapPartitions( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] ``` 说明:将待处理的数据 **以分区为单位** 发送到计算节点进行处理,输入参数为RDD中每一个分区的迭代器。参数二`preservesPartitioning`是否保留父RDD的分区信息。 示例:获取每个数据分区的最大值 ```scala val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // 传入 f: Iterator => Iterator val mpRDD: RDD[Int] = rdd.mapPartitions( iterator => { List(iterator.max).iterator } ) mpRDD.collect().foreach(println) output: 2 4 ``` 思考:**map 和 mapPartitions 的区别?** - 数据处理角度 Map 算子是读一个record计算一个record,类似于**串行**操作。而 mapPartitions 算子是**以分区为单位进行批处理**操作。 - 功能的角度 Map 算子主要目的将数据源中的数据进行转换,不会减少或增多数据,**映射前后维度不变**。 MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,**可以增加或减少数据**。 - 性能的角度 Map 算子因为类似于串行操作,所以性能比较低,而 mapPartitions 算子类似于批处理,所以性能较高。 但是 mapPartitions 算子会将整个分区的数据加载到内存进行引用,那么这样会导致内存可能不够用,出现内存溢出的错误。**所以在内存有限的情况下,不推荐使用MapPartitions**。 思考:**mapPartitions 使用场景?** 如果在映射过程中需要频繁创建额外的对象,mapPartitions 可以使 RDD中各个分区可以共享同一个对象以提高性能。 思考:如何理解Map 算子类似于串行,而mapPartitions 算子是以分区为单位进行批处理操作呢? + map ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1) //单个分区 val mapRDD1: RDD[Int] = rdd.map( num => { println(">>>>>>>> " + num) num } ) val mapRDD2: RDD[Int] = mapRDD1.map( num => { println("-------- " + num) num } ) mapRDD2.collect() ``` output: ```scala >>>>>>>> 1 -------- 1 >>>>>>>> 2 -------- 2 >>>>>>>> 3 -------- 3 >>>>>>>> 4 -------- 4 ``` 当分区个数为1时,只有当前面一个record全部的逻辑执行完毕后,才会执行下一个数据(串行)。分区内数据的执行是有序的。 当分区格式为2时,不同分区之间是并行执行的,无先后顺序;而同一分区内的数据,满足有序性,逐个执行(串行)。 + mapPartitions ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) // 传入 f: Iterator => Iterator val mpRDD: RDD[Int] = rdd.mapPartitions( // iterator代表一个分区的迭代器 iterator => { println(">>>>>>>>>>") iterator.map(_ * 2) //一次性加载整个分区,然后对该分区进行map转换,类似于批处理 } ) mpRDD.collect() output: ">>>>>>>>>>" ">>>>>>>>>>" ``` - 小结: map的实现:同一分区内的数据,必须等待全部的逻辑执行完毕,才会加载下一个数据,这就是串行; mapPartitions的实现: 会先将分区内的全部数据加载到内存中,然后执行逻辑。 ![05.Spark RDD转换算子02.png](https://lilinchao.com/usr/uploads/2021/06/3547201911.png) #### mapPartitionsWithIndex ```scala def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] ``` 说明:将待处理的数据 **以分区为单位** 发送到计算节点进行处理,在处理时同时可以**获取当前分区索引**。 示例:获取第二个数据分区的数据 ```scala val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3) //分区结果 ==> 【1】,【2,3】,【4,5】 //获取第2个分区的数据 val mpiRDD = rdd.mapPartitionsWithIndex( (index, iterator) => { if (index == 1) { iterator } else { Nil.iterator } } ) mpiRDD.collect().foreach(println)// 【2,3】 ``` #### flatMap ```scala def flatMap(f: T => TraversableOnce[U]): RDD[U] ``` 说明:将RDD中的每一个元素进行一对多转换,然后扁平化 强调:`f: T => TraversableOnce[U]`的返回值**必须是可遍历集合,不能是标量**。 ```scala // 先映射后打散,只需传入映射逻辑 val rdd1: RDD[String] = sc.makeRDD(List("hello spark", "hello scala")) // "hello spark" ==> Array["hello", "spark"] ==> "hello", "spark" val flatRDD1: RDD[String] = rdd1.flatMap( s => s.split(" ") ) flatRDD1.collect().foreach(println) // hello // spark // hello // scala ``` 示例:将 `List(List(1,2),3,List(4,5))` 进行扁平化操作 ```scala val rdd: RDD[Any] = sc.makeRDD(List( List(1, 2), 3, List(4, 5) )) // List中元素类型不同,需模式匹配 val flatRDD = rdd.flatMap{ case list: List[_] => list case a: Int => List(a) } flatRDD.collect().foreach(println) // 1 2 3 4 5 ``` #### glom ```scala def glom(): RDD[Array[T]] ``` 说明:将 **同一个分区** 的数据直接转换为相同类型的内存**数组**进行处理,分区不变,每个分区只有一个数组元素。 示例: 计算所有分区最大值求和(分区内取最大值,分区间求和) ```scala val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5), 2) // 【1,2】,【3,4,5】 // List(1,2,3,4) => RDD(Array(1,2), Array(3,4,5)) val glomRDD: RDD[Array[Int]] = rdd.glom() // Array(1,2) -> 2; Array(3,4,5) -> 5 val maxRDD: RDD[Int] = glomRDD.map( arr => arr.max ) // RDD(2, 5) => 归约(相加) val res: Int = maxRDD.reduce(_ + _) println(res) // res = 7 ``` + 补充:使用行动算子`aggregate`一步实现 ```scala val res: Int = rdd.aggregate(0)(math.max(_, _), _ + _) ``` + 思考:如何理解分区不变性? ```scala // 将RDD保存到目录下,以观察分区情况 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2) rdd.saveAsTextFile("output1") val mapRDD: RDD[Int] = rdd.map(_ * 2) mapRDD.saveAsTextFile("output2") /* 原始rdd的每条record会有分区号,经过map操作后,依然在相同的分区中 output1 part-00000 1 2 part-00001 3 4 output2 part-00000 2 4 part-00001 6 8 */ ``` ![05.Spark RDD转换算子03.png](https://lilinchao.com/usr/uploads/2021/06/3123702281.png) 与分区不变性相对立的是shuffle,下面介绍的算子groupBy涉及到shuffle过程。 #### groupBy ```scala def groupBy (f: T => K): RDD[(K, Iterable[T])] ``` 说明: ```scala /** * groupBy(f: T => K ) 将数据源中的元素映射到key上 * T是数据源元素的类型,K为任意类型 * * groupBy将数据源中的每一个数据进行f映射,根据返回的分组key进行分组 * 相同的key值的数据会放置在一个可迭代的集合中,即Iterable()中 */ ``` 示例1:按奇偶分组 ```scala val intRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) // 按奇偶分组 val groupRDD1: RDD[(Int, Iterable[Int])] = intRdd.groupBy( (num: Int) => { num % 2 } ) groupRDD1.collect().foreach(println) // (0,CompactBuffer(2, 4, 6)) // (1,CompactBuffer(1, 3, 5)) ``` 示例2:按单词首字母分组 ```scala val strRdd: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2) //按首字母分组 val groupRDD2: RDD[(Char, Iterable[String])] = strRdd.groupBy( word => word.charAt(0) ) groupRDD2.collect().foreach(println) // (H,CompactBuffer(Hello, Hadoop)) // (S,CompactBuffer(Spark, Scala)) ``` + 思考:分组和分区有什么关系? **分组后,一个组的数据会在一个分区中,但是并不是说一个分区中只有一个组,一句话:分组和分区没有必然的关系。** ```scala val intRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) // 原集合分区 intRdd.saveAsTextFile("output1") val groupRDD: RDD[(Int, Iterable[Int])] = intRdd.groupBy( (num: Int) => { num % 2 } ) // 分组后 groupRDD.saveAsTextFile("output2") ``` 结果: ```scala output1: part-00000 1 2 3 part-00001 4 5 6 output2: part-00000 (0,CompactBuffer(2, 4, 6)) part-00001 (1,CompactBuffer(1, 3, 5)) ``` ![05.Spark RDD转换算子04.png](https://lilinchao.com/usr/uploads/2021/06/233924564.png) #### filter ```scala def filter(f: T => Boolean): RDD[T] ``` 说明:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃(返回True的保留,False丢弃) 示例:过滤,只保留偶数 ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) val filterRdd: RDD[Int] = rdd.filter(num => num % 2 == 0) filterRdd.collect().foreach(println) // 2 4 6 ``` **当数据进行筛选过滤后,分区不变**,但是分区内的数据可能不均衡,即数据倾斜。 ```scala val rdd: RDD[Int] = sc.makeRDD(List(2, 4, 6, 8, 1, 3, 5, 8), 2) val filterRdd: RDD[Int] = rdd.filter(num => num % 2 == 0) // 原分区: 【2,4,6,8】 【1,3,5,8】 // 过滤分区不变:【2,4,6,8】 【8】 <== 不同分区的数据不均衡 ``` #### distinct ```scala def distinct(): RDD[T] def distinct(numPartitions: Int): RDD[T] ``` 说明:将数据集中重复的数据去重 ```scala /** * 空参 distinct() 调用的实际是 distinct(partitions.length) * 其中,distinct(numPartitions: Int) 去重原理为 * map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1) */ ``` 示例:去重 ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4)) rdd.distinct() .collect().foreach(println) // 1 2 3 4 ``` #### coalesce ```scala def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] ``` 说明:根据数据量**增减分区**,用于大数据集过滤后,提高小数据集的执行效率。 当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩分区,减少分区的个数,减小任务调度成本。 - 缩小分区(N > M)且 N 和 M 相差不多的两种形式 ```scala /* 1. coalesce方法默认情况下不会将分区的数据打乱重新组合 例如元素3和4原本同一分区,那么缩减后仍会处于同一分区(窄依赖) 这种情况下的缩减分区可能会导致数据倾斜 */ val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3) val newRDD: RDD[Int] = rdd.coalesce(2) newRDD.saveAsTextFile("output") // 产生两个分区:分别为【1,2】、【3,4,5,6】 ``` ![05.Spark RDD转换算子05.png](https://lilinchao.com/usr/uploads/2021/06/2228380108.png) ```scala // 2. 如果想要让数据均衡,可以进行shuffle处理,第二个参数为True(宽依赖) val newRDD: RDD[Int] = rdd.coalesce(2, true) newRDD.saveAsTextFile("output") ``` ![05.Spark RDD转换算子06.png](https://lilinchao.com/usr/uploads/2021/06/3587483232.png) - 缩小分区( N > M)且N和M差距悬殊(比如N=1000,M=1) 如果不进行shuffle,由于父子RDD是窄依赖,他们同处于一个Stage中,就可能造成Spark程序运行的并行度不够(Task个数由Stage的最后一个RDD的分区个数决定)。比如M=1时,由于只有一个分区,所以只会有一个Task运行,为了使coalesce之前的操作有更好的并行度,可以将shuffle参数设为true。 ![05.Spark RDD转换算子07.png](https://lilinchao.com/usr/uploads/2021/06/688231220.png) - 我想要扩大分区(N < M),怎么办? 一般情况下N个分区由于数据分布不均,利用HashPartitioner函数将数据重新分区为M个,这时必须将shuffle参数设为为True。 **扩大分区个数,如果不进行shuffle操作,是没有意义的,无法改变RDD分区数目**: ```scala val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2) val newRDD: RDD[Int] = rdd.coalesce(3, shuffle = true) ``` spark提供了一个简化的操作`repartition`,专门用于扩大分区, 底层代码调用的就是coalesce,而且采用shuffle。 #### repartition ```scala def repartition(numPartitions: Int): RDD[T] = coalesce(numPartitions, shuffle = true) ``` 示例: ```scala val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2) rdd.repartition(3) .saveAsTextFile("output") // 【1,6】 【2,5】 【3,4】 ``` - 思考:**coalesce 和 repartition 区别?** coalesce 和 repartition 本质是相同的,后者底层代码调用的就是coalesce,且一定要经过shuffle。 习惯上**减少分区使用coalesce, 扩大分区使用repartition 。** #### sortBy ```scala def sortBy( f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length): RDD[T] ``` 说明:该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列,第二个参数为False为降序。 默认排序前后 RDD 的**分区数一致**,**中间存在 shuffle 的过程**。 示例1: ```scala val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 6, 5, 4, 3), 2) val newRDD: RDD[Int] = rdd.sortBy(num=>num) newRDD.saveAsTextFile("output") // 两个分区为 【1,2,3】 【4,5,6】,所以经历了shuffle ``` 示例2: ```scala val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2) // 按元组的第一个元素,降序 val sortRDD: RDD[(String, Int)] = rdd.sortBy(t => t._1, false) sortRDD.collect().foreach(println) // ("2",3) ("11",2) ("1",1) ``` #### sample* ```scala def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] ``` 说明:根据指定的规则从数据集中抽取数据 - 不放回 ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) /* 抽取数据不放回(伯努利算法) 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要 第一个参数:抽取的数据是否放回,false:不放回 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; 第三个参数:随机数种子,种子相同随机结果也是相同的,不传递的话默认值为当前系统时间 */ println(rdd.sample( false, 0.3 ).collect().mkString(",")) // 7,8,9 ``` - 放回 ```scala /* 抽取数据放回(泊松算法) 第一个参数:抽取的数据是否放回,true:放回 第二个参数:表示数据源中的每条数据被抽取的可能次数 第三个参数:随机数种子 */ println(rdd.sample( true, 2 ).collect().mkString(",")) // 1,1,2,2,4,6,6,6,6,6,7,8,8,9,10 ``` - 思考:抽样函数有什么用呢? 对发生数据倾斜的分区数据集,进行多次抽样,从样本中分析数据的分布。 ### 双 Value 类型 方法签名: ```scala // 交集 def intersection(other: RDD[T]): RDD[T] // 并集 def union(other: RDD[T]): RDD[T] // 差集 def subtract(other: RDD[T]): RDD[T] // 拉链,形成元组 def zip(other: RDD[U]): RDD[(T, U)] ``` 示例: ```scala val rdd1: RDD[Int] = sc.makeRDD(List(1,2,3,4)) val rdd2: RDD[Int] = sc.makeRDD(List(3,4,5,6)) // 交集 : 【3,4】,会去重 val rdd3: RDD[Int] = rdd1.intersection(rdd2) println(rdd3.collect().mkString(",")) // 并集 : 【1,2,3,4,3,4,5,6】,不会去重 val rdd4: RDD[Int] = rdd1.union(rdd2) println(rdd4.collect().mkString(",")) // 差集 : 【1,2】 val rdd5: RDD[Int] = rdd1.subtract(rdd2) println(rdd5.collect().mkString(",")) // 拉链 : (1,3),(2,4),(3,5),(4,6) val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2) println(rdd6.collect().mkString(",")) ``` 特点: - **交集、并集 和 差集要求两个数据源数据类型一致** - 拉链操作:两个数据源的类型**可以不一致** ```scala val rdd7 = sc.makeRDD(List("a","b","c","d")) val rdd8 = rdd1.zip(rdd7) println(rdd8.collect().mkString(",")) // (1,a),(2,b),(3,c),(4,d) ``` - 拉链操作:两个RDD要求**分区数量要保持一致,分区中数据量保持一致** ```scala val rdd1 = sc.makeRDD(List(1,2,3,4),2) val rdd2 = sc.makeRDD(List(3,4,5,6),3) // val rdd3: RDD[(Int, Int)] = rdd1.zip(rdd2) 分区数量不一致,异常 val rdd4 = sc.makeRDD(List(3,4,5,6,7,8), 2) // val rdd5: RDD[(Int, Int)] = rdd1.zip(rdd5) 分区中数据量不一致,异常 ``` 注:scala语法中,两个集合zip操作,不要求元素个数相同。 - 是否存在shuffle? 一般情况下,intersection和subtract都会有shuffle过程;而union是窄依赖(RangeDependency ),不存在shuffle,如下图所示。 ![05.Spark RDD转换算子08.png](https://lilinchao.com/usr/uploads/2021/06/816701985.png) ### Key - Value 类型 Value 类型 与 Key - Value 类型区别在于,前者更为广泛,单值`RDD[U]`与键值`RDD[(K,V)]`都适用;后者只适用于`RDD[(K,V)]`。 #### partitionBy ```scala def partitionBy(partitioner: Partitioner): RDD[(K, V)] ``` 说明:将数据 **基于 key 按照指定 Partitioner 重分区**。Spark 默认的分区器是 HashPartitioner。 按照指定的分区器,对Key进行计算得到新的分区号,从而对数据重新分区。 ```scala val rdd: RDD[(Int, String)] = sc.makeRDD( Array((1,"aaa"),(2,"bbb"),(3,"ccc"), (4,"ddd")), 2) /* HashPartitioner(2) 传入分区数为2,也可以与原分区数不同 */ val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2)) value.saveAsTextFile("output") // Key 按照哈希分区器划定分区,【(1,"aaa"),(3,"ccc")】 【(2,"bbb"),(4,"ddd")】 ``` - 补充:`partitionBy()`是PairRDDFunctions类中的方法,那么RDD为何可以调用呢? ```scala /* 因为存在 隐式转换(二次编译),RDD => PairRDDFunctions */ abstract class RDD {...} object RDD{ implicit def rddToPairRDDFunctions(rdd: RDD) = new PairRDDFunctions(rdd) ... } ``` - 思考:如果重分区的分区器和当前 RDD 的分区器一样怎么办? ```scala /* 当【分区器类别 + 分区数量】相同时,就不会创建新的RDD,返回当前RDD 二者有任一不同,将创建新的RDD返回 */ val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2)) val value1: RDD[(Int, String)] = value.partitionBy(new HashPartitioner(2)) println(value1 == value) // true ``` - 思考:Spark 还有其他分区器吗? 常见的有 HashPartitioner、RangePartitioner - 思考:如果想按照自己的方法进行数据分区怎么办? 自定义分区器,继承 Partitioner #### mapValues ```scala def mapValues(f: V => U): RDD[(K, U)] ``` 说明:针对KV类型的映射map,当K不变,只对V进行映射时,可采用mapValues简化 示例:wordCount ```scala val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val flatRdd: RDD[String] = rdd.flatMap(_.split(" ")) val groupRdd: RDD[(String, Iterable[String])] = flatRdd.groupBy(str => str) // 使用map,(k1,V1) -> (k2,v2) val value: RDD[(String, Int)] = groupRdd.map{ case (a, b) => { (a, b.size) } } // 使用mapValues,(K,V) -> (K,U) val value: RDD[(String, Int)] = groupRdd.mapValues( iter => iter.size ) outout: // (Spark,1) // (Hello,2) // (Scala,1) ``` #### reduceByKey ```scala // 泛型为[K, V],V代表value的类型 def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] ``` 说明:相同的key的数据进行value的聚合操作(两两聚合),**传入的func表示两个val的聚合逻辑**。如果key的数据只有一个,是不会参与运算的,直接返回。 示例:wordCount ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("b", 4) )) // 【1,2,3】-> 【3,3】 -> 【6】 val value: RDD[(String, Int)] = rdd.reduceByKey( (x: Int, y: Int) => { x + y } ) value.collect().foreach(println) // (a,6) // (b,4) ``` #### groupByKey ```scala def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] ``` 说明:将数据源的数据按照key ,对 value 进行分组 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("b", 4) )) // groupByKey : 针对[K,V]类型,将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组 // 元组中的第一个元素就是key,元组中的第二个元素就是相同key的value的集合 val groupRDD1: RDD[(String, Iterable[Int])] = rdd.groupByKey() groupRDD1.collect().foreach(println) // (a,CompactBuffer(1, 2, 3)) // (b,CompactBuffer(4)) println(groupRDD1.partitioner) // Some(org.apache.spark.HashPartitioner@8) 默认使用哈希分区器,8个分区 ``` - groupByKey的RDD依赖关系: ![05.Spark RDD转换算子09.png](https://lilinchao.com/usr/uploads/2021/06/3915956801.png) - 思考:groupByKey 与 groupBy的区别? | | groupByKey | groupBy | | ------------ | --------------------------- | ---------------------------- | | 适用集合类型 | 必须是`RDD[(K, V)]` | 任意`RDD[T]` | | 分组逻辑 | 按照Key分组 | 自定义`f:T->key`,需传入 | | 返回值 | `k ->Iterable(v1, v2, ...)` | `k -> Iterable(T1, T2, ...)` | - 经典考题: reduceByKey 和 groupByKey 的区别? - 从 shuffle 的角度:**reduceByKey 和 groupByKey 都存在 shuffle 的操作**,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。 - 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合,只能使用 groupByKey。 ![05.Spark RDD转换算子10.png](https://lilinchao.com/usr/uploads/2021/06/3070839448.png) reduceByKey针对分区内与分区间,计算规则是相同的。如果分区内与分区间的计算规则不同,可以使用aggregateByKey。 #### aggregateByKey ```scala def aggregateByKey(zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] ``` 说明:将数据根据 **不同的规则** 进行分区内计算和分区间计算 ```scala /** * aggregateByKey存在函数柯里化,有两个参数列表 * 第一个参数列表,需要传递一个参数,表示为初始值(只用于分区内计算) * 用于当碰见key第一个value时,与它进行分区内计算 * 第二个参数列表需要传递2个参数 * 参数1表示分区内计算规则 * 参数2表示分区间计算规则 */ ``` 示例:取出每个分区内相同 key 的最大值然后分区间相加 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) rdd.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println) //(b,8) //(a,8) ``` ![05.Spark RDD转换算子11.png](https://lilinchao.com/usr/uploads/2021/06/3114546660.png) 初始值`zeroValue`的选取是重要的,如果给的值不合适,将会是不同的结果: ```scala rdd.aggregateByKey(5)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println) //(b,10) //(a,11) ``` ![05.Spark RDD转换算子12.png](https://lilinchao.com/usr/uploads/2021/06/3713508465.png) aggregateByKey中初始值的类型与原本值的类型 **可以不同**,而最终的返回数据结果应该和初始值的类型保持一致,重温一下方法签名: ```scala // 键值对的泛型[K, V],输出为[K, U] def aggregateByKey(zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] ``` 示例:获取相同key的数据的平均值 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) // 获取相同key的数据的平均值 => (a, 3),(b, 4) val newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )( // (Tuple, Int) => Tuple // a: t=(0,0),v=1 => (1,1) => t=(1,1),v=2 => (3,2) ( t, v ) => { (t._1 + v, t._2 + 1) }, // (Tuple, Tuple) => Tuple // a: t1=(3,2),t2=(6,1) => (9,3) (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2) } ) // 对[K,V]做映射时,若K保持不动,仅对V做映射,可使用mapValues(f: V => U) val resultRDD: RDD[(String, Int)] = newRDD.mapValues { case (num, cnt) => { num / cnt } } resultRDD.collect().foreach(println) // (a, 3) (b, 4) ``` ![05.Spark RDD转换算子13.png](https://lilinchao.com/usr/uploads/2021/06/939009585.png) #### foldByKey ```scala def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] ``` 说明:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey 示例: ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) // 二者等价: 结果为 (b,12),(a,9) rdd.aggregateByKey(0)(_+_, _+_) rdd.foldByKey(0)(_+_) ``` 注:值得注意的是,foldByKey保持键值对的泛型不变(`(k,v)->(k,v)`),而aggregateByKey可能会改变输出的值类型(`(k,v)->(k,u)`)。 - 问题:当分区内计算规则和分区间计算规则相同时,foldByKey和reduceByKey都能实现,二者有什么区别呢? ```scala def reduceByKey(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] ``` **相同点**:不会改变键值对类型`(K,V)->(K,V)`,针对相同的Key,对Value做**两两聚合**操作 **不同点**:`reduceByKey`没有初始值,如果key的数据只有一个,是不会参与运算的,直接返回;而`foldByKey`要给定初始值,如果key的数据只有一个,就会与初始值进行计算。 #### combineByKey ```scala def combineByKey( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] ``` 说明:它是对aggregateByKey的另一种实现,它不直接给定初始值,而是将相同key的第一个数据进行结构的转换,作为初始值。 ```scala /** * combineByKey : 方法需要三个参数 * 1. createCombiner:将相同key的第一个数据进行结构的转换,实现操作 * 2. mergeValue:分区内的计算规则 * 3. mergeCombiners:分区间的计算规则 */ ``` 示例:获取相同key的数据的平均值 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) //注:因为第一个参数返回值类型是动态的,所以计算规则需加上泛型限定 val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey( t => (t, 1), //"a": 1 => (1, 1)形成初始值 ( t: (Int, Int), v) => { (t._1 + v, t._2 + 1) }, (t1: (Int, Int), t2: (Int, Int)) => { (t1._1 + t2._1, t1._2 + t2._2) } ) val resultRDD: RDD[(String, Int)] = newRDD.mapValues { case (num, cnt) => { num / cnt } } resultRDD.collect().foreach(println) // (a, 4) (b, 4) ``` 执行流程(初始值 -> 分区内 -> 分区间)如图所示: ![05.Spark RDD转换算子14.png](https://lilinchao.com/usr/uploads/2021/06/1443787968.png) - groupByKey、reduceByKey、foldByKey、aggregateByKey这四种算子,**最终都归结为对combineByKey 的调用**。 - combineByKey 共有五个参数如下: ![05.Spark RDD转换算子15.png](https://lilinchao.com/usr/uploads/2021/06/533512966.png) - 值得注意的是:groupByKey的参数`mapSideCombine=false`,不会在map端进行combine操作,其余四种算子该参数为`mapSideCombine=true`。 - 归约算子的内部实现: ![05.Spark RDD转换算子16.png](https://lilinchao.com/usr/uploads/2021/06/202345924.png) | 转换操作 | 生成RDD的类型 | | ------------------------------------------------------- | ---------------------------------------------------------- | | combineByKey (reduceByKey、foldByKey、aggregateByKey) | MapParitionsRDD(预聚合)-> ShuffledRDD -> MapParitionsRDD | | groupByKey | ShuffledRDD -> MapParitionsRDD | 其中, ShuffledRDD 进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。 - 对比:**reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?** | | 初始值 | 相同的Key第一个值 | 分区内&分区间的计算规则 | | -------------- | ------ | ------------------------------------------- | ----------------------- | | reduceByKey | 无 | 相同 key 的第一个数据不进行任何计算 | 计算规则相同 | | foldByKey | 给定 | 相同 key 的第一个数据和初始值进行分区内计算 | 计算规则相同 | | aggregateByKey | 给定 | 相同 key 的第一个数据和初始值进行分区内计算 | 计算规则可以不同 | | combineByKey | 无 | 相同 key 的第一个数据结构转换,作为初始值 | 计算规则可以不同 | **重要相同点:四个算子均具有“预聚合”功能,即在shuffle落盘之前,在内存中先聚合数据,再写入磁盘,减少数据落盘量** 示例:实现wordCount ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) //实现wordCount的四种方式:(b,12)、(a,9) rdd.reduceByKey(_+_) rdd.aggregateByKey(0)(_+_, _+_) rdd.foldByKey(0)(_+_) rdd.combineByKey(v=>v, (v1: Int, v2) => v1+v2, (v1: Int, v2: Int)=> v1+v2) ``` #### join ```scala def join(other: RDD[(K, W)]): RDD[(K, (V, W))] ``` 说明:在类型为`(K,V)`和`(K,W)`的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的`(K,(V,W))`的 RDD ```scala val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 4), ("a", 5), ("c", 6) )) val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2) joinRDD.collect().foreach(println) //(a,(1,5)) //(a,(1,4)) //(c,(3,6)) ``` 如果两个数据源中key没有匹配上,那么数据不会出现在结果中(内连接,取交集); 如果两个数据源中key有多个相同的,会逐个匹配,可能会出现**笛卡尔乘积**,且会发生shuffle,故不推荐使用。 #### leftOuterJoin ```scala def leftOuterJoin(other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] ``` 说明:类似于左外连接,保留主表的所有数据,从表数据会由Option封装。 ```scala val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5))) val leftRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2) //(a,(1,Some(4))) //(b,(2,Some(5))) //(c,(3,None)) ``` 相应的,还有右外连接**rightOuterJoin**。 #### cogroup ```scala def cogroup(other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] ``` 说明:在类型为`(K,V)`和`(K,W)`的 RDD 上调用,返回一个`(K,(Iterable
,Iterable
))`类型的 RDD ```scala val rdd1 = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3))) val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5),("c", 6),("c", 7))) val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2) // (a,(CompactBuffer(1, 2),CompactBuffer(4))) // (b,(CompactBuffer(3),CompactBuffer(5))) // (c,(CompactBuffer(),CompactBuffer(6, 7))) ``` 它的Join的区别在于:Join返回的是两侧RDD公共的Key,而cogroup可以返回仅一侧出现的Key,类似于**全外连接**。 join等连接操作的底层,使用的是cogroup实现,**Join内部机制**如图: ![05.Spark RDD转换算子17.png](https://lilinchao.com/usr/uploads/2021/06/1118227016.png) #### sortByKey ```scala def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] ``` 说明:在一个(K,V)的 RDD 上调用,**K 必须实现 Ordered 特质**,返回一个按照 key 进行排序的(K, V) ```scala val dataRDD1 = sc.makeRDD(List(("a",3),("b",2),("c",1))) val sortRdd1: RDD[(String, Int)] = dataRDD1.sortByKey() // 按Key升序 (a,3),(b,2),(c,1) val sortRdd2: RDD[(String, Int)] = dataRDD1.sortByKey(false) // 按Key降序 (c,1),(b,2),(a,3) ``` ### 经典案例 数据准备: > agent.log [时间戳,省份,城市,用户,广告],中间字段使用空格分隔。 功能实现:统计出 **每一个省份 每个广告被点击数量排行的 Top3** 分析: ① **提取有效数据**:通过`map`只保留有效数据,比如省份,广告,减少数据传输量 ② **建立有效键Key**:省份与广告均为分组关键词,应将元组`(省份,广告)`作为Key ③ **归约**:` ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )` ④ **结构转换**:为了查询每一个省份的TOP,做转换` ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) )` ⑤ **分组**:按照省份进行分组,每个省份对应多干个`( 广告, sum )` ⑥ **排序**:对`sum`降序排序,取前三 ![05.Spark RDD转换算子18.png](https://lilinchao.com/usr/uploads/2021/06/996595702.png) ```scala // 1. 获取原始数据:时间戳,省份,城市,用户,广告 val rdd: RDD[String] = sc.textFile("data/agent.log") // 2. 将原始数据进行结构的转换。方便统计 // 时间戳,省份,城市,用户,广告 // => // ( ( 省份,广告 ), 1 ) val mapRDD: RDD[((String, String), Int)] = rdd.map( line => { val words: Array[String] = line.split(" ") ((words(1), words(4)), 1) } ) // 3. 将转换结构后的数据,进行分组聚合 // ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum ) val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _) // 4. 将聚合的结果进行结构的转换 // ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) val mapRdd1: RDD[(String, (String, Int))] = reduceRDD.map { case ((prv, ad), sum) => { (prv, (ad, sum)) } /* case (tuple, cnt) => { (tuple._1, (tuple._2, cnt)) }*/ } // 5. 将转换结构后的数据根据省份进行分组 // ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 ) val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRdd1.groupByKey() // 6. 将分组后的数据组内排序(降序),取前3名 val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues( iter => { iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3) } ) // 7. 采集数据打印在控制台 resultRDD.collect().foreach(println) ``` *附:* 原文链接地址:https://juejin.cn/post/6957937654360965157#heading-35
标签:
Spark
,
Spark Core
,
Spark RDD
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1300.html
上一篇
04.Spark RDD创建简介
下一篇
06.【转载】Spark RDD行动算子
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Spark RDD
稀疏数组
Docker
Golang
JVM
Beego
SpringBoot
ajax
nginx
Elastisearch
数据结构
MyBatis-Plus
Eclipse
前端
算法
设计模式
高并发
MySQL
锁
数学
Quartz
Kibana
Hbase
Spark
国产数据库改造
Java
Ubuntu
容器深入研究
序列化和反序列化
SQL练习题
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭