1
0

[HUDI-3445] Support Clustering Command Based on Call Procedure Command for Spark SQL (#4901)

* [HUDI-3445] Clustering Command Based on Call Procedure Command for Spark SQL

* [HUDI-3445] Clustering Command Based on Call Procedure Command for Spark SQL

* [HUDI-3445] Clustering Command Based on Call Procedure Command for Spark SQL

Co-authored-by: shibei <huberylee.li@alibaba-inc.com>
This commit is contained in:
shibei
2022-03-04 09:33:16 +08:00
committed by GitHub
parent be9a264885
commit 62f534d002
20 changed files with 909 additions and 247 deletions

View File

@@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.immutable.Map
object HoodieCommonUtils extends Logging {
/**
 * Creates a [[SparkRDDWriteClient]] for the Hudi table stored under `basePath`.
 *
 * The table name and the Avro schema (without metadata fields) are read through a
 * [[HoodieTableMetaClient]] built for `basePath`. The supplied `conf` is extended with the
 * table type recorded for the table, then passed through `withSparkConf` and
 * `HoodieWriterUtils.parametersWithWriteDefaults` before the client is created.
 *
 * @param sparkSession active Spark session; supplies the Hadoop conf and the Spark context
 * @param basePath     base path of the Hudi table
 * @param conf         extra write options supplied by the caller
 * @return the write client created by `DataSourceUtils.createHoodieClient`
 */
def createHoodieClientFromPath(sparkSession: SparkSession, basePath: String,
                               conf: Map[String, String]): SparkRDDWriteClient[_] = {
  val metaClient = HoodieTableMetaClient.builder()
    .setBasePath(basePath)
    .setConf(sparkSession.sessionState.newHadoopConf())
    .build()

  val tableSchemaStr =
    new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields.toString

  // The table type always comes from the table itself, overriding any value in `conf`.
  val tableTypeEntry = DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name()
  val writeParameters = HoodieWriterUtils.parametersWithWriteDefaults(
    withSparkConf(sparkSession, Map.empty)(conf + tableTypeEntry))

  val javaSparkContext = new JavaSparkContext(sparkSession.sparkContext)
  DataSourceUtils.createHoodieClient(javaSparkContext, tableSchemaStr, basePath,
    metaClient.getTableConfig.getTableName, writeParameters.asJava)
}
/**
 * Derives the partition schema of a table from the partition fields recorded in its table config.
 *
 * @param metaClient meta client of the table
 * @param schemaSpec table schema to look partition columns up in; when empty, the schema is
 *                   resolved from the table through [[TableSchemaResolver]]
 * @return a [[StructType]] with one field per partition column, or an empty [[StructType]] when
 *         the table config carries no partition fields
 * @throws IllegalArgumentException if a partition column cannot be found in the schema
 */
def getPartitionSchemaFromProperty(metaClient: HoodieTableMetaClient,
                                   schemaSpec: Option[StructType]): StructType = {
  val schema = schemaSpec.getOrElse {
    val resolver = new TableSchemaResolver(metaClient)
    AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema)
  }

  val tableConfig = metaClient.getTableConfig
  val partitionColumns = tableConfig.getPartitionFields
  val fieldsByPath = generateFieldMap(schema)

  if (!partitionColumns.isPresent) {
    // If the partition columns have not been stored in hoodie.properties (a table that was
    // created earlier), we treat it as a non-partitioned table.
    logWarning("No partition columns available from hoodie.properties." +
      " Partition pruning will not work")
    new StructType()
  } else {
    // Note that the key generator class name could be null; equalsIgnoreCase(null) is false.
    val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
    val timestampKeyGenerators =
      Seq(classOf[TimestampBasedKeyGenerator].getName, classOf[TimestampBasedAvroKeyGenerator].getName)
    val usesTimestampKeyGenerator =
      timestampKeyGenerators.exists(_.equalsIgnoreCase(keyGeneratorClassName))

    val partitionFields = partitionColumns.get().map { column =>
      if (usesTimestampKeyGenerator) {
        // Timestamp-based key generators always expose the partition columns as strings.
        StructField(column, StringType)
      } else {
        fieldsByPath.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
          s"$column' in the schema[${schema.fields.mkString(",")}]"))
      }
    }
    StructType(partitionFields)
  }
}
/**
 * Flattens a [[StructType]] into a [[Map]] keyed by the dot-separated path of every leaf
 * (non-struct) field, recursing into nested structs.
 *
 * For example, the struct
 * <pre>
 *   StructType(
 *     StructField("a",
 *       StructType(
 *         StructField("b", StringType),
 *         StructField("c", IntegerType)
 *       )
 *     )
 *   )
 * </pre>
 *
 * is converted into the mapping
 *
 * <pre>
 *   "a.b" -> StructField("b", StringType),
 *   "a.c" -> StructField("c", IntegerType)
 * </pre>
 *
 * @param structType the schema to flatten
 * @return mapping from dot-path to the corresponding leaf [[StructField]]
 */
