[HUDI-2051] Enable Hive Sync When Spark Enable Hive Meta For Spark Sql (#3126)
HoodieSqlUtils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
 
 import scala.collection.immutable.Map
@@ -171,10 +171,6 @@ object HoodieSqlUtils extends SparkAdapterSupport {
   /**
    * Append the SparkSession config and table options to the baseConfig.
    * We add the "spark" prefix to hoodie's config key.
-   * @param spark
-   * @param options
-   * @param baseConfig
-   * @return
    */
   def withSparkConf(spark: SparkSession, options: Map[String, String])
                    (baseConfig: Map[String, String]): Map[String, String] = {
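Note on the helper touched here: withSparkConf is a curried function that layers the session's hoodie settings and the table options onto a base config. A minimal standalone sketch of that layering idea, assuming a simple last-wins precedence (illustrative only, not Hudi's actual implementation):

import org.apache.spark.sql.SparkSession

// Sketch: start from base defaults, overlay hoodie.* entries from the
// session conf, then the table's own options; later entries win.
def withConfLayers(spark: SparkSession, options: Map[String, String])
                  (baseConfig: Map[String, String]): Map[String, String] = {
  val sessionHoodieConf = spark.conf.getAll.filter(_._1.startsWith("hoodie."))
  baseConfig ++ sessionHoodieConf ++ options
}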
@@ -184,4 +180,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
   }
 
   def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
+
+  def isEnableHive(sparkSession: SparkSession): Boolean =
+    "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
 }
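The new isEnableHive helper centralizes the check for a Hive-backed catalog. For context, SparkSession.builder().enableHiveSupport() sets the static conf spark.sql.catalogImplementation to "hive"; the same check can be sketched through the public conf API (master/appName are placeholders, and Hive classes are assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

// Build a local hive-enabled session and read the static conf that
// isEnableHive inspects; static confs are readable by key at runtime.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("catalog-impl-check")
  .enableHiveSupport()
  .getOrCreate()

val hiveEnabled = spark.conf.get("spark.sql.catalogImplementation") == "hive"
println(hiveEnabled) // true for a hive-enabled session, false for in-memory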
CreateHoodieTableCommand.scala
@@ -155,7 +155,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
     validateTable(newTable)
 
     // Create table in the catalog
-    val enableHive = "hive" == sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+    val enableHive = isEnableHive(sparkSession)
     if (enableHive) {
       createHiveDataSourceTable(newTable, sparkSession)
     } else {
DeleteHoodieTableCommand.scala
@@ -66,10 +66,9 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
 
     assert(primaryColumns.nonEmpty,
       s"There are no primary key defined in table $tableId, cannot execute delete operator")
 
     withSparkConf(sparkSession, targetTable.storage.properties) {
       Map(
-        "path" -> path.toString,
+        "path" -> path,
         KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
         TABLE_NAME.key -> tableId.table,
         OPERATION_OPT_KEY.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
InsertIntoHoodieTableCommand.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.Properties
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
@@ -232,7 +231,7 @@ object InsertIntoHoodieTableCommand {
       } else {
         classOf[DefaultHoodieRecordPayload].getCanonicalName
       }
-
+    val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, options) {
       Map(
         "path" -> path,
@@ -244,7 +243,7 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
         PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
-        META_SYNC_ENABLED_OPT_KEY.key -> "true",
+        META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
         HIVE_USE_JDBC_OPT_KEY.key -> "false",
         HIVE_DATABASE_OPT_KEY.key -> table.identifier.database.getOrElse("default"),
         HIVE_TABLE_OPT_KEY.key -> table.identifier.table,
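The change above makes meta sync follow the catalog implementation instead of being hard-coded on. The same conditional flag, sketched as a plain DataFrame write (the string keys are the public forms of the constants in the diff; df and basePath are assumed to exist):

// Sync to the Hive metastore only when the session catalog is hive-backed,
// mirroring META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString above.
val enableHive = spark.conf.get("spark.sql.catalogImplementation") == "hive"
df.write.format("hudi")
  .option("hoodie.table.name", "h0")                                 // TABLE_NAME
  .option("hoodie.datasource.meta.sync.enable", enableHive.toString) // META_SYNC_ENABLED_OPT_KEY
  .option("hoodie.datasource.hive_sync.use_jdbc", "false")           // HIVE_USE_JDBC_OPT_KEY
  .mode("append")
  .save(basePath)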
MergeIntoHoodieTableCommand.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.Base64
-
 import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
@@ -426,7 +425,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
       throw new IllegalArgumentException(s"Merge Key[${targetKey2SourceExpression.keySet.mkString(",")}] is not" +
         s" Equal to the defined primary key[${definedPk.mkString(",")}] in table $targetTableName")
     }
-
+    // Enable the hive sync by default if spark have enable the hive metastore.
+    val enableHive = isEnableHive(sparkSession)
     HoodieWriterUtils.parametersWithWriteDefaults(
       withSparkConf(sparkSession, options) {
         Map(
@@ -437,7 +437,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
           TABLE_NAME.key -> targetTableName,
           PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
           PAYLOAD_CLASS_OPT_KEY.key -> classOf[ExpressionPayload].getCanonicalName,
-          META_SYNC_ENABLED_OPT_KEY.key -> "true",
+          META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
           HIVE_USE_JDBC_OPT_KEY.key -> "false",
           HIVE_DATABASE_OPT_KEY.key -> targetTableDb,
           HIVE_TABLE_OPT_KEY.key -> targetTableName,
UpdateHoodieTableCommand.scala
@@ -93,16 +93,17 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
 
     assert(primaryColumns.nonEmpty,
       s"There are no primary key in table $tableId, cannot execute update operator")
+    val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, targetTable.storage.properties) {
       Map(
-        "path" -> path.toString,
+        "path" -> path,
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
         PRECOMBINE_FIELD_OPT_KEY.key -> primaryColumns.head, //set the default preCombine field.
         TABLE_NAME.key -> tableId.table,
         OPERATION_OPT_KEY.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
-        META_SYNC_ENABLED_OPT_KEY.key -> "false", // TODO make the meta sync enable by default.
+        META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
         HIVE_USE_JDBC_OPT_KEY.key -> "false",
         HIVE_DATABASE_OPT_KEY.key -> tableId.database.getOrElse("default"),
         HIVE_TABLE_OPT_KEY.key -> tableId.table,
TestHoodieSqlBase.scala
@@ -37,7 +37,6 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
     .appName("hoodie sql test")
     .withExtensions(new HoodieSparkSessionExtension)
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    .config("hoodie.datasource.meta.sync.enable", "false")
     .config("hoodie.insert.shuffle.parallelism", "4")
     .config("hoodie.upsert.shuffle.parallelism", "4")
     .config("hoodie.delete.shuffle.parallelism", "4")
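With sync now keyed off the catalog implementation, the test base no longer needs to disable it explicitly. An end-to-end usage sketch, assuming Hive classes on the classpath (table name and DDL are examples):

import org.apache.spark.sql.SparkSession

// A hive-enabled Spark SQL session: INSERT INTO a Hudi table will now also
// sync the table to the Hive metastore without any extra sync options.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("hudi-sql-hive-sync")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("create table h0 (id int, name string, price double) using hudi options (primaryKey = 'id')")
spark.sql("insert into h0 values (1, 'a1', 10.0)")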