1. Requirement
Aggregate per-user internet traffic.
- Sum each user's traffic; if the gap between two consecutive sessions is less than 10 minutes, they can be rolled up (merged) into a single session.
2. Data Preparation
- The merge.dat file:
id start_time end_time flow
1 14:20:30 14:46:30 20
1 14:47:20 15:20:30 30
1 15:37:23 16:05:26 40
1 16:06:27 17:20:49 50
1 17:21:50 18:03:27 60
2 14:18:24 15:01:40 20
2 15:20:49 15:30:24 30
2 16:01:23 16:40:32 40
2 16:44:56 17:40:52 50
3 14:39:58 15:35:53 20
3 15:36:39 15:24:54 30
- id: user ID
- start_time: session start time
- end_time: session end time
- flow: traffic volume
3. Implementation
3.1 Steps
1. Use the lag function to carry each row's start_time and end_time down to the next row.
2. Compute the gap between the current row's start_time and the previous row's end_time.
3. Flag each row: 0 when the gap is null or within 10 minutes, otherwise 1.
4. Open a window again partitioned by id and use sum to compute a running total of the flags (illustrated below for user 1).
5. Group by id and flags, then aggregate to get the merged result.
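To make steps 3 and 4 concrete, here are the intermediate values for user 1: seconds is the gap between each session's start_time and the previous session's end_time, flag marks gaps longer than 10 minutes, and flags is the running sum that numbers the merged sessions.
start_time end_time seconds flag flags
14:20:30 14:46:30 null 0 0
14:47:20 15:20:30 50 0 0
15:37:23 16:05:26 1013 1 1
16:06:27 17:20:49 61 0 1
17:21:50 18:03:27 61 0 1
Grouping by (id, flags) then yields two merged sessions for user 1: from 14:20:30 to 15:20:30 with flow 50, and from 15:37:23 to 18:03:27 with flow 150.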
3.2 Code
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author lilinchao
  * @date 2021/11/14
  * @description 1.0
  **/
object SparkSQLMerge {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", " ") // the file is space-delimited
      .csv("data_sql/merge.dat")
    df.createOrReplaceTempView("flow_merge")

    // 1. Use lag to carry each row's start_time and end_time down to the next row
    spark.sql(
      """
        | select
        | id,
        | start_time,
        | end_time,
        | lag(start_time,1) over(PARTITION BY id ORDER BY start_time) lag_start_time, -- previous row's start_time pulled onto this row
        | lag(end_time) over(PARTITION BY id ORDER BY start_time) lag_end_time, -- previous row's end_time pulled onto this row
        | flow
        | from flow_merge
        | ORDER BY id,start_time ASC
      """.stripMargin
    ).createOrReplaceTempView("t1")

    // 2. Compute the gap (in seconds) between this session's start and the previous session's end
    spark.sql(
      """
        | select
        | id,
        | start_time,
        | end_time,
        | lag_start_time,
        | lag_end_time,
        | (unix_timestamp(start_time,'HH:mm:ss') - unix_timestamp(lag_end_time,'HH:mm:ss')) as seconds,
        | flow
        | from t1
      """.stripMargin).createOrReplaceTempView("t2")

    // 3. Flag each row: 0 when the gap is null or at most 10 minutes, otherwise 1
    spark.sql(
      """
        | select
        | id,
        | start_time,
        | end_time,
        | case when seconds is null then 0
        | when seconds <= 10 * 60 then 0
        | else 1
        | end flag,
        | flow
        | from t2
      """.stripMargin).createOrReplaceTempView("t3")

    // 4. Open a window again partitioned by id and use sum to accumulate the flags
    spark.sql(
      """
        | select
        | id,
        | start_time,
        | end_time,
        | sum(flag) over(partition by id order by start_time) as flags,
        | flow
        | from t3
      """.stripMargin).createOrReplaceTempView("t4")

    // 5. Group by id and flags, then aggregate to get the merged sessions
    spark.sql(
      """
        | select
        | id,
        | min(start_time) start_time,
        | max(end_time) end_time,
        | sum(flow) flow
        | from t4
        | group by id,flags
        | order by id,start_time
      """.stripMargin).show()
  }
}
Output:
+---+----------+--------+----+
| id|start_time|end_time|flow|
+---+----------+--------+----+
| 1| 14:20:30|15:20:30| 50|
| 1| 15:37:23|18:03:27| 150|
| 2| 14:18:24|15:01:40| 20|
| 2| 15:20:49|15:30:24| 30|
| 2| 16:01:23|17:40:52| 90|
| 3| 14:39:58|15:35:53| 50|
+---+----------+--------+----+
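For comparison, the same rollup can be expressed with the DataFrame/window API instead of SQL strings. The following is a minimal sketch rather than part of the original program: the object name DataFrameMergeSketch is illustrative, while the input path, read options, and window logic mirror the code above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object DataFrameMergeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()

    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", " ")
      .csv("data_sql/merge.dat")

    // one window per user, ordered by session start time
    val w = Window.partitionBy("id").orderBy("start_time")

    df.withColumn("lag_end_time", lag(col("end_time"), 1).over(w)) // previous session's end time
      .withColumn("seconds",
        unix_timestamp(col("start_time"), "HH:mm:ss") - unix_timestamp(col("lag_end_time"), "HH:mm:ss"))
      .withColumn("flag", when(col("seconds").isNull || col("seconds") <= 10 * 60, 0).otherwise(1))
      .withColumn("flags", sum("flag").over(w)) // running total numbers the merged sessions
      .groupBy("id", "flags")
      .agg(min("start_time").as("start_time"), max("end_time").as("end_time"), sum("flow").as("flow"))
      .orderBy("id", "start_time")
      .select("id", "start_time", "end_time", "flow")
      .show()

    spark.stop()
  }
}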