李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
06.DStream输出
Leefs
2021-10-25 AM
851℃
0条
[TOC] ### 一、概念 输出操作允许`DStream`的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是`DStream`转换。 与 `RDD` 中的惰性求值类似,如果一个 `DStream` 及其派生出的 `DStream` 都没有被执行输出操作,那么这些 `DStream` 就都不会被求值。如果 `StreamingContext` 中没有设定输出 操作,整个context就都不会启动。 ### 二、操作属性 | 属性 | 说明 | | :---------------------------------- | :----------------------------------------------------------- | | print() | 在运行流程序的驱动结点上打印 `DStream` 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 `Python API` 中,同样的操作叫 print()。 | | saveAsObjectFiles(prefix, [suffix]) | 保存DStream的内容为一个序列化的文件`SequenceFile`。每一个批间隔的文件的文件名基于`prefix`和`suffix`生成。”prefix-TIME_IN_MS[.suffix]”,在Python API中不可用。 | | saveAsTextFiles(prefix, [suffix]) | 保存DStream的内容为一个文本文件。每一个批间隔的文件的文件名基于`prefix`和`suffix`生成。”prefix-TIME_IN_MS[.suffix]” | | saveAsHadoopFiles(prefix, [suffix]) | 保存DStream的内容为一个hadoop文件。每一个批间隔的文件的文件名基于`prefix`和`suffix`生成。”prefix-TIME_IN_MS[.suffix]”,在Python API中不可用。 | | foreachRDD(func) | 在从流中生成的每个RDD上应用函数`func`的最通用的输出操作。这个函数应该推送每个RDD的数据到外部系统,例如保存RDD到文件或者通过网络写到数据库中。需要注意的是,`func`函数在驱动程序中执行,并且通常都有RDD action在里面推动RDD流的计算。 | 通用的输出操作 `foreachRDD()`,它用来对 `DStream` 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。 在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。 ### 三、利用foreachRDD的设计模式 `dstream.foreachRDD`是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。 - 经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Spark worker中 尝试调用这个连接对象保存记录到RDD中,如下: ```scala dstream.foreachRDD(rdd =>{ val connection = createNewConnection()// executed at the driver rdd.foreach(record =>{ connection.send(record)// executed at the worker }) }) ``` 这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。 - 然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如: ```scala dstream.foreachRDD(rdd =>{ rdd.foreach(record =>{ val connection = createNewConnection() connection.send(record) connection.close() }) }) ``` 通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用`rdd.foreachPartition`方法。 为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。 ```scala dstream.foreachRDD(rdd =>{ rdd.foreachPartition(partitionOfRecords =>{ val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }) }) ``` 这就将连接对象的创建开销分摊到了partition的所有记录上了。 - 最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。 ```scala dstream.foreachRDD(rdd =>{ rdd.foreachPartition(partitionOfRecords =>{ // ConnectionPool is a static, lazily initialized pool of connections val connection =ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection)// return to the pool for future reuse }) }) ``` 需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。 其它需要注意的地方: - 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作`dstream.foreachRDD()`,但是没有任何RDD action操作在`dstream.foreachRDD()`里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。 - 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。 *附参考原文链接:* *https://www.bookstack.cn/read/Spark-ZH/spark-streaming-basic-concepts-output-operations-on-DStreams.md*
标签:
Spark
,
Spark Streaming
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1592.html
上一篇
05.DStream转换
下一篇
07.DStream优雅关闭
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
微服务
Python
散列
查找
Redis
Tomcat
Netty
JavaWEB项目搭建
Jenkins
Flink
Spark Core
JavaWeb
Scala
MyBatis
Typora
Golang基础
Elastisearch
Map
数据结构和算法
SpringBoot
高并发
设计模式
Livy
Java阻塞队列
Java编程思想
锁
NIO
MyBatis-Plus
Sentinel
并发线程
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