1
0

[HUDI-3841] Fixing Column Stats in the presence of Schema Evolution (#5275)

Currently, Data Skipping does not correctly handle the case when column stats are not aligned and, for example, some of the (column, file) combinations are missing from the CSI.

This could occur in different scenarios (schema evolution, CSI config changes), and has to be handled properly when we're composing the CSI projection for Data Skipping. This PR addresses that.

- Added appropriate alignment for the transposed CSI projection
This commit is contained in:
Alexey Kudinkin
2022-04-11 12:45:53 -07:00
committed by GitHub
parent 52ea1e4964
commit 458fdd5611
18 changed files with 266 additions and 36 deletions

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
@@ -113,11 +114,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
*
* @param spark Spark session ref
* @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
* @param targetColumns target columns to be included into the final table
* @param queryColumns target columns to be included into the final table
* @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
val colStatsSchema = colStatsDF.schema
val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
case (field, ordinal) => (field.name, ordinal)
@@ -125,10 +126,6 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
val sortedColumns = TreeSet(targetColumns: _*)
val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
@@ -136,36 +133,69 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
// NOTE: We have to collect list of indexed columns to make sure we properly align the rows
// w/in the transposed dataset: since some files might not have all of the columns indexed
// either due to the Column Stats Index config changes, schema evolution, etc, we have
// to make sure that all of the rows w/in transposed data-frame are properly padded (with null
// values) for such file-column combinations
val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)
val transposedRDD = colStatsDF.rdd
.filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
.filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
.map { row =>
val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
row
} else {
val minValueStruct = row.getAs[Row](minValueOrdinal)
val maxValueStruct = row.getAs[Row](maxValueOrdinal)
val colName = row.getString(colNameOrdinal)
val colType = tableSchemaFieldMap(colName).dataType
checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
val rowValsSeq = row.toSeq.toArray
val colName = row.getString(colNameOrdinal)
val colType = tableSchemaFieldMap(colName).dataType
rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
val rowValsSeq = row.toSeq.toArray
// Update min-/max-value structs w/ unwrapped values in-place
rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
Row(rowValsSeq:_*)
Row(rowValsSeq: _*)
}
}
.groupBy(r => r.getString(fileNameOrdinal))
.foldByKey(Seq[Row]()) {
case (_, columnRows) =>
case (_, columnRowsSeq) =>
// Rows seq is always non-empty (otherwise it won't be grouped into)
val fileName = columnRows.head.get(fileNameOrdinal)
val valueCount = columnRows.head.get(valueCountOrdinal)
val fileName = columnRowsSeq.head.get(fileNameOrdinal)
val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
val coalescedRowValuesSeq = columnRows.toSeq
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout
.sortBy(_.getString(colNameOrdinal))
.foldLeft(Seq[Any](fileName, valueCount)) {
case (acc, columnRow) =>
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord))
// To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
// to align existing column-stats for individual file with the list of expected ones for the
// whole transposed projection (a superset of all files)
val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap
val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get)
val coalescedRowValuesSeq =
alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
case (acc, opt) =>
opt match {
case Some(columnStatsRow) =>
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
case None =>
// NOTE: Since we're assuming missing column to essentially contain exclusively
// null values, we set null-count to be equal to value-count (this behavior is
// consistent with reading non-existent columns from Parquet)
acc ++ Seq(null, null, valueCount)
}
}
Seq(Row(coalescedRowValuesSeq:_*))
@@ -176,7 +206,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the schema
val indexSchema = composeIndexSchema(sortedColumns.toSeq, tableSchema)
val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)
spark.createDataFrame(transposedRDD, indexSchema)
}

View File

