HBase Explained (Hadoop Series)
HBase: a NoSQL database built on top of HDFS
1. What NoSQL is, and common NoSQL databases
1) Stores data as key-value pairs
2) NoSQL databases generally do not support transactions
3) Common NoSQL databases
(*) HBase: a column-oriented NoSQL database built on top of HDFS
(*) Redis: an in-memory NoSQL database
(*) Cassandra: a column-oriented NoSQL database, similar to HBase
(*) MongoDB: a document-oriented NoSQL database (BSON documents; BSON is the binary form of JSON)
2. Architecture and table structure
3. Installation and configuration: three modes
Prerequisite: Hadoop is already installed
Unpack and set the environment variables: tar -zxvf hbase-0.96.2-hadoop2-bin.tar.gz -C ~/training/
HBASE_HOME=/root/training/hbase-0.96.2-hadoop2
export HBASE_HOME
PATH=$HBASE_HOME/bin:$PATH
export PATH
1. Local mode: does not require Hadoop -----> hadoop111
hbase-env.sh:
export JAVA_HOME=/root/training/jdk1.7.0_75
hbase-site.xml
<property>
<name>hbase.rootdir</name>
<value>file:///root/training/hbase-0.96.2-hadoop2/data</value>
</property>
Start: start-hbase.sh
2. Pseudo-distributed mode: simulates a distributed environment on a single machine -----> hadoop111
hbase-env.sh:
export HBASE_MANAGES_ZK=true
hbase-site.xml:
<property>
<name>hbase.rootdir</name>
<value>hdfs://192.168.157.111:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.157.111</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
HBase Web Console: port 60010
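A quick way to verify the pseudo-distributed instance (a sketch; these are the JVM process names HBase 0.96 normally shows when it manages its own ZooKeeper):
start-hbase.sh
jps     # expect HMaster, HRegionServer and HQuorumPeer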
3. Fully distributed mode: three machines -----> hadoop112, hadoop113, hadoop114
<property>
<name>hbase.rootdir</name>
<value>hdfs://192.168.157.112:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.157.112</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
</property>
regionservers (the list of slave nodes):
192.168.157.113
192.168.157.114
Copy the installed directory to the slave nodes:
scp -r hbase-0.96.2-hadoop2/ [email protected]:/root/training
scp -r hbase-0.96.2-hadoop2/ [email protected]:/root/training
4. HBase HA: -----> hadoop112, hadoop113, hadoop114
Simply start another HMaster on a different node, for example:
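On one of the other nodes (hbase-daemon.sh is the standard per-daemon start script in $HBASE_HOME/bin):
hbase-daemon.sh start master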
4. Command line (hbase shell) and the Java API
1. The command line
(1) Create a table: specify the table name and the column family names -----> becomes an HDFS directory
create 'student','info','grade'
create 'student',{NAME=>'info',VERSIONS=>'3'}
View the table structure: describe 'student'
(2) Insert data: put <table>,<row key>,<column (family:qualifier)>,<value>
put 'student','stu001','info:name','Tom'
put 'student','stu001','info:age','24'
put 'student','stu001','grade:chinese','80'
put 'student','stu002','info:name','Mary'
(3) Query data:
(*) scan: equivalent to select * from emp;
scan 'student'
(*) get: equivalent to select * from *** where rowkey=???
Format: get <table>,<rowkey>
get 'student','stu001'
(4) Count the rows: count 'student'
(5) Truncate a table: truncate <table name>
(*) Aside: in Oracle, there are two ways to clear a table's data:
delete from ****;
truncate table ****;
The differences:
1. delete is DML (Data Manipulation Language): it can be rolled back
truncate is DDL (Data Definition Language), like create/drop table: it cannot be rolled back (DDL commits implicitly)
2. delete removes rows one by one; truncate destroys the table and then recreates it
3. delete leaves fragmentation behind; truncate does not
4. delete can be flashed back; truncate cannot (flashback)
(*) hbase(main):010:0> truncate 'student'
Truncating 'student' table (it may take a while):
- Disabling table...
- Dropping table...
- Creating table...
0 row(s) in 2.1140 seconds
(6) Drop a table: drop 'student'
But first: disable 'student'
2. Java API: the required jars are under /root/training/hbase-0.96.2-hadoop2/lib
package demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
public class TestDemoHBase {
    @Test
    public void testCreateTable() throws Exception {
        // Locate the HMaster by configuring the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Get an HBase admin client: HBaseAdmin
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Describe the table: the table name
        HTableDescriptor ht = new HTableDescriptor(TableName.valueOf("mystudent"));

        // Create the column families
        HColumnDescriptor hc1 = new HColumnDescriptor("info");
        HColumnDescriptor hc2 = new HColumnDescriptor("grade");

        // Add the column families to the table
        ht.addFamily(hc1);
        ht.addFamily(hc2);

        // Create the table
        admin.createTable(ht);

        // Close the client
        admin.close();
    }
    @Test
    public void testPut() throws Exception {
        // Locate the HMaster by configuring the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Get an HBase table client: HTable
        HTable table = new HTable(conf, "mystudent");

        // Build a Put object with the rowkey
        Put put = new Put(Bytes.toBytes("stu001"));
        // put.add(family,     the column family name
        //         qualifier,  the column name
        //         value)      the value
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Tom"));

        // Insert the data
        table.put(put);

        table.close();
    }
    @Test
    public void testPutList() throws Exception {
        // Equivalent to: insert into **** select ****
        // Left as an exercise; a possible sketch follows
    }
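    // A possible solution sketch for the exercise above (my assumption, not from
    // the original notes): batch several Put objects into a single call with
    // HTable.put(List<Put>), which saves RPC round trips compared to one put() per row.
    @Test
    public void testPutListSketch() throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");
        HTable table = new HTable(conf, "mystudent");

        // Build one Put per row (the sample rows are illustrative)
        java.util.List<Put> puts = new java.util.ArrayList<Put>();
        for (int i = 2; i <= 5; i++) {
            Put put = new Put(Bytes.toBytes("stu00" + i));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Student" + i));
            puts.add(put);
        }

        // A single client call inserts the whole batch
        table.put(puts);
        table.close();
    }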
    @Test
    public void testGet() throws Exception {
        // Locate the HMaster by configuring the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Get an HBase table client: HTable
        HTable table = new HTable(conf, "mystudent");

        // Build a Get object with the rowkey
        Get get = new Get(Bytes.toBytes("stu001"));

        // Query
        Result r = table.get(get);

        // Extract the record's name and age
        String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        String age = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));

        // Print them
        System.out.println(name + "\t" + age);

        table.close();
    }
    @Test
    public void testScan() throws Exception {
        // Locate the HMaster by configuring the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Get an HBase table client: HTable
        HTable table = new HTable(conf, "mystudent");

        // Define a scanner
        Scan scan = new Scan();
        // scan.setFilter(filter) ----> attach a filter: the "where" condition

        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            // Extract the name
            String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            System.out.println(name);
        }

        table.close();
    }
    @Test
    public void testDropTable() throws Exception {
        // Locate the HMaster by configuring the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Get an HBase admin client: HBaseAdmin
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Drop the table: disable it first, then delete it
        admin.disableTable("mystudent");
        admin.deleteTable("mystudent");

        admin.close();
    }
}
3. Web Console: port 60010
5. How data is saved ------> issue: Region splitting
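A region is split in two once it grows past a size threshold, which is set in hbase-site.xml. A minimal sketch (the property name is the standard one; the value shown is only an illustration):
<property>
<name>hbase.hregion.max.filesize</name>
<!-- split a region once its largest store file exceeds ~10 GB -->
<value>10737418240</value>
</property>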
6. HBase filters: Java programs
1. Prepare test data (see the hbase shell sketch after the list below)
2. Filter types
(*) Single column value filter: SingleColumnValueFilter
Example: find the names of employees whose salary is 3000, i.e. select ename from emp where sal=3000;
(*) Column prefix filter: ColumnPrefixFilter
Example: list the employee names, i.e. select ename from emp;
(*) Multiple column prefix filter: MultipleColumnPrefixFilter
Example: list the employee names and salaries, i.e. select ename,sal from emp;
(*) Row key filter: find the employee whose employee number is 7839
select * from emp where empno=7839;
(*) Combining multiple filters
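The examples below assume an 'emp' table with a single column family 'empinfo' and the employee number as the row key. A minimal way to prepare test data in the hbase shell (the sample rows are illustrative assumptions, modeled on Oracle's demo emp table):
create 'emp','empinfo'
put 'emp','7839','empinfo:ename','KING'
put 'emp','7839','empinfo:sal','5000'
put 'emp','7902','empinfo:ename','FORD'
put 'emp','7902','empinfo:sal','3000'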
package demo.filter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
public class TestHBaseFilter {
    @Test
    public void testSingleColumnValueFilter() throws Exception {
        // Configuration: the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Table client
        HTable table = new HTable(conf, "emp");

        // Create a scanner
        Scan scan = new Scan();

        // Create a single column value filter
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes("empinfo"),  // column family name
                Bytes.toBytes("sal"),      // column name
                CompareOp.EQUAL,           // enum for the comparison operator
                Bytes.toBytes("3000"));    // value to compare against

        // Attach the filter to the scanner
        scan.setFilter(filter);

        // Run the query
        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            // Print the name
            System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"))));
        }

        table.close();
    }
    @Test
    public void testColumnPrefixFilter() throws Exception {
        // Configuration: the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Table client
        HTable table = new HTable(conf, "emp");

        // Create a scanner
        Scan scan = new Scan();

        // Create a column prefix filter
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("ename"));

        scan.setFilter(filter);

        // Run the query
        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            // Print the name
            System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"))));
        }

        table.close();
    }
    @Test
    public void testMultipleColumnPrefixFilter() throws Exception {
        // Configuration: the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Table client
        HTable table = new HTable(conf, "emp");

        // Create a scanner
        Scan scan = new Scan();

        // Create a multiple column prefix filter: query the employee names and salaries
        // Build a two-dimensional byte array, one entry per column prefix
        byte[][] namesList = {Bytes.toBytes("ename"), Bytes.toBytes("sal")};

        MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(namesList);
        scan.setFilter(filter);

        // Run the query
        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));

            // Print name and salary
            System.out.println(ename + "\t" + sal);
        }

        table.close();
    }
    @Test
    public void testRowFilter() throws Exception {
        // Configuration: the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Table client
        HTable table = new HTable(conf, "emp");

        // Create a scanner
        Scan scan = new Scan();
        // Create a RowFilter
        RowFilter filter = new RowFilter(
                CompareOp.EQUAL,                     // comparison rule/operator
                new RegexStringComparator("7839"));  // rowkey value, matched as a regular expression

        scan.setFilter(filter);

        // Run the query
        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));

            // Print name and salary
            System.out.println(ename + "\t" + sal);
        }

        table.close();
    }
    @Test
    public void test5() throws Exception {
        // Find the names of employees whose salary is 3000
        /*
         * Use two filters together:
         * 1. A single column value filter: employees whose salary is 3000
         * 2. A column prefix filter: the employee name column
         */

        // Configuration: the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Table client
        HTable table = new HTable(conf, "emp");

        // Create a scanner
        Scan scan = new Scan();

        // Create the first filter
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
                Bytes.toBytes("empinfo"),  // column family name
                Bytes.toBytes("sal"),      // column name
                CompareOp.EQUAL,           // enum for the comparison operator
                Bytes.toBytes("3000"));

        // Create the second filter
        ColumnPrefixFilter filter2 = new ColumnPrefixFilter(Bytes.toBytes("ename"));

        // Create a FilterList
        FilterList list = new FilterList(Operator.MUST_PASS_ALL);  // equivalent to "and"
        list.addFilter(filter1);
        list.addFilter(filter2);

        // Attach both filters to the scanner
        scan.setFilter(list);

        // Run the query
        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));

            // Print name and salary
            System.out.println(ename + "\t" + sal);
        }

        table.close();
    }
}
7. MapReduce programs on HBase
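The word-count job below reads its input from one HBase table and writes its result into another. The code assumes an input table 'word' whose column content:info holds a line of text, and an output table 'stat' with a column family 'content'. A minimal preparation sketch in the hbase shell (the sample sentence is an illustrative assumption):
create 'word','content'
put 'word','1','content:info','I love Beijing and love China'
create 'stat','content'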
package demo.wc;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//                                                 k3    v3            the Reducer's output: one table record
public class WordCountReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text k3, Iterable<LongWritable> v3, Context context)
            throws IOException, InterruptedException {
        // Sum the counts
        long total = 0;
        for (LongWritable l : v3) {
            total = total + l.get();
        }

        // The result is one record in the table:
        // build a Put object, using the word k3 as the rowkey
        Put put = new Put(Bytes.toBytes(k3.toString()));

        put.add(Bytes.toBytes("content"), Bytes.toBytes("result"), Bytes.toBytes(String.valueOf(total)));

        // Emit: the rowkey under which to insert, and the Put holding the data
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}
package demo.wc;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// There is no k1/v1 from a file: the input is a record of the table
//                                               k2    v2
public class WordCountMapper extends TableMapper<Text, LongWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // The input is one record of the table
        // key:   the record's rowkey
        // value: the record itself
        // Extract the text
        String str = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));

        // Split into words
        String[] words = str.split(" ");

        // Emit each word with a count of 1
        for (String w : words) {
            context.write(new Text(w), new LongWritable(1));
        }
    }
}
package demo.wc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class WordCountMain {
    public static void main(String[] args) throws Exception {
        // Configuration: the ZooKeeper quorum address
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // Create a job
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);

        // Define a scanner: read only the data the job needs
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"));  // the column to read

        // Set the job's Mapper
        // TableMapReduceUtil.initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job);
        TableMapReduceUtil.initTableMapperJob(
                Bytes.toBytes("word"),  // input table
                scan,                   // scanner
                WordCountMapper.class,
                Text.class,
                LongWritable.class,
                job);

        // Set the job's Reducer and the output table name
        TableMapReduceUtil.initTableReducerJob("stat", WordCountReducer.class, job);

        // Run
        job.waitForCompletion(true);
    }
}
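To submit the job, the HBase jars must be on the Hadoop classpath. A hedged sketch of one common way to run it (the jar name wc.jar is an assumption):
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar wc.jar demo.wc.WordCountMain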