def generateFieldMap(structType: StructType): Map[String, StructField] = {
  // Flattens a single field: struct-typed fields contribute their leaves, prefixed by the field name.
  def flattenField(field: StructField): Map[String, StructField] = field.dataType match {
    case nested: StructType =>
      flattenStruct(nested).map { case (path, leaf) => (s"${field.name}.$path", leaf) }
    case _ =>
      Map(field.name -> field)
  }

  // Flattens every field of a struct and merges the results.
  def flattenStruct(struct: StructType): Map[String, StructField] =
    struct.fields.flatMap(f => flattenField(f)).toMap

  flattenStruct(structType)
}
/**
 * Prunes partition paths using the given filters. This implementation is forked from
 * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
 *
 * Only predicates whose references are all partition columns are applied; they are combined
 * with AND. When no predicate qualifies, the input paths are returned unchanged.
 *
 * @param partitionSchema Schema of the partition columns.
 * @param partitionPaths  All the partition paths.
 * @param predicates      The filter conditions.
 * @return The pruned partition paths.
 */
def prunePartition(partitionSchema: StructType,
                   partitionPaths: Seq[PartitionPath],
                   predicates: Seq[Expression]): Seq[PartitionPath] = {
  val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
  val applicablePredicates = predicates.filter { candidate =>
    candidate.references.map(_.name).toSet.subsetOf(partitionColumnNames)
  }

  applicablePredicates.reduceOption(expressions.And) match {
    case Some(combined) => prunePartition(partitionSchema, partitionPaths, combined)
    case None => partitionPaths
  }
}
/**
 * Prunes partition paths with a single predicate over the partition columns.
 *
 * Every [[AttributeReference]] in the predicate is rewritten into a [[BoundReference]] that
 * points at the position of the same-named field in `partitionSchema`; the bound predicate is
 * then evaluated against the values of each partition path.
 *
 * @param partitionSchema Schema of the partition columns.
 * @param partitionPaths  All the partition paths.
 * @param predicate       The filter condition; it is expected to reference partition columns only.
 * @return The partition paths for which the predicate evaluates to true.
 */
def prunePartition(partitionSchema: StructType,
                   partitionPaths: Seq[PartitionPath],
                   predicate: Expression): Seq[PartitionPath] = {
  val boundExpression = predicate.transform {
    case attribute: AttributeReference =>
      val ordinal = partitionSchema.indexWhere(field => attribute.name == field.name)
      BoundReference(ordinal, partitionSchema(ordinal).dataType, nullable = true)
  }
  val evaluator = InterpretedPredicate(boundExpression)

  val retainedPaths = partitionPaths.filter { candidate =>
    evaluator.eval(InternalRow.fromSeq(candidate.values))
  }

  logInfo(s"Total partition size is: ${partitionPaths.size}," +
    s" after partition prune size is: ${retainedPaths.size}")
  retainedPaths
}
/**
 * Parses the partition column values out of a relative partition path.
 *
 * @param sparkParsePartitionUtil Spark-version specific partition path parser
 * @param configProperties        properties consulted for the time zone option
 * @param basePath                base path of the table
 * @param partitionSchema         schema of the partition columns
 * @param partitionColumns        names of the partition columns
 * @param partitionPath           partition path relative to `basePath`
 * @return the parsed values, one per partition column; an empty array for a non-partitioned
 *         table or when the path fragments cannot be mapped onto several partition columns
 */
