1
0

[HUDI-4116] Unify clustering/compaction related procedures' output type (#5620)

* Unify clustering/compaction related procedures' output type

* Address review comments
This commit is contained in:
huberylee
2022-05-19 09:48:03 +08:00
committed by GitHub
parent 551aa959c5
commit 6573469e73
11 changed files with 247 additions and 113 deletions

View File

@@ -19,14 +19,14 @@
package org.apache.hudi package org.apache.hudi
import org.apache.hudi.avro.model.HoodieClusteringGroup
import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf
import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}
import scala.collection.immutable.Map
object HoodieCLIUtils { object HoodieCLIUtils {
@@ -46,4 +46,15 @@ object HoodieCLIUtils {
DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
metaClient.getTableConfig.getTableName, finalParameters.asJava) metaClient.getTableConfig.getTableName, finalParameters.asJava)
} }
def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = {
var partitionPaths: Seq[String] = Seq.empty
clusteringGroups.foreach(g =>
g.getSlices.asScala.foreach(slice =>
partitionPaths = partitionPaths :+ slice.getPartitionPath
)
)
partitionPaths.sorted.mkString(",")
}
} }

View File

@@ -19,11 +19,9 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure} import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.unsafe.types.UTF8String
@@ -50,10 +48,5 @@ case class CompactionHoodiePathCommand(path: String,
RunCompactionProcedure.builder.get().build.call(procedureArgs) RunCompactionProcedure.builder.get().build.call(procedureArgs)
} }
override val output: Seq[Attribute] = { override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes
operation match {
case RUN => Seq.empty
case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
}
}
} }

View File

@@ -18,10 +18,10 @@
package org.apache.spark.sql.hudi.command package org.apache.spark.sql.hudi.command
import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.CompactionOperation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.types.StringType import org.apache.spark.sql.hudi.command.procedures.RunCompactionProcedure
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.{Row, SparkSession}
@Deprecated @Deprecated
@@ -35,10 +35,5 @@ case class CompactionHoodieTableCommand(table: CatalogTable,
CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession) CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
} }
override val output: Seq[Attribute] = { override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes
operation match {
case RUN => Seq.empty
case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
}
}
} }

View File

@@ -19,10 +19,8 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure} import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.unsafe.types.UTF8String
@@ -42,11 +40,5 @@ case class CompactionShowHoodiePathCommand(path: String, limit: Int)
ShowCompactionProcedure.builder.get().build.call(procedureArgs) ShowCompactionProcedure.builder.get().build.call(procedureArgs)
} }
override val output: Seq[Attribute] = { override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes
Seq(
AttributeReference("instant", StringType, nullable = false)(),
AttributeReference("action", StringType, nullable = false)(),
AttributeReference("size", IntegerType, nullable = false)()
)
}
} }

View File

@@ -18,9 +18,9 @@
package org.apache.spark.sql.hudi.command package org.apache.spark.sql.hudi.command
import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.hudi.command.procedures.ShowCompactionProcedure
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.{Row, SparkSession}
@Deprecated @Deprecated
@@ -32,11 +32,5 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession) CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
} }
override val output: Seq[Attribute] = { override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes
Seq(
AttributeReference("timestamp", StringType, nullable = false)(),
AttributeReference("action", StringType, nullable = false)(),
AttributeReference("size", IntegerType, nullable = false)()
)
}
} }

View File

