1
0

[HUDI-3538] Support Compaction Command Based on Call Procedure Command for Spark SQL (#4945)

* Support Compaction Command Based on Call Procedure Command for Spark SQL

* Addressed review comments
This commit is contained in:
huberylee
2022-03-28 14:11:35 +08:00
committed by GitHub
parent d31cde284c
commit 1d0f4ccfe0
10 changed files with 448 additions and 98 deletions

View File

@@ -17,97 +17,37 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@Deprecated
case class CompactionHoodiePathCommand(path: String,
operation: CompactionOperation, instantTimestamp: Option[Long] = None)
operation: CompactionOperation,
instantTimestamp: Option[Long] = None)
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf()).build()
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Must compaction on a Merge On Read table.")
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Must compaction on a Merge On Read table.")
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, path, Map.empty)
operation match {
case SCHEDULE =>
val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
Seq(Row(instantTime))
} else {
Seq.empty[Row]
}
case RUN =>
// Do compaction
val timeLine = metaClient.getActiveTimeline
val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp)
.toSeq.sortBy(f => f)
val willCompactionInstants = if (instantTimestamp.isEmpty) {
if (pendingCompactionInstants.nonEmpty) {
pendingCompactionInstants
} else { // If there are no pending compaction, schedule to generate one.
// CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
val scheduleSeq = CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE).run(sparkSession)
if (scheduleSeq.isEmpty) {
Seq.empty
} else {
Seq(scheduleSeq.take(1).get(0).getString(0)).filter(_ != null)
}
}
} else {
// Check if the compaction timestamp has exists in the pending compaction
if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
Seq(instantTimestamp.get.toString)
} else {
throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in $path," +
s" Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
}
}
if (willCompactionInstants.isEmpty) {
logInfo(s"No need to compaction on $path")
Seq.empty[Row]
} else {
logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $path")
val timer = new HoodieTimer
timer.startTimer()
willCompactionInstants.foreach {compactionInstant =>
val writeResponse = client.compact(compactionInstant)
handleResponse(writeResponse.getCommitMetadata.get())
client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
}
logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
s" spend: ${timer.endTimer()}ms")
Seq.empty[Row]
}
case _=> throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
val op = operation match {
case SCHEDULE => UTF8String.fromString("schedule")
case RUN => UTF8String.fromString("run")
case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
}
}
private def handleResponse(metadata: HoodieCommitMetadata): Unit = {
// Handle error
val writeStats = metadata.getPartitionToWriteStats.entrySet().flatMap(e => e.getValue).toList
val errorsCount = writeStats.map(state => state.getTotalWriteErrors).sum
if (errorsCount > 0) {
throw new HoodieException(s" Found $errorsCount when writing record")
}
var args: Map[String, Any] = Map("op" -> op, "path" -> UTF8String.fromString(path))
instantTimestamp.foreach(timestamp => args += "timestamp" -> timestamp)
val procedureArgs = HoodieProcedureUtils.buildProcedureArgs(args)
RunCompactionProcedure.builder.get().build.call(procedureArgs)
}
override val output: Seq[Attribute] = {

View File

@@ -24,8 +24,10 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SparkSession}
@Deprecated
case class CompactionHoodieTableCommand(table: CatalogTable,
operation: CompactionOperation, instantTimestamp: Option[Long])
operation: CompactionOperation,
instantTimestamp: Option[Long])
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {

View File

@@ -19,41 +19,32 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CompactionUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.asScalaIteratorConverter
@Deprecated
case class CompactionShowHoodiePathCommand(path: String, limit: Int)
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path.toString)
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf()).build()
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Cannot show compaction on a Non Merge On Read table.")
val timeLine = metaClient.getActiveTimeline
val compactionInstants = timeLine.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.toSeq
.sortBy(f => f.getTimestamp)
.reverse
.take(limit)
val compactionPlans = compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)))
compactionPlans.map { case (instant, plan) =>
Row(instant.getTimestamp, instant.getAction, plan.getOperations.size())
}
val args = Map("path" -> UTF8String.fromString(path), "limit" -> limit)
val procedureArgs = HoodieProcedureUtils.buildProcedureArgs(args)
ShowCompactionProcedure.builder.get().build.call(procedureArgs)
}
override val output: Seq[Attribute] = {
Seq(
AttributeReference("timestamp", StringType, nullable = false)(),
AttributeReference("instant", StringType, nullable = false)(),
AttributeReference("action", StringType, nullable = false)(),
AttributeReference("size", IntegerType, nullable = false)()
)

View File

@@ -23,6 +23,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{Row, SparkSession}
@Deprecated
case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
extends HoodieLeafRunnableCommand {

View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import java.util
object HoodieProcedureUtils {

  /**
   * Builds a [[ProcedureArgs]] holding named procedure arguments.
   *
   * Every entry of the given map becomes one named argument: the key is
   * recorded in a name -> position lookup table (insertion-ordered), and the
   * value is stored at that position inside an internal row.
   *
   * @param args The arguments map
   * @return Named procedure arguments
   */
  def buildProcedureArgs(args: Map[String, Any]): ProcedureArgs = {
    val values: Array[Any] = new Array[Any](args.size)
    // LinkedHashMap keeps name -> index entries in the order arguments appear.
    val positions = new util.LinkedHashMap[String, Int]()
    var idx = 0
    args.foreach { case (name, value) =>
      values(idx) = value
      positions.put(name, idx)
      idx += 1
    }
    ProcedureArgs(isNamedArgs = true, positions, new GenericInternalRow(values))
  }
}

View File

@@ -33,6 +33,8 @@ object HoodieProcedures {
private def initProcedureBuilders: util.Map[String, Supplier[ProcedureBuilder]] = {
val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
mapBuilder.put(CreateSavepointsProcedure.NAME, CreateSavepointsProcedure.builder)
mapBuilder.put(DeleteSavepointsProcedure.NAME, DeleteSavepointsProcedure.builder)
mapBuilder.put(RollbackSavepointsProcedure.NAME, RollbackSavepointsProcedure.builder)

View File

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.util.function.Supplier
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
  /**
   * Procedure backing the SQL compaction commands:
   *
   * operation = (RUN | SCHEDULE) COMPACTION ON tableIdentifier (AT instantTimestamp = INTEGER_VALUE)?
   * operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = INTEGER_VALUE)?
   */
  private val PARAMETERS = Array[ProcedureParameter](
    // "op" selects the action: "schedule" creates a compaction plan,
    // "run" executes pending plans (scheduling one first if none exist).
    ProcedureParameter.required(0, "op", DataTypes.StringType, None),
    // The target table is identified by either "table" or "path".
    ProcedureParameter.optional(1, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(2, "path", DataTypes.StringType, None),
    // Optional instant time to schedule at, or the pending instant to run.
    ProcedureParameter.optional(3, "timestamp", DataTypes.LongType, None)
  )

  // Single output column: the newly scheduled instant time ("schedule" op
  // returns at most one row; "run" returns no rows).
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Schedules or runs compaction on the resolved table.
   *
   * op == "schedule": schedules a compaction at the supplied timestamp (or a
   * freshly generated instant time) and returns that instant in one row, or
   * no rows when scheduling is rejected by the client.
   *
   * op == "run": executes all pending compaction instants in timestamp order;
   * when an explicit timestamp is given, only that instant is run (it must be
   * pending); when nothing is pending and no timestamp is given, one new
   * compaction is scheduled and then executed. Returns no rows.
   *
   * @throws IllegalArgumentException      if the explicit timestamp does not match
   *                                       any pending compaction instant
   * @throws UnsupportedOperationException for any op other than schedule/run
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val operation = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String].toLowerCase
    val tableName = getArgValueOrDefault(args, PARAMETERS(1))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
    val instantTimestamp = getArgValueOrDefault(args, PARAMETERS(3))

    // getBasePath comes from BaseProcedure — presumably resolves the table's
    // base path from either the table identifier or the explicit path; confirm
    // against BaseProcedure if both are supplied.
    val basePath = getBasePath(tableName, tablePath)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)

    operation match {
      case "schedule" =>
        val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
        if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
          Seq(Row(instantTime))
        } else {
          Seq.empty[Row]
        }
      case "run" =>
        // Do compaction
        val timeLine = metaClient.getActiveTimeline
        // Pending compaction instants from the write timeline, oldest first.
        val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
          .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
          .map(_.getTimestamp)
          .toSeq.sortBy(f => f)
        val willCompactionInstants = if (instantTimestamp.isEmpty) {
          if (pendingCompactionInstants.nonEmpty) {
            pendingCompactionInstants
          } else { // If there are no pending compaction, schedule to generate one.
            // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
            val instantTime = HoodieActiveTimeline.createNewInstantTime()
            if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
              Seq(instantTime)
            } else {
              Seq.empty
            }
          }
        } else {
          // Check if the compaction timestamp has exists in the pending compaction
          if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
            Seq(instantTimestamp.get.toString)
          } else {
            throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
              s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
          }
        }
        if (willCompactionInstants.isEmpty) {
          logInfo(s"No need to compaction on $basePath")
          Seq.empty[Row]
        } else {
          logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
          val timer = new HoodieTimer
          timer.startTimer()
          willCompactionInstants.foreach { compactionInstant =>
            val writeResponse = client.compact(compactionInstant)
            // Fail fast on any write errors before committing this compaction.
            handleResponse(writeResponse.getCommitMetadata.get())
            client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
          }
          logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
            s" spend: ${timer.endTimer()}ms")
          Seq.empty[Row]
        }
      case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
    }
  }

  /**
   * Validates commit metadata after a compaction write.
   *
   * @throws HoodieException if any partition's write stats report write errors
   */
  private def handleResponse(metadata: HoodieCommitMetadata): Unit = {
    // Handle error
    val writeStats = metadata.getPartitionToWriteStats.entrySet().flatMap(e => e.getValue).toList
    val errorsCount = writeStats.map(state => state.getTotalWriteErrors).sum
    if (errorsCount > 0) {
      throw new HoodieException(s" Found $errorsCount when writing record")
    }
  }

  override def build: Procedure = new RunCompactionProcedure()
}
/**
 * Companion object exposing the registered procedure name and the factory
 * that the procedure registry uses to obtain builder instances.
 */
