SparkSQL Case Practice (5)

Leefs 2021-11-14

1. Requirement

Aggregate each user's internet traffic.

  • Aggregate each user's internet traffic: if the gap between two consecutive sessions is less than 10 minutes, roll them up (merge them) into a single session. For example, user 1's first session ends at 14:46:30 and the next starts at 14:47:20, a 50-second gap, so the two rows merge; the session starting at 15:37:23 begins more than 16 minutes after the previous one ends, so it opens a new merged session (see the sketch below).
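To make the 10-minute rule concrete, the standalone check below (a sketch using java.time, not part of the Spark job; the two timestamps are taken from user 1's first two rows in the sample data) tests whether two consecutive sessions should be merged:

import java.time.{Duration, LocalTime}

// end of the previous session and start of the next one (user 1, rows 1 and 2)
val prevEnd   = LocalTime.parse("14:46:30")
val nextStart = LocalTime.parse("14:47:20")

// gap between the two sessions, in seconds
val gapSeconds = Duration.between(prevEnd, nextStart).getSeconds

// merge rule: the rows belong to one session when the gap is at most 10 minutes
// (here the gap is 50 s, well under 600 s, so the rows merge)
val sameSession = gapSeconds <= 10 * 60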

2. Data Preparation

  • merge.dat file
id start_time end_time flow
1 14:20:30 14:46:30 20
1 14:47:20 15:20:30 30
1 15:37:23 16:05:26 40
1 16:06:27 17:20:49 50
1 17:21:50 18:03:27 60
2 14:18:24 15:01:40 20
2 15:20:49 15:30:24 30
2 16:01:23 16:40:32 40
2 16:44:56 17:40:52 50
3 14:39:58 15:35:53 20
3 15:36:39 15:24:54 30
  • id: user ID
  • start_time: session start time
  • end_time: session end time
  • flow: traffic used during the session
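The code in section 3.2 reads this file with the header and inferSchema options. As an alternative sketch (not from the original post), the schema can be declared explicitly so the column types do not depend on inference:

import org.apache.spark.sql.types._

// explicit schema for the four space-separated columns
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("start_time", StringType),
  StructField("end_time", StringType),
  StructField("flow", IntegerType)
))

val df = spark.read
  .option("header", "true")
  .option("sep", " ")          // space-separated fields
  .schema(schema)
  .csv("data_sql/merge.dat")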

3. Implementation

3.1 Steps

1. Use the lag window function to bring the previous row's start_time and end_time onto the current row.
2. Compute the gap (in seconds) between the current start_time and the previous end_time.
3. Mark a flag: 0 if the gap is less than 10 minutes or is null, otherwise 1.
4. Partition by id again and use sum over a window to compute a running total of the flags.
5. Group by id and the running total (flags), then aggregate to get the merged sessions (a DataFrame-API sketch of these steps follows below).
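The sketch below is an equivalent illustration only, not the original implementation; it assumes the df DataFrame read in section 3.2 and expresses the five steps with the DataFrame/window-function API:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("id").orderBy("start_time")

val merged = df
  // steps 1-2: previous end_time and the gap in seconds to the current start_time
  .withColumn("lag_end_time", lag("end_time", 1).over(w))
  .withColumn("seconds",
    unix_timestamp(col("start_time"), "HH:mm:ss") - unix_timestamp(col("lag_end_time"), "HH:mm:ss"))
  // step 3: flag = 0 when the gap is null or at most 10 minutes, otherwise 1
  .withColumn("flag",
    when(col("seconds").isNull || col("seconds") <= 10 * 60, 0).otherwise(1))
  // step 4: running sum of the flags numbers the merged sessions per user
  .withColumn("flags", sum("flag").over(w))
  // step 5: aggregate each (id, flags) group into one merged session
  .groupBy("id", "flags")
  .agg(min("start_time").as("start_time"), max("end_time").as("end_time"), sum("flow").as("flow"))
  .orderBy("id", "start_time")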

3.2 Code

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author lilinchao
  * @date 2021/11/14
  * @description 1.0 
  **/
object SparkSQLMerge {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep"," ")//指定分隔符进行切分
      .csv("data_sql/merge.dat")

    df.createOrReplaceTempView("flow_merge")

    // 1. Use the lag function to bring the previous row's start_time and end_time onto the current row
    spark.sql(
      """
        | select
        |   id,
        |   start_time,
        |   end_time,
        |   lag(start_time,1) over(PARTITION BY id ORDER BY start_time) lag_start_time, -- previous row's start_time brought onto this row
        |   lag(end_time) over(PARTITION BY id ORDER BY start_time) lag_end_time, -- previous row's end_time brought onto this row
        |   flow
        | from flow_merge
        | ORDER BY id,start_time ASC
      """.stripMargin
    ).createOrReplaceTempView("t1")
    // 2. Compute the gap (in seconds) between this row's start_time and the previous row's end_time
    spark.sql(
      """
        | select
        |  id,
        |  start_time,
        |  end_time,
        |  lag_start_time,
        |  lag_end_time,
        |  (unix_timestamp(start_time,'HH:mm:ss') - unix_timestamp(lag_end_time,'HH:mm:ss') )  as seconds,
        |  flow
        | from t1
      """.stripMargin).createOrReplaceTempView("t2")
    // 3. Mark a flag: 0 if the gap is less than 10 minutes or is null, otherwise 1
    spark.sql(
      """
        | select
        |  id,
        |  start_time,
        |  end_time,
        |  case when seconds is null then 0
        |  when seconds <= 10 * 60 then 0
        |  else 1
        |  end flag,
        |  flow
        | from t2
      """.stripMargin).createOrReplaceTempView("t3")
    // 4. Partition by id again, open another window, and compute a running sum of the flags
    spark.sql(
      """
        | select
        |  id,
        |  start_time,
        |  end_time,
        |  sum(flag) over(partition by id order by start_time) as flags,
        |  flow
        | from t3
      """.stripMargin).createOrReplaceTempView("t4")
    // 5. Group by id and flags, then aggregate to get the merged sessions
    spark.sql(
      """
        | select
        |  id,
        |  min(start_time) start_time,
        |  max(end_time) end_time,
        |  sum(flow) flow
        | from t4
        | group by id,flags
        | order by id,start_time
      """.stripMargin).show()
  }
}
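Before the final query runs, each intermediate view can be displayed to sanity-check the steps. For user 1, the gaps computed in t2 are null, 50 s, 16 min 53 s, 61 s and 61 s, so the flag column in t3 is 0, 0, 1, 0, 0 and the running sum flags in t4 becomes 0, 0, 1, 1, 1, which splits the five rows into the two merged sessions shown in the result below. For example (an extra debugging line, not in the original code):

    // inspect the flags computed in step 3 before the final aggregation
    spark.sql("select * from t3 order by id, start_time").show(false)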

Output

+---+----------+--------+----+
| id|start_time|end_time|flow|
+---+----------+--------+----+
|  1|  14:20:30|15:20:30|  50|
|  1|  15:37:23|18:03:27| 150|
|  2|  14:18:24|15:01:40|  20|
|  2|  15:20:49|15:30:24|  30|
|  2|  16:01:23|17:40:52|  90|
|  3|  14:39:58|15:35:53|  50|
+---+----------+--------+----+
Tags: Spark, Spark SQL
