The Spark layer
On the Spark side, we start from dataframe.write.parquet("path").
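For reference, here is a minimal example of the call being traced (the app name, column name, and output path are placeholders chosen for this illustration, not from the original article):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-write-demo").getOrCreate()
// A tiny DataFrame; the column name and output path are placeholders.
val df = spark.range(0, 1000).toDF("id")
// The call traced in this article:
// Dataset.write -> DataFrameWriter.parquet -> ... -> Parquet's ParquetOutputFormat.
df.write.mode("overwrite").parquet("/tmp/parquet-write-demo")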
In DataFrame
The write() method in Dataset:
/**
* Interface for saving the content of the non-streaming Dataset out into external storage.
*
* @group basic
* @since 1.6.0
*/
def write: DataFrameWriter[T] = {
if (isStreaming) {
logicalPlan.failAnalysis(
"'write' can not be called on streaming Dataset/DataFrame")
}
new DataFrameWriter[T](this)
}
The parquet() method in DataFrameWriter:
def parquet(path: String): Unit = {
format("parquet").save(path)
}
As you can see, this just calls save(). Following the call chain, we eventually end up in the DataSource class:
/**
* Writes the given [[DataFrame]] out to this [[DataSource]].
*/
def write(mode: SaveMode, data: DataFrame): Unit = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
writeInFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
In DataSource
Writing Parquet falls into the FileFormat case. FileFormat is a trait (interface), and its Parquet implementation is ParquetFileFormat. Let's continue with the writeInFileFormat() method:
/**
* Writes the given [[DataFrame]] out in this [[FileFormat]].
*/
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
...
...
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitionKeys = Map.empty,
customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
fileFormat = format,
refreshFunction = _ => Unit, // No existing table needs to be refreshed.
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
}
This brings us to the InsertIntoHadoopFsRelationCommand class. In its overridden run() method there is the following code:
FileFormatWriter.write(
sparkSession = sparkSession,
queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(
qualifiedOutputPath.toString, customPartitionLocations),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
refreshFunction = refreshFunction,
options = options)
Jumping into this write() method:
def write(
sparkSession: SparkSession,
queryExecution: QueryExecution,
fileFormat: FileFormat,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
refreshFunction: (Seq[TablePartitionSpec]) => Unit,
options: Map[String, String]): Unit = {
...
...
val outputWriterFactory =
fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
}
Since this fileFormat is an instance of ParquetFileFormat, we go straight to ParquetFileFormat.prepareWrite:
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
new OutputWriterFactory {
// This OutputWriterFactory instance is deserialized when writing Parquet files on the
// executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
// another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
// initialized.
private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new ParquetOutputWriter(path, context)
}
override def getFileExtension(context: TaskAttemptContext): String = {
CodecConfig.from(context).getCodec.getExtension + ".parquet"
}
}
}
The returned OutputWriterFactory creates ParquetOutputWriter instances. ParquetOutputWriter in turn uses a RecordWriter that is a ParquetRecordWriter<InternalRow>, whose WriteSupport is Spark's own ParquetWriteSupport class. That class takes the fields of Spark's InternalRow and writes each of them through Parquet's column writers.
To see how, look at the consumeField() method in ParquetWriteSupport:
private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
recordConsumer.startField(field, index)
f
recordConsumer.endField(field, index)
}
The recordConsumer here is an instance of MessageColumnIORecordConsumer, which is a Parquet class.
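To make the flow concrete, here is a simplified sketch (not the actual ParquetWriteSupport source) of how one row's fields might be pushed into the RecordConsumer. It assumes a flat schema of non-null INT32 columns, and writeRow and fieldNames are names invented for this illustration; the real class builds a type-specific value writer per field from the Catalyst schema, but the startField/addX/endField choreography is the same:

import org.apache.parquet.io.api.RecordConsumer
import org.apache.spark.sql.catalyst.InternalRow

// Sketch only: push each non-null INT32 field of an InternalRow into the RecordConsumer.
def writeRow(recordConsumer: RecordConsumer, row: InternalRow, fieldNames: Seq[String]): Unit = {
  recordConsumer.startMessage()
  var i = 0
  while (i < fieldNames.length) {
    if (!row.isNullAt(i)) {
      recordConsumer.startField(fieldNames(i), i)   // same pattern as consumeField()
      recordConsumer.addInteger(row.getInt(i))      // routed to that column's ColumnWriter
      recordConsumer.endField(fieldNames(i), i)
    }
    i += 1
  }
  recordConsumer.endMessage()
}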
Meanwhile, the write path in the ParquetOutputWriter class looks like this:
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
new ParquetOutputFormat[InternalRow]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
new Path(path)
}
}.getRecordWriter(context)
}
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
override def close(): Unit = recordWriter.close(context)
}
So the actual writing is delegated to the ParquetOutputFormat class, which is also a Parquet class. From here we move into the Parquet source code.
The Parquet layer
Parquet writes are organized down to the page level, and compression is also applied per page. The data is buffered in the JVM and flushed to the file once a threshold is reached; the toy sketch below illustrates the pattern, and the rest of this section walks through how the real code implements it.
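A minimal sketch of "buffer in memory, flush when a threshold is reached". ToyPageBuffer, writeInt, and flushPage are invented names and not part of Parquet's API; in the real code this role is played by ColumnWriterV1 buffering values and calling writePage() on a ColumnChunkPageWriter, as shown later:

import java.io.ByteArrayOutputStream

// Toy illustration of the buffer-and-flush pattern; Parquet applies it twice:
// per column for pages, and per file for row groups.
class ToyPageBuffer(pageSizeThreshold: Int)(flushPage: Array[Byte] => Unit) {
  private val buffer = new ByteArrayOutputStream()

  def writeInt(value: Int): Unit = {
    // encode the value into the in-memory buffer (4 bytes, big-endian)
    buffer.write(Array[Byte](
      (value >>> 24).toByte, (value >>> 16).toByte, (value >>> 8).toByte, value.toByte))
    // flush a "page" once the buffered bytes cross the threshold
    if (buffer.size() >= pageSizeThreshold) {
      flushPage(buffer.toByteArray)
      buffer.reset()
    }
  }
}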
In ParquetOutputFormat
As noted above, we have reached the ParquetOutputFormat class in the parquet-hadoop module. Look at its getRecordWriter() method:
public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
throws IOException, InterruptedException {
final WriteSupport<T> writeSupport = getWriteSupport(conf);
CodecFactory codecFactory = new CodecFactory(conf);
long blockSize = getLongBlockSize(conf);
if (INFO) LOG.info("Parquet block size to " + blockSize);
int pageSize = getPageSize(conf);
if (INFO) LOG.info("Parquet page size to " + pageSize);
int dictionaryPageSize = getDictionaryPageSize(conf);
if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
boolean enableDictionary = getEnableDictionary(conf);
if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
boolean validating = getValidation(conf);
if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
WriterVersion writerVersion = getWriterVersion(conf);
if (INFO) LOG.info("Writer version is: " + writerVersion);
int maxPaddingSize = getMaxPaddingSize(conf);
if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(
conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
if (memoryManager == null) {
memoryManager = new MemoryManager(maxLoad, minAllocation);
} else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
"be reset by the new value: " + maxLoad);
}
return new ParquetRecordWriter<T>(
w,
writeSupport,
init.getSchema(),
init.getExtraMetaData(),
blockSize, pageSize,
codecFactory.getCompressor(codec, pageSize),
dictionaryPageSize,
enableDictionary,
validating,
writerVersion,
memoryManager);
}
Parameters such as blockSize (really the row group size; the Parquet code calls a row group a "parquet block") and pageSize are read from the conf. A ParquetFileWriter is then created and started, and finally a ParquetRecordWriter is returned. The writeSupport here is Spark's own ParquetWriteSupport.
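Since these values come from the Hadoop conf, they can be tuned from the Spark side. The property names below are the standard parquet-hadoop keys read by getRecordWriter(); spark and df refer to the session and DataFrame from the earlier example, and the exact values are arbitrary. Setting them on the session's Hadoop configuration is the most direct route; passing them as writer options should also reach the Hadoop conf used for the write, but verify that on your Spark version:

// 64 MB row groups and 512 KB pages, for illustration only.
spark.sparkContext.hadoopConfiguration.set("parquet.block.size", (64 * 1024 * 1024).toString)
spark.sparkContext.hadoopConfiguration.set("parquet.page.size", (512 * 1024).toString)

df.write
  .option("parquet.enable.dictionary", "true")   // dictionary encoding on
  .mode("overwrite")
  .parquet("/tmp/parquet-write-demo")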
In ParquetRecordWriter
public ParquetRecordWriter(
ParquetFileWriter w,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
long blockSize, int pageSize,
BytesCompressor compressor,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion,
MemoryManager memoryManager) {
internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
validating, writerVersion);
this.memoryManager = checkNotNull(memoryManager, "memoryManager");
memoryManager.addWriter(internalWriter, blockSize);
}
@Override
public void write(Void key, T value) throws IOException, InterruptedException {
internalWriter.write(value);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
internalWriter.close();
if (memoryManager != null) {
memoryManager.removeWriter(internalWriter);
}
}
This class simply news up an InternalParquetRecordWriter and delegates write() and close() to it; the internalWriter/writeSupport split is the usual MapReduce-style layering.
In InternalParquetRecordWriter
public InternalParquetRecordWriter(
ParquetFileWriter parquetFileWriter,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
long rowGroupSize,
int pageSize,
BytesCompressor compressor,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion) {
this.parquetFileWriter = parquetFileWriter;
this.writeSupport = checkNotNull(writeSupport, "writeSupport");
this.schema = schema;
this.extraMetaData = extraMetaData;
this.rowGroupSize = rowGroupSize;
this.rowGroupSizeThreshold = rowGroupSize;
this.nextRowGroupSize = rowGroupSizeThreshold;
this.pageSize = pageSize;
this.compressor = compressor;
this.validating = validating;
this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
initStore();
}
private void initStore() {
pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
columnStore = parquetProperties.newColumnWriteStore(
schema,
pageStore,
pageSize);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
}
During initialization three objects are created: pageStore, columnStore, and a MessageColumnIO. What are they for?

First, pageStore is an instance of ColumnChunkPageWriteStore, which holds one page writer per column. Its main member and constructor:
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;
public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
this.schema = schema;
for (ColumnDescriptor path : schema.getColumns()) {
writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
}
}
The writers map is its core member: each key is a column and each value is the actual page writer. ColumnChunkPageWriter is an inner class that implements the PageWriter interface; in the ColumnWriter discussed below, the pageWriter actually used is one of these ColumnChunkPageWriter objects.
Next is columnStore. Looking at parquetProperties.newColumnWriteStore():
public ColumnWriteStore newColumnWriteStore(
MessageType schema,
PageWriteStore pageStore,
int pageSize) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(
pageStore,
pageSize,
dictionaryPageSizeThreshold,
enableDictionary, writerVersion);
case PARQUET_2_0:
return new ColumnWriteStoreV2(
schema,
pageStore,
pageSize,
new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
}
Spark uses the V1 writer, so the case falls into ColumnWriteStoreV1, which is constructed with the pageStore we just described. Let's look inside ColumnWriteStoreV1:
private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterV1 column = columns.get(path);
if (column == null) {
column = newMemColumn(path);
columns.put(path, column);
}
return column;
}
private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}
Like the pageStore, the columnStore also keeps a map whose keys are columns and whose values are ColumnWriterV1 objects. When a column writer is requested, the map is consulted first; if the entry is missing, newMemColumn() looks up the corresponding pageWriter in the pageStore, wraps it in a new ColumnWriterV1, and caches that in the columnStore's map.
Finally, MessageColumnIO. Its core member is List<PrimitiveColumnIO> leaves, which holds the basic information about each column, such as its name.

The line writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore)) is crucial: columnIO.getRecordWriter(columnStore) returns a MessageColumnIORecordConsumer, the class mentioned at the end of the Spark section above. It has a ColumnWriter[] member, which means it holds all of the actual writers, namely the ColumnWriterV1 instances described above.
In RecordConsumer & ColumnWriter
Back to ParquetRecordWriter: its overridden write() method just calls InternalParquetRecordWriter's write(), which in turn calls writeSupport's write(). In Spark's ParquetWriteSupport you will find that values ultimately go through the recordConsumer's addX methods (such as addBoolean() and addInteger()), and the actual recordConsumer is a MessageColumnIORecordConsumer. So let's first look at addInteger():
@Override
public void addInteger(int value) {
if (DEBUG) log("addInt(" + value + ")");
emptyField = false;
getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
setRepetitionLevel();
if (DEBUG) printState();
}
It fetches the actual ColumnWriter and writes through it. We know the writer here is a ColumnWriterV1, so let's look at that class:
@Override
public void write(int value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
repetitionLevelColumn.writeInteger(repetitionLevel);
definitionLevelColumn.writeInteger(definitionLevel);
dataColumn.writeInteger(value);
updateStatistics(value);
accountForValueWritten();
}
All of these writes go into in-memory buffers. The key is the last line, accountForValueWritten(); let's look at that method:
/**
* Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
*
* We measure the memory used when we reach the mid point toward our estimated count.
* We then update the estimate and flush the page if we reached the threshold.
*
* That way we check the memory size log2(n) times.
*
*/
private void accountForValueWritten() {
++ valueCount;
if (valueCount > valueCountForNextSizeCheck) {
// not checking the memory used for every value
long memSize = repetitionLevelColumn.getBufferedSize()
+ definitionLevelColumn.getBufferedSize()
+ dataColumn.getBufferedSize();
if (memSize > pageSizeThreshold) {
// we will write the current page and check again the size at the predicted middle of next page
valueCountForNextSizeCheck = valueCount / 2;
writePage();
} else {
// not reached the threshold, will check again midway
valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
}
}
}
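A quick worked example of that scheduling arithmetic, with numbers invented purely for illustration: if 100 values are buffered and occupy 256 KB against a 1 MB page threshold, the page is projected to fill up at roughly 400 values, so the next size check is scheduled about halfway there:

// Invented numbers, only to illustrate the check-scheduling formula above.
val valueCount = 100                          // values buffered so far in this column
val memSize = 256L * 1024                     // bytes those values currently occupy
val pageSizeThreshold = 1024 * 1024           // 1 MB page size limit
// Same arithmetic as accountForValueWritten(): project when the page would be full
// (~400 values here) and check again about halfway toward that point.
val valueCountForNextSizeCheck =
  (valueCount + valueCount.toFloat * pageSizeThreshold / memSize).toInt / 2 + 1
// valueCountForNextSizeCheck == 251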
So after each value is written, this method checks whether the buffered size has reached the page threshold and, if so, calls writePage(). Let's look at writePage():
private void writePage() {
if (DEBUG) LOG.debug("write page");
try {
pageWriter.writePage(
concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
valueCount,
statistics,
repetitionLevelColumn.getEncoding(),
definitionLevelColumn.getEncoding(),
dataColumn.getEncoding());
} catch (IOException e) {
throw new ParquetEncodingException("could not write page for " + path, e);
}
repetitionLevelColumn.reset();
definitionLevelColumn.reset();
dataColumn.reset();
valueCount = 0;
resetStatistics();
}
So the pageWriter is asked to write the page. As mentioned above, the implementation actually in use is ColumnChunkPageWriter; look at its writePage():
@Override
public void writePage(BytesInput bytes,
int valueCount,
Statistics statistics,
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding) throws IOException {
long uncompressedSize = bytes.size();
if (uncompressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
"Cannot write page larger than Integer.MAX_VALUE bytes: " +
uncompressedSize);
}
BytesInput compressedBytes = compressor.compress(bytes);
long compressedSize = compressedBytes.size();
if (compressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
"Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+ compressedSize);
}
tempOutputStream.reset();
parquetMetadataConverter.writeDataPageHeader(
(int)uncompressedSize,
(int)compressedSize,
valueCount,
statistics,
rlEncoding,
dlEncoding,
valuesEncoding,
tempOutputStream);
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
this.totalStatistics.mergeStatistics(statistics);
// by concatenating before collecting instead of collecting twice,
// we only allocate one buffer to copy into instead of multiple.
buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
encodings.add(rlEncoding);
encodings.add(dlEncoding);
encodings.add(valuesEncoding);
}
Here you can clearly see that after the page bytes are compressed, the page header is written first and then the page body. At this point everything is still held in memory.
In ParquetFileWriter
Back in InternalParquetRecordWriter, its write() method, much like the page writer's, ends with a check:
public void write(T value) throws IOException, InterruptedException {
writeSupport.write(value);
++ recordCount;
checkBlockSizeReached();
}
The difference is that this check decides whether to flush the row group to the file:
private void checkBlockSizeReached() throws IOException {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = columnStore.getBufferedSize();
long recordSize = memSize / recordCount;
// flush the row group if it is within ~2 records of the limit
// it is much better to be slightly under size than to be over at all
if (memSize > (nextRowGroupSize - 2 * recordSize)) {
LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, nextRowGroupSize, recordCount));
flushRowGroupToStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
this.lastRowGroupEndPos = parquetFileWriter.getPos();
} else {
recordCountForNextMemCheck = min(
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
}
}
}
private void flushRowGroupToStore()
throws IOException {
LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
LOG.warn("Too much memory used: " + columnStore.memUsageString());
}
if (recordCount > 0) {
parquetFileWriter.startBlock(recordCount);
columnStore.flush();
pageStore.flushToFileWriter(parquetFileWriter);
recordCount = 0;
parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(
parquetFileWriter.getNextRowGroupSize(),
rowGroupSizeThreshold);
}
columnStore = null;
pageStore = null;
}
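To make the flush condition in checkBlockSizeReached() concrete, here is a small worked example with invented numbers:

// Invented numbers, only to illustrate the "within ~2 records of the limit" condition.
val nextRowGroupSize = 128L * 1024 * 1024      // row group limit: 134217728 bytes
val recordCount      = 1000000L
val memSize          = 120L * 1024 * 1024      // currently buffered: 125829120 bytes
val recordSize       = memSize / recordCount   // ~125 bytes per record on average
val shouldFlush      = memSize > (nextRowGroupSize - 2 * recordSize)
// shouldFlush == false here: keep buffering until within about two records of the limit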
As the code shows, the buffered pages are handed to ParquetFileWriter, which writes them to the file:
/**
* writes a number of pages at once
* @param bytes bytes to be written including page headers
* @param uncompressedTotalPageSize total uncompressed size (without page headers)
* @param compressedTotalPageSize total compressed size (without page headers)
* @throws IOException
*/
void writeDataPages(BytesInput bytes,
long uncompressedTotalPageSize,
long compressedTotalPageSize,
Statistics totalStats,
List<Encoding> encodings) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
long headersSize = bytes.size() - compressedTotalPageSize;
this.uncompressedLength += uncompressedTotalPageSize + headersSize;
this.compressedLength += compressedTotalPageSize + headersSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
bytes.writeAllTo(out);
currentEncodings.addAll(encodings);
currentStatistics = totalStats;
}
close
Once all the pages of all row groups have been written, the footer still has to be written. For that, look at the close() method of InternalParquetRecordWriter that we saw earlier:
public void close() throws IOException, InterruptedException {
flushRowGroupToStore();
parquetFileWriter.end(extraMetaData);
}
It calls parquetFileWriter's end() method to write the final footer; we won't go into the details here.