From 0dcd6a8fcaff0a8ac2e0fdccbfd14634722185b2 Mon Sep 17 00:00:00 2001 From: pengzhiwei Date: Thu, 5 Aug 2021 21:57:22 +0800 Subject: [PATCH] [HUDI-2233] Use HMS To Sync Hive Meta For Spark Sql (#3387) --- .../org/apache/hudi/DataSourceOptions.scala | 9 +++- .../apache/hudi/HoodieSparkSqlWriter.scala | 2 +- .../command/DeleteHoodieTableCommand.scala | 5 ++- .../InsertIntoHoodieTableCommand.scala | 2 + .../command/MergeIntoHoodieTableCommand.scala | 2 + .../command/UpdateHoodieTableCommand.scala | 2 + .../apache/hudi/hive/HoodieHiveClient.java | 15 +++---- .../apache/hudi/hive/ddl/HiveSyncMode.java | 42 +++++++++++++++++++ packaging/hudi-spark-bundle/pom.xml | 14 ------- pom.xml | 2 - 10 files changed, 69 insertions(+), 26 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 3c82c44c9..b12361b61 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -122,7 +122,6 @@ object DataSourceReadOptions { * Options supported for writing hoodie tables. 
*/ object DataSourceWriteOptions { - private val log = LogManager.getLogger(DataSourceWriteOptions.getClass) val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value @@ -349,9 +348,12 @@ object DataSourceWriteOptions { .defaultValue("false") .withDocumentation("") + // We should use HIVE_SYNC_MODE instead of this config from 0.9.0 + @Deprecated val HIVE_USE_JDBC: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.use_jdbc") .defaultValue("true") + .deprecatedAfter("0.9.0") .withDocumentation("Use JDBC when hive synchronization is enabled") val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = ConfigProperty @@ -401,6 +403,11 @@ object DataSourceWriteOptions { .defaultValue(1000) .withDocumentation("The number of partitions one batch when synchronous partitions to hive.") + val HIVE_SYNC_MODE: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.hive_sync.mode") + .noDefaultValue() + .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.") + // Async Compaction - Enabled by default for MOR val ASYNC_COMPACT_ENABLE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.compaction.async.enable") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index e132471f7..6213ab859 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -464,7 +464,7 @@ object HoodieSparkSqlWriter { hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD) hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE) - + 
hiveSyncConfig.syncMode = hoodieConfig.getString(HIVE_SYNC_MODE) hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES) hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES) hiveSyncConfig diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index 74fe88bcc..6c88d89b8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.hudi.command +import org.apache.hudi.DataSourceWriteOptions.OPERATION import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} -import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, HIVE_SUPPORT_TIMESTAMP, KEYGENERATOR_CLASS, OPERATION, PARTITIONPATH_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME +import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, SubqueryAlias} import org.apache.spark.sql.execution.command.RunnableCommand @@ -73,6 +75,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab TABLE_NAME.key -> tableId.table, OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), + HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_SUPPORT_TIMESTAMP.key -> "true", HIVE_STYLE_PARTITIONING.key -> "true", HoodieWriteConfig.DELETE_PARALLELISM.key -> "200", diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index bbca17f8d..9782ec5fd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -27,6 +27,7 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor +import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} @@ -244,6 +245,7 @@ object InsertIntoHoodieTableCommand { PARTITIONPATH_FIELD.key -> partitionFields, PAYLOAD_CLASS.key -> payloadClassName, META_SYNC_ENABLED.key -> enableHive.toString, + HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_USE_JDBC.key -> "false", HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"), HIVE_TABLE.key -> table.identifier.table, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index ab714c852..5146868ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig 
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor +import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal} @@ -437,6 +438,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), PAYLOAD_CLASS.key -> classOf[ExpressionPayload].getCanonicalName, META_SYNC_ENABLED.key -> enableHive.toString, + HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_USE_JDBC.key -> "false", HIVE_DATABASE.key -> targetTableDb, HIVE_TABLE.key -> targetTableName, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index e384c413b..f6d119525 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor +import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{Assignment, SubqueryAlias, UpdateTable} @@ -104,6 +105,7 @@ case class UpdateHoodieTableCommand(updateTable: 
UpdateTable) extends RunnableCo OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), META_SYNC_ENABLED.key -> enableHive.toString, + HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_USE_JDBC.key -> "false", HIVE_DATABASE.key -> tableId.database.getOrElse("default"), HIVE_TABLE.key -> tableId.table, diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 36d7c50a5..4a22bb883 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -22,13 +22,13 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; import org.apache.hudi.hive.ddl.DDLExecutor; import org.apache.hudi.hive.ddl.HMSDDLExecutor; import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; +import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCExecutor; -import org.apache.hudi.hive.util.HiveSchemaUtil; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -69,14 +69,15 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { // disable jdbc and depend on metastore client for all hive registrations try { if (!StringUtils.isNullOrEmpty(cfg.syncMode)) { - switch (cfg.syncMode.toLowerCase()) { - case "hms": + HiveSyncMode syncMode = HiveSyncMode.of(cfg.syncMode); + switch (syncMode) { + case HMS: ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs); 
break; - case "hiveql": + case HIVEQL: ddlExecutor = new HiveQueryDDLExecutor(cfg, fs, configuration); break; - case "jdbc": + case JDBC: ddlExecutor = new JDBCExecutor(cfg, fs); break; default: diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java new file mode 100644 index 000000000..7e011538c --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.ddl; + +import java.util.Locale; + +public enum HiveSyncMode { + + /** + * The HMS mode uses the hive meta client to sync metadata. + */ + HMS, + /** + * The HIVEQL mode executes hive ql to sync metadata. + */ + HIVEQL, + /** + * The JDBC mode uses hive jdbc to sync metadata. 
+ */ + JDBC + ; + + public static HiveSyncMode of(String syncMode) { + return HiveSyncMode.valueOf(syncMode.toUpperCase(Locale.ROOT)); + } +} diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 829e03426..9124cfe24 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -115,8 +115,6 @@ org.apache.curator:curator-client org.apache.curator:curator-recipes commons-codec:commons-codec - org.json:json - org.apache.calcite:calcite-core @@ -369,18 +367,6 @@ curator-recipes ${zk-curator.version} - - - org.json - json - ${json.version} - - - - org.apache.calcite - calcite-core - ${calcite.version} - diff --git a/pom.xml b/pom.xml index 7bd7ccab3..567266934 100644 --- a/pom.xml +++ b/pom.xml @@ -152,8 +152,6 @@ org.apache.hudi. true 2.7.1 - 20200518 - 1.16.0