Hadoop学习四十二:HBase 过滤器
一.概述
- 客户端创建包含过滤器Filter的Scan。
- Scan通过RPC被发送给RegionServer,在服务器端执行过滤操作。
- Scan的作用域是region,所以一个RegionServer有多个region的话,Scan将被发送到每个region。
二.Filter工作流程
你必须知道的是,HBase里的一行数据对应一或者多个KeyValue。再来看看流程。
- boolean fiterRowKey(byte[] buffer, int offset, int length):检查rowkey。返回true代表被过滤掉,false进入下个方法。
- ReturnCode fiterKeyValue(KeyValue v):检查rowkey下特定的某一个KeyValue。它有五种返回值,常见的ReturnCode.INCLUDE代表结果中包含这个KeyValue,Skip代表不包含,继续下处理一个KeyValue。
- void filterRow(List<KeyValue> ignored):ignored是2里面被过滤掉的KeyValue集合。
- boolean filterRow():返回true代表过滤掉当前行。
- void reset():迭代每一个新的RowKey(步骤1)前调用此方法。
- boolean filterAllRemaining():返回true时代表终止整个扫描操作。如用户找到了需要的所有数据,就在这里可以返回true。
三.例子
说这么多,不如一个实际的例子来的方便。
用户表rowkey为用户名称,cf:age代表年龄,cf:pw代表密码。现在需要找出用户,有三个限制条件
- 看到zzy这个用户直接pass,因为它是个技术狂人
。
- 找出密码长度小于4的用户,用户信息不需要包含pw列。
- 只找出满足上述条件的两个用户。
public class PasswordStrengthFilter extends FilterBase { private int len; private int limit; private int rowsAccepted = 0; private boolean filterRow = false; public PasswordStrengthFilter() { super(); } public PasswordStrengthFilter(int len, int limit) { this.len = len; this.limit = limit; } @Override public boolean filterRowKey(byte[] buffer, int offset, int length) { String rowkey = Bytes.toString(buffer, offset, length); boolean iszzy = rowkey.equals("zzy"); return iszzy; } public ReturnCode filterKeyValue(KeyValue v) { if (Bytes.toString(v.getQualifier()).equals("pw")) { if (v.getValueLength() >= len) { this.filterRow = true; } return ReturnCode.SKIP; } return ReturnCode.INCLUDE; } @Override public void filterRow(List<KeyValue> ignored) { } public boolean filterRow() { if(!filterRow){ rowsAccepted ++ ; } return this.filterRow; } @Override public boolean filterAllRemaining() { return rowsAccepted >= limit; } public void reset() { this.filterRow = false; } public void write(DataOutput out) throws IOException { out.writeInt(len); out.writeInt(limit); } public void readFields(DataInput in) throws IOException { this.len = in.readInt(); this.limit = in.readInt(); } public static String getShortDes(KeyValue kv){ return "KeyValue["+Bytes.toString(kv.getRow()) + "\t" + Bytes.toString(kv.getFamily()) + ":" + Bytes.toString(kv.getQualifier()) + "=" + Bytes.toString(kv.getValue())+"]"; } }
new PasswordStrengthFilter(4, 2)的测试结果:请各位自己分析下每一行,每一个KeyValue是怎么被fiter处理的。
input:
addData("david", TN, "age", "12");
addData("david", TN, "pw", "p1");
addData("zzy", TN, "age", "13");
addData("zzy", TN, "pw", "p2");
addData("zzz", TN, "age", "18");
addData("zzz", TN, "pw", "p456");
addData("zzzz", TN, "age", "18");
addData("zzzz", TN, "pw", "p4");
addData("zzzzz", TN, "age", "11");
addData("zzzzz", TN, "pw", "p6");
output:
KeyValue[david cf:age=12]
KeyValue[zzzz cf:age=18]
四.总结
- 一般返回true代表被过滤掉。
- 一个region上的所有数据共享一个Filter,并不是每来一行数据就new一个Filter来处理。所以例子中的rowsAccepted得以生效,变量filterRow的状态需要重置。
附件为测试代码和一个根据rowkey分页的RowPaginationFilter