李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
04.Spark RDD创建简介
Leefs
2021-06-29 AM
1108℃
0条
# 04.Spark RDD创建简介 ### 一、创建RDD #### 1.1 RDD创建方式大概分为四种 **(1)从集合(内存中创建)RDD** 从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD + parallelize `def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]` + 参数1:Seq集合,必须。 + 参数2:分区数,默认为该Application分配到的资源的CPU核数 ```scala //默认分区数创建RDD val rdd1 = sparkContext.parallelize(List(1,2,3,4)) //设置RDD为三个分区 val rdd1and = sparkContext.parallelize(List(1,2,3,4),3) ``` + makeRDD `def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]` ```scala val rdd2 = sparkContext.makeRDD(List(1,2,3,4)) val rdd2and = sparkContext.makeRDD(List(1,2,3,4),2) ``` 从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法 ```scala def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } ``` **(2)从外部存储(文件)创建RDD** 由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等。 + textFile `textFile(path,minPartitions)` + 参数1:String类型的一个URI,可以是HDFS和本地文件URI + 参数2:指定数据的分区,如果不指定分区,当你的核数大于2的时候,不指定分区数默认就是2 + 返回值:返回的是一个字符串类型的RDD,也就是RDD的内部形式Iterator[(String)] ```scala val fileRDD:RDD[String] = sparkContext.textFile("input") val fileRDD2and:RDD[String] = sparkContext.textFile("input",3) ``` textFile支持读文件、读文件夹,甚至压缩文件,还支持通配符。 + wholeTextFiles `wholeTextFiles(path,minPartitions)` + 参数1:String类型的一个URI,可以设置为多个路径,多个路径之间以逗号分隔 + 参数2:指定数据的分区 + 返回值:返回的是pairRDD。每个输入文件**一条记录**,保留数据与包含它的文件之间的关系。记录的格式为`(fileName, fileContent)`。 ```scala val fileRdd3 = sparkContext wholeTextFiles("input") val fileRdd3and = sparkContext wholeTextFiles("input",3) ``` 返回结果 ``` (file:/D:/Codes/llc-blog/input/word.txt,Hello Spark Hello World Hello Scala) ``` wholeTextFiles可以针对一个目录中的大量小文件 **其他方法:** + SparkContext.sequenceFile
可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable、Text等。 + SparkContext.hadoopRDD() 对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。 + SparkContext.objectFile() 可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。 **(3)从其他RDD创建** 主要是通过一个 RDD 运算完后,再产生新的 RDD。 **(4)直接创建 RDD(new)** 使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。 ### 二、并行度与分区 默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。 ```scala val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4),4) val fileRDD: RDD[String] = sparkContext.textFile("input",2) fileRDD.collect().foreach(println) sparkContext.stop() ``` 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作 ### 三、RDD操作 RDD支持两种操作,转化操作和行动操作。 + 转化操作返回一个全新的RDD,比如map()和filter() + 行动操作返回的是其他数据类型,比如count()和first(),并且行动操作会触发实际的计算 ### 四、RDD的惰性求值 RDD的转化操作都是**惰性求值**的,也就是说在调用行动操作之前Spark是不会开始进行计算的。 当我们调用转化操作时,Spark会在内部记录所要执行的信息,因此,严格意义上来说RDD不应该被看作是存放数据的数据集,而是通过转化操作构建出来的、记录如何计算数据的指令集。 此外,RDD的读取也是惰性的,这就是说,当我们调用textFile()函数时,数据并没有真正的读取,而是在有需要的时候才会读取。 惰性求值的优点是可以把一些操作进行合并以减少计算数据的步骤。 ### 五、RDD缓存 因为RDD是惰性求值的,且在同一个程序里我们可以能要多次使用相同的RDD,如果只是简单对RDD调用行动操作,Spark每次都会重新计算RDD及其所有的依赖,这会造成极大的资源浪费。 为了避免资源浪费,提升代码运行效率,Spark可以通过persist()支持对RDD数据进行持久化操作。在Java中,persis()默认会把数据以序列化的方式缓存在JVM的堆空间里。 *附:* [参考文章链接](http://www.likuli.com/archives/772/)
标签:
Spark
,
Spark Core
,
Spark RDD
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1281.html
上一篇
03.Spark RDD简介
下一篇
05.【转载】Spark RDD转换算子
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Redis
稀疏数组
Flink
Git
栈
并发线程
查找
GET和POST
设计模式
Python
Spark SQL
BurpSuite
并发编程
JVM
容器深入研究
机器学习
Flume
二叉树
散列
ajax
Hive
Nacos
锁
序列化和反序列化
gorm
递归
JavaSE
JavaWeb
SpringBoot
算法
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