# SparkSQL Hands-On Case Study (4)

Leefs · 2021-11-14
[TOC]

### 1. Requirement

> Find every shop with sales on 3 or more consecutive days, and compute the sales total over each such streak.

**Expected result**

```
+-----+----------+----------+-----+-----------+
|  sid|begin_date|  end_date|times|total_sales|
+-----+----------+----------+-----+-----------+
|shop1|2019-02-10|2019-02-13|    4|       1900|
+-----+----------+----------+-----+-----------+
```

### 2. Data preparation

+ order.csv

```
sid,datatime,money
shop1,2019-01-18,500
shop1,2019-02-10,500
shop1,2019-02-10,200
shop1,2019-02-11,600
shop1,2019-02-12,400
shop1,2019-02-13,200
shop1,2019-02-15,100
shop1,2019-03-05,180
shop1,2019-04-05,280
shop1,2019-04-06,220
shop2,2019-02-10,100
shop2,2019-02-11,100
shop2,2019-02-13,100
```

+ sid: shop ID
+ datatime: order creation date
+ money: order amount

### 3. SQL implementation

#### 3.1 Steps

```
1. Sum the order amounts per day (one day may contain several orders).
2. Within each shop, sort the daily rows by date and number them with ROW_NUMBER.
3. Add a column holding the difference between the date and rn; rows in one unbroken run share the same difference.
4. Group by (sid, diff) and keep groups with 3 or more days to get the final result.
```

#### 3.2 Code

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author lilinchao
  * @date 2021/11/13
  * @description Find shops with sales on 3 or more consecutive days and compute their sales totals
  **/
object RollupMthIncomeSQL {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data_sql/order.csv")

    df.createOrReplaceTempView("v_orders")

    // 1. Sum the order amounts per day (one day may contain several orders)
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   SUM(money) day_money
        | from v_orders
        | group by sid,datatime
      """.stripMargin).createOrReplaceTempView("t1")

    // 2. Within each shop, sort the daily rows by date and number them
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   day_money,
        |   ROW_NUMBER() OVER(PARTITION BY sid ORDER BY datatime) rn
        | from t1
      """.stripMargin).createOrReplaceTempView("t2")

    // 3. Subtract rn from the date; consecutive days end up with the same diff
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   day_money,
        |   DATE_SUB(datatime,rn) diff
        | from t2
      """.stripMargin).createOrReplaceTempView("t3")

    // 4. Group by (sid, diff) and keep only streaks of 3 or more days
    spark.sql(
      """
        | select
        |   sid,
        |   MIN(datatime) begin_date,
        |   MAX(datatime) end_date,
        |   COUNT(*) times,
        |   SUM(day_money) total_sales
        | from t3
        | GROUP BY sid,diff
        | HAVING times >= 3
      """.stripMargin).show()
  }
}
```

**Running result**

```
+-----+----------+----------+-----+-----------+
|  sid|begin_date|  end_date|times|total_sales|
+-----+----------+----------+-----+-----------+
|shop1|2019-02-10|2019-02-13|    4|       1900|
+-----+----------+----------+-----+-----------+
```
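As a side note (an addition to the original post), the same four steps can also be expressed with the DataFrame API instead of SQL strings. This is a minimal sketch assuming the `df` DataFrame read above; `expr("date_sub(datatime, rn)")` is used rather than the Scala `date_sub` helper so that the column-valued second argument also works on Spark versions before 3.0.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Illustrative sketch: the same gaps-and-islands logic via the DataFrame API.
// Assumes `df` is the DataFrame read from order.csv above.
val daily = df.groupBy(col("sid"), col("datatime"))
  .agg(sum(col("money")).as("day_money"))               // step 1: one row per shop per day

val w = Window.partitionBy(col("sid")).orderBy(col("datatime"))

daily.withColumn("rn", row_number().over(w))            // step 2: number days within each shop
  .withColumn("diff", expr("date_sub(datatime, rn)"))   // step 3: same diff => same consecutive run
  .groupBy(col("sid"), col("diff"))                     // step 4: aggregate each run
  .agg(
    min(col("datatime")).as("begin_date"),
    max(col("datatime")).as("end_date"),
    count(lit(1)).as("times"),
    sum(col("day_money")).as("total_sales"))
  .where(col("times") >= 3)
  .drop("diff")
  .show()
```

The trick is the same either way: subtracting the per-shop row number from the date collapses every unbroken run of days onto a single anchor date, so runs become ordinary GROUP BY groups.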
### 4. RDD implementation

Note that the RDD program below solves a different variation: rather than detecting streaks, it computes each shop's cumulative (rolled-up) monthly sales, as the class name `RollupMthIncomeRDD` and its output suggest. A streak-detection sketch at the RDD level follows after the code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author lilinchao
  * @date 2021/11/14
  * @description Cumulative monthly sales per shop
  **/
object RollupMthIncomeRDD {

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("data_sql/order.csv")

    // Filter out the header line
    val header = linesRDD.first()
    val lines = linesRDD.filter(row => row != header)

    // Sum the sales per (shop, month)
    val reduced: RDD[((String, String), Double)] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val sid: String = fields(0)
      val dateStr: String = fields(1)
      val month: String = dateStr.substring(0, 7)
      val money: Double = fields(2).toDouble
      ((sid, month), money)
    }).reduceByKey(_ + _)

    // Group by shop id
    val result: RDD[(String, String, String, Double)] = reduced.groupBy(_._1._1).mapValues(it => {
      // Materialize the iterator as a List in memory
      // and sort it by month (lexicographic order)
      val sorted: List[((String, String), Double)] = it.toList.sortBy(_._1._2)
      var rollup = 0.0
      sorted.map(t => {
        val sid = t._1._1
        val month = t._1._2
        val month_sales = t._2
        rollup += month_sales
        (sid, month, rollup)
      })
    }).flatMapValues(lst => lst).map(t => (t._1, t._2._1, t._2._2, t._2._3))

    result.foreach(println)

    sc.stop()
  }
}
```

**Running result**

```
(shop1,shop1,2019-01,500.0)
(shop1,shop1,2019-02,2500.0)
(shop1,shop1,2019-03,2680.0)
(shop1,shop1,2019-04,3180.0)
(shop2,shop2,2019-02,300.0)
```
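For completeness, here is a hypothetical sketch (an addition, not from the original post) of how the consecutive-days requirement itself could be handled purely with RDDs, reusing the date-minus-row-number trick from the SQL version. It assumes the header-free `lines` RDD from above and uses `java.time.LocalDate` for the date arithmetic.

```scala
import java.time.LocalDate

// Hypothetical sketch: the 3-consecutive-days requirement on RDDs.
// Assumes `lines` is the header-free RDD[String] built above.
val streaks = lines.map { line =>
    val Array(sid, date, money) = line.split(",")
    ((sid, date), money.toDouble)
  }
  .reduceByKey(_ + _)                                   // daily total per shop
  .map { case ((sid, date), dayMoney) => (sid, (date, dayMoney)) }
  .groupByKey()
  .flatMap { case (sid, it) =>
    // ISO dates sort lexicographically, so a plain string sort is chronological.
    val sorted = it.toList.sortBy(_._1)
    sorted.zipWithIndex.map { case ((date, dayMoney), rn) =>
      // Days in one unbroken run share the same (date - rn) anchor.
      // rn starts at 0 here (vs 1 for ROW_NUMBER), which only shifts the
      // anchor uniformly and does not change the grouping.
      val anchor = LocalDate.parse(date).minusDays(rn)
      ((sid, anchor), (date, date, 1, dayMoney))
    }
  }
  .reduceByKey { case ((b1, e1, c1, s1), (b2, e2, c2, s2)) =>
    // Merge two fragments of a run: earliest begin, latest end, counts, sums.
    (if (b1 < b2) b1 else b2, if (e1 > e2) e1 else e2, c1 + c2, s1 + s2)
  }
  .filter(_._2._3 >= 3)                                 // keep streaks of 3+ days
  .map { case ((sid, _), (begin, end, times, total)) => (sid, begin, end, times, total) }

streaks.foreach(println)
// Expected on the sample data: (shop1,2019-02-10,2019-02-13,4,1900.0)
```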
Tags: Spark, Spark SQL
Unless otherwise stated, all posts on this blog are the author's original work.
When reposting, please credit the source: https://lilinchao.com/archives/1623.html