HBase-MR操作
HBase擅长存储数据,但不擅长计算分析数据,但是它可以借用其他组件(mapreduce/spark),使用官方提供的hbase-api来实现计算分析数据功能。
hbase-server.jar
下面官方提供的一个Hbase操作MR的例子,此例子作用是对一张表的rowkey进行计数。
- 解决HBase与MapReduce的依赖包问题。
# 显示hbase需要的MapReduce Jar包
hbase mapredcp
# 环境变量
export HBASE_HOME="hbase安装目录"
export HADOOP_HOME="HADOOP安装目录"
# 注意是HADOOOP_CLASSPATH
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
上面的配置可以写入到hadoop-env.sh
- hbase安装目录/lib下,有个hbase-server-1.4.9.jar,是官方提供的工具包
# 此处执行hbase-server.jar,此jar包需要用到上面导入的mapredcp包。rowcounter是官方提供的计数rowkey的类名。
/home/even/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.4.9.jar rowcounter table1
table1的数据:
运行MapReduce程序后的结果:
除了上面的rowcounter外,官方还提供了其他命令,如下面的导入数据命令。
/home/even/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.4.9.jar importtsc -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:id table1 hdfs://hd-even-01:9000/tablein/
其他的命令可查看HBase-MapReduce官方文档
HBase-MR API
HBase针对MR提供了一套api。
例子1——HBase表1到HBase表2
下面例子中是要把table1的info列族的name列的数据导入table2表的info1列族的id列,以展示此套api的使用。
- HBaseMapper.java,查询出指定的列并输出到Reducer
package com.even.hbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* des: TableMapper是HBase提供的api
* ImmutableBytesWritable类似之前的LongWritable,表示起始偏移量。put表示封装的一条一条数据,需要导入到Reduce阶段
* author: Even
* create date:2019/1/30
*/
public class HBaseMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
/*1,读取数据,拿到一个rowkey的数据,根据key得到put*/
Put put = new Put(key.get());
/*2,遍历出每个列并筛选需要的*/
for (Cell cell : value.rawCells()) {
/*获取列族*/
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
if ("info".equals(cf)) {
/*获取列*/
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell))))
/*获取info列族的name列的数据,并导入到另一张表的info1列族的id列*/
put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("id"), CellUtil.cloneValue(cell));
}
}
context.write(key, put);
}
}
- HBaseReducer.java,最终输出
package com.even.hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
/**
* des: 把数据遍历输出,因为后面没有操作了,所以输出类型为NullWritable
* author: Even
* create date:2019/1/30
*/
public class HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put p : values) {
context.write(NullWritable.get(), p);
}
}
}
- HBaseDriver
package com.even.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* des:HBase使用MR需要实现Tool
* author: Even
* create date:2019/1/30
*/
public class HBaseDriver implements Tool {
private Configuration conf;
@Override
public int run(String[] strings) throws Exception {
/*1,创建任务*/
Job job = Job.getInstance(conf);
/*2,指定运行的类*/
job.setJarByClass(HBaseDriver.class);
/*3,创建scan,以查询源table1表的数据*/
Scan scan = new Scan();
/*4,配置Mapper*/
TableMapReduceUtil.initTableMapperJob("table1", scan, HBaseMapper.class, ImmutableBytesWritable.class, Put.class, job);
/*5,配置Reducer*/
TableMapReduceUtil.initTableReducerJob("table2", HBaseReducer.class, job);
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public void setConf(Configuration configuration) {
this.conf = HBaseConfiguration.create(configuration);
}
@Override
public Configuration getConf() {
return this.conf;
}
public static void main(String[] args) {
int status = 0;
try {
status = ToolRunner.run(new HBaseDriver(), args);
System.out.println("运行成功!");
} catch (Exception e) {
e.printStackTrace();
} finally {
System.exit(status);
}
}
}
table1的数据:
执行程序后table2的数据:
例子2——HDFS到HBase表
编写程序,把HDFS中的文件导入到HBASE。
- 构建Mapper来读取HDFS的数据,HDFSMapper.java
package com.even.hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* des:因为是从hdfs中获取数据,相当于是从文件中获取,跟以前写的MapReduce程序一样的输入,但是输出是要HBase的类型
* author: Even
* create date:2019/1/30
*/
public class HDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/*1,获取数据*/
String line = value.toString();
/*2,切分数据*/
String[] fields = line.split("\t");
/*3,封装数据*/
String rowKey = fields[0];
String name = fields[1];
String des = fields[2];
ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("des"), Bytes.toBytes(des));
/*4,输出数据*/
context.write(immutableBytesWritable, put);
}
}
- 构建Reducer输出结果,HDFSReducer
package com.even.hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
/**
* des:
* author: Even
* create date:2019/1/31
*/
public class HDFSReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put p : values) {
context.write(NullWritable.get(), p);
}
}
}
- 驱动类,HDFSDriver
package com.even.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* des: Mapper阶段跟处理正常的文件一样的设置,只是Reducer阶段是需要输出到HBase表,因此需要用到HBase的API
* author: Even
* create date:2019/1/31
*/
public class HDFSDriver implements Tool {
private Configuration conf;
@Override
public int run(String[] strings) throws Exception {
/*1,创建任务*/
Job job = Job.getInstance(conf);
/*2,指定主类*/
job.setJarByClass(HDFSDriver.class);
job.setMapperClass(HDFSMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
TableMapReduceUtil.initTableReducerJob("table2", HDFSReducer.class, job);
FileInputFormat.setInputPaths(job, new Path("/hdfs2hbase"));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public void setConf(Configuration configuration) {
this.conf = HBaseConfiguration.create(configuration);
}
@Override
public Configuration getConf() {
return this.conf;
}
public static void main(String[] args) {
int status = 0;
try {
status = ToolRunner.run(new HDFSDriver(), args);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.exit(status);
}
}
}
结果:
总结
HBase操作MR程序,和传统的MR程序不一样的是,官方提供了针对于HBase表的API,如果涉及到对Hbase表的操作,需要使用到这些API,如TableMapper和TableReducer。其中还有ImmutableBytesWritable实现了WritableComparable,是对应的HBase表的偏移量,相当于传统MR程序的Mapper阶段的LongWritable。其次Driver的写法也和传统MapReduce程序有所不同。但是,归根结底,整体的编码流程还是从Map到Reduce再到Driver,不同的是各阶段的代码调用,上面两个例子对HBase的APi和与传统MR的API结合使用都有充分的应用。