[HUDI-3780] improve drop partitions (#5178)

ForwardXu authored 2022-04-05 11:52:33 +08:00, committed by GitHub
parent b28f0d6ceb, commit 3449e86989

14 changed files with 216 additions and 160 deletions
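The refactor below moves per-command Hudi option building onto the shared ProvidesHoodieConfig trait, so DELETE, MERGE INTO, and the drop-partitions path named in the commit title assemble their write configs the same way. For orientation, a minimal sketch of the Spark SQL entry points these commands serve; the session settings are standard Hudi setup, and the table, column, and source names are illustrative, not from this commit:

    import org.apache.spark.sql.SparkSession

    object HudiSqlEntryPointsDemo {
      def main(args: Array[String]): Unit = {
        // Assumes the Hudi Spark bundle is on the classpath; the extension class
        // is Hudi's documented SQL extension. Everything named hudi_tbl/updates
        // is made up for illustration.
        val spark = SparkSession.builder()
          .appName("hudi-sql-demo")
          .master("local[*]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
          .getOrCreate()

        // Routed to the drop-partitions path this commit improves.
        spark.sql("ALTER TABLE hudi_tbl DROP PARTITION (dt = '2022-04-05')")
        // Routed to DeleteHoodieTableCommand (first file below).
        spark.sql("DELETE FROM hudi_tbl WHERE id = 1")
        // Routed to MergeIntoHoodieTableCommand (second file below).
        spark.sql(
          """MERGE INTO hudi_tbl AS t
            |USING updates AS s
            |ON t.id = s.id
            |WHEN MATCHED THEN UPDATE SET *
            |WHEN NOT MATCHED THEN INSERT *
            |""".stripMargin)
      }
    }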

DeleteHoodieTableCommand.scala

@@ -17,20 +17,15 @@
 package org.apache.spark.sql.hudi.command

-import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _}
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.HiveSyncConfig
-import org.apache.hudi.hive.ddl.HiveSyncMode
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig

 case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand
-  with SparkAdapterSupport {
+  with SparkAdapterSupport with ProvidesHoodieConfig {

   private val table = deleteTable.table
@@ -44,7 +39,9 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie
     if (deleteTable.condition.isDefined) {
       df = df.filter(Column(deleteTable.condition.get))
    }
-    val config = buildHoodieConfig(sparkSession)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
+    val config = buildHoodieDeleteTableConfig(hoodieCatalogTable, sparkSession)
     df.write
       .format("hudi")
       .mode(SaveMode.Append)
@@ -54,33 +51,4 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie
     logInfo(s"Finish execute delete command for $tableId")
     Seq.empty[Row]
   }
-
-  private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
-    val path = hoodieCatalogTable.tableLocation
-    val tableConfig = hoodieCatalogTable.tableConfig
-    val tableSchema = hoodieCatalogTable.tableSchema
-    val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
-    val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
-
-    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
-      s"There are no primary key defined in table $tableId, cannot execute delete operator")
-    withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> tableConfig.getTableName,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
-      )
-    }
-  }
 }
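The deleted buildHoodieConfig above is replaced by buildHoodieDeleteTableConfig on the new ProvidesHoodieConfig trait, whose source is not part of this excerpt. A sketch of what the centralized helper plausibly looks like, reconstructed from the removed body; the actual signature and option set in the commit may differ:

    // Sketch only: approximates the shared helper that replaces the deleted
    // buildHoodieConfig. Reconstructed from the removed lines above; not the
    // verbatim trait shipped in this commit.
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
    import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
    import org.apache.spark.sql.hudi.command.SqlKeyGenerator

    trait ProvidesHoodieConfig {
      def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
                                       sparkSession: SparkSession): Map[String, String] = {
        val tableConfig = hoodieCatalogTable.tableConfig
        assert(hoodieCatalogTable.primaryKeys.nonEmpty,
          s"There is no primary key defined in table ${hoodieCatalogTable.table.identifier}, " +
            "cannot execute the delete operation")
        withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
          Map(
            "path" -> hoodieCatalogTable.tableLocation,
            RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
            TBL_NAME.key -> tableConfig.getTableName,
            OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
            PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
            KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName
            // ...plus the hive-sync and parallelism options the removed method
            // set inline, now derived from table/catalog properties.
          )
        }
      }
    }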

MergeIntoHoodieTableCommand.scala

@@ -22,8 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
-import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.spark.sql._
@@ -34,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
-import org.apache.spark.sql.hudi.SerDeUtils
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
+import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils}
 import org.apache.spark.sql.types.{BooleanType, StructType}

 import java.util.Base64
@@ -61,7 +60,7 @@ import java.util.Base64
  *
  */
 case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand
-  with SparkAdapterSupport {
+  with SparkAdapterSupport with ProvidesHoodieConfig {

   private var sparkSession: SparkSession = _
@@ -439,6 +438,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     val targetTableDb = targetTableIdentify.database.getOrElse("default")
     val targetTableName = targetTableIdentify.identifier
     val path = hoodieCatalogTable.tableLocation
+    val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
     val tableSchema = hoodieCatalogTable.tableSchema
     val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
@@ -449,6 +449,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // TODO(HUDI-3456) clean up
     val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
+    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
+    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
     // Enable the hive sync by default if spark have enable the hive metastore.
     val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
@@ -464,16 +467,15 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
-        HiveSyncConfig.HIVE_USE_JDBC.key -> "false",
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
-        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
+        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
       )
       .filter { case (_, v) => v != null }
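Note the pattern change in the parallelism options above: the hard-coded "200" is now only a fallback, read through hoodieProps, so a table or catalog property can override the SQL default. A small self-contained illustration of that lookup semantics using Hudi's TypedProperties; the property value 64 is made up:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.config.HoodieWriteConfig

    object ParallelismDefaultDemo extends App {
      val hoodieProps = new TypedProperties()

      // Nothing set: the SQL default of 200 applies.
      assert(hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200") == "200")

      // A user-supplied table property now wins over the SQL default.
      hoodieProps.setProperty(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "64")
      assert(hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200") == "64")
    }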