1
0

[HUDI-2482] support 'drop partition' sql (#3754)

This commit is contained in:
Yann Byron
2021-10-19 22:09:53 +08:00
committed by GitHub
parent 60d4cb505a
commit 1e2be85a0f
5 changed files with 368 additions and 42 deletions

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.spark.SPARK_VERSION
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -92,7 +93,45 @@ object HoodieSqlUtils extends SparkAdapterSupport {
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, HoodieSqlUtils.getTableLocation(table, spark)).asScala
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
}
/**
 * Returns true when every existing partition path of the table follows the
 * hive-styled layout ("col1=v1/col2=v2/...").
 *
 * Spark SQL writes enable "hoodie.datasource.write.hive_style_partitioning"
 * by default; for a pre-existing table written without that layout the config
 * must be turned off before merge/update, or the generated partition paths
 * will not match the stored ones and produce incorrect results.
 *
 * A non-partitioned table (no partition columns) trivially counts as
 * hive-styled.
 */
def isHiveStyledPartitioning(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
  val partitionColumns = table.partitionColumnNames
  if (partitionColumns.isEmpty) {
    true
  } else {
    // A path is hive-styled when it has exactly one "col=value" segment
    // per partition column, in partition-column order.
    def followsHiveStyle(path: String): Boolean = {
      val segments = path.split("/")
      segments.length == partitionColumns.size &&
        segments.zip(partitionColumns).forall { case (segment, column) =>
          segment.startsWith(s"$column=")
        }
    }
    partitionPaths.forall(followsHiveStyle)
  }
}
/**
 * Infers whether URL encoding of partition values is enabled for this table.
 *
 * Heuristic: with URL encoding on, each partition value is escaped into a
 * single path segment, so every partition path has exactly one segment per
 * partition column. Returns false for a non-partitioned table.
 */
def isUrlEncodeEnabled(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
  val partitionColumns = table.partitionColumnNames
  partitionColumns.nonEmpty &&
    partitionPaths.forall(_.split("/").length == partitionColumns.size)
}
private def tripAlias(plan: LogicalPlan): LogicalPlan = {

View File

@@ -405,6 +405,11 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case CreateDataSourceTableCommand(table, ignoreIfExists)
if isHoodieTable(table) =>
CreateHoodieTableCommand(table, ignoreIfExists)
// Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
case AlterTableDropPartitionCommand(tableName, specs, _, _, _)
if isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableDropPartitionCommand(tableName, specs)
// Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
// Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
case AlterTableAddColumnsCommand(tableId, colsToAdd)
if isHoodieTable(tableId, sparkSession) =>

View File

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
/**
 * Runnable command backing `ALTER TABLE ... DROP PARTITION (...)` on a Hoodie
 * table. Rather than deleting files directly, it issues a Hudi write with the
 * DELETE_PARTITION operation so the deletion flows through the normal Hudi
 * commit timeline.
 *
 * @param tableIdentifier table whose partitions are being dropped
 * @param specs           one partition spec per `PARTITION (...)` clause
 */
case class AlterHoodieTableDropPartitionCommand(
tableIdentifier: TableIdentifier,
specs: Seq[TablePartitionSpec])
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableIdentifier)
// Reject views and other catalog object kinds that cannot be altered.
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
val path = getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
// Partition fields are read from the Hudi table config, not the Spark catalog.
// NOTE(review): this is a Java Option; `.get()` below throws if the table has
// no partition fields — presumably the non-partitioned case is rejected by
// spec normalization first, but confirm the error surfaced to the user.
val partitionColumns = metaClient.getTableConfig.getPartitionFields
// Normalize each user-written spec: resolve column-name case via the session
// resolver and require that every partition column is specified.
val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec =>
normalizePartitionSpec(
spec,
partitionColumns.get(),
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
}
val parameters = buildHoodieConfig(sparkSession, path, partitionColumns.get, normalizedSpecs)
// Trigger the delete-partition write. The empty DataFrame is a placeholder:
// the operation is driven entirely by the PARTITIONS_TO_DELETE config.
HoodieSparkSqlWriter.write(
sparkSession.sqlContext,
SaveMode.Append,
parameters,
sparkSession.emptyDataFrame)
Seq.empty[Row]
}
/**
 * Builds the Hudi write options for the DELETE_PARTITION operation, rendering
 * the normalized specs into concrete partition paths that match the table's
 * existing on-disk layout (hive-styled and/or URL-encoded).
 *
 * @param sparkSession    active session
 * @param path            base path of the Hudi table
 * @param partitionColumns partition field names, in order
 * @param normalizedSpecs fully-specified partition specs (column -> value)
 * @return resolved write options, with SQL options translated to writer keys
 */
