Hadoop---(4)HBase(分布式存储系统)
HBASE
HBase(Hadoop Database),是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中海量数据。利用Zookeeper作为协调工具。
- HMaster— HBase中仅有一个Master server。
- HRegionServer—负责多个HRegion使之能向client端提供服务,在HBase cluster中会存在多个HRegionServer。
- ServerManager—负责管理Region server信息,如每个Region server的HServerInfo(这个对象包含HServerAddress和startCode),已load Region个数,死亡的Region server列表
- RegionManager—负责将region分配到region server的具体工作,还监视root和meta 这2个系统级的region状态。
- RootScanner—定期扫描root region,以发现没有分配的meta region。
- MetaScanner—定期扫描meta region,以发现没有分配的user region。
HBase在产品中还包含了Jetty,在HBase启动时采用嵌入式的方式来启动Jetty,因此可以通过web界面对HBase进行管理和查看当前运行的一些状态,非常轻巧。
4.1 为什么采用HBase?
HBase 不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库.所谓非结构化数据存储就是说HBase是基于列的而不是基于行的模式,这样方面读写你的大数据内容。
HBase是介于Map Entry(key & value)和DB Row之间的一种数据存储方式。就点有点类似于现在流行的Memcache,但不仅仅是简单的一个key对应一个 value,你很可能需要存储多个属性的数据结构,但没有传统数据库表中那么多的关联关系,这就是所谓的松散数据。
简单来说,你在HBase中的表创建的可以看做是一张很大的表,而这个表的属性可以根据需求去动态增加,在HBase中没有表与表之间关联查询。你只需要 告诉你的数据存储到Hbase的那个column families 就可以了,不需要指定它的具体类型:char,varchar,int,tinyint,text等等。但是你需要注意HBase中不包含事务此类的功 能。
Apache HBase 和Google Bigtable 有非常相似的地方,一个数据行拥有一个可选择的键和任意数量的列。表是疏松的存储的,因此用户可以给行定义各种不同的列,对于这样的功能在大项目中非常实用,可以简化设计和升级的成本。
4.1 数据模型
HBASE中的每一张表,就是所谓的BigTable。稀疏表。
RowKey 和 ColumnKey 是二进制值byte[],按字典顺序排序;
Timestamp 是一个 64 位整数;
value 是一个未解释的字节数组byte[]。
表中的不同行可以拥有不同数量的成员。即支持“动态模式“模型
- 字符串、整数、二进制串甚至串行化的结构都可以作为行键
- 表按照行键的“逐字节排序”顺序对行进行有序化处理
- 表内数据非常‘稀疏’,不同的行的列的数完全目可以大不相同
- 可以只对一行上“锁”
- 对行的写操作是始终是“原子”的
1.列必须用‘族’(family)来定义
2.任意一列有如下形式
“族:标签”
其中,族和标签都可为任意形式的串
3.物理上将同“族”数据存储在一起
4.数据可通过时间戳区分版本
表是存放数据的。表由行和列组成
数据模型
- Row Key: 行键,Table的主键,Table中的记录按照Row Key排序
- Timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version number
- Column Family:列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中可以有任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。
4.2 物理存储
Table 在行的方向上分割为多个HRegion,一个region由[startkey,endkey)表示,每个HRegion分散在不同的RegionServer中。(参数hbase.hregion.max.filesize)
4.3 架构体系
架构体系:
- Client包含访问hbase的接口,client维护着一些cache来加快对hbase的访问,比如regione的位置信息。
- Zookeeper:
- 保证任何时候,集群中只有一个runnning master。
- 存贮所有Region的寻址入口。
- 实时监控Region Server 的状态,将Region Server的上线和下线信息。
- 存储HBase的schema,包括有哪些table,每个table有哪些column family。
- Mater可以启动多个HMaster,通过Zookeeper的Matser Election机制保证总有一个Master运行。
- 为Region Server分配运行。
- 负责Region Server。
- 发现失效的Region Server并重新分配其上的Region。
Region Server
维护Mater分配给它的Region,处理对这些region的IO请求。
负责切分在运行过程中变得过大的region。
可以看出,client访问hbase上数据的过程并不需要master参与,寻址访问zookeeper和region server,数据读写访问region server。
HRegion Server主要负责相应用户I/O请求,向HDFS文件系统中读写数据,是HBase中最核心的模块。
4.4 HBase Shell
提供HBase的状态,例如:服务器的数量
status
提供正在使用的hbase版本
version
表引用命令提供帮助
table_help
提供有关用户的信息
whoami
授予用户权限(hadoop是用户,R:读,W:写,X:执行,C:创建,A:管理)
grant 'hadoop' 'RWXCA'
撤销用户权限
revoke 'hadoop'
列出特定表的所有权限
user_permission 'emp'
DDL表级操作
创建表:
create 'users','user_id','address','info'
(表users,有三个列族user_id,address,info)
列出全部表
list
得到表的描述
describe 'users'
创建表
create 'users_tmp','user_id','address','info'
禁用表
disable 'users_tmp'
启用表
enable 'users_tmp'
验证表是否已经启用
is_enabled 'users_tmp'
删除表(如果删除表,需要先屏蔽表(disable),才能删除(drop))
disable 'users_tmp'
drop 'users_tmp'
改变一个表
alter 'users_tmp'
验证表是否存在
exists 'users_tmp'
DML记录级操作
添加记录
put 'users','xiaoming','info:age','24';
put 'users','xiaoming','info:birthday','1987-06-17';
put 'users','xiaoming','info:company','alibaba';
获取一条记录
get 'users','xiaoming'
获取一个id,一个列族的所有数据
get 'users','xiaoming','info'
获取一个id,一个列族中的一个列的所有数据
get 'users','xiaoming','info:age'
更新记录
put 'users','xiaoming','info:age','29'
get 'users','xiaoming','info:age'
put 'users','xiaoming','info:age','30'
get 'users','xiaoming','info:age'
获取单元格数据的版本数据
get 'users','xiaoming',{COLUMN=>'info:age',VERSION=>1}
get 'users','xiaoming',{COLUMN=>'info:age',VERSION=>2}
get 'users','xiaoming',{COLUMN=>'info:age',VERSION=>3}
获取单元格数据的某个版本数据
get 'users','xiaoming',{COLUMN=>'info:age',TIMESTAMP=>1364874937056}
全表扫描
scan 'users'
删除xiaoming值得'info:age'字段
delete 'users','xiaoming','info:age'
get 'users','xiaoming'
删除整行
deleteall 'users','xiaoming'
统计表的行数
count 'users'
清空表(禁用,删除和重新创建一个指定的表)
truncate 'users'
HBase中有两张特殊的Table,-ROOT-和.META.
- .META.:记录了用户表的Region信息,.META.可以有多个region。
- -ROOT-:记录.META.表的Region信息,-ROOT-只有一个Region,zookeeper中记录了-ROOT-表的location。
Client访问用户数据之前需要首先访问zookeeper,然后访问-ROOT-表,接着访问.META.表,最后才能找到用户数据的位置去访问。
4.5 HBase的Java_API
4.5.1 必备操作
//可以不放在静态块,按照自己的喜欢
public static Configuration configuration = null;
static
{
configuration = HBaseConfiguration.create();
configuration.set("hbase.master", "Master:60000");
configuration.set("hbase.zookeeper.quorum", "Master,Slave1,Slave2");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
}
4.5.2 创建一张表
/**
* 创建一张表
*/
public static void creatTable(String tableName, String[] familys) throws Exception {
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table already exists!");
} else {
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(int i=0; i<familys.length; i++){
tableDesc.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(tableDesc);
System.out.println("create table " + tableName + " ok.");
}
}
4.5.3 删除表
/**
* 删除表
*/
public static void deleteTable(String tableName) throws Exception {
try {
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("delete table " + tableName + " ok.");
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
}
}
4.5.4 插入一条记录
/**
* 插入一行记录
*/
public static void addRecord (String tableName, String rowKey, String family, String qualifier, String value)
throws Exception{
try {
HTable table = new HTable(conf, tableName);
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(value));
table.put(put);
System.out.println("insert recored " + rowKey + " to table " + tableName +" ok.");
} catch (IOException e) {
e.printStackTrace();
}
}
4.5.5 删除一条记录
/**
* 删除一行记录
*/
public static void delRecord (String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
List list = new ArrayList();
Delete del = new Delete(rowKey.getBytes());
list.add(del);
table.delete(list);
System.out.println("del recored " + rowKey + " ok.");
}
4.5.6 查找一行记录
/**
* 查找一行记录
*/
public static void getOneRecord (String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(rowKey.getBytes());
Result rs = table.get(get);
for(KeyValue kv : rs.raw()){
System.out.print(new String(kv.getRow()) + " " );
System.out.print(new String(kv.getFamily()) + ":" );
System.out.print(new String(kv.getQualifier()) + " " );
System.out.print(kv.getTimestamp() + " " );
System.out.println(new String(kv.getValue()));
}
}
4.5.7 显示所有数据
/**
* 显示所有数据
*/
public static void getAllRecord (String tableName) {
try{
HTable table = new HTable(conf, tableName);
Scan s = new Scan();
ResultScanner ss = table.getScanner(s);
for(Result r:ss){
for(KeyValue kv : r.raw()){
System.out.print(new String(kv.getRow()) + " ");
System.out.print(new String(kv.getFamily()) + ":");
System.out.print(new String(kv.getQualifier()) + " ");
System.out.print(kv.getTimestamp() + " ");
System.out.println(new String(kv.getValue()));
}
}
} catch (IOException e){
e.printStackTrace();
}
}
4.5.8 测试之前的CURD
public static void main (String [] agrs) {
try {
String tablename = "scores3";
String[] familys = {"grade", "course"};
HBaseExampleClient.creatTable(tablename, familys);
//add record kevin
HBaseExampleClient.addRecord(tablename,"kevin","grade","","5");
HBaseExampleClient.addRecord(tablename,"kevin","course","","90");
HBaseExampleClient.addRecord(tablename,"kevin","course","math","97");
HBaseExampleClient.addRecord(tablename,"kevin","course","art","87");
//add record coco
HBaseExampleClient.addRecord(tablename,"kevin","grade","","4");
HBaseExampleClient.addRecord(tablename,"kevin","course","math","89");
System.out.println("===========get one record========");
HBaseExampleClient.getOneRecord(tablename, "kevin");
System.out.println("===========show all record========");
HBaseExampleClient.getAllRecord(tablename);
System.out.println("===========del one record========");
HBaseExampleClient.delRecord(tablename, "coco");
HBaseExampleClient.getAllRecord(tablename);
System.out.println("===========show all record========");
HBaseExampleClient.getAllRecord(tablename);
} catch (Exception e) {
e.printStackTrace();
}
}
4.6 案例
package org.my.myHBase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.PUT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseExampleClient {
public static Configuration conf = null;
static
{
conf = HBaseConfiguration.create();
conf.set("hbase.master", "Master:60000");
conf.set("hbase.zookeeper.quorum", "Master,Slave1,Slave2");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}
/**
* 创建一张表
*/
public static void creatTable(String tableName, String[] familys) throws Exception {
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table already exists!");
} else {
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(int i=0; i<familys.length; i++){
tableDesc.addFamily(new HColumnDescriptor(familys[i]));
}
admin.createTable(tableDesc);
System.out.println("create table " + tableName + " ok.");
}
}
/**
* 删除表
*/
public static void deleteTable(String tableName) throws Exception {
try {
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("delete table " + tableName + " ok.");
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
}
}
/**
* 插入一行记录
*/
public static void addRecord (String tableName, String rowKey, String family, String qualifier, String value)
throws Exception{
try {
HTable table = new HTable(conf, tableName);
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(value));
table.put(put);
System.out.println("insert recored " + rowKey + " to table " + tableName +" ok.");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 删除一行记录
*/
public static void delRecord (String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
List list = new ArrayList();
Delete del = new Delete(rowKey.getBytes());
list.add(del);
table.delete(list);
System.out.println("del recored " + rowKey + " ok.");
}
/**
* 查找一行记录
*/
public static void getOneRecord (String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(rowKey.getBytes());
Result rs = table.get(get);
for(KeyValue kv : rs.raw()){
System.out.print(new String(kv.getRow()) + " " );
System.out.print(new String(kv.getFamily()) + ":" );
System.out.print(new String(kv.getQualifier()) + " " );
System.out.print(kv.getTimestamp() + " " );
System.out.println(new String(kv.getValue()));
}
}
/**
* 显示所有数据
*/
public static void getAllRecord (String tableName) {
try{
HTable table = new HTable(conf, tableName);
Scan s = new Scan();
ResultScanner ss = table.getScanner(s);
for(Result r:ss){
for(KeyValue kv : r.raw()){
System.out.print(new String(kv.getRow()) + " ");
System.out.print(new String(kv.getFamily()) + ":");
System.out.print(new String(kv.getQualifier()) + " ");
System.out.print(kv.getTimestamp() + " ");
System.out.println(new String(kv.getValue()));
}
}
} catch (IOException e){
e.printStackTrace();
}
}
public static void main (String [] agrs) {
try {
String tablename = "scores3";
String[] familys = {"grade", "course"};
HBaseExampleClient.creatTable(tablename, familys);
//add record kevin
HBaseExampleClient.addRecord(tablename,"kevin","grade","","5");
HBaseExampleClient.addRecord(tablename,"kevin","course","","90");
HBaseExampleClient.addRecord(tablename,"kevin","course","math","97");
HBaseExampleClient.addRecord(tablename,"kevin","course","art","87");
//add record coco
HBaseExampleClient.addRecord(tablename,"kevin","grade","","4");
HBaseExampleClient.addRecord(tablename,"kevin","course","math","89");
System.out.println("===========get one record========");
HBaseExampleClient.getOneRecord(tablename, "kevin");
System.out.println("===========show all record========");
HBaseExampleClient.getAllRecord(tablename);
System.out.println("===========del one record========");
HBaseExampleClient.delRecord(tablename, "coco");
HBaseExampleClient.getAllRecord(tablename);
System.out.println("===========show all record========");
HBaseExampleClient.getAllRecord(tablename);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.7 HBase结合MapReduce批量导入
4.7.1 Mapper
static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
Text v2 = new Text();
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
final String[] splited = value.toString().split("\t");
try {
final Date date = new Date(Long.parseLong(splited[0].trim()));
final String dateFormat = dateformat1.format(date);
String rowKey = splited[1]+":"+dateFormat;
v2.set(rowKey+"\t"+value.toString());
context.write(key, v2);
} catch (NumberFormatException e) {
final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
counter.increment(1L);
System.out.println("出错了"+splited[0]+" "+e.getMessage());
}
};
}
4.7.2 Reduce
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {
for (Text text : values) {
final String[] splited = text.toString().split("\t");
final Put put = new Put(Bytes.toBytes(splited[0]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
//省略其他字段,调用put.add(....)即可
context.write(NullWritable.get(), put);
}
};
}
4.7.3 Driver
public static void main(String[] args) throws Exception {
final Configuration configuration = new Configuration();
//设置zookeeper
configuration.set("hbase.zookeeper.quorum", "hadoop0");
//设置hbase表名称
configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
//将该值改大,防止hbase超时退出
configuration.set("dfs.socket.timeout", "180000");
final Job job = new Job(configuration, "HBaseBatchImport");
job.setMapperClass(BatchImportMapper.class);
job.setReducerClass(BatchImportReducer.class);
//设置map的输出,不设置reduce的输出类型
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
//不再设置输出路径,而是设置输出格式类型
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");
job.waitForCompletion(true);
}
4.8 HBase操作
提供HBase的状态,例如:服务器的数量
status
提供正在使用的hbase版本
version
表引用命令提供帮助
table_help
提供有关用户的信息
whoami
授予用户权限(hadoop是用户,R:读,W:写,X:执行,C:创建,A:管理)
grant 'hadoop' 'RWXCA'
撤销用户权限
revoke 'hadoop'
列出特定表的所有权限
user_permission 'emp'
DDL表级操作
创建表:
create 'users','user_id','address','info'
列出全部表
list
得到表的描述
describe 'users'
创建表
create 'users_tmp','user_id','address','info'
禁用表
disable 'users_tmp'
启用表
enable 'users_tmp'
验证表是否已经启用
is_enabled 'users_tmp'
删除表(如果删除表,需要先屏蔽表(disable),才能删除(drop))
disable 'users_tmp'
drop 'users_tmp'
改变一个表
alter 'users_tmp'
验证表是否存在
exists 'users_tmp'
DML记录级操作
添加记录
put 'users','xiaoming','info:age','24';
put 'users','xiaoming','info:birthday','1987-06-17';
put 'users','xiaoming','info:company','alibaba';
获取一条记录
get 'users','xiaoming'
获取一个id,一个列族的所有数据
get 'users','xiaoming','info'
获取一个id,一个列族中的一个列的所有数据
get 'users','xiaoming','info:age'
更新记录
put 'users','xiaoming','info:age','29'
get 'users','xiaoming','info:age'
put 'users','xiaoming','info:age','30'
get 'users','xiaoming','info:age'
获取单元格数据的版本数据
get 'users','xiaoming',{COLUMN=>'info:age',VERSION=>1}
get 'users','xiaoming',{COLUMN=>'info:age',VERSION=>2}
get 'users','xiaoming',{COLUMN=>'info:age',VERSION=>3}
获取单元格数据的某个版本数据
get 'users','xiaoming',{COLUMN=>'info:age',TIMESTAMP=>1364874937056}
全表扫描
scan 'users'
删除xiaoming值得'info:age'字段
delete 'users','xiaoming','info:age'
get 'users','xiaoming'
删除整行
deleteall 'users','xiaoming'
统计表的行数
count 'users'
清空表(禁用,删除和重新创建一个指定的表)
truncate 'users'
二级索引
单表建立二级索引