李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
SparkSQL案例实操(三)
Leefs
2021-11-13 PM
1515℃
0条
[TOC] ### 一、需求 > 统计连续登录三天及以上的用户 + 这个问题可以扩展到很多相似的问题:连续几个月充值会员、连续天数有商品卖出、连续打滴滴、连续逾期。 **示例** | uid | times | start_date | end_date | | ------ | ----- | ---------- | ---------- | | guid01 | 4 | 2018-03-04 | 2018-03-07 | | guid02 | 3 | 2018-03-01 | 2018-03-03 | ### 二、数据准备 + `v_user_login.csv` ```csv uid,datatime guid01,2018-02-28 guid01,2018-03-01 guid01,2018-03-02 guid01,2018-03-04 guid01,2018-03-05 guid01,2018-03-06 guid01,2018-03-07 guid02,2018-03-01 guid02,2018-03-02 guid02,2018-03-03 guid02,2018-03-06 ``` **说明** + uid:登录用户ID + datatime:登录时间 ### 三、SQL实现 #### 3.1 步骤 ``` 1.查询用户登录详情,并且根据访问时间顺序对每个用户访问次数进行累计 2.查询出连续登录日期 - 思考:如何能够确定是否是连续时间登录 - 寻找规律:登录累积次数减登录时间,如果相同则代码是连续时间登录 3. 对uid和dif进行分组,统计数量 ``` #### 3.2 代码 ```scala import org.apache.spark.sql.{DataFrame, SparkSession} /** * @author lilinchao * @date 2021/11/13 * @description 统计连续登录三天及以上的用户 **/ object UserContinueLoginSQL { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("UserContinueLoginSQL").master("local[*]").getOrCreate() import spark.implicits._ val df: DataFrame = spark.read .option("header", "true") .option("inferSchema", "true") .csv("data_sql/v_user_login.csv") df.createOrReplaceTempView("v_access_log") //1.查询用户登录详情,并且根据访问时间顺序对每个用户访问次数进行累计 //row_number()函数将针对SELECT语句返回的每一行,从1开始编号,赋予其连续的编号。 spark.sql( """ | select | uid, | datatime, | ROW_NUMBER() OVER(PARTITION BY uid ORDER BY datatime ASC) rn | from v_access_log """.stripMargin).createOrReplaceTempView("t1") //2.查询出连续登录日期 //思考:如何能够确定是否是连续时间登录 //寻找规律:登录累积次数减登录时间,如果相同则代码是连续时间登录 //DATE_SUB():日期相减 spark.sql( """ | select | uid, | datatime, | DATE_SUB(datatime,rn) dif | from t1 """.stripMargin).createOrReplaceTempView("t2") //3.对uid和dif进行分组,统计数量 spark.sql( """ | select | uid, | MIN(datatime) start_date, | MAX(datatime) end_date, | count(1) counts | from t2 | group by uid,dif HAVING counts >= 3 """.stripMargin).show() } } ``` **运行结果** ``` +------+----------+----------+------+ | uid|start_date| end_date|counts| +------+----------+----------+------+ |guid02|2018-03-01|2018-03-03| 3| |guid01|2018-02-28|2018-03-02| 3| |guid01|2018-03-04|2018-03-07| 4| +------+----------+----------+------+ ``` ### 四、RDD实现 ```scala import java.text.SimpleDateFormat import java.util.{Calendar, Date} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/13 * @description 统计连续登录三天及以上的用户 ==> RDD实现 **/ object UserContinueLoginRDD { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("UserContinueLoginRDD").setMaster("local[*]") val sc = new SparkContext(conf) //读取数据 val rdd: RDD[String] = sc.textFile("data_sql/v_user_login.csv") //过滤掉第一行表头 val header = rdd.first() val rdd1 = rdd.filter(row => row != header) //对数据进行处理 val uidAndDate: RDD[(String, String)] = rdd1.map(x => { val fis = x.split(",") val uid = fis(0) val date = fis(1) (uid, date) }) //根据uid进行分分组,将同一个用户的登录数据搞到同一个分组中 val grouped: RDD[(String, Iterable[String])] = uidAndDate.groupByKey() //在组内进行排序 val uidAndDateDif = grouped.flatMapValues(it => { //将迭代器中的数据toList/toSet,有可能会发生内存溢出 val sorted = it.toSet.toList.sorted //定义一个日期的工具类 val calendar = Calendar.getInstance() val sdf = new SimpleDateFormat("yyyy-MM-dd") var index = 0; sorted.map(desStr => { val date: Date = sdf.parse(desStr) calendar.setTime(date) calendar.add(Calendar.DATE, -index) index += 1 (desStr, sdf.format(calendar.getTime)) }) }) val result = uidAndDateDif.map(x => { ((x._1, x._2._2), x._2._1) }).groupByKey().mapValues(it=>{ val list = it.toList val times = list.size val startTime = list.head val endTime = list.last (times,startTime,endTime) }).map(t=>{ (t._1._1,t._2._1,t._2._2,t._2._3) }).filter(x=>{ x._2>=3 }) val buffer = result.collect().toBuffer println(buffer) } } ``` **运行结果** ``` ArrayBuffer((guid02,3,2018-03-01,2018-03-03), (guid01,4,2018-03-04,2018-03-07), (guid01,3,2018-02-28,2018-03-02)) ```
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1622.html
上一篇
SparkSQL案例实操(二)
下一篇
SparkSQL案例实操(四)
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
容器深入研究
Spark Streaming
Spark Core
DataWarehouse
Linux
gorm
JVM
Java工具类
Shiro
人工智能
Git
Sentinel
高并发
Azkaban
ajax
NIO
HDFS
Filter
数学
微服务
Spring
栈
算法
JavaWEB项目搭建
MySQL
pytorch
JavaWeb
持有对象
Flink
国产数据库改造
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