李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
04.Spark RDD创建简介
Leefs
2021-06-29 AM
1731℃
0条
# 04.Spark RDD创建简介 ### 一、创建RDD #### 1.1 RDD创建方式大概分为四种 **(1)从集合(内存中创建)RDD** 从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD + parallelize `def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]` + 参数1:Seq集合,必须。 + 参数2:分区数,默认为该Application分配到的资源的CPU核数 ```scala //默认分区数创建RDD val rdd1 = sparkContext.parallelize(List(1,2,3,4)) //设置RDD为三个分区 val rdd1and = sparkContext.parallelize(List(1,2,3,4),3) ``` + makeRDD `def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]` ```scala val rdd2 = sparkContext.makeRDD(List(1,2,3,4)) val rdd2and = sparkContext.makeRDD(List(1,2,3,4),2) ``` 从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法 ```scala def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } ``` **(2)从外部存储(文件)创建RDD** 由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等。 + textFile `textFile(path,minPartitions)` + 参数1:String类型的一个URI,可以是HDFS和本地文件URI + 参数2:指定数据的分区,如果不指定分区,当你的核数大于2的时候,不指定分区数默认就是2 + 返回值:返回的是一个字符串类型的RDD,也就是RDD的内部形式Iterator[(String)] ```scala val fileRDD:RDD[String] = sparkContext.textFile("input") val fileRDD2and:RDD[String] = sparkContext.textFile("input",3) ``` textFile支持读文件、读文件夹,甚至压缩文件,还支持通配符。 + wholeTextFiles `wholeTextFiles(path,minPartitions)` + 参数1:String类型的一个URI,可以设置为多个路径,多个路径之间以逗号分隔 + 参数2:指定数据的分区 + 返回值:返回的是pairRDD。每个输入文件**一条记录**,保留数据与包含它的文件之间的关系。记录的格式为`(fileName, fileContent)`。 ```scala val fileRdd3 = sparkContext wholeTextFiles("input") val fileRdd3and = sparkContext wholeTextFiles("input",3) ``` 返回结果 ``` (file:/D:/Codes/llc-blog/input/word.txt,Hello Spark Hello World Hello Scala) ``` wholeTextFiles可以针对一个目录中的大量小文件 **其他方法:** + SparkContext.sequenceFile
可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable、Text等。 + SparkContext.hadoopRDD() 对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。 + SparkContext.objectFile() 可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。 **(3)从其他RDD创建** 主要是通过一个 RDD 运算完后,再产生新的 RDD。 **(4)直接创建 RDD(new)** 使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。 ### 二、并行度与分区 默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。 ```scala val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4),4) val fileRDD: RDD[String] = sparkContext.textFile("input",2) fileRDD.collect().foreach(println) sparkContext.stop() ``` 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作 ### 三、RDD操作 RDD支持两种操作,转化操作和行动操作。 + 转化操作返回一个全新的RDD,比如map()和filter() + 行动操作返回的是其他数据类型,比如count()和first(),并且行动操作会触发实际的计算 ### 四、RDD的惰性求值 RDD的转化操作都是**惰性求值**的,也就是说在调用行动操作之前Spark是不会开始进行计算的。 当我们调用转化操作时,Spark会在内部记录所要执行的信息,因此,严格意义上来说RDD不应该被看作是存放数据的数据集,而是通过转化操作构建出来的、记录如何计算数据的指令集。 此外,RDD的读取也是惰性的,这就是说,当我们调用textFile()函数时,数据并没有真正的读取,而是在有需要的时候才会读取。 惰性求值的优点是可以把一些操作进行合并以减少计算数据的步骤。 ### 五、RDD缓存 因为RDD是惰性求值的,且在同一个程序里我们可以能要多次使用相同的RDD,如果只是简单对RDD调用行动操作,Spark每次都会重新计算RDD及其所有的依赖,这会造成极大的资源浪费。 为了避免资源浪费,提升代码运行效率,Spark可以通过persist()支持对RDD数据进行持久化操作。在Java中,persis()默认会把数据以序列化的方式缓存在JVM的堆空间里。 *附:* [参考文章链接](http://www.likuli.com/archives/772/)
标签:
Spark
,
Spark Core
,
Spark RDD
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1281.html
上一篇
03.Spark RDD简介
下一篇
05.【转载】Spark RDD转换算子
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
SQL练习题
人工智能
Golang基础
JVM
设计模式
Elasticsearch
Hbase
pytorch
递归
排序
并发编程
FastDFS
Java阻塞队列
FileBeat
Elastisearch
DataX
查找
Shiro
Git
Netty
HDFS
VUE
Spark Core
机器学习
Tomcat
链表
国产数据库改造
Flume
JavaWeb
算法
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