@@ -0,0 +1,10 @@
{"c1":770,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9}
{"c1":768,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-10-13","c7":"AA==","c8":9}
{"c1":431,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-03-12","c7":"rw==","c8":9}
{"c1":427,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-10-08","c7":"qw==","c8":9}
{"c1":328,"c4":"2021-11-18T23:34:44.181-08:00","c5":34,"c6":"2020-10-21","c7":"SA==","c8":9}
{"c1":320,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-02-13","c7":"QA==","c8":9}
{"c1":317,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-10-10","c7":"PQ==","c8":9}
{"c1":308,"c4":"2021-11-18T23:34:44.180-08:00","c5":32,"c6":"2020-01-01","c7":"NA==","c8":9}
{"c1":304,"c4":"2021-11-18T23:34:44.179-08:00","c5":32,"c6":"2020-08-25","c7":"MA==","c8":9}
{"c1":300,"c4":"2021-11-18T23:34:44.179-08:00","c5":31,"c6":"2020-04-21","c7":"LA==","c8":9}

View File

@@ -0,0 +1,10 @@
{"c1":719,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-05-20","c7":"zw==","c8":9}
{"c1":715,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-01-16","c7":"yw==","c8":9}
{"c1":579,"c4":"2021-11-18T23:34:44.193-08:00","c5":59,"c6":"2020-08-20","c7":"Qw==","c8":9}
{"c1":568,"c4":"2021-11-18T23:34:44.193-08:00","c5":58,"c6":"2020-08-09","c7":"OA==","c8":9}
{"c1":367,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-05-04","c7":"bw==","c8":9}
{"c1":364,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-02-01","c7":"bA==","c8":9}
{"c1":250,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-09-27","c7":"+g==","c8":9}
{"c1":249,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-08-26","c7":"+Q==","c8":9}
{"c1":246,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-05-23","c7":"9g==","c8":9}
{"c1":125,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-05-14","c7":"fQ==","c8":9}

View File

@@ -0,0 +1,10 @@
{"c1":486,"c4":"2021-11-18T23:34:44.189-08:00","c5":50,"c6":"2020-03-11","c7":"5g==","c8":9}
{"c1":483,"c4":"2021-11-18T23:34:44.189-08:00","c5":49,"c6":"2020-11-08","c7":"4w==","c8":9}
{"c1":224,"c4":"2021-11-18T23:34:44.175-08:00","c5":24,"c6":"2020-05-01","c7":"4A==","c8":9}
{"c1":118,"c4":"2021-11-18T23:34:44.168-08:00","c5":13,"c6":"2020-09-07","c7":"dg==","c8":9}
{"c1":111,"c4":"2021-11-18T23:34:44.168-08:00","c5":12,"c6":"2020-02-28","c7":"bw==","c8":9}
{"c1":79,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-03-24","c7":"Tw==","c8":9}
{"c1":77,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-01-22","c7":"TQ==","c8":9}
{"c1":76,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-11-21","c7":"TA==","c8":9}
{"c1":60,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-06-05","c7":"PA==","c8":9}
{"c1":59,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-05-04","c7":"Ow==","c8":9}

View File

@@ -0,0 +1,10 @@
{"c1":272,"c4":"2021-11-18T23:34:44.178-08:00","c5":28,"c6":"2020-09-21","c7":"EA==","c8":9}
{"c1":258,"c4":"2021-11-18T23:34:44.177-08:00","c5":27,"c6":"2020-06-07","c7":"Ag==","c8":9}
{"c1":240,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-10-17","c7":"8A==","c8":9}
{"c1":236,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-06-13","c7":"7A==","c8":9}
{"c1":137,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-06-26","c7":"iQ==","c8":9}
{"c1":134,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-03-23","c7":"hg==","c8":9}
{"c1":131,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-11-20","c7":"gw==","c8":9}
{"c1":129,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-09-18","c7":"gQ==","c8":9}
{"c1":24,"c4":"2021-11-18T23:34:44.161-08:00","c5":4,"c6":"2020-03-25","c7":"GA==","c8":9}
{"c1":8,"c4":"2021-11-18T23:34:44.159-08:00","c5":2,"c6":"2020-09-09","c7":"CA==","c8":9}

View File

@@ -0,0 +1,4 @@
{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"valueCount":9}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"valueCount":10}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"valueCount":13}

View File

@@ -0,0 +1,8 @@
{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_nullCount":15,"c3_nullCount":15,"valueCount":15}
{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_nullCount":12,"c3_nullCount":12,"valueCount":12}
{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_nullCount":7,"c3_nullCount":7,"valueCount":7}
{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_nullCount":6,"c3_nullCount":6,"valueCount":6}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}

View File

@@ -63,6 +63,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
initPath()
initSparkContexts()
initFileSystem()
setTableName("hoodie_test")
initMetaClient()
spark = sqlContext.sparkSession
}
@@ -93,10 +97,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
setTableName("hoodie_test")
initMetaClient()
val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
// NOTE: Schema here is provided for validation that the input date is in the appropriate format
val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
@@ -118,14 +119,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
.fromProperties(toProperties(metadataOpts))
.build()
val targetColumnsToRead: Seq[String] = {
val requestedColumns: Seq[String] = {
// Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
// MT to be read, and subsequently filtered
if (testCase.readFullMetadataTable) Seq.empty
else sourceTableSchema.fieldNames
}
val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -134,7 +135,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
val expectedColStatsIndexTableDf =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/zorder/column-stats-index-table.json").toString)
.json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
// NOTE: We have to drop the `fileName` column as it contains semi-random components
@@ -149,7 +150,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
// do an upsert and validate
val updateJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString
val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString
val updateDF = spark.read
.schema(sourceTableSchema)
.json(updateJSONTablePath)
@@ -165,13 +166,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/zorder/updated-column-stats-index-table.json").toString)
.json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
@@ -183,6 +184,153 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
}
@Test
def testMetadataColumnStatsIndexPartialProjection(): Unit = {
val targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",")
)
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
// NOTE: Schema here is provided for validation that the input date is in the appropriate format
val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
inputDF
.sort("c1")
.repartition(4, new Column("c1"))
.write
.format("hudi")
.options(opts)
.option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
.build()
////////////////////////////////////////////////////////////////////////
// Case #1: Empty CSI projection
// Projection is requested for columns which are NOT indexed
// by the CSI
////////////////////////////////////////////////////////////////////////
{
// These are NOT indexed
val requestedColumns = Seq("c4")
val emptyColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark, emptyColStatsDF, requestedColumns, sourceTableSchema)
assertEquals(0, emptyColStatsDF.collect().length)
assertEquals(0, emptyTransposedColStatsDF.collect().length)
}
////////////////////////////////////////////////////////////////////////
// Case #2: Partial CSI projection
// Projection is requested for set of columns some of which are
// NOT indexed by the CSI
////////////////////////////////////////////////////////////////////////
{
// We have to include "c1", since we sort the expected outputs by this column
val requestedColumns = Seq("c1", "c4")
val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema)
val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns)
val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema)
// Match against expected column stats table
val expectedColStatsIndexTableDf =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString)
assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema)
// NOTE: We have to drop the `fileName` column as it contains semi-random components
// that we can't control in this test. Nevertheless, since we manually verify composition of the
// ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
// Collect Column Stats manually (reading individual Parquet files)
val manualColStatsTableDF =
buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema)
assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF)))
}
////////////////////////////////////////////////////////////////////////
// Case #3: Aligned CSI projection
// Projection is requested for set of columns some of which are
// indexed only for subset of files
////////////////////////////////////////////////////////////////////////
{
// NOTE: The update we're writing is intentionally omitting some of the columns
// present in an earlier source
val missingCols = Seq("c2", "c3")
val partialSourceTableSchema = StructType(sourceTableSchema.fields.filterNot(f => missingCols.contains(f.name)))
val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/partial-another-input-table-json").toString
val updateDF = spark.read
.schema(partialSourceTableSchema)
.json(updateJSONTablePath)
updateDF.repartition(4)
.write
.format("hudi")
.options(opts)
.option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val requestedColumns = sourceTableSchema.fieldNames
// Nevertheless, the last update was written with a new schema (that is a subset of the original table schema),
// we should be able to read CSI, which will be properly padded (with nulls) after transposition
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, requestedColumns, sourceTableSchema)
val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns)
val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString)
assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
// Collect Column Stats manually (reading individual Parquet files)
val manualUpdatedColStatsTableDF =
buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema)
assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
}
}
@Test
def testParquetMetadataRangeExtraction(): Unit = {
val df = generateRandomDataFrame(spark)