Hive(下)
Hive(下)
一,Hive分区与自定义函数
1.1 Hive的分区 partition
假如现在我们公司一天产生3亿的数据量,那么为了方便管理和查询,此时可以建立分区(可按日期 部门等具体业务分区)。分门别类的管理
注意:必须在表创建的时候创建 partition!!!
分区分为:单分区和多分区
分区分为:静态分区和动态分区
1.1.1 创建分区
单分区建表语句:
create table day_table(id int, content string)
partitioned by (dt string)
row format delimited fields terminated by ',';
解释:按天分区,在表结构中存在 id, content, dt 三列; 以dt为文件夹区分
所谓的分区其实就是分文件夹存储
双分区建表语句:
create table day_hour_table(id int, content string )
partitioned by (dt string, hour string)
row format delimited fileds terminated by ',';
解释:双分区表,按照天和小时分区,在表结构中增加了 dt 和 hour两列;先以dt为文件夹,再以Hour为文件夹。
注意:在创建,删除多分区等操作时一定要注意分区的先后顺序,他们是父子节点的关系。分区字段不要和表字段相同。
1.1.2 添加分区表的分区
表已经创建,在其基础上加分区
alter table day_table add partition(dt='2018-01-12');
1.1.3 删除分区
alter table drop partition(dt='2018-01-12');
注意:删除分区,分区的数据和元信息将一起被删除。
1.1.4 数据加载进分区表中
load data [local] inpath 'filePath' [overwrite] into table tableName
[partition(key=val,key1=val2,...)]
查询分区数据
select * from day_table where day_table.dt='2018-01-12';
1.1.5 查询分区语句
show partitions day_table;
1.1.6 重命名分区
alter table tableName partition dt rename to partition newName;
1.1.7 动态分区–注意外部表
-
在本地文件/home/grid/a.txt中写入以下4行数据:
aaa,US,CA
aaa,US,CB
bbb,CA,BB
bbb,CA,BC -
建立非分区表并加载数据
1.CREATE TABLE t1 (name STRING, cty STRING, st STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; 2.LOAD DATA LOCAL INPATH '/home/grid/a.txt' INTO TABLE t1; 3.SELECT * FROM t1;
-
建立外部分区表并动态加载数据 (注意删除外部表的相关事项)
CREATE EXTERNAL TABLE t2 (name STRING) PARTITIONED BY (country STRING, state STRING);
使用动态分区前,需要配置以下参数:
hive.exec.dynamic.partition
默认值:false
是否开启动态分区功能,默认false关闭。
使用动态分区时候,该参数必须设置成true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为nonstrict
hive.exec.max.dynamic.partition.pernode
默认值:100
在每个执行MR的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行MR的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个MR Job中,最大可以创建多少个HDFS文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于100000,可根据实际情况加以调整。
hive.error.on.empty.partition
默认值:false
当有空分区生成时,是否抛出异常。
一般不需要设置。
3. set hive.exec.dynamic.partition=true;
4. set hive.exec.dynamic.partition.mode=nonstrict;
5. set hive.exec.max.dynamic.partitions.pernode=1000;
6. insert into table t2 partition(country,state)
select name, cty, st from t1;
7. select * from t2;
动态分区加速了分区的过程,提高了分区的效率
1.2 函数自定义
自定义函数包括三种 UDF、UDAF、UDTF
UDF: 一进一出
UDAF: 聚集函数,多进一出。如:sum() max() min()
UDTF: 一进多出 , 如 lateralview explore()
1.2.1 UDF开发(开发中较常用)
TuoMin.java
package com.demo.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class TuoMin extends UDF {
private Text res = new Text();
public Text evaluate(String str){
if (str==null) {
return null;
}
String first = str.substring(0,1);
String last = str.substring(str.length()-1,str.length());
res.set(first+"***"+last);
return res;
}
}
1、UDF 函数可以直接应用于 select 语句,对查询结构做格式化处理后,再输出内容。
2、编写 UDF 函数的时候需要注意一下几点:
a)自定义 UDF 需要继承 org.apache.hadoop.hive.ql.UDF。
b)需要实现 evaluate 函数,evaluate 函数支持重载。
3、步骤
a)把程序打包放到目标机器上去;
b)进入 hive 客户端,添加 jar 包:hive>add jar /jar/udf_test.jar;
(清除缓存时记得删除jar包delete jar /*)
hive > add jar /root/tm.jar ##添加
hive > delete jar /root/tm.jar ##删除
c)创建临时函数:hive>CREATE TEMPORARY FUNCTION add_example AS ‘hive.udf.Add’;
hive > create temporary function add tm as 'com.demo.hive.TuoMin';
d)查询 HQL 语句:
select tm(filedName) from tableName; ##查看结果
e) 销毁临时函数:
hive > drop temporary function tm;
1.2.2 UDAF自定义集函数(用的较少)
多行进一行出,如 sum()、min(),用在 group by 时
1.必须继承org.apache.hadoop.hive.ql.exec.UDAF(函数类继承)
org.apache.hadoop.hive.ql.exec.UDAFEvaluator(内部类 Eval uator 实现 UDAFEvaluator 接口)
2.Evaluator 需要实现 init、iterate、terminatePartial、merge、t erminate 这几个函数
init():类似于构造函数,用于 UDAF 的初始化
iterate():接收传入的参数,并进行内部的轮转,返回 boolean
terminatePartial():无参数,其为 iterate 函数轮转结束后,返回轮转数据,类似于 hadoop 的 Combinermerge():接收 terminatePartial 的返回结果,进行数据 merge 操作,其返回类型为 boolean
terminate():返回最终的聚集函数结果
开发一个功能同:
Oracle 的 wm_concat()函数
Mysql 的 group_concat()
Hive UDF 的数据类型:
1.2.3 UDTF(用的较少)
UDTF:一进多出,如 lateral view explore()
二,Hive索引
索引是hive0.7之后才有的功能,创建索引需要评估其合理性,因为创建索引也是要磁盘空间,维护起来也是需要代价的
在hive0.12.0和更早的版本中,索引名称是区分大小写的创建索引,索引语句。然而,改变指数需要索引名称用小写字母(参见创建[hive 2752](javascript:changelink(‘https://issues.apache.org/jira/browse/HIVE-2752’,‘EN2ZH_CN’);))。这个bug是固定的[hive0.13.0](javascript:changelink(‘https://issues.apache.org/jira/browse/HIVE-2752’,'EN2ZH_CN’);)对所有HiveQL语句通过索引名称不区分大小写。0.13.0之前发布的最佳实践是使用小写字母索引名称。
CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS index_type
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[
[ ROW FORMAT ...] STORED AS ...
| STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
[COMMENT "index comment"];
一个表上创建索引创建一个索引使用给定的列表的列作为键。看到创建索引索引;)设计文档。
三,案例
3.1 基站掉话率
1.建表
create table cell_monitor(
record_time string,
imei string,
cell string,
ph_num int,
call_num int,
drop_num int,
duration int,
drop_rate DOUBLE,
net_type string,
erl string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
建结果表:
create table cell_drop_monitor(
imei string,
total_call_num int,
total_drop_num int,
d_rate DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
-
Load数据
LOAD DATA LOCAL INPATH '/opt/data/cdr_summ_imei_cell_info.csv' OVERWRITE INTO TABLE cell_monitor;
-
找出掉效率最高的基站
from cell_monitor cm insert overwrite table cell_drop_monitor select cm.imei ,sum(cm.drop_num),sum(cm.duration),sum(cm.drop_num)/sum(cm.duration) d_rate group by cm.imei sort by d_rate desc;
3.2 WC
1.建表
create table docs(line string); //数据表
create table wc(word string, totalword int); //结果表
2.加载数据
load data local inpath '/tmp/wc' into table docs;
3.统计
from (select explode(split(line, ' ')) as word from docs) w
insert into table wc
select word, count(1) as totalword
group by word
order by wor
数据(随意即可):
hello my gril
hello tom
hi xixi
heihei kuxiao
anglebaby is my friend
4.查询结果
select * from wc;
如下:
anglebaby 1
friend 1
gril 1
heihei 1
hello 2
hi 1
is 1
kuxiao 1
my 2
tom 1
xixi 1
比起编写MapReduce代码,可以说是很简便了…
四,分桶
分桶表及应用场景
分桶表是对列值取哈希值的方式,将不同数据放到不同文件中存储。对于hive中每一个表、分区都可以进一步进行分桶。由列的哈希值除以桶的个数来决定每条数据划分在哪个桶中。
多用于:数据抽样( sampling )、map-join
开启支持分桶:
hive > set hive.enforce.bucketing=true;
默认:false;设置为true之后,mr运行时会根据bucket的个数自动分配reduce task个数。(用户也可以通过mapred.reduce.tasks自己设置reduce任务个数,但分桶时不推荐使用)
注意:一次作业产生的桶(文件数量)和reduce task个数一致。
往分桶表中加载数据:
insert into table bucket_table select columns from tbl;
insert overwrite table bucket_table select columns from tbl;
桶表 抽样查询
select * from bucket_table tablesample(bucket 1 out of 4 on columns);
TABLESAMPLE语法:
TABLESAMPLE(BUCKET x OUT OF y)
x:表示从哪个bucket开始抽取数据
y:必须为该表总bucket数的倍数或因子
创建普通表:
CREATE TABLE mm( id INT, name STRING, age INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
测试数据:
1,tom,11
2,cat,22
3,dog,33
4,hive,44
5,hbase,55
6,mr,66
7,alice,77
8,scala,88
创建分桶表:
CREATE TABLE psnbucket( id INT, name STRING, age INT) CLUSTERED BY (age) INTO 4 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
加载数据:
insert into table psnbucket select id, name, age from mm;
抽样:
select id, name, age from psnbucket tablesample(bucket 2 out of 4 on age);
五,Hive Lateral View
Lateral View用于和UDTF函数(explode、split)结合来使用。
首先通过UDTF函数拆分成多行,再将多行结果组合成一个支持别名的虚拟表。
主要解决在select使用UDTF做查询过程中,查询只能包含单个UDTF,不能包含其他字段、以及多个UDTF的问题
语法:
lateral view udtf(expression) tableAlias as columnAlias(',',columnAlias)
例:
统计人员表中共有多少种爱好、多少个城市?
select count(distinct(myCol1)), count(distinct(myCol2)) from psn2
LATERAL VIEW explode(likes) myTable1 AS myCol1
LATERAL VIEW explode(address) myTable2 AS myCol2, myCol3;
六,运行方式
七,hive的GUI接口
Hive Web GUI接口
web界面安装:
1、下载源码包apache-hive-*-src.tar.gz
2、将hwi war包放在$HIVE_HOME/lib/
制作方法:将hwi/web/*里面所有的文件打成war包
cd apache-hive-1.2.1-src/hwi/web
jar -cvf hive-hwi.war *
3、复制tools.jar(在jdk的lib目录下)到$HIVE_HOME/lib下
4、修改hive-site.xml
<property>
<name>hive.hwi.listen.host</name>
<value>0.0.0.0</value>
</property>
<property>
<name>hive.hwi.listen.port</name>
<value>9999</value>
</property>
<property>
<name>hive.hwi.war.file</name>
<value>lib/hive-hwi.war</value>
</property>
5、启动hwi服务(端口号9999)
hive --service hwi
6、浏览器通过以下链接来访问
http://node01:9999/hwi
八,Hive优化
核心思想:把Hive SQL 当做 MapReduce程序去优化。
以下的SQL不会转为MapReduce来执行
---- select 仅查询本表字段
---- where 仅对本表字段做条件过滤。
1. Explain 显示执行计划
命令:
hive > explain [extended] query
预先查看 sql 的执行步骤,或产生几个mapreduce任务,具体的操作涉及哪些,从而作出响应优化。
2.Hive的运行方式
Hive的运行方式分为本地模式和集群模式。
开启本地模式,将大大缩短Mapreduce执行效率(但不是所有的都可以在本地模式下运行)
开启本地模式:
set hive.exec.mode.local.auto=true;
注意:
hive.exec.mode.local.auto.inputbytes.max=134217728(128MB)
表示加载文件的最大值,若大于该配置仍会以 集群方式来运行!
3.严格模式
通过设置以下参数开启严格模式[防止误操作]。
set hive.mapred.mode=strict;(默认是nonstrict,非严格模式)
严格模式限制条件:
---- 对分区表查询时,必须添加where对分区字段的条件过滤。
---- oredr by 语句必须包含 limit输出限制
---- 限制执行笛卡尔积的查询
4. Hive排序
order by ---- 对于查询结果做全排序,只允许有一个reduce处理。(当数据量较大时,应慎用。严格模式下,必须结合limit使用)。
Sort By ---- 对于单个reduce的数据进行排序
Distribute By ---- 分区排序,经常和Sort By结合使用
Cluster By ---- 相当于 Sort By + Distribute By (Cluster By 不能通过 asc , desc 的方式指定排序规则)
可通过 distribute by column sort by column asc|desc 的方式。
5.Hive Join
Join计算时,将小表(作为驱动表) 放在Join的左边
Map Join : 在map端完成 Join(内存中)
两种实现方式:
1.sql方式,在sql语句中添加MapJoin标记(mapjoin hint)
语法:
select /*+ mapjoin(smallTable) */ smallTable.key, bigTable.value from
smallTable join bigTable on smallTable.key=bigTable.key;
2.开启自动的MapJoin
通过修改以下配置启用自动的mapjoin;
set hive.auto.convert.join=true;
该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用 mapjoin
其他相关配置参数:
set hive.mapjoin.smalltable.filesize;
大表小表判断的阈值25MB左右,如果表的大小 小于该值则会被加载到内存中运行
set hive.ignore.mapjoin.hint;
默认值:true;是否忽略mapjoin hint 即mapjoin标记
set hive.auto.convert.join.noconditionaltask;
默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin
set hive.auto.convert.join.noconditionaltask.size;
将多个mapjoin转化为一个mapjoin时,其表的最大值,默认10M左右。
6. Map-Side聚合
例如count() max() min() avg() 等聚合函数。
通过设置以下参数开启在Map端的聚合:
set hive.map.aggr=true;
相关参数配置:
set hive.groupby.mapaggr.checkinterval;
map端group by执行聚合时处理的多少行数据(默认:100000)
set hive.map.aggr.hash.min.reduction; ## 默认是 0.5
进行聚合的最小比例(预先对100000条数据做聚合,若聚合的数据量 /100000 的值大于该配置0.5,则不会聚合)
set hive.map.aggr.hash.perentmemory;
map端聚合使用的内存的最大值
set hive.map.aggr.hash.force.flush.memory.threshold:
map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush
set hive.groupby.skewindata
是否对GroupBy产生的数据倾斜做优化,默认为false
数据倾斜:给key加上随机数,增加map的数量,增加分区
7.控制Hive中Map以及Reduce的数量
Map数量相关的参数:
set mapred.max.split.size;
一个split的最大值,即每个map处理文件的最大值 默认256M
set mapred.min.split.size.per.node
一个节点上split的最小值 默认1字节
set mapred.min.split.size.per.rack
一个机架上split的最小值 默认1字节
Reduce数量相关参数:
set mapred.reduce.tasks
强制指定reduce任务的数量 (默认值是-1,表示未定义,按具体的程序判断)
set hive.exec.reducers.bytes.per.reducer
每个reduce任务处理的数据量 (默认是256M)
set hive.exec.reducers.max
每个任务最大的reduce数 [Map数量 >= Reduce数量 ] (默认值是1009)
8. Hive-- JVM重用
使用场景:
1.小文件个数过多
2.task个数过多
配置参数:
set mapred.job.reuse.jvm.num.tasks=n; ##(n为task插槽个数)
缺点:设置开启之后,task插槽会一直占用资源,不论是否有task运行,直到所有的task即整个job全部执行完成时,才会释放所有的task插槽资源!