李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(十)
Leefs
2021-11-06 AM
1579℃
0条
[TOC] ### 一、需求 > 影评分析: > > + 按照年份进行分组。计算每部电影的平均评分,平均评分保留小数点后一位,并按评分大小进行排序。 > > + 评分一样,按照电影名排序。相同年份的输出到一个文件中。 > > 结果展示形式(年份,电影id,电影名字,平均评分) > > 要求:尝试使用自定义分区、自定义排序和缓冲。 ### 二、数据说明 *说明:以`::`对数据进行分隔* > `movies.dat`电影列表文件 ![20.Spark Core案例实操(十)01.jpg](https://lilinchao.com/usr/uploads/2021/11/2272790913.jpg) + 第一列:电影ID + 第二列:电影名称 + 第三列:电影类型 > `ratings.dat`用户评分文件 ![20.Spark Core案例实操(十)02.jpg](https://lilinchao.com/usr/uploads/2021/11/2029257373.jpg) + 第一列:用户ID + 第二列:电影ID + 第三列:评分 + 第四列:评分时间戳 ### 三、实现代码 #### 3.1 思路 ``` 1. 先处理评分数据,计算出电影id、平均评分 2. 再处理电影数据,提取出电影id、电影年份、电影名 3. 将上两步计算出的数据根据电影id进行关联join 4. 提取出年份、电影id、电影名、平均评分封装到对象里,并将年份作为key、对象作为value进行分区 5. 自定义分区规则,将相同年份的放到一个分区中,实现相同年份的写入到一个结果文件中 6. 对象实现排序规则,对分区内的数据进行排序 ``` #### 3.2 代码 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/6 * @description 影评分析(年份,电影id,电影名字,平均评分) **/ object FilmReviewAnalyze { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("FilmReviewAnalyze").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //1.先读取电影ID和评分 val rdd2: RDD[String] = sc.textFile("datas/ratings.dat") //2.按指定分割符切分并过滤脏数据 val filterData = rdd2.map(_.split("::")).filter(_.length >= 4) //3.去掉不必要的字段,并按电影id分组,计算每部电影的平均分 val movieGroupRDD = filterData.map(arr =>{ (arr(1),arr(2).toDouble) }).groupBy(_._1) val moiveAvg = movieGroupRDD.mapValues(iter => { val num: Int = iter.size val sum = iter.map(_._2).sum (sum / num).formatted("%.1f") }) //4.截取电影id、电影名字、年份 val movies = sc.textFile("datas/movies.dat") val moviesFilterRDD = movies.map(_.split("::")).filter(_.length >= 3) val movieMapRDD = moviesFilterRDD.map(arr => { val movieId = arr(0) val movieName = arr(1) val year = arr(1).substring(arr(1).length - 5, arr(1).length - 1) (movieId, (year, movieName)) }) //5.将平均评分和电影数据进行join,将数据封装到对象中 val allData = movieMapRDD.join(moiveAvg).map(x =>{ val year = x._2._1._1 val movieId = x._1 val movieName = x._2._1._2 val avg = x._2._2.toFloat (year, MovieBean(year, movieId, movieName, avg)) }) val years: Array[String] = allData.map(_._1).distinct().collect() val resultRDD = allData.partitionBy(new MyPatitioner(years)) .mapPartitions(iter => { iter.toList.sortBy(_._2).toIterator }).map(t => t._2.year + "," + t._2.movieId + "," + t._2.movieName + "," + t._2.avg) // resultRDD.foreach(println) resultRDD.saveAsTextFile("output") sc.stop() } } //自定义分区器,通过年份进行分区 class MyPatitioner(years: Array[String]) extends Partitioner { override def numPartitions: Int = years.length override def getPartition(key: Any): Int = { val year = key.asInstanceOf[String] years.indexOf(year) } } //自定义对象封装数据,并实现对应的比较逻辑 case class MovieBean(year: String, movieId: String, movieName: String, avg: Float) extends Ordered[MovieBean] { override def compare(that: MovieBean): Int = { if (that.avg == this.avg) { if (that.movieName.compareTo(this.movieName) > 0) -1 else 1 } else { if (that.avg > this.avg) 1 else -1 } } } ``` ### 结尾 因为本篇使用的示例数据`ratings.dat`、`movies.dat`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。 *附参考文章链接地址:* *https://www.cnblogs.com/hong-bo/p/11730396.html*
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1614.html
上一篇
Spark Core案例实操(九)
下一篇
SparkCore之累加器
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Zookeeper
JavaWeb
LeetCode刷题
高并发
MyBatis-Plus
查找
JavaScript
Git
SpringCloud
稀疏数组
数学
Spring
nginx
数据结构和算法
机器学习
散列
Java
Sentinel
并发编程
DataWarehouse
Python
Elasticsearch
Spark SQL
HDFS
FastDFS
Jquery
Spark Core
二叉树
NIO
人工智能
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