
[HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)

Addressing the problem of Data Skipping not respecting Metadata Table configs, which might differ between the write and read paths. More details can be found in HUDI-3812.

- Fixing Data Skipping configuration to respect MT configs (on the Read path); see the read-path sketch after this list
- Tightening up Data Skipping handling of cases when no top-level columns are referenced in the target query
- Enhancing tests to cover all possible cases
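
As a rough illustration of the behavior being fixed (this sketch is not part of the commit), Data Skipping on the read path only takes effect when the Metadata Table and its Column Stats index are enabled for the read as well. The option keys below mirror the ones exercised by the test changes further down; the app name and table path are placeholders.

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("data-skipping-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Read-path options: Data Skipping is only effective when the Metadata Table
    // and the Column Stats index are also enabled on the read path
    val readOpts = Map(
      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
      HoodieMetadataConfig.ENABLE.key -> "true",
      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
    )

    // With Column Stats available, filtering on the minimal "id" value should let the
    // file index prune the listing down to a single file (table path is hypothetical)
    val df = spark.read.format("hudi").options(readOpts).load("/tmp/hoodie_test")
    df.filter("id = 1").show()
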
Author: Alexey Kudinkin
Date: 2022-04-10 10:43:47 -07:00
Committed by: GitHub
parent 7a9d48d126
commit 976840e8eb
4 changed files with 119 additions and 75 deletions

View File

@@ -25,10 +25,9 @@ import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.common.HoodieJavaEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.EngineType
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableQueryType, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
@@ -38,17 +37,15 @@ import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType}
import org.apache.hudi.testutils.{HoodieClientTestBase, SparkClientFunctionalTestHarness}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{BeforeEach, Tag, Test}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
@@ -343,16 +340,19 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
import _spark.implicits._
val inputDF = tuples.toDF("id", "inv_id", "str", "rand")
val writeMetadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "id",
PRECOMBINE_FIELD.key -> "id",
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
) ++ writeMetadataOpts
// If there are any failures in the Data Skipping flow, test should fail
spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
@@ -368,26 +368,46 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient)
val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
// NOTE: Metadata Table has to be enabled on the read path as well
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)
case class TestCase(enableMetadata: Boolean,
enableColumnStats: Boolean,
enableDataSkipping: Boolean)
val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
val testCases: Seq[TestCase] =
TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) ::
TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
Nil
val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
assertEquals(10, allFilesPartitions.head.files.length)
for (testCase <- testCases) {
val readMetadataOpts = Map(
// NOTE: Metadata Table has to be enabled on the read path as well
HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> testCase.enableColumnStats.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
// We're selecting a single file that contains the row with "id" == 1, of which there should be
// strictly one. Given that 1 is the minimal possible value, Data Skipping should be able to
// truncate the search space to just a single file
val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
assertEquals(1, filteredPartitions.head.files.length)
val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
) ++ readMetadataOpts
val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
assertEquals(10, allFilesPartitions.head.files.length)
if (testCase.enableDataSkipping && testCase.enableMetadata) {
// We're selecting a single file that contains the row with "id" == 1, of which there should be
// strictly one. Given that 1 is the minimal possible value, Data Skipping should be able to
// truncate the search space to just a single file
val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
assertEquals(1, filteredPartitions.head.files.length)
}
}
}
private def attribute(partition: String): AttributeReference = {
@@ -411,6 +431,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
}
object TestHoodieFileIndex {
def keyGeneratorParameters(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
Arguments.arguments(null.asInstanceOf[String]),

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
@@ -35,7 +36,7 @@ import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -72,19 +73,25 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
@MethodSource(Array("testMetadataColumnStatsIndexParams"))
def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = {
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
// NOTE: Currently this setting is used as follows by the different MT partitions:
// - Files: using it
// - Column Stats: NOT using it (defaults to doing "point-lookups")
HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> testCase.forceFullLogScan.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
) ++ metadataOpts
setTableName("hoodie_test")
initMetaClient()
@@ -108,10 +115,17 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(opts))
.fromProperties(toProperties(metadataOpts))
.build()
val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val targetColumnsToRead: Seq[String] = {
// Providing an empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
// MT being read, and subsequently filtered
if (testCase.readFullMetadataTable) Seq.empty
else sourceTableSchema.fieldNames
}
val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -151,7 +165,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
@@ -243,26 +257,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
)
}
def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = {
val jsonInputDF =
// NOTE: Schema here is provided for validation that the input date is in the appropriate format
spark.read
.schema(sourceTableSchema)
.json(sourceJSONTablePath)
jsonInputDF
.sort("c1")
.repartition(4, new Column("c1"))
.write
.format("parquet")
.mode("overwrite")
.save(targetParquetTablePath)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// Have to cleanup additional artefacts of Spark write
fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
}
private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
val sourceTableSchema =
new StructType()
@@ -316,3 +310,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
}
}
object TestColumnStatsIndex {
case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)
def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
java.util.stream.Stream.of(
Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
)
}