1
0

[HUDI-2759] extract HoodieCatalogTable to coordinate spark catalog table and hoodie table (#3998)

This commit is contained in:
Yann Byron
2021-11-24 18:12:38 +08:00
committed by GitHub
parent 0bb506fa00
commit a234833f0a
18 changed files with 619 additions and 550 deletions

View File

@@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.types.{StructField, StructType}
import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
 * A wrapper that couples the Spark catalog view of a table ([[CatalogTable]]) with the
 * Hudi on-storage table metadata (`hoodie.properties` via [[HoodieTableMetaClient]]),
 * so SQL commands can read both through one object.
 *
 * Storage-backed members (`metaClient`, `tableConfig`, and everything derived from them)
 * are `lazy` so that a not-yet-initialized table can still be wrapped and later created
 * via [[initHoodieTable]].
 */
class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) extends Logging {

  // Guard: this wrapper is only meaningful for tables registered with provider "hudi".
  assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table")

  private val hadoopConf = spark.sessionState.newHadoopConf

  /**
   * Qualified `database.table` name as registered in the catalog.
   */
  val catalogTableName = table.qualifiedName

  /**
   * Properties defined in the catalog: storage properties merged with table properties
   * (table properties win on key collision, per `++` semantics).
   */
  val catalogProperties: Map[String, String] = table.storage.properties ++ table.properties

  /**
   * The hoodie table's base location.
   * For a managed hoodie table this resolves to `catalog.defaultTablePath`.
   */
  val tableLocation: String = HoodieSqlUtils.getTableLocation(table, spark)

  /**
   * Whether a hoodie table already exists at [[tableLocation]]
   * (i.e. the path contains hoodie metadata).
   */
  val hoodieTableExists: Boolean = tableExistsInPath(tableLocation, hadoopConf)

  /**
   * Meta client for the on-storage table. Lazy: building it requires the table
   * path to already hold hoodie metadata.
   */
  lazy val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
    .setBasePath(tableLocation)
    .setConf(hadoopConf)
    .build()

  /**
   * Hoodie table config loaded from `hoodie.properties`.
   */
  lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig

  /**
   * The table name recorded in the hoodie table config.
   */
  lazy val tableName: String = tableConfig.getTableName

  /**
   * The hoodie table type (COW / MOR).
   */
  lazy val tableType: HoodieTableType = tableConfig.getTableType

  /**
   * The hoodie table type as its enum name string.
   */
  lazy val tableTypeName: String = tableType.name()

  /**
   * Record key fields (primary key list); empty when none are configured.
   */
  lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)

  /**
   * PreCombine field, if configured.
   */
  lazy val preCombineKey: Option[String] = Option(tableConfig.getPreCombineField)

  /**
   * Partition fields; empty when the table is non-partitioned.
   */
  lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

  /**
   * The full table schema (including hoodie meta fields), with every field
   * forced nullable.
   * NOTE(review): `.get` assumes the schema can always be resolved from the meta
   * client — this throws if the table has no commit/schema yet; confirm callers
   * only touch this after the table is initialized.
   */
  lazy val tableSchema: StructType = {
    val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).get
    StructType(originSchema.map(_.copy(nullable = true)))
  }

  /**
   * [[tableSchema]] without the hoodie meta fields.
   */
  lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(tableSchema)

  /**
   * The data schema: [[tableSchema]] minus the partition fields.
   */
  lazy val dataSchema: StructType = {
    StructType(tableSchema.filterNot(f => partitionFields.contains(f.name)))
  }

  /**
   * [[dataSchema]] without the hoodie meta fields.
   */
  lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(dataSchema)

  /**
   * The schema restricted to the partition fields.
   */
  lazy val partitionSchema: StructType = StructType(tableSchema.filter(f => partitionFields.contains(f.name)))

  /**
   * All partition paths of the table, listed from storage.
   */
  def getAllPartitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, table)

  /**
   * Initializes the hoodie table on storage for CREATE TABLE (AS SELECT):
   * resolves the final schema and table configs, then writes `hoodie.properties`
   * under [[tableLocation]].
   */
  def initHoodieTable(): Unit = {
    logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
    val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
    // Save all the table config to the hoodie.properties.
    val properties = new Properties()
    properties.putAll(tableConfigs.asJava)
    HoodieTableMetaClient.withPropertyBuilder()
      .fromProperties(properties)
      .setTableName(table.identifier.table)
      .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
      .setPartitionFields(table.partitionColumnNames.mkString(","))
      .initTable(hadoopConf, tableLocation)
  }

  /**
   * Resolves the schema and the hoodie table parameters for table creation.
   *
   * Precedence when merging configs (rightmost wins in each `++` chain):
   * existing on-storage config > sql options from the catalog > global/default config.
   *
   * @return (schema, table parameters) where the parameters are de-sql-styled
   *         (mapped to hoodie table-config keys).
   */
  private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
    val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
    val globalTableConfigs = mappingSparkDatasourceConfigsToTableConfigs(globalProps)
    val globalSqlOptions = HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs)
    val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions ++ catalogProperties)

    // get final schema and parameters
    // NOTE(review): this match is non-exhaustive — (VIEW, true) would throw a
    // MatchError; presumably views never reach here, but confirm upstream invariant.
    val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) match {
      case (CatalogTableType.EXTERNAL, true) =>
        // Creating an external table over an existing hoodie table: the on-storage
        // config is authoritative; catalog-provided options must agree with it.
        val existingTableConfig = tableConfig.getProps.asScala.toMap
        val currentTableConfig = globalTableConfigs ++ existingTableConfig
        val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
        validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))

        val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++
          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig

        ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty,
          s"Missing schema for Create Table: $catalogTableName")
        // Prefer the schema recorded on storage; fall back to the DDL-declared
        // schema (with hoodie meta fields prepended).
        val schema = if (tableSchema.nonEmpty) {
          tableSchema
        } else {
          addMetaFields(table.schema)
        }
        (schema, options)

      case (_, false) =>
        // Fresh table: schema must come from the DDL; configs from sql options
        // plus global defaults.
        ValidationUtils.checkArgument(table.schema.nonEmpty,
          s"Missing schema for Create Table: $catalogTableName")
        val schema = table.schema
        val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++
          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
        (addMetaFields(schema), options)

      case (CatalogTableType.MANAGED, true) =>
        throw new AnalysisException(s"Can not create the managed table('$catalogTableName')" +
          s". The associated location('$tableLocation') already exists.")
    }
    HoodieOptionConfig.validateTable(spark, finalSchema,
      HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs))

    val resolver = spark.sessionState.conf.resolver
    val dataSchema = finalSchema.filterNot { f =>
      table.partitionColumnNames.exists(resolver(_, f.name))
    }
    verifyDataSchema(table.identifier, table.tableType, dataSchema)

    (finalSchema, tableConfigs)
  }

  /**
   * Derives extra hoodie table configs (hive-style partitioning, URL encoding,
   * key generator class) that are not explicitly provided, inferring them from
   * the existing partition layout when the table already exists.
   */
  private def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
      originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
    val extraConfig = mutable.Map.empty[String, String]
    if (isTableExists) {
      // Infer partition-path style from the actual partition paths on storage
      // unless the existing table config already pins it.
      val allPartitionPaths = getAllPartitionPaths
      if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
        extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
          originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
      } else {
        extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
          String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))
      }
      if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) {
        extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
          originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
      } else {
        extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
          String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))
      }
    } else {
      // New table: hive-style partitioning on, URL encoding at its default.
      extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true"
      extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()
    }

    if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
      // Normalize an existing key generator to its Spark-compatible equivalent.
      extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
        HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
          originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
    } else {
      extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
    }
    extraConfig.toMap
  }

  // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema.
  // Rejects column names (top-level and nested) that the Hive metastore cannot store.
  private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType,
      dataSchema: Seq[StructField]): Unit = {
    if (tableType != CatalogTableType.VIEW) {
      // Characters Hive rejects in nested column names.
      val invalidChars = Seq(",", ":", ";")
      def verifyNestedColumnNames(schema: StructType): Unit = schema.foreach { f =>
        f.dataType match {
          case st: StructType => verifyNestedColumnNames(st)
          case _ if invalidChars.exists(f.name.contains) =>
            val invalidCharsString = invalidChars.map(c => s"'$c'").mkString(", ")
            val errMsg = "Cannot create a table having a nested column whose name contains " +
              s"invalid characters ($invalidCharsString) in Hive metastore. Table: $tableIdentifier; " +
              s"Column: ${f.name}"
            throw new AnalysisException(errMsg)
          case _ =>
        }
      }

      dataSchema.foreach { f =>
        f.dataType match {
          // Checks top-level column names
          case _ if f.name.contains(",") =>
            throw new AnalysisException("Cannot create a table having a column whose name " +
              s"contains commas in Hive metastore. Table: $tableIdentifier; Column: ${f.name}")
          // Checks nested column names
          case st: StructType =>
            verifyNestedColumnNames(st)
          case _ =>
        }
      }
    }
  }
}
object HoodieCatalogTable {

