暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hive(Parquet)存储方式Task个数影响因素

趣说大数据 2021-02-24
673

01

写在前面的话


从源码角度探究(Parquet)存储方式,查询语句(不含Shuffle)Task个数影响因素,换句话说也就是Map的分区数。篇幅涉及大量源代码建议收藏后再看。

02

查看hive建表语句


CREATE TABLE `ods.log_info`( 
log_id string,
log_type string,
original_msg string,
create_time bigint,
update_time bigint)
PARTITIONED BY (
`d_date` string COMMENT '快照日',
`pid` string COMMENT '渠道号')
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'


复制
从建表语句中找到INPUTFORMAT 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
复制
03

查看MapredParquetInputFormat类

protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) {
this.realInput = inputFormat;
vectorizedSelf = new VectorizedParquetInputFormat(inputFormat);
}
复制
入参ParquetInputFormat<ArrayWritable> inputFormat 为构造方法入参,入口是这个类
04

查看ParquetInputFormat 类(getSplits 方法)


public static boolean isTaskSideMetaData(Configuration configuration) {
return configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE);
}
复制
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
Configuration configuration = ContextUtil.getConfiguration(jobContext);
List<InputSplit> splits = new ArrayList();
if (!isTaskSideMetaData(configuration)) { // configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE); 默认返回为True
splits.addAll(this.getSplits(configuration, this.getFooters(jobContext)));
return splits;
} else {
Iterator i$ = super.getSplits(jobContext).iterator(); // 调用父类的 getSplits 返回的是List<InputSplit> 这里取的是List<InputSplit> 的迭代器


while(i$.hasNext()) {
InputSplit split = (InputSplit)i$.next();
Preconditions.checkArgument(split instanceof FileSplit, "Cannot wrap non-FileSplit: " + split);
splits.add(ParquetInputSplit.from((FileSplit)split));
}


return splits;
}
}
复制
默认走下面的else分支,即调用super.getSplits方法
public class ParquetInputFormat<T> extends FileInputFormat<Void, T>
复制
05

进入父类FileInputFormat查看(getSplits)


  • 获取切片最小 minSize

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
复制
protected long getFormatMinSplitSize() {
return 1;
}
复制
public static final String SPLIT_MINSIZE = 
"mapreduce.input.fileinputformat.split.minsize";
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
复制
由上面的公式可计算出long minSize = Math.max(1, 1); minSize =1
  • 计算切片最大maxSize

  long maxSize = getMaxSplitSize(job); 
复制
  public static final String SPLIT_MAXSIZE = 
"mapreduce.input.fileinputformat.split.maxsize";
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
复制
如配置了mapreduce.input.fileinputformat.split.maxsize 
参数值的默认值是HDFS集群的块大小,这里值是256
  • 找出该路径下的所有文件 listStatus(job)

这个地方会有一个文件名过滤,过滤掉
  • 循环获取该目录下的文件大小

  • 判断文件是否可切 isSplitable(job, path)、

protected boolean isSplitable(JobContext context, Path filename) {
return true; // 默认可切
}
复制
  • 计算切片大小

long splitSize = computeSplitSize(blockSize, minSize, maxSize);
复制

computeSplitSize方法

protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize)); 
}
复制
方法入参:blockSize 集群块大小(这里是256M)
minSize 最小默认值为1 
maxSize 最大默认值为:
mapreduce.input.fileinputformat.split.maxsize 参数控制
故变成:Math.max(1, Math.min(256, 256))
返回值为:256





完整的getSplits方法如下:
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
  // 计算最小值
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // 计算最大值
long maxSize = getMaxSplitSize(job);


// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
  // 获取所有的文件会过滤掉以.和_开头的文件
  List<FileStatus> files = listStatus(job); 
   //循环该目录下的所有文件
for (FileStatus file: files) {
    // 处理文件信息
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
      // 判断文件是否可切 默认值 返回True
      if (isSplitable(job, path)) { 
long blockSize = file.getBlockSize();
        // 计算切片大小 Math.max(minSize, Math.min(maxSize, blockSize));
        long splitSize = computeSplitSize(blockSize, minSize, maxSize); 
long bytesRemaining = length;
      // private static final double SPLIT_SLOP = 1.1; // 10% slop
         // 当文件大小/ splitSize >1.1时对改文件进行循环切片    
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
        // 剩下的都是一块
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
    } else { //不可切d额时候直接整个加入
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); // 设置输入的文件个数
//public static final String NUM_INPUT_FILES = "mapreduce.input.fileinputformat.numinputfiles";

sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits; // 返回所有的切片信息
}
复制

06

核心代码


  • 计算最小值

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//由参数mapreduce.input.fileinputformat.split.minsize控制 
复制
  • 计算最大值

long maxSize = getMaxSplitSize(job);
//由参数mapreduce.input.fileinputformat.split.maxsize 控制
复制
  • 计算splitSize

Math.max(minSize, Math.min(maxSize, blockSize));
复制
  •  HDFS集群默认块大小 blockSize(这里是256M)


07

结论


  • 增大Map阶段的分区数

set mapreduce.input.fileinputformat.split.maxsize=128000000;
复制
  • 减少Map计算的分区数

set mapreduce.input.fileinputformat.split.maxsize=512000000
复制
08

测试


  • 测试SQL:

set hive.support.quoted.identifiers=None;
set spark.app.name="ods.log_info"
insert overwrite table ods.log_info_temp partition(d_date= '2021-02-21',pid)
select `(is_settled)?+.+`
from ods.log_info
where 1 > 0
复制
  • 结果如下:

set mapreduce.input.fileinputformat.split.maxsize=128000000;task个数为65
复制

set mapreduce.input.fileinputformat.split.maxsize=256000000;task个数为65
复制

set mapreduce.input.fileinputformat.split.maxsize=512000000;task个数为34
复制




扫描二维码获取

更多精彩




IT民工超





点个在看你最好看




文章转载自趣说大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论