
Spark Parquet Write: Source Code Analysis

Orison的孤独奋斗 2019-01-23

The Spark Layer

At the Spark layer, we start from dataframe.write.parquet("path").
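For context, here is a minimal, self-contained sketch of that entry point; the session settings, sample data, and output path are illustrative, not taken from the article:

import org.apache.spark.sql.SparkSession

object WriteParquetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("write-parquet-example")
      .master("local[*]") // illustrative: local mode
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
    // DataFrameWriter.parquet(path) is equivalent to format("parquet").save(path)
    df.write.mode("overwrite").parquet("/tmp/example_parquet") // hypothetical output path
    spark.stop()
  }
}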

In DataFrame

The write() method in Dataset:

/**
 * Interface for saving the content of the non-streaming Dataset out into external storage.
 *
 * @group basic
 * @since 1.6.0
 */
def write: DataFrameWriter[T] = {
  if (isStreaming) {
    logicalPlan.failAnalysis(
      "'write' can not be called on streaming Dataset/DataFrame")
  }
  new DataFrameWriter[T](this)
}


The parquet() method in DataFrameWriter:

def parquet(path: String): Unit = {
  format("parquet").save(path)
}


As you can see, this simply calls save(). Following the call chain, it eventually lands in the DataSource class:

/**
 * Writes the given [[DataFrame]] out to this [[DataSource]].
 */
def write(mode: SaveMode, data: DataFrame): Unit = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }
  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
    case format: FileFormat =>
      writeInFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}


In DataSource

As shown above, a Parquet write goes through the FileFormat case. FileFormat is an interface; its Parquet implementation is ParquetFileFormat. Let's continue with the writeInFileFormat() method:

/**
 * Writes the given [[DataFrame]] out in this [[FileFormat]].
 */
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
  ...
  ...
  val plan =
    InsertIntoHadoopFsRelationCommand(
      outputPath = outputPath,
      staticPartitionKeys = Map.empty,
      customPartitionLocations = Map.empty,
      partitionColumns = columns,
      bucketSpec = bucketSpec,
      fileFormat = format,
      refreshFunction = _ => Unit, // No existing table needs to be refreshed.
      options = options,
      query = data.logicalPlan,
      mode = mode,
      catalogTable = catalogTable)
  sparkSession.sessionState.executePlan(plan).toRdd
}


This brings us to the InsertIntoHadoopFsRelationCommand class. Inside its run() method there is the following code:

FileFormatWriter.write(
  sparkSession = sparkSession,
  queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
  fileFormat = fileFormat,
  committer = committer,
  outputSpec = FileFormatWriter.OutputSpec(
    qualifiedOutputPath.toString, customPartitionLocations),
  hadoopConf = hadoopConf,
  partitionColumns = partitionColumns,
  bucketSpec = bucketSpec,
  refreshFunction = refreshFunction,
  options = options)


Jumping to that write method:

def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    fileFormat: FileFormat,
    committer: FileCommitProtocol,
    outputSpec: OutputSpec,
    hadoopConf: Configuration,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    refreshFunction: (Seq[TablePartitionSpec]) => Unit,
    options: Map[String, String]): Unit = {
  ...
  ...
  val outputWriterFactory =
    fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
}


Since this fileFormat is an instance of ParquetFileFormat, we go straight to ParquetFileFormat.prepareWrite:

override def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType): OutputWriterFactory = {
  new OutputWriterFactory {
    // This OutputWriterFactory instance is deserialized when writing Parquet files on the
    // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
    // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
    // initialized.
    private val parquetLogRedirector = ParquetLogRedirector.INSTANCE

    override def newInstance(
        path: String,
        dataSchema: StructType,
        context: TaskAttemptContext): OutputWriter = {
      new ParquetOutputWriter(path, context)
    }

    override def getFileExtension(context: TaskAttemptContext): String = {
      CodecConfig.from(context).getCodec.getExtension + ".parquet"
    }
  }
}


The returned OutputWriterFactory creates ParquetOutputWriter instances. ParquetOutputWriter in turn uses a RecordWriter, namely ParquetRecordWriter<InternalRow>, whose WriteSupport is Spark's own ParquetWriteSupport class. That class takes each field of Spark's InternalRow and writes it through Parquet's column writers.

To see how, look at the consumeField() method in ParquetWriteSupport:

private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
  recordConsumer.startField(field, index)
  f
  recordConsumer.endField(field, index)
}


Here recordConsumer is an instance of MessageColumnIORecordConsumer, which is a Parquet class.
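To make the calling pattern concrete, here is a hedged sketch, not Spark's actual ParquetWriteSupport, using a hypothetical two-column schema (id: Int, name: String): a WriteSupport-style writer drives the RecordConsumer by bracketing each record with startMessage()/endMessage() and each written field with startField()/endField(), with an addX() call in between.

import org.apache.parquet.io.api.{Binary, RecordConsumer}

class SimpleRowWriter {
  // Supplied by Parquet through WriteSupport.prepareForWrite(); at runtime this
  // is the MessageColumnIORecordConsumer mentioned above.
  private var recordConsumer: RecordConsumer = _

  def prepareForWrite(consumer: RecordConsumer): Unit = {
    recordConsumer = consumer
  }

  // Writes one record of the hypothetical schema; both fields assumed non-null.
  def write(id: Int, name: String): Unit = {
    recordConsumer.startMessage()

    recordConsumer.startField("id", 0)
    recordConsumer.addInteger(id)
    recordConsumer.endField("id", 0)

    recordConsumer.startField("name", 1)
    recordConsumer.addBinary(Binary.fromString(name))
    recordConsumer.endField("name", 1)

    recordConsumer.endMessage()
  }
}

Spark's consumeField() above is exactly that startField/endField bracket, with the addX() call passed in as the by-name parameter f.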

Meanwhile, the ParquetOutputWriter class itself looks like this:

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  private val recordWriter: RecordWriter[Void, InternalRow] = {
    new ParquetOutputFormat[InternalRow]() {
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        new Path(path)
      }
    }.getRecordWriter(context)
  }

  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")

  override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)

  override def close(): Unit = recordWriter.close(context)
}


As you can see, the actual writing is delegated to ParquetOutputFormat, which is also a Parquet class. From here we enter the Parquet source code.

The Parquet Layer

Parquet organizes writes down to the page level, and compression is also applied per page. Concretely, data is buffered in the JVM and flushed to the file once a threshold is reached. Let's see how this is implemented in the code.
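As an aside, the codec used for that per-page compression is chosen on the Spark side when the write is issued. A minimal sketch; the codec choice and output path are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.range(0, 1000).toDF("id")

// Select the Parquet compression codec for this write; Parquet applies it page by page.
df.write
  .option("compression", "snappy") // e.g. none, snappy, gzip (codec availability varies)
  .mode("overwrite")
  .parquet("/tmp/example_parquet_snappy") // hypothetical output path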

In ParquetOutputFormat

As mentioned above, we arrive at the ParquetOutputFormat class in the parquet-hadoop module. Look at its getRecordWriter method:

public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
      throws IOException, InterruptedException {
  final WriteSupport<T> writeSupport = getWriteSupport(conf);

  CodecFactory codecFactory = new CodecFactory(conf);
  long blockSize = getLongBlockSize(conf);
  if (INFO) LOG.info("Parquet block size to " + blockSize);
  int pageSize = getPageSize(conf);
  if (INFO) LOG.info("Parquet page size to " + pageSize);
  int dictionaryPageSize = getDictionaryPageSize(conf);
  if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
  boolean enableDictionary = getEnableDictionary(conf);
  if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
  boolean validating = getValidation(conf);
  if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
  WriterVersion writerVersion = getWriterVersion(conf);
  if (INFO) LOG.info("Writer version is: " + writerVersion);
  int maxPaddingSize = getMaxPaddingSize(conf);
  if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");

  WriteContext init = writeSupport.init(conf);
  ParquetFileWriter w = new ParquetFileWriter(
      conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
  w.start();

  float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
      MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
  long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
      MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
  if (memoryManager == null) {
    memoryManager = new MemoryManager(maxLoad, minAllocation);
  } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
    LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
        "be reset by the new value: " + maxLoad);
  }

  return new ParquetRecordWriter<T>(
      w,
      writeSupport,
      init.getSchema(),
      init.getExtraMetaData(),
      blockSize, pageSize,
      codecFactory.getCompressor(codec, pageSize),
      dictionaryPageSize,
      enableDictionary,
      validating,
      writerVersion,
      memoryManager);
}


As you can see, blockSize (really the row-group size; the Parquet code calls a row group a "parquet block"), pageSize, and the other parameters are read from the conf. A ParquetFileWriter is then initialized, and finally a ParquetRecordWriter is returned. The writeSupport here is Spark's own ParquetWriteSupport.
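Because these values are read from the Hadoop Configuration, they can be tuned from the Spark side before writing. A hedged sketch, using the standard parquet-hadoop configuration keys with illustrative values and a hypothetical output path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Row-group ("parquet block") size, page size, and dictionary settings, in bytes.
hadoopConf.setLong("parquet.block.size", 128L * 1024 * 1024)
hadoopConf.setInt("parquet.page.size", 1 * 1024 * 1024)
hadoopConf.setInt("parquet.dictionary.page.size", 1 * 1024 * 1024)
hadoopConf.setBoolean("parquet.enable.dictionary", true)

val df = spark.range(0, 1000000).toDF("id")
df.write.mode("overwrite").parquet("/tmp/tuned_parquet") // hypothetical output path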

In ParquetRecordWriter

public ParquetRecordWriter(
    ParquetFileWriter w,
    WriteSupport<T> writeSupport,
    MessageType schema,
    Map<String, String> extraMetaData,
    long blockSize, int pageSize,
    BytesCompressor compressor,
    int dictionaryPageSize,
    boolean enableDictionary,
    boolean validating,
    WriterVersion writerVersion,
    MemoryManager memoryManager) {
  internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
      extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
      validating, writerVersion);
  this.memoryManager = checkNotNull(memoryManager, "memoryManager");
  memoryManager.addWriter(internalWriter, blockSize);
}

@Override
public void write(Void key, T value) throws IOException, InterruptedException {
  internalWriter.write(value);
}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
  internalWriter.close();
  if (memoryManager != null) {
    memoryManager.removeWriter(internalWriter);
  }
}


As you can see, this class creates an InternalParquetRecordWriter and delegates to it; the internalWriter/writeSupport pairing is the usual MapReduce-style structure.

In InternalParquetRecordWriter

public InternalParquetRecordWriter(
    ParquetFileWriter parquetFileWriter,
    WriteSupport<T> writeSupport,
    MessageType schema,
    Map<String, String> extraMetaData,
    long rowGroupSize,
    int pageSize,
    BytesCompressor compressor,
    int dictionaryPageSize,
    boolean enableDictionary,
    boolean validating,
    WriterVersion writerVersion) {
  this.parquetFileWriter = parquetFileWriter;
  this.writeSupport = checkNotNull(writeSupport, "writeSupport");
  this.schema = schema;
  this.extraMetaData = extraMetaData;
  this.rowGroupSize = rowGroupSize;
  this.rowGroupSizeThreshold = rowGroupSize;
  this.nextRowGroupSize = rowGroupSizeThreshold;
  this.pageSize = pageSize;
  this.compressor = compressor;
  this.validating = validating;
  this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
  initStore();
}

private void initStore() {
  pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
  columnStore = parquetProperties.newColumnWriteStore(
      schema,
      pageStore,
      pageSize);
  MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
  writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
}


During initialization, three objects are created: pageStore, columnStore, and MessageColumnIO. What are they for?

First, pageStore is an instance of ColumnChunkPageWriteStore, a collection mapping columns to page writers. Its main members and constructor:

private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;

public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
  this.schema = schema;
  for (ColumnDescriptor path : schema.getColumns()) {
    writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
  }
}


The writers map is its core member: the key is a column and the value is the actual page writer. ColumnChunkPageWriter is an inner class that implements the PageWriter interface. In the ColumnWriter discussed below, the pageWriter object actually used is one of these ColumnChunkPageWriter instances.

Next is columnStore. Looking at parquetProperties.newColumnWriteStore():

public ColumnWriteStore newColumnWriteStore(
    MessageType schema,
    PageWriteStore pageStore,
    int pageSize) {
  switch (writerVersion) {
  case PARQUET_1_0:
    return new ColumnWriteStoreV1(
        pageStore,
        pageSize,
        dictionaryPageSizeThreshold,
        enableDictionary, writerVersion);
  case PARQUET_2_0:
    return new ColumnWriteStoreV2(
        schema,
        pageStore,
        pageSize,
        new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
  default:
    throw new IllegalArgumentException("unknown version " + writerVersion);
  }
}


Spark uses the v1.0 writer, so the PARQUET_1_0 case is taken and a ColumnWriteStoreV1 is created, with the pageStore we just discussed passed in. Let's look at what ColumnWriteStoreV1 contains:

private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
  ColumnWriterV1 column = columns.get(path);
  if (column == null) {
    column = newMemColumn(path);
    columns.put(path, column);
  }
  return column;
}

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
  PageWriter pageWriter = pageWriteStore.getPageWriter(path);
  return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}


Like pageStore, columnStore also keeps a map whose key is a column and whose value is a ColumnWriterV1. When a column writer is requested, the map is consulted first; on a miss, newMemColumn() fetches the matching pageWriter from pageStore's map, hands it to a new ColumnWriterV1, and puts that writer into columnStore's map.
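The following sketch, with hypothetical simplified types rather than the real Parquet classes, captures this two-store structure: a registry of page writers keyed by column, and a lazily populated registry of column writers that each hold their column's page writer:

import scala.collection.mutable

// Hypothetical simplified stand-ins for ColumnDescriptor / PageWriter / ColumnWriter.
final case class ColumnDesc(path: String)

final class PageWriterSketch(column: ColumnDesc) {
  def writePage(bytes: Array[Byte]): Unit =
    println(s"buffering a ${bytes.length}-byte page for column ${column.path}")
}

// Mirrors ColumnChunkPageWriteStore: one page writer per column, created up front.
final class PageStoreSketch(columns: Seq[ColumnDesc]) {
  private val writers: Map[ColumnDesc, PageWriterSketch] =
    columns.map(c => c -> new PageWriterSketch(c)).toMap
  def getPageWriter(c: ColumnDesc): PageWriterSketch = writers(c)
}

// Mirrors ColumnWriterV1: buffers values and eventually flushes a page
// through the page writer it was constructed with.
final class ColumnWriterSketch(column: ColumnDesc, pageWriter: PageWriterSketch) {
  def flushPage(bytes: Array[Byte]): Unit = pageWriter.writePage(bytes)
}

// Mirrors ColumnWriteStoreV1.getColumnWriter: look up, create lazily on a miss,
// injecting the matching page writer from the page store.
final class ColumnStoreSketch(pageStore: PageStoreSketch) {
  private val columns = mutable.Map.empty[ColumnDesc, ColumnWriterSketch]
  def getColumnWriter(c: ColumnDesc): ColumnWriterSketch =
    columns.getOrElseUpdate(c, new ColumnWriterSketch(c, pageStore.getPageWriter(c)))
}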

Finally, MessageColumnIO: its core member is List<PrimitiveColumnIO> leaves, which stores basic information about each column, such as the column name.

The call writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore)) is crucial: columnIO.getRecordWriter(columnStore) returns an instance of MessageColumnIORecordConsumer, the class mentioned at the end of the Spark section above. One of its members is a ColumnWriter[] array, which means it holds all of the actual writers, in practice the ColumnWriterV1 instances described above.

In RecordConsumer & ColumnWriter

Back to ParquetRecordWriter: its overridden write method simply calls InternalParquetRecordWriter's write method, which in turn calls writeSupport's write method. In Spark's ParquetWriteSupport, the calls ultimately end up in the recordConsumer's addX methods (such as addBoolean() and addInteger()), and the recordConsumer here is a MessageColumnIORecordConsumer. So let's first look at addInteger():

@Override
public void addInteger(int value) {
  if (DEBUG) log("addInt(" + value + ")");
  emptyField = false;
  getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());

  setRepetitionLevel();
  if (DEBUG) printState();
}


It fetches the actual ColumnWriter and writes through it. We know this is a ColumnWriterV1, so let's look inside that class:

@Override
public void write(int value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  repetitionLevelColumn.writeInteger(repetitionLevel);
  definitionLevelColumn.writeInteger(definitionLevel);
  dataColumn.writeInteger(value);
  updateStatistics(value);
  accountForValueWritten();
}


All of these writes go into in-memory buffers; the key is the last call, accountForValueWritten():

/**
 * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
 *
 * We measure the memory used when we reach the mid point toward our estimated count.
 * We then update the estimate and flush the page if we reached the threshold.
 *
 * That way we check the memory size log2(n) times.
 *
 */
private void accountForValueWritten() {
  ++ valueCount;
  if (valueCount > valueCountForNextSizeCheck) {
    // not checking the memory used for every value
    long memSize = repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize();
    if (memSize > pageSizeThreshold) {
      // we will write the current page and check again the size at the predicted middle of next page
      valueCountForNextSizeCheck = valueCount / 2;
      writePage();
    } else {
      // not reached the threshold, will check again midway
      valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
    }
  }
}


In short, this method checks at spaced-out checkpoints, rather than after every value, whether the buffered size has reached the page threshold, and calls writePage() when it has. For example, if after 100 values the buffers hold 512 KB against a 1 MB page threshold, the page is estimated to fill at about 200 values, so the next memory check is scheduled at (100 + 200) / 2 + 1 = 151. Now look at writePage():

private void writePage() {
  if (DEBUG) LOG.debug("write page");
  try {
    pageWriter.writePage(
        concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
        valueCount,
        statistics,
        repetitionLevelColumn.getEncoding(),
        definitionLevelColumn.getEncoding(),
        dataColumn.getEncoding());
  } catch (IOException e) {
    throw new ParquetEncodingException("could not write page for " + path, e);
  }
  repetitionLevelColumn.reset();
  definitionLevelColumn.reset();
  dataColumn.reset();
  valueCount = 0;
  resetStatistics();
}


So the page is written through the pageWriter, which as discussed above is actually a ColumnChunkPageWriter. Its writePage():

@Override
public void writePage(BytesInput bytes,
                      int valueCount,
                      Statistics statistics,
                      Encoding rlEncoding,
                      Encoding dlEncoding,
                      Encoding valuesEncoding) throws IOException {
  long uncompressedSize = bytes.size();
  if (uncompressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write page larger than Integer.MAX_VALUE bytes: " +
        uncompressedSize);
  }
  BytesInput compressedBytes = compressor.compress(bytes);
  long compressedSize = compressedBytes.size();
  if (compressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
        + compressedSize);
  }
  tempOutputStream.reset();
  parquetMetadataConverter.writeDataPageHeader(
      (int)uncompressedSize,
      (int)compressedSize,
      valueCount,
      statistics,
      rlEncoding,
      dlEncoding,
      valuesEncoding,
      tempOutputStream);
  this.uncompressedLength += uncompressedSize;
  this.compressedLength += compressedSize;
  this.totalValueCount += valueCount;
  this.pageCount += 1;
  this.totalStatistics.mergeStatistics(statistics);
  // by concatenating before collecting instead of collecting twice,
  // we only allocate one buffer to copy into instead of multiple.
  buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
  encodings.add(rlEncoding);
  encodings.add(dlEncoding);
  encodings.add(valuesEncoding);
}


Here you can see clearly that after the page bytes are compressed, the page header is written first and then the page body, and at this point everything is still buffered in memory.

In ParquetFileWriter

Back in InternalParquetRecordWriter: like the page-level write path above, its write() method ends with a check:

public void write(T value) throws IOException, InterruptedException {
  writeSupport.write(value);
  ++ recordCount;
  checkBlockSizeReached();
}


The difference is that this check decides whether to flush to the file:

private void checkBlockSizeReached() throws IOException {
  if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
    long memSize = columnStore.getBufferedSize();
    long recordSize = memSize / recordCount;
    // flush the row group if it is within ~2 records of the limit
    // it is much better to be slightly under size than to be over at all
    if (memSize > (nextRowGroupSize - 2 * recordSize)) {
      LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, nextRowGroupSize, recordCount));
      flushRowGroupToStore();
      initStore();
      recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
      this.lastRowGroupEndPos = parquetFileWriter.getPos();
    } else {
      recordCountForNextMemCheck = min(
          max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
          recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
          );
      if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
    }
  }
}

private void flushRowGroupToStore()
    throws IOException {
  LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
  if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
    LOG.warn("Too much memory used: " + columnStore.memUsageString());
  }

  if (recordCount > 0) {
    parquetFileWriter.startBlock(recordCount);
    columnStore.flush();
    pageStore.flushToFileWriter(parquetFileWriter);
    recordCount = 0;
    parquetFileWriter.endBlock();
    this.nextRowGroupSize = Math.min(
        parquetFileWriter.getNextRowGroupSize(),
        rowGroupSizeThreshold);
  }

  columnStore = null;
  pageStore = null;
}


As you can see, ParquetFileWriter is called to write the row group to the file:

/**
 * writes a number of pages at once
 * @param bytes bytes to be written including page headers
 * @param uncompressedTotalPageSize total uncompressed size (without page headers)
 * @param compressedTotalPageSize total compressed size (without page headers)
 * @throws IOException
 */
void writeDataPages(BytesInput bytes,
                    long uncompressedTotalPageSize,
                    long compressedTotalPageSize,
                    Statistics totalStats,
                    List<Encoding> encodings) throws IOException {
  state = state.write();
  if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
  long headersSize = bytes.size() - compressedTotalPageSize;
  this.uncompressedLength += uncompressedTotalPageSize + headersSize;
  this.compressedLength += compressedTotalPageSize + headersSize;
  if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
  bytes.writeAllTo(out);
  currentEncodings.addAll(encodings);
  currentStatistics = totalStats;
}


close

Once all pages in all row groups have been written, the footer still needs to be written. For that, look at the close method of InternalParquetRecordWriter that we saw earlier:

public void close() throws IOException, InterruptedException {
  flushRowGroupToStore();
  parquetFileWriter.end(extraMetaData);
}


It calls parquetFileWriter's end() method to write the final footer; we will not go into that in more detail here.
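To see the net result of all of the above (the row groups flushed by flushRowGroupToStore() plus the footer written by end()), the file's metadata can be inspected with parquet-hadoop's reader API. A hedged sketch, assuming a reasonably recent parquet-hadoop version and a hypothetical path to one of the written part files:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

val conf = new Configuration()
// Hypothetical path to a single part file produced by the write above.
val file = HadoopInputFile.fromPath(new Path("/tmp/example_parquet/part-00000.parquet"), conf)
val reader = ParquetFileReader.open(file)
try {
  // Each BlockMetaData in the footer is one row group flushed by flushRowGroupToStore().
  reader.getFooter.getBlocks.asScala.zipWithIndex.foreach { case (block, i) =>
    println(s"row group $i: rows=${block.getRowCount}, totalByteSize=${block.getTotalByteSize}")
    block.getColumns.asScala.foreach { col =>
      println(s"  column ${col.getPath}: codec=${col.getCodec}, onDiskSize=${col.getTotalSize}")
    }
  }
} finally {
  reader.close()
}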


