diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 8680a8c83..2c4623b2b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -321,9 +321,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadataPayload>
 ...
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ ... @@
         case Success(opt) => opt
         case Failure(e) =>
-          logError("Failed to lookup candidate files in Z-index", e)
-          Option.empty
+          logError("Failed to lookup candidate files in File Index", e)
+
+          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
+            case DataSkippingFailureMode.Fallback.value => Option.empty
+            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
+          }
       }

     logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
@@ -194,7 +199,7 @@ case class HoodieFileIndex(spark: SparkSession,
     val fs = metaClient.getFs
     val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)

-    if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+    if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
       Option.empty
     } else {
       val targetColStatsIndexColumns = Seq(
@@ -302,6 +307,22 @@ case class HoodieFileIndex(spark: SparkSession,

 object HoodieFileIndex extends Logging {

+  object DataSkippingFailureMode extends Enumeration {
+    val configName = "hoodie.fileIndex.dataSkippingFailureMode"
+
+    type DataSkippingFailureMode = Value
+
+    case class Val(value: String) extends super.Val {
+      override def toString(): String = value
+    }
+
+    import scala.language.implicitConversions
+    implicit def valueToVal(x: Value): DataSkippingFailureMode = x.asInstanceOf[Val]
+
+    val Fallback: Val = Val("fallback")
+    val Strict: Val = Val("strict")
+  }
+
   private def collectReferencedColumns(spark: SparkSession, queryFilters: Seq[Expression], schema: StructType): Seq[String] = {
     val resolver = spark.sessionState.analyzer.resolver
     val refs = queryFilters.flatMap(_.references)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 899fc4cc2..a11c2f73f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.HoodieJavaWriteClient
 import org.apache.hudi.client.common.HoodieJavaEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -354,6 +355,9 @@
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
     )

+    // If there are any failures in the Data Skipping flow, test should fail
+    spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
+
     inputDF.repartition(4)
       .write
       .format("hudi")
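For context on what the new switch does end to end, here is a minimal usage sketch; it is not part of the patch. Only hoodie.fileIndex.dataSkippingFailureMode and its fallback/strict values come from the change above — the table path and the hoodie.enable.data.skipping read option are assumptions for illustration.

    // Usage sketch (assumed: a Hudi table at /tmp/hudi_table, and that
    // "hoodie.enable.data.skipping" is the read option that turns data skipping on).
    import org.apache.spark.sql.SparkSession

    object DataSkippingFailureModeUsage {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("data-skipping-failure-mode")
          .master("local[2]")
          .getOrCreate()

        // Default is "fallback": an index lookup failure is logged and the query
        // silently falls back to scanning every file. "strict" rethrows the
        // failure as a HoodieException instead, which is what the tests in this
        // patch rely on to surface data-skipping bugs.
        spark.sqlContext.setConf("hoodie.fileIndex.dataSkippingFailureMode", "strict")

        spark.read
          .format("hudi")
          .option("hoodie.enable.data.skipping", "true") // assumed option name
          .load("/tmp/hudi_table")
          .where("fare > 100.0") // hypothetical predicate eligible for pruning
          .show()
      }
    }
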
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index 96728f620..188ba6745 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -18,13 +18,14 @@
 package org.apache.hudi.functional

+import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -92,6 +93,9 @@ class TestLayoutOptimization extends HoodieClientTestBase {
     val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
     val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))

+    // If there are any failures in the Data Skipping flow, test should fail
+    spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
+
     writeDf.write.format("org.apache.hudi")
       .options(commonOpts)
       .option("hoodie.compact.inline", "false")
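The DataSkippingFailureMode object above uses a common Scala idiom: an Enumeration whose members carry their wire-format string via a custom Val subclass, recovered through an implicit Value-to-Val conversion. The standalone sketch below distills that idiom with hypothetical FailureMode/handle names; it is illustration, not code from the patch.

    // Distilled Enumeration-with-payload pattern, compilable on its own.
    object FailureMode extends Enumeration {
      type FailureMode = Value

      // Each member carries the exact string users put in the config.
      case class Val(value: String) extends super.Val {
        override def toString(): String = value
      }

      import scala.language.implicitConversions
      // Lets callers treat a generic Value as our payload-carrying Val.
      implicit def valueToVal(x: Value): Val = x.asInstanceOf[Val]

      val Fallback: Val = Val("fallback")
      val Strict: Val = Val("strict")
    }

    object FailureModeDemo extends App {
      // Matching on the stable identifiers FailureMode.*.value, the same shape
      // the patched HoodieFileIndex uses on the config string it reads back.
      def handle(configured: String): String = configured match {
        case FailureMode.Fallback.value => "fall back to a full scan"
        case FailureMode.Strict.value   => "rethrow the lookup failure"
        case other                      => s"unknown mode: $other"
      }

      println(handle("strict"))   // rethrow the lookup failure
      println(handle("fallback")) // fall back to a full scan
    }

This is why the production code can match on DataSkippingFailureMode.Fallback.value and DataSkippingFailureMode.Strict.value directly: each is a stable path to a plain String, so no withName parsing or try/catch around enum resolution is needed.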