李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(二)
Leefs
2021-10-30 PM
921℃
0条
### 前言 本篇根据**Spark Core案例实操(一)**中需求继续对代码进行优化,减少shuffle,提高性能。 ### 五、实现方案三 在方案一和二中,reduceByKey算子使用过多,因为reduceByKey在进行聚合时也会存在shuffle,影响代码的整体性能。 #### 5.1 分析 + 在读取数据之后直接转换成如下结构: ``` 点击的场合 : ( 品类ID,( 1, 0, 0 ) ) 下单的场合 : ( 品类ID,( 0, 1, 0 ) ) 支付的场合 : ( 品类ID,( 0, 0, 1 ) ) ``` + 再将相同的品类ID的数据进行分组聚合 ``` ( 品类ID,( 点击数量, 下单数量, 支付数量 ) ) ``` 这样可以简化步骤,同时减少许多reduceByKey的使用。 #### 5.2 代码实现 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/10/30 * @description Top10热门品类 **/ object HotCategoryTop10Analysis03 { def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis") val sc = new SparkContext(sparConf) // 1. 读取原始日志数据 val actionRDD = sc.textFile("datas/user_visit_action.txt") // 2. 将数据转换结构 val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap( action => { val datas = action.split("_") if(datas(6) != "-1"){ //点击的场合 List((datas(6),(1,0,0))) }else if(datas(8) != "null") { //下单的场合 val ids = datas(8).split(",") ids.map(id => (id,(0,1,0))) }else if(datas(10) !="null"){ //支付的场合 val ids = datas(10).split(",") ids.map(id => (id,(0,0,1))) }else{ Nil } } ) // 3. 将相同的品类ID的数据进行分组聚合 val analysisRDD = flatRDD.reduceByKey( (t1, t2) => { ( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 ) } ) // 4. 将统计结果根据数量进行降序处理,取前10名 val resultRDD = analysisRDD.sortBy(_._2, false).take(10) // 5. 将结果采集到控制台打印出来 resultRDD.foreach(println) sc.stop() } } ``` **运行结果** ``` (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161)) ``` ### 六、实现方案四 通过自定义累加器实现,该过程没有shuffle,性能高。 ```scala package com.llc.spark.core import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable /** * @author lilinchao * @date 2021/10/30 * @description Top10热门品类 **/ object HotCategoryTop10Analysis04 { def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis") val sc = new SparkContext(sparConf) // 1. 读取原始日志数据 val actionRDD = sc.textFile("datas/user_visit_action.txt") val acc = new HotCategoryAccumulator sc.register(acc, "hotCategory") // 2. 将数据转换结构 actionRDD.foreach( action => { val datas = action.split("_") if (datas(6) != "-1") { // 点击的场合 acc.add((datas(6), "click")) } else if (datas(8) != "null") { // 下单的场合 val ids = datas(8).split(",") ids.foreach( id => { acc.add( (id, "order") ) } ) } else if (datas(10) != "null") { // 支付的场合 val ids = datas(10).split(",") ids.foreach( id => { acc.add( (id, "pay") ) } ) } } ) val accVal: mutable.Map[String, HotCategory] = acc.value val categories: mutable.Iterable[HotCategory] = accVal.map(_._2) //3. 排序 val sort = categories.toList.sortWith( (left, right) => { if ( left.clickCnt > right.clickCnt ) { true } else if (left.clickCnt == right.clickCnt) { if ( left.orderCnt > right.orderCnt ) { true } else if (left.orderCnt == right.orderCnt) { left.payCnt > right.payCnt } else { false } } else { false } } ) // 4. 将结果采集到控制台打印出来 sort.take(10).foreach(println) sc.stop() } case class HotCategory(cid:String,var clickCnt:Int,var orderCnt:Int,var payCnt:Int) /** * 自定义累加器 * 1. 继承AccumulatorV2,定义泛型 * IN : ( 品类ID, 行为类型 ) * OUT : mutable.Map[String, HotCategory] * 2. 重写方法(6) */ class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]{ private val hcMap = mutable.Map[String, HotCategory]() // 累加器是否为初始状态 override def isZero: Boolean = { hcMap.isEmpty } // 复制累加器 override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = { new HotCategoryAccumulator() } // 重置累加器 override def reset(): Unit = { hcMap.clear() } // 向累加器中增加数据 (In) override def add(v: (String, String)): Unit = { val cid = v._1 val actionType = v._2 val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0,0,0)) if(actionType == "click"){ category.clickCnt += 1 }else if(actionType == "order"){ category.orderCnt += 1 }else if(actionType == "pay"){ category.payCnt += 1 } hcMap.update(cid,category) } // 合并累加器 override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = { val map1 = this.hcMap val map2 = other.value map2.foreach{ case ( cid, hc ) => { val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0,0,0)) category.clickCnt += hc.clickCnt category.orderCnt += hc.orderCnt category.payCnt += hc.payCnt map1.update(cid, category) } } } // 返回累加器的结果 (Out) override def value: mutable.Map[String, HotCategory] = hcMap } } ``` 运行结果 ``` HotCategory(15,6120,1672,1259) HotCategory(2,6119,1767,1196) HotCategory(20,6098,1776,1244) HotCategory(12,6095,1740,1218) HotCategory(11,6093,1781,1202) HotCategory(17,6079,1752,1231) HotCategory(7,6074,1796,1252) HotCategory(9,6045,1736,1230) HotCategory(19,6044,1722,1158) HotCategory(13,6036,1781,1161) ``` **说明** 累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。 ### 结尾 因为本篇使用的示例数据`user_visit_action.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1596.html
上一篇
Spark Core案例实操(一)
下一篇
Spark Core案例实操(三)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
JavaScript
SQL练习题
数学
MyBatis-Plus
Scala
SpringBoot
Spark Core
MyBatisX
Golang
Elasticsearch
Tomcat
人工智能
Java
栈
Spark SQL
Shiro
高并发
算法
FileBeat
Elastisearch
Typora
SpringCloudAlibaba
Hbase
JavaWEB项目搭建
数据结构和算法
Stream流
Spark Streaming
Map
Kafka
队列
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