暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hive 任务常见参数设置及作业优化

趣说大数据 2021-04-12
3075


01

Hive任务常见设置


  • 设置Hive执行引擎为 mr|spark|tez

set hive.execution.engine=mr|spark|tez;
复制
  • 设置任务提交队列

set mapreduce.job.queuename=root.default;
复制
  • 设置Spark Driver的内存

set spark.driver.memory=4g;
复制
  • 设置Spark Driver的堆外内存

set spark.driver.memoryOverhead=4g;
复制
  • 设置Spark Executor的内存

set spark.executor.memory=4g;
复制
  • 设置Spark Executor的堆外内存

set spark.executor.memoryOverhead=4g;
复制
  • 设置Spark Executor通信超时时间

set spark.executor.heartbeatInterval=60s;
复制
  • 设置Spark Client超时时间

set hive.spark.client.future.timeou=360;
复制
  • 设置非严格模式

set hive.mapred.mode=nostrict;
复制
  • 设置动态分区

set hive.exec.dynamic.partition=true;
复制
  • 设置动态分区为非严格模式

set hive.exec.dynamic.partition.mode=nonstrict;
复制
  • 设置每个执行MR的节点上,最大可以创建多少个分区

set hive.exec.max.dynamic.partitions.pernode=1000;
复制
复制
  • 设置所有执行MR的节点上,最大可以创建多少个分区

set hive.exec.max.dynamic.partitions=100000;
复制
复制
  • 设置整个MR Job中,最大可以创建多少个HDFS 文件

set hive.exec.max.created.files=100000;
复制
复制
  • 设置当有空分区生成时,是否抛出异常

set hive.error.on.empty.partition=false;
复制
复制
  • 设置关闭MapJoin

set hive.auto.convert.join=false;
复制
复制
  • 设置小表大小

SET hive.mapjoin.smalltable.filesize=52428800
复制
  • 控制自动转换Map join的小表大小

SET hive.auto.convert.join.noconditionaltask.size=10000000
复制
复制
  • 设置MR中间数据压缩,默认是false

set hive.exec.compress.intermediate=true;
复制
  • 设置MR中间数据压缩算法

set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.snappycodec;
复制
复制
  • 设置MR输出数据压缩,默认是 false

set hive.exec.compress.output=true;
复制
  • 设置MR输出数据压缩算法

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.snappycodec;
复制
复制
  • 设置每个map处理的最大的文件大小,单位为‪ 268435456‬B=256M

set mapred.max.split.size=‪268435456‬;
复制
复制
  • 设置每个节点中可以处理的最小的文件大小

set mapred.min.split.size.per.node=100000000;
复制
  • 设置每个reduce处理的数据量,默认1GB

set hive.exec.reducers.bytes.per.reducer=1073741824;
复制
  • 设置 Hive 输入预聚合

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
复制
复制
  • 设置reduce 的个数

set mapred.reduce.tasks=10;
复制
复制
  • 开启MapReduce的任务结束时合并小文件

set hive.merge.mapfiles = true;
复制
  • 最终合并文件后的大小
    复制
set hive.merge.size.per.task = 256*1000*1000
复制
  • 开启在仅有Map任务的时候合并小文件

hive.merge.mapfiles=true;
复制
  • 设置触发合并的文件大小阀值

hive.merge.smallfiles.avgsize=16,000,000;
复制
  • 开启并行执行

set hive.exec.parallel=true;
复制
  • 设置并行执行的线程数

set hive.exec.parallel.thread.number=8;
复制
  • 设置修复分区忽略目录深度

set hive.msck.path.validation=ignore;
--解决分区数过多,执行修复表命令报错问题
复制
  • JVM重用
    复制
SET mapreduce.job.jvm.numtasks=5;
复制
  • 关闭Mapjoin使用的hashtable

set hive.mapjoin.optimized.hashtable=false;
复制
  • 设置使用正则做查询字段

set hive.support.quoted.identifiers=None;
复制
  • 开启group by使用位置下标

set hive.groupby.orderby.position.alias=true;
复制
  • 开启Hive数据倾斜自动优化

set hive.groupby.skewindata=true;
复制
  • 设置任务名称

set mapred.job.name=test;-- Mr 任务通过这个指定
set spark.app.name=test; --spark 任务通过这个指定
复制
  • 较长参数介绍

set spark.shuffle.memoryFraction=0.2;
-- Shuffle过程中拉取上一个stage数据,能使用的内存大小


set spark.storage.memoryFraction=0.6;
-- Rdd持久化能使用的executor内存比率


set spark.network.maxRemoteBlockSizeFetchToMem=200M
-- 当块的大小超过此阈值(以字节为单位)时,将写入磁盘,
-- 避免占用大量内存的请求。默认值


set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.9;
-- mapjoin本地任务执行时hash表容纳key/value的最大量,超过这个值的话本地任务
-- 会自动退出,默认是0.9;




set hive.mapjoin.localtask.max.memory.usage=0.9;
-- 类似上面,只不过是如果mapjoin后有一个group by的话,该配置控制类似这样的
-- query的本地内存容量上限,默认是0.55;


set hive.mapjoin.check.memory.rows=100000;
-- 在运算了多少行后执行内存使用量检查,默认100000;


set mapreduce.input.fileinputformat.split.maxsize=1024000000;
--设置mapreduce输入文件最大值


复制
02

设置任务名称


