李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
06.DStream输出
Leefs
2021-10-25 AM
1516℃
0条
[TOC] ### 一、概念 输出操作允许`DStream`的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是`DStream`转换。 与 `RDD` 中的惰性求值类似,如果一个 `DStream` 及其派生出的 `DStream` 都没有被执行输出操作,那么这些 `DStream` 就都不会被求值。如果 `StreamingContext` 中没有设定输出 操作,整个context就都不会启动。 ### 二、操作属性 | 属性 | 说明 | | :---------------------------------- | :----------------------------------------------------------- | | print() | 在运行流程序的驱动结点上打印 `DStream` 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 `Python API` 中,同样的操作叫 print()。 | | saveAsObjectFiles(prefix, [suffix]) | 保存DStream的内容为一个序列化的文件`SequenceFile`。每一个批间隔的文件的文件名基于`prefix`和`suffix`生成。”prefix-TIME_IN_MS[.suffix]”,在Python API中不可用。 | | saveAsTextFiles(prefix, [suffix]) | 保存DStream的内容为一个文本文件。每一个批间隔的文件的文件名基于`prefix`和`suffix`生成。”prefix-TIME_IN_MS[.suffix]” | | saveAsHadoopFiles(prefix, [suffix]) | 保存DStream的内容为一个hadoop文件。每一个批间隔的文件的文件名基于`prefix`和`suffix`生成。”prefix-TIME_IN_MS[.suffix]”,在Python API中不可用。 | | foreachRDD(func) | 在从流中生成的每个RDD上应用函数`func`的最通用的输出操作。这个函数应该推送每个RDD的数据到外部系统,例如保存RDD到文件或者通过网络写到数据库中。需要注意的是,`func`函数在驱动程序中执行,并且通常都有RDD action在里面推动RDD流的计算。 | 通用的输出操作 `foreachRDD()`,它用来对 `DStream` 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。 在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。 ### 三、利用foreachRDD的设计模式 `dstream.foreachRDD`是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。 - 经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Spark worker中 尝试调用这个连接对象保存记录到RDD中,如下: ```scala dstream.foreachRDD(rdd =>{ val connection = createNewConnection()// executed at the driver rdd.foreach(record =>{ connection.send(record)// executed at the worker }) }) ``` 这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。 - 然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如: ```scala dstream.foreachRDD(rdd =>{ rdd.foreach(record =>{ val connection = createNewConnection() connection.send(record) connection.close() }) }) ``` 通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用`rdd.foreachPartition`方法。 为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。 ```scala dstream.foreachRDD(rdd =>{ rdd.foreachPartition(partitionOfRecords =>{ val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }) }) ``` 这就将连接对象的创建开销分摊到了partition的所有记录上了。 - 最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。 ```scala dstream.foreachRDD(rdd =>{ rdd.foreachPartition(partitionOfRecords =>{ // ConnectionPool is a static, lazily initialized pool of connections val connection =ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection)// return to the pool for future reuse }) }) ``` 需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。 其它需要注意的地方: - 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作`dstream.foreachRDD()`,但是没有任何RDD action操作在`dstream.foreachRDD()`里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。 - 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。 *附参考原文链接:* *https://www.bookstack.cn/read/Spark-ZH/spark-streaming-basic-concepts-output-operations-on-DStreams.md*
标签:
Spark
,
Spark Streaming
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1592.html
上一篇
05.DStream转换
下一篇
07.DStream优雅关闭
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
JavaScript
SpringCloudAlibaba
ajax
排序
FileBeat
序列化和反序列化
MyBatis-Plus
容器深入研究
Thymeleaf
Http
Spark SQL
MySQL
Ubuntu
查找
人工智能
Hadoop
持有对象
Stream流
Scala
锁
Sentinel
nginx
gorm
Quartz
正则表达式
JavaWEB项目搭建
MyBatisX
Hbase
Livy
Spring
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