Hive函数分类

从输入输出角度分类

  • 标准函数:一行数据中的一列或多列为输入,结果为单一值
  • 聚合函数:多行的零列到多列为输入,结果为单一值
  • 表生成函数:零个或多个输入,结果为多列或多行

从实现方式分类

  • 内置函数

  • 自定义函数

     UDF:自定义标准函数
     UDAF:自定义聚合函数
     UDTF:自定义表生成函数
    

内置函数

Hive提供大量内置函数供开发者使用

  • 标准函数

     字符函数
     类型转换函数
     数学函数
     日期函数
     集合函数
     条件函数
    
  • 聚合函数

  • 表生成函数

字符函数

返回值 函数 描述
string concat(string|binary A,string|binary B…) 对二进制字节码或字符串按次序进行拼接
int instr(string str,string substr) 查找字符串str中子字符串substr出现的位置
int length(string A) 返回字符串的长度
int locate(string substr,string str[,int pos]) 查找字符串str中的pos位置后字符串substr第一次出现的位置
string lower(string A)/upper(string A) 将字符串A的所有字母转换成小写/大写字母
string regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT) 按正则表达式PATTERN将字符串中符合条件的部分替换成REPLACEMENT所指定的字符串
array split(string str, string pat) 按照正则表达式pat来分割字符串str
string substr(string|binary A, int start, int len)substring(string|binary A, int start, int len) 对字符串A,从start位置开始截取长度为len的字符串并返回
string trim(string A) 将字符串A前后出现的空格去掉
map str_to_map(text[, delimiter1, delimiter2]) 将字符串str按照指定分隔符转换成Map
binary encode(string src, string charset) 用指定字符集charset将字符串编码成二进制值

类型转换函数

返回值 类型转换函数 描述
“type” cast(expr as <type>) 将expr转换成type类型 如:cast(“1” as BIGINT) 将字符串1转换成了BIGINT类型
binary binary(string|binary) 将输入的值转换成二进制

数学函数

返回值 数学函数 描述
double round(double a) 返回对a四舍五入的biging值
binary round(double a,int b) 返回对a四舍五入并保留d位小数位的值
bigint floor(double a) 向下取整,如:6.10->6 -3.4->4
double rand(int seed) 返回一个double型随机数,seed是随机因子
double power(double a,double p) 计算a的p次幂
double abs(double a) 计算a的绝对值

日期函数

返回值 日期函数 描述
string from_unixtime(bigint unixtime[, string format]) 时间戳转换成format格式
int unix_timestamp() 获取本地时区下的时间戳
bigint unix_timestamp(string date) 将格式为yyyy-MM-dd HH:mm:ss的时间字符串转换成时间戳
string to_date(string timestamp) 返回时间字符串的日期部分
int year(string date)month/day/hour/minute/second/weekofyear 返回时间字符串的年份部分返回月/天/时/分/秒/第几周
int datediff(string enddate, string startdate) 计算开始时间到结束时间相差的天数
string date_add(string startdate, int days) 从开始时间startdate加上days
string date_sub(string startdate, int days) 从开始时间startdate减去days
date current_date 返回当前时间的日期
timestamp current_timestamp 返回当前时间戳
string date_format(date/timestamp/string ts, string fmt) 按指定格式返回时间date 如:date_format(“2016-06-22”,“MM-dd”)=06-22

集合函数

返回值 集合函数 描述
int size(Map<K.V>) 返回map中键值对个数
int size(Array) 返回数组的长度
array map_keys(Map<K.V>) 返回map中的所有key
array map_values(Map<K.V>) 返回map中的所有value
boolean array_contains(Array, value) 如该数组Array包含value返回true。,否则返回false
array sort_array(Array) 对数组进行排序

条件函数

返回值 条件函数 描述
T if(boolean testCondition, T valueTrue, T valueFalseOrNull) 如果testCondition 为true就返回valueTrue,否则返回valueFalseOrNull
T nvl(T value, T default_value) value为NULL返回default_value,否则返回value
T COALESCE(T v1, T v2, …) 返回第一非null的值,如果全部都为NULL就返回NULL
T CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END 如果a=b就返回c,a=d就返回e,否则返回f
T CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END 如果a=ture就返回b,c= ture就返回d,否则返回e
boolean isnull( a ) 如果a为null就返回true,否则返回false
boolean isnotnull ( a ) 如果a为非null就返回true,否则返回false

