李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
03.DStream创建
Leefs
2021-10-20 PM
1593℃
0条
[TOC] ### 一、RDD队列 #### 1.1 用法及说明 测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。 ### 1.2 案例实操 ##### 需求 > 循环创建几个RDD,将 RDD放入队列。通过SparkStream创建Dstream,计算WordCount。 ##### 实现代码 ```scala import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable /** * @author lilinchao * @date 2021/8/13 * @description 1.0 **/ object RDDStream { def main(args: Array[String]) { //1.初始化 Spark 配置信息 val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream") //2.初始化 SparkStreamingContext val ssc = new StreamingContext(conf, Seconds(4)) //3.创建 RDD 队列 val rddQueue = new mutable.Queue[RDD[Int]]() //4.创建 QueueInputDStream val inputStream = ssc.queueStream(rddQueue,oneAtATime = false) //5.处理队列中的 RDD 数据 val mappedStream = inputStream.map((_,1)) val reducedStream = mappedStream.reduceByKey(_ + _) //6.打印结果 reducedStream.print() //7.启动任务 ssc.start() //8.循环创建并向 RDD 队列中放入 RDD for (i <- 1 to 5) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) } ssc.awaitTermination() } } ``` **运行结果** ![03.DStream创建01.jpg](https://lilinchao.com/usr/uploads/2021/10/1942316752.jpg) ### 二、自定义数据源 #### 2.1 用法及说明 首先要实现一个Receiver。自定义接收方必须通过实现两个方法扩展此抽象类Receiver - onStart(): 开始接收数据要做的事情 - onStop():停止接收数据要做的事情 不能无限期地阻塞 onStart ()和 onStop ()。通常,onStart ()将启动负责接收数据的线程,而 onStop ()将确保这些接收数据的线程停止。接收线程还可以使用接收方法 isStopped ()来检查它们是否应该停止接收数据。 一旦接收到数据,数据就可以通过调用 store (data)存储在 Spark 内部,这是由 Receiver 类提供的一种方法。Store ()有许多种类型,允许一次存储接收到的数据记录或作为对象/序列化字节的整个集合存储。 应该正确捕获和处理接收线程中的任何异常,以避免接收方的静默故障。 + Restart (< exception >)将通过异步调用 onStop ()并在延迟后调用 onStart ()来重新启动接收器。 + Stop (< exception >)将调用 onStop ()并终止接收器。 + reportError (< error >)会向驱动程序报告错误消息(在日志和 UI 中可见) ,而不会停止/重新启动接收器。 #### 2.2 案例实操 **需求** > 自定义数据源,实现监控某个端口号,获取该端口号内容。 **(1)自定义数据源** ```scala import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver /** * @author lilinchao * @date 2021/8/13 * @description 1.0 **/ class CustomerReceiver (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { receive() } }.start() } //读数据并将数据发送给Spark def receive(): Unit = { //创建一个Socket var socket: Socket = new Socket(host, port) //定义一个变量,用来接收端口传过来的数据 var input: String = null //创建一个BufferedReader用于读取端口传来的数据 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8)) //读取数据 input = reader.readLine() //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循环则关闭资源 reader.close() socket.close() //重启任务 restart("restart") } override def onStop(): Unit = {} } ``` **(2)使用自定义的数据源采集数据** ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @author lilinchao * @date 2021/8/13 * @description 1.0 **/ object FileStream { def main(args: Array[String]): Unit = { //1.初始化 Spark 配置信息 val sparkConf = new SparkConf().setMaster("local[*]") .setAppName("StreamWordCount") //2.初始化 SparkStreamingContext val ssc = new StreamingContext(sparkConf, Seconds(5)) //3.创建自定义 receiver 的 Streaming val lineStream = ssc.receiverStream(new CustomerReceiver("192.168.10.7", 9999)) //4.将每一行数据做切分,形成一个个单词 val wordStream = lineStream.flatMap(_.split("\t")) //5.将单词映射成元组(word,1) val wordAndOneStream = wordStream.map((_, 1)) //6.将相同的单词次数做统计 val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _) //7.打印 wordAndCountStream.print() //8.启动 SparkStreamingContext ssc.start() sc.awaitTermination() } } ``` **运行结果** ![03.DStream创建02.jpg](https://lilinchao.com/usr/uploads/2021/10/321462242.jpg)
标签:
Spark
,
Spark Streaming
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1581.html
上一篇
02.DStream入门
下一篇
04.SparkStreaming之Kafka数据源
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Eclipse
容器深入研究
JVM
并发编程
算法
序列化和反序列化
CentOS
BurpSuite
Spark RDD
Nacos
数据结构和算法
Scala
稀疏数组
SpringCloudAlibaba
人工智能
pytorch
MyBatis
VUE
国产数据库改造
JavaWeb
递归
MyBatisX
线程池
Java阻塞队列
ajax
JavaScript
栈
DataX
散列
MySQL
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