@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi.command.procedures package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import java.util.function.Supplier import java.util.function.Supplier
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
class RunClusteringProcedure extends BaseProcedure class RunClusteringProcedure extends BaseProcedure
@@ -50,13 +51,15 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(0, "table", DataTypes.StringType, None), ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "path", DataTypes.StringType, None), ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None), ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
ProcedureParameter.optional(3, "order", DataTypes.StringType, None) ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
) )
private val OUTPUT_TYPE = new StructType(Array[StructField]( private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty), StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty) StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
)) ))
def parameters: Array[ProcedureParameter] = PARAMETERS def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -70,6 +73,7 @@ class RunClusteringProcedure extends BaseProcedure
val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
val predicate = getArgValueOrDefault(args, PARAMETERS(2)) val predicate = getArgValueOrDefault(args, PARAMETERS(2))
val orderColumns = getArgValueOrDefault(args, PARAMETERS(3)) val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
val basePath: String = getBasePath(tableName, tablePath) val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -114,7 +118,27 @@ class RunClusteringProcedure extends BaseProcedure
pendingClustering.foreach(client.cluster(_, true)) pendingClustering.foreach(client.cluster(_, true))
logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," + logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
s" time cost: ${System.currentTimeMillis() - startTs}ms.") s" time cost: ${System.currentTimeMillis() - startTs}ms.")
Seq.empty[Row]
val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
.toSeq
.sortBy(f => f.getTimestamp)
.reverse
val clusteringPlans = clusteringInstants.map(instant =>
ClusteringUtils.getClusteringPlan(metaClient, instant)
)
if (showInvolvedPartitions) {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
}
} else {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
}
}
} }
override def build: Procedure = new RunClusteringProcedure() override def build: Procedure = new RunClusteringProcedure()

View File

@@ -20,10 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.HoodieCommitMetadata import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport} import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row import org.apache.spark.sql.Row
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
@@ -47,7 +46,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
) )
private val OUTPUT_TYPE = new StructType(Array[StructField]( private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty) StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
)) ))
def parameters: Array[ProcedureParameter] = PARAMETERS def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -66,13 +67,12 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty) val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
var willCompactionInstants: Seq[String] = Seq.empty
operation match { operation match {
case "schedule" => case "schedule" =>
val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
Seq(Row(instantTime)) willCompactionInstants = Seq(instantTime)
} else {
Seq.empty[Row]
} }
case "run" => case "run" =>
// Do compaction // Do compaction
@@ -81,7 +81,7 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp) .map(_.getTimestamp)
.toSeq.sortBy(f => f) .toSeq.sortBy(f => f)
val willCompactionInstants = if (instantTimestamp.isEmpty) { willCompactionInstants = if (instantTimestamp.isEmpty) {
if (pendingCompactionInstants.nonEmpty) { if (pendingCompactionInstants.nonEmpty) {
pendingCompactionInstants pendingCompactionInstants
} else { // If there are no pending compaction, schedule to generate one. } else { // If there are no pending compaction, schedule to generate one.
@@ -102,9 +102,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ") s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
} }
} }
if (willCompactionInstants.isEmpty) { if (willCompactionInstants.isEmpty) {
logInfo(s"No need to compaction on $basePath") logInfo(s"No need to compaction on $basePath")
Seq.empty[Row]
} else { } else {
logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath") logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
val timer = new HoodieTimer val timer = new HoodieTimer
@@ -116,10 +116,21 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
} }
logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
s" spend: ${timer.endTimer()}ms") s" spend: ${timer.endTimer()}ms")
Seq.empty[Row]
} }
case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
} }
val compactionInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
.filter(instant => willCompactionInstants.contains(instant.getTimestamp))
.toSeq
.sortBy(p => p.getTimestamp)
.reverse
compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
).map { case (instant, plan) =>
Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
}
} }
private def handleResponse(metadata: HoodieCommitMetadata): Unit = { private def handleResponse(metadata: HoodieCommitMetadata): Unit = {

View File

@@ -17,26 +17,31 @@
package org.apache.spark.sql.hudi.command.procedures package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.ClusteringUtils import org.apache.hudi.common.util.ClusteringUtils
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row import org.apache.spark.sql.Row
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import java.util.function.Supplier import java.util.function.Supplier
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter]( private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.optional(0, "table", DataTypes.StringType, None), ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "path", DataTypes.StringType, None), ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20) ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
ProcedureParameter.optional(3, "show_involved_partition", DataTypes.BooleanType, false)
) )
private val OUTPUT_TYPE = new StructType(Array[StructField]( private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty) StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
)) ))
def parameters: Array[ProcedureParameter] = PARAMETERS def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -49,12 +54,32 @@ class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with S
val tableName = getArgValueOrDefault(args, PARAMETERS(0)) val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int] val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
val basePath: String = getBasePath(tableName, tablePath) val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p => val clusteringInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size()) .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
}.toSeq.take(limit) .toSeq
.sortBy(f => f.getTimestamp)
.reverse
.take(limit)
val clusteringPlans = clusteringInstants.map(instant =>
ClusteringUtils.getClusteringPlan(metaClient, instant)
)
if (showInvolvedPartitions) {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
}
} else {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
p.get().getLeft.getState.name(), "*")
}
}
} }
override def build: Procedure = new ShowClusteringProcedure() override def build: Procedure = new ShowClusteringProcedure()