聚合函数

聚合函数

  • count、sum、max、min、avg、var_samp等

表生成函数

表生成函数:输出可以作为表使用

返回值 函数 描述
N rows explode(array) 对于array中的每个元素生成一行且包含该元素
N rows explode(MAP) 每行对应每个map键值对,其中一个字段是map的键,另一个字段是map的值
N rows posexplode(ARRAY) 与explode类似,不同的是还返回各元素在数组中的位置
N rows stack(INT n, v_1, v_2, …, v_k) 把k列转换成n行,每行有k/n个字段,其中n必须是常数
tuple json_tuple(jsonStr, k1, k2, …) 从一个JSON字符串中获取多个键并作为一个元组返回,与get_json_object不同的是此函数能一次获取多个键值

Hive UDF开发流程

  • 继承UDF类或GenericUDF类
  • 重写evaluate()方法并实现函数逻辑
  • 编译打包为jar文件
  • 复制到正确的HDFS路径
  • 使用jar创建临时/永久函数
  • 调用函数

Hive UDF实现

Java IDE,JDK,Maven
继承UDF并重写evaluate()方法
演示:实现以下两个自定义函数

  • string_lower(letter)=LETTER
  • arraycontains(List(“a”,“b”,“c”),“d”)=true

编译、测试和打包jar文件,上传jar并调用函数

hdfs -put –f df-hiveudf-1.0-SNAPSHOT.jar /apps/hive/functions 
-- beeline中create函数并使用
CREATE FUNCTION str_lower AS 'cn.kgc.hiveudf.udf.StringLower' 
USING JAR 'hdfs:////apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar';
select str_lower(name), work_place from employee; 

示例一:把小写字母转换成大写字母

  • 首先建立Maven功能
  • 下载两个依赖包
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
    </dependency>

在这里插入图片描述

  • 编写java代码
package cn.kgc.kb09.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class TestUDF extends UDF {
    public Text evaluate(Text str) {
        if (null == str) {
            return null;
        }
        return new Text(str.toString().toUpperCase());
    }
    public static void main(String[] args) {
        TestUDF tu = new TestUDF();
        Text rst = tu.evaluate(new Text("abc"));
        System.out.println(rst);
    }
}

输出结果:
在这里插入图片描述

示例二:输入一个时间,添加小时

package cn.kgc.kb09.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class AddHour extends UDF {
    public Text evaluate(Text beforeDate, IntWritable hours) throws Exception {
        //把接收到的字符串(Text类型)和整型转成java的类型
        String d = beforeDate.toString();
        //String[] dAndT = d.split(" ");
        //String[] day = dAndT[0].split("-");
        //String[] time = dAndT[1].split(":");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = sdf.parse(d);
        String rst = sdf.format(new Date(date.getTime() + hours.get() * 60 * 60 * 1000));
        return new Text(rst);
    }
    public static void main(String[] args) throws Exception{
        AddHour ah = new AddHour();
        Text t = new Text("2020-09-22 10:20:00");
        IntWritable iw = new IntWritable(3);
        System.out.println(ah.evaluate(t, iw));
    }
}

输出结果:在这里插入图片描述

示例三:两个时间相减

package cn.kgc.kb09.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.text.SimpleDateFormat;
import java.util.Date;
public class HourDiff extends UDF {
    public IntWritable evaluate(Text date1, Text date2) throws Exception {
        String d1 = date1.toString();
        String d2 = date2.toString();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dt1 = sdf.parse(d1);
        Date dt2 = sdf.parse(d2);
        long diff = dt1.getTime() - dt2.getTime();
        int rst = (int) (diff / 1000 / 60 / 60);
        return new IntWritable(rst);
    }
    public static void main(String[] args) throws Exception{
        HourDiff hd = new HourDiff();
        System.out.println(hd.evaluate(new Text("2020-09-21 12:00:00"),
                new Text("2020-09-23 23:00:00")));
    }
}

