Hive Custom UDF, UDAF, and UDTF
UDF steps:
A UDF (User-Defined Function) handles one-row-in, one-row-out (one-to-one mapping) requirements.
1. Extend org.apache.hadoop.hive.ql.exec.UDF.
2. Implement an evaluate method; evaluate supports overloading.
```java
package cn.sina.stat.hive.udf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class SortFieldContent extends UDF {
    // Sort the delimiter-separated fields of str in ascending order.
    // Note: String.split() treats the delimiter as a regular expression.
    public String evaluate(final String str, String delimiter) {
        if (str == null) {
            return null;
        }
        if (delimiter == null) {
            delimiter = ",";
        }
        String[] strs = str.split(delimiter);
        Arrays.sort(strs);
        StringBuilder result = new StringBuilder();
        for (int i = 0; i < strs.length; i++) {
            if (result.length() > 0) {
                result.append(delimiter);
            }
            result.append(strs[i]);
        }
        return result.toString();
    }

    // Overload: "ASC" sorts ascending; anything else sorts descending.
    public String evaluate(final String str, String delimiter, String order) {
        if (str == null) {
            return null;
        }
        if (delimiter == null) {
            delimiter = ",";
        }
        if (order != null && order.toUpperCase().equals("ASC")) {
            return evaluate(str, delimiter);
        } else {
            String[] strs = str.split(delimiter);
            Arrays.sort(strs);
            StringBuilder result = new StringBuilder();
            for (int i = strs.length - 1; i >= 0; i--) {
                if (result.length() > 0) {
                    result.append(delimiter);
                }
                result.append(strs[i]);
            }
            return result.toString();
        }
    }
}
```
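For reference, a minimal sketch of how such a UDF is typically registered and invoked; the jar path and the function name sort_field are placeholders, and the queries assume a table src exists (as used later in this post):

```sql
-- Register the compiled jar and bind the class to a temporary function
ADD JAR /path/to/hive-udf.jar;
CREATE TEMPORARY FUNCTION sort_field AS 'cn.sina.stat.hive.udf.SortFieldContent';

-- Ascending (default) and descending sorts of a comma-separated string
SELECT sort_field('c,a,b', ',') FROM src LIMIT 1;          -- a,b,c
SELECT sort_field('c,a,b', ',', 'DESC') FROM src LIMIT 1;  -- c,b,a
```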
UDAF steps:
A UDAF (User-Defined Aggregation Function) handles many-rows-in, one-row-out (many-to-one mapping) requirements.
1. The function class extends org.apache.hadoop.hive.ql.exec.UDAF, with an inner class implementing the interface org.apache.hadoop.hive.ql.exec.UDAFEvaluator.
2. The evaluator must implement init, iterate, terminatePartial, merge, and terminate. The execution flow: init resets the aggregation state, iterate consumes one input row at a time on the map side, terminatePartial hands back the partial aggregation, merge combines partials on the reduce side, and terminate produces the final result.
```java
package cn.sina.stat.hive.udaf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class ConcatClumnGroupByKeyWithOrder extends UDAF {
    public static class ConcatUDAFEvaluator implements UDAFEvaluator {
        // Intermediate state shipped between the map and reduce sides.
        public static class PartialResult {
            String result;
            String delimiter;
            String order;
        }

        private PartialResult partial;

        // Reset the state before a new aggregation.
        public void init() {
            partial = null;
        }

        // Consume one input row on the map side.
        public boolean iterate(String value, String delimiter, String order) {
            if (value == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.result = "";
                if (delimiter == null || delimiter.equals("")) {
                    partial.delimiter = ",";
                } else {
                    partial.delimiter = delimiter;
                }
                if (order != null
                        && (order.toUpperCase().equals("ASC")
                            || order.toUpperCase().equals("DESC"))) {
                    partial.order = order;
                } else {
                    partial.order = "ASC";
                }
            }
            if (partial.result.length() > 0) {
                partial.result = partial.result.concat(partial.delimiter);
            }
            partial.result = partial.result.concat(value);
            return true;
        }

        // Return this task's partial aggregation.
        public PartialResult terminatePartial() {
            return partial;
        }

        // Combine a partial result coming from another task.
        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.result = other.result;
                partial.delimiter = other.delimiter;
                partial.order = other.order;
            } else {
                if (partial.result.length() > 0) {
                    partial.result = partial.result.concat(partial.delimiter);
                }
                partial.result = partial.result.concat(other.result);
            }
            return true;
        }

        // Sort the collected values and emit the final concatenation.
        public String terminate() {
            if (partial == null) {
                return null;  // no non-null input rows
            }
            String[] strs = partial.result.split(partial.delimiter);
            Arrays.sort(strs);
            StringBuilder result = new StringBuilder();
            if (partial.order.equals("DESC")) {
                for (int i = strs.length - 1; i >= 0; i--) {
                    if (result.length() > 0) {
                        result.append(partial.delimiter);
                    }
                    result.append(strs[i]);
                }
            } else {
                for (int i = 0; i < strs.length; i++) {
                    if (result.length() > 0) {
                        result.append(partial.delimiter);
                    }
                    result.append(strs[i]);
                }
            }
            return result.toString();
        }
    }
}
```
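Similarly, a sketch of registering and calling the UDAF; the jar path and the function name concat_by_key are placeholders, and the query assumes a table src with columns key and value:

```sql
ADD JAR /path/to/hive-udaf.jar;
CREATE TEMPORARY FUNCTION concat_by_key AS 'cn.sina.stat.hive.udaf.ConcatClumnGroupByKeyWithOrder';

-- Concatenate each group's values into one comma-separated, ascending-sorted string
SELECT key, concat_by_key(value, ',', 'ASC')
FROM src
GROUP BY key;
```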
UDTF steps:
A UDTF (User-Defined Table-Generating Function) handles one-row-in, many-rows-out (one-to-many mapping) requirements.
1. Extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.
2. Implement the initialize, process, and close methods:
a. initialize validates the arguments and returns the output field names and field types.
b. After initialization, process is called for each input row; results are emitted through the forward() method.
c. Finally, close() is called for cleanup.
Below is a UDTF I wrote that splits strings of the form "key:value;key:value;" and returns two fields, key and value, for reference:
```java
import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ExplodeMap extends GenericUDTF {

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }

    // Validate the single string argument and declare two string output columns.
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // Split "key:value;key:value;" and forward one (key, value) row per pair.
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] test = input.split(";");
        for (int i = 0; i < test.length; i++) {
            try {
                String[] result = test[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;  // skip malformed entries
            }
        }
    }
}
```
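Before running the queries below, the UDTF has to be registered; a sketch, with the jar path as a placeholder (the class above declares no package, so the bare class name is used):

```sql
ADD JAR /path/to/hive-udtf.jar;
CREATE TEMPORARY FUNCTION explode_map AS 'ExplodeMap';
```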
3. Usage
A UDTF can be used in two ways: directly in the select clause, or together with lateral view.

Method 1: directly in select:
select explode_map(properties) as (col1, col2) from src;

The following are not allowed:
- adding other columns: select a, explode_map(properties) as (col1, col2) from src
- nested calls: select explode_map(explode_map(properties)) from src
- combining with group by / cluster by / distribute by / sort by: select explode_map(properties) as (col1, col2) from src group by col1, col2

Method 2: together with lateral view:
select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;

This form is the more convenient one for everyday use. Execution-wise it is equivalent to running the extraction separately twice and then union-ing the results into one table.
Source: http://blog.****.net/cheersu/article/details/8333045
Source: http://blog.****.net/longzilong216/article/details/23921235