diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala index adc34afc3..473bb2e24 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala @@ -24,9 +24,9 @@ import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -70,7 +70,8 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext, HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) val filterExpressions = convertToExpressions(filters) - val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate) + val (partitionFilters, dataFilters) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates( + sparkSession, filterExpressions, partitionColumns) val filePartitions = getPartitions(partitionFilters, dataFilters) @@ -137,17 +138,4 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext, catalystExpressions.filter(_.isDefined).map(_.get).toArray } - - /** - * Checks whether given expression only references only references partition columns - * (and involves no sub-query) - */ - private def isPartitionPredicate(condition: Expression): Boolean = 
{ - // Validates that the provided names both resolve to the same entity - val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver - - condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } && - !SubqueryExpression.hasSubquery(condition) - } - } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala new file mode 100644 index 000000000..58c332482 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf + +import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.immutable.Map + +object HoodieCLIUtils { + + def createHoodieClientFromPath(sparkSession: SparkSession, + basePath: String, + conf: Map[String, String]): SparkRDDWriteClient[_] = { + val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) + .setConf(sparkSession.sessionState.newHadoopConf()).build() + val schemaUtil = new TableSchemaResolver(metaClient) + val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString + val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults( + withSparkConf(sparkSession, Map.empty)( + conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name())) + ) + + val jsc = new JavaSparkContext(sparkSession.sparkContext) + DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, + metaClient.getTableConfig.getTableName, finalParameters.asJava) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala deleted file mode 100644 index d8189b1ad..000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi - -import org.apache.hadoop.fs.Path -import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath -import org.apache.hudi.client.SparkRDDWriteClient -import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation} -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.{InternalRow, expressions} -import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.unsafe.types.UTF8String - -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.immutable.Map - -object HoodieCommonUtils extends 
Logging { - - def createHoodieClientFromPath(sparkSession: SparkSession, basePath: String, - conf: Map[String, String]): SparkRDDWriteClient[_] = { - val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) - .setConf(sparkSession.sessionState.newHadoopConf()).build() - val schemaUtil = new TableSchemaResolver(metaClient) - val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString - val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults( - withSparkConf(sparkSession, Map.empty)( - conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name())) - ) - - val jsc = new JavaSparkContext(sparkSession.sparkContext) - DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, - metaClient.getTableConfig.getTableName, finalParameters.asJava) - } - - def getPartitionSchemaFromProperty(metaClient: HoodieTableMetaClient, - schemaSpec: Option[StructType]): StructType = { - val schema = schemaSpec.getOrElse({ - val schemaUtil = new TableSchemaResolver(metaClient) - AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) - }) - - val tableConfig = metaClient.getTableConfig - val partitionColumns = tableConfig.getPartitionFields - val nameFieldMap = generateFieldMap(schema) - - if (partitionColumns.isPresent) { - // Note that key generator class name could be null - val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName - if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) - || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { - val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) - StructType(partitionFields) - } else { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + - s"$column' in the schema[${schema.fields.mkString(",")}]"))) - StructType(partitionFields) - 
} - } else { - // If the partition columns have not stored in hoodie.properties(the table that was - // created earlier), we trait it as a non-partitioned table. - logWarning("No partition columns available from hoodie.properties." + - " Partition pruning will not work") - new StructType() - } - } - - /** - * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding - * [[StructField]] object for every field of the provided [[StructType]], recursively. - * - * For example, following struct - *
- * StructType(
- * StructField("a",
- * StructType(
- * StructField("b", StringType),
- * StructField("c", IntType)
- * )
- * )
- * )
- *
- *
- * will be converted into following mapping:
- *
- *
- * "a.b" -> StructField("b", StringType),
- * "a.c" -> StructField("c", IntType),
- *
- */
- def generateFieldMap(structType: StructType): Map[String, StructField] = {
- def traverse(structField: Either[StructField, StructType]): Map[String, StructField] = {
- structField match {
- case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
- case Left(field) => field.dataType match {
- case struct: StructType => traverse(Right(struct)).map {
- case (key, structField) => (s"${field.name}.$key", structField)
- }
- case _ => Map(field.name -> field)
- }
- }
- }
-
- traverse(Right(structType))
- }
-
- /**
- * Prune the partition by the filter.This implementation is fork from
- * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
- *
- * @param partitionPaths All the partition paths.
- * @param predicates The filter condition.
- * @return The Pruned partition paths.
- */
- def prunePartition(partitionSchema: StructType,
- partitionPaths: Seq[PartitionPath],
- predicates: Seq[Expression]): Seq[PartitionPath] = {
- val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
- val partitionPruningPredicates = predicates.filter {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
- if (partitionPruningPredicates.nonEmpty) {
- val predicate = partitionPruningPredicates.reduce(expressions.And)
- prunePartition(partitionSchema, partitionPaths, predicate)
- } else {
- partitionPaths
- }
- }
-
- def prunePartition(partitionSchema: StructType,
- partitionPaths: Seq[PartitionPath],
- predicate: Expression): Seq[PartitionPath] = {
- val boundPredicate = InterpretedPredicate(predicate.transform {
- case a: AttributeReference =>
- val index = partitionSchema.indexWhere(a.name == _.name)
- BoundReference(index, partitionSchema(index).dataType, nullable = true)
- })
-
- val prunedPartitionPaths = partitionPaths.filter {
- partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
- }
-
- logInfo(s"Total partition size is: ${partitionPaths.size}," +
- s" after partition prune size is: ${prunedPartitionPaths.size}")
- prunedPartitionPaths
- }
-
- def parsePartitionColumnValues(sparkParsePartitionUtil: SparkParsePartitionUtil,
- configProperties: TypedProperties,
- basePath: String,
- partitionSchema: StructType,
- partitionColumns: Array[String],
- partitionPath: String): Array[Object] = {
- if (partitionColumns.length == 0) {
- // This is a non-partitioned table
- Array.empty
- } else {
- val partitionFragments = partitionPath.split("/")
-
- if (partitionFragments.length != partitionColumns.length &&
- partitionColumns.length == 1) {
- // If the partition column size is not equal to the partition fragment size
- // and the partition column size is 1, we map the whole partition path
- // to the partition column which can benefit from the partition prune.
- val prefix = s"${partitionColumns.head}="
- val partitionValue = if (partitionPath.startsWith(prefix)) {
- // support hive style partition path
- partitionPath.substring(prefix.length)
- } else {
- partitionPath
- }
- Array(UTF8String.fromString(partitionValue))
- } else if (partitionFragments.length != partitionColumns.length &&
- partitionColumns.length > 1) {
- // If the partition column size is not equal to the partition fragments size
- // and the partition column size > 1, we do not know how to map the partition
- // fragments to the partition columns. So we trait it as a Non-Partitioned Table
- // for the query which do not benefit from the partition prune.
- logWarning(s"Cannot do the partition prune for table $basePath." +
- s"The partitionFragments size (${partitionFragments.mkString(",")})" +
- s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
- Array.empty
- } else {
- // If partitionSeqs.length == partitionSchema.fields.length
- // Append partition name to the partition value if the
- // HIVE_STYLE_PARTITIONING is disable.
- // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
- val partitionWithName =
- partitionFragments.zip(partitionColumns).map {
- case (partition, columnName) =>
- if (partition.indexOf("=") == -1) {
- s"${columnName}=$partition"
- } else {
- partition
- }
- }.mkString("/")
-
- val pathWithPartitionName = new Path(basePath, partitionWithName)
- val partitionValues = parsePartitionPath(sparkParsePartitionUtil, configProperties, basePath,
- pathWithPartitionName, partitionSchema)
-
- partitionValues.map(_.asInstanceOf[Object]).toArray
- }
- }
- }
-
- private def parsePartitionPath(sparkParsePartitionUtil: SparkParsePartitionUtil,
- configProperties: TypedProperties,
- basePath: String,
- partitionPath: Path,
- partitionSchema: StructType): Seq[Any] = {
- val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
- val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap
-
- sparkParsePartitionUtil.parsePartition(
- partitionPath,
- typeInference = false,
- Set(new Path(basePath)),
- partitionDataTypes,
- DateTimeUtils.getTimeZone(timeZoneId)
- )
- .toSeq(partitionSchema)
- }
-
- def getConfigProperties(spark: SparkSession, options: Map[String, String]): TypedProperties = {
- val sqlConf: SQLConf = spark.sessionState.conf
- val properties = new TypedProperties()
-
- // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
- // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
- properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
- sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
- HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
- properties.putAll(options.asJava)
- properties
- }
-
- def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
- val expr = spark.sessionState.sqlParser.parseExpression(exprString)
- resolveFilterExpr(spark, expr, tableSchema)
- }
-
- def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
- val schemaFields = tableSchema.fields
- val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
- Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
- )
- .asInstanceOf[Filter].condition
-
- checkForUnresolvedRefs(resolvedExpr)
- }
-
- private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
- resolvedExpr match {
- case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute")
- case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index fc965aa8c..9cdf5cc63 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.HoodieFileIndex.getConfigProperties
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
@@ -36,10 +37,11 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
-import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import java.text.SimpleDateFormat
/**
* A file index which support partition prune for hoodie snapshot and read-optimized query.
@@ -73,7 +75,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
- configProperties = HoodieCommonUtils.getConfigProperties(spark, options),
+ configProperties = getConfigProperties(spark, options),
queryPaths = Seq(HoodieFileIndex.getQueryPath(options)),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -147,8 +149,7 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
- val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
- cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
+ val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c13adf3ab..d4c50b73f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -18,18 +18,24 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, toJavaOption}
+import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -78,8 +84,32 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the partition schema from the hoodie.properties.
*/
- private lazy val _partitionSchemaFromProperties: StructType =
- HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, Some(schema))
+ private lazy val _partitionSchemaFromProperties: StructType = {
+ val tableConfig = metaClient.getTableConfig
+ val partitionColumns = tableConfig.getPartitionFields
+ val nameFieldMap = generateFieldMap(schema)
+
+ if (partitionColumns.isPresent) {
+ // Note that key generator class name could be null
+ val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
+ if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+ || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
+ val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
+ StructType(partitionFields)
+ } else {
+ val partitionFields = partitionColumns.get().map(column =>
+ nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
+ s"$column' in the schema[${schema.fields.mkString(",")}]")))
+ StructType(partitionFields)
+ }
+ } else {
+ // If the partition columns are not stored in hoodie.properties (the table was
+ // created earlier), we treat it as a non-partitioned table.
+ logWarning("No partition columns available from hoodie.properties." +
+ " Partition pruning will not work")
+ new StructType()
+ }
+ }
/**
* Get the data schema of the table.
@@ -110,16 +140,121 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
- val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
- cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
+ val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
+ /**
+ * Get all the cached partition paths pruned by the filter.
+ *
+ * @param predicates The filter condition
+ * @return The pruned partition paths
+ */
+ def getPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = {
+ prunePartition(cachedAllInputFileSlices.keySet().asScala.toSeq, predicates)
+ }
+
+ /**
+ * Prune the partition paths by the given filters. This implementation is forked from
+ * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
+ *
+ * @param partitionPaths All the partition paths.
+ * @param predicates The filter condition.
+ * @return The pruned partition paths.
+ */
+ protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
+ val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+ if (partitionPruningPredicates.nonEmpty) {
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+ val boundPredicate = InterpretedPredicate(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionSchema.indexWhere(a.name == _.name)
+ BoundReference(index, partitionSchema(index).dataType, nullable = true)
+ })
+
+ val prunedPartitionPaths = partitionPaths.filter {
+ partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
+ }
+
+ logInfo(s"Total partition size is: ${partitionPaths.size}," +
+ s" after partition prune size is: ${prunedPartitionPaths.size}")
+ prunedPartitionPaths
+ } else {
+ partitionPaths
+ }
+ }
+
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
- HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, configProperties,
- basePath, partitionSchema, partitionColumns, partitionPath)
+ if (partitionColumns.length == 0) {
+ // This is a non-partitioned table
+ Array.empty
+ } else {
+ val partitionFragments = partitionPath.split("/")
+
+ if (partitionFragments.length != partitionColumns.length &&
+ partitionColumns.length == 1) {
+ // If the partition column size is not equal to the partition fragment size
+ // and the partition column size is 1, we map the whole partition path
+ // to the partition column which can benefit from the partition prune.
+ val prefix = s"${partitionColumns.head}="
+ val partitionValue = if (partitionPath.startsWith(prefix)) {
+ // support hive style partition path
+ partitionPath.substring(prefix.length)
+ } else {
+ partitionPath
+ }
+ Array(UTF8String.fromString(partitionValue))
+ } else if (partitionFragments.length != partitionColumns.length &&
+ partitionColumns.length > 1) {
+ // If the partition column size is not equal to the partition fragments size
+ // and the partition column size > 1, we do not know how to map the partition
+ // fragments to the partition columns, so we treat it as a non-partitioned table
+ // for the query, which then does not benefit from partition pruning.
+ logWarning(s"Cannot do the partition prune for table $basePath." +
+ s"The partitionFragments size (${partitionFragments.mkString(",")})" +
+ s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
+ Array.empty
+ } else {
+ // Here partitionFragments.length == partitionColumns.length.
+ // Prepend the column name to every partition value that has no "=" in it
+ // (i.e. HIVE_STYLE_PARTITIONING is disabled),
+ // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
+ val partitionWithName =
+ partitionFragments.zip(partitionColumns).map {
+ case (partition, columnName) =>
+ if (partition.indexOf("=") == -1) {
+ s"${columnName}=$partition"
+ } else {
+ partition
+ }
+ }.mkString("/")
+
+ val pathWithPartitionName = new Path(basePath, partitionWithName)
+ val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
+
+ partitionValues.map(_.asInstanceOf[Object]).toArray
+ }
+ }
+ }
+
+ private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = {
+ val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
+ val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap
+
+ sparkParsePartitionUtil.parsePartition(
+ partitionPath,
+ typeInference = false,
+ Set(new Path(basePath)),
+ partitionDataTypes,
+ DateTimeUtils.getTimeZone(timeZoneId)
+ )
+ .toSeq(partitionSchema)
}
}
@@ -132,6 +267,45 @@ object SparkHoodieTableFileIndex {
org.apache.hudi.common.util.Option.empty()
}
+ /**
+ * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
+ * [[StructField]] object for every field of the provided [[StructType]], recursively.
+ *
+ * For example, the following struct
+ * <pre>
+ *   StructType(
+ *     StructField("a",
+ *       StructType(
+ *         StructField("b", StringType),
+ *         StructField("c", IntegerType)
+ *       )
+ *     )
+ *   )
+ * </pre>
+ *
+ * will be converted into the following mapping:
+ *
+ * <pre>
+ *   "a.b" -> StructField("b", StringType),
+ *   "a.c" -> StructField("c", IntegerType),
+ * </pre>
+ */
+ private def generateFieldMap(structType: StructType) : Map[String, StructField] = {
+ def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = {
+ structField match {
+ case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
+ case Left(field) => field.dataType match {
+ case struct: StructType => traverse(Right(struct)).map {
+ case (key, structField) => (s"${field.name}.$key", structField)
+ }
+ case _ => Map(field.name -> field)
+ }
+ }
+ }
+
+ traverse(Right(structType))
+ }
+
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
configProperties.asScala(QUERY_TYPE.key()) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
new file mode 100644
index 000000000..d640c0226
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
+import org.apache.spark.sql.types.StructType
+
+object HoodieCatalystExpressionUtils {
+
+  /**
+   * Parses the string expression with the session's SQL parser and resolves it against the given
+   * table schema; for example `ts > 1000 and ts <= 1500` will be resolved as
+   * `And(GreaterThan(ts#590L > 1000), LessThanOrEqual(ts#590L <= 1500))`
+   *
+   * @param spark       The spark session
+   * @param exprString  String to be resolved
+   * @param tableSchema The table schema
+   * @return Resolved filter expression
+   */
+  def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
+    val expr = spark.sessionState.sqlParser.parseExpression(exprString)
+    resolveFilterExpr(spark, expr, tableSchema)
+  }
+
+  /**
+   * Resolves the attribute references of an already parsed expression against the given table schema.
+   * @throws IllegalArgumentException if the table schema has no fields
+   * @throws IllegalStateException    if an attribute reference is left unresolved
+   */
+  def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
+    val schemaFields = tableSchema.fields
+    // LocalRelation is built from a head field plus varargs, hence at least one field is required
+    require(schemaFields.nonEmpty, "Cannot resolve filter expression against an empty table schema")
+    // Filter over a relation exposing the table's fields, so that ResolveReferences can bind the attributes
+    val relation = LocalRelation(schemaFields.head, schemaFields.tail: _*)
+    val resolvedPlan = spark.sessionState.analyzer.ResolveReferences(Filter(expr, relation))
+
+    checkForUnresolvedRefs(resolvedPlan.asInstanceOf[Filter].condition)
+  }
+
+  // Walks the expression tree and fails on the first attribute left unresolved, naming it in the error
+  private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
+    resolvedExpr match {
+      case attr: UnresolvedAttribute => throw new IllegalStateException(s"unresolved attribute: ${attr.name}")
+      case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
+    }
+
+  /**
+   * Splits the given predicates into two arrays:
+   *  - predicates that reference partition columns only (and involve no sub-query);
+   *  - all other predicates.
+   * @param sparkSession     The spark session
+   * @param predicates       The predicates to be split
+   * @param partitionColumns The partition columns
+   * @return (partitionFilters, dataFilters)
+   */
+  def splitPartitionAndDataPredicates(sparkSession: SparkSession,
+                                      predicates: Array[Expression],
+                                      partitionColumns: Array[String]): (Array[Expression], Array[Expression]) = {
+    // Resolver of the session's analyzer: tells whether two names resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    predicates.partition(expr => {
+      // A partition predicate references partition columns only and involves no sub-query
+      expr.references.forall(r => partitionColumns.exists(resolvedNameEquals(r.name, _))) &&
+        !SubqueryExpression.hasSubquery(expr)
+    })
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 59d38923b..1135981a9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.HoodieCommonUtils +import org.apache.hudi.HoodieCLIUtils import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} @@ -42,7 +42,7 @@ case class CompactionHoodiePathCommand(path: String, assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Must compaction on a Merge On Read table.") - val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, path, Map.empty) + val client = 
HoodieCLIUtils.createHoodieClientFromPath(sparkSession, path, Map.empty) operation match { case SCHEDULE => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index a98cdce54..442ee0441 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -17,29 +17,24 @@ package org.apache.spark.sql.hudi.command.procedures -import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath -import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} import org.apache.hudi.config.HoodieClusteringConfig import org.apache.hudi.exception.HoodieClusteringException -import org.apache.hudi.metadata.HoodieTableMetadata -import org.apache.hudi.{HoodieCommonUtils, SparkAdapterSupport} -import org.apache.spark.api.java.JavaSparkContext +import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex} import org.apache.spark.internal.Logging -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row} +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import 
org.apache.spark.sql.execution.datasources.FileStatusCache import org.apache.spark.sql.types._ -import java.util.Properties import java.util.function.Supplier -import scala.collection.JavaConverters import scala.collection.JavaConverters._ -class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { +class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with PredicateHelper with Logging { /** * OPTIMIZE table_name|table_path [WHERE predicate] * [ORDER BY (col_name1 [, ...] ) ] @@ -74,15 +69,12 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp var conf: Map[String, String] = Map.empty predicate match { case Some(p) => - val partitionColumnsSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None) - val partitionPredicate = HoodieCommonUtils.resolveFilterExpr( - spark, p.asInstanceOf[String], partitionColumnsSchema) - val partitionSelected = prunePartition(metaClient, partitionPredicate) + val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String]) conf = conf ++ Map( HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS", - HoodieClusteringConfig.PARTITION_SELECTED.key() -> partitionSelected + HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions ) - logInfo(s"Partition predicates: ${p}, partition selected: ${partitionSelected}") + logInfo(s"Partition predicates: ${p}, partition selected: ${prunedPartitions}") case _ => logInfo("No partition predicates") } @@ -104,7 +96,7 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f) logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}") - val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, basePath, conf) + val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf) val instantTime 
= HoodieActiveTimeline.createNewInstantTime if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) { pendingClustering ++= Seq(instantTime) @@ -120,37 +112,26 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp override def build: Procedure = new RunClusteringProcedure() - def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: Expression): String = { - val partitionSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None) - - // Get tableName meta data - val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext)) - val properties = new Properties() - properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava) - val metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(properties).build() - val tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath, - FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue) - - val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf) - val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, Map.empty) + def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = { + val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath) + val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options, + FileStatusCache.getOrCreate(sparkSession)) + // Resolve partition predicates + val schemaResolver = new TableSchemaResolver(metaClient) + val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema) + val condition = HoodieCatalystExpressionUtils.resolveFilterExpr(sparkSession, predicate, tableSchema) val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]()) + val (partitionPredicates, dataPredicates) = 
HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates( + sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns) + checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed") - // Translate all partition path to {@code org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath} - val partitionPaths = tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => { - val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues( - sparkParsePartitionUtil, typedProperties, metaClient.getBasePath, - partitionSchema, partitionColumns, partitionPath) - new PartitionPath(partitionPath, partitionColumnValues) - }) - - // Filter partition by predicates - val selectedPartitions = HoodieCommonUtils.prunePartition( - partitionSchema, partitionPaths, partitionPredicate) - selectedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",") + // Get all partitions and prune partition by predicates + val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates) + prunedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",") } - def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = { + private def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = { if (orderColumns == null) { throw new HoodieClusteringException("Order columns is null") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index d8179d30f..ac866ba3e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -22,8 +22,8 @@ import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.catalyst.expressions.{Expression, Not} import 
org.apache.spark.sql.functions.col import org.apache.spark.sql.hudi.DataSkippingUtils -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType} +import org.apache.spark.sql.{Column, HoodieCatalystExpressionUtils, SparkSession} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.params.ParameterizedTest @@ -73,8 +73,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource")) def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = { - val resolvedExpr: Expression = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) - + val resolvedExpr: Expression = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark @@ -94,7 +93,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testStringsLookupFilterExpressionsSource")) def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = { - val resolvedExpr = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) + val resolvedExpr = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala index 52fe23711..eb2c614df 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala @@ -17,13 +17,6 @@ package org.apache.spark.sql.hudi -import org.apache.hadoop.fs.Path -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} -import org.apache.hudi.common.util.{Option => HOption} -import org.apache.hudi.{HoodieCommonUtils, HoodieDataSourceHelpers} - -import scala.collection.JavaConverters.asScalaIteratorConverter - class TestCallProcedure extends TestHoodieSqlBase { test("Test Call show_commits Procedure") { @@ -136,222 +129,4 @@ class TestCallProcedure extends TestHoodieSqlBase { assertResult(1){commits.length} } } - - test("Test Call run_clustering Procedure By Table") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' - """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty) - // Generate the first clustering plan - val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime - client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) - - // Generate the second clustering plan - spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") - val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime - client.scheduleClusteringAtInstant(secondScheduleInstant, 
HOption.empty()) - checkAnswer(s"call show_clustering('$tableName')")( - Seq(firstScheduleInstant, 3), - Seq(secondScheduleInstant, 1) - ) - - // Do clustering for all clustering plan generated above, and no new clustering - // instant will be generated because of there is no commit after the second - // clustering plan generated - spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") - - // No new commits - val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) - - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003) - ) - // After clustering there should be no pending clustering. - checkAnswer(s"call show_clustering(table => '$tableName')")() - - // Check the number of finished clustering instants - val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(2)(finishedClustering.size) - - // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) - spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") - spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") - spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") - - val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .findInstantsAfter(secondScheduleInstant) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - // Should have a new replace commit after the second clustering command. 
- assertResult(1)(thirdClusteringInstant.size) - - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003), - Seq(5, "a5", 10.0, 1004), - Seq(6, "a6", 10.0, 1005) - ) - } - } - } - - test("Test Call run_clustering Procedure By Path") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' - """.stripMargin) - - spark.sql(s"call run_clustering(path => '$basePath')") - checkAnswer(s"call show_clustering(path => '$basePath')")() - - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty) - // Generate the first clustering plan - val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime - client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) - checkAnswer(s"call show_clustering(path => '$basePath')")( - Seq(firstScheduleInstant, 3) - ) - // Do clustering for all the clustering plan - spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002) - ) - val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) - - // Check the number of finished clustering instants - var 
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(1)(finishedClustering.size) - - // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) - spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") - spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')") - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003), - Seq(5, "a5", 10.0, 1004) - ) - - finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(2)(finishedClustering.size) - } - } - } - - test("Test Call run_clustering Procedure With Partition Pruning") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' - """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - - // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") - - // Check the num of completed clustering instant - val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(1)(clusteringInstants.size) - - val clusteringInstant = clusteringInstants.head - val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) - assertResult(true)(clusteringPlan.isPresent) - assertResult(2)(clusteringPlan.get().getInputGroups.size()) - - checkAnswer(s"call show_clustering(table => '$tableName')")() - - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002) - ) - } - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala new file mode 100644 index 000000000..5ad0b2583 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers} + +import scala.collection.JavaConverters.asScalaIteratorConverter + +class TestRunClusteringProcedure extends TestHoodieSqlBase { + + test("Test Call run_clustering Procedure By Table") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty) + // Generate the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + + // Generate the second clustering plan + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) + checkAnswer(s"call show_clustering('$tableName')")( + Seq(firstScheduleInstant, 3), + Seq(secondScheduleInstant, 1) + ) + + // Do clustering for all clustering plan generated above, and no new clustering + // instant will be generated because of 
there is no commit after the second + // clustering plan generated + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + + // No new commits + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003) + ) + // After clustering there should be no pending clustering. + checkAnswer(s"call show_clustering(table => '$tableName')")() + + // Check the number of finished clustering instants + val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(finishedClustering.size) + + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + + val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .findInstantsAfter(secondScheduleInstant) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + // Should have a new replace commit after the second clustering command. 
+ assertResult(1)(thirdClusteringInstant.size) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005) + ) + } + } + } + + test("Test Call run_clustering Procedure By Path") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + + spark.sql(s"call run_clustering(path => '$basePath')") + checkAnswer(s"call show_clustering(path => '$basePath')")() + + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty) + // Generate the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + checkAnswer(s"call show_clustering(path => '$basePath')")( + Seq(firstScheduleInstant, 3) + ) + // Do clustering for all the clustering plan + spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) + + // Check the number of finished clustering instants + var 
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(1)(finishedClustering.size) + + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004) + ) + + finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(finishedClustering.size) + } + } + } + + /** + * Exercises the run_clustering procedure with predicates on the partition column, once per table type (cow, mor). + * The table is created with primary key id, preCombineField ts and is partitioned by ts; every inserted row uses a distinct ts value. + * Three scenarios run in sequence against the same table (single predicate, And predicates, And-Or predicates). Each scenario: + * 1. inserts new rows; + * 2. expects run_clustering to fail with the message Only partition predicates are allowed when the predicate also references the id column; + * 3. calls run_clustering with a predicate that references only ts; + * 4. asserts the cumulative number of completed REPLACE_COMMIT_ACTION instants (1, then 2, then 3) and the number of input groups in the clustering plan of the latest such instant (2, then 4, then 3); + * 5. asserts that show_clustering yields no rows and that the select returns every row inserted so far. + * NOTE(review): the expected input-group counts presumably rely on one file group per ts partition, as the inline comments suggest - confirm. + */ + test("Test Call run_clustering Procedure With Partition Pruning") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + + // Test partition pruning with single predicate + { + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + + checkException( + s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L and id = 
10', order => 'ts')" + )("Only partition predicates are allowed") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") + + // There is 1 completed clustering instant + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(1)(clusteringInstants.size) + + // The latest clustering should contain 2 file groups + val clusteringInstant = clusteringInstants.last + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + assertResult(true)(clusteringPlan.isPresent) + assertResult(2)(clusteringPlan.get().getInputGroups.size()) + + // No pending clustering instant + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + } + + // Test partition pruning with {@code And} predicates + { + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") + + checkException( + s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L and id = 10', order => 'ts')" + )("Only partition predicates are allowed") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')") + + // There are 2 completed clustering instants + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + 
assertResult(2)(clusteringInstants.size) + + // The latest clustering should contain 4 file groups(1002,1003,1004,1005) + val clusteringInstant = clusteringInstants.last + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + assertResult(true)(clusteringPlan.isPresent) + assertResult(4)(clusteringPlan.get().getInputGroups.size()) + + // No pending clustering instant + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005) + ) + } + + // Test partition pruning with {@code And}-{@code Or} predicates + { + spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)") + spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)") + spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008)") + spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009)") + + checkException( + s"call run_clustering(table => '$tableName', predicate => 'ts < 1007L or ts >= 1008L or id = 10', order => 'ts')" + )("Only partition predicates are allowed") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')") + + // There are 3 completed clustering instants + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(3)(clusteringInstants.size) + + // The latest clustering should contain 3 file groups(1006,1007,1009) + val clusteringInstant = clusteringInstants.last + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + 
assertResult(true)(clusteringPlan.isPresent) + assertResult(3)(clusteringPlan.get().getInputGroups.size()) + + // No pending clustering instant + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005), + Seq(7, "a7", 10.0, 1006), + Seq(8, "a8", 10.0, 1007), + Seq(9, "a9", 10.0, 1008), + Seq(10, "a10", 10.0, 1009) + ) + } + } + } + } +}