def parsePartitionColumnValues(sparkParsePartitionUtil: SparkParsePartitionUtil,
                               configProperties: TypedProperties,
                               basePath: String,
                               partitionSchema: StructType,
                               partitionColumns: Array[String],
                               partitionPath: String): Array[Object] = {
  if (partitionColumns.isEmpty) {
    // This is a non-partitioned table
    Array.empty
  } else {
    val fragments = partitionPath.split("/")
    if (fragments.length == partitionColumns.length) {
      // One fragment per partition column: prepend the column name to every fragment that is
      // not already in hive style ("name=value"),
      // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
      val hiveStyleFragments = fragments.zip(partitionColumns).map {
        case (fragment, columnName) =>
          if (fragment.contains("=")) fragment else s"${columnName}=$fragment"
      }
      val pathWithPartitionName = new Path(basePath, hiveStyleFragments.mkString("/"))
      val parsedValues = parsePartitionPath(sparkParsePartitionUtil, configProperties, basePath,
        pathWithPartitionName, partitionSchema)
      parsedValues.map(_.asInstanceOf[Object]).toArray
    } else if (partitionColumns.length == 1) {
      // The fragment count differs from the column count and there is a single partition column:
      // map the whole partition path to that column so partition pruning can still be used.
      val hiveStylePrefix = s"${partitionColumns.head}="
      val wholePathValue =
        if (partitionPath.startsWith(hiveStylePrefix)) {
          // support hive style partition path
          partitionPath.substring(hiveStylePrefix.length)
        } else {
          partitionPath
        }
      Array(UTF8String.fromString(wholePathValue))
    } else {
      // The fragment count differs from the column count and there are several partition
      // columns: we do not know how to map the fragments to the columns, so we treat the table
      // as non-partitioned for the query, which then does not benefit from partition pruning.
      logWarning(s"Cannot do the partition prune for table $basePath." +
        s"The partitionFragments size (${fragments.mkString(",")})" +
        s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
      Array.empty
    }
  }
}
/**
 * Parses a hive-style partition path into one value per field of `partitionSchema`.
 *
 * The time zone is read from `configProperties` under `DateTimeUtils.TIMEZONE_OPTION`, falling
 * back to the session-local time zone of the active [[SQLConf]]. Type inference is disabled;
 * the data types of `partitionSchema` are handed to the parser instead.
 *
 * @param sparkParsePartitionUtil Spark-version specific partition path parser
 * @param configProperties        properties consulted for the time zone option
 * @param basePath                base path of the table
 * @param partitionPath           full path of the partition, in "name=value" form
 * @param partitionSchema         schema of the partition columns
 * @return the parsed partition values
 */
private def parsePartitionPath(sparkParsePartitionUtil: SparkParsePartitionUtil,
                               configProperties: TypedProperties,
                               basePath: String,
                               partitionPath: Path,
                               partitionSchema: StructType): Seq[Any] = {
  val timeZoneId =
    configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
  val dataTypeByColumn = partitionSchema.map(field => field.name -> field.dataType).toMap

  val parsedRow = sparkParsePartitionUtil.parsePartition(
    partitionPath,
    typeInference = false,
    Set(new Path(basePath)),
    dataTypeByColumn,
    DateTimeUtils.getTimeZone(timeZoneId))

  parsedRow.toSeq(partitionSchema)
}
/**
 * Builds the [[TypedProperties]] used for reading a table.
 *
 * To support metadata listing via Spark SQL, the metadata-enable flag is first taken from the
 * session's SQL conf (users can run `SET hoodie.metadata.enable=true`), defaulting to
 * `HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS`. All entries of `options` are
 * added afterwards, so an explicit option overrides the session value.
 *
 * @param spark   active Spark session
 * @param options caller supplied options
 * @return the assembled properties
 */
def getConfigProperties(spark: SparkSession, options: Map[String, String]): TypedProperties = {
  val metadataEnableKey = HoodieMetadataConfig.ENABLE.key()
  val metadataEnableValue = spark.sessionState.conf.getConfString(
    metadataEnableKey, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)

  val typedProperties = new TypedProperties()
  typedProperties.setProperty(metadataEnableKey, metadataEnableValue)
  typedProperties.putAll(options.asJava)
  typedProperties
}
/**
 * Parses a SQL expression string with the session's SQL parser and resolves its column
 * references against `tableSchema`.
 *
 * @param spark       active Spark session
 * @param exprString  SQL text of the filter expression
 * @param tableSchema schema the expression's columns are resolved against
 * @return the resolved expression
 */
def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression =
  resolveFilterExpr(spark, spark.sessionState.sqlParser.parseExpression(exprString), tableSchema)
/**
 * Resolves the column references of a filter expression against `tableSchema`.
 *
 * The expression is wrapped in a [[Filter]] over a [[LocalRelation]] whose output has one
 * attribute per schema field, and the analyzer's `ResolveReferences` rule is applied to it.
 *
 * An empty schema is accepted (for instance the partition schema of a non-partitioned table):
 * the relation then has no output, so any column reference stays unresolved and is reported
 * through the regular "unresolved attribute" error instead of failing while the relation is built.
 *
 * @param spark       active Spark session
 * @param expr        the (possibly unresolved) filter expression
 * @param tableSchema schema the expression's columns are resolved against
 * @return the resolved expression
 * @throws IllegalStateException if the expression still contains an unresolved attribute
 */
def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
  // Build the relation output from all fields at once rather than from head/tail, so that a
  // schema without fields does not throw NoSuchElementException.
  val relationOutput = tableSchema.fields.toSeq.map { field =>
    AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
  }
  val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
    Filter(expr, new LocalRelation(relationOutput))
  )
    .asInstanceOf[Filter].condition

  checkForUnresolvedRefs(resolvedExpr)
}
/**
 * Walks the expression tree and fails if any node is still an [[UnresolvedAttribute]].
 *
 * @param resolvedExpr expression to verify
 * @return the expression rebuilt through `mapChildren` once every node has been checked
 * @throws IllegalStateException if an unresolved attribute is found
 */
