# Reading and Writing HBase with Spark

*Leefs · 2021-04-12*

### Preface

Spark frequently reads from and writes to external data sources; common ones include HDFS, HBase, JDBC, Redis, and Kafka. This post walks through reading from and writing to HBase.

### 1. Maven dependencies

The relevant Hadoop and HBase dependencies need to be added; adjust the version numbers to match your actual environment.

```xml
<properties>
    <spark.version>2.4.4</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hbase.version>1.3.1</hbase.version>
    <scala.version>2.11</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
```
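All of the examples below assume that an HBase table named `test01` with a column family `info` already exists. If it does not, it can be created ahead of time; the following is a minimal sketch using the HBase client `Admin` API (the `CreateTestTable` object is an illustration added here, not part of the original post):

```scala
package com.bupt.Hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTestTable {

  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Open a connection and obtain an Admin handle
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin

    val tableName = TableName.valueOf("test01")
    if (!admin.tableExists(tableName)) {
      // A table needs at least one column family; the examples below use "info"
      val desc = new HTableDescriptor(tableName)
      desc.addFamily(new HColumnDescriptor("info"))
      admin.createTable(desc)
    }

    admin.close()
    connection.close()
  }
}
```

The equivalent HBase shell command would be `create 'test01', 'info'`.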
### 2. HBaseUtils

For convenience we first write an `HBaseUtils` object that centralizes the basic setup: building the `Configuration` with the ZooKeeper settings, and returning an `HBaseAdmin` or an `HTable`.

```scala
package com.bupt.Hbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable}

object HBaseUtils {

  /**
    * Build an HBaseConfiguration
    * @param quorum    ZooKeeper quorum
    * @param port      ZooKeeper client port
    * @param tableName kept for call-site symmetry; not used here
    */
  def getHBaseConfiguration(quorum: String, port: String, tableName: String) = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", port)
    conf
  }

  /**
    * Return an HBaseAdmin, creating the table if it is not available
    * @param conf
    * @param tableName
    * @return
    */
  def getHBaseAdmin(conf: Configuration, tableName: String) = {
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
      // A table must have at least one column family; the examples use "info"
      tableDesc.addFamily(new HColumnDescriptor("info"))
      admin.createTable(tableDesc)
    }
    admin
  }

  /**
    * Return an HTable
    * @param conf
    * @param tableName
    * @return
    */
  def getTable(conf: Configuration, tableName: String) = {
    new HTable(conf, tableName)
  }
}
```

### 3. Reading from HBase with Spark

HBase data can be read via `TableInputFormat` and `newAPIHadoopRDD`: the rows of the HBase table are pulled into an RDD and can then be processed like any other RDD.

```scala
package com.bupt.Hbase

import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Read data from HBase
  */
object HBaseReadTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseReadTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val tableName = "test01"
    val quorum = "localhost"
    val port = "2181"

    // Basic configuration
    val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Load the HBase table into an RDD
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]).cache()

    // Work with the RDD: extract row key and info:name
    val data = hBaseRDD.map(x => {
      val result = x._2
      val key = Bytes.toString(result.getRow)
      val value = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
      (key, value)
    })

    data.foreach(println)

    sc.stop()
  }
}
```
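If only a slice of the table is needed, the scan that `TableInputFormat` performs can be narrowed before building the RDD instead of reading the whole table. This is a supplementary sketch, not from the original post; it assumes the same `test01`/`info` schema, and the row-key bounds `001`/`100` are placeholder values:

```scala
package com.bupt.Hbase

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseScanReadTest {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseScanReadTest").setMaster("local[2]"))
    val tableName = "test01"

    val conf = HBaseUtils.getHBaseConfiguration("localhost", "2181", tableName)
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Restrict the underlying scan on the server side
    conf.set(TableInputFormat.SCAN_ROW_START, "001")     // first row key, inclusive (placeholder)
    conf.set(TableInputFormat.SCAN_ROW_STOP, "100")      // last row key, exclusive (placeholder)
    conf.set(TableInputFormat.SCAN_COLUMNS, "info:name") // only fetch the info:name column

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    hBaseRDD.map { case (_, result) =>
      (Bytes.toString(result.getRow), Bytes.toString(result.getValue("info".getBytes, "name".getBytes)))
    }.foreach(println)

    sc.stop()
  }
}
```

Narrowing the scan this way avoids shipping the entire table to Spark when only part of it is needed.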
### 4. Writing to HBase from Spark

**There are three ways to write to HBase from Spark:**

> + Method 1: insert rows one at a time with `HTable`'s `put` method;
> + Method 2: use the `TableOutputFormat` / `saveAsHadoopDataset` API;
> + Method 3: bulk load, i.e. write the data out as HFiles and then import them.

Method 3 is generally the most efficient of the three, so the bulk-load approach is recommended.

**4.1 Writing with `HTable`'s put method**

Note that when variables or objects defined outside an operator such as `map` or `foreachPartition` are referenced inside it, Spark can raise a "task not serializable" error. The HBase configuration and table handle therefore have to be created inside `foreachPartition`, which makes this approach quite inefficient.

```scala
package com.bupt.Hbase

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Write to HBase with HTable.put
  */
object HBaseWriteTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val tableName = "test01"
    val quorum = "localhost"
    val port = "2181"

    val indataRDD = sc.makeRDD(Array("002,10", "003,10", "004,50"))

    indataRDD.foreachPartition(x => {
      // The configuration and HTable are created inside the partition to avoid
      // the "task not serializable" problem described above
      val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
      conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      val htable = HBaseUtils.getTable(conf, tableName)

      x.foreach(y => {
        val arr = y.split(",")
        val key = arr(0)
        val value = arr(1)
        val put = new Put(Bytes.toBytes(key))
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(value))
        htable.put(put)
      })
    })

    sc.stop()
  }
}
```

**4.2 Writing with `TableOutputFormat`**

```scala
package com.bupt.Hbase

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Write to HBase through TableOutputFormat
  */
object HBaseWriteTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val tableName = "test01"
    val quorum = "localhost"
    val port = "2181"

    // Basic configuration
    val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Write the data to HBase
    val indataRDD = sc.makeRDD(Array("20180723_02,10", "20180723_03,10", "20180818_03,50"))
    indataRDD.map(_.split(",")).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(1)))
      (new ImmutableBytesWritable, put)
    }.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}
```

**4.3 Writing with bulk load**

The idea here is to first turn the RDD into HFiles and then import the generated HFiles into HBase in bulk with `org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles`. For why bulk loading performs better, see https://www.iteblog.com/archives/1889.html.

```scala
package com.bupt.Hbase

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.KeyValue

/**
  * Write to HBase with bulk load
  */
object HBaseWriteTest2 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest2").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val tableName = "test01"
    val quorum = "localhost"
    val port = "2181"

    // Basic configuration
    val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val table = HBaseUtils.getTable(conf, tableName)

    val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    // Input data
    val indataRDD = sc.makeRDD(Array("20180723_02,13", "20180723_03,13", "20180818_03,13"))
    val rdd = indataRDD.map(x => {
      val arr = x.split(",")
      val kv = new KeyValue(Bytes.toBytes(arr(0)), "info".getBytes, "age".getBytes, arr(1).getBytes)
      (new ImmutableBytesWritable(Bytes.toBytes(arr(0))), kv)
    })

    // Write the HFiles to HDFS (the target directory must not already exist)
    rdd.saveAsNewAPIHadoopFile("hdfs://localhost:8020/tmp/hbase",
      classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], conf)

    // Bulk-load the HFiles into HBase
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("hdfs://localhost:8020/tmp/hbase"), table)

    sc.stop()
  }
}
```

*Reference: [original article](https://www.jianshu.com/p/49141df754a2)*
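As a quick sanity check after any of the three write paths, a single row can be read back with a `Get`. This is a supplementary sketch, not part of the original post (the `HBaseGetCheck` name is illustrative); it assumes the same `test01` table and `info:age` column used throughout:

```scala
package com.bupt.Hbase

import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

object HBaseGetCheck {

  def main(args: Array[String]): Unit = {
    val conf = HBaseUtils.getHBaseConfiguration("localhost", "2181", "test01")
    val table = HBaseUtils.getTable(conf, "test01")

    // Fetch one of the rows written in the examples above
    val result = table.get(new Get(Bytes.toBytes("20180723_02")))
    val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
    println(s"20180723_02 -> info:age = $age")

    table.close()
  }
}
```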