李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
SparkSQL案例实操(二)
Leefs
2021-11-13 PM
1478℃
0条
[TOC] ### 一、需求 #### 1.1 需求简介 > 各区域热门商品 Top3 > > + 这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品 > + 备注上每个商品在主要城市中的分布比例 > + 超过两个城市用其他显示 **示例** | 地区 | 商品名称 | 点击次数 | 城市备注 | | ---- | -------- | -------- | ---------------------------------- | | 华北 | 商品 A | 100000 | 北京 21.2%,天津 13.2%,其他 65.6% | | 华北 | 商品 P | 80200 | 北京 63.0%,太原 10%,其他 27.0% | | 华北 | 商品 M | 40000 | 北京 63.0%,太原 10%,其他 27.0% | | 东北 | 商品 J | 92000 | 大连 28%,辽宁 17.0%,其他 55.0% | #### 1.2 需求分析 + 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与 Product_info 表连接得到产品名称; + 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数 + 每个地区内按照点击次数降序排列 + 只取前三名 + 城市备注需要自定义 UDAF 函数 ### 二、样例类 + 用户访问动作表(`user_visit_action.txt`) ```scala case class UserVisitAction( date: String,//用户点击行为的日期 user_id: Option[Long],//用户的ID session_id: String,//Session的ID page_id: Option[Long],//某个页面的ID action_time: String,//动作的时间点 search_keyword: String,//用户搜索的关键词 click_category_id: Option[Long],//某一个商品品类的ID click_product_id: Option[Long],//某一个商品的ID order_category_ids: String,//一次订单中所有品类的ID集合 order_product_ids: String,//一次订单中所有商品的ID集合 pay_category_ids: String,//一次支付中所有品类的ID集合 pay_product_ids: String,//一次支付中所有商品的ID集合 city_id: Option[Long] //城市ID ) ``` + 商品信息表(`product_info.txt`) ```scala case class ProductInfo( product_id: Long, //商品ID product_name: String, //商品名称 extend_info: String //商品平台类型 ) ``` + 城市信息表(`city_info.txt`) ```scala case class CityInfo( city_id: Long, //城市ID city_name: String, //城市名称 area: String //城市所属区域 ) ``` ### 三、实现 #### 3.1 实现步骤 ``` 1. 连接三张表的数据,获取完整的数据(只有点击) 2. 将数据根据地区,商品名称分组 3. 统计商品点击次数总和,取 Top3 4. 实现自定义聚合函数显示备注 ``` #### 3.2 代码实现 ```scala package com.llc.spark.sql import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.expressions.Aggregator import scala.collection.mutable import scala.collection.mutable.ListBuffer /** * @author lilinchao * @date 2021/11/12 * @description 1.0 **/ object PopularProductsTopN { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("PopularProductsTopN").master("local[*]").getOrCreate() import spark.implicits._ val userDataSet: Dataset[UserVisitAction] = userToDataSet("data_sql/user_visit_action.txt",spark) val cityDataSet: Dataset[CityInfo] = cityToDataSet("data_sql/city_info.txt",spark) val productDataSet: Dataset[ProductInfo] = productToDataSet("data_sql/product_info.txt",spark) userDataSet.createOrReplaceTempView("user_visit_action") cityDataSet.createOrReplaceTempView("city_info") productDataSet.createOrReplaceTempView("product_info") //1.先关联三张表求出区域、城市名、商品名称 spark.sql( """ | select | a.*, | p.product_name, | c.area, | c.city_name | from user_visit_action a | join product_info p on a.click_product_id = p.product_id | join city_info c on a.city_id = c.city_id | where a.click_product_id > -1 """.stripMargin ).createOrReplaceTempView("t1") //根据区域,商品进行数据聚合 spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF())) //2.根据区域和商品分组,统计每个区域的商品点击次数 spark.sql( """ | select | area, | product_name, | count(*) as clickCnt, | cityRemark(city_name) as city_remark | from t1 group by area,product_name """.stripMargin).createOrReplaceTempView("t2") //3.统计出每个区域的热门商品,取前三 spark.sql( """ | select | *, | rank() over( partition by area order by clickCnt desc ) as rank | from t2 """.stripMargin).createOrReplaceTempView("t3") // 取前3名 spark.sql( """ | select | * | from t3 where rank <= 3 """.stripMargin).show(false) spark.close() } /** * 用户访问记录 => 读取txt文件并转换成为DataSet * @param path * @param spark */ def userToDataSet(path: String, spark: SparkSession): Dataset[UserVisitAction] = { import spark.implicits._ val frame: DataFrame = spark.read .schema(Encoders.product[UserVisitAction].schema) // .option("inferSchema", "true") //是否自动推断数据类型 .option("header", "false") .option("delimiter", "\t") //分割符【\t】,默认【,】逗号 .csv(path) // .toDF("date","user_id","session_id","page_id","action_time","search_keyword","click_category_id","click_product_id","order_category_ids","order_product_ids","pay_category_ids","pay_product_ids","city_id") frame.as[UserVisitAction] } /** * 城市信息 => 读取txt文件并转换成为DataSet * @param path * @param spark * @return */ def cityToDataSet(path: String, spark: SparkSession): Dataset[CityInfo] = { //读取外部txt文件 val dataRDD: RDD[String] = spark.sparkContext.textFile(path) //导入隐士转换,否则RDD不能够调用toDF方法 import spark.implicits._ dataRDD.map { line => { val split = line.split("\\s+") CityInfo(split(0).toLong, split(1), split(2)) } }.toDS() } /** * 商品信息 => 读取txt文件并转换成为DataSet * @param path * @param spark */ def productToDataSet(path: String, spark: SparkSession): Dataset[ProductInfo] = { val dataRDD: RDD[String] = spark.sparkContext.textFile(path) //导入隐士转换,否则RDD不能够调用toDF方法 import spark.implicits._ dataRDD.map { line => { val split = line.split("\\s+") ProductInfo(split(0).toLong, split(1), split(2)) } }.toDS() } abstract class Base extends Serializable with Product //用户访问动作表 case class UserVisitAction( date: String,//用户点击行为的日期 user_id: Option[Long],//用户的ID session_id: String,//Session的ID page_id: Option[Long],//某个页面的ID action_time: String,//动作的时间点 search_keyword: String,//用户搜索的关键词 click_category_id: Option[Long],//某一个商品品类的ID click_product_id: Option[Long],//某一个商品的ID order_category_ids: String,//一次订单中所有品类的ID集合 order_product_ids: String,//一次订单中所有商品的ID集合 pay_category_ids: String,//一次支付中所有品类的ID集合 pay_product_ids: String,//一次支付中所有商品的ID集合 city_id: Option[Long] //城市ID ) extends Base //商品信息表 case class ProductInfo( product_id: Long, //商品ID product_name: String, //商品名称 extend_info: String //商品平台类型 ) extends Base //城市信息表 case class CityInfo( city_id: Long, //城市ID city_name: String, //城市名称 area: String //城市所属区域 ) extends Base case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] ) // 自定义聚合函数:实现城市备注功能 // 1. 继承Aggregator, 定义泛型 // IN : 城市名称 // BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】 // OUT : 备注信息 // 2. 重写方法(6) class CityRemarkUDAF extends Aggregator[String, Buffer, String]{ // 缓冲区初始化 override def zero: Buffer = { Buffer(0, mutable.Map[String, Long]()) } // 更新缓冲区数据 override def reduce(buff: Buffer, city: String): Buffer = { buff.total += 1 val newCount = buff.cityMap.getOrElse(city, 0L) + 1 buff.cityMap.update(city, newCount) buff } // 合并缓冲区数据 override def merge(buff1: Buffer, buff2: Buffer): Buffer = { buff1.total += buff2.total val map1 = buff1.cityMap val map2 = buff2.cityMap // 两个Map的合并操作 // buff1.cityMap = map1.foldLeft(map2) { // case ( map, (city, cnt) ) => { // val newCount = map.getOrElse(city, 0L) + cnt // map.update(city, newCount) // map // } // } map2.foreach{ case (city , cnt) => { val newCount = map1.getOrElse(city, 0L) + cnt map1.update(city, newCount) } } buff1.cityMap = map1 buff1 } // 将统计的结果生成字符串信息 override def finish(buff: Buffer): String = { val remarkList = ListBuffer[String]() val totalcnt = buff.total val cityMap = buff.cityMap // 降序排列 val cityCntList = cityMap.toList.sortWith( (left, right) => { left._2 > right._2 } ).take(2) val hasMore = cityMap.size > 2 var rsum = 0L cityCntList.foreach{ case ( city, cnt ) => { val r = cnt * 100 / totalcnt remarkList.append(s"${city} ${r}%") rsum += r } } if ( hasMore ) { remarkList.append(s"其他 ${100 - rsum}%") } remarkList.mkString(", ") } override def bufferEncoder: Encoder[Buffer] = Encoders.product override def outputEncoder: Encoder[String] = Encoders.STRING } } ``` **运行结果** ``` +----+------------+--------+------------------------------+----+ |area|product_name|clickCnt|city_remark |rank| +----+------------+--------+------------------------------+----+ |华东|商品_86 |371 |上海 16%, 杭州 15%, 其他 69% |1 | |华东|商品_47 |366 |杭州 15%, 青岛 15%, 其他 70% |2 | |华东|商品_75 |366 |上海 17%, 无锡 15%, 其他 68% |2 | |西北|商品_15 |116 |西安 54%, 银川 45% |1 | |西北|商品_2 |114 |银川 53%, 西安 46% |2 | |西北|商品_22 |113 |西安 54%, 银川 45% |3 | |华南|商品_23 |224 |厦门 29%, 福州 24%, 其他 47% |1 | |华南|商品_65 |222 |深圳 27%, 厦门 26%, 其他 47% |2 | |华南|商品_50 |212 |福州 27%, 深圳 25%, 其他 48% |3 | |华北|商品_42 |264 |郑州 25%, 保定 25%, 其他 50% |1 | |华北|商品_99 |264 |北京 24%, 郑州 23%, 其他 53% |1 | |华北|商品_19 |260 |郑州 23%, 保定 20%, 其他 57% |3 | |东北|商品_41 |169 |哈尔滨 35%, 大连 34%, 其他 31%|1 | |东北|商品_91 |165 |哈尔滨 35%, 大连 32%, 其他 33%|2 | |东北|商品_58 |159 |沈阳 37%, 大连 32%, 其他 31% |3 | |东北|商品_93 |159 |哈尔滨 38%, 大连 37%, 其他 25%|3 | |华中|商品_62 |117 |武汉 51%, 长沙 48% |1 | |华中|商品_4 |113 |长沙 53%, 武汉 46% |2 | |华中|商品_57 |111 |武汉 54%, 长沙 45% |3 | |华中|商品_29 |111 |武汉 50%, 长沙 49% |3 | +----+------------+--------+------------------------------+----+ ``` *注意:本次使用的Scala 2.12版本、Spark 3.0.0版本,如果代码报红,请查看版本是否对应* ### 结尾 因为本篇使用的示例数据`user_visit_action.txt`、`city_info.txt`、`product_info.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1621.html
上一篇
SparkSQL案例实操(一)
下一篇
SparkSQL案例实操(三)
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Golang基础
Nacos
数据结构和算法
递归
并发线程
人工智能
SpringCloud
队列
JavaSE
Elasticsearch
并发编程
哈希表
设计模式
Spark SQL
Spring
排序
SpringCloudAlibaba
锁
Elastisearch
微服务
RSA加解密
SpringBoot
BurpSuite
nginx
散列
Jenkins
SQL练习题
Typora
Linux
Ubuntu
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