1
0

[HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)

This commit is contained in:
Yann Byron
2022-01-02 18:42:10 +08:00
committed by GitHub
parent 188d0338c4
commit 1622b52c9c
4 changed files with 24 additions and 8 deletions

View File

@@ -254,7 +254,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case action: MergeAction =>
// SPARK-34962: use UpdateStarAction as the explicit representation of * in UpdateAction.
// So match and convert this in the Spark 3.2 env.
UpdateAction(action.condition, Seq.empty)
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(action.condition, Seq.empty)
UpdateAction(resolvedCondition, resolvedAssignments)
}
// Resolve the notMatchedActions
val resolvedNotMatchedActions = notMatchedActions.map {
@@ -265,7 +267,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case action: MergeAction =>
// SPARK-34962: use InsertStarAction as the explicit representation of * in InsertAction.
// So match and convert this in the Spark 3.2 env.
InsertAction(action.condition, Seq.empty)
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(action.condition, Seq.empty)
InsertAction(resolvedCondition, resolvedAssignments)
}
// Return the resolved MergeIntoTable
MergeIntoTable(target, resolvedSource, resolvedMergeCondition,

View File

@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -254,7 +254,7 @@ object InsertIntoHoodieTableCommand extends Logging {
// on reading.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")

View File

@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.TruncateTableCommand
import scala.util.control.NonFatal
/**
* Command for truncate hudi table.
*/
@@ -36,10 +38,16 @@ class TruncateHoodieTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps
val tablePath = hoodieCatalogTable.tableLocation
// Delete all data in the table directory
super.run(sparkSession)
try {
// Delete all data in the table directory
super.run(sparkSession)
} catch {
// TruncateTableCommand deletes the related directories first and then refreshes the table.
// The refresh fails because the hudi meta directory (.hoodie) was already deleted in the first step.
// So ignore that failure here and refresh the table later.
case NonFatal(_) =>
}
// If we have not specified the partition, truncate will delete all the data in the table path,
// including the hoodie.properties. In this case we should re-init the table.
@@ -50,6 +58,10 @@ class TruncateHoodieTableCommand(
.fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
}
// After deleting the data, refresh the table to make sure we don't keep around a stale
// file relation in the metastore cache and cached table data in the cache manager.
sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
Seq.empty[Row]
}
}

View File

@@ -154,7 +154,7 @@ class TestShowPartitions extends TestHoodieSqlBase {
Seq("year=2021/month=02/day=default"),
Seq("year=2021/month=02/day=01")
)
checkAnswer(s"show partitions $tableName partition(day=01)")(
checkAnswer(s"show partitions $tableName partition(day='01')")(
Seq("year=2021/month=02/day=01"),
Seq("year=2021/month=default/day=01"),
Seq("year=2021/month=01/day=01"),