  /**
   * Resolves the table metadata for `tableIdentifier` from the session catalog
   * and wraps it in a [[HoodieCatalogTable]].
   */
  def apply(sparkSession: SparkSession, tableIdentifier: TableIdentifier): HoodieCatalogTable =
    apply(sparkSession, sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier))

  /**
   * Wraps an already-resolved [[CatalogTable]] in a [[HoodieCatalogTable]].
   */
  def apply(sparkSession: SparkSession, catalogTable: CatalogTable): HoodieCatalogTable =
    new HoodieCatalogTable(sparkSession, catalogTable)
}

View File

@@ -18,10 +18,9 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
@@ -43,27 +42,27 @@ object HoodieOptionConfig {
val SQL_VALUE_TABLE_TYPE_MOR = "mor"
val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf()
val SQL_KEY_TABLE_PRIMARY_KEY: HoodieSQLOption[String] = buildConf()
.withSqlKey("primaryKey")
.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
.defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
.build()
val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
val SQL_KEY_TABLE_TYPE: HoodieSQLOption[String] = buildConf()
.withSqlKey("type")
.withHoodieKey(DataSourceWriteOptions.TABLE_TYPE.key)
.withTableConfigKey(HoodieTableConfig.TYPE.key)
.defaultValue(SQL_VALUE_TABLE_TYPE_COW)
.build()
val SQL_KEY_PRECOMBINE_FIELD: HoodieOption[String] = buildConf()
val SQL_KEY_PRECOMBINE_FIELD: HoodieSQLOption[String] = buildConf()
.withSqlKey("preCombineField")
.withHoodieKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
.withTableConfigKey(HoodieTableConfig.PRECOMBINE_FIELD.key)
.build()
val SQL_PAYLOAD_CLASS: HoodieOption[String] = buildConf()
val SQL_PAYLOAD_CLASS: HoodieSQLOption[String] = buildConf()
.withSqlKey("payloadClass")
.withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key)
.withTableConfigKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key)
@@ -75,8 +74,8 @@ object HoodieOptionConfig {
*/
private lazy val keyMapping: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})
.filter(f => f.getType == classOf[HoodieSQLOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
.map(option => option.sqlKeyName -> option.hoodieKeyName)
.toMap
}
@@ -87,8 +86,8 @@ object HoodieOptionConfig {
*/
private lazy val keyTableConfigMapping: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})
.filter(f => f.getType == classOf[HoodieSQLOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
.filter(_.tableConfigKey.isDefined)
.map(option => option.sqlKeyName -> option.tableConfigKey.get)
.toMap
@@ -142,19 +141,15 @@ object HoodieOptionConfig {
options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
}
private lazy val defaultSqlOptions: Map[String, String] = {
val defaultSqlOptions: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})
.filter(f => f.getType == classOf[HoodieSQLOption[_]])
.map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
.filter(option => option.tableConfigKey.isDefined && option.defaultValue.isDefined)
.map(option => option.sqlKeyName -> option.defaultValue.get.toString)
.toMap
}
private lazy val defaultTableConfig: Map[String, String] = {
mappingSqlOptionToHoodieParam(defaultSqlOptions)
}
/**
* Get the primary key from the table options.
* @param options
@@ -189,76 +184,80 @@ object HoodieOptionConfig {
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
val sqlOptions = mappingTableConfigToSqlOption(options)
val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName)
options.filterKeys(targetOptions.contains)
sqlOptions.filterKeys(targetOptions.contains)
}
// validate primaryKey, preCombineField and type options
def validateTable(spark: SparkSession, schema: StructType, options: Map[String, String]): Unit = {
def validateTable(spark: SparkSession, schema: StructType, sqlOptions: Map[String, String]): Unit = {
val resolver = spark.sessionState.conf.resolver
// validate primary key
val primaryKeys = options.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
.map(_.split(",").filter(_.length > 0))
ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
primaryKeys.get.foreach { primaryKey =>
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)),
s"Can't find primary key `$primaryKey` in ${schema.treeString}.")
s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
}
// validate precombine key
val precombineKey = options.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
val precombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
if (precombineKey.isDefined && precombineKey.get.nonEmpty) {
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)),
s"Can't find precombine key `${precombineKey.get}` in ${schema.treeString}.")
s"Can't find preCombineKey `${precombineKey.get}` in ${schema.treeString}.")
}
// validate table type
val tableType = options.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
val tableType = sqlOptions.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.")
ValidationUtils.checkArgument(
tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) ||
tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " +
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_COW) ||
tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '$SQL_VALUE_TABLE_TYPE_COW' or '$SQL_VALUE_TABLE_TYPE_MOR'")
}
def buildConf[T](): HoodieOptions[T] = {
new HoodieOptions[T]
def buildConf[T](): HoodieSQLOptionBuilder[T] = {
new HoodieSQLOptionBuilder[T]
}
}
case class HoodieOption[T](sqlKeyName: String, hoodieKeyName: String,
defaultValue: Option[T], tableConfigKey: Option[String] = None)
case class HoodieSQLOption[T](
sqlKeyName: String,
hoodieKeyName: String,
tableConfigKey: Option[String],
defaultValue: Option[T]
)
class HoodieOptions[T] {
class HoodieSQLOptionBuilder[T] {
private var sqlKeyName: String = _
private var hoodieKeyName: String =_
private var tableConfigKey: String =_
private var defaultValue: T =_
def withSqlKey(sqlKeyName: String): HoodieOptions[T] = {
def withSqlKey(sqlKeyName: String): HoodieSQLOptionBuilder[T] = {
this.sqlKeyName = sqlKeyName
this
}
def withHoodieKey(hoodieKeyName: String): HoodieOptions[T] = {
def withHoodieKey(hoodieKeyName: String): HoodieSQLOptionBuilder[T] = {
this.hoodieKeyName = hoodieKeyName
this
}
def withTableConfigKey(tableConfigKey: String): HoodieOptions[T] = {
def withTableConfigKey(tableConfigKey: String): HoodieSQLOptionBuilder[T] = {
this.tableConfigKey = tableConfigKey
this
}
def defaultValue(defaultValue: T): HoodieOptions[T] = {
def defaultValue(defaultValue: T): HoodieSQLOptionBuilder[T] = {
this.defaultValue = defaultValue
this
}
def build(): HoodieOption[T] = {
HoodieOption(sqlKeyName, hoodieKeyName, Option(defaultValue), Option(tableConfigKey))
def build(): HoodieSQLOption[T] = {
HoodieSQLOption(sqlKeyName, hoodieKeyName, Option(tableConfigKey), Option(defaultValue))
}
}

View File

@@ -80,13 +80,14 @@ object HoodieSqlUtils extends SparkAdapterSupport {
}
}
def getTableSqlSchema(metaClient: HoodieTableMetaClient): Option[StructType] = {
def getTableSqlSchema(metaClient: HoodieTableMetaClient,
includeMetadataFields: Boolean = false): Option[StructType] = {
val schemaResolver = new TableSchemaResolver(metaClient)
val avroSchema = try Some(schemaResolver.getTableAvroSchema(false))
val avroSchema = try Some(schemaResolver.getTableAvroSchema(includeMetadataFields))
catch {
case _: Throwable => None
}
avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType).map(removeMetaFields)
avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType)
}
def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
@@ -309,4 +310,21 @@ object HoodieSqlUtils extends SparkAdapterSupport {
+ s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")
}
}
def formatName(sparkSession: SparkSession, name: String): String = {
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}
/**
* Check if this is a empty table path.
*/
def isEmptyPath(tablePath: String, conf: Configuration): Boolean = {
val basePath = new Path(tablePath)
val fs = basePath.getFileSystem(conf)
if (fs.exists(basePath)) {
fs.listStatus(basePath).isEmpty
} else {
true
}
}
}

