[HUDI Source Code Series] UpsertPartitioner

OLAP 2021-09-07

1. Overview

UpsertPartitioner is the class that decides how the records of an upsert are distributed into buckets: which existing small files can absorb new inserts, and how many new file groups need to be created.

2. Key related configurations

    // If the average record size cannot be determined from previous commits, this value is used as
    // the estimated average record size; the default is 1024 bytes
    hoodie.copyonwrite.record.size.estimate=1024
    // Any base file below this size (100 MB by default) is considered a small file
    hoodie.parquet.small.file.limit=104857600
    // Hudi estimates the record size from the last commit as totalBytesWritten / totalRecordsWritten.
    // If the last commit is too small to give an accurate estimate, Hudi walks the commits in reverse
    // order until it finds one whose totalBytesWritten is greater than
    // (hoodie.parquet.small.file.limit * hoodie.record.size.estimation.threshold)
    hoodie.record.size.estimation.threshold=1.0
    // Number of insert records that should go into each bucket
    hoodie.copyonwrite.insert.split.size=500000
    // Whether to auto-tune the split size: if enabled, the number of records per bucket is derived
    // from the average record size
    hoodie.copyonwrite.insert.auto.split=true
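
For reference, a minimal sketch of passing these knobs as datasource options on a Hudi write through Spark. It assumes a SparkSession and a Dataset<Row> named df already exist; the table name and target path are placeholders, not values from this article.

    // assumed imports: java.util.HashMap, java.util.Map,
    //                  org.apache.spark.sql.Dataset, org.apache.spark.sql.Row, org.apache.spark.sql.SaveMode
    Map<String, String> hudiOptions = new HashMap<>();
    hudiOptions.put("hoodie.table.name", "demo_table");                  // hypothetical table name
    hudiOptions.put("hoodie.parquet.small.file.limit", "104857600");     // 100 MB small-file limit
    hudiOptions.put("hoodie.copyonwrite.record.size.estimate", "1024");  // fallback record size
    hudiOptions.put("hoodie.record.size.estimation.threshold", "1.0");
    hudiOptions.put("hoodie.copyonwrite.insert.split.size", "500000");
    hudiOptions.put("hoodie.copyonwrite.insert.auto.split", "true");

    df.write()                                                           // df: an existing Dataset<Row>
      .format("hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save("/tmp/hudi/demo_table");                                     // hypothetical base path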

3. Core methods

    /**
     * Obtain the average record size based on records written in previous commits.
     * Used to estimate how many records will fit into one file.
     */
    protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
      // fallback estimate of the average record size
      long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
      // file size threshold a commit must exceed before it is trusted for the estimate
      long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
      try {
        if (!commitTimeline.empty()) {
          // completed instants in reverse (newest-first) order
          Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
          while (instants.hasNext()) {
            HoodieInstant instant = instants.next();
            // read the commit metadata from the Hudi metadata file
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            // total bytes written by this commit
            long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
            // total records written by this commit: for updates, the total number of records in the
            // rewritten files; for inserts, the number of records actually inserted
            long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
            if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
              avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
              break;
            }
          }
        }
      } catch (Throwable t) {
        LOG.error("Error trying to compute average bytes/record ", t);
      }
      return avgSize;
    }
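
To make the fallback behaviour concrete, here is a hypothetical standalone walk-through of the same arithmetic, using made-up commit statistics (the numbers are illustrative only):

    long smallFileLimit = 104_857_600L;      // hoodie.parquet.small.file.limit
    double estimationThreshold = 1.0;        // hoodie.record.size.estimation.threshold
    long fileSizeThreshold = (long) (estimationThreshold * smallFileLimit);

    long avgSize = 1024;                     // hoodie.copyonwrite.record.size.estimate fallback
    // commits scanned newest-first: {totalBytesWritten, totalRecordsWritten}
    long[][] commits = { {50_000_000L, 40_000L}, {2_000_000_000L, 10_000_000L} };
    for (long[] c : commits) {
      if (c[0] > fileSizeThreshold && c[1] > 0) {
        avgSize = (long) Math.ceil((1.0 * c[0]) / c[1]);  // 2,000,000,000 / 10,000,000 = 200
        break;
      }
    }

With these numbers, the most recent commit (50 MB) is below the 100 MB threshold and is skipped, so the estimate becomes 200 bytes per record instead of the 1024-byte fallback.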
    /**
     * Return the list of small files under the given partition path.
     */
    protected List<SmallFile> getSmallFiles(String partitionPath) {
      // collected small file info
      List<SmallFile> smallFileLocations = new ArrayList<>();
      // completed commit timeline
      HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();

      if (!commitTimeline.empty()) { // if we have some commits
        // latest completed commit
        HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
        // latest base files on or before that commit, from the table's base-file-only view
        List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
            .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

        for (HoodieBaseFile file : allFiles) {
          // any base file below hoodie.parquet.small.file.limit counts as small
          if (file.getFileSize() < config.getParquetSmallFileLimit()) {
            String filename = file.getFileName();
            SmallFile sf = new SmallFile();
            sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
            sf.sizeBytes = file.getFileSize();
            smallFileLocations.add(sf);
          }
        }
      }

      return smallFileLocations;
    }
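
A small illustration of the filter above, with hypothetical file names and sizes, under the default 100 MB limit:

    // assumed imports: java.util.LinkedHashMap, java.util.Map
    long smallFileLimit = 104_857_600L;                        // hoodie.parquet.small.file.limit
    Map<String, Long> latestBaseFileSizes = new LinkedHashMap<>();
    latestBaseFileSizes.put("fileA.parquet", 30_000_000L);     // 30 MB  -> small
    latestBaseFileSizes.put("fileB.parquet", 120_000_000L);    // 120 MB -> not small
    latestBaseFileSizes.forEach((name, size) -> {
      if (size < smallFileLimit) {
        System.out.println(name + " is a small file (" + size + " bytes)");
      }
    });

Only fileA.parquet would be returned as a small file and considered for absorbing new inserts.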
    /**
     * Split the pending insert records into buckets.
     */
    private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
      // for new inserts, compute buckets depending on how many records we have for each partition
      Set<String> partitionPaths = profile.getPartitionPaths();
      // estimate the average record size
      long averageRecordSize =
          averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config);
      // build a <partition path, small file list> map for the incoming partition paths
      Map<String, List<SmallFile>> partitionSmallFilesMap =
          getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);

      for (String partitionPath : partitionPaths) {
        WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
        if (pStat.getNumInserts() > 0) {

          // small files under the current partition
          List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
          this.smallFiles.addAll(smallFiles);
          // number of records waiting to be inserted
          long totalUnassignedInserts = pStat.getNumInserts();
          List<Integer> bucketNumbers = new ArrayList<>();
          List<Long> recordsPerBucket = new ArrayList<>();

          // first try to pack the pending inserts into the small files
          for (SmallFile smallFile : smallFiles) {
            // number of records this small file can still absorb
            long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
                totalUnassignedInserts);
            if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
              // reuse an existing bucket if this file already has an update bucket
              int bucket;
              if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
                bucket = updateLocationToBucket.get(smallFile.location.getFileId());
                LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
              } else {
                // otherwise create a new update bucket for this file
                bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
                LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
              }
              bucketNumbers.add(bucket);
              recordsPerBucket.add(recordsToAppend);
              totalUnassignedInserts -= recordsToAppend;
            }
          }

          // if there are still unassigned inserts, put them into new buckets
          if (totalUnassignedInserts > 0) {
            // default 500000 records per bucket
            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
            // is auto-tuning enabled?
            if (config.shouldAutoTuneInsertSplits()) {
              // if so, derive the per-bucket record count from the max file size and average record size
              insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
            }

            int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
            for (int b = 0; b < insertBuckets; b++) {
              bucketNumbers.add(totalBuckets);
              recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
              BucketInfo bucketInfo = new BucketInfo();
              bucketInfo.bucketType = BucketType.INSERT;
              bucketInfo.partitionPath = partitionPath;
              bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
              bucketInfoMap.put(totalBuckets, bucketInfo);
              totalBuckets++;
            }
          }

          // walk the buckets and compute each bucket's weight from its share of the inserts
          List<InsertBucket> insertBuckets = new ArrayList<>();
          for (int i = 0; i < bucketNumbers.size(); i++) {
            InsertBucket bkt = new InsertBucket();
            bkt.bucketNumber = bucketNumbers.get(i);
            bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
            insertBuckets.add(bkt);
          }
          LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
          partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
        }
      }
    }
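
To tie the pieces together, here is a hypothetical worked example of the bucket arithmetic above, with made-up numbers: an average record size of 200 bytes, a 120 MB max file size, one roughly 99 MB small file in the partition, and one million pending inserts.

    long averageRecordSize = 200;                          // from averageBytesPerRecord
    long parquetMaxFileSize = 125_829_120L;                // hoodie.parquet.max.file.size = 120 MB
    long smallFileSizeBytes = 104_000_000L;                // one small base file of ~99 MB
    long totalUnassignedInserts = 1_000_000L;

    // 1) pack as many inserts as fit into the small file
    long recordsToAppend = Math.min((parquetMaxFileSize - smallFileSizeBytes) / averageRecordSize,
        totalUnassignedInserts);                           // = 109,145 records
    totalUnassignedInserts -= recordsToAppend;             // = 890,855 left

    // 2) the rest goes to new insert buckets, sized by the auto-tuned split size
    long insertRecordsPerBucket = parquetMaxFileSize / averageRecordSize;                         // = 629,145
    int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); // = 2

Each of the two new buckets then gets 890,855 / 2 = 445,427 records, and every bucket's weight is its record count divided by the 1,000,000 total inserts for the partition.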


4. Summary

UpsertPartitioner implements the process of assigning the pending records of an upsert to buckets within each partition. The most important levers are the configurations listed at the beginning of this article; they are worth knowing well when tuning production workloads.

