From 1e2be85a0f1b290fa568cfae68f4219ac2b5f210 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 19 Oct 2021 22:09:53 +0800 Subject: [PATCH] [HUDI-2482] support 'drop partition' sql (#3754) --- .../spark/sql/hudi/HoodieSqlUtils.scala | 41 +++- .../sql/hudi/analysis/HoodieAnalysis.scala | 5 + ...AlterHoodieTableDropPartitionCommand.scala | 142 ++++++++++++++ .../command/CreateHoodieTableCommand.scala | 43 +---- .../hudi/TestAlterTableDropPartition.scala | 179 ++++++++++++++++++ 5 files changed, 368 insertions(+), 42 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 318577b81..182d891dd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.spark.SPARK_VERSION +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -92,7 +93,45 @@ object HoodieSqlUtils extends SparkAdapterSupport { properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava) HoodieMetadataConfig.newBuilder.fromProperties(properties).build() } - FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, HoodieSqlUtils.getTableLocation(table, spark)).asScala + FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala + } + + /** + * This method is used to compatible with the old non-hive-styled partition table. + * By default we enable the "hoodie.datasource.write.hive_style_partitioning" + * when writing data to hudi table by spark sql by default. + * If the exist table is a non-hive-styled partitioned table, we should + * disable the "hoodie.datasource.write.hive_style_partitioning" when + * merge or update the table. Or else, we will get an incorrect merge result + * as the partition path mismatch. + */ + def isHiveStyledPartitioning(partitionPaths: Seq[String], table: CatalogTable): Boolean = { + if (table.partitionColumnNames.nonEmpty) { + val isHiveStylePartitionPath = (path: String) => { + val fragments = path.split("/") + if (fragments.size != table.partitionColumnNames.size) { + false + } else { + fragments.zip(table.partitionColumnNames).forall { + case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=") + } + } + } + partitionPaths.forall(isHiveStylePartitionPath) + } else { + true + } + } + + /** + * Determine whether URL encoding is enabled + */ + def isUrlEncodeEnabled(partitionPaths: Seq[String], table: CatalogTable): Boolean = { + if (table.partitionColumnNames.nonEmpty) { + partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size) + } else { + false + } } private def tripAlias(plan: LogicalPlan): LogicalPlan = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 3a6bedfbd..09e0314ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -405,6 +405,11 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case CreateDataSourceTableCommand(table, ignoreIfExists) if isHoodieTable(table) => CreateHoodieTableCommand(table, ignoreIfExists) + // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand + case AlterTableDropPartitionCommand(tableName, specs, _, _, _) + if isHoodieTable(tableName, sparkSession) => + AlterHoodieTableDropPartitionCommand(tableName, specs) + // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand case AlterTableAddColumnsCommand(tableId, colsToAdd) if isHoodieTable(tableId, sparkSession) => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala new file mode 100644 index 000000000..7c4d45649 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils} +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.util.PartitionPathEncodeUtils +import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME +import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} +import org.apache.spark.sql.hudi.HoodieSqlUtils._ + +case class AlterHoodieTableDropPartitionCommand( + tableIdentifier: TableIdentifier, + specs: Seq[TablePartitionSpec]) +extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val table = catalog.getTableMetadata(tableIdentifier) + DDLUtils.verifyAlterTableType(catalog, table, isView = false) + + val path = getTableLocation(table, sparkSession) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build() + val partitionColumns = metaClient.getTableConfig.getPartitionFields + val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec => + normalizePartitionSpec( + spec, + partitionColumns.get(), + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) + } + + val parameters = buildHoodieConfig(sparkSession, path, partitionColumns.get, normalizedSpecs) + + HoodieSparkSqlWriter.write( + sparkSession.sqlContext, + SaveMode.Append, + parameters, + sparkSession.emptyDataFrame) + + Seq.empty[Row] + } + + private def buildHoodieConfig( + sparkSession: SparkSession, + path: String, + partitionColumns: Seq[String], + normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = { + val table = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier) + val allPartitionPaths = getAllPartitionPaths(sparkSession, table) + val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table) + val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table) + val partitionsToDelete = normalizedSpecs.map { spec => + partitionColumns.map{ partitionColumn => + val encodedPartitionValue = if (enableEncodeUrl) { + PartitionPathEncodeUtils.escapePathName(spec(partitionColumn)) + } else { + spec(partitionColumn) + } + if (enableHiveStylePartitioning) { + partitionColumn + "=" + encodedPartitionValue + } else { + encodedPartitionValue + } + }.mkString("/") + }.mkString(",") + + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path) + .setConf(sparkSession.sessionState.newHadoopConf) + .build() + val tableConfig = metaClient.getTableConfig + + val optParams = withSparkConf(sparkSession, table.storage.properties) { + Map( + "path" -> path, + TBL_NAME.key -> tableIdentifier.table, + TABLE_TYPE.key -> tableConfig.getTableType.name, + OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL, + PARTITIONS_TO_DELETE.key -> partitionsToDelete, + RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, + PRECOMBINE_FIELD.key -> tableConfig.getPreCombineField, + PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp + ) + } + + val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams) + val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters) + translatedOptions + } + + def normalizePartitionSpec[T]( + partitionSpec: Map[String, T], + partColNames: Seq[String], + tblName: String, + resolver: Resolver): Map[String, T] = { + val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) => + val normalizedKey = partColNames.find(resolver(_, key)).getOrElse { + throw new AnalysisException(s"$key is not a valid partition column in table $tblName.") + } + normalizedKey -> value + } + + if (normalizedPartSpec.size < partColNames.size) { + throw new AnalysisException( + "All partition columns need to be specified for Hoodie's dropping partition") + } + + val lowerPartColNames = partColNames.map(_.toLowerCase) + if (lowerPartColNames.distinct.length != lowerPartColNames.length) { + val duplicateColumns = lowerPartColNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } + throw new AnalysisException( + s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}") + } + + normalizedPartSpec.toMap + } + +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 312986302..ec1f74624 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -100,12 +100,12 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean var upgrateConfig = Map.empty[String, String] // If this is a non-hive-styled partition table, disable the hive style config. // (By default this config is enable for spark sql) - upgrateConfig = if (isNotHiveStyledPartitionTable(allPartitionPaths, table)) { + upgrateConfig = if (!isHiveStyledPartitioning(allPartitionPaths, table)) { upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false") } else { upgrateConfig } - upgrateConfig = if (isUrlEncodeDisable(allPartitionPaths, table)) { + upgrateConfig = if (!isUrlEncodeEnabled(allPartitionPaths, table)) { upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false") } else { upgrateConfig @@ -314,45 +314,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'") } } - - /** - * This method is used to compatible with the old non-hive-styled partition table. - * By default we enable the "hoodie.datasource.write.hive_style_partitioning" - * when writing data to hudi table by spark sql by default. - * If the exist table is a non-hive-styled partitioned table, we should - * disable the "hoodie.datasource.write.hive_style_partitioning" when - * merge or update the table. Or else, we will get an incorrect merge result - * as the partition path mismatch. - */ - private def isNotHiveStyledPartitionTable(partitionPaths: Seq[String], table: CatalogTable): Boolean = { - if (table.partitionColumnNames.nonEmpty) { - val isHiveStylePartitionPath = (path: String) => { - val fragments = path.split("/") - if (fragments.size != table.partitionColumnNames.size) { - false - } else { - fragments.zip(table.partitionColumnNames).forall { - case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=") - } - } - } - !partitionPaths.forall(isHiveStylePartitionPath) - } else { - false - } - } - - /** - * If this table has disable the url encode, spark sql should also disable it when writing to the table. - */ - private def isUrlEncodeDisable(partitionPaths: Seq[String], table: CatalogTable): Boolean = { - if (table.partitionColumnNames.nonEmpty) { - !partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size) - } else { - false - } - } - } object CreateHoodieTableCommand extends Logging { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala new file mode 100644 index 000000000..1315857ae --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator} +import org.apache.spark.sql.SaveMode + +import scala.util.control.NonFatal + +class TestAlterTableDropPartition extends TestHoodieSqlBase { + + test("Drop non-partitioned table") { + val tableName = generateTableName + // create table + spark.sql( + s""" + | create table $tableName ( + | id bigint, + | name string, + | ts string, + | dt string + | ) + | using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + |""".stripMargin) + // insert data + spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""") + + checkException(s"alter table $tableName drop partition (dt='2021-10-01')")( + s"dt is not a valid partition column in table `default`.`${tableName}`.;") + } + + Seq(false, true).foreach { urlencode => + test(s"Drop single-partition table' partitions, urlencode: $urlencode") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + + import spark.implicits._ + val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02")) + .toDF("id", "name", "ts", "dt") + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt") + .option(URL_ENCODE_PARTITIONING.key(), urlencode) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) + + // register meta to spark catalog by creating table + spark.sql( + s""" + |create table $tableName using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + |) + |partitioned by (dt) + |location '$tablePath' + |""".stripMargin) + + // drop 2021-10-01 partition + spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')") + + checkAnswer(s"select dt from $tableName") (Seq(s"2021/10/02")) + } + } + } + + test("Drop single-partition table' partitions created by sql") { + val tableName = generateTableName + // create table + spark.sql( + s""" + | create table $tableName ( + | id bigint, + | name string, + | ts string, + | dt string + | ) + | using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | partitioned by (dt) + |""".stripMargin) + // insert data + spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""") + + // specify duplicate partition columns + try { + spark.sql(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')") + } catch { + case NonFatal(e) => + assert(e.getMessage.contains("Found duplicate keys 'dt'")) + } + + // drop 2021-10-01 partition + spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')") + + checkAnswer(s"select id, name, ts, dt from $tableName") (Seq(2, "l4", "v1", "2021-10-02")) + } + + Seq(false, true).foreach { hiveStyle => + test(s"Drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + + import spark.implicits._ + val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02")) + .toDF("id", "name", "ts", "year", "month", "day") + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "year,month,day") + .option(HIVE_STYLE_PARTITIONING.key, hiveStyle) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) + + // register meta to spark catalog by creating table + spark.sql( + s""" + |create table $tableName using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + |) + |partitioned by (year, month, day) + |location '$tablePath' + |""".stripMargin) + + // not specified all partition column + checkException(s"alter table $tableName drop partition (year='2021', month='10')")( + "All partition columns need to be specified for Hoodie's dropping partition;" + ) + // drop 2021-10-01 partition + spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") + + checkAnswer(s"select id, name, ts, year, month, day from $tableName")( + Seq(2, "l4", "v1", "2021", "10", "02") + ) + } + } + } +}