李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
05.【转载】Dataset (DataFrame) 的基础操作(一)
Leefs
2021-07-16 PM
1272℃
0条
# 05.【转载】Dataset (DataFrame) 的基础操作(一) ### 导读 这一章节主要目的是介绍 Dataset 的基础操作, 当然, DataFrame 就是 Dataset, 所以这些操作大部分也适用于 DataFrame 1. 有类型的转换操作 2. 无类型的转换操作 3. 基础 Action 4. 空值如何处理 5. 统计操作 ### 一、有类型转换操作 #### 1.1 flatMap 通过 flatMap 可以将一条数据转为一个数组, 后再展开这个数组放入 Dataset ```scala //flatmap val ds1 = Seq("hello spark", "hello hadoop").toDS ds1.flatMap( x => x.split(" ")).show() ``` #### 1.2 map map 可以将数据集中每条数据转为另一种形式 ```scala case class Person(name:String,age:Int) //map val ds2 = Seq(Person("zhangsan", 20), Person("lisi", 15)).toDS() ds2.map( x => Person(x.name,x.age * 2 )).show() ``` #### 1.3 mapPartitions mapPartitions 和 map 一样, 但是 map 的处理单位是每条数据, mapPartitions 的处理单位是每个分区 ```scala //map val ds2 = Seq(Person("zhangsan", 20), Person("lisi", 15)).toDS() ds2.map( x => Person(x.name,x.age * 2 )).show() //mapPartitions ds2.mapPartitions{ x =>{ //x不能大到没个Executor的内存放不下,不然就会OOM //对每个元素进心转换,后生成一个新的的集合 //这个map是Scala的map val result = x.map(x => Person(x.name, x.age * 2)) result } }.show() ``` #### 1.4 transform map 和 mapPartitions 以及 transform 都是转换, map 和 mapPartitions 是针对数据, 而 transform 是针对整个数据集, 这种方式最大的区别就是 transform 可以直接拿到 Dataset 进行操作 ![05.【转载】Dataset (DataFrame) 的基础操作01.png](https://lilinchao.com/usr/uploads/2021/07/3630433830.png) ```scala val ds = spark.range(10) ds.transform(x => x.withColumn("doubled",'id * 2)) .show() } ``` #### 1.5 as as[Type] 算子的主要作用是将弱类型的 Dataset 转为强类型的 Dataset, 它有很多适用场景, 但是最常见的还是在读取数据的时候, 因为 DataFrameReader 体系大部分情况下是将读出来的数据转换为 DataFrame 的形式, 如果后续需要使用 Dataset 的强类型 API, 则需要将 DataFrame 转为 Dataset. 可以使用 as[Type] 算子完成这种操作 ```scala @Test def as(): Unit ={ val schema = StructType{ List( StructField("name",StringType), StructField("age",IntegerType), StructField("gpa",FloatType) ) } val path = "E:\\Project\\Spark\\spark-sql\\input\\studenttab10k" val df: Dataset[Row] = spark.read.schema(schema) .option("header",true) .option("delimiter", "\t") .csv(path) //转换 val ds: Dataset[Student] = df.as[Student] ds.show() } case class Student(name:String,age:Int,gpa:Float) ``` #### 1.6 filter filter 用来按照条件过滤数据集 ```scala case class Person(name:String,age:Int) @Test def filters(): Unit ={ val ds2 = Seq(Person("zhangsan", 20), Person("lisi", 15)).toDS() ds2.filter (x => x.age > 15).show() } ``` #### 1.7 groupByKey grouByKey 算子的返回结果是 KeyValueGroupedDataset, 而不是一个 Dataset, 所以必须要先经过 KeyValueGroupedDataset 中的方法进行聚合, 再转回 Dataset, 才能使用 Action 得出结果 其实这也印证了分组后必须聚合的道理 ```scala case class Person(name:String,age:Int) @Test def groupByKey(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS //select count(*) from person group by name val grouped: KeyValueGroupedDataset[String, Person] = ds.groupByKey(x => x.name) grouped.count().show() } ``` #### 1.8 randomSplit randomSplit 会按照传入的权重随机将一个 Dataset 分为多个 Dataset, 传入 randomSplit 的数组有多少个权重, 最终就会生成多少个 Dataset, 这些权重的加倍和应该为 1, 否则将被标准化。 ```scala @Test def split(): Unit ={ val ds = spark.range(15) //randomSplit 切多少分,权重多少 val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array(5, 2, 3)) datasets.foreach(_.show()) //sample ds.sample(withReplacement = false,fraction = 0.4).show() } ``` #### 1.9 sample sample 会随机在 Dataset 中抽样 ```scala @Test def split(): Unit ={ val ds = spark.range(15) //randomSplit 切多少分,权重多少 val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array(5, 2, 3)) datasets.foreach(_.show()) //sample ds.sample(withReplacement = false,fraction = 0.4).show() } ``` #### 1.10 orderBy orderBy 配合 Column 的 API, 可以实现正反序排列 ```scala case class Person(name:String,age:Int) @Test def sort(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS ds.orderBy('name.desc).show() println("----------") ds.sort('age.asc).show() } ``` #### 1.11 sort 其实 orderBy 是 sort 的别名, 所以它们所实现的功能是一样的 ```scala case class Person(name:String,age:Int) @Test def sort(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS ds.orderBy('name.desc).show() println("----------") ds.sort('age.asc).show() } ``` #### 1.12 coalesce 减少分区, 此算子和 RDD 中的 coalesce 不同, Dataset 中的 coalesce 只能减少分区数, coalesce 会直接创建一个逻辑操作, 并且设置 Shuffle 为 false。 ```scala val ds = spark.range(15) ds.coalesce(1).explain(true) ``` #### 1.13 repartitions repartitions 有两个作用, 一个是重分区到特定的分区数, 另一个是按照某一列来分区, 类似于 SQL 中的 DISTRIBUTE BY ```scala case class Person(name:String,age:Int) val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.repartition(4) ds.repartition('name) ``` #### 1.14 dropDuplicates 使用 dropDuplicates 可以去掉某一些列中重复的行 ```scala case class Person(name:String,age:Int) @Test def dropDuplicates(): Unit ={ val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("lisi", 15), Person("zhangsan", 15))) ds.distinct().show() println("-------------") ds.dropDuplicates("age").show() } ``` #### 1.15 distinct 当 dropDuplicates 中没有传入列名的时候, 其含义是根据所有列去重, dropDuplicates() 方法还有一个别名, 叫做 distinct ![05.【转载】Dataset (DataFrame) 的基础操作02.png](https://lilinchao.com/usr/uploads/2021/07/569711141.png) 所以, 使用 distinct 也可以去重, 并且只能根据所有的列来去重 ```scala case class Person(name:String,age:Int) @Test def dropDuplicates(): Unit ={ val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("lisi", 15), Person("zhangsan", 15))) ds.distinct().show() println("-------------") ds.dropDuplicates("age").show() } ``` #### 1.16 except except 和 SQL 语句中的 except 一个意思, 是求得 ds1 中不存在于 ds2 中的数据, 其实就是差集 ```scala val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.except(ds2).show() ``` #### 1.17 intersect 求得两个集合的交集 ```scala val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.intersect(ds2).show() ``` #### 1.18 union 求得两个集合的并集 ```scala val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.union(ds2).show() ``` #### 1.19 limit 限制结果集数量 ```scala val ds = spark.range(1, 10) ds.limit(3).show() ``` ### 二、无类型转换操作 #### 2.1 select select 用来选择某些列出现在结果集中 ```scala case class Person(name:String,age:Int) @Test def select(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS ds.select('name).show() ds.selectExpr("sum(age)").show() println("----------------") ds.select(expr("sum(age)")).show() } ``` #### 2.2 selectExpr 在 SQL 语句中, 经常可以在 select 子句中使用 count(age), rand() 等函数, 在 selectExpr 中就可以使用这样的 SQL 表达式, 同时使用 select 配合 expr 函数也可以做到类似的效果 ```scala case class Person(name:String,age:Int) @Test def select(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS ds.select('name).show() ds.selectExpr("sum(age)").show() println("----------------") ds.select(expr("sum(age)")).show() } ``` #### 2.3 withColumn 通过 Column 对象在 Dataset 中创建一个新的列或者修改原来的列 ```scala case class Person(name:String,age:Int) @Test def column(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS //如果想使用函数功能 //1.使用functions.xx //2.使用表达式,可以使用expr("..."),随时随地编写表达式 ds.withColumn("random",expr("rand()")).show() ds.withColumn("name_new",'name).show() ds.withColumn("name_jdk",'name === "" ).show() ds.withColumnRenamed("name","new_name").show() } ``` #### 2.4 withColumnRenamed 修改列名 ```scala case class Person(name:String,age:Int) @Test def column(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS //如果想使用函数功能 //1.使用functions.xx //2.使用表达式,可以使用expr("..."),随时随地编写表达式 ds.withColumn("random",expr("rand()")).show() ds.withColumn("name_new",'name).show() ds.withColumn("name_jdk",'name === "" ).show() ds.withColumnRenamed("name","new_name").show() } ``` #### 2.5 drop 剪掉某个列 ```scala import spark.implicits._ case class Person(name:String,age:Int) val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.drop('age).show() ``` #### 2.6 groupBy 按照给定的行进行分组 ```scala case class Person(name:String,age:Int) @Test def groupBy(): Unit ={ val ds = Seq(Person("zhangsan", 20),Person("zhangsan", 18), Person("lisi", 15)).toDS //为什么GroupByKey是有类型的,最主要原因是因为 GroupByKey 生成的对象的算子是有类型的 //为什么GroupBy是无类型的,因为GroupBy生成的对象的算子是无类型的,针对列进行处理的 ds.groupBy('name).agg(mean("age")).show() } ``` *附:原文链接地址* https://blog.csdn.net/weixin_45417821/article/details/108472687
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1338.html
上一篇
04.DataFrame常用API
下一篇
06.【转载】Dataset (DataFrame) 的基础操作(二)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Eclipse
锁
Zookeeper
Tomcat
Java编程思想
持有对象
容器深入研究
Yarn
DataWarehouse
工具
GET和POST
数据结构和算法
Spark SQL
SpringBoot
Beego
查找
JavaScript
Python
VUE
Elasticsearch
Spark Core
Netty
Ubuntu
字符串
高并发
Spark
gorm
Flume
BurpSuite
Flink
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