set mapred.job.name=web_logs; --执行引擎为mr时设置任务名称
set spark.app.name=web_logs;--执行引擎为spark时设置任务名称
set hive.groupby.orderby.position.alias=true; --group by时使用下标
--sql 下标从1开始按顺序
select
client_ip,
url,
count(1)
from web_logs
group by 1,2;
复制


03

使用正则表达式查询


set hive.support.quoted.identifiers=None
--开启查询 使用正则,过滤掉字段client_ip和url
select
`(client_ip|url)?+.+`
from web_logs
复制


03

动态分区


set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=200000;
set hive.exec.max.dynamic.partitions.pernode=50000;
复制


04

设置输出合并小文件


set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=1024000000;
set hive.merge.smallfiles.avgsize=1024000000;
-- 当一个作业的输出结果文件的大小
-- 小于hive.merge.smallfiles.avgsize设定的阈值,
-- 并且hive.merge.mapfiles与hive.merge.mapredfiles设置为true,
-- Hive会额外启动一个mr作业将输出小文件合并成大文件
复制


05

设置spark任务参数


set spark.executor.memory=4g;
set spark.executor.memoryOverhead=4g;
set spark.shuffle.memoryFraction=0.6
-- spark.shuffle.memoryFraction
-- 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的
-- 输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。
-- 也就是说,Executor默认只有20%的内存用来进行该操作。
-- shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,
-- 那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。


-- 参数调优建议:如果Spark作业中的RDD持久化操作较少,
-- shuffle操作较多时,建议降低持久化操作的内存占比,
-- 提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,
-- 必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,
-- 意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
set spark.storage.memoryFraction=0.2;
-- spark.storage.memoryFraction 该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。
-- 也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。
-- 根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,
-- 或者数据会写入磁盘。
-- 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,
-- 该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。
-- 避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。
-- 但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,
-- 那么这个参数的值适当降低一些比较合适。此外,
-- 如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),
-- 意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
set spark.maxRemoteBlockSizeFetchToMem=200m;
复制


06

Mr任务设置参数


set hive.execution.engine=mr;
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=4096;
set mapreduce.input.fileinputformat.split.maxsize=1024000000;
set mapreduce.input.fileinputformat.split.minsize=1024000000;
--mapreduce.input.fileinputformat.split.maxsize 和mapreduce.input.fileinputformat.split.minsize
-- 在读取parquet表时,指定其中一个可以减少Map的个数,这里默认1个G划分一个Map
set mapred.job.name=jobname_20210410;
set hive.exec.parallel=true--开启并行执行
set hive.exec.parallel.thread.number=10
-- Hive的查询通常会被转换成一系列的stage,这些stage之间并不是一直相互依赖的,
-- 所以可以并行执行这些stage
复制

注意:并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。


07

设置开启MapJoin


set hive.auto.convert.join=true;
--是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制
set hive.mapjoin.smalltable.filesize=50000000;
-- 一旦开启map端join配置,Hive会自动检查小表
-- 是否大于hive.mapjoin.smalltable.filesize配置的大小,
-- 如果大于则转为普通的join,如果小于则转为map端join。
-- 首先,Task A(客户端本地执行的task)负责读取小表a,
-- 并将其转成一个HashTable的数据结构,写入到本地文件,
-- 之后将其加载至分布式缓存。然后,Task B任务会启动map任务读取大表b,
-- 在Map阶段,根据每条记录与分布式缓存中的a表对应的hashtable关联,
-- 并输出结果。
set hive.auto.convert.join.noconditionaltask=true;
-- 是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。
-- 假设参与连接的表(或分区)有N个,如果打开这个 参数,
-- 并且有N-1个表(或分区)的大小总和小于
-- hive.auto.convert.join.noconditionaltask.size参数指定的值,
-- 那么会直接将连接转为Map连接。
SET hive.auto.convert.join.noconditionaltask.size=10000000;
-- 如果hive.auto.convert.join.noconditionaltask是关闭的,
--- 则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个 的总大小小于这个参数的值,
-- 则直接将连接转为Map连接。默认值为10MB
复制

注意:map端join没有reduce任务,所以map直接输出结果,即有多少个map任务就会产生多少个结果文件。


08

Jvm重用


SET mapreduce.job.jvm.numtasks=5;
-- 默认情况下,Hadoop会为为一个map或者reduce启动一个JVM,
-- 这样可以并行执行map和reduce。当map或者reduce是那种仅运行几秒钟
-- 的轻量级作业时,
-- JVM启动进程所耗费的时间会比作业执行的时间还要长。Hadoop可以重用JVM,
-- 通过共享JVM以串行而非并行的方式运行map或者reduce。
-- JVM的重用适用于同一个作业的map和reduce,
-- 对于不同作业的task不能够共享JVM。如果要开启JVM重用,
复制
注意:开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。


09

修复分区过多的表


set hive.msck.path.validation=ignore;
MSCK [REPAIR] TABLE table_name [ADD/DROP/SYNC PARTITIONS];
复制

10

内部表与外部表相互转换


alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='TRUE');  //内部表转外部表
alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='FALSE'); //外部表转内部表
复制


11

SparkSql删除分区


ALTER TABLE table_name DROP [IF EXISTSPARTITION partition_spec[, PARTITION partition_spec, ...]
复制
在SparkSql中通过Hive删除分区命令删除分区时,分区不存在报错,加上 IF EXISTS 即可。
文章转载自趣说大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论