private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
  resolvedExpr match {
    case _: UnresolvedAttribute =>
      throw new IllegalStateException("unresolved attribute")
    case verified =>
      verified.mapChildren(checkForUnresolvedRefs)
  }
}

View File

@@ -19,7 +19,6 @@ package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.getConfigProperties
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
@@ -37,11 +36,10 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
/**
* A file index which support partition prune for hoodie snapshot and read-optimized query.
@@ -75,7 +73,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
configProperties = getConfigProperties(spark, options),
configProperties = HoodieCommonUtils.getConfigProperties(spark, options),
queryPaths = Seq(HoodieFileIndex.getQueryPath(options)),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -149,7 +147,8 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
var totalFileSize = 0
var candidateFileSize = 0

View File

@@ -18,24 +18,18 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, toJavaOption}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -84,32 +78,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the partition schema from the hoodie.properties.
*/
private lazy val _partitionSchemaFromProperties: StructType = {
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
val nameFieldMap = generateFieldMap(schema)
if (partitionColumns.isPresent) {
// Note that key generator class name could be null
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
|| classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().map(column =>
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
s"$column' in the schema[${schema.fields.mkString(",")}]")))
StructType(partitionFields)
}
} else {
// If the partition columns have not stored in hoodie.properties(the table that was
// created earlier), we trait it as a non-partitioned table.
logWarning("No partition columns available from hoodie.properties." +
" Partition pruning will not work")
new StructType()
}
}
private lazy val _partitionSchemaFromProperties: StructType =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, Some(schema))
/**
* Get the data schema of the table.
@@ -140,111 +110,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
/**
* Prune the partition by the filter.This implementation is fork from
* org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
*
* @param partitionPaths All the partition paths.
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate(predicate.transform {
case a: AttributeReference =>
val index = partitionSchema.indexWhere(a.name == _.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
val prunedPartitionPaths = partitionPaths.filter {
partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
}
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
} else {
partitionPaths
}
}
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
if (partitionColumns.length == 0) {
// This is a non-partitioned table
Array.empty
} else {
val partitionFragments = partitionPath.split("/")
if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length == 1) {
// If the partition column size is not equal to the partition fragment size
// and the partition column size is 1, we map the whole partition path
// to the partition column which can benefit from the partition prune.
val prefix = s"${partitionColumns.head}="
val partitionValue = if (partitionPath.startsWith(prefix)) {
// support hive style partition path
partitionPath.substring(prefix.length)
} else {
partitionPath
}
Array(UTF8String.fromString(partitionValue))
} else if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length > 1) {
// If the partition column size is not equal to the partition fragments size
// and the partition column size > 1, we do not know how to map the partition
// fragments to the partition columns. So we trait it as a Non-Partitioned Table
// for the query which do not benefit from the partition prune.
logWarning(s"Cannot do the partition prune for table $basePath." +
s"The partitionFragments size (${partitionFragments.mkString(",")})" +
s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
Array.empty
} else {
// If partitionSeqs.length == partitionSchema.fields.length
// Append partition name to the partition value if the
// HIVE_STYLE_PARTITIONING is disable.
// e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
val partitionWithName =
partitionFragments.zip(partitionColumns).map {
case (partition, columnName) =>
if (partition.indexOf("=") == -1) {
s"${columnName}=$partition"
} else {
partition
}
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
partitionValues.map(_.asInstanceOf[Object]).toArray
}
}
}
private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = {
val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap
sparkParsePartitionUtil.parsePartition(
partitionPath,
typeInference = false,
Set(new Path(basePath)),
partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId)
)
.toSeq(partitionSchema)
HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, configProperties,
basePath, partitionSchema, partitionColumns, partitionPath)
}
}
@@ -257,45 +132,6 @@ object SparkHoodieTableFileIndex {
org.apache.hudi.common.util.Option.empty()
}
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
*
* For example, following struct
* <pre>
* StructType(
* StructField("a",
* StructType(
* StructField("b", StringType),
* StructField("c", IntType)
* )
* )
* )
* </pre>
*
* will be converted into following mapping:
*
* <pre>
* "a.b" -> StructField("b", StringType),
* "a.c" -> StructField("c", IntType),
* </pre>
*/
private def generateFieldMap(structType: StructType) : Map[String, StructField] = {
def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = {
structField match {
case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
case Left(field) => field.dataType match {
case struct: StructType => traverse(Right(struct)).map {
case (key, structField) => (s"${field.name}.$key", structField)
}
case _ => Map(field.name -> field)
}
}
}
traverse(Right(structType))
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
configProperties.asScala(QUERY_TYPE.key()) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT

View File

@@ -18,12 +18,16 @@
package org.apache.hudi;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.FileSystem;
@@ -80,4 +84,17 @@ public class HoodieDataSourceHelpers {
return metaClient.getCommitTimeline().filterCompletedInstants();
}
}
/**
 * Looks up the clustering plan attached to the requested replace-commit instant with the
 * given instant time.
 *
 * @param fs          file system whose configuration is used to build the meta client
 * @param basePath    base path of the table
 * @param instantTime instant time of the replace commit
 * @return the clustering plan, or an empty option when {@link ClusteringUtils#getClusteringPlan}
 *         finds none for that instant
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public static Option<HoodieClusteringPlan> getClusteringPlan(FileSystem fs, String basePath, String instantTime) {
  HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
      .setConf(fs.getConf())
      .setBasePath(basePath)
      .setLoadActiveTimelineOnLoad(true)
      .build();
  HoodieInstant requestedInstant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantAndPlan =
      ClusteringUtils.getClusteringPlan(metaClient, requestedInstant);
  if (!instantAndPlan.isPresent()) {
    return Option.empty();
  }
  // Only the plan is of interest to callers; the instant half of the pair is dropped.
  return Option.of(instantAndPlan.get().getValue());
}
}

View File

@@ -17,17 +17,15 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.HoodieCommonUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SparkSession}
@@ -44,19 +42,7 @@ case class CompactionHoodiePathCommand(path: String,
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Must compaction on a Merge On Read table.")
val schemaUtil = new TableSchemaResolver(metaClient)
val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(
HoodieSqlCommonUtils.withSparkConf(sparkSession, Map.empty)(
Map(
DataSourceWriteOptions.TABLE_TYPE.key() -> HoodieTableType.MERGE_ON_READ.name()
)
)
)
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schemaStr, path,
metaClient.getTableConfig.getTableName, parameters)
val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, path, Map.empty)
operation match {
case SCHEDULE =>

View File

@@ -21,12 +21,18 @@ import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.types._
import java.nio.charset.Charset
import java.sql.{Date, Timestamp}
abstract class BaseProcedure extends Procedure {
val INVALID_ARG_INDEX: Int = -1
@@ -68,14 +74,22 @@ abstract class BaseProcedure extends Procedure {
args.map.getOrDefault(key, INVALID_ARG_INDEX)
}
protected def getArgValueOrDefault(args: ProcedureArgs, parameter: ProcedureParameter): Any = {
protected def getArgValueOrDefault(args: ProcedureArgs, parameter: ProcedureParameter): Option[Any] = {
var argsIndex: Int = INVALID_ARG_INDEX
if (args.isNamedArgs) {
argsIndex = getArgsIndex(parameter.name, args)
} else {
argsIndex = getArgsIndex(parameter.index.toString, args)
}
if (argsIndex.equals(INVALID_ARG_INDEX)) parameter.default else getInternalRowValue(args.internalRow, argsIndex, parameter.dataType)
if (argsIndex.equals(INVALID_ARG_INDEX)) {
parameter.default match {
case option: Option[Any] => option
case _ => Option.apply(parameter.default)
}
} else {
Option.apply(getInternalRowValue(args.internalRow, argsIndex, parameter.dataType))
}
}
protected def getInternalRowValue(row: InternalRow, index: Int, dataType: DataType): Any = {
@@ -96,4 +110,40 @@ abstract class BaseProcedure extends Procedure {
throw new UnsupportedOperationException(s"type: ${dataType.typeName} not supported")
}
}
/**
 * Determines the base path of the table a procedure operates on.
 *
 * The table name takes precedence: when it is given, the location of the catalog table is
 * returned and `tablePath` is ignored. Otherwise `tablePath` is returned as is.
 *
 * @param tableName optional table name (expected to hold a String)
 * @param tablePath optional table path (expected to hold a String)
 * @return the base path of the table
 * @throws HoodieClusteringException if neither a table name nor a table path is given
 */
