Hive任务常见设置
设置Hive执行引擎为 mr|spark|tez
set hive.execution.engine=mr|spark|tez;
复制
设置任务提交队列
set mapreduce.job.queuename=root.default;
复制
设置Spark Driver的内存
set spark.driver.memory=4g;
复制
设置Spark Driver的堆外内存
set spark.driver.memoryOverhead=4g;
复制
设置Spark Executor的内存
set spark.executor.memory=4g;
复制
设置Spark Executor的堆外内存
set spark.executor.memoryOverhead=4g;
复制
设置Spark Executor通信超时时间
set spark.executor.heartbeatInterval=60s;
复制
设置Spark Client超时时间
set hive.spark.client.future.timeou=360;
复制
设置非严格模式
set hive.mapred.mode=nostrict;
复制
设置动态分区
set hive.exec.dynamic.partition=true;
复制
设置动态分区为非严格模式
set hive.exec.dynamic.partition.mode=nonstrict;
复制
设置每个执行MR的节点上,最大可以创建多少个分区
set hive.exec.max.dynamic.partitions.pernode=1000;
复制
复制
设置所有执行MR的节点上,最大可以创建多少个分区
set hive.exec.max.dynamic.partitions=100000;
复制
复制
设置整个MR Job中,最大可以创建多少个HDFS 文件
set hive.exec.max.created.files=100000;
复制
复制
设置当有空分区生成时,是否抛出异常
set hive.error.on.empty.partition=false;
复制
复制
设置关闭MapJoin
set hive.auto.convert.join=false;
复制
复制
设置小表大小
SET hive.mapjoin.smalltable.filesize=52428800
复制
控制自动转换Map join的小表大小
SET hive.auto.convert.join.noconditionaltask.size=10000000
复制
复制
设置MR中间数据压缩,默认是false
set hive.exec.compress.intermediate=true;
复制
设置MR中间数据压缩算法
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.snappycodec;
复制
复制
设置MR输出数据压缩,默认是 false
set hive.exec.compress.output=true;
复制
设置MR输出数据压缩算法
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.snappycodec;
复制
复制
设置每个map处理的最大的文件大小,单位为 268435456B=256M
set mapred.max.split.size=268435456;
复制
复制
设置每个节点中可以处理的最小的文件大小
set mapred.min.split.size.per.node=100000000;
复制
设置每个reduce处理的数据量,默认1GB
set hive.exec.reducers.bytes.per.reducer=1073741824;
复制
设置 Hive 输入预聚合
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
复制
复制
设置reduce 的个数
set mapred.reduce.tasks=10;
复制
复制
开启MapReduce的任务结束时合并小文件
set hive.merge.mapfiles = true;
复制
最终合并文件后的大小
复制
set hive.merge.size.per.task = 256*1000*1000
复制
开启在仅有Map任务的时候合并小文件
hive.merge.mapfiles=true;
复制
设置触发合并的文件大小阀值
hive.merge.smallfiles.avgsize=16,000,000;
复制
开启并行执行
set hive.exec.parallel=true;
复制
设置并行执行的线程数
set hive.exec.parallel.thread.number=8;
复制
设置修复分区忽略目录深度
set hive.msck.path.validation=ignore;
--解决分区数过多,执行修复表命令报错问题
复制
JVM重用
复制
SET mapreduce.job.jvm.numtasks=5;
复制
关闭Mapjoin使用的hashtable
set hive.mapjoin.optimized.hashtable=false;
复制
设置使用正则做查询字段
set hive.support.quoted.identifiers=None;
复制
开启group by使用位置下标
set hive.groupby.orderby.position.alias=true;
复制
开启Hive数据倾斜自动优化
set hive.groupby.skewindata=true;
复制
设置任务名称
set mapred.job.name=test;-- Mr 任务通过这个指定
set spark.app.name=test; --spark 任务通过这个指定
复制
较长参数介绍
set spark.shuffle.memoryFraction=0.2;
-- Shuffle过程中拉取上一个stage数据,能使用的内存大小
set spark.storage.memoryFraction=0.6;
-- Rdd持久化能使用的executor内存比率
set spark.network.maxRemoteBlockSizeFetchToMem=200M
-- 当块的大小超过此阈值(以字节为单位)时,将写入磁盘,
-- 避免占用大量内存的请求。默认值
set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.9;
-- mapjoin本地任务执行时hash表容纳key/value的最大量,超过这个值的话本地任务
-- 会自动退出,默认是0.9;
set hive.mapjoin.localtask.max.memory.usage=0.9;
-- 类似上面,只不过是如果mapjoin后有一个group by的话,该配置控制类似这样的
-- query的本地内存容量上限,默认是0.55;
set hive.mapjoin.check.memory.rows=100000;
-- 在运算了多少行后执行内存使用量检查,默认100000;
set mapreduce.input.fileinputformat.split.maxsize=1024000000;
--设置mapreduce输入文件最大值
复制
设置任务名称
set mapred.job.name=web_logs; --执行引擎为mr时设置任务名称
set spark.app.name=web_logs;--执行引擎为spark时设置任务名称
set hive.groupby.orderby.position.alias=true; --group by时使用下标
--sql 下标从1开始按顺序
select
client_ip,
url,
count(1)
from web_logs
group by 1,2;
复制
使用正则表达式查询
set hive.support.quoted.identifiers=None;
--开启查询 使用正则,过滤掉字段client_ip和url
select
`(client_ip|url)?+.+`
from web_logs
复制
动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=200000;
set hive.exec.max.dynamic.partitions.pernode=50000;
复制
设置输出合并小文件
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=1024000000;
set hive.merge.smallfiles.avgsize=1024000000;
-- 当一个作业的输出结果文件的大小
-- 小于hive.merge.smallfiles.avgsize设定的阈值,
-- 并且hive.merge.mapfiles与hive.merge.mapredfiles设置为true,
-- Hive会额外启动一个mr作业将输出小文件合并成大文件
复制
设置spark任务参数
set spark.executor.memory=4g;
set spark.executor.memoryOverhead=4g;
set spark.shuffle.memoryFraction=0.6;
-- spark.shuffle.memoryFraction
-- 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的
-- 输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。
-- 也就是说,Executor默认只有20%的内存用来进行该操作。
-- shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,
-- 那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
-- 参数调优建议:如果Spark作业中的RDD持久化操作较少,
-- shuffle操作较多时,建议降低持久化操作的内存占比,
-- 提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,
-- 必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,
-- 意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
set spark.storage.memoryFraction=0.2;
-- spark.storage.memoryFraction 该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。
-- 也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。
-- 根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,
-- 或者数据会写入磁盘。
-- 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,
-- 该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。
-- 避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。
-- 但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,
-- 那么这个参数的值适当降低一些比较合适。此外,
-- 如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),
-- 意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
set spark.maxRemoteBlockSizeFetchToMem=200m;
复制
Mr任务设置参数
set hive.execution.engine=mr;
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=4096;
set mapreduce.input.fileinputformat.split.maxsize=1024000000;
set mapreduce.input.fileinputformat.split.minsize=1024000000;
--mapreduce.input.fileinputformat.split.maxsize 和mapreduce.input.fileinputformat.split.minsize
-- 在读取parquet表时,指定其中一个可以减少Map的个数,这里默认1个G划分一个Map
set mapred.job.name=jobname_20210410;
set hive.exec.parallel=true; --开启并行执行
set hive.exec.parallel.thread.number=10;
-- Hive的查询通常会被转换成一系列的stage,这些stage之间并不是一直相互依赖的,
-- 所以可以并行执行这些stage
复制
注意:并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。
设置开启MapJoin
set hive.auto.convert.join=true;
--是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制
set hive.mapjoin.smalltable.filesize=50000000;
-- 一旦开启map端join配置,Hive会自动检查小表
-- 是否大于hive.mapjoin.smalltable.filesize配置的大小,
-- 如果大于则转为普通的join,如果小于则转为map端join。
-- 首先,Task A(客户端本地执行的task)负责读取小表a,
-- 并将其转成一个HashTable的数据结构,写入到本地文件,
-- 之后将其加载至分布式缓存。然后,Task B任务会启动map任务读取大表b,
-- 在Map阶段,根据每条记录与分布式缓存中的a表对应的hashtable关联,
-- 并输出结果。
set hive.auto.convert.join.noconditionaltask=true;
-- 是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。
-- 假设参与连接的表(或分区)有N个,如果打开这个 参数,
-- 并且有N-1个表(或分区)的大小总和小于
-- hive.auto.convert.join.noconditionaltask.size参数指定的值,
-- 那么会直接将连接转为Map连接。
SET hive.auto.convert.join.noconditionaltask.size=10000000;
-- 如果hive.auto.convert.join.noconditionaltask是关闭的,
--- 则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个 的总大小小于这个参数的值,
-- 则直接将连接转为Map连接。默认值为10MB
复制
注意:map端join没有reduce任务,所以map直接输出结果,即有多少个map任务就会产生多少个结果文件。
Jvm重用
SET mapreduce.job.jvm.numtasks=5;
-- 默认情况下,Hadoop会为为一个map或者reduce启动一个JVM,
-- 这样可以并行执行map和reduce。当map或者reduce是那种仅运行几秒钟
-- 的轻量级作业时,
-- JVM启动进程所耗费的时间会比作业执行的时间还要长。Hadoop可以重用JVM,
-- 通过共享JVM以串行而非并行的方式运行map或者reduce。
-- JVM的重用适用于同一个作业的map和reduce,
-- 对于不同作业的task不能够共享JVM。如果要开启JVM重用,
复制
修复分区过多的表
set hive.msck.path.validation=ignore;
MSCK [REPAIR] TABLE table_name [ADD/DROP/SYNC PARTITIONS];
复制
内部表与外部表相互转换
alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='TRUE'); //内部表转外部表
alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='FALSE'); //外部表转内部表
复制
SparkSql删除分区
ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...]
复制