Handling Special Delimiters in Hive

Background: how Hive reads data:

1. First, a concrete implementation of InputFormat (default: org.apache.hadoop.mapred.TextInputFormat) reads the file data and returns records one at a time (a record can be a physical line, or whatever counts as a "line" in your own logic).

2. Then, a concrete implementation of SerDe (default: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe) splits each returned record into fields.
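To make these two defaults concrete, the table definition below spells them out explicitly. This is only an illustrative sketch (the table name t_demo is made up for the example); it behaves the same as an ordinary STORED AS TEXTFILE table:

create table t_demo(id string, name string)
row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
stored as
  inputformat 'org.apache.hadoop.mapred.TextInputFormat'
  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';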
 
By default, Hive only supports single-byte field delimiters. If the delimiter in the data file is multi-character, as in the sample below:

01||huangbo

02||xuzheng

03||wangbaoqiang 
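To see why this is a problem, consider what would happen with a plain table that simply declares '|' as the field terminator (the table name t_bi_bad is made up for this sketch): the empty field between the two pipes would become the second column, so name would come back empty and the real name would be dropped.

create table t_bi_bad(id string, name string)
row format delimited fields terminated by '|'
stored as textfile;
-- For the row 01||huangbo, LazySimpleSerDe would see the fields
-- ["01", "", "huangbo"], map name to the empty string, and discard "huangbo".

Two workarounds follow.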

1. Use RegexSerDe to extract fields with a regular expression
 

create table t_bi_reg(id string, name string)
row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
with serdeproperties(
  'input.regex'='(.*)\\|\\|(.*)',
  'output.format.string'='%1$s %2$s'
)
stored as textfile;
 
hive> load data local inpath '/home/hadoop/hivedata/bi.dat' into table t_bi_reg;

hive> select * from t_bi_reg;
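Assuming bi.dat contains the three sample rows shown above, the regex captures the text on either side of the "||" and the query should return something like:

01      huangbo
02      xuzheng
03      wangbaoqiang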


2. Solve the special-delimiter problem with a custom InputFormat

The idea is that, while the InputFormat is reading each line, it replaces the multi-byte delimiter in the data with Hive's default delimiter (Ctrl+A, i.e. \001) or with some other single-character substitute, so that the SerDe can extract fields using an ordinary single-byte delimiter. In the implementation below, every "||" is rewritten to a single "|"; for example, 01||huangbo becomes 01|huangbo.

com.ghgj.hive.delimit2.BiDelimiterInputFormat 

package com.ghgj.hive.delimit2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class BiDelimiterInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        // Hand each split to the custom record reader, which rewrites the
        // multi-byte delimiter on every line it reads.
        BiRecordReader reader = new BiRecordReader(job, (FileSplit) genericSplit);
        return reader;
    }
}

com.ghgj.hive.delimit2.BiRecordReader

package com.ghgj.hive.delimit2;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class BiRecordReader implements RecordReader<LongWritable, Text> {

    private static final Log LOG = LogFactory.getLog(LineRecordReader.class
            .getName());

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    int maxLineLength;
    private Seekable filePosition;
    private CompressionCodec codec;
    private Decompressor decompressor;

    /**
     * A class that provides a line reader from an input stream.
     * @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
     */
    @Deprecated
    public static class LineReader extends org.apache.hadoop.util.LineReader {
        LineReader(InputStream in) {
            super(in);
        }

        LineReader(InputStream in, int bufferSize) {
            super(in, bufferSize);
        }

        public LineReader(InputStream in, Configuration conf) throws IOException {
            super(in, conf);
        }
    }

    public BiRecordReader(Configuration job, FileSplit split) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        codec = compressionCodecs.getCodec(file);

        // open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        if (isCompressedInput()) {
            decompressor = CodecPool.getDecompressor(codec);
            if (codec instanceof SplittableCompressionCodec) {
                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
                        .createInputStream(fileIn, decompressor, start, end,
                                SplittableCompressionCodec.READ_MODE.BYBLOCK);
                in = new LineReader(cIn, job);
                start = cIn.getAdjustedStart();
                end = cIn.getAdjustedEnd();
                filePosition = cIn; // take pos from compressed stream
            } else {
                in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
                filePosition = fileIn;
            }
        } else {
            fileIn.seek(start);
            in = new LineReader(fileIn, job);
            filePosition = fileIn;
        }
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    private boolean isCompressedInput() {
        return (codec != null);
    }

    private int maxBytesToConsume(long pos) {
        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
                Integer.MAX_VALUE, end - pos);
    }

    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput() && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }

    public BiRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.in = new LineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
        this.filePosition = null;
    }

    public BiRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        this.in = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
        this.filePosition = null;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    /** Read a line. */
    public synchronized boolean next(LongWritable key, Text value)
            throws IOException {

        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end) {
            key.set(pos);

            int newSize = in.readLine(value, maxLineLength,
                    Math.max(maxBytesToConsume(pos), maxLineLength));
            // The only change relative to Hadoop's LineRecordReader:
            // rewrite the multi-byte delimiter "||" to a single "|" so the
            // SerDe can split fields on a single-byte delimiter.
            String str = value.toString().replaceAll("\\|\\|", "\\|");
            value.set(str);
            pos += newSize;
            if (newSize == 0) {
                return false;
            }
            if (newSize < maxLineLength) {
                return true;
            }

            // line too long. try again
            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }

        return false;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start)
                    / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }
}
 
        

Note: the code above uses Hadoop's old API throughout (org.apache.hadoop.mapred.*). Package the project as a jar, copy it into the lib directory of the Hive installation, restart Hive, and then create the table with a statement along the following lines:
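A sketch of what such a table definition could look like; the table name t_bi is assumed, the '|' field terminator matches the single character that BiRecordReader substitutes for "||", and the output format is the standard one for text tables:

create table t_bi(id string, name string)
row format delimited fields terminated by '|'
stored as
  inputformat 'com.ghgj.hive.delimit2.BiDelimiterInputFormat'
  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';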


Note: you also need to run add jar in Hive so that the custom jar is shipped to the map tasks when HQL queries against this table are executed:

hive> add jar /home/hadoop/apps/hive/lib/myinput.jar;
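After that, loading and querying the data works the same way as in the RegexSerDe example; a sketch, reusing the bi.dat file and the assumed table name t_bi:

hive> load data local inpath '/home/hadoop/hivedata/bi.dat' into table t_bi;
hive> select * from t_bi;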