object RunCompactionProcedure {
  // SQL-facing name: invoked as CALL run_compaction(...)
  val NAME = "run_compaction"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new RunCompactionProcedure()
  }
}

View File

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CompactionUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.util.function.Supplier
import scala.collection.JavaConverters._
class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
  /**
   * Procedure backing the SQL commands:
   *
   * SHOW COMPACTION ON tableIdentifier (LIMIT limit = INTEGER_VALUE)?
   * SHOW COMPACTION ON path = STRING (LIMIT limit = INTEGER_VALUE)?
   */
  private val PARAMETERS = Array[ProcedureParameter](
    // The target table is identified by either "table" or "path".
    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
    // Maximum number of compaction instants to return (newest first).
    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20)
  )

  // One row per compaction instant: its timestamp, action name, and the
  // number of operations in its compaction plan.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("size", DataTypes.IntegerType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Lists compaction instants on the active timeline of the resolved table,
   * newest first, capped at "limit" rows. Only valid for MOR tables.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0))
    val path = getArgValueOrDefault(args, PARAMETERS(1))
    val maxInstants = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]

    val basePath: String = getBasePath(table, path)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    // Compaction is a MOR-only concept; reject COW tables up front.
    assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
      s"Cannot show compaction on a Non Merge On Read table.")

    val selectedInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
      .filter(instant => instant.getAction == HoodieTimeline.COMPACTION_ACTION)
      .toSeq
      .sortBy(instant => instant.getTimestamp)
      .reverse
      .take(maxInstants)

    selectedInstants.map { instant =>
      val plan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)
      Row(instant.getTimestamp, instant.getAction, plan.getOperations.size())
    }
  }

  override def build: Procedure = new ShowCompactionProcedure()
}
/**
 * Companion object exposing the registered procedure name and the factory
 * that the procedure registry uses to obtain builder instances.
 */
