[HUDI-2233] Use HMS To Sync Hive Meta For Spark Sql (#3387)
@@ -122,7 +122,6 @@ object DataSourceReadOptions {
  * Options supported for writing hoodie tables.
  */
 object DataSourceWriteOptions {
 
   private val log = LogManager.getLogger(DataSourceWriteOptions.getClass)
 
   val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
@@ -349,9 +348,12 @@ object DataSourceWriteOptions {
     .defaultValue("false")
     .withDocumentation("")
 
+  // We should use HIVE_SYNC_MODE instead of this config from 0.9.0
+  @Deprecated
   val HIVE_USE_JDBC: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.use_jdbc")
     .defaultValue("true")
+    .deprecatedAfter("0.9.0")
     .withDocumentation("Use JDBC when hive synchronization is enabled")
 
   val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = ConfigProperty
@@ -401,6 +403,11 @@ object DataSourceWriteOptions {
     .defaultValue(1000)
     .withDocumentation("The number of partitions one batch when synchronous partitions to hive.")
 
+  val HIVE_SYNC_MODE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.hive_sync.mode")
+    .noDefaultValue()
+    .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.")
+
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.compaction.async.enable")
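For reference, the new option can be exercised from a plain datasource write. A minimal sketch, assuming df is an existing DataFrame and a Hive metastore is reachable from the Spark session; the table name, record key, and path are illustrative, not part of this commit:

  // Minimal sketch (not in the commit): opt into metastore-based Hive sync.
  df.write.format("hudi")
    .option("hoodie.table.name", "hudi_trips")                  // illustrative name
    .option("hoodie.datasource.write.recordkey.field", "uuid")  // illustrative key
    .option("hoodie.datasource.hive_sync.enable", "true")
    .option("hoodie.datasource.hive_sync.mode", "hms")          // the config added above
    .mode("append")
    .save("/tmp/hudi_trips")                                    // illustrative path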
@@ -464,7 +464,7 @@ object HoodieSparkSqlWriter {
     hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
     hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
     hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
+    hiveSyncConfig.syncMode = hoodieConfig.getString(HIVE_SYNC_MODE)
     hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
     hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
     hiveSyncConfig
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.hudi.command
 
+import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
-import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, HIVE_SUPPORT_TIMESTAMP, KEYGENERATOR_CLASS, OPERATION, PARTITIONPATH_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -73,6 +75,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand {
       TABLE_NAME.key -> tableId.table,
       OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
       PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+      HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
       HIVE_SUPPORT_TIMESTAMP.key -> "true",
       HIVE_STYLE_PARTITIONING.key -> "true",
       HoodieWriteConfig.DELETE_PARALLELISM.key -> "200",
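The INSERT, MERGE, and UPDATE commands below pin HIVE_SYNC_MODE to HMS in the same way, so a plain Spark SQL statement against a Hudi table syncs metadata through the metastore client rather than through a HiveServer2 JDBC endpoint. A hedged illustration; the database and table names are made up:

  // Illustrative only: either statement now drives Hive sync in HMS mode.
  spark.sql("DELETE FROM default.hudi_orders WHERE order_id = 42")
  spark.sql("UPDATE default.hudi_orders SET amount = 0 WHERE order_id = 42")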
@@ -27,6 +27,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -244,6 +245,7 @@ object InsertIntoHoodieTableCommand {
       PARTITIONPATH_FIELD.key -> partitionFields,
       PAYLOAD_CLASS.key -> payloadClassName,
       META_SYNC_ENABLED.key -> enableHive.toString,
+      HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
       HIVE_USE_JDBC.key -> "false",
       HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
       HIVE_TABLE.key -> table.identifier.table,
@@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
@@ -437,6 +438,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends RunnableCommand {
       PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
       PAYLOAD_CLASS.key -> classOf[ExpressionPayload].getCanonicalName,
       META_SYNC_ENABLED.key -> enableHive.toString,
+      HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
       HIVE_USE_JDBC.key -> "false",
       HIVE_DATABASE.key -> targetTableDb,
       HIVE_TABLE.key -> targetTableName,
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, SubqueryAlias, UpdateTable}
@@ -104,6 +105,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCommand {
       OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
       META_SYNC_ENABLED.key -> enableHive.toString,
+      HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
       HIVE_USE_JDBC.key -> "false",
       HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
       HIVE_TABLE.key -> tableId.table,
@@ -22,13 +22,13 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
 import org.apache.hudi.hive.ddl.DDLExecutor;
 import org.apache.hudi.hive.ddl.HMSDDLExecutor;
 import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
+import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.hive.ddl.JDBCExecutor;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -69,14 +69,15 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
     // disable jdbc and depend on metastore client for all hive registrations
     try {
       if (!StringUtils.isNullOrEmpty(cfg.syncMode)) {
-        switch (cfg.syncMode.toLowerCase()) {
-          case "hms":
+        HiveSyncMode syncMode = HiveSyncMode.of(cfg.syncMode);
+        switch (syncMode) {
+          case HMS:
             ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs);
             break;
-          case "hiveql":
+          case HIVEQL:
             ddlExecutor = new HiveQueryDDLExecutor(cfg, fs, configuration);
             break;
-          case "jdbc":
+          case JDBC:
             ddlExecutor = new JDBCExecutor(cfg, fs);
             break;
           default:
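The visible context ends before the fallback branch; assuming the pre-existing behavior is kept, syncMode takes precedence and the old useJdbc flag is only consulted when no mode is configured. A sketch of that assumed precedence (chooseExecutor and its string return values are hypothetical names, not code from this commit):

  import org.apache.hudi.hive.ddl.HiveSyncMode

  // Hypothetical restatement of the assumed executor-selection order.
  def chooseExecutor(syncMode: String, useJdbc: Boolean): String =
    if (syncMode != null && syncMode.nonEmpty) {
      HiveSyncMode.of(syncMode) match {
        case HiveSyncMode.HMS    => "HMSDDLExecutor"       // metastore client
        case HiveSyncMode.HIVEQL => "HiveQueryDDLExecutor" // hive ql
        case HiveSyncMode.JDBC   => "JDBCExecutor"         // hive jdbc
      }
    } else if (useJdbc) "JDBCExecutor" else "HiveQueryDDLExecutor" // assumed fallback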
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import java.util.Locale;
+
+public enum HiveSyncMode {
+
+  /**
+   * The HMS mode uses the Hive metastore client to sync metadata.
+   */
+  HMS,
+  /**
+   * The HIVEQL mode executes Hive QL statements to sync metadata.
+   */
+  HIVEQL,
+  /**
+   * The JDBC mode uses Hive JDBC to sync metadata.
+   */
+  JDBC;
+
+  public static HiveSyncMode of(String syncMode) {
+    return HiveSyncMode.valueOf(syncMode.toUpperCase(Locale.ROOT));
+  }
+}
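A quick hedged sanity check of the new enum's parsing: of() upper-cases with Locale.ROOT, so any casing of the configured mode resolves, while an unknown mode fails fast. The harness object below is illustrative, not part of the commit:

  import org.apache.hudi.hive.ddl.HiveSyncMode

  object HiveSyncModeCheck extends App {                     // illustrative harness
    assert(HiveSyncMode.of("hms") == HiveSyncMode.HMS)       // lower case resolves
    assert(HiveSyncMode.of("HiveQL") == HiveSyncMode.HIVEQL) // mixed case resolves
    // HiveSyncMode.of("thrift") would throw IllegalArgumentException from valueOf.
  }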
@@ -115,8 +115,6 @@
                   <include>org.apache.curator:curator-client</include>
                   <include>org.apache.curator:curator-recipes</include>
                   <include>commons-codec:commons-codec</include>
-                  <include>org.json:json</include>
-                  <include>org.apache.calcite:calcite-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -369,18 +367,6 @@
       <artifactId>curator-recipes</artifactId>
       <version>${zk-curator.version}</version>
     </dependency>
-    <!-- import json to fix the crash when sync meta in spark-->
-    <dependency>
-      <groupId>org.json</groupId>
-      <artifactId>json</artifactId>
-      <version>${json.version}</version>
-    </dependency>
-    <!-- import calcite to support hive3 & spark 3-->
-    <dependency>
-      <groupId>org.apache.calcite</groupId>
-      <artifactId>calcite-core</artifactId>
-      <version>${calcite.version}</version>
-    </dependency>
     <!-- TODO: Reinvestigate PR 633 -->
   </dependencies>
 
pom.xml
@@ -152,8 +152,6 @@
     <presto.bundle.bootstrap.shade.prefix>org.apache.hudi.</presto.bundle.bootstrap.shade.prefix>
     <shadeSources>true</shadeSources>
     <zk-curator.version>2.7.1</zk-curator.version>
-    <json.version>20200518</json.version>
-    <calcite.version>1.16.0</calcite.version>
   </properties>
 
   <scm>