输出结果:
在这里插入图片描述

使用hive实现函数

  • 删除main方法
package cn.kgc.kb09.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.text.SimpleDateFormat;
import java.util.Date;

public class HourDiff extends UDF {
    public IntWritable evaluate(Text date1, Text date2) throws Exception {
        String d1 = date1.toString();
        String d2 = date2.toString();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dt1 = sdf.parse(d1);
        Date dt2 = sdf.parse(d2);
        long diff = dt1.getTime() - dt2.getTime();
        int rst = (int) (diff / 1000 / 60 / 60);
        return new IntWritable(rst);
    }
}
  • 打jar包,不要选择main方法
    在这里插入图片描述
  • 在hdfs端上创建一个文件夹
[root@hadoop100 ~]# hdfs dfs -mkdir -p /apps/hive/functions
  • 把jar包放到root目录下
    在这里插入图片描述

  • 下载zip包

[root@hadoop100 ~]# yum install -y zip
  • 删除jar包的签名文件(-d后面的参数是自己jar包的名字)
[root@hadoop100 ~]# zip -d testUDF.jar 'META-INF/.SF' 'META-INF/.RSA' 'META-INF/*SF'

在这里插入图片描述

  • 在hive中添加jar包
hive> add jar /root/testUDF.jar;

在这里插入图片描述

建立hive临时函数

hive> create temporary function add_hour as 'cn.kgc.kb09.udf.AddHour';

在这里插入图片描述

  • 输入一个时间,查询
hive> select add_hour("2020-08-20 10:00:05",1000);

在这里插入图片描述

建立hive永久函数

  • 把jar包上传到hdfs端
[root@hadoop100 ~]# hdfs dfs -put testUDF.jar /apps/hive/functions
  • 创建永久函数
hive> create function hour_diff as 'cn.kgc.kb09.udf.HourDiff'
    > using jar 'hdfs://hadoop100:9000/apps/hive/functions/testUDF.jar';

在这里插入图片描述

  • 输入两个时间查看差值
select hour_diff("2020-01-20 10:20:33","2020-02-10 11:10:10");

在这里插入图片描述
注意:临时函数可以在每个库中使用,退出hive就使用不了函数了。
永久函数只能在所在库中使用函数,如果想要在其他库中使用,就需要函数所在库名.函数名才能使用,退出hive了下次还是可以使用函数

Hive事务

事务(Transaction)指一组单元化操作,这些操作要么都执行,要么都不执行
ACID特性

  • Atomicity:原子性
  • Consistency:一致性
  • Isolation:隔离性
  • Durability:持久性

Hive事务的特点和局限性

V0.14版本开始支持行级事务

  • 支持INSERT、DELETE、UPDATE(v2.2.0开始支持Merge)
  • 文件格式只支持ORC
    局限
  • 表必须是bucketed(分桶)表
  • 需要消耗额外的时间、资源和空间
  • 不支持开始、提交、回滚、桶或分区列上的更新
  • 锁可以为共享锁或排它锁(串联的而不是并发)
  • 不允许从一个非ACID连接读写ACID表
  • 使用较少

Hive事务的开启和设置

  • 通过Hive命令行方式设置,当前session有效
  • 通过配置文件设置,全局有效
  • 通过UI工具(如Ambari)设置
-- 通过命令行方式开启事务
set hive.support.concurrency = true;
set hive.enforce.bucketing = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads = 1; 
-- 通过配置文件hive-site.xml
<property> 
<name>hive.support.concurrency</name> 
<value>true</value>
 </property>
 <property> 
<name>hive.txn.manager</name> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>

在这里插入图片描述

Hive PLSQL

Hive PLSQL:Hive存储过程(v2.0过后)

  • 支持SparkSQL和Impala
  • 兼容Oracle、DB2、MySQL、TSQL标准
  • 使将现有的过程迁移到Hive变得简单和高效
  • 使编写UDF不需要Java技能
  • 它的性能比Java UDF稍微慢一些
  • 功能较新
    在Hive2 bin目录下运行./hplsql
    在这里插入图片描述
