From 62f534d00228653059c4fed944d444632bc07091 Mon Sep 17 00:00:00 2001 From: shibei Date: Fri, 4 Mar 2022 09:33:16 +0800 Subject: [PATCH] [HUDI-3445] Support Clustering Command Based on Call Procedure Command for Spark SQL (#4901) * [HUDI-3445] Clustering Command Based on Call Procedure Command for Spark SQL * [HUDI-3445] Clustering Command Based on Call Procedure Command for Spark SQL * [HUDI-3445] Clustering Command Based on Call Procedure Command for Spark SQL Co-authored-by: shibei --- .../hudi/config/HoodieClusteringConfig.java | 12 + .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../ClusteringPlanPartitionFilter.java | 16 +- .../PartitionAwareClusteringPlanStrategy.java | 14 +- ...tPartitionAwareClusteringPlanStrategy.java | 2 +- .../apache/hudi/BaseHoodieTableFileIndex.java | 11 +- .../org/apache/hudi/HoodieCommonUtils.scala | 286 ++++++++++++++++++ .../org/apache/hudi/HoodieFileIndex.scala | 13 +- .../hudi/SparkHoodieTableFileIndex.scala | 182 +---------- .../apache/hudi/HoodieDataSourceHelpers.java | 17 ++ .../command/CompactionHoodiePathCommand.scala | 20 +- .../command/procedures/BaseProcedure.scala | 56 +++- .../command/procedures/HoodieProcedures.scala | 2 + .../RollbackToInstantTimeProcedure.scala | 4 +- .../procedures/RunClusteringProcedure.scala | 176 +++++++++++ .../procedures/ShowClusteringProcedure.scala | 69 +++++ .../procedures/ShowCommitsProcedure.scala | 4 +- .../parser/HoodieSqlCommonAstBuilder.scala | 14 +- .../apache/hudi/TestDataSkippingUtils.scala | 29 +- .../spark/sql/hudi/TestCallProcedure.scala | 225 ++++++++++++++ 20 files changed, 909 insertions(+), 247 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala create mode 100644 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 41b1812c0..36f9d169f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import javax.annotation.Nonnull; + import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -94,6 +95,12 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.11.0") .withDocumentation("Filter clustering partitions that matched regex pattern"); + public static final ConfigProperty PARTITION_SELECTED = ConfigProperty + .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "partition.selected") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("Partitions to run clustering"); + public static final ConfigProperty PLAN_STRATEGY_CLASS_NAME = ConfigProperty .key("hoodie.clustering.plan.strategy.class") .defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY) @@ -473,6 +480,11 @@ public class HoodieClusteringConfig extends HoodieConfig { return this; } + public Builder withClusteringPartitionSelected(String partitionSelected) { + clusteringConfig.setValue(PARTITION_SELECTED, partitionSelected); + return this; + } + public Builder withClusteringSkipPartitionsFromLatest(int clusteringSkipPartitionsFromLatest) { clusteringConfig.setValue(PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, String.valueOf(clusteringSkipPartitionsFromLatest)); return this; diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b7b410817..2d71c4d73 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1301,6 +1301,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getLong(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT); } + public String getClusteringPartitionSelected() { + return getString(HoodieClusteringConfig.PARTITION_SELECTED); + } + public String getClusteringPartitionFilterRegexPattern() { return getString(HoodieClusteringConfig.PARTITION_REGEX_PATTERN); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java index a63eb3bad..3a889de75 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java @@ -24,6 +24,7 @@ import org.apache.hudi.exception.HoodieClusteringException; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Partition filter utilities. 
Currently, we support three mode: @@ -58,11 +59,18 @@ public class ClusteringPlanPartitionFilter { } private static List selectedPartitionsFilter(List partitions, HoodieWriteConfig config) { + Stream filteredPartitions = partitions.stream(); + String beginPartition = config.getBeginPartitionForClustering(); + if (beginPartition != null) { + filteredPartitions = filteredPartitions.filter(path -> path.compareTo(beginPartition) >= 0); + } + String endPartition = config.getEndPartitionForClustering(); - List filteredPartitions = partitions.stream() - .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0) - .collect(Collectors.toList()); - return filteredPartitions; + if (endPartition != null) { + filteredPartitions = filteredPartitions.filter(path -> path.compareTo(endPartition) <= 0); + } + + return filteredPartitions.collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java index f4aaeee0c..5d62ef390 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -35,6 +35,7 @@ import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -72,8 +73,8 @@ public abstract class PartitionAwareClusteringPlanStrategy partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()); - // get regex matched partitions 
if set - partitionPaths = getRegexPatternMatchedPartitions(config, partitionPaths); + // get matched partitions if set + partitionPaths = getMatchedPartitions(config, partitionPaths); // filter the partition paths if needed to reduce list status partitionPaths = filterPartitionPaths(partitionPaths); @@ -113,6 +114,15 @@ public abstract class PartitionAwareClusteringPlanStrategy getMatchedPartitions(HoodieWriteConfig config, List partitionPaths) { + String partitionSelected = config.getClusteringPartitionSelected(); + if (!StringUtils.isNullOrEmpty(partitionSelected)) { + return Arrays.asList(partitionSelected.split(",")); + } else { + return getRegexPatternMatchedPartitions(config, partitionPaths); + } + } + public List getRegexPatternMatchedPartitions(HoodieWriteConfig config, List partitionPaths) { String pattern = config.getClusteringPartitionFilterRegexPattern(); if (!StringUtils.isNullOrEmpty(pattern)) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java index a053a9611..440bc9561 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java @@ -71,7 +71,7 @@ public class TestPartitionAwareClusteringPlanStrategy { fakeTimeBasedPartitionsPath.add("20210719"); fakeTimeBasedPartitionsPath.add("20210721"); - List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath); + List list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath); assertEquals(2, list.size()); assertTrue(list.contains("20210721")); 
assertTrue(list.contains("20210723")); diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 428da925c..9eae74e92 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -18,8 +18,6 @@ package org.apache.hudi; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -37,6 +35,9 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -333,7 +334,7 @@ public abstract class BaseHoodieTableFileIndex { return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize; } - protected static final class PartitionPath { + public static final class PartitionPath { final String path; final Object[] values; @@ -342,6 +343,10 @@ public abstract class BaseHoodieTableFileIndex { this.values = values; } + public String getPath() { + return path; + } + Path fullPartitionPath(String basePath) { if (!path.isEmpty()) { return new Path(basePath, path); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala new file mode 100644 index 000000000..d8189b1ad --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCommonUtils.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hadoop.fs.Path +import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StringType, StructField, 
StructType} +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.immutable.Map + +object HoodieCommonUtils extends Logging { + + def createHoodieClientFromPath(sparkSession: SparkSession, basePath: String, + conf: Map[String, String]): SparkRDDWriteClient[_] = { + val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) + .setConf(sparkSession.sessionState.newHadoopConf()).build() + val schemaUtil = new TableSchemaResolver(metaClient) + val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString + val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults( + withSparkConf(sparkSession, Map.empty)( + conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name())) + ) + + val jsc = new JavaSparkContext(sparkSession.sparkContext) + DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, + metaClient.getTableConfig.getTableName, finalParameters.asJava) + } + + def getPartitionSchemaFromProperty(metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType]): StructType = { + val schema = schemaSpec.getOrElse({ + val schemaUtil = new TableSchemaResolver(metaClient) + AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) + }) + + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionFields + val nameFieldMap = generateFieldMap(schema) + + if (partitionColumns.isPresent) { + // Note that key generator class name could be null + val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName + if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) + || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { + val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) + StructType(partitionFields) + } else { + val partitionFields = 
partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + StructType(partitionFields) + } + } else { + // If the partition columns have not been stored in hoodie.properties (the table that was + // created earlier), we treat it as a non-partitioned table. + logWarning("No partition columns available from hoodie.properties." + + " Partition pruning will not work") + new StructType() + } + } + + /** + * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding + * [[StructField]] object for every field of the provided [[StructType]], recursively. + * + * For example, the following struct + * <pre>
+   *   StructType(
+   *     StructField("a",
+   *       StructType(
+   *         StructField("b", StringType),
+   *         StructField("c", IntegerType)
+   *       )
+   *     )
+   *   )
+   * </pre>
+ * + * will be converted into the following mapping: + * + * <pre>
+   *   "a.b" -> StructField("b", StringType),
+   *   "a.c" -> StructField("c", IntegerType),
+   * </pre>
+ */ + def generateFieldMap(structType: StructType): Map[String, StructField] = { + def traverse(structField: Either[StructField, StructType]): Map[String, StructField] = { + structField match { + case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap + case Left(field) => field.dataType match { + case struct: StructType => traverse(Right(struct)).map { + case (key, structField) => (s"${field.name}.$key", structField) + } + case _ => Map(field.name -> field) + } + } + } + + traverse(Right(structType)) + } + + /** + * Prune the partition by the filter.This implementation is fork from + * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. + * + * @param partitionPaths All the partition paths. + * @param predicates The filter condition. + * @return The Pruned partition paths. + */ + def prunePartition(partitionSchema: StructType, + partitionPaths: Seq[PartitionPath], + predicates: Seq[Expression]): Seq[PartitionPath] = { + val partitionColumnNames = partitionSchema.fields.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + prunePartition(partitionSchema, partitionPaths, predicate) + } else { + partitionPaths + } + } + + def prunePartition(partitionSchema: StructType, + partitionPaths: Seq[PartitionPath], + predicate: Expression): Seq[PartitionPath] = { + val boundPredicate = InterpretedPredicate(predicate.transform { + case a: AttributeReference => + val index = partitionSchema.indexWhere(a.name == _.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + + val prunedPartitionPaths = partitionPaths.filter { + partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values)) + } + + logInfo(s"Total partition size is: ${partitionPaths.size}," + + s" after partition prune 
size is: ${prunedPartitionPaths.size}") + prunedPartitionPaths + } + + def parsePartitionColumnValues(sparkParsePartitionUtil: SparkParsePartitionUtil, + configProperties: TypedProperties, + basePath: String, + partitionSchema: StructType, + partitionColumns: Array[String], + partitionPath: String): Array[Object] = { + if (partitionColumns.length == 0) { + // This is a non-partitioned table + Array.empty + } else { + val partitionFragments = partitionPath.split("/") + + if (partitionFragments.length != partitionColumns.length && + partitionColumns.length == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. + val prefix = s"${partitionColumns.head}=" + val partitionValue = if (partitionPath.startsWith(prefix)) { + // support hive style partition path + partitionPath.substring(prefix.length) + } else { + partitionPath + } + Array(UTF8String.fromString(partitionValue)) + } else if (partitionFragments.length != partitionColumns.length && + partitionColumns.length > 1) { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns. So we trait it as a Non-Partitioned Table + // for the query which do not benefit from the partition prune. + logWarning(s"Cannot do the partition prune for table $basePath." + + s"The partitionFragments size (${partitionFragments.mkString(",")})" + + s" is not equal to the partition columns size(${partitionColumns.mkString(",")})") + Array.empty + } else { + // If partitionSeqs.length == partitionSchema.fields.length + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING is disable. + // e.g. 
convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + val partitionWithName = + partitionFragments.zip(partitionColumns).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") + + val pathWithPartitionName = new Path(basePath, partitionWithName) + val partitionValues = parsePartitionPath(sparkParsePartitionUtil, configProperties, basePath, + pathWithPartitionName, partitionSchema) + + partitionValues.map(_.asInstanceOf[Object]).toArray + } + } + } + + private def parsePartitionPath(sparkParsePartitionUtil: SparkParsePartitionUtil, + configProperties: TypedProperties, + basePath: String, + partitionPath: Path, + partitionSchema: StructType): Seq[Any] = { + val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone) + val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap + + sparkParsePartitionUtil.parsePartition( + partitionPath, + typeInference = false, + Set(new Path(basePath)), + partitionDataTypes, + DateTimeUtils.getTimeZone(timeZoneId) + ) + .toSeq(partitionSchema) + } + + def getConfigProperties(spark: SparkSession, options: Map[String, String]): TypedProperties = { + val sqlConf: SQLConf = spark.sessionState.conf + val properties = new TypedProperties() + + // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users + // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. 
+ properties.setProperty(HoodieMetadataConfig.ENABLE.key(), + sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(), + HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) + properties.putAll(options.asJava) + properties + } + + def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = { + val expr = spark.sessionState.sqlParser.parseExpression(exprString) + resolveFilterExpr(spark, expr, tableSchema) + } + + def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = { + val schemaFields = tableSchema.fields + val resolvedExpr = spark.sessionState.analyzer.ResolveReferences( + Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*)) + ) + .asInstanceOf[Filter].condition + + checkForUnresolvedRefs(resolvedExpr) + } + + private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression = + resolvedExpr match { + case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute") + case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e)) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 9cdf5cc63..fc965aa8c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -19,7 +19,6 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.HoodieFileIndex.getConfigProperties import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.StringUtils @@ -37,11 +36,10 @@ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{AnalysisException, 
Column, SparkSession} import org.apache.spark.unsafe.types.UTF8String -import scala.collection.JavaConverters._ -import scala.util.{Failure, Success, Try} -import scala.util.control.NonFatal - import java.text.SimpleDateFormat +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} /** * A file index which support partition prune for hoodie snapshot and read-optimized query. @@ -75,7 +73,7 @@ case class HoodieFileIndex(spark: SparkSession, spark = spark, metaClient = metaClient, schemaSpec = schemaSpec, - configProperties = getConfigProperties(spark, options), + configProperties = HoodieCommonUtils.getConfigProperties(spark, options), queryPaths = Seq(HoodieFileIndex.getQueryPath(options)), specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), fileStatusCache = fileStatusCache @@ -149,7 +147,8 @@ case class HoodieFileIndex(spark: SparkSession, Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) } else { // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters) + val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema, + cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters) var totalFileSize = 0 var candidateFileSize = 0 diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 46201c413..c13adf3ab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -18,24 +18,18 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} 
-import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} -import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption} +import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, toJavaOption} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.sql.types.StructType import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -84,32 +78,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession, /** * Get the partition schema from the hoodie.properties. 
*/ - private lazy val _partitionSchemaFromProperties: StructType = { - val tableConfig = metaClient.getTableConfig - val partitionColumns = tableConfig.getPartitionFields - val nameFieldMap = generateFieldMap(schema) - - if (partitionColumns.isPresent) { - // Note that key generator class name could be null - val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName - if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) - || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { - val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) - StructType(partitionFields) - } else { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + - s"$column' in the schema[${schema.fields.mkString(",")}]"))) - StructType(partitionFields) - } - } else { - // If the partition columns have not stored in hoodie.properties(the table that was - // created earlier), we trait it as a non-partitioned table. - logWarning("No partition columns available from hoodie.properties." + - " Partition pruning will not work") - new StructType() - } - } + private lazy val _partitionSchemaFromProperties: StructType = + HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, Some(schema)) /** * Get the data schema of the table. 
@@ -140,111 +110,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession, */ def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters) + val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema, + cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters) prunedPartitions.map(partition => { (partition.path, cachedAllInputFileSlices.get(partition).asScala) }).toMap } - /** - * Prune the partition by the filter.This implementation is fork from - * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. - * - * @param partitionPaths All the partition paths. - * @param predicates The filter condition. - * @return The Pruned partition paths. - */ - def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = { - val partitionColumnNames = partitionSchema.fields.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate(predicate.transform { - case a: AttributeReference => - val index = partitionSchema.indexWhere(a.name == _.name) - BoundReference(index, partitionSchema(index).dataType, nullable = true) - }) - - val prunedPartitionPaths = partitionPaths.filter { - partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values)) - } - - logInfo(s"Total partition size is: ${partitionPaths.size}," + - s" after partition prune size is: ${prunedPartitionPaths.size}") - prunedPartitionPaths - } else { - partitionPaths - } - } - protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = { - if 
(partitionColumns.length == 0) { - // This is a non-partitioned table - Array.empty - } else { - val partitionFragments = partitionPath.split("/") - - if (partitionFragments.length != partitionColumns.length && - partitionColumns.length == 1) { - // If the partition column size is not equal to the partition fragment size - // and the partition column size is 1, we map the whole partition path - // to the partition column which can benefit from the partition prune. - val prefix = s"${partitionColumns.head}=" - val partitionValue = if (partitionPath.startsWith(prefix)) { - // support hive style partition path - partitionPath.substring(prefix.length) - } else { - partitionPath - } - Array(UTF8String.fromString(partitionValue)) - } else if (partitionFragments.length != partitionColumns.length && - partitionColumns.length > 1) { - // If the partition column size is not equal to the partition fragments size - // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns. So we trait it as a Non-Partitioned Table - // for the query which do not benefit from the partition prune. - logWarning(s"Cannot do the partition prune for table $basePath." + - s"The partitionFragments size (${partitionFragments.mkString(",")})" + - s" is not equal to the partition columns size(${partitionColumns.mkString(",")})") - Array.empty - } else { - // If partitionSeqs.length == partitionSchema.fields.length - // Append partition name to the partition value if the - // HIVE_STYLE_PARTITIONING is disable. - // e.g. 
convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" - val partitionWithName = - partitionFragments.zip(partitionColumns).map { - case (partition, columnName) => - if (partition.indexOf("=") == -1) { - s"${columnName}=$partition" - } else { - partition - } - }.mkString("/") - - val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema) - - partitionValues.map(_.asInstanceOf[Object]).toArray - } - } - } - - private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { - val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone) - val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap - - sparkParsePartitionUtil.parsePartition( - partitionPath, - typeInference = false, - Set(new Path(basePath)), - partitionDataTypes, - DateTimeUtils.getTimeZone(timeZoneId) - ) - .toSeq(partitionSchema) + HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, configProperties, + basePath, partitionSchema, partitionColumns, partitionPath) } } @@ -257,45 +132,6 @@ object SparkHoodieTableFileIndex { org.apache.hudi.common.util.Option.empty() } - /** - * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding - * [[StructField]] object for every field of the provided [[StructType]], recursively. - * - * For example, following struct - *
-   *   StructType(
-   *     StructField("a",
-   *       StructType(
-   *          StructField("b", StringType),
-   *          StructField("c", IntType)
-   *       )
-   *     )
-   *   )
-   * 
- * - * will be converted into following mapping: - * - *
-   *   "a.b" -> StructField("b", StringType),
-   *   "a.c" -> StructField("c", IntType),
-   * 
- */ - private def generateFieldMap(structType: StructType) : Map[String, StructField] = { - def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = { - structField match { - case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap - case Left(field) => field.dataType match { - case struct: StructType => traverse(Right(struct)).map { - case (key, structField) => (s"${field.name}.$key", structField) - } - case _ => Map(field.name -> field) - } - } - } - - traverse(Right(structType)) - } - private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = { configProperties.asScala(QUERY_TYPE.key()) match { case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java index ce80b5232..9491e43e2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java @@ -18,12 +18,16 @@ package org.apache.hudi; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hadoop.fs.FileSystem; @@ -80,4 +84,17 @@ public class HoodieDataSourceHelpers { return metaClient.getCommitTimeline().filterCompletedInstants(); } } + + @PublicAPIMethod(maturity = 
ApiMaturityLevel.STABLE) + public static Option getClusteringPlan(FileSystem fs, String basePath, String instantTime) { + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()) + .setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieInstant hoodieInstant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime); + Option> clusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, hoodieInstant); + if (clusteringPlan.isPresent()) { + return Option.of(clusteringPlan.get().getValue()); + } else { + return Option.empty(); + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 2f5c4d004..59d38923b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -17,17 +17,15 @@ package org.apache.spark.sql.hudi.command +import org.apache.hudi.HoodieCommonUtils import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType} +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} import org.apache.hudi.exception.HoodieException -import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils} -import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation import 
org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{Row, SparkSession} @@ -44,19 +42,7 @@ case class CompactionHoodiePathCommand(path: String, assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Must compaction on a Merge On Read table.") - val schemaUtil = new TableSchemaResolver(metaClient) - val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString - - val parameters = HoodieWriterUtils.parametersWithWriteDefaults( - HoodieSqlCommonUtils.withSparkConf(sparkSession, Map.empty)( - Map( - DataSourceWriteOptions.TABLE_TYPE.key() -> HoodieTableType.MERGE_ON_READ.name() - ) - ) - ) - val jsc = new JavaSparkContext(sparkSession.sparkContext) - val client = DataSourceUtils.createHoodieClient(jsc, schemaStr, path, - metaClient.getTableConfig.getTableName, parameters) + val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, path, Map.empty) operation match { case SCHEDULE => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala index e64df997d..b8777eddb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala @@ -21,12 +21,18 @@ import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.model.HoodieRecordPayload import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.exception.HoodieClusteringException import org.apache.hudi.index.HoodieIndex.IndexType import 
org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.types._ +import java.nio.charset.Charset +import java.sql.{Date, Timestamp} + abstract class BaseProcedure extends Procedure { val INVALID_ARG_INDEX: Int = -1 @@ -68,14 +74,22 @@ abstract class BaseProcedure extends Procedure { args.map.getOrDefault(key, INVALID_ARG_INDEX) } - protected def getArgValueOrDefault(args: ProcedureArgs, parameter: ProcedureParameter): Any = { + protected def getArgValueOrDefault(args: ProcedureArgs, parameter: ProcedureParameter): Option[Any] = { var argsIndex: Int = INVALID_ARG_INDEX if (args.isNamedArgs) { argsIndex = getArgsIndex(parameter.name, args) } else { argsIndex = getArgsIndex(parameter.index.toString, args) } - if (argsIndex.equals(INVALID_ARG_INDEX)) parameter.default else getInternalRowValue(args.internalRow, argsIndex, parameter.dataType) + + if (argsIndex.equals(INVALID_ARG_INDEX)) { + parameter.default match { + case option: Option[Any] => option + case _ => Option.apply(parameter.default) + } + } else { + Option.apply(getInternalRowValue(args.internalRow, argsIndex, parameter.dataType)) + } } protected def getInternalRowValue(row: InternalRow, index: Int, dataType: DataType): Any = { @@ -96,4 +110,40 @@ abstract class BaseProcedure extends Procedure { throw new UnsupportedOperationException(s"type: ${dataType.typeName} not supported") } } + + protected def getBasePath(tableName: Option[Any], tablePath: Option[Any]): String = { + tableName.map( + t => HoodieCatalogTable(sparkSession, new TableIdentifier(t.asInstanceOf[String])).tableLocation) + .getOrElse( + tablePath.map(p => p.asInstanceOf[String]).getOrElse( + throw new HoodieClusteringException("Table name or table path 
must be given one")) + ) + } + + protected def convertCatalystType(value: String, dataType: DataType): Any = { + try { + val valueWithType = dataType match { + case StringType => value + case BinaryType => value.getBytes(Charset.forName("utf-8")) + case BooleanType => value.toBoolean + case DoubleType => value.toDouble + case d: DecimalType => Decimal.apply(BigDecimal(value), d.precision, d.scale) + case FloatType => value.toFloat + case ByteType => value.toByte + case IntegerType => value.toInt + case LongType => value.toLong + case ShortType => value.toShort + case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)) + case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value)) + case _ => throw new HoodieClusteringException("Data type not support:" + dataType) + } + + valueWithType + } catch { + case e: HoodieClusteringException => + throw e + case _ => + throw new HoodieClusteringException("Data type not match, value:" + value + ", dataType:" + dataType) + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index 7b919fcef..2993bcae7 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -36,6 +36,8 @@ object HoodieProcedures { mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder) mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder) mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder) + mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder) + mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder) 
mapBuilder.build } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala index 5414e8db6..f17efe244 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala @@ -45,8 +45,8 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder override def call(args: ProcedureArgs): Seq[Row] = { super.checkArgs(PARAMETERS, args) - val table = getArgValueOrDefault(args, PARAMETERS(0)).asInstanceOf[String] - val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).asInstanceOf[String] + val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String] + val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table)) val basePath = hoodieCatalogTable.tableLocation diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala new file mode 100644 index 000000000..a98cdce54 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} +import org.apache.hudi.config.HoodieClusteringConfig +import org.apache.hudi.exception.HoodieClusteringException +import org.apache.hudi.metadata.HoodieTableMetadata +import org.apache.hudi.{HoodieCommonUtils, SparkAdapterSupport} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.types._ + +import java.util.Properties +import java.util.function.Supplier +import scala.collection.JavaConverters +import scala.collection.JavaConverters._ + +class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { + /** + * OPTIMIZE table_name|table_path [WHERE predicate] + * [ORDER BY (col_name1 [, ...] 
) ] + */ + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType, None), + ProcedureParameter.optional(1, "path", DataTypes.StringType, None), + ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None), + ProcedureParameter.optional(3, "order", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val predicate = getArgValueOrDefault(args, PARAMETERS(2)) + val orderColumns = getArgValueOrDefault(args, PARAMETERS(3)) + + val basePath: String = getBasePath(tableName, tablePath) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + var conf: Map[String, String] = Map.empty + predicate match { + case Some(p) => + val partitionColumnsSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None) + val partitionPredicate = HoodieCommonUtils.resolveFilterExpr( + spark, p.asInstanceOf[String], partitionColumnsSchema) + val partitionSelected = prunePartition(metaClient, partitionPredicate) + conf = conf ++ Map( + HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS", + HoodieClusteringConfig.PARTITION_SELECTED.key() -> partitionSelected + ) + logInfo(s"Partition predicates: ${p}, partition selected: ${partitionSelected}") + case _ => + logInfo("No partition predicates") + } + + // Construct 
sort column info + orderColumns match { + case Some(o) => + validateOrderColumns(o.asInstanceOf[String], metaClient) + conf = conf ++ Map( + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> o.asInstanceOf[String] + ) + logInfo(s"Order columns: ${o}") + case _ => + logInfo("No order columns") + } + + // Get all pending clustering instants + var pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient) + .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f) + logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}") + + val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, basePath, conf) + val instantTime = HoodieActiveTimeline.createNewInstantTime + if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) { + pendingClustering ++= Seq(instantTime) + } + logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.") + + val startTs = System.currentTimeMillis() + pendingClustering.foreach(client.cluster(_, true)) + logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," + + s" time cost: ${System.currentTimeMillis() - startTs}ms.") + Seq.empty[Row] + } + + override def build: Procedure = new RunClusteringProcedure() + + def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: Expression): String = { + val partitionSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None) + + // Get tableName meta data + val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext)) + val properties = new Properties() + properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava) + val metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(properties).build() + val tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath, + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue) + + val 
sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf) + val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, Map.empty) + + val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]()) + + // Translate all partition path to {@code org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath} + val partitionPaths = tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => { + val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues( + sparkParsePartitionUtil, typedProperties, metaClient.getBasePath, + partitionSchema, partitionColumns, partitionPath) + new PartitionPath(partitionPath, partitionColumnValues) + }) + + // Filter partition by predicates + val selectedPartitions = HoodieCommonUtils.prunePartition( + partitionSchema, partitionPaths, partitionPredicate) + selectedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",") + } + + def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = { + if (orderColumns == null) { + throw new HoodieClusteringException("Order columns is null") + } + + val tableSchemaResolver = new TableSchemaResolver(metaClient) + val fields = tableSchemaResolver.getTableAvroSchema(false) + .getFields.asScala.map(_.name().toLowerCase) + orderColumns.split(",").foreach(col => { + if (!fields.contains(col.toLowerCase)) { + throw new HoodieClusteringException("Order column not exist:" + col) + } + }) + } + +} + +object RunClusteringProcedure { + val NAME = "run_clustering" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new RunClusteringProcedure + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala new file mode 
100644 index 000000000..a9d808217 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.ClusteringUtils +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ + +import java.util.function.Supplier +import scala.collection.JavaConverters._ + +class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType, None), + ProcedureParameter.optional(1, "path", DataTypes.StringType, None), + ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("groups", DataTypes.IntegerType, nullable 
= true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int] + + val basePath: String = getBasePath(tableName, tablePath) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p => + Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size()) + }.toSeq.take(limit) + } + + override def build: Procedure = new ShowClusteringProcedure() +} + +object ShowClusteringProcedure { + val NAME = "show_clustering" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ShowClusteringProcedure + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala index da089baba..920e3a2c1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala @@ -75,8 +75,8 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure override def call(args: ProcedureArgs): Seq[Row] = { super.checkArgs(PARAMETERS, args) - val table = getArgValueOrDefault(args, PARAMETERS(0)).asInstanceOf[String] - val limit = getArgValueOrDefault(args, PARAMETERS(1)).asInstanceOf[Int] + val table = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String] + val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int] val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table)) val basePath = hoodieCatalogTable.tableLocation diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala index 3146740b1..771798dd2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala @@ -61,20 +61,20 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface CompactionTable(table, operation, timestamp) } - override def visitCompactionOnPath (ctx: CompactionOnPathContext): LogicalPlan = withOrigin(ctx) { + override def visitCompactionOnPath(ctx: CompactionOnPathContext): LogicalPlan = withOrigin(ctx) { val path = string(ctx.path) val operation = CompactionOperation.withName(ctx.operation.getText.toUpperCase) val timestamp = if (ctx.instantTimestamp != null) Some(ctx.instantTimestamp.getText.toLong) else None CompactionPath(path, operation, timestamp) } - override def visitShowCompactionOnTable (ctx: ShowCompactionOnTableContext): LogicalPlan = withOrigin(ctx) { + override def visitShowCompactionOnTable(ctx: ShowCompactionOnTableContext): LogicalPlan = withOrigin(ctx) { val table = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan] - if (ctx.limit != null) { - CompactionShowOnTable(table, ctx.limit.getText.toInt) - } else { - CompactionShowOnTable(table) - } + if (ctx.limit != null) { + CompactionShowOnTable(table, ctx.limit.getText.toInt) + } else { + CompactionShowOnTable(table) + } } override def visitShowCompactionOnPath(ctx: ShowCompactionOnPathContext): LogicalPlan = 
withOrigin(ctx) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index 9e3572b56..d8179d30f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -19,12 +19,10 @@ package org.apache.hudi import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Expression, Not} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation} import org.apache.spark.sql.functions.col import org.apache.spark.sql.hudi.DataSkippingUtils -import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.{Column, SparkSession} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -75,7 +73,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource")) def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = { - val resolvedExpr: Expression = resolveFilterExpr(sourceExpr, sourceTableSchema) + val resolvedExpr: Expression = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) @@ -96,7 +94,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testStringsLookupFilterExpressionsSource")) def testStringsLookupFilterExpressions(sourceExpr: 
Expression, input: Seq[IndexRow], output: Seq[String]): Unit = { - val resolvedExpr = resolveFilterExpr(sourceExpr, sourceTableSchema) + val resolvedExpr = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark @@ -112,27 +110,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase { assertEquals(output, rows) } - - private def resolveFilterExpr(exprString: String, tableSchema: StructType): Expression = { - val expr = spark.sessionState.sqlParser.parseExpression(exprString) - resolveFilterExpr(expr, tableSchema) - } - - private def resolveFilterExpr(expr: Expression, tableSchema: StructType): Expression = { - val schemaFields = tableSchema.fields - val resolvedExpr = spark.sessionState.analyzer.ResolveReferences( - Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*)) - ) - .asInstanceOf[Filter].condition - - checkForUnresolvedRefs(resolvedExpr) - } - - def checkForUnresolvedRefs(resolvedExpr: Expression): Expression = - resolvedExpr match { - case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute") - case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e)) - } } object TestDataSkippingUtils { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala index eb2c614df..52fe23711 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala @@ -17,6 +17,13 @@ package org.apache.spark.sql.hudi +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} +import org.apache.hudi.common.util.{Option => HOption} +import 
org.apache.hudi.{HoodieCommonUtils, HoodieDataSourceHelpers} + +import scala.collection.JavaConverters.asScalaIteratorConverter + class TestCallProcedure extends TestHoodieSqlBase { test("Test Call show_commits Procedure") { @@ -129,4 +136,222 @@ class TestCallProcedure extends TestHoodieSqlBase { assertResult(1){commits.length} } } + + test("Test Call run_clustering Procedure By Table") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty) + // Generate the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + + // Generate the second clustering plan + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) + checkAnswer(s"call show_clustering('$tableName')")( + Seq(firstScheduleInstant, 3), + Seq(secondScheduleInstant, 1) + ) + + // Do clustering for all clustering plan generated above, and no new clustering + // instant will be generated because of there is no commit after the second + // clustering plan generated + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + + // No new commits + val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003) + ) + // After clustering there should be no pending clustering. + checkAnswer(s"call show_clustering(table => '$tableName')")() + + // Check the number of finished clustering instants + val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(finishedClustering.size) + + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + + val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .findInstantsAfter(secondScheduleInstant) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + // Should have a new replace commit after the second clustering command. 
+ assertResult(1)(thirdClusteringInstant.size) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004), + Seq(6, "a6", 10.0, 1005) + ) + } + } + } + + test("Test Call run_clustering Procedure By Path") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + + spark.sql(s"call run_clustering(path => '$basePath')") + checkAnswer(s"call show_clustering(path => '$basePath')")() + + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty) + // Generate the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + checkAnswer(s"call show_clustering(path => '$basePath')")( + Seq(firstScheduleInstant, 3) + ) + // Do clustering for all the clustering plan + spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) + + // Check the number of finished clustering instants + var 
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(1)(finishedClustering.size) + + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004) + ) + + finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(2)(finishedClustering.size) + } + } + } + + test("Test Call run_clustering Procedure With Partition Pruning") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + + // Do clustering table with partition predicate + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") + + // Check the num of completed clustering instant + val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + assertResult(1)(clusteringInstants.size) + + val clusteringInstant = clusteringInstants.head + val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) + assertResult(true)(clusteringPlan.isPresent) + assertResult(2)(clusteringPlan.get().getInputGroups.size()) + + checkAnswer(s"call show_clustering(table => '$tableName')")() + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + } + } + } }