From 69c0d9e2d000061cd5a22bf4abcd800574c4554d Mon Sep 17 00:00:00 2001 From: pengzhiwei Date: Tue, 22 Jun 2021 22:33:20 +0800 Subject: [PATCH] [HUDI-1883] Support Truncate Table For Hoodie (#3098) --- .../sql/hudi/analysis/HoodieAnalysis.scala | 8 ++- .../command/TruncateHoodieTableCommand.scala | 64 +++++++++++++++++++ .../spark/sql/hudi/TestTruncateTable.scala | 54 ++++++++++++++++ 3 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index bf7b04607..dbd36bee4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, Na import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand, TruncateTableCommand} import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import 
org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.hudi.HoodieSqlUtils._ -import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand} +import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand} import org.apache.spark.sql.types.StringType object HoodieAnalysis { @@ -320,6 +320,10 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case AlterTableChangeColumnCommand(tableName, columnName, newColumn) if isHoodieTable(tableName, sparkSession) => AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn) + // Rewrite TruncateTableCommand to TruncateHoodieTableCommand + case TruncateTableCommand(tableName, partitionSpec) + if isHoodieTable(tableName, sparkSession) => + new TruncateHoodieTableCommand(tableName, partitionSpec) case _ => plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala new file mode 100644 index 000000000..2cce88e79 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.TruncateTableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation

/**
 * Command for truncating a Hoodie table.
 *
 * Delegates the actual data deletion to Spark's [[TruncateTableCommand]].
 * When no partition spec is given, Spark deletes everything under the table
 * path — including the hoodie.properties metadata — so the table config is
 * snapshotted beforehand and the hoodie table is re-initialized afterwards.
 */
class TruncateHoodieTableCommand(
    tableName: TableIdentifier,
    partitionSpec: Option[TablePartitionSpec])
  extends TruncateTableCommand(tableName, partitionSpec) {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val table = sparkSession.sessionState.catalog.getTableMetadata(tableName)
    // Fail fast when the catalog has no location for this table. Previously
    // getOrElse returned the error-message string itself, which was then
    // silently used as the table path.
    val path = getTableLocation(table, sparkSession).getOrElse(
      throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
    val hadoopConf = sparkSession.sessionState.newHadoopConf()

    // If we have not specified the partition, truncate will delete all the
    // data in the table path, including hoodie.properties. In this case we
    // must snapshot the table config now and re-init the table afterwards.
    val needReInitTable = partitionSpec.isEmpty

    val tableProperties = if (needReInitTable) {
      // Read the current table config through a fresh MetaClient before the
      // underlying files are removed.
      val metaClient = HoodieTableMetaClient.builder()
        .setBasePath(path)
        .setConf(hadoopConf)
        .build()
      Some(metaClient.getTableConfig.getProperties)
    } else {
      None
    }

    // Delete all data in the table directory (or just the given partition).
    super.run(sparkSession)

    // Recreate hoodie.properties from the saved config so the truncated
    // table remains a valid hoodie table for subsequent writes.
    tableProperties.foreach { props =>
      HoodieTableMetaClient.withPropertyBuilder()
        .fromProperties(props)
        .initTable(hadoopConf, path)
    }
    Seq.empty[Row]
  }
}
package org.apache.spark.sql.hudi

/**
 * Tests for truncating a hoodie table: a full truncate must leave the table
 * empty yet still writable, for both COW and MOR table types.
 */
class TestTruncateTable extends TestHoodieSqlBase {

  test("Test Truncate Table") {
    for (tableType <- Seq("cow", "mor")) {
      val tableName = generateTableName
      // Create a hudi table of the given type keyed by id.
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | options (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // Seed a single row, then truncate the whole table.
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"truncate table $tableName")
      // The table must be empty after the truncate.
      checkAnswer(s"select count(1) from $tableName")(Seq(0))

      // The truncated table must still accept new writes and return them.
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      checkAnswer(s"select id, name, price, ts from $tableName")(
        Seq(1, "a1", 10.0, 1000)
      )
    }
  }
}