1
0

[HUDI-3567] Refactor HoodieCommonUtils to make code more reasonable (#4982)

This commit is contained in:
huberylee
2022-03-12 05:23:19 +08:00
committed by GitHub
parent b00180342e
commit 56cb49485d
11 changed files with 706 additions and 593 deletions

View File

@@ -24,9 +24,9 @@ import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -70,7 +70,8 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext,
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
val (partitionFilters, dataFilters) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates(
sparkSession, filterExpressions, partitionColumns)
val filePartitions = getPartitions(partitionFilters, dataFilters)
@@ -137,17 +138,4 @@ class BaseFileOnlyViewRelation(sqlContext: SQLContext,
catalystExpressions.filter(_.isDefined).map(_.get).toArray
}
/**
* Checks whether given expression only references only references partition columns
* (and involves no sub-query)
*/
private def isPartitionPredicate(condition: Expression): Boolean = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
!SubqueryExpression.hasSubquery(condition)
}
}

View File

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.immutable.Map
object HoodieCLIUtils {
def createHoodieClientFromPath(sparkSession: SparkSession,
basePath: String,
conf: Map[String, String]): SparkRDDWriteClient[_] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(sparkSession.sessionState.newHadoopConf()).build()
val schemaUtil = new TableSchemaResolver(metaClient)
val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString
val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults(
withSparkConf(sparkSession, Map.empty)(
conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name()))
)
val jsc = new JavaSparkContext(sparkSession.sparkContext)
DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
metaClient.getTableConfig.getTableName, finalParameters.asJava)
}
}

View File