object ShowCompactionProcedure {
  // SQL-facing name: invoked as CALL show_compaction(...)
  val NAME = "show_compaction"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ShowCompactionProcedure()
  }
}

View File

@@ -23,11 +23,12 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
import org.apache.spark.sql.hudi.TestHoodieSqlBase
import scala.collection.JavaConverters.asScalaIteratorConverter
class TestRunClusteringProcedure extends TestHoodieSqlBase {
class TestClusteringProcedure extends TestHoodieSqlBase {
test("Test Call run_clustering Procedure By Table") {
withTempDir { tmp =>

View File

@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.hudi.procedure
import org.apache.spark.sql.hudi.TestHoodieSqlBase
class TestCompactionProcedure extends TestHoodieSqlBase {

  test("Test Call run_compaction Procedure by Table") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // Create a MOR table — compaction is only supported on Merge-On-Read.
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}'
           | tblproperties (
           |  primaryKey ='id',
           |  type = 'mor',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // Keep file sizes small so updates land in log files needing compaction.
      spark.sql("set hoodie.parquet.max.file.size = 10000")
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")

      // Schedule two compactions, one after each update: first with named
      // arguments, then with positional arguments.
      spark.sql(s"update $tableName set price = 11 where id = 1")
      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
      spark.sql(s"update $tableName set price = 12 where id = 2")
      spark.sql(s"call run_compaction('schedule', '$tableName')")

      // Both scheduled instants should be listed.
      val compactionRows = spark.sql(s"call show_compaction(table => '$tableName', limit => 10)").collect()
      val timestamps = compactionRows.map(_.getString(0))
      assertResult(2)(timestamps.length)

      // Run the earlier instant first — show_compaction appears to list
      // instants newest first, so index 1 is the older one (confirm ordering
      // against ShowCompactionProcedure).
      spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})")
      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
        Seq(1, "a1", 11.0, 1000),
        Seq(2, "a2", 12.0, 1000),
        Seq(3, "a3", 10.0, 1000),
        Seq(4, "a4", 10.0, 1000)
      )
      // One pending compaction should remain.
      assertResult(1)(spark.sql(s"call show_compaction('$tableName')").collect().length)