protected def getBasePath(tableName: Option[Any], tablePath: Option[Any]): String = {
  (tableName, tablePath) match {
    case (Some(name), _) =>
      HoodieCatalogTable(sparkSession, new TableIdentifier(name.asInstanceOf[String])).tableLocation
    case (None, Some(path)) =>
      path.asInstanceOf[String]
    case (None, None) =>
      throw new HoodieClusteringException("Table name or table path must be given one")
  }
}
/**
 * Converts the string form of a value into the representation matching `dataType`.
 *
 * Dates and timestamps are converted through [[DateTimeUtils]], decimals into [[Decimal]] with
 * the precision and scale of the target type, and binary values into the UTF-8 bytes of the
 * string.
 *
 * @param value    string form of the value
 * @param dataType target data type
 * @return the converted value
 * @throws HoodieClusteringException if the data type is not supported, or if the value cannot
 *                                   be converted to it (the original failure is kept as cause)
 */
protected def convertCatalystType(value: String, dataType: DataType): Any = {
  try {
    dataType match {
      case StringType => value
      case BinaryType => value.getBytes(Charset.forName("utf-8"))
      case BooleanType => value.toBoolean
      case DoubleType => value.toDouble
      case d: DecimalType => Decimal.apply(BigDecimal(value), d.precision, d.scale)
      case FloatType => value.toFloat
      case ByteType => value.toByte
      case IntegerType => value.toInt
      case LongType => value.toLong
      case ShortType => value.toShort
      case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value))
      case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
      case _ => throw new HoodieClusteringException("Data type not support:" + dataType)
    }
  } catch {
    // The "not supported" error raised above is passed through untouched.
    case e: HoodieClusteringException =>
      throw e
    // Only non-fatal failures (bad number/date formats and the like) are translated; fatal
    // errors such as OutOfMemoryError or InterruptedException keep propagating as they are.
    case scala.util.control.NonFatal(e) =>
      throw new HoodieClusteringException(
        "Data type not match, value:" + value + ", dataType:" + dataType, e)
  }
}
}

