李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(十)
Leefs
2021-11-06 AM
2782℃
0条
[TOC] ### 一、需求 > 影评分析: > > + 按照年份进行分组。计算每部电影的平均评分,平均评分保留小数点后一位,并按评分大小进行排序。 > > + 评分一样,按照电影名排序。相同年份的输出到一个文件中。 > > 结果展示形式(年份,电影id,电影名字,平均评分) > > 要求:尝试使用自定义分区、自定义排序和缓冲。 ### 二、数据说明 *说明:以`::`对数据进行分隔* > `movies.dat`电影列表文件  + 第一列:电影ID + 第二列:电影名称 + 第三列:电影类型 > `ratings.dat`用户评分文件  + 第一列:用户ID + 第二列:电影ID + 第三列:评分 + 第四列:评分时间戳 ### 三、实现代码 #### 3.1 思路 ``` 1. 先处理评分数据,计算出电影id、平均评分 2. 再处理电影数据,提取出电影id、电影年份、电影名 3. 将上两步计算出的数据根据电影id进行关联join 4. 提取出年份、电影id、电影名、平均评分封装到对象里,并将年份作为key、对象作为value进行分区 5. 自定义分区规则,将相同年份的放到一个分区中,实现相同年份的写入到一个结果文件中 6. 对象实现排序规则,对分区内的数据进行排序 ``` #### 3.2 代码 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/6 * @description 影评分析(年份,电影id,电影名字,平均评分) **/ object FilmReviewAnalyze { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("FilmReviewAnalyze").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //1.先读取电影ID和评分 val rdd2: RDD[String] = sc.textFile("datas/ratings.dat") //2.按指定分割符切分并过滤脏数据 val filterData = rdd2.map(_.split("::")).filter(_.length >= 4) //3.去掉不必要的字段,并按电影id分组,计算每部电影的平均分 val movieGroupRDD = filterData.map(arr =>{ (arr(1),arr(2).toDouble) }).groupBy(_._1) val moiveAvg = movieGroupRDD.mapValues(iter => { val num: Int = iter.size val sum = iter.map(_._2).sum (sum / num).formatted("%.1f") }) //4.截取电影id、电影名字、年份 val movies = sc.textFile("datas/movies.dat") val moviesFilterRDD = movies.map(_.split("::")).filter(_.length >= 3) val movieMapRDD = moviesFilterRDD.map(arr => { val movieId = arr(0) val movieName = arr(1) val year = arr(1).substring(arr(1).length - 5, arr(1).length - 1) (movieId, (year, movieName)) }) //5.将平均评分和电影数据进行join,将数据封装到对象中 val allData = movieMapRDD.join(moiveAvg).map(x =>{ val year = x._2._1._1 val movieId = x._1 val movieName = x._2._1._2 val avg = x._2._2.toFloat (year, MovieBean(year, movieId, movieName, avg)) }) val years: Array[String] = allData.map(_._1).distinct().collect() val resultRDD = allData.partitionBy(new MyPatitioner(years)) .mapPartitions(iter => { iter.toList.sortBy(_._2).toIterator }).map(t => t._2.year + "," + t._2.movieId + "," + t._2.movieName + "," + t._2.avg) // resultRDD.foreach(println) resultRDD.saveAsTextFile("output") sc.stop() } } //自定义分区器,通过年份进行分区 class MyPatitioner(years: Array[String]) extends Partitioner { override def numPartitions: Int = years.length override def getPartition(key: Any): Int = { val year = key.asInstanceOf[String] years.indexOf(year) } } //自定义对象封装数据,并实现对应的比较逻辑 case class MovieBean(year: String, movieId: String, movieName: String, avg: Float) extends Ordered[MovieBean] { override def compare(that: MovieBean): Int = { if (that.avg == this.avg) { if (that.movieName.compareTo(this.movieName) > 0) -1 else 1 } else { if (that.avg > this.avg) 1 else -1 } } } ``` ### 结尾 因为本篇使用的示例数据`ratings.dat`、`movies.dat`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。 *附参考文章链接地址:* *https://www.cnblogs.com/hong-bo/p/11730396.html*
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1614.html
上一篇
Spark Core案例实操(九)
下一篇
SparkCore之累加器
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
35
其它
25
GO
48
NLP
8
标签云
Spark
Linux
算法
高并发
MyBatis-Plus
Hive
ClickHouse
栈
持有对象
数据结构和算法
pytorch
JavaScript
Flume
Elastisearch
Kibana
容器深入研究
NIO
SpringCloud
Hbase
Zookeeper
字符串
序列化和反序列化
排序
Shiro
Flink
Golang
Spark Core
BurpSuite
Yarn
Java工具类
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