      // Run the remaining instant; data stays the same and nothing is pending.
      spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")
      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
        Seq(1, "a1", 11.0, 1000),
        Seq(2, "a2", 12.0, 1000),
        Seq(3, "a3", 10.0, 1000),
        Seq(4, "a4", 10.0, 1000)
      )
      assertResult(0)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
    }
  }

  test("Test Call run_compaction Procedure by Path") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // Same MOR table setup, but procedures below address it by path.
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}'
           | tblproperties (
           |  primaryKey ='id',
           |  type = 'mor',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      spark.sql("set hoodie.parquet.max.file.size = 10000")
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
      spark.sql(s"update $tableName set price = 11 where id = 1")

      // 'run' without a timestamp and with nothing pending should schedule a
      // compaction itself and then execute it, leaving nothing pending.
      spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")
      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
        Seq(1, "a1", 11.0, 1000),
        Seq(2, "a2", 10.0, 1000),
        Seq(3, "a3", 10.0, 1000)
      )
      assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)

      // schedule compaction first
      spark.sql(s"update $tableName set price = 12 where id = 1")
      spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')")

      // schedule compaction second
      spark.sql(s"update $tableName set price = 12 where id = 2")
      spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')")

      // show compaction
      assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)

      // run compaction for all the scheduled compaction
      spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")
      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
        Seq(1, "a1", 12.0, 1000),
        Seq(2, "a2", 12.0, 1000),
        Seq(3, "a3", 10.0, 1000)
      )
      assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)

      // An explicit timestamp that is not a pending instant must be rejected
      // with a descriptive error message.
      checkException(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}', timestamp => 12345L)")(
        s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: "
      )
    }
  }
}