View File

@@ -18,26 +18,24 @@
package org.apache.spark.sql.hudi.command
import java.nio.charset.StandardCharsets
import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.table.HoodieSparkTable
import scala.collection.JavaConverters._
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
/**
@@ -51,31 +49,32 @@ case class AlterHoodieTableAddColumnsCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
if (colsToAdd.nonEmpty) {
val resolver = sparkSession.sessionState.conf.resolver
val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
val tableSchema = hoodieCatalogTable.tableSchema
val existsColumns =
colsToAdd.map(_.name).filter(col => table.schema.fieldNames.exists(f => resolver(f, col)))
colsToAdd.map(_.name).filter(col => tableSchema.fieldNames.exists(f => resolver(f, col)))
if (existsColumns.nonEmpty) {
throw new AnalysisException(s"Columns: [${existsColumns.mkString(",")}] already exists in the table," +
s" table columns is: [${HoodieSqlUtils.removeMetaFields(table.schema).fieldNames.mkString(",")}]")
s" table columns is: [${hoodieCatalogTable.tableSchemaWithoutMetaFields.fieldNames.mkString(",")}]")
}
// Get the new schema
val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
val newSqlSchema = StructType(tableSchema.fields ++ colsToAdd)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
// Commit with new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession)
// Refresh the new schema to meta
val newDataSchema = StructType(table.dataSchema.fields ++ colsToAdd)
refreshSchemaInMeta(sparkSession, table, newDataSchema)
val newDataSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd)
refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newDataSchema)
}
Seq.empty[Row]
}
private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
newSqlSchema: StructType): Unit = {
newSqlSchema: StructType): Unit = {
try {
sparkSession.catalog.uncacheTable(tableId.quotedString)
} catch {
@@ -98,25 +97,22 @@ object AlterHoodieTableAddColumnsCommand {
/**
* Generate an empty commit with new schema to change the table's schema.
* @param schema The new schema to commit.
* @param table The hoodie table.
* @param hoodieCatalogTable The hoodie catalog table.
* @param sparkSession The spark session.
*/
def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
val path = getTableLocation(table, sparkSession)
def commitWithSchema(schema: Schema, hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession): Unit = {
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(
jsc,
schema.toString,
path,
table.identifier.table,
HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties ++ table.properties).asJava
hoodieCatalogTable.tableLocation,
hoodieCatalogTable.tableName,
HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType)
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, hoodieCatalogTable.tableType)
val instantTime = HoodieActiveTimeline.createNewInstantTime
client.startCommitWithTime(instantTime, commitActionType)

View File

