spark如何导入数据到hbase数据库
spark整合hbase数据库
在实际生产过程中,因为数据的复杂性,我们通常将处理好的数据缓存到hbase中。
本篇文章目的在于在学习过程中做笔记,以备后面的巩固复习。
代码如下
package com.aura.bigdata.dmp.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
//获得hbase的HBaseConnection对象
object HBaseConnectionUtil {
def getConnection(): Connection = {
val conf:Configuration = HBaseConfiguration.create()
//设置hbase访问路径,set参数取决于hadoop安装目录中的hdfs- site.xml
conf.set("hbase.rootdir", ""hbase.rootdir","hdfs://bd1807/hbase"")
conf.set("hbase.zookeeper.quorum", "huaxia01:2181,huaxia02:2181,huaxia03:2181")
//connect不可被实例化,通过ConnectionFactory创建实例对象
val connection = ConnectionFactory.createConnection(conf)
connection
}
def main(args: Array[String]): Unit = {
val connection = getConnection()
val admin = connection.getAdmin
//查看是否连接成功
val tableNames = admin.listTableNames()
println(tableNames.mkString("[", ", ", "]"))
}
}
启动zookeeper,hdfs,hbase,注意查看zookeeper,hdfs是否启动成功,避免HA两个namenode出现都是standby的情况
启动完成后,进程如下
通过命令hbase shell进入hbase命令模式,输入list查看所有表名
运行上面代码查看结果一致
接下来将RDD数据导入到hbase表中,这里举一个自己的例子
代码如下
userId2TagsRDD.foreachPartition(partition =>{
if(!partition.isEmpty){
val connection: Connection = HbaseConnectionUtil.getConnection()
//连接到表
val table: Table = connection.getTable(TableName.valueOf("dmp_1807"))
partition.foreach{ case (userid,tags) =>{
//将数据导入到hbase中
//创建puts,这里的util为java.util包下的
val puts = new util.ArrayList[Put]()
for((tag,count) <- tags){
val put = new Put(userid.getBytes())
//Put.add方法接收三个参数:列族,列名,数据
put.addColumn("cf".getBytes(),tag.getBytes(),(count+"").getBytes())
puts.add(put)
}
//使用table.put()puts方法一次性导入
table.put(puts)
table.close()
}
}
connection.close()
}
})
程序运行完成后,进入hbase查看是否导入成功,如下