李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(八)
Leefs
2021-11-04 PM
1555℃
0条
[TOC] ### 一、需求 > 基站停留时间TOPN: > > + 根据用户产生的日志信息,分析在哪个基站停留的时间最长 > + 在一定范围内,求所有用户经过的所有基站所停留时间最长的TOP2 ### 二、数据说明 > `19735E1C66.log`:存储的日志信息 ![18.Spark Core案例实操(八)01.jpg](https://lilinchao.com/usr/uploads/2021/11/2951211537.jpg) + 第一列:手机号码 + 第二列:时间戳 + 第三列:基站ID + 第四列:连接状态(1连接,0断开) > `lac_info.txt:`存储基站信息 ![18.Spark Core案例实操(八)02.jpg](https://lilinchao.com/usr/uploads/2021/11/2328181239.jpg) + 第一列:基站id + 第二列:经度 + 第三列:纬度 ### 三、实现 #### 3.1 实现步骤 ``` 1. 获取用户产生的日志信息并切分 2. 用户在基站停留的总时长 3. 获取基站的基本信息 4. 把经纬度的信息join到用户数据中 5. 求除用户在某些基站停留时间的top2 ``` #### 3.2 代码实现 ```scala package com.llc.spark.core_demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/4 * @description 基站停留时间TOPN: * 根据用户产生的日志信息,分析在哪个基站停留的时间最长 * 在一定范围内,求所有用户经过的所有基站所停留时间最长的TOP2 **/ object BaseStationAnalyze { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("BaseStationAnalyze").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //1.获取用户访问基站的日志 val filesRDD:RDD[String] = sc.textFile("datas/19735E1C66.log") //2.切分日志((phone,lac),time) val userInfoRDD:RDD[((String,String),Long)] = filesRDD.map(line => { val fields:Array[String] = line.split(",") val phone = fields(0) val time = fields(1).toLong val lac = fields(2) val eventType = fields(3) val time_long = if (eventType.equals("1")) -time else time ((phone,lac),time_long) }) //3.聚合 val sumeRDD:RDD[((String,String),Long)] = userInfoRDD.reduceByKey(_ + _) //4.转换数据格式,方进行join操作 val lacAndPT:RDD[((String,(String,Long)))] = sumeRDD.map(tup => { val phone = tup._1._1 val lac = tup._1._2 val time = tup._2 (lac,(phone,time)) }) //5.加载基站信息 val lacInfo = sc.textFile("datas/lac_info.txt") //6.切分基站信息 val lacAndXY:RDD[(String, (String, String))] = lacInfo.map(line =>{ val fields = line.split(",") val lac = fields(0) val x = fields(1) val y = fields(2) (lac,(x,y)) }) //7.连接操作 val joinedRDD:RDD[(String, ((String,Long),(String, String)))] = lacAndPT.join(lacAndXY) //8.对数据进行整合 val phoneAndTXY:RDD[(String, Long, (String, String))] = joinedRDD.map(tup => { val phone = tup._2._1._1 val time = tup._2._1._2 val xy = tup._2._2 (phone, time, xy) }) //9.按照手机进行分组 val groupedRDD = phoneAndTXY.groupBy(_._1) //10.排序 val sorted = groupedRDD.mapValues(_.toList.sortBy(_._2).reverse) //11. 整合 val res = sorted.map(tup => { val phone = tup._1 val list = tup._2 val filterlist = list.map(tup1 => { val time = tup1._2 // 时长 val xy = tup1._3 (time, xy) }) (phone, filterlist) }) val ress = res.mapValues(_.take(2)) println(ress.collect().toList) } } ``` **运行结果** ``` List((18101056888,List((97500,(116.296302,40.032296)), (54000,(116.304864,40.050645)))), (18688888888,List((87600,(116.296302,40.032296)), (51200,(116.304864,40.050645))))) ``` ### 结尾 因为本篇使用的示例数据`19735E1C66.log`、`lac_info.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1610.html
上一篇
Spark Core案例实操(七)
下一篇
Spark Core案例实操(九)
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
MySQL
Spark Streaming
SQL练习题
Map
散列
Livy
排序
线程池
Elasticsearch
队列
递归
DataX
Filter
ClickHouse
Java工具类
人工智能
Http
LeetCode刷题
nginx
数据结构和算法
Shiro
Sentinel
容器深入研究
BurpSuite
Spark RDD
设计模式
CentOS
Hive
JavaSE
SpringBoot
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