From 1d0f4ccfe0e81f1b83ff6f0fb1e3383da9802e62 Mon Sep 17 00:00:00 2001 From: huberylee Date: Mon, 28 Mar 2022 14:11:35 +0800 Subject: [PATCH] [HUDI-3538] Support Compaction Command Based on Call Procedure Command for Spark SQL (#4945) * Support Compaction Command Based on Call Procedure Command for Spark SQL * Addressed review comments --- .../command/CompactionHoodiePathCommand.scala | 92 ++--------- .../CompactionHoodieTableCommand.scala | 4 +- .../CompactionShowHoodiePathCommand.scala | 31 ++-- .../CompactionShowHoodieTableCommand.scala | 1 + .../procedures/HoodieProcedureUtils.scala | 46 ++++++ .../command/procedures/HoodieProcedures.scala | 2 + .../procedures/RunCompactionProcedure.scala | 144 ++++++++++++++++++ .../procedures/ShowCompactionProcedure.scala | 90 +++++++++++ ...re.scala => TestClusteringProcedure.scala} | 3 +- .../procedure/TestCompactionProcedure.scala | 133 ++++++++++++++++ 10 files changed, 448 insertions(+), 98 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/{TestRunClusteringProcedure.scala => TestClusteringProcedure.scala} (99%) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 7bd9a3f22..5b513f750 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -17,97 +17,37 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.HoodieCLIUtils -import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType} +import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} -import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} -import org.apache.hudi.exception.HoodieException + import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} +import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure} import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.unsafe.types.UTF8String -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ - +@Deprecated case class CompactionHoodiePathCommand(path: String, - operation: CompactionOperation, instantTimestamp: Option[Long] = None) + operation: CompactionOperation, + instantTimestamp: Option[Long] = None) extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val metaClient = HoodieTableMetaClient.builder().setBasePath(path) .setConf(sparkSession.sessionState.newHadoopConf()).build() + assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Must compaction on a Merge On Read table.") - assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, - s"Must compaction on a Merge On Read table.") - val 
client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, path, Map.empty) - - operation match { - case SCHEDULE => - val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) - if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { - Seq(Row(instantTime)) - } else { - Seq.empty[Row] - } - case RUN => - // Do compaction - val timeLine = metaClient.getActiveTimeline - val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala - .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) - .map(_.getTimestamp) - .toSeq.sortBy(f => f) - val willCompactionInstants = if (instantTimestamp.isEmpty) { - if (pendingCompactionInstants.nonEmpty) { - pendingCompactionInstants - } else { // If there are no pending compaction, schedule to generate one. - // CompactionHoodiePathCommand will return instanceTime for SCHEDULE. - val scheduleSeq = CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE).run(sparkSession) - if (scheduleSeq.isEmpty) { - Seq.empty - } else { - Seq(scheduleSeq.take(1).get(0).getString(0)).filter(_ != null) - } - } - } else { - // Check if the compaction timestamp has exists in the pending compaction - if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) { - Seq(instantTimestamp.get.toString) - } else { - throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in $path," + - s" Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ") - } - } - if (willCompactionInstants.isEmpty) { - logInfo(s"No need to compaction on $path") - Seq.empty[Row] - } else { - logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $path") - val timer = new HoodieTimer - timer.startTimer() - willCompactionInstants.foreach {compactionInstant => - val writeResponse = client.compact(compactionInstant) - 
handleResponse(writeResponse.getCommitMetadata.get()) - client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty()) - } - logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + - s" spend: ${timer.endTimer()}ms") - Seq.empty[Row] - } - case _=> throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") + val op = operation match { + case SCHEDULE => UTF8String.fromString("schedule") + case RUN => UTF8String.fromString("run") + case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") } - } - private def handleResponse(metadata: HoodieCommitMetadata): Unit = { - - // Handle error - val writeStats = metadata.getPartitionToWriteStats.entrySet().flatMap(e => e.getValue).toList - val errorsCount = writeStats.map(state => state.getTotalWriteErrors).sum - if (errorsCount > 0) { - throw new HoodieException(s" Found $errorsCount when writing record") - } + var args: Map[String, Any] = Map("op" -> op, "path" -> UTF8String.fromString(path)) + instantTimestamp.foreach(timestamp => args += "timestamp" -> timestamp) + val procedureArgs = HoodieProcedureUtils.buildProcedureArgs(args) + RunCompactionProcedure.builder.get().build.call(procedureArgs) } override val output: Seq[Attribute] = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala index 2c89ed8c9..5e362314c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala @@ -24,8 +24,10 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import 
org.apache.spark.sql.types.StringType import org.apache.spark.sql.{Row, SparkSession} +@Deprecated case class CompactionHoodieTableCommand(table: CatalogTable, - operation: CompactionOperation, instantTimestamp: Option[Long]) + operation: CompactionOperation, + instantTimestamp: Option[Long]) extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala index 44c572397..965724163 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala @@ -19,41 +19,32 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.timeline.HoodieTimeline -import org.apache.hudi.common.util.CompactionUtils + import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure} import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.unsafe.types.UTF8String -import scala.collection.JavaConverters.asScalaIteratorConverter - +@Deprecated case class CompactionShowHoodiePathCommand(path: String, limit: Int) extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - val metaClient = HoodieTableMetaClient.builder().setBasePath(path.toString) + val 
metaClient = HoodieTableMetaClient.builder().setBasePath(path) .setConf(sparkSession.sessionState.newHadoopConf()).build() assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Cannot show compaction on a Non Merge On Read table.") - val timeLine = metaClient.getActiveTimeline - val compactionInstants = timeLine.getInstants.iterator().asScala - .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) - .toSeq - .sortBy(f => f.getTimestamp) - .reverse - .take(limit) - val compactionPlans = compactionInstants.map(instant => - (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))) - compactionPlans.map { case (instant, plan) => - Row(instant.getTimestamp, instant.getAction, plan.getOperations.size()) - } + + val args = Map("path" -> UTF8String.fromString(path), "limit" -> limit) + val procedureArgs = HoodieProcedureUtils.buildProcedureArgs(args) + ShowCompactionProcedure.builder.get().build.call(procedureArgs) } override val output: Seq[Attribute] = { Seq( - AttributeReference("timestamp", StringType, nullable = false)(), + AttributeReference("instant", StringType, nullable = false)(), AttributeReference("action", StringType, nullable = false)(), AttributeReference("size", IntegerType, nullable = false)() ) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala index a9176164f..f3f0a8e52 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.{Row, 
SparkSession} +@Deprecated case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int) extends HoodieLeafRunnableCommand { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala new file mode 100644 index 000000000..374f86773 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow + +import java.util + +object HoodieProcedureUtils { + + /** + * Build named procedure arguments from given args' map + * + * @param args The arguments map + * @return Named procedure arguments + */ + def buildProcedureArgs(args: Map[String, Any]): ProcedureArgs = { + val values: Array[Any] = new Array[Any](args.size) + val map = new util.LinkedHashMap[String, Int]() + + args.zipWithIndex.foreach { + case ((key, value), index) => + values(index) = value + map.put(key, index) + } + + ProcedureArgs(isNamedArgs = true, map, new GenericInternalRow(values)) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index 9c0577353..e7de3e784 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -33,6 +33,8 @@ object HoodieProcedures { private def initProcedureBuilders: util.Map[String, Supplier[ProcedureBuilder]] = { val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder() + mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder) + mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder) mapBuilder.put(CreateSavepointsProcedure.NAME, CreateSavepointsProcedure.builder) mapBuilder.put(DeleteSavepointsProcedure.NAME, DeleteSavepointsProcedure.builder) mapBuilder.put(RollbackSavepointsProcedure.NAME, RollbackSavepointsProcedure.builder) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala new file mode 100644 index 000000000..9bca33f38 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.HoodieCommitMetadata +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} +import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ + +import java.util.function.Supplier + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { + + /** + * operation = (RUN | SCHEDULE) COMPACTION ON tableIdentifier (AT instantTimestamp = INTEGER_VALUE)? + * operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = INTEGER_VALUE)? 
+ */ + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "op", DataTypes.StringType, None), + ProcedureParameter.optional(1, "table", DataTypes.StringType, None), + ProcedureParameter.optional(2, "path", DataTypes.StringType, None), + ProcedureParameter.optional(3, "timestamp", DataTypes.LongType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val operation = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String].toLowerCase + val tableName = getArgValueOrDefault(args, PARAMETERS(1)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(2)) + val instantTimestamp = getArgValueOrDefault(args, PARAMETERS(3)) + + val basePath = getBasePath(tableName, tablePath) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty) + + operation match { + case "schedule" => + val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) + if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { + Seq(Row(instantTime)) + } else { + Seq.empty[Row] + } + case "run" => + // Do compaction + val timeLine = metaClient.getActiveTimeline + val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala + .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) + .map(_.getTimestamp) + .toSeq.sortBy(f => f) + val willCompactionInstants = if (instantTimestamp.isEmpty) { + if (pendingCompactionInstants.nonEmpty) { + pendingCompactionInstants + } else { // If there are no pending 
compactions, schedule to generate one. + // Schedule a new compaction instant to run. + val instantTime = HoodieActiveTimeline.createNewInstantTime() + if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { + Seq(instantTime) + } else { + Seq.empty + } + } + } else { + // Check if the compaction timestamp exists in the pending compactions + if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) { + Seq(instantTimestamp.get.toString) + } else { + throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " + + s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ") + } + } + if (willCompactionInstants.isEmpty) { + logInfo(s"No need to compaction on $basePath") + Seq.empty[Row] + } else { + logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath") + val timer = new HoodieTimer + timer.startTimer() + willCompactionInstants.foreach { compactionInstant => + val writeResponse = client.compact(compactionInstant) + handleResponse(writeResponse.getCommitMetadata.get()) + client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty()) + } + logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + + s" spend: ${timer.endTimer()}ms") + Seq.empty[Row] + } + case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") + } + } + + private def handleResponse(metadata: HoodieCommitMetadata): Unit = { + // Handle error + val writeStats = metadata.getPartitionToWriteStats.entrySet().flatMap(e => e.getValue).toList + val errorsCount = writeStats.map(state => state.getTotalWriteErrors).sum + if (errorsCount > 0) { + throw new HoodieException(s" Found $errorsCount when writing record") + } + } + + override def build: Procedure = new RunCompactionProcedure() + +} + +object 
RunCompactionProcedure { + val NAME = "run_compaction" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new RunCompactionProcedure + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala new file mode 100644 index 000000000..d484d6532 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.util.CompactionUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ + +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { + /** + * SHOW COMPACTION ON tableIdentifier (LIMIT limit = INTEGER_VALUE)? + * SHOW COMPACTION ON path = STRING (LIMIT limit = INTEGER_VALUE)? + */ + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType, None), + ProcedureParameter.optional(1, "path", DataTypes.StringType, None), + ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("size", DataTypes.IntegerType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int] + + val basePath: String = getBasePath(tableName, tablePath) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + + assert(metaClient.getTableType == 
HoodieTableType.MERGE_ON_READ, + s"Cannot show compaction on a Non Merge On Read table.") + val timeLine = metaClient.getActiveTimeline + val compactionInstants = timeLine.getInstants.iterator().asScala + .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) + .toSeq + .sortBy(f => f.getTimestamp) + .reverse + .take(limit) + val compactionPlans = compactionInstants.map(instant => + (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))) + compactionPlans.map { case (instant, plan) => + Row(instant.getTimestamp, instant.getAction, plan.getOperations.size()) + } + } + + override def build: Procedure = new ShowCompactionProcedure() +} + +object ShowCompactionProcedure { + val NAME = "show_compaction" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ShowCompactionProcedure + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunClusteringProcedure.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala index 068cd6538..621411723 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala @@ -23,11 +23,12 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers} + import org.apache.spark.sql.hudi.TestHoodieSqlBase import 
scala.collection.JavaConverters.asScalaIteratorConverter -class TestRunClusteringProcedure extends TestHoodieSqlBase { +class TestClusteringProcedure extends TestHoodieSqlBase { test("Test Call run_clustering Procedure By Table") { withTempDir { tmp => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala new file mode 100644 index 000000000..f6e6772d1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.hudi.procedure + +import org.apache.spark.sql.hudi.TestHoodieSqlBase + +class TestCompactionProcedure extends TestHoodieSqlBase { + + test("Test Call run_compaction Procedure by Table") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql("set hoodie.parquet.max.file.size = 10000") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)") + spark.sql(s"update $tableName set price = 11 where id = 1") + + spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')") + spark.sql(s"update $tableName set price = 12 where id = 2") + spark.sql(s"call run_compaction('schedule', '$tableName')") + val compactionRows = spark.sql(s"call show_compaction(table => '$tableName', limit => 10)").collect() + val timestamps = compactionRows.map(_.getString(0)) + assertResult(2)(timestamps.length) + + spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1000), + Seq(2, "a2", 12.0, 1000), + Seq(3, "a3", 10.0, 1000), + Seq(4, "a4", 10.0, 1000) + ) + assertResult(1)(spark.sql(s"call show_compaction('$tableName')").collect().length) + spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1000), + Seq(2, "a2", 12.0, 1000), + Seq(3, "a3", 10.0, 1000), + Seq(4, "a4", 10.0, 
1000) + ) + assertResult(0)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length) + } + } + + test("Test Call run_compaction Procedure by Path") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql("set hoodie.parquet.max.file.size = 10000") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") + spark.sql(s"update $tableName set price = 11 where id = 1") + + spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1000), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) + // schedule compaction first + spark.sql(s"update $tableName set price = 12 where id = 1") + spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')") + + // schedule compaction second + spark.sql(s"update $tableName set price = 12 where id = 2") + spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')") + + // show compaction + assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) + // run compaction for all the scheduled compaction + spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')") + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 12.0, 1000), + Seq(2, "a2", 12.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + assertResult(0)(spark.sql(s"call 
show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) + + checkException(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}', timestamp => 12345L)")( + s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: " + ) + } + } +}