1
0

[HUDI-3935] Adding config to fallback to enabled Partition Values extraction from Partition path (#5377)

This commit is contained in:
Alexey Kudinkin
2022-04-21 01:36:19 -07:00
committed by GitHub
parent a9506aa545
commit 4b296f79cc
12 changed files with 133 additions and 61 deletions

View File

@@ -66,7 +66,7 @@ public class DataSourceInternalWriterHelper {
writeClient.startCommitWithTime(instantTime);
this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
this.metaClient.validateTableProperties(writeConfig.getProps(), WriteOperationType.BULK_INSERT);
this.metaClient.validateTableProperties(writeConfig.getProps());
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
}

View File

@@ -114,21 +114,21 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET =>
(sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
}
val (tableFileFormat, formatClassName) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
}
if (globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
// - Values parsed from the actual partition pat would be appended to the final dataset
// - Values parsed from the actual partition path would be appended to the final dataset
//
// In the former case, we don't need to provide the partition-schema to the relation,
// therefore we simply stub it w/ empty schema and use full table-schema as the one being
@@ -136,7 +136,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
//
// In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
// being a table-schema with all partition columns stripped out
val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
(fileIndex.partitionSchema, fileIndex.dataSchema)
} else {
(StructType(Nil), tableStructSchema)

View File

@@ -18,14 +18,16 @@
package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.Option
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -45,6 +47,7 @@ import scala.language.implicitConversions
* Options supported for reading hoodie tables.
*/
object DataSourceReadOptions {
import DataSourceOptionsHelper._
val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
@@ -124,6 +127,15 @@ object DataSourceReadOptions {
.withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
"skipping over files")
/**
 * Read option selecting the source of partition-column values: the physical partition path
 * (when `true`) or the data file itself (when `false`).
 *
 * Defaults to `false` and is marked as available since version 0.11.0.
 */
val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] =
ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path")
.defaultValue(false)
.sinceVersion("0.11.0")
.withDocumentation("When set to true, values for partition columns (partition values) will be extracted" +
" from physical partition path (default Spark behavior). When set to false partition values will be" +
" read from the data file (in Hudi partition columns are persisted by default)." +
" This config is a fallback allowing to preserve existing behavior, and should not be used otherwise.")
val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
@@ -185,6 +197,8 @@ object DataSourceReadOptions {
*/
object DataSourceWriteOptions {
import DataSourceOptionsHelper._
val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
@@ -471,10 +485,7 @@ object DataSourceWriteOptions {
.sinceVersion("0.9.0")
.withDocumentation("This class is used by kafka client to deserialize the records")
val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty
.key(HoodieTableConfig.DROP_PARTITION_COLUMNS.key())
.defaultValue(HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue().booleanValue())
.withDocumentation(HoodieTableConfig.DROP_PARTITION_COLUMNS.doc())
val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS
/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
@@ -774,4 +785,23 @@ object DataSourceOptionsHelper {
override def apply (input: From): To = function (input)
}
}
/**
 * Re-types a [[ConfigProperty]] from `T` to `U` by running its default value through the
 * implicitly available `converter`.
 *
 * The key, documentation and alternative keys are carried over unchanged; the "since" and
 * "deprecated after" versions are carried over only when the source property defines them.
 *
 * @param prop      source property; must have a default value (enforced through `checkState`)
 * @param converter function converting the default value from `T` to `U`
 * @return a new property equivalent to `prop`, but typed as `ConfigProperty[U]`
 */
implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
  checkState(prop.hasDefaultValue)

  val converted: ConfigProperty[U] = ConfigProperty.key(prop.key())
    .defaultValue(converter(prop.defaultValue()))
    .withDocumentation(prop.doc())
    .withAlternatives(prop.getAlternatives.asScala: _*)

  // Each version marker is optional on the source property, so it is applied only when present
  val withSinceVersion: ConfigProperty[U] =
    toScalaOption(prop.getSinceVersion)
      .fold(converted)(version => converted.sinceVersion(version))

  toScalaOption(prop.getDeprecatedVersion)
    .fold(withSinceVersion)(version => withSinceVersion.deprecatedAfter(version))
}
}

View File

@@ -149,8 +149,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
protected val shouldOmitPartitionColumns: Boolean =
metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
/**
* Controls whether partition values (ie values of partition columns) should be
* <ol>
* <li>Extracted from partition path and appended to individual rows read from the data file (we
* delegate this to Spark's [[ParquetFileFormat]])</li>
* <li>Read from the data-file as is (by default Hudi persists all columns including partition ones)</li>
* </ol>
*
* This flag is only relevant in conjunction with the usage of [["hoodie.datasource.write.drop.partition.columns"]]
* config, when Hudi will NOT be persisting partition columns in the data file, and therefore values for
* such partition columns (ie "partition values") will have to be parsed from the partition path, and appended
* to every row only in the fetched dataset.
*
* NOTE: Partition values extracted from partition path might deviate from the values of the original
* partition columns: for example, if column [[ts]] bearing an epoch timestamp was originally used as the
* partition column, and [[TimestampBasedKeyGenerator]] used it to generate partition paths of the format
* [["yyyy/mm/dd"]], the appended partition value would bear the format verbatim as it was used in the
* partition path, meaning that the string value "2022/01/01" will be appended, and not its original
* representation
*/
protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
  // Table-level setting: whether partition columns (the source of the partition path values) are
  // omitted from the persisted data files. On the read path this decides whether partition values
  // are read from the data file or extracted from the partition path
  val partitionColumnsDropped =
    metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty

  // Read-level override: the caller may explicitly request extraction from the partition path;
  // when the option is absent the config's default value applies
  val extractionConfig = DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH
  val extractionRequested =
    optParams.get(extractionConfig.key)
      .map(_.toBoolean)
      .getOrElse(extractionConfig.defaultValue.toString.toBoolean)

  partitionColumnsDropped || extractionRequested
}
/**
* NOTE: PLEASE READ THIS CAREFULLY
@@ -228,7 +256,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
val tableAvroSchemaStr =
if (internalSchema.isEmptySchema) tableAvroSchema.toString
else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
@@ -367,7 +394,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
try {
val tableConfig = metaClient.getTableConfig
if (shouldOmitPartitionColumns) {
if (shouldExtractPartitionValuesFromPartitionPath) {
val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
@@ -420,9 +447,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
hadoopConf = hadoopConf
)
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
@@ -431,7 +455,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
filters = filters,
options = options,
hadoopConf = hadoopConf,
appendPartitionValues = shouldAppendPartitionColumns
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)
partitionedFile => {
@@ -448,7 +474,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
if (shouldOmitPartitionColumns) {
if (shouldExtractPartitionValuesFromPartitionPath) {
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)

View File

@@ -50,7 +50,6 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
options: Map[String, String],
hadoopConf: Configuration,
appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,

View File

@@ -160,7 +160,7 @@ object HoodieSparkSqlWriter {
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setDropPartitionColumnsWhenWrite(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig

View File

@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.{DataSourceReadOptions, SparkAdapterSupport}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -41,14 +41,16 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val shouldExtractPartitionValuesFromPartitionPath =
options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
sparkAdapter
.createHoodieParquetFileFormat(appendPartitionValues = false).get
.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
}
}
/**
 * Companion object holding constants associated with the Hudi parquet file format.
 */
object HoodieParquetFileFormat {
// String identifier of this file format.
// NOTE(review): presumably the short name under which the format is registered/looked up — confirm
val FILE_FORMAT_ID = "hoodie-parquet"
}