diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index 105107bce..97e47deed 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -176,7 +176,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient { } @Override - public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + public void dropPartitions(String tableName, List partitionsToDrop) { throw new UnsupportedOperationException("Not support dropPartitionsToTable yet."); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 57725f253..854eef175 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -421,25 +421,6 @@ public class TableSchemaResolver { return latestSchema; } - - /** - * Get Last commit's Metadata. - */ - public Option getLatestCommitMetadata() { - try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - HoodieInstant instant = timeline.lastInstant().get(); - byte[] data = timeline.getInstantDetails(instant).get(); - return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); - } else { - return Option.empty(); - } - } catch (Exception e) { - throw new HoodieException("Failed to get commit metadata", e); - } - } - /** * Read the parquet schema from a parquet File. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 33a03ba50..b6d9224b3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1336,4 +1336,22 @@ public class HoodieTableMetadataUtil { inflightAndCompletedPartitions.addAll(getCompletedMetadataPartitions(tableConfig)); return inflightAndCompletedPartitions; } + + /** + * Get Last commit's Metadata. 
+ */ + public static Option getLatestCommitMetadata(HoodieTableMetaClient metaClient) { + try { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + if (timeline.lastInstant().isPresent()) { + HoodieInstant instant = timeline.lastInstant().get(); + byte[] data = timeline.getInstantDetails(instant).get(); + return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); + } else { + return Option.empty(); + } + } catch (Exception e) { + throw new HoodieException("Failed to get commit metadata", e); + } + } } diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java index 4aafbf2e5..cb41ca227 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java @@ -177,12 +177,6 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient { throw new UnsupportedOperationException("No support for addPartitionsToTable yet."); } - @Override - public void dropPartitionsToTable(final String tableName, final List partitionsToDrop) { - // bigQuery discovers the new partitions automatically, so do nothing. - throw new UnsupportedOperationException("No support for dropPartitionsToTable yet."); - } - public boolean datasetExists() { Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName)); return dataset != null; @@ -236,6 +230,12 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient { throw new UnsupportedOperationException("No support for updatePartitionsToTable yet."); } + @Override + public void dropPartitions(String tableName, List partitionsToDrop) { + // bigQuery discovers the new partitions automatically, so do nothing. + throw new UnsupportedOperationException("No support for dropPartitions yet."); + } + @Override public void close() { // bigQuery has no connection close method, so do nothing. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index 98823d142..7ee8f6ad5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -110,6 +110,11 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten */ lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) + /** + * BaseFileFormat + */ + lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name() + /** * The schema of table. * Make StructField nullable and fill the comments in. 
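Note on the helper relocated above: getLatestCommitMetadata is now a static utility on HoodieTableMetadataUtil, so callers no longer need a TableSchemaResolver instance. A minimal usage sketch (illustrative only, not part of this patch; metaClient is assumed to be an already-initialized HoodieTableMetaClient) of how a sync client can detect a drop-partition commit with it, mirroring the AbstractSyncHoodieClient.isDropPartition change at the end of this diff:

import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieTableMetadataUtil

// Returns true when the latest completed commit on the table's timeline
// was produced by a DELETE_PARTITION (drop partition) operation.
def lastCommitWasDropPartition(metaClient: HoodieTableMetaClient): Boolean = {
  val latestCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient)
  latestCommitMetadata.isPresent &&
    WriteOperationType.DELETE_PARTITION == latestCommitMetadata.get.getOperationType
}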
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index d6745b679..74255473b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -17,22 +17,31 @@ package org.apache.spark.sql.hudi -import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.hive.MultiPartKeysValueExtractor +import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.hive.ddl.HiveSyncMode +import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor} import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.sql.InsertMode +import org.apache.hudi.sync.common.HoodieSyncConfig +import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable +import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf} import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType -import scala.collection.JavaConverters.propertiesAsScalaMapConverter +import java.util +import java.util.Locale + +import scala.collection.JavaConverters._ trait ProvidesHoodieConfig extends Logging { @@ -40,7 +49,6 @@ trait ProvidesHoodieConfig extends Logging { val sparkSession: SparkSession = hoodieCatalogTable.spark val catalogProperties = hoodieCatalogTable.catalogProperties val tableConfig = hoodieCatalogTable.tableConfig - val tableId = hoodieCatalogTable.table.identifier // NOTE: Here we fallback to "" to make sure that null value is not overridden with // default value ("ts") @@ -51,6 +59,10 @@ trait ProvidesHoodieConfig extends Logging { s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator") val enableHive = isEnableHive(sparkSession) + val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) + + val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + withSparkConf(sparkSession, catalogProperties) { Map.apply( "path" -> hoodieCatalogTable.tableLocation, @@ -63,15 +75,14 @@ trait ProvidesHoodieConfig extends Logging { SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, OPERATION.key -> UPSERT_OPERATION_OPT_VAL, PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, - META_SYNC_ENABLED.key -> enableHive.toString, - HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), - HIVE_USE_JDBC.key -> "false", - HIVE_DATABASE.key -> tableId.database.getOrElse("default"), - HIVE_TABLE.key -> tableId.table, - HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp, - HIVE_PARTITION_EXTRACTOR_CLASS.key -> 
classOf[MultiPartKeysValueExtractor].getCanonicalName, - HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", + HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString, + HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode, + HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName, + HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName, + HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp, + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass, + HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString, + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) .filter { case(_, v) => v != null } @@ -98,10 +109,12 @@ trait ProvidesHoodieConfig extends Logging { val path = hoodieCatalogTable.tableLocation val tableType = hoodieCatalogTable.tableTypeName val tableConfig = hoodieCatalogTable.tableConfig - val tableSchema = hoodieCatalogTable.tableSchema + val catalogProperties = hoodieCatalogTable.catalogProperties - val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions - val parameters = withSparkConf(sparkSession, options)() + val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf, extraOptions) + val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + + val parameters = withSparkConf(sparkSession, catalogProperties)() val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",") @@ -161,7 +174,7 @@ trait ProvidesHoodieConfig extends Logging { val enableHive = isEnableHive(sparkSession) - withSparkConf(sparkSession, options) { + withSparkConf(sparkSession, catalogProperties) { Map( "path" -> path, TABLE_TYPE.key -> tableType, @@ -177,20 +190,124 @@ trait ProvidesHoodieConfig extends Logging { PAYLOAD_CLASS_NAME.key -> payloadClassName, ENABLE_ROW_WRITER.key -> enableBulkInsert.toString, HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn), - HIVE_PARTITION_FIELDS.key -> partitionFieldsStr, - META_SYNC_ENABLED.key -> enableHive.toString, - HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), - HIVE_USE_JDBC.key -> "false", - HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"), - HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table, - HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", + HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr, + HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString, + HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode, + HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName, + HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName, + HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString, + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass, + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) .filter { case (_, v) => v != null } } } + def buildHoodieDropPartitionsConfig( + sparkSession: SparkSession, + hoodieCatalogTable: HoodieCatalogTable, + partitionsToDrop: String): Map[String, String] = { + val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") + val enableHive = isEnableHive(sparkSession) + val catalogProperties = hoodieCatalogTable.catalogProperties + val tableConfig = hoodieCatalogTable.tableConfig + + val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) + val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + + withSparkConf(sparkSession, catalogProperties) { + Map( + "path" -> hoodieCatalogTable.tableLocation, + TBL_NAME.key -> hoodieCatalogTable.tableName, + TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName, + OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL, + PARTITIONS_TO_DELETE.key -> partitionsToDrop, + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), + PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), + PARTITIONPATH_FIELD.key -> partitionFields, + HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString, + HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString, + HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode, + HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName, + HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName, + HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString, + HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields, + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass + ) + .filter { case (_, v) => v != null } + } + } + + def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable, + sparkSession: SparkSession): Map[String, String] = { + val path = hoodieCatalogTable.tableLocation + val catalogProperties = hoodieCatalogTable.catalogProperties + val tableConfig = hoodieCatalogTable.tableConfig + val tableSchema = hoodieCatalogTable.tableSchema + val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase(Locale.ROOT)) + val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name))) + + assert(hoodieCatalogTable.primaryKeys.nonEmpty, + s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operation") + + val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) + val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + + withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { + Map( + "path" -> path, + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), + TBL_NAME.key -> tableConfig.getTableName, + HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, + KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, + SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, + OPERATION.key -> 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, + PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, + HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode, + HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString, + HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"), + SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL + ) + } + } + + def getHoodieProps(catalogProperties: Map[String, String], tableConfig: HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] = Map.empty): TypedProperties = { + val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ conf.getAllConfs ++ extraOptions + val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options) + hoodieConfig.getProps + } + + def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = { + val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig + hiveSyncConfig.basePath = hoodieCatalogTable.tableLocation + hiveSyncConfig.baseFileFormat = hoodieCatalogTable.baseFileFormat + hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean) + hiveSyncConfig.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default") + if (props.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) { + hiveSyncConfig.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key) + } else { + hiveSyncConfig.tableName = hoodieCatalogTable.table.identifier.table + } + hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name()) + hiveSyncConfig.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue) + hiveSyncConfig.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue) + hiveSyncConfig.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue) + hiveSyncConfig.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue) + hiveSyncConfig.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String]) + hiveSyncConfig.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName) + if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key) + hiveSyncConfig.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean + hiveSyncConfig.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean + hiveSyncConfig.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean + hiveSyncConfig.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean + hiveSyncConfig.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean + hiveSyncConfig.bucketSpec = 
if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key)) + else null + if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION) + hiveSyncConfig.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean + hiveSyncConfig + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index bcc397694..c7afbfe11 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -17,19 +17,15 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSparkSqlWriter import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.hive.ddl.HiveSyncMode -import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor} -import org.apache.hudi.sync.common.HoodieSyncConfig -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ +import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} case class AlterHoodieTableDropPartitionCommand( @@ -38,7 +34,7 @@ case class AlterHoodieTableDropPartitionCommand( ifExists : Boolean, purge : Boolean, retainData : Boolean) - extends HoodieLeafRunnableCommand { + extends HoodieLeafRunnableCommand with ProvidesHoodieConfig { override def run(sparkSession: SparkSession): Seq[Row] = { val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" @@ -62,7 +58,7 @@ case class AlterHoodieTableDropPartitionCommand( } val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs) - val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, partitionsToDrop) + val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop) HoodieSparkSqlWriter.write( sparkSession.sqlContext, SaveMode.Append, @@ -84,33 +80,4 @@ case class AlterHoodieTableDropPartitionCommand( logInfo(s"Finish execute alter table drop partition command for $fullTableName") Seq.empty[Row] } - - private def buildHoodieConfig( - sparkSession: SparkSession, - hoodieCatalogTable: HoodieCatalogTable, - partitionsToDrop: String): Map[String, String] = { - val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") - val enableHive = isEnableHive(sparkSession) - withSparkConf(sparkSession, Map.empty) { - Map( - "path" -> 
hoodieCatalogTable.tableLocation, - TBL_NAME.key -> hoodieCatalogTable.tableName, - TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName, - OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL, - PARTITIONS_TO_DELETE.key -> partitionsToDrop, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), - PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), - PARTITIONPATH_FIELD.key -> partitionFields, - HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString, - HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString, - HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), - HiveSyncConfig.HIVE_USE_JDBC.key -> "false", - HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"), - HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hoodieCatalogTable.table.identifier.table, - HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields, - HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName - ) - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index f6da1b386..632a983b4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -17,20 +17,15 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _} -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.hive.HiveSyncConfig -import org.apache.hudi.hive.ddl.HiveSyncMode -import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} +import org.apache.hudi.SparkAdapterSupport import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.hudi.ProvidesHoodieConfig case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand - with SparkAdapterSupport { + with SparkAdapterSupport with ProvidesHoodieConfig { private val table = deleteTable.table @@ -44,7 +39,9 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie if (deleteTable.condition.isDefined) { df = df.filter(Column(deleteTable.condition.get)) } - val config = buildHoodieConfig(sparkSession) + + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId) + val config = buildHoodieDeleteTableConfig(hoodieCatalogTable, sparkSession) df.write .format("hudi") .mode(SaveMode.Append) @@ -54,33 +51,4 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie logInfo(s"Finish execute delete command for $tableId") Seq.empty[Row] } - - private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = { - val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId) - val path = hoodieCatalogTable.tableLocation - val tableConfig = hoodieCatalogTable.tableConfig - val tableSchema = 
hoodieCatalogTable.tableSchema - val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) - val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name))) - - assert(hoodieCatalogTable.primaryKeys.nonEmpty, - s"There are no primary key defined in table $tableId, cannot execute delete operator") - withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { - Map( - "path" -> path, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), - TBL_NAME.key -> tableConfig.getTableName, - HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, - URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, - KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, - SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, - OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, - PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, - HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), - HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200", - SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL - ) - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 74d6226b4..1376445bd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -22,8 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor} -import org.apache.hudi.hive.ddl.HiveSyncMode +import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.spark.sql._ @@ -34,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId -import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ +import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils} import org.apache.spark.sql.types.{BooleanType, StructType} import java.util.Base64 @@ -61,7 +60,7 @@ import java.util.Base64 * */ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand - with SparkAdapterSupport { + with SparkAdapterSupport with ProvidesHoodieConfig { private var sparkSession: SparkSession = _ @@ -439,6 +438,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val targetTableDb = targetTableIdentify.database.getOrElse("default") val targetTableName = targetTableIdentify.identifier val path = hoodieCatalogTable.tableLocation + val catalogProperties = 
hoodieCatalogTable.catalogProperties val tableConfig = hoodieCatalogTable.tableConfig val tableSchema = hoodieCatalogTable.tableSchema val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) @@ -449,6 +449,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // TODO(HUDI-3456) clean up val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("") + val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) + val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + // Enable the hive sync by default if spark have enable the hive metastore. val enableHive = isEnableHive(sparkSession) withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { @@ -464,16 +467,15 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString, - HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), - HiveSyncConfig.HIVE_USE_JDBC.key -> "false", + HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode, HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb, HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName, - HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", + HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString, HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp, - HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", - HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200", + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass, + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"), + HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL ) .filter { case (_, v) => v != null } diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java index 3510db7c2..68569822c 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java @@ -132,8 +132,8 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient { } @Override - public void dropPartitionsToTable(String tableName, List partitionsToDrop) { - throw new UnsupportedOperationException("Not supported: `dropPartitionsToTable`"); + public void dropPartitions(String tableName, List partitionsToDrop) { + throw new UnsupportedOperationException("Not supported: `dropPartitions`"); } @Override diff --git 
a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index 54192b6a8..10869eaf2 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -310,8 +310,8 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient { } @Override - public void dropPartitionsToTable(String tableName, List partitionsToDrop) { - throw new UnsupportedOperationException("Not support dropPartitionsToTable yet."); + public void dropPartitions(String tableName, List partitionsToDrop) { + throw new UnsupportedOperationException("Not support dropPartitions yet."); } public Map, String> scanTablePartitions(String tableName) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index b6c4069a2..939fc114c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -404,7 +404,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable { List dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP); if (!dropPartitions.isEmpty()) { LOG.info("Drop Partitions " + dropPartitions); - hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions); + hoodieHiveClient.dropPartitions(tableName, dropPartitions); } partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty(); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index a61e7cb6b..539d18a21 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -110,7 +110,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient { * Partition path has changed - drop the following partitions. 
*/ @Override - public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + public void dropPartitions(String tableName, List partitionsToDrop) { ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop); } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 33fc20478..8eec32789 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -18,6 +18,15 @@ package org.apache.hudi.sync.common; +import java.io.Serializable; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -29,21 +38,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; -import java.io.Serializable; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.Map; -import java.util.Objects; - public abstract class AbstractSyncHoodieClient implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); @@ -113,7 +112,7 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable { public abstract void updatePartitionsToTable(String tableName, List changedPartitions); - public abstract void dropPartitionsToTable(String tableName, List partitionsToDrop); + public abstract void dropPartitions(String tableName, List partitionsToDrop); public void updateTableProperties(String tableName, Map tableProperties) {} @@ -170,8 +169,7 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable { public boolean isDropPartition() { try { - Option hoodieCommitMetadata; - hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata(); + Option hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient); if (hoodieCommitMetadata.isPresent() && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
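With the refactor above, the Spark SQL commands (drop partition, delete, merge into) derive their Hive-sync options from the shared ProvidesHoodieConfig helpers instead of hard-coding them per command. A rough sketch (illustrative only; the object name, the table name h0, and the partition path dt=2021-10-01 are invented for the example) of how a caller mixing in the trait obtains the drop-partition write options:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hudi.ProvidesHoodieConfig

object DropPartitionConfigExample extends ProvidesHoodieConfig {
  // Resolves the catalog table and builds the DELETE_PARTITION write options,
  // including the meta-sync settings derived via buildHiveSyncConfig.
  def dropPartitionOptions(spark: SparkSession): Map[String, String] = {
    val hoodieCatalogTable = HoodieCatalogTable(spark, TableIdentifier("h0"))
    buildHoodieDropPartitionsConfig(spark, hoodieCatalogTable, "dt=2021-10-01")
  }
}

The resulting map is what AlterHoodieTableDropPartitionCommand passes to HoodieSparkSqlWriter.write with SaveMode.Append, so the partition drop and the subsequent meta sync go through the same configuration path as the other SQL write commands.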