View File

@@ -36,6 +36,8 @@ object HoodieProcedures {
mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
mapBuilder.build
}
}

View File

@@ -45,8 +45,8 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val table = getArgValueOrDefault(args, PARAMETERS(0)).asInstanceOf[String]
val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).asInstanceOf[String]
val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
val basePath = hoodieCatalogTable.tableLocation

View File

@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.{HoodieCommonUtils, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types._
import java.util.Properties
import java.util.function.Supplier
import scala.collection.JavaConverters
import scala.collection.JavaConverters._
/**
 * Call procedure that executes clustering on a Hudi table.
 *
 * The table is addressed either by name (`table`) or by base path (`path`). An optional
 * `predicate` restricts the partitions considered when a new clustering plan is scheduled,
 * and an optional `order` supplies the comma separated sort columns for that plan.
 *
 * On each call the procedure collects every pending clustering instant, tries to schedule
 * one more plan at a fresh instant time, and then runs all of those instants in order.
 */
class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
  /**
   * OPTIMIZE table_name|table_path [WHERE predicate]
   * [ORDER BY (col_name1 [, ...] ) ]
   */
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
    ProcedureParameter.optional(3, "order", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Schedules (when possible) and runs clustering for the addressed table.
   *
   * @param args procedure arguments; see `PARAMETERS` for the accepted names
   * @return always an empty sequence.
   *         NOTE(review): `OUTPUT_TYPE` declares three columns but no rows are ever produced.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
    val predicate = getArgValueOrDefault(args, PARAMETERS(2))
    val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))

    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    // Translate the optional partition predicate into an explicit list of selected partitions
    val partitionConf: Map[String, String] = predicate match {
      case Some(p) =>
        val partitionColumnsSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
        val partitionPredicate = HoodieCommonUtils.resolveFilterExpr(
          spark, p.asInstanceOf[String], partitionColumnsSchema)
        val partitionSelected = prunePartition(metaClient, partitionPredicate)
        logInfo(s"Partition predicates: ${p}, partition selected: ${partitionSelected}")
        Map(
          HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
          HoodieClusteringConfig.PARTITION_SELECTED.key() -> partitionSelected
        )
      case _ =>
        logInfo("No partition predicates")
        Map.empty[String, String]
    }

    // Construct sort column info
    val sortConf: Map[String, String] = orderColumns match {
      case Some(o) =>
        validateOrderColumns(o.asInstanceOf[String], metaClient)
        logInfo(s"Order columns: ${o}")
        Map(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> o.asInstanceOf[String])
      case _ =>
        logInfo("No order columns")
        Map.empty[String, String]
    }

    val conf: Map[String, String] = partitionConf ++ sortConf

    // Get all pending clustering instants
    val pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
      .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
    logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")

    val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
    try {
      // Try to schedule one more plan; it is only appended when scheduling reports success
      val instantTime = HoodieActiveTimeline.createNewInstantTime
      val instantsToRun =
        if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
          pendingClustering :+ instantTime
        } else {
          pendingClustering
        }
      logInfo(s"Clustering instants to run: ${instantsToRun.mkString(",")}.")

      val startTs = System.currentTimeMillis()
      instantsToRun.foreach(instant => client.cluster(instant, true))
      logInfo(s"Finish clustering all the instants: ${instantsToRun.mkString(",")}," +
        s" time cost: ${System.currentTimeMillis() - startTs}ms.")
    } finally {
      // Release the write client even when scheduling or clustering fails
      client.close()
    }

    Seq.empty[Row]
  }

  override def build: Procedure = new RunClusteringProcedure()

  /**
   * Evaluates a resolved partition predicate against every partition path of the table.
   *
   * @param metaClient         meta client of the table whose partitions are listed
   * @param partitionPredicate predicate already resolved against the partition schema
   * @return the distinct matching partition paths joined by ","
   */
  def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: Expression): String = {
    val partitionSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)

    // Build the table metadata reader from the current session configuration
    val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext))
    val properties = new Properties()
    properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
    val metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
    val tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath,
      FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)

    val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
    val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, Map.empty)
    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())

    // Translate all partition path to {@code org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
    val partitionPaths = tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
      val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
        sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
        partitionSchema, partitionColumns, partitionPath)
      new PartitionPath(partitionPath, partitionColumnValues)
    })

    // Filter partition by predicates
    val selectedPartitions = HoodieCommonUtils.prunePartition(
      partitionSchema, partitionPaths, partitionPredicate)
    selectedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",")
  }

  /**
   * Checks that every comma separated order column exists in the table schema.
   * Column names are compared case-insensitively.
   *
   * @param orderColumns comma separated column names
   * @param metaClient   meta client used to resolve the table's Avro schema
   * @throws HoodieClusteringException if `orderColumns` is null or names an unknown column
   */
  def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = {
    if (orderColumns == null) {
      throw new HoodieClusteringException("Order columns is null")
    }

    val tableSchemaResolver = new TableSchemaResolver(metaClient)
    val fields = tableSchemaResolver.getTableAvroSchema(false)
      .getFields.asScala.map(_.name().toLowerCase)
    orderColumns.split(",").foreach(col => {
      if (!fields.contains(col.toLowerCase)) {
        throw new HoodieClusteringException("Order column not exist:" + col)
      }
    })
  }
}
/** Companion holding the procedure name and a builder supplier for [[RunClusteringProcedure]]. */
object RunClusteringProcedure {
  /** Procedure name, as used in `call run_clustering(...)` statements. */
  val NAME = "run_clustering"