@@ -1,286 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.immutable.Map
object HoodieCommonUtils extends Logging {
def createHoodieClientFromPath(sparkSession: SparkSession, basePath: String,
conf: Map[String, String]): SparkRDDWriteClient[_] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(sparkSession.sessionState.newHadoopConf()).build()
val schemaUtil = new TableSchemaResolver(metaClient)
val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString
val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults(
withSparkConf(sparkSession, Map.empty)(
conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name()))
)
val jsc = new JavaSparkContext(sparkSession.sparkContext)
DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
metaClient.getTableConfig.getTableName, finalParameters.asJava)
}
def getPartitionSchemaFromProperty(metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType]): StructType = {
val schema = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
val nameFieldMap = generateFieldMap(schema)
if (partitionColumns.isPresent) {
// Note that key generator class name could be null
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
|| classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().map(column =>
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
s"$column' in the schema[${schema.fields.mkString(",")}]")))
StructType(partitionFields)
}
} else {
// If the partition columns have not stored in hoodie.properties(the table that was
// created earlier), we trait it as a non-partitioned table.
logWarning("No partition columns available from hoodie.properties." +
" Partition pruning will not work")
new StructType()
}
}
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
*
* For example, following struct
* <pre>
* StructType(
* StructField("a",
* StructType(
* StructField("b", StringType),
* StructField("c", IntType)
* )
* )
* )
* </pre>
*
* will be converted into following mapping:
*
* <pre>
* "a.b" -> StructField("b", StringType),
* "a.c" -> StructField("c", IntType),
* </pre>
*/
def generateFieldMap(structType: StructType): Map[String, StructField] = {
def traverse(structField: Either[StructField, StructType]): Map[String, StructField] = {
structField match {
case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
case Left(field) => field.dataType match {
case struct: StructType => traverse(Right(struct)).map {
case (key, structField) => (s"${field.name}.$key", structField)
}
case _ => Map(field.name -> field)
}
}
}
traverse(Right(structType))
}
/**
* Prune the partition by the filter.This implementation is fork from
* org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
*
* @param partitionPaths All the partition paths.
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
def prunePartition(partitionSchema: StructType,
partitionPaths: Seq[PartitionPath],
predicates: Seq[Expression]): Seq[PartitionPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
prunePartition(partitionSchema, partitionPaths, predicate)
} else {
partitionPaths
}
}
def prunePartition(partitionSchema: StructType,
partitionPaths: Seq[PartitionPath],
predicate: Expression): Seq[PartitionPath] = {
val boundPredicate = InterpretedPredicate(predicate.transform {
case a: AttributeReference =>
val index = partitionSchema.indexWhere(a.name == _.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
val prunedPartitionPaths = partitionPaths.filter {
partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
}
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
}
def parsePartitionColumnValues(sparkParsePartitionUtil: SparkParsePartitionUtil,
configProperties: TypedProperties,
basePath: String,
partitionSchema: StructType,
partitionColumns: Array[String],
partitionPath: String): Array[Object] = {
if (partitionColumns.length == 0) {
// This is a non-partitioned table
Array.empty
} else {
val partitionFragments = partitionPath.split("/")
if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length == 1) {
// If the partition column size is not equal to the partition fragment size
// and the partition column size is 1, we map the whole partition path
// to the partition column which can benefit from the partition prune.
val prefix = s"${partitionColumns.head}="
val partitionValue = if (partitionPath.startsWith(prefix)) {
// support hive style partition path
partitionPath.substring(prefix.length)
} else {
partitionPath
}
Array(UTF8String.fromString(partitionValue))
} else if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length > 1) {
// If the partition column size is not equal to the partition fragments size
// and the partition column size > 1, we do not know how to map the partition
// fragments to the partition columns. So we trait it as a Non-Partitioned Table
// for the query which do not benefit from the partition prune.
logWarning(s"Cannot do the partition prune for table $basePath." +
s"The partitionFragments size (${partitionFragments.mkString(",")})" +
s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
Array.empty
} else {
// If partitionSeqs.length == partitionSchema.fields.length
// Append partition name to the partition value if the
// HIVE_STYLE_PARTITIONING is disable.
// e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
val partitionWithName =
partitionFragments.zip(partitionColumns).map {
case (partition, columnName) =>
if (partition.indexOf("=") == -1) {
s"${columnName}=$partition"
} else {
partition
}
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionValues = parsePartitionPath(sparkParsePartitionUtil, configProperties, basePath,
pathWithPartitionName, partitionSchema)
partitionValues.map(_.asInstanceOf[Object]).toArray
}
}
}
private def parsePartitionPath(sparkParsePartitionUtil: SparkParsePartitionUtil,
configProperties: TypedProperties,
basePath: String,
partitionPath: Path,
partitionSchema: StructType): Seq[Any] = {
val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap
sparkParsePartitionUtil.parsePartition(
partitionPath,
typeInference = false,
Set(new Path(basePath)),
partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId)
)
.toSeq(partitionSchema)
}
def getConfigProperties(spark: SparkSession, options: Map[String, String]): TypedProperties = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new TypedProperties()
// To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
properties.putAll(options.asJava)
properties
}
def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
val expr = spark.sessionState.sqlParser.parseExpression(exprString)
resolveFilterExpr(spark, expr, tableSchema)
}
def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
val schemaFields = tableSchema.fields
val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
)
.asInstanceOf[Filter].condition
checkForUnresolvedRefs(resolvedExpr)
}
private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
resolvedExpr match {
case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute")
case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.getConfigProperties
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
@@ -36,10 +37,11 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import java.text.SimpleDateFormat
/**
* A file index which support partition prune for hoodie snapshot and read-optimized query.
@@ -73,7 +75,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
configProperties = HoodieCommonUtils.getConfigProperties(spark, options),
configProperties = getConfigProperties(spark, options),
queryPaths = Seq(HoodieFileIndex.getQueryPath(options)),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -147,8 +149,7 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
var totalFileSize = 0
var candidateFileSize = 0

View File

@@ -18,18 +18,24 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, toJavaOption}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -78,8 +84,32 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the partition schema from the hoodie.properties.
*/
private lazy val _partitionSchemaFromProperties: StructType =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, Some(schema))
private lazy val _partitionSchemaFromProperties: StructType = {
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
val nameFieldMap = generateFieldMap(schema)
if (partitionColumns.isPresent) {
// Note that key generator class name could be null
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
|| classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().map(column =>
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
s"$column' in the schema[${schema.fields.mkString(",")}]")))
StructType(partitionFields)
}
} else {
// If the partition columns have not stored in hoodie.properties(the table that was
// created earlier), we trait it as a non-partitioned table.
logWarning("No partition columns available from hoodie.properties." +
" Partition pruning will not work")
new StructType()
}
}
/**
* Get the data schema of the table.
@@ -110,16 +140,121 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
/**
* Get all the cached partition paths pruned by the filter.
*
* @param predicates The filter condition
* @return The pruned partition paths
*/
def getPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = {
prunePartition(cachedAllInputFileSlices.keySet().asScala.toSeq, predicates)
}
/**
* Prune the partition by the filter.This implementation is fork from
* org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
*
* @param partitionPaths All the partition paths.
* @param predicates The filter condition.
* @return The pruned partition paths.
*/
protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate(predicate.transform {
case a: AttributeReference =>
val index = partitionSchema.indexWhere(a.name == _.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
val prunedPartitionPaths = partitionPaths.filter {
partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
}
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
} else {
partitionPaths
}
}
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, configProperties,
basePath, partitionSchema, partitionColumns, partitionPath)
if (partitionColumns.length == 0) {
// This is a non-partitioned table
Array.empty
} else {
val partitionFragments = partitionPath.split("/")
if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length == 1) {
// If the partition column size is not equal to the partition fragment size
// and the partition column size is 1, we map the whole partition path
// to the partition column which can benefit from the partition prune.
val prefix = s"${partitionColumns.head}="
val partitionValue = if (partitionPath.startsWith(prefix)) {
// support hive style partition path
partitionPath.substring(prefix.length)
} else {
partitionPath
}
Array(UTF8String.fromString(partitionValue))
} else if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length > 1) {
// If the partition column size is not equal to the partition fragments size
// and the partition column size > 1, we do not know how to map the partition
// fragments to the partition columns. So we trait it as a Non-Partitioned Table
// for the query which do not benefit from the partition prune.
logWarning(s"Cannot do the partition prune for table $basePath." +
s"The partitionFragments size (${partitionFragments.mkString(",")})" +
s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
Array.empty
} else {
// If partitionSeqs.length == partitionSchema.fields.length
// Append partition name to the partition value if the
// HIVE_STYLE_PARTITIONING is disable.
// e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
val partitionWithName =
partitionFragments.zip(partitionColumns).map {
case (partition, columnName) =>
if (partition.indexOf("=") == -1) {
s"${columnName}=$partition"
} else {
partition
}
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
partitionValues.map(_.asInstanceOf[Object]).toArray
}
}
}
private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = {
val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap
sparkParsePartitionUtil.parsePartition(
partitionPath,
typeInference = false,
Set(new Path(basePath)),
partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId)
)
.toSeq(partitionSchema)
}
}
@@ -132,6 +267,45 @@ object SparkHoodieTableFileIndex {
org.apache.hudi.common.util.Option.empty()
}
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
*
* For example, following struct
* <pre>
* StructType(
* StructField("a",
* StructType(
* StructField("b", StringType),
* StructField("c", IntType)
* )
* )
* )
* </pre>
*
* will be converted into following mapping:
*
* <pre>
* "a.b" -> StructField("b", StringType),
* "a.c" -> StructField("c", IntType),
* </pre>
*/
private def generateFieldMap(structType: StructType) : Map[String, StructField] = {
def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = {
structField match {
case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
case Left(field) => field.dataType match {
case struct: StructType => traverse(Right(struct)).map {
case (key, structField) => (s"${field.name}.$key", structField)
}
case _ => Map(field.name -> field)
}
}
}
traverse(Right(structType))
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
configProperties.asScala(QUERY_TYPE.key()) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.types.StructType
object HoodieCatalystExpressionUtils {
/**
* Resolve filter expression from string expr with given table schema, for example:
* <pre>
* ts > 1000 and ts <= 1500
* </pre>
* will be resolved as
* <pre>
* And(GreaterThan(ts#590L > 1000), LessThanOrEqual(ts#590L <= 1500))
* </pre>
*
* @param spark The spark session
* @param exprString String to be resolved
* @param tableSchema The table schema
* @return Resolved filter expression
*/
def resolveFilterExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
val expr = spark.sessionState.sqlParser.parseExpression(exprString)
resolveFilterExpr(spark, expr, tableSchema)
}
def resolveFilterExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
val schemaFields = tableSchema.fields
val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
Filter(expr,
LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
)
.asInstanceOf[Filter].condition
checkForUnresolvedRefs(resolvedExpr)
}
private def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
resolvedExpr match {
case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute")
case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
}
/**
* Split the given predicates into two sequence predicates:
* - predicates that references partition columns only(and involves no sub-query);
* - other predicates.
*
* @param sparkSession The spark session
* @param predicates The predicates to be split
* @param partitionColumns The partition columns
* @return (partitionFilters, dataFilters)
*/
def splitPartitionAndDataPredicates(sparkSession: SparkSession,
predicates: Array[Expression],
partitionColumns: Array[String]): (Array[Expression], Array[Expression]) = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
predicates.partition(expr => {
// Checks whether given expression only references partition columns(and involves no sub-query)
expr.references.forall(r => partitionColumns.exists(resolvedNameEquals(r.name, _))) &&
!SubqueryExpression.hasSubquery(expr)
})
}
}

