[HUDI-3731] Fixing Column Stats Index record Merging sequence missing columnName (#5159)
* Added `DataSkippingFailureMode` to control how Data Skipping handles failures in the flow: either "strict", where an exception is thrown, or "fallback", where it simply falls back to a full scan
* Made sure tests execute with `DataSkippingFailureMode.Strict`
* Fixed Column Stats Index record merging sequence missing `columnName`
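
For context, a minimal usage sketch of the failure-mode switch added by this change (illustrative only, not part of the commit; the table path, filter column, and Spark session setup are assumptions):

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
    import org.apache.spark.sql.SparkSession

    object DataSkippingStrictReadExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("data-skipping-strict").getOrCreate()

        // "strict": a failed Column Stats Index lookup throws a HoodieException;
        // "fallback" (the default): the query silently falls back to a full file listing.
        spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value)

        spark.read.format("hudi")
          .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") // data skipping must be enabled
          .load("/tmp/hudi/trips")   // assumed table base path
          .where("fare > 10.0")      // assumed column; query filters drive the index lookup
          .show()
      }
    }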
@@ -321,9 +321,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private HoodieMetadataColumnStats combineColumnStatsMetadata(HoodieMetadataPayload previousRecord) {
     checkArgument(previousRecord.getColumnStatMetadata().isPresent());
     checkArgument(getColumnStatMetadata().isPresent());
-    checkArgument(previousRecord.getColumnStatMetadata().get()
-        .getFileName().equals(this.columnStatMetadata.getFileName()));
-    return HoodieTableMetadataUtil.mergeColumnStats(previousRecord.getColumnStatMetadata().get(), this.columnStatMetadata);
+
+    HoodieMetadataColumnStats previousColStatsRecord = previousRecord.getColumnStatMetadata().get();
+    HoodieMetadataColumnStats newColumnStatsRecord = getColumnStatMetadata().get();
+
+    return HoodieTableMetadataUtil.mergeColumnStats(previousColStatsRecord, newColumnStatsRecord);
   }
 
   @Override
@@ -90,6 +90,7 @@ import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL
 import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE;
 import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
 import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 
@@ -935,20 +936,25 @@ public class HoodieTableMetadataUtil {
     return Arrays.asList(tableConfig.getRecordKeyFields().get());
   }
 
-  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
-    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
-    if (newColumnStats.getIsDeleted()) {
-      return newColumnStats;
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats prevColumnStatsRecord,
+                                                           HoodieMetadataColumnStats newColumnStatsRecord) {
+    checkArgument(prevColumnStatsRecord.getFileName().equals(newColumnStatsRecord.getFileName()));
+    checkArgument(prevColumnStatsRecord.getColumnName().equals(newColumnStatsRecord.getColumnName()));
+
+    if (newColumnStatsRecord.getIsDeleted()) {
+      return newColumnStatsRecord;
     }
 
     return HoodieMetadataColumnStats.newBuilder()
-        .setFileName(newColumnStats.getFileName())
-        .setMinValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
-        .setMaxValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
-        .setValueCount(oldColumnStats.getValueCount() + newColumnStats.getValueCount())
-        .setNullCount(oldColumnStats.getNullCount() + newColumnStats.getNullCount())
-        .setTotalSize(oldColumnStats.getTotalSize() + newColumnStats.getTotalSize())
-        .setTotalUncompressedSize(oldColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
-        .setIsDeleted(newColumnStats.getIsDeleted())
+        .setFileName(newColumnStatsRecord.getFileName())
+        .setColumnName(newColumnStatsRecord.getColumnName())
+        .setMinValue(Stream.of(prevColumnStatsRecord.getMinValue(), newColumnStatsRecord.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
+        .setMaxValue(Stream.of(prevColumnStatsRecord.getMinValue(), newColumnStatsRecord.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
+        .setValueCount(prevColumnStatsRecord.getValueCount() + newColumnStatsRecord.getValueCount())
+        .setNullCount(prevColumnStatsRecord.getNullCount() + newColumnStatsRecord.getNullCount())
+        .setTotalSize(prevColumnStatsRecord.getTotalSize() + newColumnStatsRecord.getTotalSize())
+        .setTotalUncompressedSize(prevColumnStatsRecord.getTotalUncompressedSize() + newColumnStatsRecord.getTotalUncompressedSize())
+        .setIsDeleted(newColumnStatsRecord.getIsDeleted())
         .build();
   }
 
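To make the merge semantics above easier to follow, here is a simplified stand-in model (hypothetical `ColStats` type, not Hudi's Avro-generated `HoodieMetadataColumnStats`): stats for the same file and column are combined by taking the minimum of the mins, the maximum of the maxes, and summing the counts; the fix ensures `columnName` is carried into the merged record.

    // Simplified illustration of the merge above; ColStats is a hypothetical stand-in type.
    case class ColStats(fileName: String, columnName: String,
                        minValue: Option[String], maxValue: Option[String],
                        valueCount: Long, nullCount: Long, isDeleted: Boolean = false)

    def merge(prev: ColStats, next: ColStats): ColStats = {
      // Records are only mergeable when they describe the same file and the same column
      require(prev.fileName == next.fileName && prev.columnName == next.columnName)
      if (next.isDeleted) {
        next
      } else {
        ColStats(
          fileName = next.fileName,
          columnName = next.columnName, // the field that was previously dropped during merging
          minValue = Seq(prev.minValue, next.minValue).flatten.sorted.headOption,
          maxValue = Seq(prev.maxValue, next.maxValue).flatten.sorted.lastOption,
          valueCount = prev.valueCount + next.valueCount,
          nullCount = prev.nullCount + next.nullCount)
      }
    }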
@@ -19,10 +19,11 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.HoodieDatasetUtils.withPersistence
-import org.apache.hudi.HoodieFileIndex.{collectReferencedColumns, getConfigProperties}
+import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -84,7 +85,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = queryPaths.asScala
 
-  def isDataSkippingEnabled(): Boolean = {
+  def isDataSkippingEnabled: Boolean = {
     options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
       spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
   }
@@ -123,8 +124,12 @@ case class HoodieFileIndex(spark: SparkSession,
       lookupCandidateFilesInMetadataTable(dataFilters) match {
         case Success(opt) => opt
         case Failure(e) =>
-          logError("Failed to lookup candidate files in Z-index", e)
-          Option.empty
+          logError("Failed to lookup candidate files in File Index", e)
+
+          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
+            case DataSkippingFailureMode.Fallback.value => Option.empty
+            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
+          }
       }
 
     logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
@@ -194,7 +199,7 @@ case class HoodieFileIndex(spark: SparkSession,
     val fs = metaClient.getFs
     val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+    if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
       Option.empty
     } else {
       val targetColStatsIndexColumns = Seq(
@@ -302,6 +307,22 @@ case class HoodieFileIndex(spark: SparkSession,
 
 object HoodieFileIndex extends Logging {
 
+  object DataSkippingFailureMode extends Enumeration {
+    val configName = "hoodie.fileIndex.dataSkippingFailureMode"
+
+    type DataSkippingFailureMode = Value
+
+    case class Val(value: String) extends super.Val {
+      override def toString(): String = value
+    }
+
+    import scala.language.implicitConversions
+    implicit def valueToVal(x: Value): DataSkippingFailureMode = x.asInstanceOf[Val]
+
+    val Fallback: Val = Val("fallback")
+    val Strict: Val = Val("strict")
+  }
+
   private def collectReferencedColumns(spark: SparkSession, queryFilters: Seq[Expression], schema: StructType): Seq[String] = {
     val resolver = spark.sessionState.analyzer.resolver
     val refs = queryFilters.flatMap(_.references)
@@ -20,6 +20,7 @@ package org.apache.hudi
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.HoodieJavaWriteClient
 import org.apache.hudi.client.common.HoodieJavaEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -354,6 +355,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
     )
 
+    // If there are any failures in the Data Skipping flow, test should fail
+    spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
+
     inputDF.repartition(4)
       .write
       .format("hudi")
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -92,6 +93,9 @@ class TestLayoutOptimization extends HoodieClientTestBase {
     val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
     val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
 
+    // If there are any failures in the Data Skipping flow, test should fail
+    spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
+
     writeDf.write.format("org.apache.hudi")
       .options(commonOpts)
       .option("hoodie.compact.inline", "false")