李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(八)
Leefs
2021-11-04 PM
972℃
0条
[TOC] ### 一、需求 > 基站停留时间TOPN: > > + 根据用户产生的日志信息,分析在哪个基站停留的时间最长 > + 在一定范围内,求所有用户经过的所有基站所停留时间最长的TOP2 ### 二、数据说明 > `19735E1C66.log`:存储的日志信息 ![18.Spark Core案例实操(八)01.jpg](https://lilinchao.com/usr/uploads/2021/11/2951211537.jpg) + 第一列:手机号码 + 第二列:时间戳 + 第三列:基站ID + 第四列:连接状态(1连接,0断开) > `lac_info.txt:`存储基站信息 ![18.Spark Core案例实操(八)02.jpg](https://lilinchao.com/usr/uploads/2021/11/2328181239.jpg) + 第一列:基站id + 第二列:经度 + 第三列:纬度 ### 三、实现 #### 3.1 实现步骤 ``` 1. 获取用户产生的日志信息并切分 2. 用户在基站停留的总时长 3. 获取基站的基本信息 4. 把经纬度的信息join到用户数据中 5. 求除用户在某些基站停留时间的top2 ``` #### 3.2 代码实现 ```scala package com.llc.spark.core_demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/4 * @description 基站停留时间TOPN: * 根据用户产生的日志信息,分析在哪个基站停留的时间最长 * 在一定范围内,求所有用户经过的所有基站所停留时间最长的TOP2 **/ object BaseStationAnalyze { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("BaseStationAnalyze").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //1.获取用户访问基站的日志 val filesRDD:RDD[String] = sc.textFile("datas/19735E1C66.log") //2.切分日志((phone,lac),time) val userInfoRDD:RDD[((String,String),Long)] = filesRDD.map(line => { val fields:Array[String] = line.split(",") val phone = fields(0) val time = fields(1).toLong val lac = fields(2) val eventType = fields(3) val time_long = if (eventType.equals("1")) -time else time ((phone,lac),time_long) }) //3.聚合 val sumeRDD:RDD[((String,String),Long)] = userInfoRDD.reduceByKey(_ + _) //4.转换数据格式,方进行join操作 val lacAndPT:RDD[((String,(String,Long)))] = sumeRDD.map(tup => { val phone = tup._1._1 val lac = tup._1._2 val time = tup._2 (lac,(phone,time)) }) //5.加载基站信息 val lacInfo = sc.textFile("datas/lac_info.txt") //6.切分基站信息 val lacAndXY:RDD[(String, (String, String))] = lacInfo.map(line =>{ val fields = line.split(",") val lac = fields(0) val x = fields(1) val y = fields(2) (lac,(x,y)) }) //7.连接操作 val joinedRDD:RDD[(String, ((String,Long),(String, String)))] = lacAndPT.join(lacAndXY) //8.对数据进行整合 val phoneAndTXY:RDD[(String, Long, (String, String))] = joinedRDD.map(tup => { val phone = tup._2._1._1 val time = tup._2._1._2 val xy = tup._2._2 (phone, time, xy) }) //9.按照手机进行分组 val groupedRDD = phoneAndTXY.groupBy(_._1) //10.排序 val sorted = groupedRDD.mapValues(_.toList.sortBy(_._2).reverse) //11. 整合 val res = sorted.map(tup => { val phone = tup._1 val list = tup._2 val filterlist = list.map(tup1 => { val time = tup1._2 // 时长 val xy = tup1._3 (time, xy) }) (phone, filterlist) }) val ress = res.mapValues(_.take(2)) println(ress.collect().toList) } } ``` **运行结果** ``` List((18101056888,List((97500,(116.296302,40.032296)), (54000,(116.304864,40.050645)))), (18688888888,List((87600,(116.296302,40.032296)), (51200,(116.304864,40.050645))))) ``` ### 结尾 因为本篇使用的示例数据`19735E1C66.log`、`lac_info.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1610.html
上一篇
Spark Core案例实操(七)
下一篇
Spark Core案例实操(九)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Hadoop
GET和POST
前端
HDFS
VUE
Tomcat
Stream流
微服务
并发编程
Jenkins
RSA加解密
Flume
ClickHouse
Golang
FastDFS
Python
SpringCloudAlibaba
栈
Linux
SQL练习题
Spark SQL
ajax
MyBatis
锁
JVM
容器深入研究
Scala
查找
MyBatisX
NIO
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