李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(九)
Leefs
2021-11-05 AM
1653℃
0条
### 一、需求 > 分析CDN日志统计出访问PV、UV、IP地址: > > + 计算独立ip数 > + 统计每个视频独立ip数 > + 统计一天中每个小时的流量(统计每天24小时中每个小时的流量) **说明** **PV(page view):** 页面浏览量,页面点击率;通常衡量一个网站或者新闻频道一条新闻的指标; **UV(unique visitor ):** 指访问某个站点或者点击某条新闻的不同的ip的人数 ### 二、数据说明 ``` 100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)" ``` *注:通过空格进行分隔* + 第一列:用户IP + 第二列:命中率(hit/miss) + 第三列:响应时间 + 第四列:请求时间 + 第五列:请求方法 + 第六列:请求URL + 第七列:请求协议 + 第八列:状态码 + 第九列:相应数据流量 + 第十列:refer(从哪个url跳转到当前url) + 第十一列:useragent ### 三、实现 #### 3.1 步骤 ``` 1. 计算独立ip数 - 从每行日志中筛选出ip - 对每个ip计数 - 去重ip,获取到独立的ip 2. 统计每个视频独立ip数 - 筛选视频文件,拆分(文件名,ip地址) - 按照文件名称分组:(文件名,[ip,ip,ip]) - 将每个文件名ip地址去重 3. 统计一天中每个小时的流量(统计每天24小时中每个小时的流量) - 将日志中的访问时间以及请求大小两个数据提取出来(访问时间:访问大小),去除非法的访问(404) - 按照访问时间分组:(访问时间,[流量,流量,。。。]) - 将访问时间的对应的流量叠加 ``` #### 3.2 代码实现 ```scala package com.llc.spark.core_demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.util.matching.Regex /** * @author lilinchao * @date 2021/11/4 * @description 分析CDN日志统计出访问PV、UV、IP地址 **/ object CDNLogAnaluze { // 匹配ip规则: val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r // 匹配.mp4的视频规则 val VideoPattern = "([0-9]+).mp4".r // 匹配http响应码和请求大小 val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r // [15/Feb/2017:00:00:46 +0800] 匹配2017:00:00:46 val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("CDNLogAnaluze").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //加载数据 val input:RDD[String] = sc.textFile("datas/cdn.txt") //1. 独立IP数统计 // ipStatic(input) //2. 每个视频对应的IP数 // videoIpStatic(input) //3. 统计一天中每个小时的流量 flowOfHour(input) } /** * 统计独立IP数 */ def ipStatic(data: RDD[String]):Unit = { //1.统计独立IP val ipNums = data.map(line => (IPPattern.findFirstIn(line).get,1)) .reduceByKey(_+_) .sortBy(_._2,false) //2.获取到Top10访问次数 ipNums.take(10).foreach(println) //3.获取独立IP数 println("独立ip数:" + ipNums.count()) } /** * 统计视频对应的ip数 */ def videoIpStatic(data: RDD[String]):Unit = { //1. 获取到访问视频的行 def getFileNameAndIp(line:String):(String,String) = (VideoPattern.findFirstIn(line).mkString, IPPattern.findFirstIn(line).mkString) //2. 统计每个视频的独立ip数 val dataRDD = data .filter(_.matches(".*([0-9]+)\\.mp4.*")) .map(getFileNameAndIp) .groupByKey .map(file_ip => (file_ip._1, file_ip._2.toList.distinct)) .sortBy(_._2.size, false) //3. 输出结果 dataRDD.foreach(t => println("视频:" + t._1 + ", 独立ip数:" + t._2.size)) } /** * 统计一天中每个小时的流量 */ def flowOfHour(data: RDD[String]):Unit ={ /** * 使用str匹配pattern */ /** * 使用str匹配pattern */ def isMatch(pattern:Regex, str:String): Boolean = { str match { case pattern(_*) => true case _ => false } } /** * 获取日志中小时和http请求的大小的元组 */ def getTimeAndSize(line: String) = { var res = ("", 0L) try { val httpSizePattern(code, size) = line val timePattern(year, hour) = line res = (hour, size.toLong) }catch { case e : Exception => e.printStackTrace() } res } /** * 统计一天中每个小时的流量 */ val resultRDD = data.filter(isMatch(httpSizePattern, _)) .filter(isMatch(timePattern, _)) .map(getTimeAndSize(_)) .groupByKey() .map(hour_size => (hour_size._1, hour_size._2.sum)) .sortByKey(false, 1) resultRDD.foreach(hour_size => println(hour_size._1 + "时 CDN流量 = " + hour_size._2 / (1024 * 1024 * 1024) + "GB")) } } ``` **运行结果** ``` 23时 CDN流量 = 25GB 22时 CDN流量 = 42GB 21时 CDN流量 = 53GB 20时 CDN流量 = 55GB 19时 CDN流量 = 51GB 18时 CDN流量 = 45GB 17时 CDN流量 = 44GB 16时 CDN流量 = 45GB 15时 CDN流量 = 45GB 14时 CDN流量 = 55GB 13时 CDN流量 = 51GB 12时 CDN流量 = 46GB 11时 CDN流量 = 45GB 10时 CDN流量 = 61GB 09时 CDN流量 = 52GB 08时 CDN流量 = 43GB 07时 CDN流量 = 22GB 06时 CDN流量 = 11GB 05时 CDN流量 = 4GB 04时 CDN流量 = 3GB 03时 CDN流量 = 3GB 02时 CDN流量 = 5GB 01时 CDN流量 = 3GB 00时 CDN流量 = 14GB ``` ### 结尾 因为本篇使用的示例数据`cdn.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1611.html
上一篇
Spark Core案例实操(八)
下一篇
Spark Core案例实操(十)
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
数据结构
NIO
Java
nginx
持有对象
机器学习
Ubuntu
Hbase
字符串
Spring
前端
Python
BurpSuite
VUE
Kafka
MyBatis-Plus
Tomcat
MyBatis
RSA加解密
数据结构和算法
Java阻塞队列
Map
Flume
Spark Core
Filter
散列
高并发
线程池
哈希表
GET和POST
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