./hplsql -f plsql_demo.pl
RETURNS STRING 
BEGIN RETURN 'Hello, ' || text || '!'; 
END;
Print hello(' word') 

CREATE PROCEDURE getCount()
BEGIN DECLARE cnt INT = 0;	
SELECT COUNT(*) INTO cnt FROM employee;
PRINT 'Users cnt: ' || cnt;
END;
call getCount();

Hive性能调优工具

EXPLAN

在这里插入图片描述

ANALYZE

ANALYZE:分析表数据,用于执行计划选择的参考

  • 收集表的统计信息,如行数、最大值等
  • 使用时调用该信息加速查询
    语法
ANALYZE TABLE employee COMPUTE STATISTICS; 

ANALYZE TABLE employee_partitioned 
PARTITION(year=2014, month=12) COMPUTE STATISTICS;

ANALYZE TABLE employee_id COMPUTE STATISTICS 
FOR COLUMNS employee_id;

Hive优化设计

  • 使用分区表、桶表
  • 使用索引
  • 使用适当的文件格式,如orc、avro、parquet
  • 使用适当的压缩格式,如snappy
  • 考虑数据本地化-增加一些副本
  • 避免小文件
  • 使用Tez引擎代替MapReduce
  • 使用Hive LLAP(在内存中读取缓存)
  • 考虑在不需要时关闭并发

Job优化

本地模式运行

Hive支持将作业自动转换为本地模式运行

  • 当要处理的数据很小时,完全分布式的启动时间比作业处理时间要长
-- 通过以下设置开启本地模式
SET hive.exec.mode.local.auto=true; --default false 
SET hive.exec.mode.local.auto.inputbytes.max=50000000; 
SET hive.exec.mode.local.auto.input.files.max=5; --default 4

Job必须满足一下条件才能在本地模式下运行

  • Job总输入大小小于hive.exec.mode.local.auto. inputbytes.max
  • map任务总数小于 hive.exec.mode.local.auto. input.files.max
  • 所需的Reduce任务总数为1或0

JVM重用(JVM Reuse)

通过JVM重用减少JVM启动的消耗

  • 默认每个Map或Reduce启动一个新的JAVM
  • Map或Reduce运行时间很短时,JVM启动过程占很大开销
  • 通过共享JVM来重用JVM,以串行方式运行MapReduce Job
  • 适用于同一个Job总的Map或Reduce任务
  • 对于不同Job的任务,总是在独立的JVM中运行
-- 通过以下设置开启JVM重用
set mapred.job.reuse.jvm.num.tasks = 5;  -- 默认值为1

并行执行

并行执行可提高集群利用率

  • Hive查询通常被转换成许多按默认顺序执行的阶段
  • 这些阶段并不总是相互依赖的
  • 它们可以并行运行以节省总体作业运行时间
  • 如果集群的利用率已经很高,并行执行帮助不大
-- 通过以下设置开启并行执行
SET hive.exec.parallel=true;  -- default false 
SET hive.exec.parallel.thread.number=16;  -- default 8,定义并行运行的最大数量

查询优化

  • 自动启动Map端Join
  • 防止数据倾斜
set hive.optimize.skewjoin=true;	
  • 启用CBO(Cost based Optimizer)
set hive.cbo.enable=true; 
set hive.compute.query.using.stats=true; 
set hive.stats.fetch.column.stats=true; 
set hive.stats.fetch.partition.stats=true;	
  • 启动Vectorization(矢量化)
set hive.vectorized.execution.enabled = true; 
set hive.vectorized.execution.reduce.enabled = true;
  • 使用CTE、临时表、窗口函数等正确的编码约定

压缩算法

减少传输数据量,会极大提升MapReduce性能

  • 采用数据压缩是减少数据量的很好的方式
    常用压缩方法对比
压缩方式 是否可分隔 压缩后大小 压缩解压速度
gzip
lzo
snappy
bzip2
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