Hive Custom UDF, UDAF, and UDTF

UDF steps:
A UDF (User-Defined Function) handles one-row-in, one-row-out (one-to-one mapping) requirements.
1. Extend org.apache.hadoop.hive.ql.exec.UDF.
2. Implement the evaluate method; evaluate supports overloading.
 
package cn.sina.stat.hive.udf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class SortFieldContent extends UDF {

    // Sort the fields of a delimited string in ascending order.
    // Note: String.split treats the delimiter as a regular expression.
    public String evaluate(final String str, String delimiter) {
        if (str == null) {
            return null;
        }
        if (delimiter == null) {
            delimiter = ",";
        }
        String[] strs = str.split(delimiter);
        Arrays.sort(strs);
        // Use a StringBuilder; the original called result.concat(...) and
        // discarded the return value, so nothing was ever appended.
        StringBuilder result = new StringBuilder();
        for (int i = 0; i < strs.length; i++) {
            if (result.length() > 0) {
                result.append(delimiter);
            }
            result.append(strs[i]);
        }
        return result.toString();
    }

    // Overload: sort ascending for "ASC", otherwise descending.
    public String evaluate(final String str, String delimiter, String order) {
        if (str == null) {
            return null;
        }
        if (delimiter == null) {
            delimiter = ",";
        }
        if (order != null && order.toUpperCase().equals("ASC")) {
            return evaluate(str, delimiter);
        } else {
            String[] strs = str.split(delimiter);
            Arrays.sort(strs);
            StringBuilder result = new StringBuilder();
            for (int i = strs.length - 1; i >= 0; i--) {
                if (result.length() > 0) {
                    result.append(delimiter);
                }
                result.append(strs[i]);
            }
            return result.toString();
        }
    }
}
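To call this UDF from Hive, package the class into a jar, add it to the session, and register a temporary function. A minimal sketch, assuming an illustrative jar path and the function name sort_field (neither is from the original post):

-- jar path, function name, and table/column are assumptions for illustration
ADD JAR /path/to/udf.jar;
CREATE TEMPORARY FUNCTION sort_field AS 'cn.sina.stat.hive.udf.SortFieldContent';
SELECT sort_field(tags, ',', 'DESC') FROM src;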

UDAF steps:
A UDAF (User-Defined Aggregation Function) handles many-rows-in, one-row-out (many-to-one mapping) requirements.
1. The function class extends org.apache.hadoop.hive.ql.exec.UDAF,
   and an inner class implements the interface org.apache.hadoop.hive.ql.exec.UDAFEvaluator.
2. The evaluator must implement the init, iterate, terminatePartial, merge, and terminate methods.
   The execution flow: init initializes the evaluator, iterate processes each input row on the map side, terminatePartial returns the partial aggregation to be shipped, merge combines partial results from different tasks, and terminate produces the final result on the reduce side.
 
package cn.sina.stat.hive.udaf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class ConcatClumnGroupByKeyWithOrder extends UDAF {
    public static class ConcatUDAFEvaluator implements UDAFEvaluator {
        // State shipped between map and reduce sides.
        public static class PartialResult {
            String result;
            String delimiter;
            String order;
        }

        private PartialResult partial;

        public void init() {
            partial = null;
        }

        // Called once per input row on the map side.
        public boolean iterate(String value, String delimiter, String order) {
            if (value == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.result = "";
                if (delimiter == null || delimiter.equals("")) {
                    partial.delimiter = ",";
                } else {
                    partial.delimiter = delimiter;
                }
                if (order != null
                        && (order.toUpperCase().equals("ASC")
                                || order.toUpperCase().equals("DESC"))) {
                    partial.order = order;
                } else {
                    partial.order = "ASC";
                }
            }
            if (partial.result.length() > 0) {
                partial.result = partial.result.concat(partial.delimiter);
            }
            partial.result = partial.result.concat(value);
            return true;
        }

        // Return the partial aggregation for this task.
        public PartialResult terminatePartial() {
            return partial;
        }

        // Combine a partial result from another task into the current state.
        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.result = other.result;
                partial.delimiter = other.delimiter;
                partial.order = other.order;
            } else {
                if (partial.result.length() > 0) {
                    partial.result = partial.result.concat(partial.delimiter);
                }
                partial.result = partial.result.concat(other.result);
            }
            return true;
        }

        // Sort the concatenated values and emit the final string.
        public String terminate() {
            String[] strs = partial.result.split(partial.delimiter);
            Arrays.sort(strs);
            // StringBuilder fixes the original code, which discarded the
            // return value of result.concat(...).
            StringBuilder result = new StringBuilder();
            if (partial.order.equals("DESC")) {
                for (int i = strs.length - 1; i >= 0; i--) {
                    if (result.length() > 0) {
                        result.append(partial.delimiter);
                    }
                    result.append(strs[i]);
                }
            } else {
                for (int i = 0; i < strs.length; i++) {
                    if (result.length() > 0) {
                        result.append(partial.delimiter);
                    }
                    result.append(strs[i]);
                }
            }
            return result.toString();
        }
    }
}
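Registering and calling the UDAF follows the same pattern, with the aggregate applied per group. A minimal sketch, assuming an illustrative jar path, the function name concat_group, and table/column names (none of these come from the original post):

-- jar path, function name, and table/columns are assumptions for illustration
ADD JAR /path/to/udaf.jar;
CREATE TEMPORARY FUNCTION concat_group AS 'cn.sina.stat.hive.udaf.ConcatClumnGroupByKeyWithOrder';
SELECT key, concat_group(value, ',', 'ASC') FROM src GROUP BY key;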

UDTF steps:
A UDTF (User-Defined Table-Generating Function) handles one-row-in, many-rows-out (one-to-many mapping) requirements.
1. Extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.
2. Implement the initialize, process, and close methods:
     a. initialize validates the input and returns the output field names and field types.
     b. After initialization, process is called for each input row; it handles the arguments and emits results via the forward() method.
     c. Finally, close() is called for cleanup.
Below is a UDTF I wrote that splits strings of the form "key:value;key:value;" and returns two fields, key and value. For reference:

 

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ExplodeMap extends GenericUDTF {

    @Override
    public void close() throws HiveException {
        // Nothing to clean up.
    }

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        // Validate the arguments: exactly one primitive (string) parameter.
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        // Declare the two output columns and their types.
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] pairs = input.split(";");
        for (int i = 0; i < pairs.length; i++) {
            try {
                // Each "key:value" pair becomes one output row with two columns.
                String[] result = pairs[i].split(":");
                forward(result);
            } catch (Exception e) {
                // Skip malformed pairs and keep processing.
                continue;
            }
        }
    }
}

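The usage examples below assume the UDTF has been registered as explode_map. A minimal sketch (the jar path is an illustrative placeholder; since the class above declares no package, the fully qualified name is just ExplodeMap):

-- jar path is an assumption for illustration
ADD JAR /path/to/udtf.jar;
CREATE TEMPORARY FUNCTION explode_map AS 'ExplodeMap';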
3. Usage
A UDTF can be used in two ways: directly in the SELECT clause, or together with LATERAL VIEW.

Method 1: directly in SELECT: select explode_map(properties) as (col1,col2) from src;
It cannot be combined with other columns: select a, explode_map(properties) as (col1,col2) from src
It cannot be nested: select explode_map(explode_map(properties)) from src
It cannot be used with group by/cluster by/distribute by/sort by: select explode_map(properties) as (col1,col2) from src group by col1, col2

Method 2: with LATERAL VIEW: select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;
This method is more convenient for everyday use. Its execution is equivalent to performing two separate extractions and then union-ing the results into one table.

Source: http://blog.****.net/cheersu/article/details/8333045

Source: http://blog.****.net/longzilong216/article/details/23921235