李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
07.Spark RDD序列化
Leefs
2021-06-29 AM
1354℃
0条
# 07.Spark RDD序列化 ### 一、闭包检查 从计算的角度, **算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行**。 那么在 scala 的函数式编程中,就会**导致算子内经常会用到算子外的数据**,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变。 ### 二、序列化方法和属性 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。 看如下代码: ```scala object serializable01_function { def main(args: Array[String]): Unit = { // 1、创建 SparkConf 并设置 App 名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") // 2、创建 SparkContext,该对象是提交 Spark App 的入口 val sc: SparkContext = new SparkContext(conf) // 3、创建一个 RDD val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "spark")) // 3.1 创建一个 Search 对象 val search = new Search("hello") // 3.2 函数传递,打印:ERROR Task not serializable search.getMatch1(rdd).collect().foreach(println) // 3.3 属性传递,打印:ERROR Task not serializable search.getMatch2(rdd).collect().foreach(println) // 4、关闭连接 sc.stop() } class Search(query:String) extends Serializable { //过滤出包含字符串的数据 def isMatch(s: String): Boolean = { s.contains(query) } // 函数序列化案例 //过滤出包含字符串的RDD def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } // 属性序列化案例 //过滤出包含字符串的RDD def getMatch2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } } ``` ##### 代码异常 如果Search类不继承`Serializable`将会报如下错误: ```scala Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) at org.apache.spark.SparkContext.clean(SparkContext.scala:2356) ``` 这是因为scala在执行的时候会校验序列化 ```scala private def clean( func: AnyRef, checkSerializable: Boolean, cleanTransitively: Boolean, accessedFields: Map[Class[_], Set[String]]): Unit = { .............. // 校验序列化 if (checkSerializable) { ensureSerializable(func) } } private def ensureSerializable(func: AnyRef): Unit = { try { if (SparkEnv.get != null) { SparkEnv.get.closureSerializer.newInstance().serialize(func) } } catch { case ex: Exception => throw new SparkException("Task not serializable", ex) } } ``` ##### 代码解析 **在传递一个方法时**,这个方法中所调用的方法`isMatch()`是定义在Search这个类中的,实际上调用的是`this. isMatch()`,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。 在传递一个属性时,同样会出现和调用方法相同的问题。 ### 三、Kryo序列化框架 ##### 1.参考地址 `https://github.com/EsotericSoftware/kryo` ##### 2.介绍 Java 的**序列化能够序列化任何的类**,但是**比较重**(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark 2.0 开始支持另外一种 Kryo 序列化机制。 Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。 注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。 ```scala object serializable_Kryo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("SerDemo") .setMaster("local[*]") // 替换默认的序列化机制 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册需要使用 kryo 序列化的自定义类 .registerKryoClasses(Array(classOf[Searcher])) val sc = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello scala", "hello java"), 2) val searcher = new Searcher("hello") val result: RDD[String] = searcher.getMatchedRDD1(rdd) result.collect.foreach(println) } } case class Searcher(val query: String) { def isMatch(s: String) = { s.contains(query) } def getMatchedRDD1(rdd: RDD[String]) = { rdd.filter(isMatch) } def getMatchedRDD2(rdd: RDD[String]) = { val q = query rdd.filter(_.contains(q)) } } ```
标签:
Spark
,
Spark Core
,
Spark RDD
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1304.html
上一篇
06.【转载】Spark RDD行动算子
下一篇
08.Spark RDD依赖关系
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
线程池
Eclipse
正则表达式
GET和POST
栈
微服务
SQL练习题
容器深入研究
LeetCode刷题
Hive
Jquery
Spark SQL
人工智能
Shiro
Golang基础
NIO
Http
Spark
BurpSuite
Tomcat
Java
机器学习
Java编程思想
Linux
Ubuntu
HDFS
散列
工具
MyBatis
Typora
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