View File

@@ -44,8 +44,8 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
private val OUTPUT_TYPE = new StructType(Array[StructField]( private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("size", DataTypes.IntegerType, nullable = true, Metadata.empty) StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
)) ))
def parameters: Array[ProcedureParameter] = PARAMETERS def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -64,17 +64,17 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Cannot show compaction on a Non Merge On Read table.") s"Cannot show compaction on a Non Merge On Read table.")
val timeLine = metaClient.getActiveTimeline val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
val compactionInstants = timeLine.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.toSeq .toSeq
.sortBy(f => f.getTimestamp) .sortBy(f => f.getTimestamp)
.reverse .reverse
.take(limit) .take(limit)
val compactionPlans = compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))) compactionInstants.map(instant =>
compactionPlans.map { case (instant, plan) => (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
Row(instant.getTimestamp, instant.getAction, plan.getOperations.size()) ).map { case (instant, plan) =>
Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
} }
} }

View File

@@ -20,10 +20,9 @@
package org.apache.spark.sql.hudi.procedure package org.apache.spark.sql.hudi.procedure
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers} import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import scala.collection.JavaConverters.asScalaIteratorConverter import scala.collection.JavaConverters.asScalaIteratorConverter
@@ -64,28 +63,22 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering('$tableName')")( checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3), Seq(secondScheduleInstant, 1, HoodieInstant.State.REQUESTED.name(), "*"),
Seq(secondScheduleInstant, 1) Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
) )
// Do clustering for all clustering plan generated above, and no new clustering // Do clustering for all clustering plan generated above, and no new clustering
// instant will be generated because of there is no commit after the second // instant will be generated because of there is no commit after the second
// clustering plan generated // clustering plan generated
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") checkAnswer(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)")(
Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "ts=1003"),
Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001,ts=1002")
)
// No new commits // No new commits
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant))
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003)
)
// After clustering there should be no pending clustering.
checkAnswer(s"call show_clustering(table => '$tableName')")()
// Check the number of finished clustering instants // Check the number of finished clustering instants
val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.getInstants .getInstants
@@ -94,10 +87,23 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
.toSeq .toSeq
assertResult(2)(finishedClustering.size) assertResult(2)(finishedClustering.size)
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002),
Seq(4, "a4", 10.0, 1003)
)
// After clustering there should be no pending clustering and all clustering instants should be completed
checkAnswer(s"call show_clustering(table => '$tableName')")(
Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "*"),
Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists) // Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") spark.sql(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)").show()
val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.findInstantsAfter(secondScheduleInstant) .findInstantsAfter(secondScheduleInstant)
@@ -142,7 +148,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
| location '$basePath' | location '$basePath'
""".stripMargin) """.stripMargin)
spark.sql(s"call run_clustering(path => '$basePath')") spark.sql(s"call run_clustering(path => '$basePath')").show()
checkAnswer(s"call show_clustering(path => '$basePath')")() checkAnswer(s"call show_clustering(path => '$basePath')")()
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
@@ -152,18 +158,22 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
// Generate the first clustering plan // Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering(path => '$basePath')")( checkAnswer(s"call show_clustering(path => '$basePath', show_involved_partition => true)")(
Seq(firstScheduleInstant, 3) Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "ts=1000,ts=1001,ts=1002")
) )
// Do clustering for all the clustering plan // Do clustering for all the clustering plan
spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") checkAnswer(s"call run_clustering(path => '$basePath', order => 'ts')")(
Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000), Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001), Seq(2, "a2", 10.0, 1001),
Seq(3, "a3", 10.0, 1002) Seq(3, "a3", 10.0, 1002)
) )
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant))
// Check the number of finished clustering instants // Check the number of finished clustering instants
var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -176,7 +186,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists) // Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')") val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultA.length)
assertResult("ts=1003,ts=1004")(resultA(0)(3))
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000), Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1001), Seq(2, "a2", 10.0, 1001),
@@ -220,6 +235,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
// Test partition pruning with single predicate // Test partition pruning with single predicate
var resultA: Array[Seq[Any]] = Array.empty
{ {
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
@@ -230,7 +247,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
)("Only partition predicates are allowed") )("Only partition predicates are allowed")
// Do clustering table with partition predicate // Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultA.length)
assertResult("ts=1000,ts=1001")(resultA(0)(3))
// There is 1 completed clustering instant // There is 1 completed clustering instant
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -245,9 +266,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
assertResult(true)(clusteringPlan.isPresent) assertResult(true)(clusteringPlan.isPresent)
assertResult(2)(clusteringPlan.get().getInputGroups.size()) assertResult(2)(clusteringPlan.get().getInputGroups.size())
assertResult(resultA(0)(1))(clusteringPlan.get().getInputGroups.size())
// No pending clustering instant // All clustering instants are completed
checkAnswer(s"call show_clustering(table => '$tableName')")() checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001")
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000), Seq(1, "a1", 10.0, 1000),
@@ -257,6 +281,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
} }
// Test partition pruning with {@code And} predicates // Test partition pruning with {@code And} predicates
var resultB: Array[Seq[Any]] = Array.empty
{ {
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
@@ -267,7 +293,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
)("Only partition predicates are allowed") )("Only partition predicates are allowed")
// Do clustering table with partition predicate // Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')") resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultB.length)
assertResult("ts=1002,ts=1003,ts=1004,ts=1005")(resultB(0)(3))
// There are 2 completed clustering instants // There are 2 completed clustering instants
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -283,8 +313,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
assertResult(true)(clusteringPlan.isPresent) assertResult(true)(clusteringPlan.isPresent)
assertResult(4)(clusteringPlan.get().getInputGroups.size()) assertResult(4)(clusteringPlan.get().getInputGroups.size())
// No pending clustering instant // All clustering instants are completed
checkAnswer(s"call show_clustering(table => '$tableName')")() checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005")
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000), Seq(1, "a1", 10.0, 1000),
@@ -297,6 +330,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
} }
// Test partition pruning with {@code And}-{@code Or} predicates // Test partition pruning with {@code And}-{@code Or} predicates
var resultC: Array[Seq[Any]] = Array.empty
{ {
spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)") spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)")
spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)") spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)")
@@ -308,7 +343,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
)("Only partition predicates are allowed") )("Only partition predicates are allowed")
// Do clustering table with partition predicate // Do clustering table with partition predicate
spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')") resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultC.length)
assertResult("ts=1006,ts=1007,ts=1009")(resultC(0)(3))
// There are 3 completed clustering instants // There are 3 completed clustering instants
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -324,8 +363,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
assertResult(true)(clusteringPlan.isPresent) assertResult(true)(clusteringPlan.isPresent)
assertResult(3)(clusteringPlan.get().getInputGroups.size()) assertResult(3)(clusteringPlan.get().getInputGroups.size())
// No pending clustering instant // All clustering instants are completed
checkAnswer(s"call show_clustering(table => '$tableName')")() checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005"),
Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1006,ts=1007,ts=1009")
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000), Seq(1, "a1", 10.0, 1000),

