1
0

[HUDI-3934] Fix Spark32HoodieParquetFileFormat not being compatible w/ Spark 3.2.0 (#5378)

- Due to the fact that Spark 3.2.1 is non-BWC w/ 3.2.0, we have to handle all these incompatibilities in Spark32HoodieParquetFileFormat. This PR is addressing that.

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
This commit is contained in:
Alexey Kudinkin
2022-04-21 18:00:38 -07:00
committed by GitHub
parent c4bc2deea0
commit c05a4e7b6f
5 changed files with 229 additions and 48 deletions

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.util.Utils
object Spark32DataSourceUtils {
/**
* NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
* compatibility against Spark 3.2.0
*/
// scalastyle:off
def int96RebaseMode(lookupFileMeta: String => String,
modeByConfig: String): LegacyBehaviorPolicy.Value = {
if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
return LegacyBehaviorPolicy.CORRECTED
}
// If there is no version, we return the mode specified by the config.
Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
// Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
// rebase the INT96 timestamp values.
// Files written by Spark 3.1 and latter may also need the rebase if they were written with
// the "LEGACY" rebase mode.
if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
LegacyBehaviorPolicy.LEGACY
} else {
LegacyBehaviorPolicy.CORRECTED
}
}.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
}
// scalastyle:on
/**
* NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
* compatibility against Spark 3.2.0
*/
// scalastyle:off
def datetimeRebaseMode(lookupFileMeta: String => String,
modeByConfig: String): LegacyBehaviorPolicy.Value = {
if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
return LegacyBehaviorPolicy.CORRECTED
}
// If there is no version, we return the mode specified by the config.
Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
// Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
// rebase the datetime values.
// Files written by Spark 3.0 and latter may also need the rebase if they were written with
// the "LEGACY" rebase mode.
if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
LegacyBehaviorPolicy.LEGACY
} else {
LegacyBehaviorPolicy.CORRECTED
}
}.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
}
// scalastyle:on
}

View File

@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
@@ -37,10 +38,10 @@ import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -148,8 +149,8 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
} else {
@@ -158,21 +159,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) {
// NOTE: Below code could only be compiled against >= Spark 3.2.1,
// and unfortunately won't compile against Spark 3.2.0
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
val datetimeRebaseSpec =
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
} else {
// Spark 3.2.0
val datetimeRebaseMode =
Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
}
filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -198,21 +216,21 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
None
}
val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
if (shouldUseInternalSchema) {
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
} else {
new java.util.HashMap()
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
@@ -225,6 +243,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
if (enableVectorizedReader) {
val vectorizedReader =
if (shouldUseInternalSchema) {
val int96RebaseSpec =
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
val datetimeRebaseSpec =
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new Spark32HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
@@ -234,7 +256,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
enableOffHeapColumnVector && taskContext.isDefined,
capacity,
typeChangeInfos)
} else {
} else if (HoodieSparkUtils.gteqSpark3_2_1) {
// NOTE: Below code could only be compiled against >= Spark 3.2.1,
// and unfortunately won't compile against Spark 3.2.0
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
val int96RebaseSpec =
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
val datetimeRebaseSpec =
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
@@ -243,7 +272,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
int96RebaseSpec.timeZone,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
} else {
// Spark 3.2.0
val datetimeRebaseMode =
Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
val int96RebaseMode =
Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
createVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
}
// SPARK-37089: We cannot register a task completion listener to close this iterator here
// because downstream exec nodes have already registered their listeners. Since listeners
// are executed in reverse order of registration, a listener registered here would close the
@@ -279,12 +321,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseSpec,
int96RebaseSpec)
val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) {
// ParquetRecordReader returns InternalRow
// NOTE: Below code could only be compiled against >= Spark 3.2.1,
// and unfortunately won't compile against Spark 3.2.0
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
val int96RebaseSpec =
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
val datetimeRebaseSpec =
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseSpec,
int96RebaseSpec)
} else {
val datetimeRebaseMode =
Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
val int96RebaseMode =
Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
createParquetReadSupport(
convertTz,
/* enableVectorizedReader = */ false,
datetimeRebaseMode,
int96RebaseMode)
}
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -332,10 +394,47 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
}
}
}
object Spark32HoodieParquetFileFormat {
/**
* NOTE: This method is specific to Spark 3.2.0
*/
private def createParquetFilters(args: Any*): ParquetFilters = {
// NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
// up by arg types, and have to instead rely on the number of args based on individual class;
// the ctor order is not guaranteed
val ctor = classOf[ParquetFilters].getConstructors.maxBy(_.getParameterCount)
ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
.asInstanceOf[ParquetFilters]
}
/**
* NOTE: This method is specific to Spark 3.2.0
*/
private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
// NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
// up by arg types, and have to instead rely on the number of args based on individual class;
// the ctor order is not guaranteed
val ctor = classOf[ParquetReadSupport].getConstructors.maxBy(_.getParameterCount)
ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
.asInstanceOf[ParquetReadSupport]
}
/**
* NOTE: This method is specific to Spark 3.2.0
*/
private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
// NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
// up by arg types, and have to instead rely on the number of args based on individual class;
// the ctor order is not guaranteed
val ctor = classOf[VectorizedParquetRecordReader].getConstructors.maxBy(_.getParameterCount)
ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
.asInstanceOf[VectorizedParquetRecordReader]
}
def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {