diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index a5098d6dc..00133abca 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter -import org.apache.hudi.hive.util.ConfigUtils import org.apache.log4j.LogManager import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index c49ffbd17..2f28c66ea 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -126,12 +126,9 @@ object HoodieOptionConfig { /** * Mapping the table config (loaded from the hoodie.properties) to the sql options. - * @param options - * @return */ def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String, String] = { - options.filterKeys(k => tableConfigKeyToSqlKey.contains(k)) - .map(kv => tableConfigKeyToSqlKey(kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2)) + options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2)) } private lazy val defaultTableConfig: Map[String, String] = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 99ae226c8..e2c622e51 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -24,7 +24,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.spark.SPARK_VERSION +import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -64,6 +66,16 @@ object HoodieSqlUtils extends SparkAdapterSupport { } } + def getTableSqlSchema(metaClient: HoodieTableMetaClient): Option[StructType] = { + val schemaResolver = new TableSchemaResolver(metaClient) + val avroSchema = try Some(schemaResolver.getTableAvroSchema(false)) + catch { + case _: Throwable => None + } + avroSchema.map(SchemaConverters.toSqlType(_).dataType + .asInstanceOf[StructType]) + } + private def tripAlias(plan: LogicalPlan): LogicalPlan = { plan match { case SubqueryAlias(_, relation: LogicalPlan) => @@ -122,12 +134,12 @@ object HoodieSqlUtils extends SparkAdapterSupport { * 
@param spark * @return */ - def getTableLocation(tableId: TableIdentifier, spark: SparkSession): Option[String] = { + def getTableLocation(tableId: TableIdentifier, spark: SparkSession): String = { val table = spark.sessionState.catalog.getTableMetadata(tableId) getTableLocation(table, spark) } - def getTableLocation(table: CatalogTable, sparkSession: SparkSession): Option[String] = { + def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = { val uri = if (table.tableType == CatalogTableType.MANAGED && isHoodieTable(table)) { Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier)) } else { @@ -136,6 +148,7 @@ object HoodieSqlUtils extends SparkAdapterSupport { val conf = sparkSession.sessionState.newHadoopConf() uri.map(makePathQualified(_, conf)) .map(removePlaceHolder) + .getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}")) } private def removePlaceHolder(path: String): String = { @@ -154,6 +167,16 @@ object HoodieSqlUtils extends SparkAdapterSupport { fs.makeQualified(hadoopPath).toUri.toString } + /** + * Check if the hoodie.properties exists in the table path. + */ + def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = { + val basePath = new Path(tablePath) + val fs = basePath.getFileSystem(conf) + val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME) + fs.exists(metaPath) + } + def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = { child match { case Literal(nul, NullType) => Literal(nul, dataType) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 8ab8e8b0c..4c15670a4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -21,7 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL import org.apache.hudi.SparkAdapterSupport import scala.collection.JavaConverters._ -import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedStar import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -286,7 +287,28 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi } else { l } - + // Fill in the schema for a Create Table statement that does not specify one + case c @ CreateTable(tableDesc, _, _) + if isHoodieTable(tableDesc) => + val tablePath = getTableLocation(c.tableDesc, sparkSession) + if (tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) { + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tablePath) + .setConf(sparkSession.sessionState.newHadoopConf()) + .build() + val tableSchema = HoodieSqlUtils.getTableSqlSchema(metaClient).map(HoodieSqlUtils.addMetaFields) + if (tableSchema.isDefined && tableDesc.schema.isEmpty) { + // Fill in the schema from the existing table + c.copy(tableDesc.copy(schema = tableSchema.get)) + } else if (tableSchema.isDefined && tableDesc.schema != tableSchema.get) { + throw new AnalysisException(s"Specified schema in create table statement is not 
equal to the table schema. " + s"You should not specify the schema for an existing table: ${tableDesc.identifier} ") + } else { + c + } + } else { + c + } case p => p } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala index c5a6f845a..4123ea949 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -103,7 +103,6 @@ object AlterHoodieTableAddColumnsCommand { */ def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = { val path = getTableLocation(table, sparkSession) - .getOrElse(s"missing location for ${table.identifier}") val jsc = new JavaSparkContext(sparkSession.sparkContext) val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index d38b98c7a..d9569ceb5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -68,7 +68,6 @@ case class AlterHoodieTableChangeColumnCommand( val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace) val path = getTableLocation(table, sparkSession) - .getOrElse(s"missing location for ${table.identifier}") val hadoopConf = sparkSession.sessionState.newHadoopConf() val metaClient = HoodieTableMetaClient.builder().setBasePath(path) .setConf(hadoopConf).build() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala index e4732996a..2df9ec869 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala @@ -37,7 +37,6 @@ class AlterHoodieTableRenameCommand( val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(oldName) val path = getTableLocation(table, sparkSession) - .getOrElse(s"missing location for ${table.identifier}") val hadoopConf = sparkSession.sessionState.newHadoopConf() val metaClient = HoodieTableMetaClient.builder().setBasePath(path) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala index 5fdfefb2d..631504d51 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala @@ -31,7 
+31,6 @@ case class CompactionHoodieTableCommand(table: CatalogTable, override def run(sparkSession: SparkSession): Seq[Row] = { val basePath = getTableLocation(table, sparkSession) - .getOrElse(s"missing location for ${table.identifier}") CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala index e06bc1f1f..0702e6bc2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala @@ -29,7 +29,6 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int) override def run(sparkSession: SparkSession): Seq[Row] = { val basePath = getTableLocation(table, sparkSession) - .getOrElse(s"missing location for ${table.identifier}") CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index dce34b377..a156d9f70 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -62,7 +62,6 @@ case class CreateHoodieTableAsSelectCommand( } } val tablePath = getTableLocation(table, sparkSession) - .getOrElse(s"Missing path for table ${table.identifier}") val conf = sparkSession.sessionState.newHadoopConf() assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf), s"Path '$tablePath' should be empty for CTAS") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 462dfcf99..12a67bb80 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -19,16 +19,19 @@ package org.apache.spark.sql.hudi.command import scala.collection.JavaConverters._ import java.util.{Locale, Properties} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} import org.apache.hudi.common.model.HoodieFileFormat -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.ValidationUtils import org.apache.hudi.hadoop.HoodieParquetInputFormat import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils +import 
org.apache.spark.api.java.JavaSparkContext import org.apache.spark.{SPARK_VERSION, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.SchemaConverters @@ -40,8 +43,7 @@ import org.apache.spark.sql.hive.HiveClientUtils import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.hudi.HoodieOptionConfig -import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, tableExistsInPath, isEmptyPath} -import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, isEmptyPath} import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -50,8 +52,6 @@ import scala.collection.mutable /** * Command for create hoodie table. - * @param table - * @param ignoreIfExists */ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean) extends RunnableCommand with SparkAdapterSupport { @@ -83,7 +83,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean val sessionState = sparkSession.sessionState val tableName = table.identifier.unquotedString val path = getTableLocation(table, sparkSession) - .getOrElse(s"Missing path for table ${table.identifier}") val conf = sparkSession.sessionState.newHadoopConf() val isTableExists = tableExistsInPath(path, conf) // Get the schema & table options @@ -95,26 +94,44 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean .setBasePath(path) .setConf(conf) .build() - val schemaResolver = new TableSchemaResolver(metaClient) - val avroSchema = try Some(schemaResolver.getTableAvroSchema(false)) - catch { - case _: Throwable => None - } - val tableSchema = avroSchema.map(SchemaConverters.toSqlType(_).dataType - .asInstanceOf[StructType]) + val tableSchema = getTableSqlSchema(metaClient) - // Get options from the external table and append with the options in ddl. - val options = HoodieOptionConfig.mappingTableConfigToSqlOption( - metaClient.getTableConfig.getProps.asScala.toMap) ++ table.storage.properties + // Get options from the external table and append the options from the ddl. + val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption( + metaClient.getTableConfig.getProps.asScala.toMap) - val userSpecifiedSchema = table.schema - if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) { + val allPartitionPaths = getAllPartitionPaths(sparkSession, table) + var upgradeConfig = Map.empty[String, String] + // If this is a non-hive-styled partitioned table, disable the hive style config + // (which is enabled by default for spark sql). + upgradeConfig = if (isNotHiveStyledPartitionTable(allPartitionPaths, table)) { + upgradeConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false") + } else { + upgradeConfig + } + upgradeConfig = if (isUrlEncodeDisabled(allPartitionPaths, table)) { + upgradeConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false") + } else { + upgradeConfig + } + + // Use the origin key generator to generate the record key, keeping the record key consistent with the old table for spark sql. + // See SqlKeyGenerator#getRecordKey for details. + upgradeConfig = if (originTableConfig.contains(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key)) { + upgradeConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS -> originTableConfig(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key)) + } else { + upgradeConfig + } + val options = originTableConfig ++ upgradeConfig ++ table.storage.properties + + val userSpecifiedSchema = table.schema + if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) { (addMetaFields(tableSchema.get), options) - } else if (userSpecifiedSchema.nonEmpty) { + } else if (userSpecifiedSchema.nonEmpty) { (addMetaFields(userSpecifiedSchema), options) - } else { + } else { throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName") - } + } } else { assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName") // SPARK-19724: the default location of a managed table should be non-existent or empty. @@ -301,47 +318,102 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'") } } + + private def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = { + val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + val metadataConfig = { + val properties = new Properties() + properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava) + HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + } + FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala + } + + /** + * This method keeps compatibility with old non-hive-styled partitioned tables. + * By default spark sql enables "hoodie.datasource.write.hive_style_partitioning" + * when writing data to a hudi table. + * If the existing table is a non-hive-styled partitioned table, we should + * disable the "hoodie.datasource.write.hive_style_partitioning" when + * merging or updating the table. Otherwise, we will get an incorrect merge result + * because the partition paths mismatch. + */ + private def isNotHiveStyledPartitionTable(partitionPaths: Seq[String], table: CatalogTable): Boolean = { + if (table.partitionColumnNames.nonEmpty) { + val isHiveStylePartitionPath = (path: String) => { + val fragments = path.split("/") + if (fragments.size != table.partitionColumnNames.size) { + false + } else { + fragments.zip(table.partitionColumnNames).forall { + case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=") + } + } + } + !partitionPaths.forall(isHiveStylePartitionPath) + } else { + false + } + } + + /** + * If this table has disabled URL encoding, spark sql should also disable it when writing to the table. + */ + private def isUrlEncodeDisabled(partitionPaths: Seq[String], table: CatalogTable): Boolean = { + if (table.partitionColumnNames.nonEmpty) { + !partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size) + } else { + false + } + } + } object CreateHoodieTableCommand extends Logging { /** - * Init the table if it is not exists. - * @param sparkSession - * @param table - * @return + * Initialize the hoodie.properties for the table. 
*/ def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit = { - val location = getTableLocation(table, sparkSession).getOrElse( - throw new IllegalArgumentException(s"Missing location for ${table.identifier}")) + val location = getTableLocation(table, sparkSession) val conf = sparkSession.sessionState.newHadoopConf() // Init the hoodie table - if (!tableExistsInPath(location, conf)) { - val tableName = table.identifier.table - logInfo(s"Table $tableName is not exists, start to create the hudi table") + val originTableConfig = if (tableExistsInPath(location, conf)) { + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(location) + .setConf(conf) + .build() + metaClient.getTableConfig.getProps.asScala.toMap + } else { + Map.empty[String, String] + } - // Save all the table config to the hoodie.properties. - val parameters = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties) - val properties = new Properties() + val tableName = table.identifier.table + logInfo(s"Init hoodie.properties for $tableName") + val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key) + // Save all the table config to the hoodie.properties. + val parameters = originTableConfig ++ tableOptions + val properties = new Properties() properties.putAll(parameters.asJava) HoodieTableMetaClient.withPropertyBuilder() - .fromProperties(properties) - .setTableName(tableName) - .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString()) - .setPartitionFields(table.partitionColumnNames.mkString(",")) - .initTable(conf, location) - } + .fromProperties(properties) + .setTableName(tableName) + .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString()) + .setPartitionFields(table.partitionColumnNames.mkString(",")) + .initTable(conf, location) } - /** - * Check if the hoodie.properties exists in the table path. - */ - def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = { - val basePath = new Path(tablePath) - val fs = basePath.getFileSystem(conf) - val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME) - fs.exists(metaPath) + def checkTableConfigEqual(originTableConfig: Map[String, String], + newTableConfig: Map[String, String], configKey: String): Unit = { + if (originTableConfig.contains(configKey) && newTableConfig.contains(configKey)) { + assert(originTableConfig(configKey) == newTableConfig(configKey), + s"Table config: $configKey in the create table is: ${newTableConfig(configKey)}, which is not the same as the value in " + + s"hoodie.properties, which is: ${originTableConfig(configKey)}. 
Please keep them consistent.") + } } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index cfebe1e56..cca9e1521 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -59,7 +59,6 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab val targetTable = sparkSession.sessionState.catalog .getTableMetadata(tableId) val path = getTableLocation(targetTable, sparkSession) - .getOrElse(s"missing location for $tableId") val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 9782ec5fd..cfd66096d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -193,7 +193,6 @@ object InsertIntoHoodieTableCommand { val partitionFields = table.partitionColumnNames.mkString(",") val path = getTableLocation(table, sparkSession) - .getOrElse(s"Missing location for table ${table.identifier}") val tableSchema = table.schema val options = table.storage.properties diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 5146868ff..70b1a6003 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -416,7 +416,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab val targetTableDb = targetTableIdentify.database.getOrElse("default") val targetTableName = targetTableIdentify.identifier val path = getTableLocation(targetTable, sparkSession) - .getOrElse(s"missing location for $targetTableIdentify") val options = targetTable.storage.properties val definedPk = HoodieOptionConfig.getPrimaryColumns(options) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index fe0a7e1e2..3b5b33cc6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.hudi.command import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS} - import org.apache.avro.generic.GenericRecord import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.PartitionPathEncodeUtils +import org.apache.hudi.config.HoodieWriteConfig import 
org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils} import org.apache.spark.sql.types.{StructType, TimestampType} import org.joda.time.format.DateTimeFormat @@ -40,45 +40,62 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) None } } + // The origin key generator class for this table. + private lazy val originKeyGen = { + val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS, null) + if (beforeKeyGenClassName != null) { + val keyGenProps = new TypedProperties() + keyGenProps.putAll(props) + keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS) + keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key, beforeKeyGenClassName) + Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps)) + } else { + None + } + } - override def getPartitionPath(record: GenericRecord): String = { + override def getRecordKey(record: GenericRecord): String = { + if (originKeyGen.isDefined) { + originKeyGen.get.getKey(record).getRecordKey + } else { + super.getRecordKey(record) + } + } + + override def getPartitionPath(record: GenericRecord): String = { val partitionPath = super.getPartitionPath(record) if (partitionSchema.isDefined) { // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT // by default for sql. val partitionFragments = partitionPath.split(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR) - assert(partitionFragments.size == partitionSchema.get.size) + // If the table was not written by spark sql before and URL encoding has been disabled, + // the partition path depth may not equal the partition schema size. Just return the partitionPath + // in this case. + if (partitionFragments.size != partitionSchema.get.size) { + partitionPath + } else { + partitionFragments.zip(partitionSchema.get.fields).map { + case (partitionValue, partitionField) => + val hiveStylePrefix = s"${partitionField.name}=" + val isHiveStyle = partitionValue.startsWith(hiveStylePrefix) + val _partitionValue = if (isHiveStyle) partitionValue.substring(hiveStylePrefix.length) else partitionValue - partitionFragments.zip(partitionSchema.get.fields).map { - case (partitionValue, partitionField) => - val hiveStylePrefix = s"${partitionField.name}=" - val isHiveStyle = partitionValue.startsWith(hiveStylePrefix) - val _partitionValue = if (isHiveStyle) { - partitionValue.substring(hiveStylePrefix.length) - } else { - partitionValue - } - - partitionField.dataType match { - case TimestampType => - val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS) - val timestampFormat = PartitionPathEncodeUtils.escapePathName( - SqlKeyGenerator.timestampTimeFormat.print(timeMs)) - if (isHiveStyle) { - s"$hiveStylePrefix$timestampFormat" - } else { - timestampFormat - } - case _=> partitionValue - } - }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR) - } else { - partitionPath - } + partitionField.dataType match { + case TimestampType => + val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS) + val timestampFormat = PartitionPathEncodeUtils.escapePathName( + SqlKeyGenerator.timestampTimeFormat.print(timeMs)) + if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat + case _ => partitionValue + } + }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR) + } + } else partitionPath } } object SqlKeyGenerator { val PARTITION_SCHEMA = "hoodie.sql.partition.schema" + val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class" private val timestampTimeFormat = 
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala index 02624fae7..339f4b52c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -35,7 +35,6 @@ class TruncateHoodieTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val table = sparkSession.sessionState.catalog.getTableMetadata(tableName) val path = getTableLocation(table, sparkSession) - .getOrElse(s"missing location for ${table.identifier}") val hadoopConf = sparkSession.sessionState.newHadoopConf() // If we have not specified the partition, truncate will delete all the // data in the table path include the hoodi.properties. In this case we diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index a11f21f07..c0b0d7847 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -85,7 +85,6 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo val targetTable = sparkSession.sessionState.catalog .getTableMetadata(tableId) val path = getTableLocation(targetTable, sparkSession) - .getOrElse(s"missing location for $tableId") val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index 51bf8d49c..f75af6109 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -17,9 +17,15 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceWriteOptions._ + import scala.collection.JavaConverters._ import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat +import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} @@ -272,4 +278,231 @@ class TestCreateTable extends TestHoodieSqlBase { ) } } + + test("Test Create Table From Existing Hoodie Table") { + withTempDir { tmp => + Seq("2021-08-02", "2021/08/02").foreach { partitionValue => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + import spark.implicits._ + val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", 
"value", "ts", "dt") + // Write a table by spark dataframe. + df.write.format("hudi") + .option(HoodieWriteConfig.TABLE_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt") + .option(KEYGENERATOR_CLASS.key, classOf[SimpleKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) + + // Create a table over the exist old table. + spark.sql( + s""" + |create table $tableName using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + |) + |partitioned by (dt) + |location '$tablePath' + |""".stripMargin) + checkAnswer(s"select id, name, value, ts, dt from $tableName")( + Seq(1, "a1", 10, 1000, partitionValue) + ) + // Check the missing properties for spark sql + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tablePath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + val properties = metaClient.getTableConfig.getProps.asScala.toMap + assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key)) + assertResult("dt")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key)) + assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)) + + // Test insert into + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")( + Seq("1", partitionValue, 1, "a1", 10, 1000, partitionValue), + Seq("2", partitionValue, 2, "a2", 10, 1000, partitionValue) + ) + // Test merge into + spark.sql( + s""" + |merge into $tableName h0 + |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts, '$partitionValue' as dt) s0 + |on h0.id = s0.id + |when matched then update set * + |""".stripMargin) + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")( + Seq("1", partitionValue, 1, "a1", 11, 1001, partitionValue), + Seq("2", partitionValue, 2, "a2", 10, 1000, partitionValue) + ) + // Test update + spark.sql(s"update $tableName set value = value + 1 where id = 2") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")( + Seq("1", partitionValue, 1, "a1", 11, 1001, partitionValue), + Seq("2", partitionValue, 2, "a2", 11, 1000, partitionValue) + ) + // Test delete + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")( + Seq("2", partitionValue, 2, "a2", 11, 1000, partitionValue) + ) + } + } + } + + test("Test Create Table From Exist Hoodie Table For Multi-Level Partitioned Table") { + withTempDir { tmp => + Seq("2021-08-02", "2021/08/02").foreach { day => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + import spark.implicits._ + val df = Seq((1, "a1", 10, 1000, day, 12)).toDF("id", "name", "value", "ts", "day", "hh") + // Write a table by spark dataframe. 
+ df.write.format("hudi") + .option(HoodieWriteConfig.TABLE_NAME.key, tableName) + .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "day,hh") + .option(KEYGENERATOR_CLASS.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) + + // Create a table over the exist old table. + spark.sql( + s""" + |create table $tableName using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + |) + |partitioned by (day, hh) + |location '$tablePath' + |""".stripMargin) + checkAnswer(s"select id, name, value, ts, day, hh from $tableName")( + Seq(1, "a1", 10, 1000, day, 12) + ) + // Check the missing properties for spark sql + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tablePath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + val properties = metaClient.getTableConfig.getProps.asScala.toMap + assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key)) + assertResult("day,hh")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key)) + assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)) + + // Test insert into + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$day', 12)") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")( + Seq("id:1", s"$day/12", 1, "a1", 10, 1000, day, 12), + Seq("id:2", s"$day/12", 2, "a2", 10, 1000, day, 12) + ) + // Test merge into + spark.sql( + s""" + |merge into $tableName h0 + |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts, '$day' as day, 12 as hh) s0 + |on h0.id = s0.id + |when matched then update set * + |""".stripMargin) + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")( + Seq("id:1", s"$day/12", 1, "a1", 11, 1001, day, 12), + Seq("id:2", s"$day/12", 2, "a2", 10, 1000, day, 12) + ) + // Test update + spark.sql(s"update $tableName set value = value + 1 where id = 2") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")( + Seq("id:1", s"$day/12", 1, "a1", 11, 1001, day, 12), + Seq("id:2", s"$day/12", 2, "a2", 11, 1000, day, 12) + ) + // Test delete + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")( + Seq("id:2", s"$day/12", 2, "a2", 11, 1000, day, 12) + ) + } + } + } + + test("Test Create Table From Exist Hoodie Table For None Partitioned Table") { + withTempDir{tmp => + // Write a table by spark dataframe. 
+ val tableName = generateTableName + import spark.implicits._ + val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts") + df.write.format("hudi") + .option(HoodieWriteConfig.TABLE_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "") + .option(KEYGENERATOR_CLASS.key, classOf[NonpartitionedKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1") + .mode(SaveMode.Overwrite) + .save(tmp.getCanonicalPath) + + // Create a table over the existing hudi table. + spark.sql( + s""" + |create table $tableName using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + |) + |location '${tmp.getCanonicalPath}' + |""".stripMargin) + checkAnswer(s"select id, name, value, ts from $tableName")( + Seq(1, "a1", 10, 1000) + ) + // Check that the properties missing for spark sql have been filled in + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tmp.getCanonicalPath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + val properties = metaClient.getTableConfig.getProps.asScala.toMap + assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key)) + assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)) + + // Test insert into + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts from $tableName order by id")( + Seq("1", "", 1, "a1", 10, 1000), + Seq("2", "", 2, "a2", 10, 1000) + ) + // Test merge into + spark.sql( + s""" + |merge into $tableName h0 + |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts) s0 + |on h0.id = s0.id + |when matched then update set * + |""".stripMargin) + checkAnswer(s"select id, name, value, ts from $tableName order by id")( + Seq(1, "a1", 11, 1001), + Seq(2, "a2", 10, 1000) + ) + // Test update + spark.sql(s"update $tableName set value = value + 1 where id = 2") + checkAnswer(s"select id, name, value, ts from $tableName order by id")( + Seq(1, "a1", 11, 1001), + Seq(2, "a2", 11, 1000) + ) + // Test delete + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select id, name, value, ts from $tableName order by id")( + Seq(2, "a2", 11, 1000) + ) + } + } + } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala index 0def04973..0dbb07466 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala @@ -126,9 +126,9 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { checkException( s""" |merge into $tableName2 t0 - |using ( select 1 as id, 'a1' as name, 12 as price) s0 + |using ( select 1 as id, 'a1' as name, 12 as price, 1000 as ts) s0 |on t0.id = s0.id - |when matched then update set price = s0.price + |when matched then update set price = s0.price, _ts = s0.ts """.stripMargin)( "Missing specify the value for target field: 'id' in merge into update action for MOR table. " + "Currently we cannot support partial update for MOR, please complete all the target fields " +
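
The two partition-layout checks added to CreateHoodieTableCommand above boil down to pure functions over the partition paths already on storage. A minimal, dependency-free Scala sketch of the same heuristics (illustrative names, not the patched code):

object PartitionLayoutHeuristics {

  // A path like "dt=2021-08-02" is hive-styled for partition column "dt":
  // one "column=value" fragment per partition column.
  private def isHiveStyled(path: String, partitionColumns: Seq[String]): Boolean = {
    val fragments = path.split("/")
    fragments.length == partitionColumns.length &&
      fragments.zip(partitionColumns).forall { case (fragment, column) =>
        fragment.startsWith(s"$column=")
      }
  }

  // At least one existing partition path is not hive-styled, so hive-style
  // partitioning must be disabled for new writes to this table.
  def hiveStyleMustBeDisabled(paths: Seq[String], partitionColumns: Seq[String]): Boolean =
    partitionColumns.nonEmpty && !paths.forall(isHiveStyled(_, partitionColumns))

  // A partition value containing "/" (e.g. "2021/08/02" under a single
  // partition column) yields more path levels than partition columns, which
  // can only happen when URL encoding was disabled at write time.
  def urlEncodeMustBeDisabled(paths: Seq[String], partitionColumns: Seq[String]): Boolean =
    partitionColumns.nonEmpty &&
      !paths.forall(_.split("/").length == partitionColumns.length)

  def main(args: Array[String]): Unit = {
    val columns = Seq("dt")
    // The layouts the tests above exercise:
    println(hiveStyleMustBeDisabled(Seq("2021/08/02"), columns))    // true
    println(urlEncodeMustBeDisabled(Seq("2021/08/02"), columns))    // true
    println(hiveStyleMustBeDisabled(Seq("dt=2021-08-02"), columns)) // false
    println(urlEncodeMustBeDisabled(Seq("dt=2021-08-02"), columns)) // false
  }
}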
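The reconciliation rule that initTableIfNeed now applies to hoodie.properties can likewise be reduced to plain maps: keys that must not change have to agree on both sides, and the DDL options are layered over the existing config. A sketch under that assumption (the key strings below are placeholders, not the real HoodieTableConfig constants):

object TableConfigReconcile {
  // Keys that must agree between the existing hoodie.properties and the
  // CREATE TABLE options; placeholder names for illustration only.
  private val immutableKeys = Seq(
    "precombine.field",
    "partition.fields",
    "recordkey.fields")

  def reconcile(origin: Map[String, String], ddl: Map[String, String]): Map[String, String] = {
    immutableKeys.foreach { key =>
      (origin.get(key), ddl.get(key)) match {
        case (Some(oldValue), Some(newValue)) if oldValue != newValue =>
          throw new IllegalArgumentException(
            s"Table config $key is '$newValue' in the create table statement " +
              s"but '$oldValue' in hoodie.properties; they must match.")
        case _ => // a key absent on either side never conflicts
      }
    }
    // DDL options win for everything else, mirroring originTableConfig ++ tableOptions.
    origin ++ ddl
  }
}

Because an absent key never conflicts, a CREATE TABLE that specifies no options at all always succeeds against an existing table, which is what the new tests rely on.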
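Finally, the SqlKeyGenerator change is, at its core, a delegation split: record keys come from whatever key generator the table was originally written with (when hoodie.properties records one), while partition-path handling stays with the SQL layer. A self-contained sketch of that shape, with hypothetical trait and class names:

trait KeyGen {
  def recordKey(record: Map[String, String]): String
  def partitionPath(record: Map[String, String]): String
}

// Wraps the SQL-side key generator but defers record keys to the generator
// the table was originally written with, mirroring SqlKeyGenerator#getRecordKey.
class SqlCompatKeyGen(sqlGen: KeyGen, originKeyGen: Option[KeyGen]) extends KeyGen {
  // Prefer the origin generator so keys of new writes match rows already
  // stored in the table; fall back to the SQL generator otherwise.
  override def recordKey(record: Map[String, String]): String =
    originKeyGen.map(_.recordKey(record)).getOrElse(sqlGen.recordKey(record))

  // Partition paths still go through the SQL-side logic (hive-style prefixes,
  // timestamp formatting, and so on).
  override def partitionPath(record: Map[String, String]): String =
    sqlGen.partitionPath(record)
}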