Five Ways to Write Data into HBase
Environment: Hadoop 1.2.1; HBase 0.94.16
The ways of writing data into HBase (see "HBase: The Definitive Guide") can be roughly divided into the following approaches: one that uses the HTable client API directly, and four that go through MapReduce.
1. Importing directly with HTable; the code is as follows:
package hbase.curd;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class PutExample {
private HTable table = HTableUtil.getHTable("testtable");
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
PutExample pe = new PutExample();
pe.putRows();
}
public void putRows(){
List<Put> puts = new ArrayList<Put>();
Random random = new Random();
for(int i=0;i<10;i++){
Put put = new Put(Bytes.toBytes("row_"+i));
if(random.nextBoolean()){
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("colfam1_qual1_value_"+i));
}
if(random.nextBoolean()){
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("colfam1_qual2_value_"+i));
}
if(random.nextBoolean()){
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"), Bytes.toBytes("colfam1_qual3_value_"+i));
}
if(random.nextBoolean()){
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual4"), Bytes.toBytes("colfam1_qual4_value_"+i));
}
if(random.nextBoolean()){
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual5"), Bytes.toBytes("colfam1_qual5_value_"+i));
}
puts.add(put);
}
try{
table.put(puts);
table.close();
}catch(Exception e){
e.printStackTrace();
return ;
}
System.out.println("done put rows");
}
}
The HTableUtil helper class used above is as follows:
package hbase.curd;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
public class HTableUtil {
private static HTable table;
private static Configuration conf;
static{
conf =HBaseConfiguration.create();
conf.set("mapred.job.tracker", "hbase:9001");
conf.set("fs.default.name", "hbase:9000");
conf.set("hbase.zookeeper.quorum", "hbase");
try {
table = new HTable(conf,"testtable");
} catch (IOException e) {
e.printStackTrace();
}
}
public static Configuration getConf(){
return conf;
}
public static HTable getHTable(String tablename){
if(table==null){
try {
table= new HTable(conf,tablename);
} catch (IOException e) {
e.printStackTrace();
}
}
return table;
}
public static byte[] gB(String name){
return Bytes.toBytes(name);
}
}
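Both PutExample and the MapReduce jobs below assume that the target tables and column families already exist. The following is a minimal sketch, under that assumption, of how they could be created with the 0.94 client API; the class name CreateTables is mine, while the table and family names (testtable, testtable1, colfam1, fam) are the ones used throughout this post.
package hbase.curd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hbase"); // same quorum host as HTableUtil
        HBaseAdmin admin = new HBaseAdmin(conf);
        // 'testtable' is written to by PutExample (family colfam1) and by the MR jobs (family fam)
        if (!admin.tableExists("testtable")) {
            HTableDescriptor testtable = new HTableDescriptor("testtable");
            testtable.addFamily(new HColumnDescriptor("colfam1"));
            testtable.addFamily(new HColumnDescriptor("fam"));
            admin.createTable(testtable);
        }
        // 'testtable1' is the copy target used in section 2.2
        if (!admin.tableExists("testtable1")) {
            HTableDescriptor testtable1 = new HTableDescriptor("testtable1");
            testtable1.addFamily(new HColumnDescriptor("fam"));
            admin.createTable(testtable1);
        }
        admin.close();
    }
}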
This first approach does not use MapReduce; the approaches introduced below all do.
2.1 Importing from an HDFS file into HBase with a Mapper subclass; the code is as follows:
package hbase.mr;
import java.io.IOException;
import hbase.curd.HTableUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ImportFromFile {
/**
* Import data from a file into HBase
* @param args
*/
public static final String NAME="ImportFromFile";
public enum Counters{LINES}
static class ImportMapper extends Mapper<LongWritable,Text,
ImmutableBytesWritable,Writable>{
private byte[] family =null;
private byte[] qualifier = null;
@Override
protected void setup(Context cxt){
String column = cxt.getConfiguration().get("conf.column");
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
family = colkey[0];
if(colkey.length>1){
qualifier = colkey[1];
}
}
@Override
public void map(LongWritable offset,Text line,Context cxt){
try{
String lineString= line.toString();
byte[] rowkey= DigestUtils.md5(lineString);
Put put = new Put(rowkey);
put.add(family,qualifier,Bytes.toBytes(lineString));
cxt.write(new ImmutableBytesWritable(rowkey), put);
cxt.getCounter(Counters.LINES).increment(1);
}catch(Exception e){
e.printStackTrace();
}
}
}
private static CommandLine parseArgs(String[] args){
Options options = new Options();
Option o = new Option("t" ,"table",true,"table to import into (must exist)");
o.setArgName("table-name");
o.setRequired(true);
options.addOption(o);
o= new Option("c","column",true,"column to store row data into");
o.setArgName("family:qualifier");
o.setRequired(true);
options.addOption(o);
o = new Option("i", "input", true,
"the directory or file to read from");
o.setArgName("path-in-HDFS");
o.setRequired(true);
options.addOption(o);
options.addOption("d", "debug", false, "switch on DEBUG log level");
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, args);
} catch (Exception e) {
System.err.println("ERROR: " + e.getMessage() + "\n");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(NAME + " ", options, true);
System.exit(-1);
}
return cmd;
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = HTableUtil.getConf();
String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();
CommandLine cmd = parseArgs(otherArgs);
String table = cmd.getOptionValue("t");
String input = cmd.getOptionValue("i");
String column = cmd.getOptionValue("c");
conf.set("conf.column", column);
Job job = new Job(conf, "Import from file " + input + " into table " + table);
job.setJarByClass(ImportFromFile.class);
job.setMapperClass(ImportMapper.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(input));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
private static String[] initialArg(){
String []args = new String[6];
args[0]="-c";
args[1]="fam:data";
args[2]="-i";
args[3]="/user/hadoop/input/picdata";
args[4]="-t";
args[5]="testtable";
return args;
}
}
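Section 2.1 reads its input from /user/hadoop/input/picdata, which is assumed to already exist on HDFS. For trying the job out, a small helper such as the sketch below could generate some test input (the class name WriteSampleInput and the sample lines are illustrative, not part of the original setup); each input line then becomes one HBase row whose key is the MD5 of the line.
package hbase.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteSampleInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hbase:9000"); // same NameNode as HTableUtil/HadoopUtils
        FileSystem fs = FileSystem.get(conf);
        // overwrite any previous test file at the path ImportFromFile reads from
        FSDataOutputStream out = fs.create(new Path("/user/hadoop/input/picdata"), true);
        for (int i = 0; i < 10; i++) {
            out.write(("sample line " + i + "\n").getBytes("UTF-8"));
        }
        out.close();
        fs.close();
    }
}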
2.2 Reading from one HBase table and writing a column into another HBase table; the code is as follows:
package hbase.mr;
import hadoop.util.HadoopUtils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ParseDriver {
/**
* Copy data from an HBase table into the same column of another table (or the same table)
* @param args
*/
enum Counters{
VALID, ROWS, COLS, ERROR
}
private static Logger log = LoggerFactory.getLogger(ParseDriver.class);
static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
private byte[] columnFamily =null ;
private byte[] columnQualifier =null;
@Override
protected void setup(Context cxt){
columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
}
@Override
public void map(ImmutableBytesWritable row,Result columns,Context cxt){
cxt.getCounter(Counters.ROWS).increment(1);
String value =null;
try{
Put put = new Put(row.get());
for(KeyValue kv : columns.list()){
cxt.getCounter(Counters.COLS).increment(1);
value= Bytes.toStringBinary(kv.getValue());
if(equals(columnQualifier,kv.getQualifier())){ // filter on the column qualifier
put.add(columnFamily,columnQualifier,kv.getValue());
cxt.write(row, put);
cxt.getCounter(Counters.VALID).increment(1);
}
}
}catch(Exception e){
log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
",Value:"+value);
cxt.getCounter(Counters.ERROR).increment(1);
}
}
private boolean equals(byte[] a,byte[] b){
String aStr= Bytes.toString(a);
String bStr= Bytes.toString(b);
if(aStr.equals(bStr)){
return true;
}
return false;
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
byte[] columnFamily = Bytes.toBytes("fam");
byte[] columnQualifier = Bytes.toBytes("data");
Scan scan = new Scan ();
scan.addColumn(columnFamily, columnQualifier);
HadoopUtils.initialConf("hbase");
Configuration conf = HadoopUtils.getConf();
conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
String input ="testtable" ;//
String output="testtable1"; //
Job job = new Job(conf,"Parse data in "+input+",write to"+output);
job.setJarByClass(ParseDriver.class);
TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
ImmutableBytesWritable.class, Put.class,job);
TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);
System.exit(job.waitForCompletion(true)?0:1);
}
}
The HadoopUtils helper class used above is as follows:
package hadoop.util;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
public class HadoopUtils {
private static Configuration conf;
public static void initialConf(){
conf = new Configuration();
conf.set("mapred.job.tracker", "hbase:9001");
conf.set("fs.default.name", "hbase:9000");
conf.set("hbase.zookeeper.quorum", "hbase");
}
public static void initialConf(String host){
conf = new Configuration();
conf.set("mapred.job.tracker", host+":9001");
conf.set("fs.default.name", host+":9000");
conf.set("hbase.zookeeper.quorum", host);
}
public static Configuration getConf(){
if(conf==null){
initialConf();
}
return conf;
}
public static List<String> readFromHDFS(String fileName) throws IOException {
Configuration conf = getConf();
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
// read line by line using LineReader
LineReader inLine = new LineReader(hdfsInStream, conf);
Text txtLine = new Text();
int iResult = inLine.readLine(txtLine); // read the first line
List<String> list = new ArrayList<String>();
while (iResult > 0 ) {
list.add(txtLine.toString());
iResult = inLine.readLine(txtLine);
}
hdfsInStream.close();
fs.close();
return list;
}
}
2.3 Combining MapReduce with a direct HTable client; the code is as follows:
package hbase.mr;
import hadoop.util.HadoopUtils;
import hbase.mr.AnalyzeDriver.Counters;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ParseSinglePutDriver {
/**
* Write with an HTable client inside the mapper.
* Copy the qualifier column of infoTable into the qualifier1 column,
* issuing one Put at a time.
* @param args
*/
private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
private HTable infoTable =null ;
private byte[] columnFamily =null ;
private byte[] columnQualifier =null;
private byte[] columnQualifier1 =null;
@Override
protected void setup(Context cxt){
log.info("ParseSinglePutDriver setup,current time: "+new Date());
try {
infoTable = new HTable(cxt.getConfiguration(),
cxt.getConfiguration().get("conf.infotable"));
infoTable.setAutoFlush(false);
} catch (IOException e) {
log.error("Initial infoTable error:\n"+e.getMessage());
}
columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
}
@Override
protected void cleanup(Context cxt){
try {
infoTable.flushCommits();
log.info("ParseSinglePutDriver cleanup ,current time :"+new Date());
} catch (IOException e) {
log.error("infoTable flush commits error:\n"+e.getMessage());
}
}
@Override
public void map(ImmutableBytesWritable row,Result columns,Context cxt){
cxt.getCounter(Counters.ROWS).increment(1);
String value =null ;
try{
Put put = new Put(row.get());
for(KeyValue kv : columns.list()){
cxt.getCounter(Counters.COLS).increment(1);
value= Bytes.toStringBinary(kv.getValue());
if(equals(columnQualifier,kv.getQualifier())){ // filter on the column qualifier
put.add(columnFamily,columnQualifier1,kv.getValue());
infoTable.put(put);
}
}
}catch(Exception e){
log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
",Value:"+value);
cxt.getCounter(Counters.ERROR).increment(1);
}
}
private boolean equals(byte[] a,byte[] b){
String aStr= Bytes.toString(a);
String bStr= Bytes.toString(b);
if(aStr.equals(bStr)){
return true;
}
return false;
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
String input ="testtable";
byte[] columnFamily = Bytes.toBytes("fam");
byte[] columnQualifier = Bytes.toBytes("data");
byte[] columnQualifier1 = Bytes.toBytes("data1");
Scan scan = new Scan ();
scan.addColumn(columnFamily, columnQualifier);
HadoopUtils.initialConf("hbase");
Configuration conf = HadoopUtils.getConf();
conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
conf.set("conf.infotable", input);
Job job = new Job(conf,"Parse data in "+input+",into tables");
job.setJarByClass(ParseSinglePutDriver.class);
TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
ImmutableBytesWritable.class, Put.class,job);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true)?0:1);
}
}
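Before moving on: the table-mapper jobs in 2.2, 2.3, and 2.4 below all hand a plain Scan to TableMapReduceUtil. A common tweak, not shown in the original code, is to raise the scanner caching and switch off block caching for such full-table MapReduce scans; a sketch under those assumptions (the class name ScanTuning and the caching value of 500 are illustrative):
import org.apache.hadoop.hbase.client.Scan;

public class ScanTuning {
    // Builds the same Scan as the drivers above, with MR-friendly settings added.
    static Scan buildScan(byte[] family, byte[] qualifier) {
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);
        scan.setCaching(500);       // rows fetched per RPC to the region server (illustrative value)
        scan.setCacheBlocks(false); // an MR scan touches each block only once, so skip the block cache
        return scan;
    }
}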
2.4 The HTable used in 2.3 can also put an entire List&lt;Put&gt; at once; the approach below batches the Puts into a list, which is more efficient.
package hbase.mr;
import hadoop.util.HadoopUtils;
import hbase.mr.AnalyzeDriver.Counters;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ParseListPutDriver {
/**
* Write with an HTable client inside the mapper,
* batching the writes into a List<Put> to compare efficiency.
* Copy the qualifier column of infoTable into the qualifier1 column.
* @param args
*/
private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
private HTable infoTable =null ;
private byte[] columnFamily =null ;
private byte[] columnQualifier =null;
private byte[] columnQualifier1 =null;
private List<Put> list = new ArrayList<Put>();
@Override
protected void setup(Context cxt){
log.info("ParseListPutDriver setup,current time: "+new Date());
try {
infoTable = new HTable(cxt.getConfiguration(),
cxt.getConfiguration().get("conf.infotable"));
infoTable.setAutoFlush(false);
} catch (IOException e) {
log.error("Initial infoTable error:\n"+e.getMessage());
}
columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
}
@Override
protected void cleanup(Context cxt){
try {
infoTable.put(list);
infoTable.flushCommits();
log.info("ParseListPutDriver cleanup ,current time :"+new Date());
} catch (IOException e) {
log.error("infoTable flush commits error:\n"+e.getMessage());
}
}
@Override
public void map(ImmutableBytesWritable row,Result columns,Context cxt){
cxt.getCounter(Counters.ROWS).increment(1);
String value =null ;
try{
Put put = new Put(row.get());
for(KeyValue kv : columns.list()){
cxt.getCounter(Counters.COLS).increment(1);
value= Bytes.toStringBinary(kv.getValue());
if(equals(columnQualifier,kv.getQualifier())){ // filter on the column qualifier
put.add(columnFamily,columnQualifier1,kv.getValue());
list.add(put);
}
}
}catch(Exception e){
log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
",Value:"+value);
cxt.getCounter(Counters.ERROR).increment(1);
}
}
private boolean equals(byte[] a,byte[] b){
String aStr= Bytes.toString(a);
String bStr= Bytes.toString(b);
if(aStr.equals(bStr)){
return true;
}
return false;
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
String input ="testtable";
byte[] columnFamily = Bytes.toBytes("fam");
byte[] columnQualifier = Bytes.toBytes("data");
byte[] columnQualifier1 = Bytes.toBytes("data2");
Scan scan = new Scan ();
scan.addColumn(columnFamily, columnQualifier);
HadoopUtils.initialConf("hbase");
Configuration conf = HadoopUtils.getConf();
conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
conf.set("conf.infotable", input);
Job job = new Job(conf,"Parse data in "+input+",into tables");
job.setJarByClass(ParseListPutDriver.class);
TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
ImmutableBytesWritable.class, Put.class,job);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true)?0:1);
}
}
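The gain in 2.4 comes from batching the client-side writes, so the client write buffer is the other knob worth mentioning. The sketch below shows the relevant 0.94 calls (setAutoFlush, setWriteBufferSize); the 8 MB value and the class name WriteBufferExample are only illustrative.
package hbase.curd;

import org.apache.hadoop.hbase.client.HTable;

public class WriteBufferExample {
    public static void main(String[] args) throws Exception {
        HTable table = new HTable(HTableUtil.getConf(), "testtable");
        // buffer Puts on the client instead of sending one RPC per Put
        table.setAutoFlush(false);
        // let roughly 8 MB of Puts accumulate before a flush (illustrative value;
        // the 0.94 default is 2 MB, set via hbase.client.write.buffer)
        table.setWriteBufferSize(8 * 1024 * 1024);
        // ... issue table.put(...) calls here ...
        table.flushCommits(); // send whatever is still buffered
        table.close();
    }
}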
The test data set contains 26,632 records; the run-time comparison between the approaches can be seen in the screenshots below.
Because the jobs use HBase, a few extra jars have to be added under hadoop_home/lib; the full set of jars is shown below.
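As an alternative to copying jars into hadoop_home/lib by hand, the HBase MapReduce helper can ship its dependencies with the job through the distributed cache. A minimal sketch follows (the class and method names are mine); depending on the 0.94 minor release, initTableMapperJob/initTableReducerJob may already do this internally, so the explicit call mainly matters for jobs like ImportFromFile that only use TableOutputFormat.
import java.io.IOException;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class DependencyJars {
    // Call right after the Job has been configured (e.g. at the end of
    // ImportFromFile.main, before job.waitForCompletion).
    static void shipHBaseJars(Job job) throws IOException {
        // adds the hbase jar, zookeeper and the other client dependencies
        // to the job's distributed cache so the task JVMs can load them
        TableMapReduceUtil.addDependencyJars(job);
    }
}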
Share, grow, be happy.
If you repost this article, please credit the original blog: http://blog.****.net/fansy1990