View File

@@ -19,6 +19,7 @@
package org.apache.spark.sql.hudi.procedure package org.apache.spark.sql.hudi.procedure
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
class TestCompactionProcedure extends HoodieSparkSqlTestBase { class TestCompactionProcedure extends HoodieSparkSqlTestBase {
@@ -48,22 +49,52 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase {
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)") spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
spark.sql(s"update $tableName set price = 11 where id = 1") spark.sql(s"update $tableName set price = 11 where id = 1")
spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')") // Schedule the first compaction
val resultA = spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
spark.sql(s"update $tableName set price = 12 where id = 2") spark.sql(s"update $tableName set price = 12 where id = 2")
spark.sql(s"call run_compaction('schedule', '$tableName')")
val compactionRows = spark.sql(s"call show_compaction(table => '$tableName', limit => 10)").collect() // Schedule the second compaction
val timestamps = compactionRows.map(_.getString(0)) val resultB = spark.sql(s"call run_compaction('schedule', '$tableName')")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
assertResult(1)(resultA.length)
assertResult(1)(resultB.length)
val showCompactionSql: String = s"call show_compaction(table => '$tableName', limit => 10)"
checkAnswer(showCompactionSql)(
resultA(0),
resultB(0)
)
val compactionRows = spark.sql(showCompactionSql).collect()
val timestamps = compactionRows.map(_.getString(0)).sorted
assertResult(2)(timestamps.length) assertResult(2)(timestamps.length)
spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})") // Execute the second scheduled compaction instant actually
checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})")(
Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name())
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000), Seq(1, "a1", 11.0, 1000),
Seq(2, "a2", 12.0, 1000), Seq(2, "a2", 12.0, 1000),
Seq(3, "a3", 10.0, 1000), Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 10.0, 1000) Seq(4, "a4", 10.0, 1000)
) )
assertResult(1)(spark.sql(s"call show_compaction('$tableName')").collect().length)
spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})") // A compaction action eventually becomes commit when completed, so show_compaction
// can only see the first scheduled compaction instant
val resultC = spark.sql(s"call show_compaction('$tableName')")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
assertResult(1)(resultC.length)
assertResult(resultA)(resultC)
checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")(
Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name())
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000), Seq(1, "a1", 11.0, 1000),
Seq(2, "a2", 12.0, 1000), Seq(2, "a2", 12.0, 1000),
@@ -98,25 +129,40 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase {
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
spark.sql(s"update $tableName set price = 11 where id = 1") spark.sql(s"update $tableName set price = 11 where id = 1")
spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')") checkAnswer(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")()
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000), Seq(1, "a1", 11.0, 1000),
Seq(2, "a2", 10.0, 1000), Seq(2, "a2", 10.0, 1000),
Seq(3, "a3", 10.0, 1000) Seq(3, "a3", 10.0, 1000)
) )
assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
// schedule compaction first
spark.sql(s"update $tableName set price = 12 where id = 1") spark.sql(s"update $tableName set price = 12 where id = 1")
spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')")
// schedule compaction second // Schedule the first compaction
val resultA = spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
spark.sql(s"update $tableName set price = 12 where id = 2") spark.sql(s"update $tableName set price = 12 where id = 2")
spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')")
// show compaction // Schedule the second compaction
assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) val resultB = spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')")
// run compaction for all the scheduled compaction .collect()
spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')") .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
assertResult(1)(resultA.length)
assertResult(1)(resultB.length)
checkAnswer(s"call show_compaction(path => '${tmp.getCanonicalPath}')")(
resultA(0),
resultB(0)
)
// Run compaction for all the scheduled compaction
checkAnswer(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")(
Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name()),
Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name())
)
checkAnswer(s"select id, name, price, ts from $tableName order by id")( checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 12.0, 1000), Seq(1, "a1", 12.0, 1000),