@@ -18,14 +18,16 @@
package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}
import scala.util.control.NonFatal
@@ -34,22 +36,21 @@ import scala.util.control.NonFatal
* Command for alter hudi table's column type.
*/
case class AlterHoodieTableChangeColumnCommand(
tableName: TableIdentifier,
tableIdentifier: TableIdentifier,
columnName: String,
newColumn: StructField)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val resolver = sparkSession.sessionState.conf.resolver
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val resolver = sparkSession.sessionState.conf.resolver
if (!resolver(columnName, newColumn.name)) {
throw new AnalysisException(s"Can not support change column name for hudi table currently.")
}
// Get the new schema
val newSqlSchema = StructType(
table.schema.fields.map { field =>
val newTableSchema = StructType(
hoodieCatalogTable.tableSchema.fields.map { field =>
if (resolver(field.name, columnName)) {
newColumn
} else {
@@ -57,34 +58,30 @@ case class AlterHoodieTableChangeColumnCommand(
}
})
val newDataSchema = StructType(
table.dataSchema.fields.map { field =>
hoodieCatalogTable.dataSchema.fields.map { field =>
if (resolver(field.name, columnName)) {
newColumn
} else {
field
}
})
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableIdentifier.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newTableSchema, structName, nameSpace)
val path = getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
// Validate the compatibility between new schema and origin schema.
validateSchema(newSchema, metaClient)
validateSchema(newSchema, hoodieCatalogTable.metaClient)
// Commit new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession)
try {
sparkSession.catalog.uncacheTable(tableName.quotedString)
sparkSession.catalog.uncacheTable(tableIdentifier.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
log.warn(s"Exception when attempting to uncache table ${tableIdentifier.quotedString}", e)
}
sparkSession.catalog.refreshTable(tableName.unquotedString)
sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
// Change the schema in the meta using new data schema.
catalog.alterTableDataSchema(tableName, newDataSchema)
sparkSession.sessionState.catalog.alterTableDataSchema(tableIdentifier, newDataSchema)
Seq.empty[Row]
}

View File

@@ -17,15 +17,16 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
@@ -35,24 +36,19 @@ case class AlterHoodieTableDropPartitionCommand(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableIdentifier)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
DDLUtils.verifyAlterTableType(
sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false)
val path = getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
val partitionColumns = metaClient.getTableConfig.getPartitionFields
val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec =>
normalizePartitionSpec(
spec,
partitionColumns.get(),
table.identifier.quotedString,
hoodieCatalogTable.partitionFields,
hoodieCatalogTable.tableName,
sparkSession.sessionState.conf.resolver)
}
val parameters = buildHoodieConfig(sparkSession, path, partitionColumns.get, normalizedSpecs)
val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, normalizedSpecs)
HoodieSparkSqlWriter.write(
sparkSession.sqlContext,
SaveMode.Append,
@@ -65,15 +61,14 @@ extends RunnableCommand {
private def buildHoodieConfig(
sparkSession: SparkSession,
path: String,
partitionColumns: Seq[String],
hoodieCatalogTable: HoodieCatalogTable,
normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = {
val table = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
val table = hoodieCatalogTable.table
val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
val partitionsToDelete = normalizedSpecs.map { spec =>
partitionColumns.map{ partitionColumn =>
hoodieCatalogTable.partitionFields.map{ partitionColumn =>
val encodedPartitionValue = if (enableEncodeUrl) {
PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
} else {
@@ -87,22 +82,16 @@ extends RunnableCommand {
}.mkString("/")
}.mkString(",")
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf)
.build()
val tableConfig = metaClient.getTableConfig
withSparkConf(sparkSession, table.storage.properties) {
withSparkConf(sparkSession, Map.empty) {
Map(
"path" -> path,
TBL_NAME.key -> tableIdentifier.table,
TABLE_TYPE.key -> tableConfig.getTableType.name,
"path" -> hoodieCatalogTable.tableLocation,
TBL_NAME.key -> hoodieCatalogTable.tableName,
TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
PARTITIONS_TO_DELETE.key -> partitionsToDelete,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> tableConfig.getPreCombineField,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",")
)
}
}

View File

@@ -18,10 +18,11 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
/**
* Command for alter hudi table's table name.
@@ -34,18 +35,15 @@ class AlterHoodieTableRenameCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
if (newName != oldName) {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(oldName)
val path = getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, oldName)
// Init table with new name.
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(metaClient.getTableConfig.getProps(true))
.fromProperties(hoodieCatalogTable.tableConfig.getProps)
.setTableName(newName.table)
.initTable(hadoopConf, path)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
// Call AlterTableRenameCommand#run to rename table in meta.
super.run(sparkSession)
}

View File

@@ -25,11 +25,11 @@ import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.hudi.HoodieSqlUtils
import scala.collection.JavaConverters._
@@ -64,18 +64,23 @@ case class CreateHoodieTableAsSelectCommand(
// scalastyle:on
}
}
val tablePath = getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
assert(CreateHoodieTableCommand.isEmptyPath(tablePath, hadoopConf),
s"Path '$tablePath' should be empty for CTAS")
// ReOrder the query which move the partition columns to the last of the project list
val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames)
val tableWithSchema = table.copy(schema = reOrderedQuery.schema)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableWithSchema)
val tablePath = hoodieCatalogTable.tableLocation
val hadoopConf = sparkSession.sessionState.newHadoopConf()
assert(HoodieSqlUtils.isEmptyPath(tablePath, hadoopConf),
s"Path '$tablePath' should be empty for CTAS")
// Execute the insert query
try {
val tblProperties = table.storage.properties ++ table.properties
// init hoodie table
hoodieCatalogTable.initHoodieTable()
val tblProperties = hoodieCatalogTable.catalogProperties
val options = Map(
DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
@@ -89,11 +94,8 @@ case class CreateHoodieTableAsSelectCommand(
// If write success, create the table in catalog if it has not synced to the
// catalog by the meta sync.
if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
// Create the table
val createTableCommand = CreateHoodieTableCommand(tableWithSchema, mode == SaveMode.Ignore)
val path = getTableLocation(table, sparkSession)
val (finalSchema, _, tableSqlOptions) = createTableCommand.parseSchemaAndConfigs(sparkSession, path, ctas = true)
createTableCommand.createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, mode == SaveMode.Ignore)
}
} else { // failed to insert data, clear table path
clearTablePath(tablePath, hadoopConf)

View File

@@ -17,37 +17,27 @@
package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils, SparkAdapterSupport}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.hadoop.HoodieParquetInputFormat
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.checkTableConfigEqual
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
import java.util.{Locale, Properties}
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
@@ -58,10 +48,6 @@ import scala.util.control.NonFatal
case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
extends RunnableCommand with SparkAdapterSupport {
val tableName = formatName(table.identifier.table)
val tblProperties = table.storage.properties ++ table.properties
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableIsExists = sparkSession.sessionState.catalog.tableExists(table.identifier)
if (tableIsExists) {
@@ -74,94 +60,50 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}
}
// get schema with meta fields, table config if hudi table exists, options including
// table configs and properties of the catalog table
val path = getTableLocation(table, sparkSession)
val (finalSchema, existingTableConfig, tableSqlOptions) = parseSchemaAndConfigs(sparkSession, path)
// Init the hoodie.properties
initTableIfNeed(sparkSession, path, finalSchema, existingTableConfig, tableSqlOptions)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
// init hoodie table
hoodieCatalogTable.initHoodieTable()
try {
// Create table in the catalog
createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}")
}
Seq.empty[Row]
}
}
def parseSchemaAndConfigs(sparkSession: SparkSession, path: String, ctas: Boolean = false)
: (StructType, Map[String, String], Map[String, String]) = {
val resolver = sparkSession.sessionState.conf.resolver
val conf = sparkSession.sessionState.newHadoopConf
// if CTAS, we treat the table we just created as nonexistent
val isTableExists = if (ctas) false else tableExistsInPath(path, conf)
var existingTableConfig = Map.empty[String, String]
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
val globalSqlProps = HoodieOptionConfig.mappingTableConfigToSqlOption(
HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(globalProps))
val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlProps ++ tblProperties)
val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(tblProperties)
object CreateHoodieTableCommand {
// get final schema and parameters
val (finalSchema, tableSqlOptions) = (table.tableType, isTableExists) match {
case (CatalogTableType.EXTERNAL, true) =>
// If this is an external table & the table has already exists in the location,
// load the schema from the table meta.
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableSchema = getTableSqlSchema(metaClient)
existingTableConfig = metaClient.getTableConfig.getProps.asScala.toMap
validateTableConfig(sparkSession, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
def validateTblProperties(hoodieCatalogTable: HoodieCatalogTable): Unit = {
if (hoodieCatalogTable.hoodieTableExists) {
val originTableConfig = hoodieCatalogTable.tableConfig.getProps.asScala.toMap
val tableOptions = hoodieCatalogTable.catalogProperties
val options = extraTableConfig(sparkSession, isTableExists, existingTableConfig) ++
sqlOptions ++ HoodieOptionConfig.mappingTableConfigToSqlOption(existingTableConfig)
val userSpecifiedSchema = table.schema
val schema = if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
tableSchema.get
} else if (userSpecifiedSchema.nonEmpty) {
userSpecifiedSchema
} else {
throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
}
(addMetaFields(schema), options)
case (_, false) =>
assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
val schema = table.schema
val options = extraTableConfig(sparkSession, isTableExists = false) ++ sqlOptions
(addMetaFields(schema), options)
case (CatalogTableType.MANAGED, true) =>
throw new AnalysisException(s"Can not create the managed table('$tableName')" +
s". The associated location('$path') already exists.")
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
}
HoodieOptionConfig.validateTable(sparkSession, finalSchema, tableSqlOptions)
val dataSchema = finalSchema.filterNot { f =>
table.partitionColumnNames.exists(resolver(_, f.name))
}
verifyDataSchema(table.identifier, table.tableType, dataSchema)
(finalSchema, existingTableConfig, tableSqlOptions)
}
def createTableInCatalog(sparkSession: SparkSession, finalSchema: StructType,
options: Map[String, String]): Unit = {
def createTableInCatalog(sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = {
val table = hoodieCatalogTable.table
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
val sessionState = sparkSession.sessionState
val path = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val tableType = HoodieOptionConfig.getTableType(options)
val catalog = sparkSession.sessionState.catalog
val path = hoodieCatalogTable.tableLocation
val tableConfig = hoodieCatalogTable.tableConfig
val properties = tableConfig.getProps.asScala.toMap
val tableType = tableConfig.getTableType.name()
val inputFormat = tableType match {
case DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL =>
classOf[HoodieParquetInputFormat].getCanonicalName
@@ -173,7 +115,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
val serdeFormat = HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET)
// only parameters irrelevant to hudi can be set to storage.properties
val storageProperties = HoodieOptionConfig.deleteHoodieOptions(options)
val storageProperties = HoodieOptionConfig.deleteHoodieOptions(properties)
val newStorage = new CatalogStorageFormat(
Some(new Path(path).toUri),
Some(inputFormat),
@@ -182,17 +124,18 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
table.storage.compressed,
storageProperties + ("path" -> path))
val newDatabaseName = formatName(table.identifier.database
.getOrElse(sessionState.catalog.getCurrentDatabase))
val tablName = HoodieSqlUtils.formatName(sparkSession, table.identifier.table)
val newDatabaseName = HoodieSqlUtils.formatName(sparkSession, table.identifier.database
.getOrElse(catalog.getCurrentDatabase))
val newTableIdentifier = table.identifier
.copy(table = tableName, database = Some(newDatabaseName))
.copy(table = tablName, database = Some(newDatabaseName))
// append pk, preCombineKey, type to the properties of table
val newTblProperties = table.storage.properties ++ table.properties ++ HoodieOptionConfig.extractSqlOptions(options)
val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
val newTable = table.copy(
identifier = newTableIdentifier,
schema = finalSchema,
schema = hoodieCatalogTable.tableSchema,
storage = newStorage,
createVersion = SPARK_VERSION,
properties = newTblProperties
@@ -201,9 +144,9 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
// Create table in the catalog
val enableHive = isEnableHive(sparkSession)
if (enableHive) {
createHiveDataSourceTable(newTable, sparkSession)
createHiveDataSourceTable(sparkSession, newTable, ignoreIfExists)
} else {
sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
}
}
@@ -215,7 +158,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
* @param table
* @param sparkSession
*/
private def createHiveDataSourceTable(table: CatalogTable, sparkSession: SparkSession): Unit = {
private def createHiveDataSourceTable(sparkSession: SparkSession, table: CatalogTable,
ignoreIfExists: Boolean): Unit = {
val dbName = table.identifier.database.get
// check database
val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
@@ -237,43 +181,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
client.createTable(tableWithDataSourceProps, ignoreIfExists)
}
private def formatName(name: String): String = {
if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}
// This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
private def verifyDataSchema(tableName: TableIdentifier, tableType: CatalogTableType,
dataSchema: Seq[StructField]): Unit = {
if (tableType != CatalogTableType.VIEW) {
val invalidChars = Seq(",", ":", ";")
def verifyNestedColumnNames(schema: StructType): Unit = schema.foreach { f =>
f.dataType match {
case st: StructType => verifyNestedColumnNames(st)
case _ if invalidChars.exists(f.name.contains) =>
val invalidCharsString = invalidChars.map(c => s"'$c'").mkString(", ")
val errMsg = "Cannot create a table having a nested column whose name contains " +
s"invalid characters ($invalidCharsString) in Hive metastore. Table: $tableName; " +
s"Column: ${f.name}"
throw new AnalysisException(errMsg)
case _ =>
}
}
dataSchema.foreach { f =>
f.dataType match {
// Checks top-level column names
case _ if f.name.contains(",") =>
throw new AnalysisException("Cannot create a table having a column whose name " +
s"contains commas in Hive metastore. Table: $tableName; Column: ${f.name}")
// Checks nested column names
case st: StructType =>
verifyNestedColumnNames(st)
case _ =>
}
}
}
}
// This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#tableMetaToTableProps
private def tableMetaToTableProps(sparkConf: SparkConf, table: CatalogTable,
schema: StructType): Map[String, String] = {
@@ -323,93 +230,15 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
properties.toMap
}
def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
val extraConfig = mutable.Map.empty[String, String]
if (isTableExists) {
val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
} else {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))
}
if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) {
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
} else {
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))
}
} else {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true"
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue
}
if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
} else {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
}
extraConfig.toMap
}
/**
* Init the hoodie.properties.
*/
def initTableIfNeed(sparkSession: SparkSession,
location: String,
schema: StructType,
private def checkTableConfigEqual(
originTableConfig: Map[String, String],
sqlOptions: Map[String, String]): Unit = {
logInfo(s"Init hoodie.properties for $tableName")
val conf = sparkSession.sessionState.newHadoopConf()
val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
// Save all the table config to the hoodie.properties.
val parameters = HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(originTableConfig ++ tableOptions)
val properties = new Properties()
properties.putAll(parameters.asJava)
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setTableName(tableName)
.setTableCreateSchema(SchemaConverters.toAvroType(schema).toString())
.setPartitionFields(table.partitionColumnNames.mkString(","))
.initTable(conf, location)
}
}
object CreateHoodieTableCommand extends Logging {
def checkTableConfigEqual(originTableConfig: Map[String, String],
newTableConfig: Map[String, String], configKey: String): Unit = {
newTableConfig: Map[String, String],
configKey: String): Unit = {
if (originTableConfig.contains(configKey) && newTableConfig.contains(configKey)) {
assert(originTableConfig(configKey) == newTableConfig(configKey),
s"Table config: $configKey in the create table is: ${newTableConfig(configKey)}, is not the same with the value in " +
s"hoodie.properties, which is: ${originTableConfig(configKey)}. Please keep the same.")
}
}
/**
* Check if this is a empty table path.
*/
def isEmptyPath(tablePath: String, conf: Configuration): Boolean = {
val basePath = new Path(tablePath)
val fs = basePath.getFileSystem(conf)
if (fs.exists(basePath)) {
fs.listStatus(basePath).isEmpty
} else {
true
}
}
}