  /** Supplies a new [[RunClusteringProcedure]] on every `get()` call. */
  def builder: Supplier[ProcedureBuilder] = {
    val supplier = new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new RunClusteringProcedure()
    }
    supplier
  }
}

View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.util.function.Supplier
import scala.collection.JavaConverters._
/**
 * Call procedure that lists the pending clustering plans of a Hudi table.
 *
 * The table is addressed either by name (`table`) or by base path (`path`); `limit`
 * (default 20) caps the number of returned rows.
 */
class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Collects the pending clustering plans of the addressed table.
   *
   * @param args procedure arguments; see `PARAMETERS` for the accepted names
   * @return at most `limit` rows of (instant timestamp, number of input groups in the plan)
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]

    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = HoodieTableMetaClient.builder
      .setConf(jsc.hadoopConfiguration())
      .setBasePath(basePath)
      .build

    // Each pending plan is a pair: left = instant, right = clustering plan
    val pendingPlans = ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala
    val rows = pendingPlans.map { plan =>
      Row(plan.getLeft.getTimestamp, plan.getRight.getInputGroups.size())
    }
    rows.toSeq.take(limit)
  }

  override def build: Procedure = new ShowClusteringProcedure()
}
/** Companion holding the procedure name and a builder supplier for [[ShowClusteringProcedure]]. */
object ShowClusteringProcedure {
  /** Procedure name, as used in `call show_clustering(...)` statements. */
  val NAME = "show_clustering"

  /** Supplies a new [[ShowClusteringProcedure]] on every `get()` call. */
  def builder: Supplier[ProcedureBuilder] = {
    val supplier = new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ShowClusteringProcedure()
    }
    supplier
  }
}

View File

@@ -75,8 +75,8 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val table = getArgValueOrDefault(args, PARAMETERS(0)).asInstanceOf[String]
val limit = getArgValueOrDefault(args, PARAMETERS(1)).asInstanceOf[Int]
val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
val basePath = hoodieCatalogTable.tableLocation

View File

