[HUDI-1879] Support Partition Prune For MergeOnRead Snapshot Table (#2926)
This commit is contained in:
@@ -262,7 +262,14 @@ case class HoodieFileIndex(
|
||||
// If the partition column size is not equal to the partition fragment size
|
||||
// and the partition column size is 1, we map the whole partition path
|
||||
// to the partition column which can benefit from the partition prune.
|
||||
InternalRow.fromSeq(Seq(UTF8String.fromString(partitionPath)))
|
||||
val prefix = s"${partitionSchema.fieldNames.head}="
|
||||
val partitionValue = if (partitionPath.startsWith(prefix)) {
|
||||
// support hive style partition path
|
||||
partitionPath.substring(prefix.length)
|
||||
} else {
|
||||
partitionPath
|
||||
}
|
||||
InternalRow.fromSeq(Seq(UTF8String.fromString(partitionValue)))
|
||||
} else if (partitionFragments.length != partitionSchema.fields.length &&
|
||||
partitionSchema.fields.length > 1) {
|
||||
// If the partition column size is not equal to the partition fragments size
|
||||
|
||||
@@ -28,8 +28,10 @@ import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
|
||||
import org.apache.spark.sql.avro.SchemaConverters
|
||||
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
|
||||
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex, Spark2ParsePartitionUtil, Spark3ParsePartitionUtil, SparkParsePartitionUtil}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
@@ -128,4 +130,98 @@ object HoodieSparkUtils {
|
||||
new Spark3ParsePartitionUtil(conf)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Filters to Catalyst Expressions and joined by And. If convert success return an
|
||||
* Non-Empty Option[Expression],or else return None.
|
||||
*/
|
||||
/**
 * Convert Filters to Catalyst Expressions joined by And. If every filter converts
 * successfully, return a Non-Empty Option[Expression]; otherwise return None.
 * An empty filter array also yields None.
 */
def convertToCatalystExpressions(filters: Array[Filter],
                                 tableSchema: StructType): Option[Expression] = {
  val converted = filters.map(convertToCatalystExpression(_, tableSchema))
  if (converted.contains(None)) {
    // At least one filter could not be converted: refuse the whole conjunction.
    None
  } else {
    // reduceLeftOption covers all remaining cases: None for an empty array,
    // the single expression itself for one filter, and an And-chain otherwise.
    converted.flatten.reduceLeftOption(org.apache.spark.sql.catalyst.expressions.And(_, _))
  }
}
|
||||
|
||||
/**
|
||||
* Convert Filter to Catalyst Expression. If convert success return an Non-Empty
|
||||
* Option[Expression],or else return None.
|
||||
*/
|
||||
/**
 * Convert a single data source Filter to a Catalyst Expression. If the conversion
 * succeeds, return a Non-Empty Option[Expression]; otherwise return None.
 *
 * Notes on this revision:
 *  - The original built `null` sentinels and relied on `Option(...)` to absorb them;
 *    here the Option is composed directly, which keeps the match total and avoids
 *    `null` entirely.
 *  - StringStartsWith/StringEndsWith/StringContains previously interpolated the raw
 *    value into a LIKE pattern (s"$value%"), so '%' or '_' inside the value acted as
 *    wildcards. Catalyst's StartsWith/EndsWith/Contains predicates match the value
 *    literally, which is the exact semantics of the source filters.
 */
def convertToCatalystExpression(filter: Filter, tableSchema: StructType): Option[Expression] = {
  // Local alias: the unqualified names And/Or/Not/EqualTo... are taken by the
  // data source filters imported from org.apache.spark.sql.sources.
  import org.apache.spark.sql.catalyst.{expressions => exprs}

  filter match {
    case EqualTo(attribute, value) =>
      Some(exprs.EqualTo(toAttribute(attribute, tableSchema), Literal.create(value)))
    case EqualNullSafe(attribute, value) =>
      Some(exprs.EqualNullSafe(toAttribute(attribute, tableSchema), Literal.create(value)))
    case GreaterThan(attribute, value) =>
      Some(exprs.GreaterThan(toAttribute(attribute, tableSchema), Literal.create(value)))
    case GreaterThanOrEqual(attribute, value) =>
      Some(exprs.GreaterThanOrEqual(toAttribute(attribute, tableSchema), Literal.create(value)))
    case LessThan(attribute, value) =>
      Some(exprs.LessThan(toAttribute(attribute, tableSchema), Literal.create(value)))
    case LessThanOrEqual(attribute, value) =>
      Some(exprs.LessThanOrEqual(toAttribute(attribute, tableSchema), Literal.create(value)))
    case In(attribute, values) =>
      val attrExp = toAttribute(attribute, tableSchema)
      val valuesExp = values.map(v => Literal.create(v))
      Some(exprs.In(attrExp, valuesExp))
    case IsNull(attribute) =>
      Some(exprs.IsNull(toAttribute(attribute, tableSchema)))
    case IsNotNull(attribute) =>
      Some(exprs.IsNotNull(toAttribute(attribute, tableSchema)))
    case And(left, right) =>
      // Both children must convert, otherwise the conjunction is dropped as a whole.
      for {
        leftExp <- convertToCatalystExpression(left, tableSchema)
        rightExp <- convertToCatalystExpression(right, tableSchema)
      } yield exprs.And(leftExp, rightExp)
    case Or(left, right) =>
      for {
        leftExp <- convertToCatalystExpression(left, tableSchema)
        rightExp <- convertToCatalystExpression(right, tableSchema)
      } yield exprs.Or(leftExp, rightExp)
    case Not(child) =>
      convertToCatalystExpression(child, tableSchema).map(exprs.Not)
    case StringStartsWith(attribute, value) =>
      // Literal prefix match; no LIKE wildcard interpretation of the value.
      Some(exprs.StartsWith(toAttribute(attribute, tableSchema), Literal.create(value)))
    case StringEndsWith(attribute, value) =>
      Some(exprs.EndsWith(toAttribute(attribute, tableSchema), Literal.create(value)))
    case StringContains(attribute, value) =>
      Some(exprs.Contains(toAttribute(attribute, tableSchema), Literal.create(value)))
    case _ =>
      // Unsupported filter type: signal failure to the caller.
      None
  }
}
|
||||
|
||||
/**
 * Resolve a column name against the table schema and materialize it as an
 * AttributeReference carrying the column's data type and nullability.
 * Fails with an AssertionError when the column is not part of the schema.
 */
private def toAttribute(columnName: String, tableSchema: StructType): AttributeReference = {
  val matched = tableSchema.find(_.name == columnName)
  assert(matched.isDefined, s"Cannot find column: $columnName, Table Columns are: " +
    s"${tableSchema.fieldNames.mkString(",")}")
  // Bind the pieces we need from the resolved field.
  val StructField(_, dataType, nullable, _) = matched.get
  AttributeReference(columnName, dataType, nullable)()
}
|
||||
}
|
||||
|
||||
@@ -67,7 +67,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
|
||||
DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
|
||||
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
|
||||
private val fileIndex = buildFileIndex()
|
||||
private val preCombineField = {
|
||||
val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
|
||||
if (preCombineFieldFromTableConfig != null) {
|
||||
@@ -94,6 +93,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
})
|
||||
val requiredAvroSchema = AvroConversionUtils
|
||||
.convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
|
||||
|
||||
val fileIndex = buildFileIndex(filters)
|
||||
val hoodieTableState = HoodieMergeOnReadTableState(
|
||||
tableStructSchema,
|
||||
requiredStructSchema,
|
||||
@@ -131,7 +132,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
rdd.asInstanceOf[RDD[Row]]
|
||||
}
|
||||
|
||||
def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
|
||||
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {
|
||||
|
||||
val fileStatuses = if (globPaths.isDefined) {
|
||||
// Load files from the global paths if it has defined to be compatible with the original mode
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
|
||||
@@ -139,7 +141,19 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
} else { // Load files by the HoodieFileIndex.
|
||||
val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
|
||||
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
|
||||
hoodieFileIndex.allFiles
|
||||
|
||||
// Get partition filter and convert to catalyst expression
|
||||
val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet
|
||||
val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p)))
|
||||
val partitionFilterExpression =
|
||||
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema)
|
||||
|
||||
// if convert success to catalyst expression, use the partition prune
|
||||
if (partitionFilterExpression.isDefined) {
|
||||
hoodieFileIndex.listFiles(Seq(partitionFilterExpression.get), Seq.empty).flatMap(_.files)
|
||||
} else {
|
||||
hoodieFileIndex.allFiles
|
||||
}
|
||||
}
|
||||
|
||||
if (fileStatuses.isEmpty) { // If this an empty table, return an empty split list.
|
||||
|
||||
Reference in New Issue
Block a user