From 56cb49485d74c28500101b81afe2cc41d90fb431 Mon Sep 17 00:00:00 2001 From: huberylee Date: Sat, 12 Mar 2022 05:23:19 +0800 Subject: [PATCH] [HUDI-3567] Refactor HoodieCommonUtils to make code more reasonable (#4982) --- .../hudi/BaseFileOnlyViewRelation.scala | 20 +- .../org/apache/hudi/HoodieCLIUtils.scala | 49 +++ .../org/apache/hudi/HoodieCommonUtils.scala | 286 --------------- .../org/apache/hudi/HoodieFileIndex.scala | 11 +- .../hudi/SparkHoodieTableFileIndex.scala | 192 +++++++++- .../sql/HoodieCatalystExpressionUtils.scala | 88 +++++ .../command/CompactionHoodiePathCommand.scala | 4 +- .../procedures/RunClusteringProcedure.scala | 71 ++-- .../apache/hudi/TestDataSkippingUtils.scala | 9 +- .../spark/sql/hudi/TestCallProcedure.scala | 225 ------------ .../sql/hudi/TestRunClusteringProcedure.scala | 344 ++++++++++++++++++ 11 files changed, 706 insertions(+), 593 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala delete mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala index adc34afc3..473bb2e24 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala @@ -24,9 +24,9 @@ import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -70,7 +70,8 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext, HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) val filterExpressions = convertToExpressions(filters) - val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate) + val (partitionFilters, dataFilters) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates( + sparkSession, filterExpressions, partitionColumns) val filePartitions = getPartitions(partitionFilters, dataFilters) @@ -137,17 +138,4 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext, catalystExpressions.filter(_.isDefined).map(_.get).toArray } - - /** - * Checks whether given expression only references only references partition columns - * (and involves no sub-query) - */ - private def isPartitionPredicate(condition: Expression): Boolean = { - // Validates that the provided names both resolve to the same entity - val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver - - condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } && - !SubqueryExpression.hasSubquery(condition) - } - } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala new file mode 100644 index 000000000..58c332482 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf + +import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.immutable.Map + +object HoodieCLIUtils { + + def createHoodieClientFromPath(sparkSession: SparkSession, + basePath: String, + conf: Map[String, String]): SparkRDDWriteClient[_] = { + val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) + .setConf(sparkSession.sessionState.newHadoopConf()).build() + val schemaUtil = new TableSchemaResolver(metaClient) + val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString + val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults( + withSparkConf(sparkSession, Map.empty)( + conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name())) + ) + + val jsc = new JavaSparkContext(sparkSession.sparkContext) + DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, + metaClient.getTableConfig.getTableName, finalParameters.asJava) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala deleted file mode 100644 index d8189b1ad..000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi - -import org.apache.hadoop.fs.Path -import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath -import org.apache.hudi.client.SparkRDDWriteClient -import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation} -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.{InternalRow, expressions} -import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.unsafe.types.UTF8String - -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.immutable.Map - -object HoodieCommonUtils extends Logging { - - def createHoodieClientFromPath(sparkSession: SparkSession, basePath: String, - conf: Map[String, String]): SparkRDDWriteClient[_] = { - val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) - .setConf(sparkSession.sessionState.newHadoopConf()).build() - val schemaUtil = new TableSchemaResolver(metaClient) - val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString - val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults( - withSparkConf(sparkSession, Map.empty)( - conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name())) - ) - - val jsc = new JavaSparkContext(sparkSession.sparkContext) - DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, - metaClient.getTableConfig.getTableName, finalParameters.asJava) - } - - def getPartitionSchemaFromProperty(metaClient: HoodieTableMetaClient, - schemaSpec: Option[StructType]): StructType = { - val schema = schemaSpec.getOrElse({ - val schemaUtil = new TableSchemaResolver(metaClient) - AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) - }) - - val tableConfig = metaClient.getTableConfig - val partitionColumns = tableConfig.getPartitionFields - val nameFieldMap = generateFieldMap(schema) - - if (partitionColumns.isPresent) { - // Note that key generator class name could be null - val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName - if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) - || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { - val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) - StructType(partitionFields) - } else { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + - s"$column' in the schema[${schema.fields.mkString(",")}]"))) - StructType(partitionFields) - } - } else { - // If the partition columns have not stored in hoodie.properties(the table that was - // created earlier), we trait it as a non-partitioned table. - logWarning("No partition columns available from hoodie.properties." + - " Partition pruning will not work") - new StructType() - } - } - - /** - * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding - * [[StructField]] object for every field of the provided [[StructType]], recursively. - * - * For example, following struct - *
-   * StructType(
-   * StructField("a",
-   * StructType(
-   * StructField("b", StringType),
-   * StructField("c", IntType)
-   * )
-   * )
-   * )
-   * 
- * - * will be converted into following mapping: - * - *
-   * "a.b" -> StructField("b", StringType),
-   * "a.c" -> StructField("c", IntType),
-   * 
- */ - def generateFieldMap(structType: StructType): Map[String, StructField] = { - def traverse(structField: Either[StructField, StructType]): Map[String, StructField] = { - structField match { - case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap - case Left(field) => field.dataType match { - case struct: StructType => traverse(Right(struct)).map { - case (key, structField) => (s"${field.name}.$key", structField) - } - case _ => Map(field.name -> field) - } - } - } - - traverse(Right(structType)) - } - - /** - * Prune the partition by the filter.This implementation is fork from - * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. - * - * @param partitionPaths All the partition paths. - * @param predicates The filter condition. - * @return The Pruned partition paths. - */ - def prunePartition(partitionSchema: StructType, - partitionPaths: Seq[PartitionPath], - predicates: Seq[Expression]): Seq[PartitionPath] = { - val partitionColumnNames = partitionSchema.fields.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - prunePartition(partitionSchema, partitionPaths, predicate) - } else { - partitionPaths - } - } - - def prunePartition(partitionSchema: StructType, - partitionPaths: Seq[PartitionPath], - predicate: Expression): Seq[PartitionPath] = { - val boundPredicate = InterpretedPredicate(predicate.transform { - case a: AttributeReference => - val index = partitionSchema.indexWhere(a.name == _.name) - BoundReference(index, partitionSchema(index).dataType, nullable = true) - }) - - val prunedPartitionPaths = partitionPaths.filter { - partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values)) - } - - logInfo(s"Total partition size is: ${partitionPaths.size}," + - s" after partition prune size is: ${prunedPartitionPaths.size}") - prunedPartitionPaths - } - - def parsePartitionColumnValues(sparkParsePartitionUtil: SparkParsePartitionUtil, - configProperties: TypedProperties, - basePath: String, - partitionSchema: StructType, - partitionColumns: Array[String], - partitionPath: String): Array[Object] = { - if (partitionColumns.length == 0) { - // This is a non-partitioned table - Array.empty - } else { - val partitionFragments = partitionPath.split("/") - - if (partitionFragments.length != partitionColumns.length && - partitionColumns.length == 1) { - // If the partition column size is not equal to the partition fragment size - // and the partition column size is 1, we map the whole partition path - // to the partition column which can benefit from the partition prune. - val prefix = s"${partitionColumns.head}=" - val partitionValue = if (partitionPath.startsWith(prefix)) { - // support hive style partition path - partitionPath.substring(prefix.length) - } else { - partitionPath - } - Array(UTF8String.fromString(partitionValue)) - } else if (partitionFragments.length != partitionColumns.length && - partitionColumns.length > 1) { - // If the partition column size is not equal to the partition fragments size - // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns. So we trait it as a Non-Partitioned Table - // for the query which do not benefit from the partition prune. - logWarning(s"Cannot do the partition prune for table $basePath." + - s"The partitionFragments size (${partitionFragments.mkString(",")})" + - s" is not equal to the partition columns size(${partitionColumns.mkString(",")})") - Array.empty - } else { - // If partitionSeqs.length == partitionSchema.fields.length - // Append partition name to the partition value if the - // HIVE_STYLE_PARTITIONING is disable. - // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" - val partitionWithName = - partitionFragments.zip(partitionColumns).map { - case (partition, columnName) => - if (partition.indexOf("=") == -1) { - s"${columnName}=$partition" - } else { - partition - } - }.mkString("/") - - val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionValues = parsePartitionPath(sparkParsePartitionUtil, configProperties, basePath, - pathWithPartitionName, partitionSchema) - - partitionValues.map(_.asInstanceOf[Object]).toArray - } - } - } - - private def parsePartitionPath(sparkParsePartitionUtil: SparkParsePartitionUtil, - configProperties: TypedProperties, - basePath: String, - partitionPath: Path, - partitionSchema: StructType): Seq[Any] = { - val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone) - val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap - - sparkParsePartitionUtil.parsePartition( - partitionPath, - typeInference = false, - Set(new Path(basePath)), - partitionDataTypes, - DateTimeUtils.getTimeZone(timeZoneId) - ) - .toSeq(partitionSchema) - } - - def getConfigProperties(spark: SparkSession, options: Map[String, String]): TypedProperties = { - val sqlConf: SQLConf = spark.sessionState.conf - val properties = new TypedProperties() - - // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users - // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. - properties.setProperty(HoodieMetadataConfig.ENABLE.key(), - sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(), - HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) - properties.putAll(options.asJava) - properties - } - - def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = { - val expr = spark.sessionState.sqlParser.parseExpression(exprString) - resolveFilterExpr(spark, expr, tableSchema) - } - - def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = { - val schemaFields = tableSchema.fields - val resolvedExpr = spark.sessionState.analyzer.ResolveReferences( - Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*)) - ) - .asInstanceOf[Filter].condition - - checkForUnresolvedRefs(resolvedExpr) - } - - private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression = - resolvedExpr match { - case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute") - case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e)) - } -} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index fc965aa8c..9cdf5cc63 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -19,6 +19,7 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.HoodieFileIndex.getConfigProperties import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.StringUtils @@ -36,10 +37,11 @@ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{AnalysisException, Column, SparkSession} import org.apache.spark.unsafe.types.UTF8String -import java.text.SimpleDateFormat import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import java.text.SimpleDateFormat /** * A file index which support partition prune for hoodie snapshot and read-optimized query. @@ -73,7 +75,7 @@ case class HoodieFileIndex(spark: SparkSession, spark = spark, metaClient = metaClient, schemaSpec = schemaSpec, - configProperties = HoodieCommonUtils.getConfigProperties(spark, options), + configProperties = getConfigProperties(spark, options), queryPaths = Seq(HoodieFileIndex.getQueryPath(options)), specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), fileStatusCache = fileStatusCache @@ -147,8 +149,7 @@ case class HoodieFileIndex(spark: SparkSession, Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) } else { // Prune the partition path by the partition filters - val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema, - cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters) + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters) var totalFileSize = 0 var candidateFileSize = 0 diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index c13adf3ab..d4c50b73f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -18,18 +18,24 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} -import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, toJavaOption} +import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -78,8 +84,32 @@ class SparkHoodieTableFileIndex(spark: SparkSession, /** * Get the partition schema from the hoodie.properties. */ - private lazy val _partitionSchemaFromProperties: StructType = - HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, Some(schema)) + private lazy val _partitionSchemaFromProperties: StructType = { + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionFields + val nameFieldMap = generateFieldMap(schema) + + if (partitionColumns.isPresent) { + // Note that key generator class name could be null + val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName + if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) + || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { + val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) + StructType(partitionFields) + } else { + val partitionFields = partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + StructType(partitionFields) + } + } else { + // If the partition columns have not stored in hoodie.properties(the table that was + // created earlier), we trait it as a non-partitioned table. + logWarning("No partition columns available from hoodie.properties." + + " Partition pruning will not work") + new StructType() + } + } /** * Get the data schema of the table. @@ -110,16 +140,121 @@ class SparkHoodieTableFileIndex(spark: SparkSession, */ def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { // Prune the partition path by the partition filters - val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema, - cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters) + val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters) prunedPartitions.map(partition => { (partition.path, cachedAllInputFileSlices.get(partition).asScala) }).toMap } + /** + * Get all the cached partition paths pruned by the filter. + * + * @param predicates The filter condition + * @return The pruned partition paths + */ + def getPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = { + prunePartition(cachedAllInputFileSlices.keySet().asScala.toSeq, predicates) + } + + /** + * Prune the partition by the filter.This implementation is fork from + * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. + * + * @param partitionPaths All the partition paths. + * @param predicates The filter condition. + * @return The pruned partition paths. + */ + protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = { + val partitionColumnNames = partitionSchema.fields.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate(predicate.transform { + case a: AttributeReference => + val index = partitionSchema.indexWhere(a.name == _.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + + val prunedPartitionPaths = partitionPaths.filter { + partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values)) + } + + logInfo(s"Total partition size is: ${partitionPaths.size}," + + s" after partition prune size is: ${prunedPartitionPaths.size}") + prunedPartitionPaths + } else { + partitionPaths + } + } + protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = { - HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, configProperties, - basePath, partitionSchema, partitionColumns, partitionPath) + if (partitionColumns.length == 0) { + // This is a non-partitioned table + Array.empty + } else { + val partitionFragments = partitionPath.split("/") + + if (partitionFragments.length != partitionColumns.length && + partitionColumns.length == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. + val prefix = s"${partitionColumns.head}=" + val partitionValue = if (partitionPath.startsWith(prefix)) { + // support hive style partition path + partitionPath.substring(prefix.length) + } else { + partitionPath + } + Array(UTF8String.fromString(partitionValue)) + } else if (partitionFragments.length != partitionColumns.length && + partitionColumns.length > 1) { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns. So we trait it as a Non-Partitioned Table + // for the query which do not benefit from the partition prune. + logWarning(s"Cannot do the partition prune for table $basePath." + + s"The partitionFragments size (${partitionFragments.mkString(",")})" + + s" is not equal to the partition columns size(${partitionColumns.mkString(",")})") + Array.empty + } else { + // If partitionSeqs.length == partitionSchema.fields.length + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING is disable. + // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + val partitionWithName = + partitionFragments.zip(partitionColumns).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") + + val pathWithPartitionName = new Path(basePath, partitionWithName) + val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema) + + partitionValues.map(_.asInstanceOf[Object]).toArray + } + } + } + + private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { + val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone) + val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap + + sparkParsePartitionUtil.parsePartition( + partitionPath, + typeInference = false, + Set(new Path(basePath)), + partitionDataTypes, + DateTimeUtils.getTimeZone(timeZoneId) + ) + .toSeq(partitionSchema) } } @@ -132,6 +267,45 @@ object SparkHoodieTableFileIndex { org.apache.hudi.common.util.Option.empty() } + /** + * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding + * [[StructField]] object for every field of the provided [[StructType]], recursively. + * + * For example, following struct + *
+   *   StructType(
+   *     StructField("a",
+   *       StructType(
+   *          StructField("b", StringType),
+   *          StructField("c", IntType)
+   *       )
+   *     )
+   *   )
+   * 
+ * + * will be converted into following mapping: + * + *
+   *   "a.b" -> StructField("b", StringType),
+   *   "a.c" -> StructField("c", IntType),
+   * 
+ */ + private def generateFieldMap(structType: StructType) : Map[String, StructField] = { + def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = { + structField match { + case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap + case Left(field) => field.dataType match { + case struct: StructType => traverse(Right(struct)).map { + case (key, structField) => (s"${field.name}.$key", structField) + } + case _ => Map(field.name -> field) + } + } + } + + traverse(Right(structType)) + } + private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = { configProperties.asScala(QUERY_TYPE.key()) match { case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala new file mode 100644 index 000000000..d640c0226 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation} +import org.apache.spark.sql.types.StructType + +object HoodieCatalystExpressionUtils { + + /** + * Resolve filter expression from string expr with given table schema, for example: + *
+   *   ts > 1000 and ts <= 1500
+   * 
+ * will be resolved as + *
+   *   And(GreaterThan(ts#590L > 1000), LessThanOrEqual(ts#590L <= 1500))
+   * 
+ * + * @param spark The spark session + * @param exprString String to be resolved + * @param tableSchema The table schema + * @return Resolved filter expression + */ + def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = { + val expr = spark.sessionState.sqlParser.parseExpression(exprString) + resolveFilterExpr(spark, expr, tableSchema) + } + + def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = { + val schemaFields = tableSchema.fields + val resolvedExpr = spark.sessionState.analyzer.ResolveReferences( + Filter(expr, + LocalRelation(schemaFields.head, schemaFields.drop(1): _*)) + ) + .asInstanceOf[Filter].condition + + checkForUnresolvedRefs(resolvedExpr) + } + + private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression = + resolvedExpr match { + case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute") + case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e)) + } + + /** + * Split the given predicates into two sequence predicates: + * - predicates that references partition columns only(and involves no sub-query); + * - other predicates. + * + * @param sparkSession The spark session + * @param predicates The predicates to be split + * @param partitionColumns The partition columns + * @return (partitionFilters, dataFilters) + */ + def splitPartitionAndDataPredicates(sparkSession: SparkSession, + predicates: Array[Expression], + partitionColumns: Array[String]): (Array[Expression], Array[Expression]) = { + // Validates that the provided names both resolve to the same entity + val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver + + predicates.partition(expr => { + // Checks whether given expression only references partition columns(and involves no sub-query) + expr.references.forall(r => partitionColumns.exists(resolvedNameEquals(r.name, _))) && + !SubqueryExpression.hasSubquery(expr) + }) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 59d38923b..1135981a9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.HoodieCommonUtils +import org.apache.hudi.HoodieCLIUtils import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} @@ -42,7 +42,7 @@ case class CompactionHoodiePathCommand(path: String, assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Must compaction on a Merge On Read table.") - val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, path, Map.empty) + val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, path, Map.empty) operation match { case SCHEDULE => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index a98cdce54..442ee0441 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -17,29 +17,24 @@ package org.apache.spark.sql.hudi.command.procedures -import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath -import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} import org.apache.hudi.config.HoodieClusteringConfig import org.apache.hudi.exception.HoodieClusteringException -import org.apache.hudi.metadata.HoodieTableMetadata -import org.apache.hudi.{HoodieCommonUtils, SparkAdapterSupport} -import org.apache.spark.api.java.JavaSparkContext +import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex} import org.apache.spark.internal.Logging -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row} +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.execution.datasources.FileStatusCache import org.apache.spark.sql.types._ -import java.util.Properties import java.util.function.Supplier -import scala.collection.JavaConverters import scala.collection.JavaConverters._ -class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { +class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with PredicateHelper with Logging { /** * OPTIMIZE table_name|table_path [WHERE predicate] * [ORDER BY (col_name1 [, ...] ) ] @@ -74,15 +69,12 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp var conf: Map[String, String] = Map.empty predicate match { case Some(p) => - val partitionColumnsSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None) - val partitionPredicate = HoodieCommonUtils.resolveFilterExpr( - spark, p.asInstanceOf[String], partitionColumnsSchema) - val partitionSelected = prunePartition(metaClient, partitionPredicate) + val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String]) conf = conf ++ Map( HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS", - HoodieClusteringConfig.PARTITION_SELECTED.key() -> partitionSelected + HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions ) - logInfo(s"Partition predicates: ${p}, partition selected: ${partitionSelected}") + logInfo(s"Partition predicates: ${p}, partition selected: ${prunedPartitions}") case _ => logInfo("No partition predicates") } @@ -104,7 +96,7 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f) logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}") - val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, basePath, conf) + val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf) val instantTime = HoodieActiveTimeline.createNewInstantTime if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) { pendingClustering ++= Seq(instantTime) @@ -120,37 +112,26 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp override def build: Procedure = new RunClusteringProcedure() - def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: Expression): String = { - val partitionSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None) - - // Get tableName meta data - val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext)) - val properties = new Properties() - properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava) - val metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(properties).build() - val tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath, - FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue) - - val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf) - val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, Map.empty) + def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = { + val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath) + val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options, + FileStatusCache.getOrCreate(sparkSession)) + // Resolve partition predicates + val schemaResolver = new TableSchemaResolver(metaClient) + val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema) + val condition = HoodieCatalystExpressionUtils.resolveFilterExpr(sparkSession, predicate, tableSchema) val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]()) + val (partitionPredicates, dataPredicates) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates( + sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns) + checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed") - // Translate all partition path to {@code org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath} - val partitionPaths = tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => { - val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues( - sparkParsePartitionUtil, typedProperties, metaClient.getBasePath, - partitionSchema, partitionColumns, partitionPath) - new PartitionPath(partitionPath, partitionColumnValues) - }) - - // Filter partition by predicates - val selectedPartitions = HoodieCommonUtils.prunePartition( - partitionSchema, partitionPaths, partitionPredicate) - selectedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",") + // Get all partitions and prune partition by predicates + val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates) + prunedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",") } - def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = { + private def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = { if (orderColumns == null) { throw new HoodieClusteringException("Order columns is null") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index d8179d30f..ac866ba3e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -22,8 +22,8 @@ import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.catalyst.expressions.{Expression, Not} import org.apache.spark.sql.functions.col import org.apache.spark.sql.hudi.DataSkippingUtils -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType} +import org.apache.spark.sql.{Column, HoodieCatalystExpressionUtils, SparkSession} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.params.ParameterizedTest @@ -73,8 +73,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource")) def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = { - val resolvedExpr: Expression = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) - + val resolvedExpr: Expression = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark @@ -94,7 +93,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testStringsLookupFilterExpressionsSource")) def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = { - val resolvedExpr = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) + val resolvedExpr = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala index 52fe23711..eb2c614df 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala @@ -17,13 +17,6 @@ package org.apache.spark.sql.hudi -import org.apache.hadoop.fs.Path -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} -import org.apache.hudi.common.util.{Option => HOption} -import org.apache.hudi.{HoodieCommonUtils, HoodieDataSourceHelpers} - -import scala.collection.JavaConverters.asScalaIteratorConverter - class TestCallProcedure extends TestHoodieSqlBase { test("Test Call show_commits Procedure") { @@ -136,222 +129,4 @@ class TestCallProcedure extends TestHoodieSqlBase { assertResult(1){commits.length} } } - - test("Test Call run_clustering Procedure By Table") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' - """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty) - // Generate the first clustering plan - val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime - client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) - - // Generate the second clustering plan - spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") - val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime - client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) - checkAnswer(s"call show_clustering('$tableName')")( - Seq(firstScheduleInstant, 3), - Seq(secondScheduleInstant, 1) - ) - - // Do clustering for all clustering plan generated above, and no new clustering - // instant will be generated because of there is no commit after the second - // clustering plan generated - spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") - - // No new commits - val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) - - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003) - ) - // After clustering there should be no pending clustering. - checkAnswer(s"call show_clustering(table => '$tableName')")() - - // Check the number of finished clustering instants - val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(2)(finishedClustering.size) - - // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) - spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") - spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") - spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") - - val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .findInstantsAfter(secondScheduleInstant) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - // Should have a new replace commit after the second clustering command. - assertResult(1)(thirdClusteringInstant.size) - - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003), - Seq(5, "a5", 10.0, 1004), - Seq(6, "a6", 10.0, 1005) - ) - } - } - } - - test("Test Call run_clustering Procedure By Path") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' - """.stripMargin) - - spark.sql(s"call run_clustering(path => '$basePath')") - checkAnswer(s"call show_clustering(path => '$basePath')")() - - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty) - // Generate the first clustering plan - val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime - client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) - checkAnswer(s"call show_clustering(path => '$basePath')")( - Seq(firstScheduleInstant, 3) - ) - // Do clustering for all the clustering plan - spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002) - ) - val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) - - // Check the number of finished clustering instants - var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(1)(finishedClustering.size) - - // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) - spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") - spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')") - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003), - Seq(5, "a5", 10.0, 1004) - ) - - finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(2)(finishedClustering.size) - } - } - } - - test("Test Call run_clustering Procedure With Partition Pruning") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' - """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - - // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") - - // Check the num of completed clustering instant - val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) - .getInstants - .iterator().asScala - .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) - .toSeq - assertResult(1)(clusteringInstants.size) - - val clusteringInstant = clusteringInstants.head - val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) - assertResult(true)(clusteringPlan.isPresent) - assertResult(2)(clusteringPlan.get().getInputGroups.size()) - - checkAnswer(s"call show_clustering(table => '$tableName')")() - - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002) - ) - } - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala new file mode 100644 index 000000000..5ad0b2583 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers} + +import scala.collection.JavaConverters.asScalaIteratorConverter + +class TestRunClusteringProcedure extends TestHoodieSqlBase { + + test("Test Call run_clustering Procedure By Table") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty) + // Generate the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + + // Generate the second clustering plan + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) + checkAnswer(s"call show_clustering('$tableName')")( + Seq(firstScheduleInstant, 3), + Seq(secondScheduleInstant, 1) + ) + + // Do clustering for all clustering plan generated above, and no new clustering + // instant will be generated because of there is no commit after the second + // clustering plan generated + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + + // No new commits + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003) + ) + // After clustering there should be no pending clustering. + checkAnswer(s"call show_clustering(table => '$tableName')")() + + // Check the number of finished clustering instants + val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(finishedClustering.size) + + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + + val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .findInstantsAfter(secondScheduleInstant) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + // Should have a new replace commit after the second clustering command. + assertResult(1)(thirdClusteringInstant.size) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005) + ) + } + } + } + + test("Test Call run_clustering Procedure By Path") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + + spark.sql(s"call run_clustering(path => '$basePath')") + checkAnswer(s"call show_clustering(path => '$basePath')")() + + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty) + // Generate the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + checkAnswer(s"call show_clustering(path => '$basePath')")( + Seq(firstScheduleInstant, 3) + ) + // Do clustering for all the clustering plan + spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) + + // Check the number of finished clustering instants + var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(1)(finishedClustering.size) + + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004) + ) + + finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(finishedClustering.size) + } + } + } + + test("Test Call run_clustering Procedure With Partition Pruning") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + + // Test partition pruning with single predicate + { + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + + checkException( + s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L and id = 10', order => 'ts')" + )("Only partition predicates are allowed") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") + + // There is 1 completed clustering instant + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(1)(clusteringInstants.size) + + // The latest clustering should contain 2 file groups + val clusteringInstant = clusteringInstants.last + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + assertResult(true)(clusteringPlan.isPresent) + assertResult(2)(clusteringPlan.get().getInputGroups.size()) + + // No pending clustering instant + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + } + + // Test partition pruning with {@code And} predicates + { + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") + + checkException( + s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L and id = 10', order => 'ts')" + )("Only partition predicates are allowed") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')") + + // There are 2 completed clustering instants + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(clusteringInstants.size) + + // The latest clustering should contain 4 file groups(1002,1003,1004,1005) + val clusteringInstant = clusteringInstants.last + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + assertResult(true)(clusteringPlan.isPresent) + assertResult(4)(clusteringPlan.get().getInputGroups.size()) + + // No pending clustering instant + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005) + ) + } + + // Test partition pruning with {@code And}-{@code Or} predicates + { + spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)") + spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)") + spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008)") + spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009)") + + checkException( + s"call run_clustering(table => '$tableName', predicate => 'ts < 1007L or ts >= 1008L or id = 10', order => 'ts')" + )("Only partition predicates are allowed") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')") + + // There are 3 completed clustering instants + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(3)(clusteringInstants.size) + + // The latest clustering should contain 3 file groups(1006,1007,1009) + val clusteringInstant = clusteringInstants.last + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + assertResult(true)(clusteringPlan.isPresent) + assertResult(3)(clusteringPlan.get().getInputGroups.size()) + + // No pending clustering instant + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005), + Seq(7, "a7", 10.0, 1006), + Seq(8, "a8", 10.0, 1007), + Seq(9, "a9", 10.0, 1008), + Seq(10, "a10", 10.0, 1009) + ) + } + } + } + } +}