private def buildHoodieConfig(
sparkSession: SparkSession,
path: String,
partitionColumns: Seq[String],
normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = {
val table = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
// Detect the existing layout so the paths we build line up with what is
// actually stored on disk.
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
// Render each spec as a partition path ("a=1/b=2" or "1/2"), comma-joined
// into the single PARTITIONS_TO_DELETE value.
val partitionsToDelete = normalizedSpecs.map { spec =>
partitionColumns.map{ partitionColumn =>
val encodedPartitionValue = if (enableEncodeUrl) {
PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
} else {
spec(partitionColumn)
}
if (enableHiveStylePartitioning) {
partitionColumn + "=" + encodedPartitionValue
} else {
encodedPartitionValue
}
}.mkString("/")
}.mkString(",")
// NOTE(review): a second meta client is built here although run() already
// built one; consider passing it through instead of rebuilding.
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf)
.build()
val tableConfig = metaClient.getTableConfig
// Session confs + table storage properties form the base; the explicit
// entries below take precedence.
val optParams = withSparkConf(sparkSession, table.storage.properties) {
Map(
"path" -> path,
TBL_NAME.key -> tableIdentifier.table,
TABLE_TYPE.key -> tableConfig.getTableType.name,
OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
PARTITIONS_TO_DELETE.key -> partitionsToDelete,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> tableConfig.getPreCombineField,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
)
}
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
translatedOptions
}
/**
 * Resolves the keys of a user-written partition spec against the table's
 * partition columns using the session resolver (handles case sensitivity).
 *
 * @param partitionSpec user-supplied spec (column name -> value)
 * @param partColNames  the table's partition column names
 * @param tblName       quoted table name, used in error messages
 * @param resolver      name-equality resolver from the session conf
 * @return spec re-keyed with canonical partition column names
 * @throws AnalysisException if a key is not a partition column, if not all
 *                           partition columns are specified, or if partition
 *                           column names are duplicated (case-insensitively)
 */
def normalizePartitionSpec[T](
partitionSpec: Map[String, T],
partColNames: Seq[String],
tblName: String,
resolver: Resolver): Map[String, T] = {
val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
}
normalizedKey -> value
}
// Hudi requires a fully-specified spec: partial partition drops are not
// supported, unlike vanilla Spark ALTER TABLE DROP PARTITION.
if (normalizedPartSpec.size < partColNames.size) {
throw new AnalysisException(
"All partition columns need to be specified for Hoodie's dropping partition")
}
// Guard against a malformed table schema with duplicate partition columns
// (compared case-insensitively).
val lowerPartColNames = partColNames.map(_.toLowerCase)
if (lowerPartColNames.distinct.length != lowerPartColNames.length) {
val duplicateColumns = lowerPartColNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => s"`$x`"
}
throw new AnalysisException(
s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}")
}
normalizedPartSpec.toMap
}
}

View File

@@ -100,12 +100,12 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
var upgrateConfig = Map.empty[String, String]
// If this is a non-hive-styled partition table, disable the hive style config.
// (By default this config is enable for spark sql)
upgrateConfig = if (isNotHiveStyledPartitionTable(allPartitionPaths, table)) {
upgrateConfig = if (!isHiveStyledPartitioning(allPartitionPaths, table)) {
upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false")
} else {
upgrateConfig
}
upgrateConfig = if (isUrlEncodeDisable(allPartitionPaths, table)) {
upgrateConfig = if (!isUrlEncodeEnabled(allPartitionPaths, table)) {
upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false")
} else {
upgrateConfig
@@ -314,45 +314,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
}
}
/**
 * Detects a legacy table whose partition paths are NOT hive-styled.
 *
 * Spark SQL enables "hoodie.datasource.write.hive_style_partitioning" by
 * default when writing; for an existing non-hive-styled partitioned table
 * that config must be disabled on merge/update, otherwise the generated
 * partition paths will not match and the result will be incorrect.
 *
 * Returns false for a non-partitioned table.
 */
private def isNotHiveStyledPartitionTable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
  val partitionColumns = table.partitionColumnNames
  if (partitionColumns.isEmpty) {
    false
  } else {
    // Hive style means one "col=value" segment per partition column, in order.
    def followsHiveStyle(path: String): Boolean = {
      val segments = path.split("/")
      segments.length == partitionColumns.size &&
        segments.zip(partitionColumns).forall { case (segment, column) =>
          segment.startsWith(s"$column=")
        }
    }
    !partitionPaths.forall(followsHiveStyle)
  }
}
/**
 * Returns true when the existing table was written with URL encoding of
 * partition values disabled, so spark sql must also disable it when writing.
 *
 * Heuristic: with encoding enabled every partition path has exactly one
 * segment per partition column; any extra segments imply raw (unencoded)
 * values containing '/'. Returns false for a non-partitioned table.
 */
