1. Overview
UpsertPartitioner is the class Hudi uses to decide how records in an upsert are partitioned into buckets, i.e. which file each record will ultimately be written to.
2. Important related configuration options
// If the average record size cannot be determined from previous commits, this value is used as the estimate (default 1024 bytes)
hoodie.copyonwrite.record.size.estimate=1024

// Files smaller than this limit (default 100 MB) are treated as small files
hoodie.parquet.small.file.limit=104857600

// Hudi estimates the record size from the last commit as totalBytesWritten / totalRecordsWritten.
// If the previous commit is too small to give an accurate estimate, Hudi searches commits in reverse order
// until it finds one whose totalBytesWritten exceeds (hoodie.parquet.small.file.limit * hoodie.record.size.estimation.threshold)
hoodie.record.size.estimation.threshold=1.0

// Number of records that should be inserted into each bucket (default 500000)
hoodie.copyonwrite.insert.split.size=500000

// Whether to auto-tune the split size: if enabled, the number of records per bucket is derived from the average record size
hoodie.copyonwrite.insert.auto.split=true
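For reference, a minimal sketch of collecting these properties in code. The property keys are exactly the ones listed above; the class name and the way the Properties object would be handed to Hudi (for example as Spark datasource options or when building a HoodieWriteConfig) are illustrative assumptions, not Hudi API usage.

import java.util.Properties;

public class UpsertPartitionerProps {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.copyonwrite.record.size.estimate", "1024");                     // fallback avg record size
    props.setProperty("hoodie.parquet.small.file.limit", String.valueOf(100 * 1024 * 1024));  // 100 MB
    props.setProperty("hoodie.record.size.estimation.threshold", "1.0");
    props.setProperty("hoodie.copyonwrite.insert.split.size", "500000");
    props.setProperty("hoodie.copyonwrite.insert.auto.split", "true");
    // Print the effective settings; in practice these would be passed to the Hudi writer
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}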
3. Core methods
/**
 * Obtains the average record size from the records written by a previous commit.
 * Used to estimate how many records will fit into one file.
 */
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
  // Configured fallback estimate of the average record size
  long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
  // Size a commit must exceed before it is trusted for the estimate
  long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
  try {
    if (!commitTimeline.empty()) {
      // Iterate over the commits in reverse order (newest first)
      Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
      while (instants.hasNext()) {
        HoodieInstant instant = instants.next();
        // Read the commit metadata from the Hudi metadata files
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
            commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        // Total bytes written by this commit
        long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
        // Total records written by this commit. For updates, it is the total number of records
        // in the file; for inserts, it is the number of records actually inserted.
        long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
        if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
          avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
          break;
        }
      }
    }
  } catch (Throwable t) {
    LOG.error("Error trying to compute average bytes/record ", t);
  }
  return avgSize;
}
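To make the threshold and fallback behaviour concrete, here is a self-contained sketch of the same rule (not Hudi source; the Commit type and the sample numbers are invented): commits are walked from newest to oldest, and the first commit that wrote more than smallFileLimit * threshold bytes supplies the estimate, otherwise the configured fallback is returned.

import java.util.List;

public class AvgRecordSizeSketch {
  record Commit(long totalBytesWritten, long totalRecordsWritten) {}

  static long averageBytesPerRecord(List<Commit> newestFirst, long fallback,
                                    long smallFileLimit, double threshold) {
    long fileSizeThreshold = (long) (threshold * smallFileLimit);
    for (Commit c : newestFirst) {
      // Only trust commits that wrote enough data to give a stable average
      if (c.totalBytesWritten() > fileSizeThreshold && c.totalRecordsWritten() > 0) {
        return (long) Math.ceil((1.0 * c.totalBytesWritten()) / c.totalRecordsWritten());
      }
    }
    return fallback; // no commit was big enough, fall back to the configured estimate
  }

  public static void main(String[] args) {
    // The newest commit wrote only 10 MB, so it is skipped; the older 200 MB commit is used.
    List<Commit> commits = List.of(
        new Commit(10L * 1024 * 1024, 5_000),
        new Commit(200L * 1024 * 1024, 400_000));
    // 209715200 / 400000 = 524.288, rounded up to 525 bytes per record
    System.out.println(averageBytesPerRecord(commits, 1024, 100L * 1024 * 1024, 1.0)); // 525
  }
}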
/**
 * Returns the set of small files under the given partition path.
 */
protected List<SmallFile> getSmallFiles(String partitionPath) {
  // Collects the small file information
  List<SmallFile> smallFileLocations = new ArrayList<>();
  // Timeline containing only completed commits
  HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
  if (!commitTimeline.empty()) { // if we have some commits
    // Take the latest commit
    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
    // Get the latest base files on or before that commit from the base-file-only view
    List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
        .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
    for (HoodieBaseFile file : allFiles) {
      if (file.getFileSize() < config.getParquetSmallFileLimit()) {
        String filename = file.getFileName();
        SmallFile sf = new SmallFile();
        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
        sf.sizeBytes = file.getFileSize();
        smallFileLocations.add(sf);
      }
    }
  }
  return smallFileLocations;
}
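A hedged illustration of the size filter on its own, with invented file names and sizes: any base file under hoodie.parquet.small.file.limit in the partition is kept as a small-file candidate that later inserts may be packed into.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SmallFileFilterSketch {
  public static void main(String[] args) {
    long smallFileLimit = 100L * 1024 * 1024; // hoodie.parquet.small.file.limit, 100 MB
    // Hypothetical base files and their sizes in the partition
    Map<String, Long> fileSizes = Map.of(
        "fileId1_20240101.parquet", 30L * 1024 * 1024,    // 30 MB  -> small file
        "fileId2_20240102.parquet", 120L * 1024 * 1024);  // 120 MB -> not a small file
    List<String> smallFiles = fileSizes.entrySet().stream()
        .filter(e -> e.getValue() < smallFileLimit)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
    System.out.println(smallFiles); // [fileId1_20240101.parquet]
  }
}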
/**
 * Assigns the pending insert records to buckets.
 */
private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
  // for new inserts, compute buckets depending on how many records we have for each partition
  Set<String> partitionPaths = profile.getPartitionPaths();
  // Estimate the average record size
  long averageRecordSize = averageBytesPerRecord(
      table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config);
  // Build a <partition path, small file list> map for the given partition paths
  Map<String, List<SmallFile>> partitionSmallFilesMap =
      getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
  for (String partitionPath : partitionPaths) {
    WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
    if (pStat.getNumInserts() > 0) {
      // Small files under the current partition
      List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
      this.smallFiles.addAll(smallFiles);
      // Number of insert records still waiting to be assigned
      long totalUnassignedInserts = pStat.getNumInserts();
      List<Integer> bucketNumbers = new ArrayList<>();
      List<Long> recordsPerBucket = new ArrayList<>();
      // First, try to pack the pending inserts into the existing small files
      for (SmallFile smallFile : smallFiles) {
        // Number of records that can still be appended to this small file
        long recordsToAppend = Math.min(
            (config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
          // Reuse an existing bucket if this file already has one
          int bucket;
          if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
            bucket = updateLocationToBucket.get(smallFile.location.getFileId());
            LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
          } else {
            // Otherwise create a new bucket
            bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
            LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
          }
          bucketNumbers.add(bucket);
          recordsPerBucket.add(recordsToAppend);
          totalUnassignedInserts -= recordsToAppend;
        }
      }
      // If there are still unassigned inserts, put them into new buckets
      if (totalUnassignedInserts > 0) {
        // Default: 500000 records per bucket
        long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
        // Auto-tune the split size if enabled
        if (config.shouldAutoTuneInsertSplits()) {
          // Derive the records per bucket from the max file size and the average record size
          insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
        }
        int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
        for (int b = 0; b < insertBuckets; b++) {
          bucketNumbers.add(totalBuckets);
          recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
          BucketInfo bucketInfo = new BucketInfo();
          bucketInfo.bucketType = BucketType.INSERT;
          bucketInfo.partitionPath = partitionPath;
          bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
          bucketInfoMap.put(totalBuckets, bucketInfo);
          totalBuckets++;
        }
      }
      // Walk over the buckets and weight each one by its share of the inserts
      List<InsertBucket> insertBuckets = new ArrayList<>();
      for (int i = 0; i < bucketNumbers.size(); i++) {
        InsertBucket bkt = new InsertBucket();
        bkt.bucketNumber = bucketNumbers.get(i);
        bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
        insertBuckets.add(bkt);
      }
      LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
      partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
    }
  }
}
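A worked sketch of the assignment arithmetic with invented numbers: 300,000 pending inserts, one existing 100 MB small file, a 120 MB target file size and a 1 KB average record size. The inserts first top up the small file; whatever is left is split into new insert buckets of roughly parquetMaxFileSize / averageRecordSize records each (auto-split enabled).

public class AssignInsertsMathSketch {
  public static void main(String[] args) {
    long averageRecordSize = 1024;                 // from averageBytesPerRecord
    long parquetMaxFileSize = 120L * 1024 * 1024;  // target max base file size
    long smallFileSizeBytes = 100L * 1024 * 1024;  // one existing 100 MB small file
    long totalUnassignedInserts = 300_000;

    // Step 1: pack inserts into the existing small file
    long recordsToAppend = Math.min((parquetMaxFileSize - smallFileSizeBytes) / averageRecordSize,
        totalUnassignedInserts);                   // 20 MB of headroom / 1 KB = 20480 records
    totalUnassignedInserts -= recordsToAppend;     // 279520 records remain

    // Step 2: remaining inserts go to new buckets sized by auto-split
    long insertRecordsPerBucket = parquetMaxFileSize / averageRecordSize;                       // 122880
    int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); // 3

    System.out.println(recordsToAppend + " records appended to the small file");          // 20480
    System.out.println(insertBuckets + " new insert buckets of ~"
        + (totalUnassignedInserts / insertBuckets) + " records each");                    // 3 of ~93173
  }
}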
4. Summary
UpsertPartitioner implements the process of assigning pending records to buckets within each partition. The most important levers are the configuration options listed at the beginning of this article; being familiar with them is essential when tuning in production.




