[HUDI-3739] Fix handling of the isNotNull predicate in Data Skipping (#5224)
- Fix handling of the isNotNull predicate in Data Skipping
This commit is contained in:
@@ -54,7 +54,8 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
|
||||
|
||||
val requiredMetadataIndexColumns =
|
||||
(targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
|
||||
@@ -98,7 +99,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
|
||||
*
|
||||
* <pre>
|
||||
* +---------------------------+------------+------------+-------------+
|
||||
* | file | A_minValue | A_maxValue | A_num_nulls |
|
||||
* | file | A_minValue | A_maxValue | A_nullCount |
|
||||
* +---------------------------+------------+------------+-------------+
|
||||
* | one_base_file.parquet | 1 | 10 | 0 |
|
||||
* | another_base_file.parquet | -10 | 0 | 5 |
|
||||
@@ -133,6 +134,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
|
||||
val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
|
||||
val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
|
||||
val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
|
||||
val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
|
||||
|
||||
val transposedRDD = colStatsDF.rdd
|
||||
.filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
|
||||
@@ -155,11 +157,13 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
|
||||
case (_, columnRows) =>
|
||||
// Rows seq is always non-empty (otherwise it won't be grouped into)
|
||||
val fileName = columnRows.head.get(fileNameOrdinal)
|
||||
val valueCount = columnRows.head.get(valueCountOrdinal)
|
||||
|
||||
val coalescedRowValuesSeq = columnRows.toSeq
|
||||
// NOTE: It's crucial to maintain appropriate ordering of the columns
|
||||
// matching table layout
|
||||
.sortBy(_.getString(colNameOrdinal))
|
||||
.foldLeft(Seq[Any](fileName)) {
|
||||
.foldLeft(Seq[Any](fileName, valueCount)) {
|
||||
case (acc, columnRow) =>
|
||||
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord))
|
||||
}
|
||||
@@ -223,11 +227,6 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
|
||||
|
||||
object ColumnStatsIndexSupport {
|
||||
|
||||
private val COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "fileName"
|
||||
private val COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue"
|
||||
private val COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"
|
||||
private val COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"
|
||||
|
||||
private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
|
||||
private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
|
||||
|
||||
@@ -235,28 +234,33 @@ object ColumnStatsIndexSupport {
|
||||
* @VisibleForTesting
|
||||
*/
|
||||
def composeIndexSchema(targetColumnNames: Seq[String], tableSchema: StructType): StructType = {
|
||||
val fileNameField = StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType, nullable = true, Metadata.empty)
|
||||
val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty)
|
||||
val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty)
|
||||
|
||||
val targetFields = targetColumnNames.map(colName => tableSchema.fields.find(f => f.name == colName).get)
|
||||
|
||||
StructType(
|
||||
targetFields.foldLeft(Seq(fileNameField)) {
|
||||
targetFields.foldLeft(Seq(fileNameField, valueCountField)) {
|
||||
case (acc, field) =>
|
||||
acc ++ Seq(
|
||||
composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, field.dataType),
|
||||
composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, field.dataType),
|
||||
composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType))
|
||||
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, field.dataType),
|
||||
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, field.dataType),
|
||||
composeColumnStatStructType(field.name, HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, LongType))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@inline def getMinColumnNameFor(colName: String): String =
|
||||
formatColName(colName, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME)
|
||||
formatColName(colName, HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
|
||||
|
||||
@inline def getMaxColumnNameFor(colName: String): String =
|
||||
formatColName(colName, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME)
|
||||
formatColName(colName, HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
|
||||
|
||||
@inline def getNumNullsColumnNameFor(colName: String): String =
|
||||
formatColName(colName, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME)
|
||||
@inline def getNullCountColumnNameFor(colName: String): String =
|
||||
formatColName(colName, HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
|
||||
|
||||
@inline def getValueCountColumnNameFor: String =
|
||||
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT
|
||||
|
||||
@inline private def formatColName(col: String, statName: String) = { // TODO add escaping for
|
||||
String.format("%s_%s", col, statName)
|
||||
|
||||
@@ -26,12 +26,13 @@ import org.apache.hudi.common.util.StringUtils
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
|
||||
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
|
||||
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
|
||||
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
|
||||
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
|
||||
import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlCommonUtils}
|
||||
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
|
||||
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
|
||||
@@ -211,7 +212,7 @@ case class HoodieFileIndex(spark: SparkSession,
|
||||
withPersistence(transposedColStatsDF) {
|
||||
val indexSchema = transposedColStatsDF.schema
|
||||
val indexFilter =
|
||||
queryFilters.map(DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
|
||||
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
|
||||
.reduce(And)
|
||||
|
||||
val allIndexedFileNames =
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
package org.apache.spark.sql.hudi
|
||||
|
||||
import org.apache.hudi.ColumnStatsIndexSupport.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
|
||||
import org.apache.hudi.ColumnStatsIndexSupport.{getMaxColumnNameFor, getMinColumnNameFor, getNullCountColumnNameFor, getValueCountColumnNameFor}
|
||||
import org.apache.hudi.SparkAdapterSupport
|
||||
import org.apache.hudi.common.util.ValidationUtils.checkState
|
||||
import org.apache.spark.internal.Logging
|
||||
@@ -135,7 +135,7 @@ object DataSkippingUtils extends Logging {
|
||||
}
|
||||
|
||||
// Filter "colA = null"
|
||||
// Translates to "colA_num_nulls = null" for index lookup
|
||||
// Translates to "colA_nullCount = null" for index lookup
|
||||
case EqualNullSafe(attrRef: AttributeReference, litNull @ Literal(null, _)) =>
|
||||
getTargetIndexedColumnName(attrRef, indexSchema)
|
||||
.map(colName => EqualTo(genColNumNullsExpr(colName), litNull))
|
||||
@@ -205,16 +205,16 @@ object DataSkippingUtils extends Logging {
|
||||
}
|
||||
|
||||
// Filter "colA is null"
|
||||
// Translates to "colA_num_nulls > 0" for index lookup
|
||||
// Translates to "colA_nullCount > 0" for index lookup
|
||||
case IsNull(attribute: AttributeReference) =>
|
||||
getTargetIndexedColumnName(attribute, indexSchema)
|
||||
.map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
|
||||
|
||||
// Filter "colA is not null"
|
||||
// Translates to "colA_num_nulls = 0" for index lookup
|
||||
// Translates to "colA_nullCount < colA_valueCount" for index lookup
|
||||
case IsNotNull(attribute: AttributeReference) =>
|
||||
getTargetIndexedColumnName(attribute, indexSchema)
|
||||
.map(colName => EqualTo(genColNumNullsExpr(colName), Literal(0)))
|
||||
.map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))
|
||||
|
||||
// Filter "expr(colA) in (B1, B2, ...)"
|
||||
// Translates to "(colA_minValue <= B1 AND colA_maxValue >= B1) OR (colA_minValue <= B2 AND colA_maxValue >= B2) ... "
|
||||
@@ -294,7 +294,7 @@ object DataSkippingUtils extends Logging {
|
||||
Set.apply(
|
||||
getMinColumnNameFor(colName),
|
||||
getMaxColumnNameFor(colName),
|
||||
getNumNullsColumnNameFor(colName)
|
||||
getNullCountColumnNameFor(colName)
|
||||
)
|
||||
.forall(stat => indexSchema.exists(_.name == stat))
|
||||
}
|
||||
@@ -325,19 +325,14 @@ object DataSkippingUtils extends Logging {
|
||||
|
||||
private object ColumnStatsExpressionUtils {
|
||||
|
||||
def genColMinValueExpr(colName: String): Expression =
|
||||
col(getMinColumnNameFor(colName)).expr
|
||||
def genColMaxValueExpr(colName: String): Expression =
|
||||
col(getMaxColumnNameFor(colName)).expr
|
||||
def genColNumNullsExpr(colName: String): Expression =
|
||||
col(getNumNullsColumnNameFor(colName)).expr
|
||||
@inline def genColMinValueExpr(colName: String): Expression = col(getMinColumnNameFor(colName)).expr
|
||||
@inline def genColMaxValueExpr(colName: String): Expression = col(getMaxColumnNameFor(colName)).expr
|
||||
@inline def genColNumNullsExpr(colName: String): Expression = col(getNullCountColumnNameFor(colName)).expr
|
||||
@inline def genColValueCountExpr: Expression = col(getValueCountColumnNameFor).expr
|
||||
|
||||
def genColumnValuesEqualToExpression(colName: String,
|
||||
@inline def genColumnValuesEqualToExpression(colName: String,
|
||||
value: Expression,
|
||||
targetExprBuilder: Function[Expression, Expression] = Predef.identity): Expression = {
|
||||
// TODO clean up
|
||||
checkState(isValueExpression(value))
|
||||
|
||||
val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
|
||||
val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
|
||||
// Only case when column C contains value V is when min(C) <= V <= max(c)
|
||||
@@ -347,9 +342,6 @@ private object ColumnStatsExpressionUtils {
|
||||
def genColumnOnlyValuesEqualToExpression(colName: String,
|
||||
value: Expression,
|
||||
targetExprBuilder: Function[Expression, Expression] = Predef.identity): Expression = {
|
||||
// TODO clean up
|
||||
checkState(isValueExpression(value))
|
||||
|
||||
val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
|
||||
val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
|
||||
// Only case when column C contains _only_ value V is when min(C) = V AND max(c) = V
|
||||
|
||||
Reference in New Issue
Block a user