
[HUDI-2706] refactor spark-sql to make consistent with DataFrame api (#3936)

Author: Yann Byron
Date: 2021-11-15 07:44:39 +08:00 (committed by GitHub)
Parent: c2f9094b49
Commit: 0bb6d8ff80
31 changed files with 764 additions and 337 deletions
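For orientation, the sketch below (illustrative only, not part of the commit) shows the two write paths this refactor aligns: a table defined and populated through Spark SQL, and the equivalent DataFrame write driven by the DataSourceWriteOptions keys. The table names, path, and exact option set are assumptions, and it presumes a SparkSession created with the Hudi extensions enabled.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SqlVsDataFrameSketch {
  def main(args: Array[String]): Unit = {
    // assumes spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
    val spark = SparkSession.builder().appName("hudi-sql-vs-df").getOrCreate()
    import spark.implicits._

    // Spark SQL path: primaryKey / preCombineField / type are the SQL options
    // that get mapped onto the hoodie table configs and table properties.
    spark.sql(
      """create table h0 (id int, name string, price double, ts bigint)
        |using hudi
        |tblproperties (primaryKey = 'id', preCombineField = 'ts', type = 'cow')
        |""".stripMargin)
    spark.sql("insert into h0 values (1, 'a1', 10.0, 1000)")

    // DataFrame path: the corresponding datasource write options (path is made up).
    val df = Seq((2, "a2", 20.0, 1000L)).toDF("id", "name", "price", "ts")
    df.write.format("hudi")
      .option("hoodie.table.name", "h1")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi/h1")
  }
}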

View File

@@ -19,11 +19,13 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -42,19 +44,21 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkContext}
import java.util
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ListBuffer
object HoodieSparkSqlWriter {
@@ -141,7 +145,7 @@ object HoodieSparkSqlWriter {
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME))
.setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters))
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.initTable(sparkContext.hadoopConfiguration, path)
@@ -713,22 +717,6 @@ object HoodieSparkSqlWriter {
}
}
private def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieTableConfig): Unit = {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
}
}
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
val mergedParams = mutable.Map.empty ++
@@ -745,16 +733,4 @@ object HoodieSparkSqlWriter {
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = {
if (null == tableConfig) {
null
} else {
if (allAlternatives.contains(key)) {
tableConfig.getString(allAlternatives(key))
} else {
tableConfig.getString(key)
}
}
}
}

View File

@@ -17,15 +17,19 @@
package org.apache.hudi
import java.util.Properties
import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import java.util.Properties
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import scala.collection.JavaConverters._
/**
* WriterUtils to assist with the write path in the datasource and in tests.
@@ -102,4 +106,68 @@ object HoodieWriterUtils {
properties.putAll(mapAsJavaMap(parameters))
new HoodieConfig(properties)
}
def getOriginKeyGenerator(parameters: Map[String, String]): String = {
val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
} else {
kg
}
}
/**
* Detects conflicts between new parameters and existing table configurations
*/
def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
}
}
if (null != tableConfig) {
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
if (null != datasourceRecordKey && null != tableConfigRecordKey
&& datasourceRecordKey != tableConfigRecordKey) {
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
}
val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
&& datasourcePreCombineKey != tableConfigPreCombineKey) {
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
}
val datasourceKeyGen = getOriginKeyGenerator(params)
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != datasourceKeyGen && null != tableConfigKeyGen
&& datasourceKeyGen != tableConfigKeyGen) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
}
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}
private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
if (null == tableConfig) {
null
} else {
if (allAlternatives.contains(key)) {
tableConfig.getString(allAlternatives(key))
} else {
tableConfig.getString(key)
}
}
}
}
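
As context for the validateTableConfig check added above, here is a hypothetical DataFrame write sequence that should trip it (path, table name and data are made up, and an active SparkSession `spark` with spark.implicits._ imported is assumed): the second write declares a record key that disagrees with the RECORDKEY_FIELDS already persisted in hoodie.properties, so a HoodieException listing the conflicting key is expected.

val basePath = "/tmp/hudi/conflict_demo"
val df = Seq((1, "a1", 1000L)).toDF("id", "name", "ts")

// First write establishes recordkey = id in the table config.
df.write.format("hudi")
  .option("hoodie.table.name", "conflict_demo")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .mode("overwrite")
  .save(basePath)

// Second write asks for recordkey = name: validateTableConfig should reject it
// with a "Config conflict" HoodieException.
df.write.format("hudi")
  .option("hoodie.table.name", "conflict_demo")
  .option("hoodie.datasource.write.recordkey.field", "name")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .mode("append")
  .save(basePath)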

View File

@@ -20,6 +20,10 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
/**
@@ -43,6 +47,7 @@ object HoodieOptionConfig {
.withSqlKey("primaryKey")
.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
.defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
.build()
val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
@@ -102,6 +107,8 @@ object HoodieOptionConfig {
private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] = defaultSqlOptions ++ options
/**
* Maps the SQL short-name keys/values in the options to the hoodie config keys/values.
* @param options
@@ -119,14 +126,13 @@ object HoodieOptionConfig {
* @return
*/
def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
defaultTableConfig ++
options.map { case (k, v) =>
if (keyTableConfigMapping.contains(k)) {
keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
} else {
k -> v
}
options.map { case (k, v) =>
if (keyTableConfigMapping.contains(k)) {
keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
} else {
k -> v
}
}
}
/**
@@ -136,16 +142,19 @@ object HoodieOptionConfig {
options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
}
private lazy val defaultTableConfig: Map[String, String] = {
private lazy val defaultSqlOptions: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})
.filter(option => option.tableConfigKey.isDefined && option.defaultValue.isDefined)
.map(option => option.tableConfigKey.get ->
valueMapping.getOrElse(option.defaultValue.get.toString, option.defaultValue.get.toString))
.map(option => option.sqlKeyName -> option.defaultValue.get.toString)
.toMap
}
private lazy val defaultTableConfig: Map[String, String] = {
mappingSqlOptionToHoodieParam(defaultSqlOptions)
}
/**
* Get the primary key from the table options.
* @param options
@@ -154,7 +163,7 @@ object HoodieOptionConfig {
def getPrimaryColumns(options: Map[String, String]): Array[String] = {
val params = mappingSqlOptionToHoodieParam(options)
params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.map(_.split(",").filter(_.length > 0))
.map(_.split(",").filter(_.nonEmpty))
.getOrElse(Array.empty)
}
@@ -171,7 +180,47 @@ object HoodieOptionConfig {
def getPreCombineField(options: Map[String, String]): Option[String] = {
val params = mappingSqlOptionToHoodieParam(options)
params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
}
def deleteHooideOptions(options: Map[String, String]): Map[String, String] = {
options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => keyMapping.contains(kv._1))
}
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName)
options.filterKeys(targetOptions.contains)
}
// validate primaryKey, preCombineField and type options
def validateTable(spark: SparkSession, schema: StructType, options: Map[String, String]): Unit = {
val resolver = spark.sessionState.conf.resolver
// validate primary key
val primaryKeys = options.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
.map(_.split(",").filter(_.length > 0))
ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
primaryKeys.get.foreach { primaryKey =>
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)),
s"Can't find primary key `$primaryKey` in ${schema.treeString}.")
}
// validate precombine key
val precombineKey = options.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
if (precombineKey.isDefined && precombineKey.get.nonEmpty) {
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)),
s"Can't find precombine key `${precombineKey.get}` in ${schema.treeString}.")
}
// validate table type
val tableType = options.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.")
ValidationUtils.checkArgument(
tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) ||
tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " +
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
}
def buildConf[T](): HoodieOptions[T] = {
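
To illustrate the DDL-time checks that the new HoodieOptionConfig.validateTable performs, a hypothetical pair of CREATE TABLE statements follows (assuming a Hudi-enabled SparkSession named spark; names are made up): the first passes, the second is rejected because the declared primaryKey is not a column of the schema.

// Passes: primaryKey exists in the schema and type is a supported value.
spark.sql(
  """create table demo_ok (id int, name string, ts bigint)
    |using hudi
    |tblproperties (primaryKey = 'id', preCombineField = 'ts', type = 'mor')
    |""".stripMargin)

// Fails: `uuid` is not in the schema, so the "Can't find primary key" check
// raises an IllegalArgumentException via ValidationUtils.checkArgument.
spark.sql(
  """create table demo_bad (id int, name string, ts bigint)
    |using hudi
    |tblproperties (primaryKey = 'uuid', type = 'cow')
    |""".stripMargin)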

View File

@@ -90,7 +90,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala

View File

@@ -202,8 +202,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
val targetTableId = getMergeIntoTargetTableId(mergeInto)
val targetTable =
sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
val targetTableType = HoodieOptionConfig.getTableType(targetTable.storage.properties)
val preCombineField = HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)
val tblProperties = targetTable.storage.properties ++ targetTable.properties
val targetTableType = HoodieOptionConfig.getTableType(tblProperties)
val preCombineField = HoodieOptionConfig.getPreCombineField(tblProperties)
// Get the map of target attribute to value of the update assignments.
val target2Values = resolvedAssignments.map {

View File

@@ -105,8 +105,13 @@ object AlterHoodieTableAddColumnsCommand {
val path = getTableLocation(table, sparkSession)
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)
val client = DataSourceUtils.createHoodieClient(
jsc,
schema.toString,
path,
table.identifier.table,
HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties ++ table.properties).asJava
)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()

View File

@@ -92,7 +92,7 @@ extends RunnableCommand {
.build()
val tableConfig = metaClient.getTableConfig
val optParams = withSparkConf(sparkSession, table.storage.properties) {
withSparkConf(sparkSession, table.storage.properties) {
Map(
"path" -> path,
TBL_NAME.key -> tableIdentifier.table,
@@ -104,10 +104,6 @@ extends RunnableCommand {
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
)
}
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
translatedOptions
}
def normalizePartitionSpec[T](

View File

@@ -19,9 +19,11 @@ package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -73,9 +75,10 @@ case class CreateHoodieTableAsSelectCommand(
// Execute the insert query
try {
val tblProperties = table.storage.properties ++ table.properties
val options = Map(
DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(table.storage.properties.asJava),
DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
@@ -88,7 +91,9 @@ case class CreateHoodieTableAsSelectCommand(
if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
// Create the table
val createTableCommand = CreateHoodieTableCommand(tableWithSchema, mode == SaveMode.Ignore)
createTableCommand.createTableInCatalog(sparkSession, checkPathForManagedTable = false)
val path = getTableLocation(table, sparkSession)
val (finalSchema, _, tableSqlOptions) = createTableCommand.parseSchemaAndConfigs(sparkSession, path, ctas = true)
createTableCommand.createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
}
} else { // failed to insert data, clear table path
clearTablePath(tablePath, hadoopConf)

View File

@@ -19,13 +19,17 @@ package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.hadoop.HoodieParquetInputFormat
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -36,19 +40,17 @@ import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, isEmptyPath}
import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.checkTableConfigEqual
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
import java.util.{Locale, Properties}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
/**
* Command for create hoodie table.
@@ -56,9 +58,11 @@ import scala.collection.mutable
case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
extends RunnableCommand with SparkAdapterSupport {
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableName = table.identifier.unquotedString
val tableName = formatName(table.identifier.table)
val tblProperties = table.storage.properties ++ table.properties
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableIsExists = sparkSession.sessionState.catalog.tableExists(table.identifier)
if (tableIsExists) {
if (ignoreIfExists) {
@@ -66,64 +70,95 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
return Seq.empty[Row]
// scalastyle:on
} else {
throw new IllegalArgumentException(s"Table $tableName already exists.")
throw new IllegalArgumentException(s"Table ${table.identifier.unquotedString} already exists.")
}
}
// Create table in the catalog
val createTable = createTableInCatalog(sparkSession)
// get the schema with meta fields, the table config if the hudi table already exists,
// and the options, including the table configs and the catalog table's properties
val path = getTableLocation(table, sparkSession)
val (finalSchema, existingTableConfig, tableSqlOptions) = parseSchemaAndConfigs(sparkSession, path)
// Init the hoodie.properties
initTableIfNeed(sparkSession, createTable)
initTableIfNeed(sparkSession, path, finalSchema, existingTableConfig, tableSqlOptions)
try {
// Create table in the catalog
createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}")
}
Seq.empty[Row]
}
def createTableInCatalog(sparkSession: SparkSession,
checkPathForManagedTable: Boolean = true): CatalogTable = {
def parseSchemaAndConfigs(sparkSession: SparkSession, path: String, ctas: Boolean = false)
: (StructType, Map[String, String], Map[String, String]) = {
val resolver = sparkSession.sessionState.conf.resolver
val conf = sparkSession.sessionState.newHadoopConf
// if CTAS, we treat the table we just created as nonexistent
val isTableExists = if (ctas) false else tableExistsInPath(path, conf)
var existingTableConfig = Map.empty[String, String]
val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(tblProperties)
val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(tblProperties)
// get final schema and parameters
val (finalSchema, tableSqlOptions) = (table.tableType, isTableExists) match {
case (CatalogTableType.EXTERNAL, true) =>
// If this is an external table and the table already exists in the location,
// load the schema from the table meta.
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableSchema = getTableSqlSchema(metaClient)
existingTableConfig = metaClient.getTableConfig.getProps.asScala.toMap
validateTableConfig(sparkSession, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
val options = extraTableConfig(sparkSession, isTableExists, existingTableConfig) ++
sqlOptions ++ HoodieOptionConfig.mappingTableConfigToSqlOption(existingTableConfig)
val userSpecifiedSchema = table.schema
val schema = if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
tableSchema.get
} else if (userSpecifiedSchema.nonEmpty) {
userSpecifiedSchema
} else {
throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
}
(addMetaFields(schema), options)
case (_, false) =>
assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
val schema = table.schema
val options = extraTableConfig(sparkSession, isTableExists = false) ++ sqlOptions
(addMetaFields(schema), options)
case (CatalogTableType.MANAGED, true) =>
throw new AnalysisException(s"Can not create the managed table('$tableName')" +
s". The associated location('$path') already exists.")
}
HoodieOptionConfig.validateTable(sparkSession, finalSchema, tableSqlOptions)
val dataSchema = finalSchema.filterNot { f =>
table.partitionColumnNames.exists(resolver(_, f.name))
}
verifyDataSchema(table.identifier, table.tableType, dataSchema)
(finalSchema, existingTableConfig, tableSqlOptions)
}
def createTableInCatalog(sparkSession: SparkSession, finalSchema: StructType,
options: Map[String, String]): Unit = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
val sessionState = sparkSession.sessionState
val tableName = table.identifier.unquotedString
val path = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val isTableExists = tableExistsInPath(path, conf)
// Get the schema & table options
val (newSchema, tableOptions) = if (table.tableType == CatalogTableType.EXTERNAL &&
isTableExists) {
// If this is an external table and the table already exists in the location,
// load the schema from the table meta.
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableSchema = getTableSqlSchema(metaClient)
// Get options from the external table and append the options from the ddl.
val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
metaClient.getTableConfig.getProps.asScala.toMap)
val extraConfig = extraTableConfig(sparkSession, isTableExists, originTableConfig)
val options = originTableConfig ++ table.storage.properties ++ extraConfig
val userSpecifiedSchema = table.schema
if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
(addMetaFields(tableSchema.get), options)
} else if (userSpecifiedSchema.nonEmpty) {
(addMetaFields(userSpecifiedSchema), options)
} else {
throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
}
} else {
assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
// SPARK-19724: the default location of a managed table should be non-existent or empty.
if (checkPathForManagedTable && table.tableType == CatalogTableType.MANAGED
&& !isEmptyPath(path, conf)) {
throw new AnalysisException(s"Can not create the managed table('$tableName')" +
s". The associated location('$path') already exists.")
}
// Add the meta fields to the schema if this is a managed table or an empty external table.
val options = table.storage.properties ++ extraTableConfig(sparkSession, false)
(addMetaFields(table.schema), options)
}
val tableType = HoodieOptionConfig.getTableType(table.storage.properties)
val tableType = HoodieOptionConfig.getTableType(options)
val inputFormat = tableType match {
case DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL =>
classOf[HoodieParquetInputFormat].getCanonicalName
@@ -134,31 +169,39 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
val outputFormat = HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET)
val serdeFormat = HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET)
val newStorage = new CatalogStorageFormat(Some(new Path(path).toUri),
Some(inputFormat), Some(outputFormat), Some(serdeFormat),
table.storage.compressed, tableOptions + ("path" -> path))
// only parameters irrelevant to hudi may be stored in storage.properties
val storageProperties = HoodieOptionConfig.deleteHooideOptions(options)
val newStorage = new CatalogStorageFormat(
Some(new Path(path).toUri),
Some(inputFormat),
Some(outputFormat),
Some(serdeFormat),
table.storage.compressed,
storageProperties + ("path" -> path))
val newDatabaseName = formatName(table.identifier.database
.getOrElse(sessionState.catalog.getCurrentDatabase))
val newTableName = formatName(table.identifier.table)
val newTableIdentifier = table.identifier
.copy(table = newTableName, database = Some(newDatabaseName))
.copy(table = tableName, database = Some(newDatabaseName))
val newTable = table.copy(identifier = newTableIdentifier,
schema = newSchema, storage = newStorage, createVersion = SPARK_VERSION)
// validate the table
validateTable(newTable)
// append the primaryKey, preCombineField and type options to the table properties
val newTblProperties = table.storage.properties ++ table.properties ++ HoodieOptionConfig.extractSqlOptions(options)
val newTable = table.copy(
identifier = newTableIdentifier,
schema = finalSchema,
storage = newStorage,
createVersion = SPARK_VERSION,
properties = newTblProperties
)
// Create table in the catalog
val enableHive = isEnableHive(sparkSession)
if (enableHive) {
createHiveDataSourceTable(newTable, sparkSession)
} else {
sessionState.catalog.createTable(newTable, ignoreIfExists = false,
validateLocation = checkPathForManagedTable)
sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
}
newTable
}
/**
@@ -170,8 +213,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
* @param sparkSession
*/
private def createHiveDataSourceTable(table: CatalogTable, sparkSession: SparkSession): Unit = {
// check schema
verifyDataSchema(table.identifier, table.tableType, table.schema)
val dbName = table.identifier.database.get
// check database
val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
@@ -186,7 +227,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
val dataSourceProps = tableMetaToTableProps(sparkSession.sparkContext.conf,
table, table.schema)
val tableWithDataSourceProps = table.copy(properties = dataSourceProps)
val tableWithDataSourceProps = table.copy(properties = dataSourceProps ++ table.properties)
val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
sparkSession.sessionState.newHadoopConf())
// create hive table.
@@ -198,9 +239,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}
// This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
private def verifyDataSchema(tableName: TableIdentifier,
tableType: CatalogTableType,
dataSchema: StructType): Unit = {
private def verifyDataSchema(tableName: TableIdentifier, tableType: CatalogTableType,
dataSchema: Seq[StructField]): Unit = {
if (tableType != CatalogTableType.VIEW) {
val invalidChars = Seq(",", ":", ";")
def verifyNestedColumnNames(schema: StructType): Unit = schema.foreach { f =>
@@ -230,10 +270,10 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}
}
}
// This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#tableMetaToTableProps
private def tableMetaToTableProps( sparkConf: SparkConf,
table: CatalogTable,
schema: StructType): Map[String, String] = {
private def tableMetaToTableProps(sparkConf: SparkConf, table: CatalogTable,
schema: StructType): Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec
@@ -280,24 +320,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
properties.toMap
}
private def validateTable(table: CatalogTable): Unit = {
val options = table.storage.properties
// validate the pk if it exists in the table.
HoodieOptionConfig.getPrimaryColumns(options).foreach(pk => table.schema.fieldIndex(pk))
// validate the version column if it exists in the table.
HoodieOptionConfig.getPreCombineField(options).foreach(v => table.schema.fieldIndex(v))
// validate the partition columns
table.partitionColumnNames.foreach(p => table.schema.fieldIndex(p))
// validate table type
options.get(HoodieOptionConfig.SQL_KEY_TABLE_TYPE.sqlKeyName).foreach { tableType =>
ValidationUtils.checkArgument(
tableType.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) ||
tableType.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " +
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
}
}
def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
val extraConfig = mutable.Map.empty[String, String]
@@ -322,10 +344,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()
}
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(originTableConfig ++ table.storage.properties)
if (primaryColumns.isEmpty) {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[UuidKeyGenerator].getCanonicalName
} else if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
@@ -334,31 +353,20 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}
extraConfig.toMap
}
}
object CreateHoodieTableCommand extends Logging {
/**
* Init the hoodie.properties.
*/
def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit = {
val location = getTableLocation(table, sparkSession)
def initTableIfNeed(sparkSession: SparkSession,
location: String,
schema: StructType,
originTableConfig: Map[String, String],
sqlOptions: Map[String, String]): Unit = {
val conf = sparkSession.sessionState.newHadoopConf()
// Init the hoodie table
val originTableConfig = if (tableExistsInPath(location, conf)) {
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(location)
.setConf(conf)
.build()
metaClient.getTableConfig.getProps.asScala.toMap
} else {
Map.empty[String, String]
}
val tableName = table.identifier.table
logInfo(s"Init hoodie.properties for $tableName")
val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
val conf = sparkSession.sessionState.newHadoopConf()
val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
@@ -372,10 +380,13 @@ object CreateHoodieTableCommand extends Logging {
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setTableName(tableName)
.setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
.setTableCreateSchema(SchemaConverters.toAvroType(schema).toString())
.setPartitionFields(table.partitionColumnNames.mkString(","))
.initTable(conf, location)
}
}
object CreateHoodieTableCommand extends Logging {
def checkTableConfigEqual(originTableConfig: Map[String, String],
newTableConfig: Map[String, String], configKey: String): Unit = {

View File

@@ -23,11 +23,12 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.types.StructType
case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand
with SparkAdapterSupport {
@@ -56,8 +57,8 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
}
private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
val targetTable = sparkSession.sessionState.catalog
.getTableMetadata(tableId)
val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
val tblProperties = targetTable.storage.properties ++ targetTable.properties
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
@@ -65,23 +66,27 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
val tableSchema = getTableSqlSchema(metaClient).get
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
val primaryColumns = tableConfig.getRecordKeyFields.get()
assert(primaryColumns.nonEmpty,
s"There are no primary key defined in table $tableId, cannot execute delete operator")
withSparkConf(sparkSession, targetTable.storage.properties) {
withSparkConf(sparkSession, tblProperties) {
Map(
"path" -> path,
TBL_NAME.key -> tableId.table,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
@@ -30,7 +31,8 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -40,10 +42,13 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import java.util.Properties
import scala.collection.JavaConverters._
/**
* Command for insert into hoodie table.
*/
@@ -194,46 +199,39 @@ object InsertIntoHoodieTableCommand extends Logging {
s"[${insertPartitions.keys.mkString(" " )}]" +
s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]")
}
val options = table.storage.properties ++ extraOptions
val parameters = withSparkConf(sparkSession, options)()
val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
val partitionFields = table.partitionColumnNames.mkString(",")
val path = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val isTableExists = tableExistsInPath(path, conf)
val tableConfig = if (isTableExists) {
HoodieTableMetaClient.builder()
val (tableConfig, tableSchema) = if (isTableExists) {
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
.getTableConfig
(metaClient.getTableConfig, getTableSqlSchema(metaClient).get)
} else {
null
(new HoodieTableConfig(), table.schema)
}
val hiveStylePartitioningEnable = if (null == tableConfig || null == tableConfig.getHiveStylePartitioningEnable) {
"true"
val partitionColumns = tableConfig.getPartitionFieldProp
val partitionSchema = if (null == partitionColumns || partitionColumns.isEmpty) {
table.partitionSchema
} else {
tableConfig.getHiveStylePartitioningEnable
}
val urlEncodePartitioning = if (null == tableConfig || null == tableConfig.getUrlEncodePartitoning) {
"false"
} else {
tableConfig.getUrlEncodePartitoning
}
val keyGeneratorClassName = if (null == tableConfig || null == tableConfig.getKeyGeneratorClassName) {
if (primaryColumns.nonEmpty) {
classOf[ComplexKeyGenerator].getCanonicalName
} else {
classOf[UuidKeyGenerator].getCanonicalName
}
} else {
tableConfig.getKeyGeneratorClassName
StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
}
val tableSchema = table.schema
val options = table.storage.properties ++ table.properties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
val parameters = withSparkConf(sparkSession, options)()
val tableName = Option(tableConfig.getTableName).getOrElse(table.identifier.table)
val tableType = Option(tableConfig.getTableType.name).getOrElse(TABLE_TYPE.defaultValue)
val primaryColumns = tableConfig.getRecordKeyFields.orElse(HoodieOptionConfig.getPrimaryColumns(options))
val preCombineColumn = Option(tableConfig.getPreCombineField)
.getOrElse(HoodieOptionConfig.getPreCombineField(options).getOrElse(""))
val partitionFields = Option(tableConfig.getPartitionFieldProp)
.getOrElse(table.partitionColumnNames.mkString(","))
val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitoning).getOrElse("false")
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key)
@@ -242,35 +240,33 @@ object InsertIntoHoodieTableCommand extends Logging {
val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val hasPrecombineColumn = preCombineColumn.nonEmpty
val isPartitionedTable = table.partitionColumnNames.nonEmpty
val isPrimaryKeyTable = primaryColumns.nonEmpty
val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val operation =
(isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
case (true, true, _, _) if !isNonStrictMode =>
(enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, false, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
case (_, true, true, _) if isPartitionedTable =>
case (true, true, _, _, true) =>
throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
case (_, true, _, true) =>
case (true, _, true, _, _) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// if enableBulkInsert is true, use bulk insert for insert overwrite on a non-partitioned table.
case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite partition
case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite table
case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// if it is a pk table and dropDuplicate is disabled, use the upsert operation for strict and upsert mode.
case (true, false, false, false) if !isNonStrictMode => UPSERT_OPERATION_OPT_VAL
// if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// insert overwrite partition
case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
// when dropDuplicate is disabled and a preCombineKey is provided, use the upsert operation for strict and upsert modes.
case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
// if the table has a primary key and enableBulkInsert is set, use bulk insert in non-strict mode.
case (true, true, _, _) if isNonStrictMode => BULK_INSERT_OPERATION_OPT_VAL
case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
// for the remaining cases, use the insert operation
case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
case _ => INSERT_OPERATION_OPT_VAL
}
val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
@@ -288,17 +284,18 @@ object InsertIntoHoodieTableCommand extends Logging {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
TBL_NAME.key -> table.identifier.table,
PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
TBL_NAME.key -> tableName,
PRECOMBINE_FIELD.key -> preCombineColumn,
OPERATION.key -> operation,
HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> isPrimaryKeyTable.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",

View File

@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -80,8 +81,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
private lazy val targetTable =
sparkSession.sessionState.catalog.getTableMetadata(targetTableIdentify)
private lazy val targetTableType =
HoodieOptionConfig.getTableType(targetTable.storage.properties)
private lazy val tblProperties = targetTable.storage.properties ++ targetTable.properties
private lazy val targetTableType = HoodieOptionConfig.getTableType(tblProperties)
/**
*
@@ -124,7 +126,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}")
val updateAction = updateActions.headOption
HoodieOptionConfig.getPreCombineField(targetTable.storage.properties).map(preCombineField => {
HoodieOptionConfig.getPreCombineField(tblProperties).map(preCombineField => {
val sourcePreCombineField =
updateAction.map(u => u.assignments.filter {
case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField)
@@ -242,8 +244,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
// Append the table schema to the parameters. In the case of merge into, the schema of sourceDF
// may differ from the target table's, because there may be transform logic in the update or
// insert actions.
val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
INSERT_OPERATION_OPT_VAL
} else {
UPSERT_OPERATION_OPT_VAL
}
var writeParams = parameters +
(OPERATION.key -> UPSERT_OPERATION_OPT_VAL) +
(OPERATION.key -> operation) +
(HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
(DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
@@ -436,38 +443,38 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val options = targetTable.storage.properties
val definedPk = HoodieOptionConfig.getPrimaryColumns(options)
// TODO Currently the mergeEqualConditionKeys must be the same as the primary key.
if (targetKey2SourceExpression.keySet != definedPk.toSet) {
throw new IllegalArgumentException(s"Merge Key[${targetKey2SourceExpression.keySet.mkString(",")}] is not" +
s" Equal to the defined primary key[${definedPk.mkString(",")}] in table $targetTableName")
}
val tableSchema = getTableSqlSchema(metaClient).get
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
val options = tblProperties
val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
// Enable hive sync by default if spark has enabled the hive metastore.
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, options) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> preCombineColumn,
TBL_NAME.key -> targetTableName,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> targetTableDb,
HIVE_TABLE.key -> targetTableName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
}
}
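
A hypothetical usage sketch for this path follows (table and column names assumed, reusing the h0 table from the earlier sketch): the keys in the ON condition must match the table's record key fields, and because a preCombineField is configured the write operation resolves to upsert rather than insert.

spark.sql(
  """merge into h0 as target
    |using (select 1 as id, 'a1_new' as name, 12.0 as price, cast(1001 as bigint) as ts) as source
    |on target.id = source.id
    |when matched then update set *
    |when not matched then insert *
    |""".stripMargin)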

View File

@@ -18,20 +18,20 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.JavaConverters._
@@ -83,8 +83,8 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
}
private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
val targetTable = sparkSession.sessionState.catalog
.getTableMetadata(tableId)
val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
val tblProperties = targetTable.storage.properties ++ targetTable.properties
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
@@ -92,32 +92,37 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
val tableSchema = getTableSqlSchema(metaClient).get
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
val primaryColumns = tableConfig.getRecordKeyFields.get()
val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
assert(primaryColumns.nonEmpty,
s"There are no primary key in table $tableId, cannot execute update operator")
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, targetTable.storage.properties) {
withSparkConf(sparkSession, tblProperties) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PRECOMBINE_FIELD.key -> primaryColumns.head, //set the default preCombine field.
PRECOMBINE_FIELD.key -> preCombineColumn,
TBL_NAME.key -> tableId.table,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
HIVE_TABLE.key -> tableId.table,
HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
}
}
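
Finally, a usage sketch of the rewritten update path (table and column names assumed, continuing the h0 example): the record key, preCombine field, key generator and partition fields are now read from the table's hoodie.properties via the meta client rather than from the catalog table, so a plain SQL UPDATE is enough.

// Assumes the Hudi table h0 from the earlier sketches.
spark.sql("update h0 set price = price * 1.1 where id = 1")
// Under the hood this builds an upsert whose RECORDKEY_FIELD, PRECOMBINE_FIELD,
// PARTITIONPATH_FIELD and key generator come from hoodie.properties.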