View File

@@ -17,14 +17,14 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils._
@@ -57,26 +57,20 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
}
private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
val tblProperties = targetTable.storage.properties ++ targetTable.properties
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val tableSchema = getTableSqlSchema(metaClient).get
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
val path = hoodieCatalogTable.tableLocation
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
val primaryColumns = tableConfig.getRecordKeyFields.get()
assert(primaryColumns.nonEmpty,
s"There are no primary key defined in table $tableId, cannot execute delete operator")
withSparkConf(sparkSession, tblProperties) {
withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
Map(
"path" -> path,
TBL_NAME.key -> tableId.table,
TBL_NAME.key -> tableConfig.getTableName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,

View File

@@ -22,7 +22,6 @@ import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -34,15 +33,13 @@ import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import java.util.Properties
@@ -53,10 +50,10 @@ import scala.collection.JavaConverters._
* Command for insert into hoodie table.
*/
case class InsertIntoHoodieTableCommand(
logicalRelation: LogicalRelation,
query: LogicalPlan,
partition: Map[String, Option[String]],
overwrite: Boolean)
logicalRelation: LogicalRelation,
query: LogicalPlan,
partition: Map[String, Option[String]],
overwrite: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -83,14 +80,18 @@ object InsertIntoHoodieTableCommand extends Logging {
* @param refreshTable Whether to refresh the table after insert finished.
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
insertPartitions: Map[String, Option[String]],
overwrite: Boolean, refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
def run(sparkSession: SparkSession,
table: CatalogTable,
query: LogicalPlan,
insertPartitions: Map[String, Option[String]],
overwrite: Boolean,
refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions, extraOptions)
val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions)
val mode = if (overwrite && table.partitionColumnNames.isEmpty) {
val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
// insert overwrite non-partition table
SaveMode.Overwrite
} else {
@@ -98,7 +99,7 @@ object InsertIntoHoodieTableCommand extends Logging {
SaveMode.Append
}
val conf = sparkSession.sessionState.conf
val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf)
// If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
// The nullable attribute of fields will lost.
// In order to pass the nullable attribute to the inputDF, we specify the schema
@@ -120,18 +121,18 @@ object InsertIntoHoodieTableCommand extends Logging {
/**
* Aligned the type and name of query's output fields with the result table's fields.
* @param query The insert query which to aligned.
* @param table The result table.
* @param hoodieCatalogTable The result hoodie catalog table.
* @param insertPartitions The insert partition map.
* @param conf The SQLConf.
* @return
*/
private def alignOutputFields(
query: LogicalPlan,
table: CatalogTable,
hoodieCatalogTable: HoodieCatalogTable,
insertPartitions: Map[String, Option[String]],
conf: SQLConf): LogicalPlan = {
val targetPartitionSchema = table.partitionSchema
val targetPartitionSchema = hoodieCatalogTable.partitionSchema
val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
assert(staticPartitionValues.isEmpty ||
@@ -139,20 +140,22 @@ object InsertIntoHoodieTableCommand extends Logging {
s"Required partition columns is: ${targetPartitionSchema.json}, Current static partitions " +
s"is: ${staticPartitionValues.mkString("," + "")}")
assert(staticPartitionValues.size + query.output.size == table.schema.size,
s"Required select columns count: ${removeMetaFields(table.schema).size}, " +
val queryOutputWithoutMetaFields = removeMetaFields(query.output)
assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
== hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
s"Current select columns(including static partition column) count: " +
s"${staticPartitionValues.size + removeMetaFields(query.output).size}columns: " +
s"(${(removeMetaFields(query.output).map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
query.output.dropRight(targetPartitionSchema.fields.length)
s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size}columns: " +
s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
val queryDataFieldsWithoutMetaFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
queryOutputWithoutMetaFields.dropRight(targetPartitionSchema.fields.length)
} else { // insert static partition
query.output
queryOutputWithoutMetaFields
}
val targetDataSchema = table.dataSchema
// Align for the data fields of the query
val dataProjects = queryDataFields.zip(targetDataSchema.fields).map {
case (dataAttr, targetField) =>
val dataProjectsWithputMetaFields = queryDataFieldsWithoutMetaFields.zip(
hoodieCatalogTable.dataSchemaWithoutMetaFields.fields).map { case (dataAttr, targetField) =>
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
targetField.dataType, conf)
Alias(castAttr, targetField.name)()
@@ -161,9 +164,9 @@ object InsertIntoHoodieTableCommand extends Logging {
val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions
// The partition attributes is followed the data attributes in the query
// So we init the partitionAttrPosition with the data schema size.
var partitionAttrPosition = targetDataSchema.size
var partitionAttrPosition = hoodieCatalogTable.dataSchemaWithoutMetaFields.size
targetPartitionSchema.fields.map(f => {
val partitionAttr = query.output(partitionAttrPosition)
val partitionAttr = queryOutputWithoutMetaFields(partitionAttrPosition)
partitionAttrPosition = partitionAttrPosition + 1
val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf)
Alias(castAttr, f.name)()
@@ -176,9 +179,7 @@ object InsertIntoHoodieTableCommand extends Logging {
Alias(castAttr, f.name)()
})
}
// Remove the hoodie meta fields from the projects as we do not need these to write
val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
val alignedProjects = dataProjectsWithputMetaFields ++ partitionProjects
Project(alignedProjects, query)
}
@@ -187,65 +188,44 @@ object InsertIntoHoodieTableCommand extends Logging {
* @return
*/
private def buildHoodieInsertConfig(
table: CatalogTable,
hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {
if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) {
(insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
throw new IllegalArgumentException(s"Insert partition fields" +
s"[${insertPartitions.keys.mkString(" " )}]" +
s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]")
}
val path = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val isTableExists = tableExistsInPath(path, conf)
val (tableConfig, tableSchema) = if (isTableExists) {
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
(metaClient.getTableConfig, getTableSqlSchema(metaClient).get)
} else {
(new HoodieTableConfig(), table.schema)
}
val partitionColumns = tableConfig.getPartitionFieldProp
val partitionSchema = if (null == partitionColumns || partitionColumns.isEmpty) {
table.partitionSchema
} else {
StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
}
val path = hoodieCatalogTable.tableLocation
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val options = table.storage.properties ++ table.properties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
val parameters = withSparkConf(sparkSession, options)()
val tableName = Option(tableConfig.getTableName).getOrElse(table.identifier.table)
val tableType = Option(tableConfig.getTableType.name).getOrElse(TABLE_TYPE.defaultValue)
val primaryColumns = tableConfig.getRecordKeyFields.orElse(HoodieOptionConfig.getPrimaryColumns(options))
val preCombineColumn = Option(tableConfig.getPreCombineField)
.getOrElse(HoodieOptionConfig.getPreCombineField(options).getOrElse(""))
val partitionFields = Option(tableConfig.getPartitionFieldProp)
.getOrElse(table.partitionColumnNames.mkString(","))
val preCombineColumn = hoodieCatalogTable.preCombineKey.getOrElse("")
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitoning).getOrElse("false")
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key)
.getOrElse(INSERT_DROP_DUPS.defaultValue)
.toBoolean
val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val hasPrecombineColumn = preCombineColumn.nonEmpty
val isPartitionedTable = table.partitionColumnNames.nonEmpty
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPrecombineColumn = preCombineColumn.nonEmpty
val operation =
(enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, false, _) =>
@@ -284,14 +264,14 @@ object InsertIntoHoodieTableCommand extends Logging {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
TBL_NAME.key -> tableName,
TBL_NAME.key -> hoodieCatalogTable.tableName,
PRECOMBINE_FIELD.key -> preCombineColumn,
OPERATION.key -> operation,
HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
@@ -299,14 +279,14 @@ object InsertIntoHoodieTableCommand extends Logging {
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> table.identifier.table,
HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_FIELDS.key -> partitionFields,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
}
}
@@ -324,7 +304,7 @@ class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable
}
override def combineAndGetUpdateValue(currentValue: IndexedRecord,
schema: Schema, properties: Properties): HOption[IndexedRecord] = {
schema: Schema, properties: Properties): HOption[IndexedRecord] = {
val key = currentValue.asInstanceOf[GenericRecord].get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
throw new HoodieDuplicateKeyException(key)
}

View File

@@ -19,26 +19,29 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils}
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.types.{BooleanType, StructType}
import java.util.Base64
/**
* The Command for hoodie MergeIntoTable.
* The match on condition must contain the row key fields currently, so that we can use Hoodie
@@ -78,12 +81,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
private lazy val targetTableSchemaWithoutMetaFields =
removeMetaFields(mergeInto.targetTable.schema).fields
private lazy val targetTable =
sparkSession.sessionState.catalog.getTableMetadata(targetTableIdentify)
private lazy val hoodieCatalogTable = HoodieCatalogTable(sparkSession, targetTableIdentify)
private lazy val tblProperties = targetTable.storage.properties ++ targetTable.properties
private lazy val targetTableType = HoodieOptionConfig.getTableType(tblProperties)
private lazy val targetTableType = hoodieCatalogTable.tableTypeName
/**
*
@@ -126,7 +126,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}")
val updateAction = updateActions.headOption
HoodieOptionConfig.getPreCombineField(tblProperties).map(preCombineField => {
hoodieCatalogTable.preCombineKey.map(preCombineField => {
val sourcePreCombineField =
updateAction.map(u => u.assignments.filter {
case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField)
@@ -149,9 +149,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
this.sparkSession = sparkSession
// Create the write parameters
val parameters = buildMergeIntoConfig(mergeInto)
val sourceDF = buildSourceDF(sparkSession)
val parameters = buildMergeIntoConfig(hoodieCatalogTable)
if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
executeUpsert(sourceDF, parameters)
@@ -180,7 +178,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
* row key and pre-combine field.
*
*/
private def buildSourceDF(sparkSession: SparkSession): DataFrame = {
private lazy val sourceDF: DataFrame = {
var sourceDF = Dataset.ofRows(sparkSession, mergeInto.sourceTable)
targetKey2SourceExpression.foreach {
case (targetColumn, sourceExpression)
@@ -429,33 +427,24 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
/**
* Create the config for hoodie writer.
* @param mergeInto
* @return
*/
private def buildMergeIntoConfig(mergeInto: MergeIntoTable): Map[String, String] = {
private def buildMergeIntoConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
val targetTableDb = targetTableIdentify.database.getOrElse("default")
val targetTableName = targetTableIdentify.identifier
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val tableSchema = getTableSqlSchema(metaClient).get
val path = hoodieCatalogTable.tableLocation
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
val options = tblProperties
val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
// Enable the hive sync by default if spark have enable the hive metastore.
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, options) {
withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> preCombineColumn,
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
TBL_NAME.key -> targetTableName,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,

View File

@@ -17,22 +17,22 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.types.StringType
/**
* Command for show hudi table's partitions.
*/
case class ShowHoodieTablePartitionsCommand(
tableName: TableIdentifier,
tableIdentifier: TableIdentifier,
specOpt: Option[TablePartitionSpec])
extends RunnableCommand {
@@ -41,28 +41,17 @@ extends RunnableCommand {
}
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val resolver = sparkSession.sessionState.conf.resolver
val catalogTable = catalog.getTableMetadata(tableName)
val tablePath = getTableLocation(catalogTable, sparkSession)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
.setConf(hadoopConf).build()
val schemaOpt = getTableSqlSchema(metaClient)
val partitionColumnNamesOpt = metaClient.getTableConfig.getPartitionFields
if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty
&& schemaOpt.isDefined && schemaOpt.nonEmpty) {
val partitionColumnNames = partitionColumnNamesOpt.get
val schema = schemaOpt.get
val allPartitionPaths: Seq[String] = getAllPartitionPaths(sparkSession, catalogTable)
val schemaOpt = hoodieCatalogTable.tableSchema
val partitionColumnNamesOpt = hoodieCatalogTable.tableConfig.getPartitionFields
if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty && schemaOpt.nonEmpty) {
if (specOpt.isEmpty) {
allPartitionPaths.map(Row(_))
hoodieCatalogTable.getAllPartitionPaths.map(Row(_))
} else {
val spec = specOpt.get
allPartitionPaths.filter { partitionPath =>
hoodieCatalogTable.getAllPartitionPaths.filter { partitionPath =>
val part = PartitioningUtils.parsePathFragment(partitionPath)
spec.forall { case (col, value) =>
PartitionPathEncodeUtils.escapePartitionValue(value) == part.getOrElse(col, null)

View File

@@ -18,45 +18,37 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.TruncateTableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
/**
* Command for truncate hudi table.
*/
class TruncateHoodieTableCommand(
tableName: TableIdentifier,
tableIdentifier: TableIdentifier,
partitionSpec: Option[TablePartitionSpec])
extends TruncateTableCommand(tableName, partitionSpec) {
extends TruncateTableCommand(tableIdentifier, partitionSpec) {
override def run(sparkSession: SparkSession): Seq[Row] = {
val table = sparkSession.sessionState.catalog.getTableMetadata(tableName)
val path = getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
// If we have not specified the partition, truncate will delete all the
// data in the table path include the hoodi.properties. In this case we
// should reInit the table.
val needReInitTable = partitionSpec.isEmpty
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps
val tablePath = hoodieCatalogTable.tableLocation
val tableProperties = if (needReInitTable) {
// Create MetaClient
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
Some(metaClient.getTableConfig.getProps(true))
} else {
None
}
// Delete all data in the table directory
super.run(sparkSession)
if (tableProperties.isDefined) {
// If we have not specified the partition, truncate will delete all the data in the table path
// include the hoodi.properties. In this case we should reInit the table.
if (partitionSpec.isEmpty) {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
// ReInit hoodie.properties
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(tableProperties.get)
.initTable(hadoopConf, path)
.fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
}
Seq.empty[Row]
}

View File

@@ -20,17 +20,18 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.JavaConverters._
@@ -43,9 +44,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
override def run(sparkSession: SparkSession): Seq[Row] = {
logInfo(s"start execute update command for $tableId")
def cast(exp:Expression, field: StructField): Expression = {
castIfNeeded(exp, field.dataType, sparkSession.sqlContext.conf)
}
val sqlConf = sparkSession.sessionState.conf
val name2UpdateValue = updateTable.assignments.map {
case Assignment(attr: AttributeReference, value) =>
attr.name -> value
@@ -61,9 +60,9 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
val projects = updateExpressions.zip(removeMetaFields(table.schema).fields).map {
case (attr: AttributeReference, field) =>
Column(cast(attr, field))
Column(cast(attr, field, sqlConf))
case (exp, field) =>
Column(Alias(cast(exp, field), field.name)())
Column(Alias(cast(exp, field, sqlConf), field.name)())
}
var df = Dataset.ofRows(sparkSession, table)
@@ -83,30 +82,21 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
}
private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
val tblProperties = targetTable.storage.properties ++ targetTable.properties
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val tableSchema = getTableSqlSchema(metaClient).get
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
val primaryColumns = tableConfig.getRecordKeyFields.get()
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
assert(primaryColumns.nonEmpty,
assert(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key in table $tableId, cannot execute update operator")
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, tblProperties) {
withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
"path" -> hoodieCatalogTable.tableLocation,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> preCombineColumn,
TBL_NAME.key -> tableId.table,
TBL_NAME.key -> hoodieCatalogTable.tableName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
@@ -122,8 +112,12 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
}
}
def cast(exp:Expression, field: StructField, sqlConf: SQLConf): Expression = {
castIfNeeded(exp, field.dataType, sqlConf)
}
}

View File

@@ -47,7 +47,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"dt is not a valid partition column in table `default`.`$tableName`.")
s"dt is not a valid partition column in table")
}
Seq(false, true).foreach { urlencode =>

View File

@@ -138,7 +138,7 @@ class TestHoodieOptionConfig extends HoodieClientTestBase {
val e2 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions2)
}
assertTrue(e2.getMessage.contains("Can't find primary key"))
assertTrue(e2.getMessage.contains("Can't find primaryKey"))
// preCombine field not found
val sqlOptions3 = baseSqlOptions ++ Map(
@@ -149,7 +149,7 @@ class TestHoodieOptionConfig extends HoodieClientTestBase {
val e3 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions3)
}
assertTrue(e3.getMessage.contains("Can't find precombine key"))
assertTrue(e3.getMessage.contains("Can't find preCombineKey"))
// miss type parameter
val sqlOptions4 = baseSqlOptions ++ Map(

View File

@@ -17,7 +17,6 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -265,10 +264,6 @@ class TestInsertTable extends TestHoodieSqlBase {
test("Test insert for uppercase table name") {
withTempDir{ tmp =>
val tableName = s"H_$generateTableName"
HoodieTableMetaClient.withPropertyBuilder()
.setTableName(tableName)
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
.initTable(spark.sessionState.newHadoopConf(), tmp.getCanonicalPath)
spark.sql(
s"""
@@ -285,6 +280,11 @@ class TestInsertTable extends TestHoodieSqlBase {
checkAnswer(s"select id, name, price from $tableName")(
Seq(1, "a1", 10.0)
)
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tmp.getCanonicalPath)
.setConf(spark.sessionState.newHadoopConf())
.build()
assertResult(metaClient.getTableConfig.getTableName)(tableName)
}
}