View File

@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.HoodieCommonUtils
import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
@@ -42,7 +42,7 @@ case class CompactionHoodiePathCommand(path: String,
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Must compaction on a Merge On Read table.")
val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, path, Map.empty)
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, path, Map.empty)
operation match {
case SCHEDULE =>

View File

@@ -17,29 +17,24 @@
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.{HoodieCommonUtils, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.types._
import java.util.Properties
import java.util.function.Supplier
import scala.collection.JavaConverters
import scala.collection.JavaConverters._
class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with PredicateHelper with Logging {
/**
* OPTIMIZE table_name|table_path [WHERE predicate]
* [ORDER BY (col_name1 [, ...] ) ]
@@ -74,15 +69,12 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp
var conf: Map[String, String] = Map.empty
predicate match {
case Some(p) =>
val partitionColumnsSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
val partitionPredicate = HoodieCommonUtils.resolveFilterExpr(
spark, p.asInstanceOf[String], partitionColumnsSchema)
val partitionSelected = prunePartition(metaClient, partitionPredicate)
val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
conf = conf ++ Map(
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
HoodieClusteringConfig.PARTITION_SELECTED.key() -> partitionSelected
HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions
)
logInfo(s"Partition predicates: ${p}, partition selected: ${partitionSelected}")
logInfo(s"Partition predicates: ${p}, partition selected: ${prunedPartitions}")
case _ =>
logInfo("No partition predicates")
}
@@ -104,7 +96,7 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp
.iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
val client = HoodieCommonUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
val instantTime = HoodieActiveTimeline.createNewInstantTime
if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
pendingClustering ++= Seq(instantTime)
@@ -120,37 +112,26 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp
override def build: Procedure = new RunClusteringProcedure()
def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: Expression): String = {
val partitionSchema = HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
// Get tableName meta data
val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext))
val properties = new Properties()
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
val metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
val tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath,
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, Map.empty)
def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
FileStatusCache.getOrCreate(sparkSession))
// Resolve partition predicates
val schemaResolver = new TableSchemaResolver(metaClient)
val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
val condition = HoodieCatalystExpressionUtils.resolveFilterExpr(sparkSession, predicate, tableSchema)
val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
val (partitionPredicates, dataPredicates) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates(
sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
// Translate all partition path to {@code org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
val partitionPaths = tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
partitionSchema, partitionColumns, partitionPath)
new PartitionPath(partitionPath, partitionColumnValues)
})
// Filter partition by predicates
val selectedPartitions = HoodieCommonUtils.prunePartition(
partitionSchema, partitionPaths, partitionPredicate)
selectedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",")
// Get all partitions and prune partition by predicates
val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates)
prunedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",")
}
def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = {
private def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = {
if (orderColumns == null) {
throw new HoodieClusteringException("Order columns is null")
}

View File

@@ -22,8 +22,8 @@ import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.DataSkippingUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType}
import org.apache.spark.sql.{Column, HoodieCatalystExpressionUtils, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
@@ -73,8 +73,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase {
@ParameterizedTest
@MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource"))
def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = {
val resolvedExpr: Expression = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema)
val resolvedExpr: Expression = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
val spark2 = spark
@@ -94,7 +93,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase {
@ParameterizedTest
@MethodSource(Array("testStringsLookupFilterExpressionsSource"))
def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
val resolvedExpr = HoodieCommonUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema)
val resolvedExpr = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
val spark2 = spark

View File

@@ -17,13 +17,6 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieCommonUtils, HoodieDataSourceHelpers}
import scala.collection.JavaConverters.asScalaIteratorConverter
class TestCallProcedure extends TestHoodieSqlBase {
test("Test Call show_commits Procedure") {
@@ -136,222 +129,4 @@ class TestCallProcedure extends TestHoodieSqlBase {
assertResult(1){commits.length}
}
}
test("Test Call run_clustering Procedure By Table") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
// Generate the second clustering plan
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3),
Seq(secondScheduleInstant, 1)
)
// Do clustering for all clustering plan generated above, and no new clustering
// instant will be generated because of there is no commit after the second
// clustering plan generated
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
// No new commits
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant))
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003)
)
// After clustering there should be no pending clustering.
checkAnswer(s"call show_clustering(table => '$tableName')")()
// Check the number of finished clustering instants
val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.findInstantsAfter(secondScheduleInstant)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
// Should have a new replace commit after the second clustering command.
assertResult(1)(thirdClusteringInstant.size)
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004),
Seq(6, "a6", 10.0, 1005)
)
}
}
}
test("Test Call run_clustering Procedure By Path") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"call run_clustering(path => '$basePath')")
checkAnswer(s"call show_clustering(path => '$basePath')")()
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCommonUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering(path => '$basePath')")(
Seq(firstScheduleInstant, 3)
)
// Do clustering for all the clustering plan
spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002)
)
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant)
// Check the number of finished clustering instants
var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(1)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004)
)
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(finishedClustering.size)
}
}
}
test("Test Call run_clustering Procedure With Partition Pruning") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
// Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')")
// Check the num of completed clustering instant
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(1)(clusteringInstants.size)
val clusteringInstant = clusteringInstants.head
val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
assertResult(true)(clusteringPlan.isPresent)
assertResult(2)(clusteringPlan.get().getInputGroups.size())
checkAnswer(s"call show_clustering(table => '$tableName')")()
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002)
)
}
}
}
}

View File

@@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
import scala.collection.JavaConverters.asScalaIteratorConverter
class TestRunClusteringProcedure extends TestHoodieSqlBase {
test("Test Call run_clustering Procedure By Table") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
// Generate the second clustering plan
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3),
Seq(secondScheduleInstant, 1)
)
// Do clustering for all clustering plan generated above, and no new clustering
// instant will be generated because of there is no commit after the second
// clustering plan generated
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
// No new commits
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant))
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003)
)
// After clustering there should be no pending clustering.
checkAnswer(s"call show_clustering(table => '$tableName')")()
// Check the number of finished clustering instants
val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.findInstantsAfter(secondScheduleInstant)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
// Should have a new replace commit after the second clustering command.
assertResult(1)(thirdClusteringInstant.size)
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004),
Seq(6, "a6", 10.0, 1005)
)
}
}
}
test("Test Call run_clustering Procedure By Path") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"call run_clustering(path => '$basePath')")
checkAnswer(s"call show_clustering(path => '$basePath')")()
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering(path => '$basePath')")(
Seq(firstScheduleInstant, 3)
)
// Do clustering for all the clustering plan
spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002)
)
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant)
// Check the number of finished clustering instants
var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(1)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004)
)
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(finishedClustering.size)
}
}
}
test("Test Call run_clustering Procedure With Partition Pruning") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
// Test partition pruning with single predicate
{
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
checkException(
s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L and id = 10', order => 'ts')"
)("Only partition predicates are allowed")
// Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')")
// There is 1 completed clustering instant
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(1)(clusteringInstants.size)
// The latest clustering should contain 2 file groups
val clusteringInstant = clusteringInstants.last
val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
assertResult(true)(clusteringPlan.isPresent)
assertResult(2)(clusteringPlan.get().getInputGroups.size())
// No pending clustering instant
checkAnswer(s"call show_clustering(table => '$tableName')")()
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002)
)
}
// Test partition pruning with {@code And} predicates
{
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
checkException(
s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L and id = 10', order => 'ts')"
)("Only partition predicates are allowed")
// Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')")
// There are 2 completed clustering instants
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(2)(clusteringInstants.size)
// The latest clustering should contain 4 file groups(1002,1003,1004,1005)
val clusteringInstant = clusteringInstants.last
val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
assertResult(true)(clusteringPlan.isPresent)
assertResult(4)(clusteringPlan.get().getInputGroups.size())
// No pending clustering instant
checkAnswer(s"call show_clustering(table => '$tableName')")()
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004),
Seq(6, "a6", 10.0, 1005)
)
}
// Test partition pruning with {@code And}-{@code Or} predicates
{
spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)")
spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)")
spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008)")
spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009)")
checkException(
s"call run_clustering(table => '$tableName', predicate => 'ts < 1007L or ts >= 1008L or id = 10', order => 'ts')"
)("Only partition predicates are allowed")
// Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')")
// There are 3 completed clustering instants
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants
.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
assertResult(3)(clusteringInstants.size)
// The latest clustering should contain 3 file groups(1006,1007,1009)
val clusteringInstant = clusteringInstants.last
val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
assertResult(true)(clusteringPlan.isPresent)
assertResult(3)(clusteringPlan.get().getInputGroups.size())
// No pending clustering instant
checkAnswer(s"call show_clustering(table => '$tableName')")()
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003),
Seq(5, "a5", 10.0, 1004),
Seq(6, "a6", 10.0, 1005),
Seq(7, "a7", 10.0, 1006),
Seq(8, "a8", 10.0, 1007),
Seq(9, "a9", 10.0, 1008),
Seq(10, "a10", 10.0, 1009)
)
}
}
}
}
}