diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5c67a68ad..46aee6780 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -256,6 +256,22 @@ object DataSourceWriteOptions {
     .withDocumentation("When set to true, will perform write operations directly using the spark native " +
       "`Row` representation, avoiding any additional conversion costs.")
 
+  /**
+   * Enable bulk insert for the sql insert statement.
+   */
+  val SQL_ENABLE_BULK_INSERT: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.bulk.insert.enable")
+    .defaultValue("false")
+    .withDocumentation("When set to true, the sql insert statement will use bulk insert.")
+
+  val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.insert.mode")
+    .defaultValue("upsert")
+    .withDocumentation("Insert mode when inserting data into a pk-table. The optional modes are: upsert, strict and non-strict. " +
+      "In upsert mode, the insert statement does an upsert on the pk-table, updating duplicate records. " +
+      "In strict mode, the insert statement keeps the primary key uniqueness constraint and does not allow duplicate records. " +
+      "In non-strict mode, hudi just does a plain insert on the pk-table.")
+
   val COMMIT_METADATA_KEYPREFIX: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.commitmeta.key.prefix")
     .defaultValue("_")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java
new file mode 100644
index 000000000..4b44ae438
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sql;
+
+import java.util.Locale;
+
+/**
+ * Insert mode for insert into pk-table.
+ */
+public enum InsertMode {
+  /**
+   * In upsert mode for insert into, a duplicate record on the primary key
+   * will be updated. This is the default insert mode for pk-table.
+   */
+  UPSERT("upsert"),
+  /**
+   * In strict mode for insert into, the pk uniqueness constraint is guaranteed
+   * for COW pk-table.
+   * For MOR pk-table, it has the same behavior as "upsert" mode.
+   */
+  STRICT("strict"),
+  /**
+   * In non-strict mode for insert into, we use the insert operation
+   * to write data, which allows duplicate records.
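+   * This is also the mode CTAS uses in this patch, together with bulk insert.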
+   */
+  NON_STRICT("non-strict")
+  ;
+
+  private final String value;
+
+  InsertMode(String value) {
+    this.value = value;
+  }
+
+  public String value() {
+    return value;
+  }
+
+  public static InsertMode of(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "upsert":
+        return UPSERT;
+      case "strict":
+        return STRICT;
+      case "non-strict":
+        return NON_STRICT;
+      default:
+        throw new AssertionError("Unsupported insert mode: " + value);
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6213ab859..6a54931bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -164,7 +164,11 @@ object HoodieSparkSqlWriter {
       // Convert to RDD[HoodieRecord]
       val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-      val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+
+      val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+        operation.equals(WriteOperationType.UPSERT) ||
+        parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
+          HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
       val hoodieAllIncomingRecords = genericRecords.map(gr => {
         val hoodieRecord = if (shouldCombine) {
           val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 2f28c66ea..5120fe7c2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -114,7 +114,7 @@ object HoodieOptionConfig {
 
   /**
    * Mapping the sql options to the hoodie table config which used to store to the hoodie
-   * .properites when create the table.
+   * .properties when creating the table.
    * @param options
    * @return
    */
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index da6c4824c..b0a8b525d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -218,7 +218,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
    * Append the spark config and table options to the baseConfig.
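+   * Note that "baseConfig" now defaults to an empty map, so the second
+   * parameter list can be omitted.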
*/ def withSparkConf(spark: SparkSession, options: Map[String, String]) - (baseConfig: Map[String, String]): Map[String, String] = { + (baseConfig: Map[String, String] = Map.empty): Map[String, String] = { baseConfig ++ // Table options has the highest priority (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options)) .filterKeys(_.startsWith("hoodie.")) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index a156d9f70..38c7e290a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.hive.util.ConfigUtils +import org.apache.hudi.sql.InsertMode import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} @@ -62,8 +63,8 @@ case class CreateHoodieTableAsSelectCommand( } } val tablePath = getTableLocation(table, sparkSession) - val conf = sparkSession.sessionState.newHadoopConf() - assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf), + val hadoopConf = sparkSession.sessionState.newHadoopConf() + assert(CreateHoodieTableCommand.isEmptyPath(tablePath, hadoopConf), s"Path '$tablePath' should be empty for CTAS") // ReOrder the query which move the partition columns to the last of the project list @@ -72,17 +73,15 @@ case class CreateHoodieTableAsSelectCommand( // Execute the insert query try { - // Set if sync as a managed table. - sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key, - (table.tableType == CatalogTableType.MANAGED).toString) - // Sync the options to hive serde properties - sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, - ConfigUtils.configToString(table.storage.properties.asJava)) - // Sync the table properties to hive - sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key, - ConfigUtils.configToString(table.properties.asJava)) + val options = Map( + DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString, + DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(table.storage.properties.asJava), + DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava), + DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(), + DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true" + ) val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty, - mode == SaveMode.Overwrite, refreshTable = false) + mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options) if (success) { // If write success, create the table in catalog if it has not synced to the // catalog by the meta sync. 
@@ -92,11 +91,11 @@ case class CreateHoodieTableAsSelectCommand( createTableCommand.createTableInCatalog(sparkSession, checkPathForManagedTable = false) } } else { // failed to insert data, clear table path - clearTablePath(tablePath, conf) + clearTablePath(tablePath, hadoopConf) } } catch { case e: Throwable => // failed to insert data, clear table path - clearTablePath(tablePath, conf) + clearTablePath(tablePath, hadoopConf) throw e } Seq.empty[Row] diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index cfd66096d..02e742761 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -28,10 +28,12 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode -import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils} +import org.apache.hudi.sql.InsertMode +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils} +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} -import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -58,7 +60,7 @@ case class InsertIntoHoodieTableCommand( } } -object InsertIntoHoodieTableCommand { +object InsertIntoHoodieTableCommand extends Logging { /** * Run the insert query. We support both dynamic partition insert and static partition insert. * @param sparkSession The spark session. @@ -71,12 +73,14 @@ object InsertIntoHoodieTableCommand { * , it is None in the map. * @param overwrite Whether to overwrite the table. * @param refreshTable Whether to refresh the table after insert finished. + * @param extraOptions Extra options for insert. 
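+   *                     For CTAS this carries the hive sync properties plus the
+   *                     sql bulk-insert and insert-mode options.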
*/ def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan, insertPartitions: Map[String, Option[String]], - overwrite: Boolean, refreshTable: Boolean = true): Boolean = { + overwrite: Boolean, refreshTable: Boolean = true, + extraOptions: Map[String, String] = Map.empty): Boolean = { - val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions) + val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions, extraOptions) val mode = if (overwrite && table.partitionColumnNames.isEmpty) { // insert overwrite non-partition table @@ -165,7 +169,7 @@ object InsertIntoHoodieTableCommand { Alias(castAttr, f.name)() }) } - // Remove the hoodie meta fileds from the projects as we do not need these to write + // Remove the hoodie meta fields from the projects as we do not need these to write val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name)) val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects Project(alignedProjects, query) @@ -179,7 +183,8 @@ object InsertIntoHoodieTableCommand { table: CatalogTable, sparkSession: SparkSession, isOverwrite: Boolean, - insertPartitions: Map[String, Option[String]] = Map.empty): Map[String, String] = { + insertPartitions: Map[String, Option[String]] = Map.empty, + extraOptions: Map[String, String]): Map[String, String] = { if (insertPartitions.nonEmpty && (insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) { @@ -187,7 +192,7 @@ object InsertIntoHoodieTableCommand { s"[${insertPartitions.keys.mkString(" " )}]" + s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]") } - val parameters = HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties) + val parameters = withSparkConf(sparkSession, table.storage.properties)() ++ extraOptions val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue) @@ -209,28 +214,49 @@ object InsertIntoHoodieTableCommand { .getOrElse(INSERT_DROP_DUPS.defaultValue) .toBoolean - val operation = if (isOverwrite) { - if (table.partitionColumnNames.nonEmpty) { - INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition - } else { - INSERT_OPERATION_OPT_VAL - } - } else { - if (primaryColumns.nonEmpty && !dropDuplicate) { - UPSERT_OPERATION_OPT_VAL - } else { - INSERT_OPERATION_OPT_VAL - } - } + val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, + DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean + val isPartitionedTable = table.partitionColumnNames.nonEmpty + val isPrimaryKeyTable = primaryColumns.nonEmpty + val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key, + DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue())) + val isNonStrictMode = insertMode == InsertMode.NON_STRICT - val payloadClassName = if (primaryColumns.nonEmpty && !dropDuplicate && - tableType == COW_TABLE_TYPE_OPT_VAL) { + val operation = + (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match { + case (true, true, _, _) if !isNonStrictMode => + throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.") + case (_, true, true, _) if isPartitionedTable => + throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.") + case (_, true, _, true) => + throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." 
+          + s" Please disable $INSERT_DROP_DUPS and try again.")
+      // if enableBulkInsert is true, use bulk insert for insert overwrite on a non-partitioned table.
+      case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+      // insert overwrite partition
+      case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+      // insert overwrite table
+      case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+      // if it is a pk table and dropDuplicate is disabled, use the upsert operation for strict and upsert mode.
+      case (true, false, false, false) if !isNonStrictMode => UPSERT_OPERATION_OPT_VAL
+      // if enableBulkInsert is true and the table has no primary key, use the bulk insert operation
+      case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+      // if it is a pk table with bulk insert enabled, use bulk insert for non-strict mode.
+      case (true, true, _, _) if isNonStrictMode => BULK_INSERT_OPERATION_OPT_VAL
+      // for the remaining cases, use the insert operation
+      case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
+    }
+
+    val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
+      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
       // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
       // on reading.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
       classOf[DefaultHoodieRecordPayload].getCanonicalName
     }
+    logInfo(s"insert statement uses write operation type: $operation, payloadClass: $payloadClassName")
+
     val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, options) {
       Map(
@@ -243,6 +269,8 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS.key -> payloadClassName,
+        ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString,
         META_SYNC_ENABLED.key -> enableHive.toString,
         HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_USE_JDBC.key -> "false",
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 3b5b33cc6..2ae082132 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -22,9 +22,10 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils}
+import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StructType, TimestampType}
-import org.joda.time.format.DateTimeFormat
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
 
 /**
  * A complex key generator for sql command which do some process for the
@@ -62,8 +63,15 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
     }
   }
 
-  override def getPartitionPath(record: GenericRecord) = {
-    val partitionPath = super.getPartitionPath(record)
+  override def getRecordKey(row: Row): String = {
+    if
(originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) { + originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row) + } else { + super.getRecordKey(row) + } + } + + private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = { if (partitionSchema.isDefined) { // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT // by default for sql. @@ -82,9 +90,13 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) partitionField.dataType match { case TimestampType => - val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS) + val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis + SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue) + } else { + MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS) + } val timestampFormat = PartitionPathEncodeUtils.escapePathName( - SqlKeyGenerator.timestampTimeFormat.print(timeMs)) + SqlKeyGenerator.timestampTimeFormat.print(timeMs)) if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat case _ => partitionValue } @@ -92,10 +104,21 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) } } else partitionPath } + + override def getPartitionPath(record: GenericRecord) = { + val partitionPath = super.getPartitionPath(record) + convertPartitionPathToSqlType(partitionPath, false) + } + + override def getPartitionPath(row: Row): String = { + val partitionPath = super.getPartitionPath(row) + convertPartitionPathToSqlType(partitionPath, true) + } } object SqlKeyGenerator { val PARTITION_SCHEMA = "hoodie.sql.partition.schema" val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class" private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") + private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S") } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala index 15dcdd2dc..14a0074fe 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.hudi.command import java.util.UUID - import org.apache.avro.generic.GenericRecord import org.apache.hudi.common.config.TypedProperties +import org.apache.spark.sql.Row /** * A KeyGenerator which use the uuid as the record key. 
@@ -28,4 +28,6 @@ import org.apache.hudi.common.config.TypedProperties
 class UuidKeyGenerator(props: TypedProperties) extends SqlKeyGenerator(props) {
 
   override def getRecordKey(record: GenericRecord): String = UUID.randomUUID.toString
+
+  override def getRecordKey(row: Row): String = UUID.randomUUID.toString
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index 67b52754c..11390fb19 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -79,12 +79,15 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
   }
 
   protected def checkException(sql: String)(errorMsg: String): Unit = {
+    var hasException = false
     try {
       spark.sql(sql)
     } catch {
       case e: Throwable =>
         assertResult(errorMsg)(e.getMessage)
+        hasException = true
     }
+    assertResult(true)(hasException)
   }
 
   protected def removeQuotes(value: Any): Any = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 0cda18140..337317b8e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -64,6 +64,7 @@ class TestInsertTable extends TestHoodieSqlBase {
   test("Test Insert Into None Partitioned Table") {
     withTempDir { tmp =>
       val tableName = generateTableName
+      spark.sql(s"set hoodie.sql.insert.mode=strict")
       // Create none partitioned cow table
       spark.sql(
         s"""
            |create table $tableName (
            |  id int,
            |  name string,
            |  price double,
            |  ts long
            |) using hudi
            | options (
            |  primaryKey = 'id',
            |  preCombineField = 'ts'
            | )
       """.stripMargin)
-
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       checkAnswer(s"select id, name, price, ts from $tableName")(
         Seq(1, "a1", 10.0, 1000)
       )
@@ -127,6 +127,9 @@
       checkAnswer(s"select id, name, price, ts from $tableName2")(
         Seq(1, "a1", 10.0, 1000)
       )
+      // disable these configs to avoid affecting other tests in this class.
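+      // (SET commands are session-scoped, so they would otherwise leak into later tests)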
+ spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false") + spark.sql(s"set hoodie.sql.insert.mode=upsert") } } @@ -146,7 +149,6 @@ class TestInsertTable extends TestHoodieSqlBase { | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) - // Insert overwrite dynamic partition spark.sql( s""" @@ -246,7 +248,6 @@ class TestInsertTable extends TestHoodieSqlBase { | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) - spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10") spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue") checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")( @@ -303,5 +304,220 @@ class TestInsertTable extends TestHoodieSqlBase { "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + " count: 3,columns: (1,a1,10)" ) + spark.sql("set hoodie.sql.bulk.insert.enable = true") + spark.sql("set hoodie.sql.insert.mode= strict") + + val tableName2 = generateTableName + spark.sql( + s""" + |create table $tableName2 ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")( + "Table with primaryKey can not use bulk insert in strict mode." + ) + + val tableName3 = generateTableName + spark.sql( + s""" + |create table $tableName3 ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | partitioned by (dt) + """.stripMargin) + checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")( + "Insert Overwrite Partition can not use bulk insert." 
+ ) + spark.sql("set hoodie.sql.bulk.insert.enable = false") + spark.sql("set hoodie.sql.insert.mode= upsert") } + + test("Test bulk insert") { + withTempDir { tmp => + Seq("cow", "mor").foreach {tableType => + // Test bulk insert for single partition + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | options ( + | type = '$tableType' + | ) + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false") + + // Enable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = true") + spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')") + + checkAnswer(s"select id, name, price, dt from $tableName")( + Seq(1, "a1", 10.0, "2021-07-18") + ) + // Disable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = false") + spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')") + + checkAnswer(s"select id, name, price, dt from $tableName order by id")( + Seq(1, "a1", 10.0, "2021-07-18"), + Seq(2, "a2", 10.0, "2021-07-18") + ) + + // Test bulk insert for multi-level partition + val tableMultiPartition = generateTableName + spark.sql( + s""" + |create table $tableMultiPartition ( + | id int, + | name string, + | price double, + | dt string, + | hh string + |) using hudi + | options ( + | type = '$tableType' + | ) + | partitioned by (dt, hh) + | location '${tmp.getCanonicalPath}/$tableMultiPartition' + """.stripMargin) + + // Enable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = true") + spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, '2021-07-18', '12')") + + checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition")( + Seq(1, "a1", 10.0, "2021-07-18", "12") + ) + // Disable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = false") + spark.sql(s"insert into $tableMultiPartition " + + s"values(2, 'a2', 10, '2021-07-18','12')") + + checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition order by id")( + Seq(1, "a1", 10.0, "2021-07-18", "12"), + Seq(2, "a2", 10.0, "2021-07-18", "12") + ) + // Test bulk insert for non-partitioned table + val nonPartitionedTable = generateTableName + spark.sql( + s""" + |create table $nonPartitionedTable ( + | id int, + | name string, + | price double + |) using hudi + | options ( + | type = '$tableType' + | ) + | location '${tmp.getCanonicalPath}/$nonPartitionedTable' + """.stripMargin) + spark.sql("set hoodie.sql.bulk.insert.enable = true") + spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)") + checkAnswer(s"select id, name, price from $nonPartitionedTable")( + Seq(1, "a1", 10.0) + ) + spark.sql(s"insert overwrite table $nonPartitionedTable values(2, 'a2', 10)") + checkAnswer(s"select id, name, price from $nonPartitionedTable")( + Seq(2, "a2", 10.0) + ) + spark.sql("set hoodie.sql.bulk.insert.enable = false") + + // Test CTAS for bulk insert + val tableName2 = generateTableName + spark.sql( + s""" + |create table $tableName2 + |using hudi + |options( + | type = '$tableType', + | primaryKey = 'id' + |) + | location '${tmp.getCanonicalPath}/$tableName2' + | as + | select * from $tableName + |""".stripMargin) + checkAnswer(s"select id, name, price, dt from $tableName2 order by id")( + Seq(1, "a1", 10.0, "2021-07-18"), + Seq(2, "a2", 10.0, "2021-07-18") + ) + } + } + } + + test("Test combine before insert") { + 
withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      spark.sql(
+        s"""
+           |insert overwrite table $tableName
+           |select * from (
+           | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+           | union all
+           | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+           | )
+           |""".stripMargin
+      )
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1001)
+      )
+    }
+  }
+
+  test("Test insert pk-table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1000)
+      )
+    }
+  }
 }
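
Usage sketch (reviewer note, not part of the patch): the snippet below shows how the two new
session options interact, assuming a Spark session with the Hudi SQL extensions enabled and a
COW pk-table h0 (id, name, price, ts) created with primaryKey = 'id'; the table name and
schema are illustrative only.

    // Bulk insert on a pk-table is only allowed in non-strict mode; other modes make
    // buildHoodieInsertConfig throw an IllegalArgumentException.
    spark.sql("set hoodie.sql.bulk.insert.enable = true")
    spark.sql("set hoodie.sql.insert.mode = non-strict")
    spark.sql("insert into h0 values(1, 'a1', 10.0, 1000)") // runs as bulk insert via the row writer

    // Strict mode on a COW pk-table selects ValidateDuplicateKeyPayload, so inserting an
    // existing key is rejected instead of being silently upserted.
    spark.sql("set hoodie.sql.bulk.insert.enable = false")
    spark.sql("set hoodie.sql.insert.mode = strict")
    spark.sql("insert into h0 values(2, 'a2', 11.0, 1001)") // succeeds: new key

CTAS takes the bulk-insert path implicitly: CreateHoodieTableAsSelectCommand forces
hoodie.sql.insert.mode to non-strict and hoodie.sql.bulk.insert.enable to true before
delegating to InsertIntoHoodieTableCommand.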