private def isUrlEncodeDisable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
  val partitionColumns = table.partitionColumnNames
  partitionColumns.nonEmpty &&
    !partitionPaths.forall(_.split("/").length == partitionColumns.size)
}
}
object CreateHoodieTableCommand extends Logging {

View File

@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import scala.util.control.NonFatal
/**
 * SQL-level tests for `ALTER TABLE ... DROP PARTITION` on Hoodie tables:
 * non-partitioned tables (must fail), single-column partitions with and
 * without URL encoding, sql-created tables, and multi-level partitions with
 * and without hive-styled paths.
 */
class TestAlterTableDropPartition extends TestHoodieSqlBase {
// Dropping a partition on a non-partitioned table must be rejected.
test("Drop non-partitioned table") {
val tableName = generateTableName
// create table
spark.sql(
s"""
| create table $tableName (
| id bigint,
| name string,
| ts string,
| dt string
| )
| using hudi
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|""".stripMargin)
// insert data
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
// 'dt' is an ordinary column here, so the drop must fail with this message.
checkException(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"dt is not a valid partition column in table `default`.`${tableName}`.;")
}
// Run once with URL encoding off and once with it on; partition values
// contain '/' so encoding changes the on-disk layout.
Seq(false, true).foreach { urlencode =>
test(s"Drop single-partition table' partitions, urlencode: $urlencode") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
import spark.implicits._
val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
.toDF("id", "name", "ts", "dt")
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "dt")
.option(URL_ENCODE_PARTITIONING.key(), urlencode)
.option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Overwrite)
.save(tablePath)
// register meta to spark catalog by creating table
spark.sql(
s"""
|create table $tableName using hudi
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (dt)
|location '$tablePath'
|""".stripMargin)
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
checkAnswer(s"select dt from $tableName") (Seq(s"2021/10/02"))
}
}
}
// Same flow but the partitioned table is created and populated via SQL only.
test("Drop single-partition table' partitions created by sql") {
val tableName = generateTableName
// create table
spark.sql(
s"""
| create table $tableName (
| id bigint,
| name string,
| ts string,
| dt string
| )
| using hudi
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| partitioned by (dt)
|""".stripMargin)
// insert data
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
// specify duplicate partition columns
// NOTE(review): if no exception is thrown the try block passes silently —
// consider intercept[...] or fail() after the sql call to assert it throws.
try {
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")
} catch {
case NonFatal(e) =>
assert(e.getMessage.contains("Found duplicate keys 'dt'"))
}
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
checkAnswer(s"select id, name, ts, dt from $tableName") (Seq(2, "l4", "v1", "2021-10-02"))
}
// Multi-level (year/month/day) partitions, with and without hive-styled paths.
Seq(false, true).foreach { hiveStyle =>
test(s"Drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
import spark.implicits._
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02"))
.toDF("id", "name", "ts", "year", "month", "day")
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "year,month,day")
.option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
.option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Overwrite)
.save(tablePath)
// register meta to spark catalog by creating table
spark.sql(
s"""
|create table $tableName using hudi
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|partitioned by (year, month, day)
|location '$tablePath'
|""".stripMargin)
// not specified all partition column
checkException(s"alter table $tableName drop partition (year='2021', month='10')")(
"All partition columns need to be specified for Hoodie's dropping partition;"
)
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02")
)
}
}
}
}