@@ -61,20 +61,20 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface
CompactionTable(table, operation, timestamp)
}
override def visitCompactionOnPath (ctx: CompactionOnPathContext): LogicalPlan = withOrigin(ctx) {
override def visitCompactionOnPath(ctx: CompactionOnPathContext): LogicalPlan = withOrigin(ctx) {
val path = string(ctx.path)
val operation = CompactionOperation.withName(ctx.operation.getText.toUpperCase)
val timestamp = if (ctx.instantTimestamp != null) Some(ctx.instantTimestamp.getText.toLong) else None
CompactionPath(path, operation, timestamp)
}
override def visitShowCompactionOnTable (ctx: ShowCompactionOnTableContext): LogicalPlan = withOrigin(ctx) {
override def visitShowCompactionOnTable(ctx: ShowCompactionOnTableContext): LogicalPlan = withOrigin(ctx) {
val table = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan]
if (ctx.limit != null) {
CompactionShowOnTable(table, ctx.limit.getText.toInt)
} else {
CompactionShowOnTable(table)
}
if (ctx.limit != null) {
CompactionShowOnTable(table, ctx.limit.getText.toInt)
} else {
CompactionShowOnTable(table)
}
}
override def visitShowCompactionOnPath(ctx: ShowCompactionOnPathContext): LogicalPlan = withOrigin(ctx) {

View File

@@ -19,12 +19,10 @@ package org.apache.hudi
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.DataSkippingUtils
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -75,7 +73,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase {
@ParameterizedTest
@MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource"))
def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = {
val resolvedExpr: Expression = resolveFilterExpr(sourceExpr, sourceTableSchema)
val resolvedExpr: Expression = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
@@ -96,7 +94,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase {
@ParameterizedTest
@MethodSource(Array("testStringsLookupFilterExpressionsSource"))
def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
val resolvedExpr = resolveFilterExpr(sourceExpr, sourceTableSchema)
val resolvedExpr = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
val spark2 = spark
@@ -112,27 +110,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase {
assertEquals(output, rows)
}
private def resolveFilterExpr(exprString: String, tableSchema: StructType): Expression = {
val expr = spark.sessionState.sqlParser.parseExpression(exprString)
resolveFilterExpr(expr, tableSchema)
}
private def resolveFilterExpr(expr: Expression, tableSchema: StructType): Expression = {
val schemaFields = tableSchema.fields
val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
)
.asInstanceOf[Filter].condition
checkForUnresolvedRefs(resolvedExpr)
}
def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
resolvedExpr match {
case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute")
case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
}
}
object TestDataSkippingUtils {

View File

@@ -17,6 +17,13 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieCommonUtils, HoodieDataSourceHelpers}
import scala.collection.JavaConverters.asScalaIteratorConverter
class TestCallProcedure extends TestHoodieSqlBase {
test("Test Call show_commits Procedure") {
@@ -129,4 +136,222 @@ class TestCallProcedure extends TestHoodieSqlBase {
assertResult(1){commits.length}
}
}
// Exercises run_clustering / show_clustering addressed by table name, for both COW and MOR tables:
// two manually scheduled plans are executed by one call, then a later call schedules and runs its own plan.
test("Test Call run_clustering Procedure By Table") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
// Three inserts, each with a distinct value of the partition column ts
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
// Generate the second clustering plan
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
// Both plans must be listed as pending: the first with 3 input groups, the second with 1
checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3),
Seq(secondScheduleInstant, 1)
)
// Do clustering for all clustering plan generated above, and no new clustering
// instant will be generated because of there is no commit after the second
// clustering plan generated
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
// No new commits
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant))
// Clustering must not change the visible data
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003)
)
// After clustering there should be no pending clustering.
checkAnswer(s"call show_clustering(table => '$tableName')")()
// Check the number of finished clustering instants
val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
// Only replace commits completed after the second manually scheduled instant are counted here
val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.findInstantsAfter(secondScheduleInstant)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
// Should have a new replace commit after the second clustering command.
assertResult(1)(thirdClusteringInstant.size)
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004),
Seq(6, "a6", 10.0, 1005)
)
}
}
}
// Exercises run_clustering / show_clustering addressed by base path, for both COW and MOR tables,
// including a call on a table that has no data yet.
test("Test Call run_clustering Procedure By Path") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
// Running clustering before any insert must succeed and leave nothing pending
spark.sql(s"call run_clustering(path => '$basePath')")
checkAnswer(s"call show_clustering(path => '$basePath')")()
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
// The manually scheduled plan must be listed as pending with 3 input groups
checkAnswer(s"call show_clustering(path => '$basePath')")(
Seq(firstScheduleInstant, 3)
)
// Do clustering for all the clustering plan
spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002)
)
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
// NOTE(review): the result of hasNewCommits is discarded here, so this line asserts nothing;
// the "By Table" test wraps the equivalent call in assertResult(false) — confirm the intent.
HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant)
// Check the number of finished clustering instants
var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(1)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
// NOTE(review): this call addresses the table by name rather than by path, unlike the rest of this test
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004)
)
// A second replace commit is expected after the predicate-restricted run
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(finishedClustering.size)
}
}
}
// Verifies that the `predicate` argument of run_clustering limits the clustering plan:
// with rows at ts = 1000, 1001 and 1002 and predicate 'ts <= 1001L', the plan must hold 2 input groups.
test("Test Call run_clustering Procedure With Partition Pruning") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
// Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')")
// Check the num of completed clustering instant
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(1)(clusteringInstants.size)
// Inspect the plan of the single completed clustering instant
val clusteringInstant = clusteringInstants.head
val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
assertResult(true)(clusteringPlan.isPresent)
assertResult(2)(clusteringPlan.get().getInputGroups.size())
// Nothing may remain pending, and the data must be unchanged
checkAnswer(s"call show_clustering(table => '$tableName')")()
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002)
)
}
}
}
}