李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
SparkSQL案例实操(三)
Leefs
2021-11-13 PM
953℃
0条
[TOC] ### 一、需求 > 统计连续登录三天及以上的用户 + 这个问题可以扩展到很多相似的问题:连续几个月充值会员、连续天数有商品卖出、连续打滴滴、连续逾期。 **示例** | uid | times | start_date | end_date | | ------ | ----- | ---------- | ---------- | | guid01 | 4 | 2018-03-04 | 2018-03-07 | | guid02 | 3 | 2018-03-01 | 2018-03-03 | ### 二、数据准备 + `v_user_login.csv` ```csv uid,datatime guid01,2018-02-28 guid01,2018-03-01 guid01,2018-03-02 guid01,2018-03-04 guid01,2018-03-05 guid01,2018-03-06 guid01,2018-03-07 guid02,2018-03-01 guid02,2018-03-02 guid02,2018-03-03 guid02,2018-03-06 ``` **说明** + uid:登录用户ID + datatime:登录时间 ### 三、SQL实现 #### 3.1 步骤 ``` 1.查询用户登录详情,并且根据访问时间顺序对每个用户访问次数进行累计 2.查询出连续登录日期 - 思考:如何能够确定是否是连续时间登录 - 寻找规律:登录累积次数减登录时间,如果相同则代码是连续时间登录 3. 对uid和dif进行分组,统计数量 ``` #### 3.2 代码 ```scala import org.apache.spark.sql.{DataFrame, SparkSession} /** * @author lilinchao * @date 2021/11/13 * @description 统计连续登录三天及以上的用户 **/ object UserContinueLoginSQL { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("UserContinueLoginSQL").master("local[*]").getOrCreate() import spark.implicits._ val df: DataFrame = spark.read .option("header", "true") .option("inferSchema", "true") .csv("data_sql/v_user_login.csv") df.createOrReplaceTempView("v_access_log") //1.查询用户登录详情,并且根据访问时间顺序对每个用户访问次数进行累计 //row_number()函数将针对SELECT语句返回的每一行,从1开始编号,赋予其连续的编号。 spark.sql( """ | select | uid, | datatime, | ROW_NUMBER() OVER(PARTITION BY uid ORDER BY datatime ASC) rn | from v_access_log """.stripMargin).createOrReplaceTempView("t1") //2.查询出连续登录日期 //思考:如何能够确定是否是连续时间登录 //寻找规律:登录累积次数减登录时间,如果相同则代码是连续时间登录 //DATE_SUB():日期相减 spark.sql( """ | select | uid, | datatime, | DATE_SUB(datatime,rn) dif | from t1 """.stripMargin).createOrReplaceTempView("t2") //3.对uid和dif进行分组,统计数量 spark.sql( """ | select | uid, | MIN(datatime) start_date, | MAX(datatime) end_date, | count(1) counts | from t2 | group by uid,dif HAVING counts >= 3 """.stripMargin).show() } } ``` **运行结果** ``` +------+----------+----------+------+ | uid|start_date| end_date|counts| +------+----------+----------+------+ |guid02|2018-03-01|2018-03-03| 3| |guid01|2018-02-28|2018-03-02| 3| |guid01|2018-03-04|2018-03-07| 4| +------+----------+----------+------+ ``` ### 四、RDD实现 ```scala import java.text.SimpleDateFormat import java.util.{Calendar, Date} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/13 * @description 统计连续登录三天及以上的用户 ==> RDD实现 **/ object UserContinueLoginRDD { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("UserContinueLoginRDD").setMaster("local[*]") val sc = new SparkContext(conf) //读取数据 val rdd: RDD[String] = sc.textFile("data_sql/v_user_login.csv") //过滤掉第一行表头 val header = rdd.first() val rdd1 = rdd.filter(row => row != header) //对数据进行处理 val uidAndDate: RDD[(String, String)] = rdd1.map(x => { val fis = x.split(",") val uid = fis(0) val date = fis(1) (uid, date) }) //根据uid进行分分组,将同一个用户的登录数据搞到同一个分组中 val grouped: RDD[(String, Iterable[String])] = uidAndDate.groupByKey() //在组内进行排序 val uidAndDateDif = grouped.flatMapValues(it => { //将迭代器中的数据toList/toSet,有可能会发生内存溢出 val sorted = it.toSet.toList.sorted //定义一个日期的工具类 val calendar = Calendar.getInstance() val sdf = new SimpleDateFormat("yyyy-MM-dd") var index = 0; sorted.map(desStr => { val date: Date = sdf.parse(desStr) calendar.setTime(date) calendar.add(Calendar.DATE, -index) index += 1 (desStr, sdf.format(calendar.getTime)) }) }) val result = uidAndDateDif.map(x => { ((x._1, x._2._2), x._2._1) }).groupByKey().mapValues(it=>{ val list = it.toList val times = list.size val startTime = list.head val endTime = list.last (times,startTime,endTime) }).map(t=>{ (t._1._1,t._2._1,t._2._2,t._2._3) }).filter(x=>{ x._2>=3 }) val buffer = result.collect().toBuffer println(buffer) } } ``` **运行结果** ``` ArrayBuffer((guid02,3,2018-03-01,2018-03-03), (guid01,4,2018-03-04,2018-03-07), (guid01,3,2018-02-28,2018-03-02)) ```
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1622.html
上一篇
SparkSQL案例实操(二)
下一篇
SparkSQL案例实操(四)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
DataX
HDFS
JavaScript
ajax
Jenkins
数学
国产数据库改造
Java
递归
nginx
Shiro
Hive
Flume
Ubuntu
MyBatisX
MySQL
散列
JavaSE
Hadoop
Jquery
队列
Elasticsearch
SpringBoot
Kafka
线程池
FileBeat
Java编程思想
CentOS
Spark
Zookeeper
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