From 458fdd5611e4f246cc35a6e5436731eb5d526ab5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 11 Apr 2022 12:45:53 -0700 Subject: [PATCH] [HUDI-3841] Fixing Column Stats in the presence of Schema Evolution (#5275) Currently, Data Skipping is not handling correctly the case when column-stats are not aligned and, for ex, some of the (column, file) combinations are missing from the CSI. This could occur in different scenarios (schema evolution, CSI config changes), and has to be handled properly when we're composing CSI projection for Data Skipping. This PR addresses that. - Added appropriate aligning for the transposed CSI projection --- .../apache/hudi/ColumnStatsIndexSupport.scala | 82 ++++++--- ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 0 ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 0 ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 0 ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 0 .../column-stats-index-table.json | 0 ...afca-8a37-4ae8-a150-0c2fd3361080-c000.json | 0 ...afca-8a37-4ae8-a150-0c2fd3361080-c000.json | 0 ...afca-8a37-4ae8-a150-0c2fd3361080-c000.json | 0 ...afca-8a37-4ae8-a150-0c2fd3361080-c000.json | 0 ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 10 ++ ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 10 ++ ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 10 ++ ...0484-e7e1-48b6-8289-1a7c483b530b-c000.json | 10 ++ .../partial-column-stats-index-table.json | 4 + .../updated-column-stats-index-table.json | 0 ...ated-partial-column-stats-index-table.json | 8 + .../functional/TestColumnStatsIndex.scala | 168 ++++++++++++++++-- 18 files changed, 266 insertions(+), 36 deletions(-) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/column-stats-index-table.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json (100%) rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json (100%) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json create mode 100644 hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json create mode 100644 hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json create mode 100644 hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json create mode 100644 hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json rename hudi-spark-datasource/hudi-spark/src/test/resources/index/{zorder => colstats}/updated-column-stats-index-table.json (100%) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 4ee5a6005..b1e03f86f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -26,6 +26,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.hash.ColumnIndexID import org.apache.hudi.data.HoodieJavaRDD import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType} @@ -113,11 +114,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { * * @param spark Spark session ref * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table - * @param targetColumns target columns to be included into the final table + * @param queryColumns target columns to be included into the final table * @param tableSchema schema of the source data table * @return reshaped table according to the format outlined above */ - def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = { + def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = { val colStatsSchema = colStatsDF.schema val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({ case (field, ordinal) => (field.name, ordinal) @@ -125,10 +126,6 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap - // NOTE: We're sorting the columns to make sure final index schema matches layout - // of the transposed table - val sortedColumns = TreeSet(targetColumns: _*) - val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME) val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE) val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE) @@ -136,36 +133,69 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT) val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT) + // NOTE: We have to collect list of indexed columns to make sure we properly align the rows + // w/in the transposed dataset: since some files might not have all of the columns indexed + // either due to the Column Stats Index config changes, schema evolution, etc, we have + // to make sure that all of the rows w/in transposed data-frame are properly padded (with null + // values) for such file-column combinations + val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect() + + // NOTE: We're sorting the columns to make sure final index schema matches layout + // of the transposed table + val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*) + val transposedRDD = colStatsDF.rdd - .filter(row => sortedColumns.contains(row.getString(colNameOrdinal))) + .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal))) .map { row => - val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal)) - val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal)) + if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) { + // Corresponding row could be null in either of the 2 cases + // - Column contains only null values (in that case both min/max have to be nulls) + // - This is a stubbed Column Stats record (used as a tombstone) + row + } else { + val minValueStruct = row.getAs[Row](minValueOrdinal) + val maxValueStruct = row.getAs[Row](maxValueOrdinal) - val colName = row.getString(colNameOrdinal) - val colType = tableSchemaFieldMap(colName).dataType + checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null") - val rowValsSeq = row.toSeq.toArray + val colName = row.getString(colNameOrdinal) + val colType = tableSchemaFieldMap(colName).dataType - rowValsSeq(minValueOrdinal) = deserialize(minValue, colType) - rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType) + val (minValue, _) = tryUnpackNonNullVal(minValueStruct) + val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct) + val rowValsSeq = row.toSeq.toArray + // Update min-/max-value structs w/ unwrapped values in-place + rowValsSeq(minValueOrdinal) = deserialize(minValue, colType) + rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType) - Row(rowValsSeq:_*) + Row(rowValsSeq: _*) + } } .groupBy(r => r.getString(fileNameOrdinal)) .foldByKey(Seq[Row]()) { - case (_, columnRows) => + case (_, columnRowsSeq) => // Rows seq is always non-empty (otherwise it won't be grouped into) - val fileName = columnRows.head.get(fileNameOrdinal) - val valueCount = columnRows.head.get(valueCountOrdinal) + val fileName = columnRowsSeq.head.get(fileNameOrdinal) + val valueCount = columnRowsSeq.head.get(valueCountOrdinal) - val coalescedRowValuesSeq = columnRows.toSeq - // NOTE: It's crucial to maintain appropriate ordering of the columns - // matching table layout - .sortBy(_.getString(colNameOrdinal)) - .foldLeft(Seq[Any](fileName, valueCount)) { - case (acc, columnRow) => - acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord)) + // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need + // to align existing column-stats for individual file with the list of expected ones for the + // whole transposed projection (a superset of all files) + val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap + val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get) + + val coalescedRowValuesSeq = + alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) { + case (acc, opt) => + opt match { + case Some(columnStatsRow) => + acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord)) + case None => + // NOTE: Since we're assuming missing column to essentially contain exclusively + // null values, we set null-count to be equal to value-count (this behavior is + // consistent with reading non-existent columns from Parquet) + acc ++ Seq(null, null, valueCount) + } } Seq(Row(coalescedRowValuesSeq:_*)) @@ -176,7 +206,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { // NOTE: It's crucial to maintain appropriate ordering of the columns // matching table layout: hence, we cherry-pick individual columns // instead of simply filtering in the ones we're interested in the schema - val indexSchema = composeIndexSchema(sortedColumns.toSeq, tableSchema) + val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema) spark.createDataFrame(transposedRDD, indexSchema) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..9c0daac40 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":770,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9} +{"c1":768,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-10-13","c7":"AA==","c8":9} +{"c1":431,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-03-12","c7":"rw==","c8":9} +{"c1":427,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-10-08","c7":"qw==","c8":9} +{"c1":328,"c4":"2021-11-18T23:34:44.181-08:00","c5":34,"c6":"2020-10-21","c7":"SA==","c8":9} +{"c1":320,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-02-13","c7":"QA==","c8":9} +{"c1":317,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-10-10","c7":"PQ==","c8":9} +{"c1":308,"c4":"2021-11-18T23:34:44.180-08:00","c5":32,"c6":"2020-01-01","c7":"NA==","c8":9} +{"c1":304,"c4":"2021-11-18T23:34:44.179-08:00","c5":32,"c6":"2020-08-25","c7":"MA==","c8":9} +{"c1":300,"c4":"2021-11-18T23:34:44.179-08:00","c5":31,"c6":"2020-04-21","c7":"LA==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..d19386382 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":719,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-05-20","c7":"zw==","c8":9} +{"c1":715,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-01-16","c7":"yw==","c8":9} +{"c1":579,"c4":"2021-11-18T23:34:44.193-08:00","c5":59,"c6":"2020-08-20","c7":"Qw==","c8":9} +{"c1":568,"c4":"2021-11-18T23:34:44.193-08:00","c5":58,"c6":"2020-08-09","c7":"OA==","c8":9} +{"c1":367,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-05-04","c7":"bw==","c8":9} +{"c1":364,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-02-01","c7":"bA==","c8":9} +{"c1":250,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-09-27","c7":"+g==","c8":9} +{"c1":249,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-08-26","c7":"+Q==","c8":9} +{"c1":246,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-05-23","c7":"9g==","c8":9} +{"c1":125,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-05-14","c7":"fQ==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..602dbe87b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":486,"c4":"2021-11-18T23:34:44.189-08:00","c5":50,"c6":"2020-03-11","c7":"5g==","c8":9} +{"c1":483,"c4":"2021-11-18T23:34:44.189-08:00","c5":49,"c6":"2020-11-08","c7":"4w==","c8":9} +{"c1":224,"c4":"2021-11-18T23:34:44.175-08:00","c5":24,"c6":"2020-05-01","c7":"4A==","c8":9} +{"c1":118,"c4":"2021-11-18T23:34:44.168-08:00","c5":13,"c6":"2020-09-07","c7":"dg==","c8":9} +{"c1":111,"c4":"2021-11-18T23:34:44.168-08:00","c5":12,"c6":"2020-02-28","c7":"bw==","c8":9} +{"c1":79,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-03-24","c7":"Tw==","c8":9} +{"c1":77,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-01-22","c7":"TQ==","c8":9} +{"c1":76,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-11-21","c7":"TA==","c8":9} +{"c1":60,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-06-05","c7":"PA==","c8":9} +{"c1":59,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-05-04","c7":"Ow==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..6232e862f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":272,"c4":"2021-11-18T23:34:44.178-08:00","c5":28,"c6":"2020-09-21","c7":"EA==","c8":9} +{"c1":258,"c4":"2021-11-18T23:34:44.177-08:00","c5":27,"c6":"2020-06-07","c7":"Ag==","c8":9} +{"c1":240,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-10-17","c7":"8A==","c8":9} +{"c1":236,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-06-13","c7":"7A==","c8":9} +{"c1":137,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-06-26","c7":"iQ==","c8":9} +{"c1":134,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-03-23","c7":"hg==","c8":9} +{"c1":131,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-11-20","c7":"gw==","c8":9} +{"c1":129,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-09-18","c7":"gQ==","c8":9} +{"c1":24,"c4":"2021-11-18T23:34:44.161-08:00","c5":4,"c6":"2020-03-25","c7":"GA==","c8":9} +{"c1":8,"c4":"2021-11-18T23:34:44.159-08:00","c5":2,"c6":"2020-09-09","c7":"CA==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json new file mode 100644 index 000000000..8405cdf91 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json @@ -0,0 +1,4 @@ +{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"valueCount":9} +{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"valueCount":8} +{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"valueCount":13} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json new file mode 100644 index 000000000..8552fd359 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json @@ -0,0 +1,8 @@ +{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_nullCount":15,"c3_nullCount":15,"valueCount":15} +{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_nullCount":12,"c3_nullCount":12,"valueCount":12} +{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_nullCount":7,"c3_nullCount":7,"valueCount":7} +{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9} +{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_nullCount":6,"c3_nullCount":6,"valueCount":6} +{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8} +{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 841041e40..75d3ce0b7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -63,6 +63,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup initPath() initSparkContexts() initFileSystem() + + setTableName("hoodie_test") + initMetaClient() + spark = sqlContext.sparkSession } @@ -93,10 +97,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" ) ++ metadataOpts - setTableName("hoodie_test") - initMetaClient() - - val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString + val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString // NOTE: Schema here is provided for validation that the input date is in the appropriate format val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) @@ -118,14 +119,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup .fromProperties(toProperties(metadataOpts)) .build() - val targetColumnsToRead: Seq[String] = { + val requestedColumns: Seq[String] = { // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole // MT to be read, and subsequently filtered if (testCase.readFullMetadataTable) Seq.empty else sourceTableSchema.fieldNames } - val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead) + val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema) val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema) @@ -134,7 +135,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup val expectedColStatsIndexTableDf = spark.read .schema(expectedColStatsSchema) - .json(getClass.getClassLoader.getResource("index/zorder/column-stats-index-table.json").toString) + .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString) assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema) // NOTE: We have to drop the `fileName` column as it contains semi-random components @@ -149,7 +150,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF))) // do an upsert and validate - val updateJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString + val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString val updateDF = spark.read .schema(sourceTableSchema) .json(updateJSONTablePath) @@ -165,13 +166,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup metaClient = HoodieTableMetaClient.reload(metaClient) - val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead) + val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema) val expectedColStatsIndexUpdatedDF = spark.read .schema(expectedColStatsSchema) - .json(getClass.getClassLoader.getResource("index/zorder/updated-column-stats-index-table.json").toString) + .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString) assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) @@ -183,6 +184,153 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) } + @Test + def testMetadataColumnStatsIndexPartialProjection(): Unit = { + val targetColumnsToIndex = Seq("c1", "c2", "c3") + + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",") + ) + + val opts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString + + // NOTE: Schema here is provided for validation that the input date is in the appropriate format + val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) + + inputDF + .sort("c1") + .repartition(4, new Column("c1")) + .write + .format("hudi") + .options(opts) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.reload(metaClient) + + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(toProperties(metadataOpts)) + .build() + + //////////////////////////////////////////////////////////////////////// + // Case #1: Empty CSI projection + // Projection is requested for columns which are NOT indexed + // by the CSI + //////////////////////////////////////////////////////////////////////// + + { + // These are NOT indexed + val requestedColumns = Seq("c4") + + val emptyColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) + val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark, emptyColStatsDF, requestedColumns, sourceTableSchema) + + assertEquals(0, emptyColStatsDF.collect().length) + assertEquals(0, emptyTransposedColStatsDF.collect().length) + } + + //////////////////////////////////////////////////////////////////////// + // Case #2: Partial CSI projection + // Projection is requested for set of columns some of which are + // NOT indexed by the CSI + //////////////////////////////////////////////////////////////////////// + + { + // We have to include "c1", since we sort the expected outputs by this column + val requestedColumns = Seq("c1", "c4") + + val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) + val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema) + + val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns) + val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema) + + // Match against expected column stats table + val expectedColStatsIndexTableDf = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString) + + assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName")))) + + // Collect Column Stats manually (reading individual Parquet files) + val manualColStatsTableDF = + buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema) + + assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF))) + } + + //////////////////////////////////////////////////////////////////////// + // Case #3: Aligned CSI projection + // Projection is requested for set of columns some of which are + // indexed only for subset of files + //////////////////////////////////////////////////////////////////////// + + { + // NOTE: The update we're writing is intentionally omitting some of the columns + // present in an earlier source + val missingCols = Seq("c2", "c3") + val partialSourceTableSchema = StructType(sourceTableSchema.fields.filterNot(f => missingCols.contains(f.name))) + + val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/partial-another-input-table-json").toString + val updateDF = spark.read + .schema(partialSourceTableSchema) + .json(updateJSONTablePath) + + updateDF.repartition(4) + .write + .format("hudi") + .options(opts) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + + metaClient = HoodieTableMetaClient.reload(metaClient) + + val requestedColumns = sourceTableSchema.fieldNames + + // Nevertheless, the last update was written with a new schema (that is a subset of the original table schema), + // we should be able to read CSI, which will be properly padded (with nulls) after transposition + val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) + val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, requestedColumns, sourceTableSchema) + + val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns) + val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema) + + val expectedColStatsIndexUpdatedDF = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString) + + assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) + assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) + + // Collect Column Stats manually (reading individual Parquet files) + val manualUpdatedColStatsTableDF = + buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema) + + assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) + } + } + @Test def testParquetMetadataRangeExtraction(): Unit = { val df = generateRandomDataFrame(spark)