李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
【转载】scala spark创建DataFrame的多种方式
Leefs
2021-12-16 PM
1573℃
0条
[TOC] ### 一、通过RDD[Row]和StructType创建 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * 通过RDD[Row]和StructType创建DataFrame **/ object DataFrameDemo01 { def main(args: Array[String]): Unit = { // 通过SparkSession创建spark入口 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName) .master("local[*]") .getOrCreate() println("1. 通过RDD[Row]和StructType创建") //1. 通过RDD[Row]和StructType创建 val sparkRdd1: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M")) // 将RDD与Row联合 val rowRdd: RDD[Row] = sparkRdd1.map(t => { val per: Array[String] = t.split(",") Row(per(0), per(1).toInt, per(2)) }) // 创建StructType实例,设置字段名和类型 val schema: StructType = StructType( List( StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType) ) ) // 创建dataFrame val dataFrame: DataFrame = spark.createDataFrame(rowRdd, schema) // 展示数据 dataFrame.show() // 释放资源 spark.stop() } } ``` **输出结果** ``` 1. 通过RDD[Row]和StructType创建 +----+---+---+ |name|age|sex| +----+---+---+ | X| 22| M| | y| 21| W| | N| 22| M| +----+---+---+ ``` ### 二、直接通过Rdd.toDF()创建DataFrame ```scala import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * 通过RDD.toDF直接创建DataFrame **/ object DataFrameDemo02 { def main(args: Array[String]): Unit = { // 通过SparkSession创建spark入口 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName) .master("local[*]") .getOrCreate() println("2. 
直接通过Rdd.toDF()创建DataFrame") //创建RDD val sparkRdd2: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M")) //将RDD与Row联合 val toDFRdd: RDD[(String, Int, String)] = sparkRdd2.map(t => { val per: Array[String] = t.split(",") (per(0), per(1).toInt, per(2)) }) import org.apache.spark.sql.functions._ import spark.implicits._ //写法1 //val frame: DataFrame = toDFRdd.toDF("name", "age", "sex") //写法2 val array = Array("name","age","sex") val frame: DataFrame = toDFRdd.toDF(array:_*) //展示数据 frame.agg(sum("age") as "sum_age").show() // 释放资源 spark.stop() } } ``` **运行结果** ``` 2. 直接通过Rdd.toDF()创建DataFrame +-------+ |sum_age| +-------+ | 65| +-------+ ``` ### 三、通过RDD和ScalaBean创建DataFrame ```scala import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * 通过RDD和scalabean创建DataFrame **/ object DataFrameDemo03 { def main(args: Array[String]): Unit = { // 通过SparkSession创建spark入口 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName) .master("local[*]") .getOrCreate() println("3. 通过RDD和ScalaBean创建DataFrame") //创建RDD val sparkRdd3: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M")) //关联ScalaBean的RDD val beanRdd: RDD[Per] = sparkRdd3.map(t => { val per: Array[String] = t.split(",") Per(per(0), per(1).toInt, per(2)) }) // 必须导入隐式转换才能使用.toDF import spark.implicits._ //创建DataFrame val df: DataFrame = beanRdd.toDF() // 创建视图 df.createTempView("t_per") // 查询数据 val res: DataFrame = spark.sql("SELECT name,age FROM t_per ORDER BY age") // 展示数据 res.show() // 释放资源 spark.stop() } case class Per(name:String,age:Int,sex:String) } ``` **运行结果** ``` 3. 
通过RDD和ScalaBean创建DataFrame +----+---+ |name|age| +----+---+ | y| 21| | X| 22| | N| 22| +----+---+ ``` ### 四、通过RDD联合JavaBean创建DataFrame ```scala import com.llc.spark.hbase.bean.Person import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * 通过RDD和javabean创建DataFrame **/ object DataFrameDemo04 { def main(args: Array[String]): Unit = { // 通过SparkSession创建spark入口 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName) .master("local[*]") .getOrCreate() println("4. 通过RDD联合JavaBean创建DataFrame") //创建RDD val sparkRdd4: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M")) //关联JavaBean的RDD val javaBeanRdd: RDD[Person] = sparkRdd4.map(t => { val per: Array[String] = t.split(",") new Person(per(0), per(1).toInt, per(2)) }) //创建DataFrame val frame: DataFrame = spark.createDataFrame(javaBeanRdd,classOf[Person]) // 展示数据 frame.show() //释放资源 spark.stop() } } ``` **Java bean class** ```java public class Person { private String name; private int age; private String sex; public Person(String name, int age, String sex) { this.name = name; this.age = age; this.sex = sex; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } } ``` **运行结果** ``` 4. 
通过RDD联合JavaBean创建DataFrame +---+----+---+ |age|name|sex| +---+----+---+ | 22| X| M| | 21| y| W| | 22| N| M| +---+----+---+ ``` ### 五、手写的通过RDD和javabean创建DataFrame ```scala import com.llc.spark.hbase.bean.NumberWord import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * 通过RDD和javabean创建DataFrame **/ object DataFrameDemo05 { def main(args: Array[String]): Unit = { // 通过SparkSession创建spark入口 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName) .master("local[*]") .getOrCreate() println("5.手写的通过RDD和javabean创建DataFrame") val rdd11: RDD[Row] = spark.sparkContext.parallelize( Seq( Row(8, "bat"), Row(64, "mouse"), Row(-27, "horse") ) ) rdd11.foreach(println) //创建 val newRDD: RDD[NumberWord] = rdd11.map(x => { val rdto = convertNumberWordDTO(x.getInt(0) + 100, x.getString(1)) rdto }) val ds: DataFrame = spark.createDataFrame(newRDD,classOf[NumberWord]) ds.show(100,false) println(ds.schema) //StructType(StructField(number,IntegerType,false), StructField(word,StringType,true)) //调用 ds.schema.printTreeString()得到ds的schema: ds.schema.printTreeString() //root // |-- number: integer (nullable = false) // |-- word: string (nullable = true) // 释放资源 spark.stop() } def convertNumberWordDTO(number: Int,word:String) ={ val rdto = new NumberWord() rdto.setNumber(number) rdto.setWord(word) rdto } } ``` **Java bean class** ```java public class NumberWord { private int number; private String word; public NumberWord() { } public NumberWord(int number, String word) { this.number = number; this.word = word; } public int getNumber() { return number; } public void setNumber(int number) { this.number = number; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } } ``` **运行结果** ``` 5.手写的通过RDD和javabean创建DataFrame [64,mouse] [-27,horse] [8,bat] +------+-----+ |number|word | +------+-----+ |108 |bat | |164 |mouse| |73 |horse| +------+-----+ 
StructType(StructField(number,IntegerType,false), StructField(word,StringType,true)) root |-- number: integer (nullable = false) |-- word: string (nullable = true) ``` *附原文链接地址:* *https://blog.csdn.net/helloxiaozhe/article/details/113354054*
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1748.html
上一篇
19.Hive自定义UDF函数
下一篇
20.Hive自定义UDTF函数
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Flink
NIO
VUE
JavaSE
Python
Thymeleaf
MySQL
Netty
LeetCode刷题
Quartz
Stream流
Tomcat
GET和POST
Hive
gorm
国产数据库改造
并发线程
Sentinel
Kafka
人工智能
DataX
容器深入研究
Flume
Jquery
MyBatisX
JavaWEB项目搭建
Spark Streaming
FastDFS
Spark SQL
散列
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