1
0

[HUDI-3594] Supporting Composite Expressions over Data Table Columns in Data Skipping flow (#4996)

This commit is contained in:
Alexey Kudinkin
2022-03-24 22:27:15 -07:00
committed by GitHub
parent 8896864d7b
commit 8b38ddedc2
18 changed files with 1079 additions and 302 deletions

View File

@@ -20,7 +20,7 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable}
@@ -32,7 +32,6 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
@@ -41,7 +40,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

View File

@@ -32,11 +32,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlCommonUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
@@ -244,21 +243,21 @@ case class HoodieFileIndex(spark: SparkSession,
// column references from the filtering expressions, and only transpose records corresponding to the
// columns referenced in those
val transposedColStatsDF =
queryReferencedColumns.map(colName =>
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
.select(targetColStatsIndexColumns.map(col): _*)
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
)
.reduceLeft((left, right) =>
left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
queryReferencedColumns.map(colName =>
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
.select(targetColStatsIndexColumns.map(col): _*)
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
)
.reduceLeft((left, right) =>
left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(transposedColStatsDF) {
val indexSchema = transposedColStatsDF.schema
val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
queryFilters.map(DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
.reduce(And)
val allIndexedFileNames =

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import org.apache.spark.sql.types.{DataType, NumericType, StringType}
// TODO unify w/ DataTypeUtils
object HoodieSparkTypeUtils {
/**
* Checks whether casting expression of [[from]] [[DataType]] to [[to]] [[DataType]] will
* preserve ordering of the elements
*/
def isCastPreservingOrdering(from: DataType, to: DataType): Boolean =
(from, to) match {
// NOTE: In the casting rules defined by Spark, only casting from String to Numeric
// (and vice versa) are the only casts that might break the ordering of the elements after casting
case (StringType, _: NumericType) => false
case (_: NumericType, StringType) => false
case _ => true
}
}

View File

@@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.types.StructType
object HoodieCatalystExpressionUtils {
/**
* Resolve filter expression from string expr with given table schema, for example:
* <pre>
* ts > 1000 and ts <= 1500
* </pre>
* will be resolved as
* <pre>
* And(GreaterThan(ts#590L > 1000), LessThanOrEqual(ts#590L <= 1500))
* </pre>
*
* @param spark The spark session
* @param exprString String to be resolved
* @param tableSchema The table schema
* @return Resolved filter expression
*/
def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
val expr = spark.sessionState.sqlParser.parseExpression(exprString)
resolveFilterExpr(spark, expr, tableSchema)
}
def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
val schemaFields = tableSchema.fields
val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
Filter(expr,
LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
)
.asInstanceOf[Filter].condition
checkForUnresolvedRefs(resolvedExpr)
}
private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
resolvedExpr match {
case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute")
case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
}
/**
* Split the given predicates into two sequence predicates:
* - predicates that references partition columns only(and involves no sub-query);
* - other predicates.
*
* @param sparkSession The spark session
* @param predicates The predicates to be split
* @param partitionColumns The partition columns
* @return (partitionFilters, dataFilters)
*/
def splitPartitionAndDataPredicates(sparkSession: SparkSession,
predicates: Array[Expression],
partitionColumns: Array[String]): (Array[Expression], Array[Expression]) = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
predicates.partition(expr => {
// Checks whether given expression only references partition columns(and involves no sub-query)
expr.references.forall(r => partitionColumns.exists(resolvedNameEquals(r.name, _))) &&
!SubqueryExpression.hasSubquery(expr)
})
}
}

View File

@@ -17,14 +17,17 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
import org.apache.spark.unsafe.types.UTF8String
object DataSkippingUtils extends Logging {
@@ -59,147 +62,205 @@ object DataSkippingUtils extends Logging {
}
private def tryComposeIndexFilterExpr(sourceExpr: Expression, indexSchema: StructType): Option[Expression] = {
def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
def colContainsValuesEqualToLiteral(colName: String, value: Literal): Expression =
// Only case when column C contains value V is when min(C) <= V <= max(c)
And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
def colContainsOnlyValuesEqualToLiteral(colName: String, value: Literal) =
// Only case when column C contains _only_ value V is when min(C) = V AND max(c) = V
And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value))
//
// For translation of the Filter Expression for the Data Table into Filter Expression for Column Stats Index, we're
// assuming that
// - The column A is queried in the Data Table (hereafter referred to as "colA")
// - Filter Expression is a relational expression (ie "=", "<", "<=", ...) of the following form
//
// ```transform_expr(colA) = value_expr```
//
// Where
// - "transform_expr" is an expression of the _transformation_ which preserve ordering of the "colA"
// - "value_expr" is an "value"-expression (ie one NOT referring to other attributes/columns or containing sub-queries)
//
// We translate original Filter Expr into the one querying Column Stats Index like following: let's consider
// equality Filter Expr referred to above:
//
// ```transform_expr(colA) = value_expr```
//
// This expression will be translated into following Filter Expression for the Column Stats Index:
//
// ```(transform_expr(colA_minValue) <= value_expr) AND (value_expr <= transform_expr(colA_maxValue))```
//
// Which will enable us to match files with the range of values in column A containing the target ```value_expr```
//
// NOTE: That we can apply ```transform_expr``` transformation precisely b/c it preserves the ordering of the
// values of the source column, ie following holds true:
//
// colA_minValue = min(colA) => transform_expr(colA_minValue) = min(transform_expr(colA))
// colA_maxValue = max(colA) => transform_expr(colA_maxValue) = max(transform_expr(colA))
//
sourceExpr match {
// Filter "colA = b"
// Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup
case EqualTo(attribute: AttributeReference, value: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => colContainsValuesEqualToLiteral(colName, value))
// If Expression is not resolved, we can't perform the analysis accurately, bailing
case expr if !expr.resolved => None
// Filter "b = colA"
// Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup
case EqualTo(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => colContainsValuesEqualToLiteral(colName, value))
// Filter "expr(colA) = B" and "B = expr(colA)"
// Translates to "(expr(colA_minValue) <= B) AND (B <= expr(colA_maxValue))" condition for index lookup
case EqualTo(sourceExpr @ AllowedTransformationExpression(attrRef), valueExpr: Expression) if isValueExpression(valueExpr) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
// NOTE: Since we're supporting (almost) arbitrary expressions of the form `f(colA) = B`, we have to
// appropriately translate such original expression targeted at Data Table, to corresponding
// expression targeted at Column Stats Index Table. For that, we take original expression holding
// [[AttributeReference]] referring to the Data Table, and swap it w/ expression referring to
// corresponding column in the Column Stats Index
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
}
// Filter "colA != b"
// Translates to "NOT(colA_minValue = b AND colA_maxValue = b)"
// NOTE: This is NOT an inversion of `colA = b`
case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => Not(colContainsOnlyValuesEqualToLiteral(colName, value)))
case EqualTo(valueExpr: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(valueExpr) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
}
// Filter "b != colA"
// Translates to "NOT(colA_minValue = b AND colA_maxValue = b)"
// NOTE: This is NOT an inversion of `colA = b`
case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => Not(colContainsOnlyValuesEqualToLiteral(colName, value)))
// Filter "expr(colA) != B" and "B != expr(colA)"
// Translates to "NOT(expr(colA_minValue) = B AND expr(colA_maxValue) = B)"
// NOTE: This is NOT an inversion of `colA = b`, instead this filter ONLY excludes files for which `colA = B`
// holds true
case Not(EqualTo(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression)) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
}
case Not(EqualTo(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef))) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
}
// Filter "colA = null"
// Translates to "colA_num_nulls = null" for index lookup
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
getTargetIndexedColName(equalNullSafe.left, indexSchema)
.map(colName => EqualTo(numNulls(colName), equalNullSafe.right))
case EqualNullSafe(attrRef: AttributeReference, litNull @ Literal(null, _)) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map(colName => EqualTo(genColNumNullsExpr(colName), litNull))
// Filter "colA < b"
// Translates to "colA_minValue < b" for index lookup
case LessThan(attribute: AttributeReference, value: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThan(minValue(colName), value))
// Filter "expr(colA) < B" and "B > expr(colA)"
// Translates to "expr(colA_minValue) < B" for index lookup
case LessThan(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}
// Filter "b > colA"
// Translates to "b > colA_minValue" for index lookup
case GreaterThan(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThan(minValue(colName), value))
case GreaterThan(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}
// Filter "b < colA"
// Translates to "b < colA_maxValue" for index lookup
case LessThan(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThan(maxValue(colName), value))
// Filter "B < expr(colA)" and "expr(colA) > B"
// Translates to "B < colA_maxValue" for index lookup
case LessThan(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}
// Filter "colA > b"
// Translates to "colA_maxValue > b" for index lookup
case GreaterThan(attribute: AttributeReference, value: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThan(maxValue(colName), value))
case GreaterThan(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}
// Filter "colA <= b"
// Translates to "colA_minValue <= b" for index lookup
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThanOrEqual(minValue(colName), value))
// Filter "expr(colA) <= B" and "B >= expr(colA)"
// Translates to "colA_minValue <= B" for index lookup
case LessThanOrEqual(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}
// Filter "b >= colA"
// Translates to "b >= colA_minValue" for index lookup
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThanOrEqual(minValue(colName), value))
case GreaterThanOrEqual(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}
// Filter "b <= colA"
// Translates to "b <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThanOrEqual(maxValue(colName), value))
// Filter "B <= expr(colA)" and "expr(colA) >= B"
// Translates to "B <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}
// Filter "colA >= b"
// Translates to "colA_maxValue >= b" for index lookup
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThanOrEqual(maxValue(colName), right))
case GreaterThanOrEqual(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}
// Filter "colA is null"
// Translates to "colA_num_nulls > 0" for index lookup
case IsNull(attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThan(numNulls(colName), Literal(0)))
getTargetIndexedColumnName(attribute, indexSchema)
.map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
// Filter "colA is not null"
// Translates to "colA_num_nulls = 0" for index lookup
case IsNotNull(attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => EqualTo(numNulls(colName), Literal(0)))
getTargetIndexedColumnName(attribute, indexSchema)
.map(colName => EqualTo(genColNumNullsExpr(colName), Literal(0)))
// Filter "colA in (a, b, ...)"
// Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR (colA_minValue <= b AND colA_maxValue >= b)" for index lookup
// NOTE: This is equivalent to "colA = a OR colA = b OR ..."
case In(attribute: AttributeReference, list: Seq[Literal]) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName =>
list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or)
)
// Filter "expr(colA) in (B1, B2, ...)"
// Translates to "(colA_minValue <= B1 AND colA_maxValue >= B1) OR (colA_minValue <= B2 AND colA_maxValue >= B2) ... "
// for index lookup
// NOTE: This is equivalent to "colA = B1 OR colA = B2 OR ..."
case In(sourceExpr @ AllowedTransformationExpression(attrRef), list: Seq[Expression]) if list.forall(isValueExpression) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
list.map(lit => genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or)
}
// Filter "colA not in (a, b, ...)"
// Translates to "NOT((colA_minValue = a AND colA_maxValue = a) OR (colA_minValue = b AND colA_maxValue = b))" for index lookup
// NOTE: This is NOT an inversion of `in (a, b, ...)` expr, this is equivalent to "colA != a AND colA != b AND ..."
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName =>
Not(
list.map { lit => colContainsOnlyValuesEqualToLiteral(colName, lit) }.reduce(Or)
)
)
// Filter "expr(colA) not in (B1, B2, ...)"
// Translates to "NOT((colA_minValue = B1 AND colA_maxValue = B1) OR (colA_minValue = B2 AND colA_maxValue = B2))" for index lookup
// NOTE: This is NOT an inversion of `in (B1, B2, ...)` expr, this is equivalent to "colA != B1 AND colA != B2 AND ..."
case Not(In(sourceExpr @ AllowedTransformationExpression(attrRef), list: Seq[Expression])) if list.forall(_.foldable) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
Not(list.map(lit => genColumnOnlyValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or))
}
// Filter "colA like 'xxx%'"
// Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for index lookup
// NOTE: That this operator only matches string prefixes, and this is
// essentially equivalent to "colA = b" expression
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => colContainsValuesEqualToLiteral(colName, v))
// Translates to "colA_minValue <= xxx AND xxx <= colA_maxValue" for index lookup
//
// NOTE: Since a) this operator matches strings by prefix and b) given that this column is going to be ordered
// lexicographically, we essentially need to check that provided literal falls w/in min/max bounds of the
// given column
case StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), v @ Literal(_: UTF8String, _)) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, v, targetExprBuilder)
}
// Filter "colA not like 'xxx%'"
// Translates to "NOT(colA_minValue like 'xxx%' AND colA_maxValue like 'xxx%')" for index lookup
// Filter "expr(colA) not like 'xxx%'"
// Translates to "NOT(expr(colA_minValue) like 'xxx%' AND expr(colA_maxValue) like 'xxx%')" for index lookup
// NOTE: This is NOT an inversion of "colA like xxx"
case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName =>
Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value)))
)
case Not(StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), value @ Literal(_: UTF8String, _))) =>
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
Not(And(StartsWith(minValueExpr, value), StartsWith(maxValueExpr, value)))
}
case or: Or =>
val resLeft = createColumnStatsIndexFilterExprInternal(or.left, indexSchema)
@@ -238,7 +299,7 @@ object DataSkippingUtils extends Logging {
.forall(stat => indexSchema.exists(_.name == stat))
}
private def getTargetIndexedColName(resolvedExpr: Expression, indexSchema: StructType): Option[String] = {
private def getTargetIndexedColumnName(resolvedExpr: AttributeReference, indexSchema: StructType): Option[String] = {
val colName = UnresolvedAttribute(getTargetColNameParts(resolvedExpr)).name
// Verify that the column is indexed
@@ -261,3 +322,91 @@ object DataSkippingUtils extends Logging {
}
}
}
private object ColumnStatsExpressionUtils {
def genColMinValueExpr(colName: String): Expression =
col(getMinColumnNameFor(colName)).expr
def genColMaxValueExpr(colName: String): Expression =
col(getMaxColumnNameFor(colName)).expr
def genColNumNullsExpr(colName: String): Expression =
col(getNumNullsColumnNameFor(colName)).expr
def genColumnValuesEqualToExpression(colName: String,
value: Expression,
targetExprBuilder: Function[Expression, Expression] = Predef.identity): Expression = {
// TODO clean up
checkState(isValueExpression(value))
val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
// Only case when column C contains value V is when min(C) <= V <= max(c)
And(LessThanOrEqual(minValueExpr, value), GreaterThanOrEqual(maxValueExpr, value))
}
def genColumnOnlyValuesEqualToExpression(colName: String,
value: Expression,
targetExprBuilder: Function[Expression, Expression] = Predef.identity): Expression = {
// TODO clean up
checkState(isValueExpression(value))
val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
// Only case when column C contains _only_ value V is when min(C) = V AND max(c) = V
And(EqualTo(minValueExpr, value), EqualTo(maxValueExpr, value))
}
def swapAttributeRefInExpr(sourceExpr: Expression, from: AttributeReference, to: Expression): Expression = {
checkState(sourceExpr.references.size == 1)
sourceExpr.transformDown {
case attrRef: AttributeReference if attrRef.sameRef(from) => to
}
}
/**
* This check is used to validate that the expression that target column is compared against
* <pre>
* a) Has no references to other attributes (for ex, columns)
* b) Does not contain sub-queries
* </pre>
*
* This in turn allows us to be certain that Spark will be able to evaluate such expression
* against Column Stats Index as well
*/
def isValueExpression(expr: Expression): Boolean =
expr.references.isEmpty && !SubqueryExpression.hasSubquery(expr)
/**
* This utility pattern-matches an expression iff
*
* <ol>
* <li>It references *exactly* 1 attribute (column)</li>
* <li>It does NOT contain sub-queries</li>
* <li>It contains only whitelisted transformations that preserve ordering of the source column [1]</li>
* </ol>
*
* [1] This is required to make sure that we can correspondingly map Column Stats Index values as well. Applying
* transformations that do not preserve the ordering might lead to incorrect results being returned by Data
* Skipping flow.
*
* Returns only [[AttributeReference]] contained as a sub-expression
*/
object AllowedTransformationExpression extends SparkAdapterSupport {
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.createCatalystExpressionUtils()
def unapply(expr: Expression): Option[AttributeReference] = {
// First step, we check that expression
// - Does NOT contain sub-queries
// - Does contain exactly 1 attribute
if (SubqueryExpression.hasSubquery(expr) || expr.references.size != 1) {
None
} else {
// Second step, we validate that holding expression is an actually permitted
// transformation
// NOTE: That transformation composition is permitted
exprUtils.tryMatchAttributeOrderingPreservingTransformation(expr)
}
}
}
}