Hadoop Series: Understanding HBase

HBase: a NoSQL database built on HDFS

1. What NoSQL is, and common NoSQL databases

1) Data is stored as key-value pairs

2) NoSQL databases do not support transactions (HBase, for instance, only guarantees atomicity at the row level)

3) Common NoSQL databases

 (*) HBase: a column-oriented NoSQL database built on top of HDFS

 (*) Redis: an in-memory NoSQL database

 (*) Cassandra: similar to HBase, a column-oriented NoSQL database

 (*) MongoDB: a document-oriented NoSQL database (BSON documents); BSON is a binary encoding of JSON


2. Architecture and table structure

 [Figure: HBase architecture and table structure]

3. Installation and configuration: three modes

                Prerequisite: Hadoop is already installed

       Unpack and set the environment variables: tar -zxvf hbase-0.96.2-hadoop2-bin.tar.gz -C ~/training/

                            HBASE_HOME=/root/training/hbase-0.96.2-hadoop2

                            export HBASE_HOME

 

                            PATH=$HBASE_HOME/bin:$PATH

                            export PATH
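
                            These variables are usually appended to the shell profile; assuming ~/.bash_profile, reload it so they take effect:

                            source ~/.bash_profile
                            echo $HBASE_HOME    # quick sanity check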

             

             

              1) Local mode: does not require Hadoop     ----->  hadoop111

                     hbase-env.sh

                                   export JAVA_HOME=/root/training/jdk1.7.0_75

                                  

                     hbase-site.xml

                                   <property>

                                     <name>hbase.rootdir</name>

                                    <value>file:///root/training/hbase-0.96.2-hadoop2/data</value>

                                   </property>        

 

                     Start: start-hbase.sh
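
                     A quick verification (a suggested check, not in the original notes): in local mode everything runs in a single JVM, so only an HMaster process appears.

                     jps           # expect a single HMaster process
                     hbase shell   # should connect without Hadoop running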

             

             

              2) Pseudo-distributed mode: simulates a distributed environment on a single machine -----> hadoop111

                     hbase-env.sh:

                            export HBASE_MANAGES_ZK=true    # let HBase manage its own ZooKeeper instance

 

                     hbase-site.xml:

                            <property>

                              <name>hbase.rootdir</name>

                             <value>hdfs://192.168.157.111:9000/hbase</value>

                            </property>

 

                            <property>

                             <name>hbase.cluster.distributed</name>

                              <value>true</value>

                            </property>

 

                            <property>

                             <name>hbase.zookeeper.quorum</name>

                              <value>192.168.157.111</value>

                            </property>

 

                            <property>

                              <name>dfs.replication</name>

                              <value>1</value>

                            </property>        

 

                     HBase Web Console: port 60010
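
                     After start-hbase.sh, a suggested check (not in the original notes): since HBASE_MANAGES_ZK=true, jps should show all three daemons, and the console is reachable at http://192.168.157.111:60010

                     jps    # expect: HMaster, HRegionServer, HQuorumPeer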

             

             

              3) Fully distributed mode: three nodes   -----> hadoop112, hadoop113, hadoop114

                            <property>

                              <name>hbase.rootdir</name>

                             <value>hdfs://192.168.157.112:9000/hbase</value>

                            </property>

 

                            <property>

                             <name>hbase.cluster.distributed</name>

                              <value>true</value>

                            </property>

 

                            <property>

                             <name>hbase.zookeeper.quorum</name>

                              <value>192.168.157.112</value>

                            </property>

 

                            <property>

                              <name>dfs.replication</name>

                              <value>2</value>

                            </property> 

 

                            <!-- tolerate up to 180 seconds of clock skew between the master and region servers -->
                            <property>
                              <name>hbase.master.maxclockskew</name>
                              <value>180000</value>
                            </property>

                           

                     regionservers file (lists the slave/region server nodes):

                            192.168.157.113

                            192.168.157.114

 

 

                     Copy the configured directory to the slave nodes:

                     scp -r hbase-0.96.2-hadoop2/ root@192.168.157.113:/root/training

                     scp -r hbase-0.96.2-hadoop2/ root@192.168.157.114:/root/training
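
                     Then start the cluster from the master node (hadoop112); start-hbase.sh brings up the HRegionServers on the hosts listed in regionservers via SSH:

                     start-hbase.sh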

             

              4) HBase HA: -----> hadoop112, hadoop113, hadoop114

                     Simply start another HMaster on a second node, as shown below.
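
                     For example (assuming HBase is already installed and on the PATH of the second node):

                     hbase-daemon.sh start master

                     The two HMasters coordinate through ZooKeeper: one is active, the other stands by and takes over if the active master fails.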


4. Command line (hbase shell) and Java API

1) Command line

              (1) Create a table: give the table name and the column family names  -----> becomes a directory on HDFS

                       create 'student','info','grade'

                             

                             create 'student',{NAME=>'info',VERSIONS=>'3'}

                             

                      View a table's structure: describe 'student'


              (2) Insert data:  put <table name>, <rowkey>, <column (column family:column name)>, <value>

                      put 'student','stu001','info:name','Tom'
                      put 'student','stu001','info:age','24'
                      put 'student','stu001','grade:chinese','80'
                      put 'student','stu002','info:name','Mary'

                           

              (3) Query data:

                             (*) scan: equivalent to  select * from student;

                                  scan 'student'

                           

                             (*) get: equivalent to  select * from *** where rowkey=???

                              Format: get <table name>, <rowkey>

                                      get 'student','stu001'
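
                      get can also target a specific column, or request several versions; two sketches against the table created above:

                                      get 'student','stu001','info:name'
                                      get 'student','stu001',{COLUMN=>'info:name',VERSIONS=>3}

                      (Multiple versions are only returned if the column family keeps them, as in the create ... VERSIONS=>'3' example above.)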

             

             

              (4) Count the rows: count 'student'

             

              (5) Truncate a table: truncate <table name>

                     (*) Aside: in Oracle, two ways to clear a table's data:

                                          delete from table;

                                                         truncate table ****;

              Differences:
       1. delete is DML (Data Manipulation Language): it can be rolled back.
          truncate is DDL (Data Definition Language), like create/drop table: it cannot be rolled back (DDL commits implicitly).
       2. delete removes rows one at a time; truncate destroys the table and then recreates it.
       3. delete leaves fragmentation behind; truncate does not.
       4. delete supports flashback; truncate does not.

                                         

                     (*)hbase(main):010:0> truncate 'student'
                            Truncating 'student' table (it may take a while):
                             - Disabling table...
                             - Dropping table...
                             - Creating table...
                            0 row(s) in 2.1140 seconds

 

              (6) Drop a table: drop 'student'

                     first run: disable 'student'

             

       2) Java API: depends on the jars under /root/training/hbase-0.96.2-hadoop2/lib

      

package demo;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.Test;

 

public class TestDemoHBase {

 

       @Test
       public void testCreateTable() throws Exception{
              // Locate the HMaster by pointing the client at the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Get an HBase admin client: HBaseAdmin
              HBaseAdmin admin = new HBaseAdmin(conf);

              // Describe the new table: table name
              HTableDescriptor ht = new HTableDescriptor(TableName.valueOf("mystudent"));

              // Create the column families
              HColumnDescriptor hc1 = new HColumnDescriptor("info");
              HColumnDescriptor hc2 = new HColumnDescriptor("grade");

              // Add the column families to the table descriptor
              ht.addFamily(hc1);
              ht.addFamily(hc2);

              // Create the table
              admin.createTable(ht);

              // Close the client
              admin.close();
       }

      

       @Test
       public void testPut() throws Exception{
              // Locate the HMaster by pointing the client at the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Get an HBase table client: HTable
              HTable table = new HTable(conf, "mystudent");

              // Construct a Put for the given rowkey
              Put put = new Put(Bytes.toBytes("stu001"));
              // put.add(family,     the column family name
              //         qualifier,  the column name
              //         value)      the value
              put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Tom"));

              // Insert the data
              table.put(put);

              table.close();
       }

      

       @Test
       public void testPutList() throws Exception{
              // Equivalent to: insert into **** select ****
              // Exercise (a possible solution sketch follows below)
       }
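
       // A possible solution sketch for the exercise above (an assumption, not the
       // original author's code): collect several Put objects into a java.util.List
       // and write them all in one batch with HTable.put(List<Put>).
       @Test
       public void testPutListSketch() throws Exception{
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              HTable table = new HTable(conf, "mystudent");

              java.util.List<Put> puts = new java.util.ArrayList<Put>();
              for(int i = 3; i <= 5; i++){
                     // hypothetical rowkeys stu003..stu005
                     Put put = new Put(Bytes.toBytes("stu00" + i));
                     put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Student" + i));
                     puts.add(put);
              }

              // a single client call writes the whole batch
              table.put(puts);
              table.close();
       }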

      

       @Test
       public void testGet() throws Exception{
              // Locate the HMaster by pointing the client at the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Get an HBase table client: HTable
              HTable table = new HTable(conf, "mystudent");

              // Construct a Get for the given rowkey
              Get get = new Get(Bytes.toBytes("stu001"));

              // Query
              Result r = table.get(get);

              // Extract the record's name and age
              String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
              String age = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));

              // Print
              System.out.println(name +"\t"+ age);

              table.close();
       }

      

      

       @Test
       public void testScan() throws Exception{
              // Locate the HMaster by pointing the client at the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Get an HBase table client: HTable
              HTable table = new HTable(conf, "mystudent");

              // Define a scanner
              Scan scan = new Scan();
              //scan.setFilter(filter) ----> attach a filter: the "where" condition

              ResultScanner rs = table.getScanner(scan);
              for(Result r: rs){
                     // Extract the name
                     String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
                     System.out.println(name);
              }

              table.close();
       }

      

       @Test
       public void testDropTable() throws Exception{
              // Locate the HMaster by pointing the client at the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Get an HBase admin client: HBaseAdmin
              HBaseAdmin admin = new HBaseAdmin(conf);

              // Drop the table: disable first, then delete
              admin.disableTable("mystudent");
              admin.deleteTable("mystudent");

              admin.close();
       }

}
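
A note on configuration (a suggestion, not from the original notes): these examples build a bare Configuration and rely only on the explicitly set ZooKeeper quorum. The more common idiom is HBaseConfiguration.create(), which also loads hbase-default.xml and hbase-site.xml from the classpath:

       import org.apache.hadoop.hbase.HBaseConfiguration;

       Configuration conf = HBaseConfiguration.create();
       conf.set("hbase.zookeeper.quorum", "192.168.157.111");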

 


5. How data is saved ------> issue: Region splitting

[Figures: the data write path and Region splitting]

6. HBase filters: Java programs

1) Prepare test data
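
              The notes don't include the data itself; a plausible setup in the hbase shell, with hypothetical values modeled on Oracle's classic emp table (the Java code below assumes a table emp with column family empinfo):

              create 'emp','empinfo'
              put 'emp','7839','empinfo:ename','KING'
              put 'emp','7839','empinfo:sal','5000'
              put 'emp','7788','empinfo:ename','SCOTT'
              put 'emp','7788','empinfo:sal','3000'
              put 'emp','7902','empinfo:ename','FORD'
              put 'emp','7902','empinfo:sal','3000'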

2) Filter types

              (*) Column value filter: SingleColumnValueFilter

                     Example: find the names of employees earning 3000:  select ename from emp where sal=3000;

             

              (*) Column prefix filter: ColumnPrefixFilter

                     Example: list employee names:  select ename from emp;

             

              (*) Multiple column prefix filter: MultipleColumnPrefixFilter

                     Example: list employee names and salaries:  select ename,sal from emp;

             

              (*) Rowkey filter (RowFilter): find the employee whose employee number is 7839:
                     select * from emp where empno=7839;

             

              (*) Combining multiple filters (FilterList)

package demo.filter;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

import org.apache.hadoop.hbase.filter.FilterList;

import org.apache.hadoop.hbase.filter.FilterList.Operator;

import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;

import org.apache.hadoop.hbase.filter.RegexStringComparator;

import org.apache.hadoop.hbase.filter.RowFilter;

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.Test;

 

public class TestHBaseFilter {

 

       @Test
       public void testSingleColumnValueFilter() throws Exception{
              // Configuration: the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Client
              HTable table = new HTable(conf, "emp");

              // Create a scanner
              Scan scan = new Scan();

              // Create the column value filter
              SingleColumnValueFilter filter = new SingleColumnValueFilter(
                            Bytes.toBytes("empinfo"),   // column family
                            Bytes.toBytes("sal"),       // column name
                            CompareOp.EQUAL,            // comparison operator (an enum)
                            Bytes.toBytes("3000"));     // value to compare against

              // Attach the filter to the scanner
              scan.setFilter(filter);

              // Run the query
              ResultScanner rs = table.getScanner(scan);
              for(Result r:rs){
                     // Print the name
                     System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"))));
              }

              table.close();
       }

      

       @Test
       public void testColumnPrefixFilter() throws Exception{
              // Configuration: the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Client
              HTable table = new HTable(conf, "emp");

              // Create a scanner
              Scan scan = new Scan();

              // Create the column prefix filter
              ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("ename"));

              scan.setFilter(filter);

              // Run the query
              ResultScanner rs = table.getScanner(scan);
              for(Result r:rs){
                     // Print the name
                     System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"))));
              }

              table.close();
       }

      

       @Test
       public void testMultipleColumnPrefixFilter() throws Exception{
              // Configuration: the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Client
              HTable table = new HTable(conf, "emp");

              // Create a scanner
              Scan scan = new Scan();

              // Create a multiple-column-prefix filter: select names and salaries
              // Build a two-dimensional byte array of the column prefixes
              byte[][] namesList = {Bytes.toBytes("ename"),Bytes.toBytes("sal")};

              MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(namesList);
              scan.setFilter(filter);

              // Run the query
              ResultScanner rs = table.getScanner(scan);
              for(Result r:rs){
                     String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
                     String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));

                     // Print name and salary
                     System.out.println(ename+"\t"+sal);
              }

              table.close();
       }

      

       @Test
       public void testRowFilter() throws Exception{
              // Configuration: the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Client
              HTable table = new HTable(conf, "emp");

              // Create a scanner
              Scan scan = new Scan();
              // Create a RowFilter
              RowFilter filter = new RowFilter(CompareOp.EQUAL,          // comparison operator
                            new RegexStringComparator("7839"));          // rowkey value, matched as a regular expression

              scan.setFilter(filter);

              // Run the query
              ResultScanner rs = table.getScanner(scan);
              for(Result r:rs){
                     String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
                     String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));

                     // Print name and salary
                     System.out.println(ename+"\t"+sal);
              }

              table.close();
       }

      

       @Test
       public void test5() throws Exception{
              // Find the names of employees whose salary equals 3000
              /*
               * Use two filters:
               * 1. a column value filter: employees whose salary equals 3000
               * 2. a column prefix filter: the employee's name
               */

              // Configuration: the ZooKeeper quorum
              Configuration conf = new Configuration();
              conf.set("hbase.zookeeper.quorum", "192.168.157.111");

              // Client
              HTable table = new HTable(conf, "emp");

              // Create a scanner
              Scan scan = new Scan();

              // Create the first filter
              SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
                            Bytes.toBytes("empinfo"),   // column family
                            Bytes.toBytes("sal"),       // column name
                            CompareOp.EQUAL,            // comparison operator (an enum)
                            Bytes.toBytes("3000"));     // value to compare against

              // Create the second filter
              ColumnPrefixFilter filter2 = new ColumnPrefixFilter(Bytes.toBytes("ename"));

              // Create a FilterList
              FilterList list = new FilterList(Operator.MUST_PASS_ALL); // equivalent to AND
              list.addFilter(filter1);
              list.addFilter(filter2);

              // Attach both filters to the scanner
              scan.setFilter(list);

              // Run the query
              ResultScanner rs = table.getScanner(scan);
              for(Result r:rs){
                     String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
                     String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));

                     // Print name and salary
                     System.out.println(ename+"\t"+sal);
              }

              table.close();
       }

}

 

7. MapReduce programs on HBase
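
The example is a word count over HBase tables: the Mapper reads rows from a table word (column content:info) and the Reducer writes the totals into a table stat (column content:result). The notes don't show the tables being created; a plausible preparation in the hbase shell (the sample sentences are hypothetical):

              create 'word','content'
              put 'word','1','content:info','I love Beijing'
              put 'word','2','content:info','I love China'
              create 'stat','content'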

package demo.wc;

 

import java.io.IOException;

 

import org.apache.hadoop.hbase.client.Mutation;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

//                                              k3    v3       the Reducer's output: one table record
public class WordCountReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

         @Override
         protected void reduce(Text k3, Iterable<LongWritable> v3, Context context)
                            throws IOException, InterruptedException {
                   // Sum the counts
                   long total = 0;
                   for(LongWritable l:v3){
                            total = total + l.get();
                   }

                   // The result is one record in the output table:
                   // construct a Put using the word k3 as the rowkey
                   Put put = new Put(Bytes.toBytes(k3.toString()));

                   put.add(Bytes.toBytes("content"), Bytes.toBytes("result"), Bytes.toBytes(String.valueOf(total)));

                   // Emit: the rowkey to insert under, and the Put carrying the data
                   context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
         }

 

}

package demo.wc;

 

import java.io.IOException;

 

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

// There is no k1/v1 input; the input is one record from the table
//                                                k2       v2
public class WordCountMapper extends TableMapper<Text, LongWritable> {

         @Override
         protected void map(ImmutableBytesWritable key, Result value, Context context)
                            throws IOException, InterruptedException {
                   // The input is one record from the table
                   // key:   the record's rowkey
                   // value: the record itself
                   // Extract the data
                   String str = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));

                   // Split into words
                   String[] words = str.split(" ");

                   // Emit each word with a count of 1
                   for(String w: words){
                            context.write(new Text(w), new LongWritable(1));
                   }
         }

 

}

package demo.wc;

 

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

 

public class WordCountMain {

         public static void main(String[] args) throws Exception {
                   // Configuration: the ZooKeeper quorum
                   Configuration conf = new Configuration();
                   conf.set("hbase.zookeeper.quorum", "192.168.157.111");

                   // Create a job
                   Job job = Job.getInstance(conf);
                   job.setJarByClass(WordCountMain.class);

                   // Define a scan that reads only the data the job needs
                   Scan scan = new Scan();
                   scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info")); // the column to read

                   // Set the job's Mapper
                   //TableMapReduceUtil.initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job);
                   TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("word"),  // input table
                                                   scan,                         // scanner
                                                   WordCountMapper.class,
                                                   Text.class,
                                                   LongWritable.class,
                                                   job);

                   // Set the job's Reducer; "stat" is the output table
                   TableMapReduceUtil.initTableReducerJob("stat", WordCountReducer.class, job);

                   // Run the job
                   job.waitForCompletion(true);
         }

 

}
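
To run the job, the HBase jars must be on the Hadoop classpath; one way to launch it (a sketch; the jar name wc.jar is assumed):

              export HADOOP_CLASSPATH=$(hbase classpath)
              hadoop jar wc.jar demo.wc.WordCountMain

Afterwards, scan 'stat' should show one row per word, with the total in content:result.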