HBase-MR操作

HBase擅长存储数据,但不擅长计算分析数据,但是它可以借用其他组件(mapreduce/spark),使用官方提供的hbase-api来实现计算分析数据功能。

hbase-server.jar

下面官方提供的一个Hbase操作MR的例子,此例子作用是对一张表的rowkey进行计数。

  1. 解决HBase与MapReduce的依赖包问题。
# 显示hbase需要的MapReduce Jar包
hbase mapredcp
# 环境变量
export HBASE_HOME="hbase安装目录"
export HADOOP_HOME="HADOOP安装目录"
# 注意是HADOOOP_CLASSPATH
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
上面的配置可以写入到hadoop-env.sh
  1. hbase安装目录/lib下,有个hbase-server-1.4.9.jar,是官方提供的工具包
# 此处执行hbase-server.jar,此jar包需要用到上面导入的mapredcp包。rowcounter是官方提供的计数rowkey的类名。
/home/even/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.4.9.jar rowcounter table1

table1的数据:
HBase-MR操作

运行MapReduce程序后的结果:
HBase-MR操作
除了上面的rowcounter外,官方还提供了其他命令,如下面的导入数据命令。

/home/even/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.4.9.jar importtsc -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:id table1 hdfs://hd-even-01:9000/tablein/

其他的命令可查看HBase-MapReduce官方文档

HBase-MR API

HBase针对MR提供了一套api。

例子1——HBase表1到HBase表2

下面例子中是要把table1的info列族的name列的数据导入table2表的info1列族的id列,以展示此套api的使用。

  1. HBaseMapper.java,查询出指定的列并输出到Reducer
package com.even.hbase;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * des: TableMapper是HBase提供的api
 * ImmutableBytesWritable类似之前的LongWritable,表示起始偏移量。put表示封装的一条一条数据,需要导入到Reduce阶段
 * author: Even
 * create date:2019/1/30
 */
public class HBaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        /*1,读取数据,拿到一个rowkey的数据,根据key得到put*/
        Put put = new Put(key.get());
        /*2,遍历出每个列并筛选需要的*/
        for (Cell cell : value.rawCells()) {
            /*获取列族*/
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if ("info".equals(cf)) {
                /*获取列*/
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell))))
                    /*获取info列族的name列的数据,并导入到另一张表的info1列族的id列*/
                    put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("id"), CellUtil.cloneValue(cell));
            }
        }
        context.write(key, put);
    }
}

  1. HBaseReducer.java,最终输出
package com.even.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * des: 把数据遍历输出,因为后面没有操作了,所以输出类型为NullWritable
 * author: Even
 * create date:2019/1/30
 */
public class HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put p : values) {
            context.write(NullWritable.get(), p);
        }
    }
}

  1. HBaseDriver
package com.even.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * des:HBase使用MR需要实现Tool
 * author: Even
 * create date:2019/1/30
 */
public class HBaseDriver implements Tool {
    private Configuration conf;


    @Override
    public int run(String[] strings) throws Exception {
        /*1,创建任务*/
        Job job = Job.getInstance(conf);
        /*2,指定运行的类*/
        job.setJarByClass(HBaseDriver.class);
        /*3,创建scan,以查询源table1表的数据*/
        Scan scan = new Scan();

        /*4,配置Mapper*/
        TableMapReduceUtil.initTableMapperJob("table1", scan, HBaseMapper.class, ImmutableBytesWritable.class, Put.class, job);
        /*5,配置Reducer*/
        TableMapReduceUtil.initTableReducerJob("table2", HBaseReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }


    public static void main(String[] args) {
        int status = 0;
        try {
            status = ToolRunner.run(new HBaseDriver(), args);
            System.out.println("运行成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.exit(status);
        }
    }
}

table1的数据:
HBase-MR操作
执行程序后table2的数据:
HBase-MR操作

例子2——HDFS到HBase表

编写程序,把HDFS中的文件导入到HBASE。

  1. 构建Mapper来读取HDFS的数据,HDFSMapper.java
package com.even.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * des:因为是从hdfs中获取数据,相当于是从文件中获取,跟以前写的MapReduce程序一样的输入,但是输出是要HBase的类型
 * author: Even
 * create date:2019/1/30
 */
public class HDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /*1,获取数据*/
        String line = value.toString();
        /*2,切分数据*/
        String[] fields = line.split("\t");
        /*3,封装数据*/
        String rowKey = fields[0];
        String name = fields[1];
        String des = fields[2];
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("des"), Bytes.toBytes(des));

        /*4,输出数据*/
        context.write(immutableBytesWritable, put);


    }
}

  1. 构建Reducer输出结果,HDFSReducer
package com.even.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * des:
 * author: Even
 * create date:2019/1/31
 */
public class HDFSReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put p : values) {
            context.write(NullWritable.get(), p);
        }
    }
}

  1. 驱动类,HDFSDriver
package com.even.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * des: Mapper阶段跟处理正常的文件一样的设置,只是Reducer阶段是需要输出到HBase表,因此需要用到HBase的API
 * author: Even
 * create date:2019/1/31
 */
public class HDFSDriver implements Tool {
    private Configuration conf;

    @Override
    public int run(String[] strings) throws Exception {
        /*1,创建任务*/
        Job job = Job.getInstance(conf);
        /*2,指定主类*/
        job.setJarByClass(HDFSDriver.class);
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableMapReduceUtil.initTableReducerJob("table2", HDFSReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("/hdfs2hbase"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) {
        int status = 0;
        try {
            status = ToolRunner.run(new HDFSDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.exit(status);
        }
    }
}

结果:
HBase-MR操作

总结

HBase操作MR程序,和传统的MR程序不一样的是,官方提供了针对于HBase表的API,如果涉及到对Hbase表的操作,需要使用到这些API,如TableMapper和TableReducer。其中还有ImmutableBytesWritable实现了WritableComparable,是对应的HBase表的偏移量,相当于传统MR程序的Mapper阶段的LongWritable。其次Driver的写法也和传统MapReduce程序有所不同。但是,归根结底,整体的编码流程还是从Map到Reduce再到Driver,不同的是各阶段的代码调用,上面两个例子对HBase的APi和与传统MR的API结合使用都有充分的应用。