1
0

[HUDI-2208] Support Bulk Insert For Spark Sql (#3328)

This commit is contained in:
pengzhiwei
2021-08-09 12:18:31 +08:00
committed by GitHub
parent 11ea74958d
commit 41a9986a76
11 changed files with 407 additions and 50 deletions

View File

@@ -256,6 +256,22 @@ object DataSourceWriteOptions {
.withDocumentation("When set to true, will perform write operations directly using the spark native " +
"`Row` representation, avoiding any additional conversion costs.")
/**
* Enable the bulk insert for sql insert statement.
*/
val SQL_ENABLE_BULK_INSERT:ConfigProperty[String] = ConfigProperty
.key("hoodie.sql.bulk.insert.enable")
.defaultValue("false")
.withDocumentation("When set to true, the sql insert statement will use bulk insert.")
val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
.key("hoodie.sql.insert.mode")
.defaultValue("upsert")
.withDocumentation("Insert mode when insert data to pk-table. The optional modes are: upsert, strict and non-strict." +
"For upsert mode, insert statement do the upsert operation for the pk-table which will update the duplicate record." +
"For strict mode, insert statement will keep the primary key uniqueness constraint which do not allow duplicate record." +
"While for non-strict mode, hudi just do the insert operation for the pk-table.")
val COMMIT_METADATA_KEYPREFIX: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.commitmeta.key.prefix")
.defaultValue("_")

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sql;
import java.util.Locale;
/**
 * Insert mode controlling how an INSERT INTO statement behaves for a table
 * with a primary key (pk-table).
 */
public enum InsertMode {
  /**
   * In upsert mode for insert into, a duplicate record on the primary key
   * will be updated. This is the default insert mode for a pk-table.
   */
  UPSERT("upsert"),
  /**
   * In strict mode for insert into, we keep the primary key uniqueness
   * constraint for a COW pk-table.
   * For a MOR pk-table, it has the same behavior as "upsert" mode.
   */
  STRICT("strict"),
  /**
   * In non-strict mode for insert into, we use the insert operation
   * to write data, which allows writing duplicate records.
   */
  NON_STRICT("non-strict");

  /** The configuration string value of this mode (immutable). */
  private final String value;

  InsertMode(String value) {
    this.value = value;
  }

  /**
   * @return the configuration string value of this mode.
   */
  public String value() {
    return value;
  }

  /**
   * Resolves an {@link InsertMode} from its case-insensitive string value.
   *
   * @param value the configuration string, e.g. "upsert", "strict" or "non-strict"
   * @return the matching insert mode
   * @throws AssertionError if the value matches no mode
   */
  public static InsertMode of(String value) {
    String normalized = value.toLowerCase(Locale.ROOT);
    // Iterate the declared constants instead of duplicating the string
    // literals in a switch, so new modes cannot drift out of sync.
    for (InsertMode mode : values()) {
      if (mode.value.equals(normalized)) {
        return mode;
      }
    }
    throw new AssertionError("Unsupported insert mode: " + value);
  }
}

View File

@@ -164,7 +164,11 @@ object HoodieSparkSqlWriter {
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)

View File

@@ -114,7 +114,7 @@ object HoodieOptionConfig {
/**
* Mapping the sql options to the hoodie table config which used to store to the hoodie
* .properites when create the table.
* .properties when create the table.
* @param options
* @return
*/

View File

@@ -218,7 +218,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
* Append the spark config and table options to the baseConfig.
*/
def withSparkConf(spark: SparkSession, options: Map[String, String])
(baseConfig: Map[String, String]): Map[String, String] = {
(baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
baseConfig ++ // Table options has the highest priority
(spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
.filterKeys(_.startsWith("hoodie."))

View File

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -62,8 +63,8 @@ case class CreateHoodieTableAsSelectCommand(
}
}
val tablePath = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf),
val hadoopConf = sparkSession.sessionState.newHadoopConf()
assert(CreateHoodieTableCommand.isEmptyPath(tablePath, hadoopConf),
s"Path '$tablePath' should be empty for CTAS")
// ReOrder the query which move the partition columns to the last of the project list
@@ -72,17 +73,15 @@ case class CreateHoodieTableAsSelectCommand(
// Execute the insert query
try {
// Set if sync as a managed table.
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key,
(table.tableType == CatalogTableType.MANAGED).toString)
// Sync the options to hive serde properties
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key,
ConfigUtils.configToString(table.storage.properties.asJava))
// Sync the table properties to hive
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key,
ConfigUtils.configToString(table.properties.asJava))
val options = Map(
DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(table.storage.properties.asJava),
DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
mode == SaveMode.Overwrite, refreshTable = false)
mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
if (success) {
// If write success, create the table in catalog if it has not synced to the
// catalog by the meta sync.
@@ -92,11 +91,11 @@ case class CreateHoodieTableAsSelectCommand(
createTableCommand.createTableInCatalog(sparkSession, checkPathForManagedTable = false)
}
} else { // failed to insert data, clear table path
clearTablePath(tablePath, conf)
clearTablePath(tablePath, hadoopConf)
}
} catch {
case e: Throwable => // failed to insert data, clear table path
clearTablePath(tablePath, conf)
clearTablePath(tablePath, hadoopConf)
throw e
}
Seq.empty[Row]

View File

@@ -28,10 +28,12 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -58,7 +60,7 @@ case class InsertIntoHoodieTableCommand(
}
}
object InsertIntoHoodieTableCommand {
object InsertIntoHoodieTableCommand extends Logging {
/**
* Run the insert query. We support both dynamic partition insert and static partition insert.
* @param sparkSession The spark session.
@@ -71,12 +73,14 @@ object InsertIntoHoodieTableCommand {
* , it is None in the map.
* @param overwrite Whether to overwrite the table.
* @param refreshTable Whether to refresh the table after insert finished.
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
insertPartitions: Map[String, Option[String]],
overwrite: Boolean, refreshTable: Boolean = true): Boolean = {
overwrite: Boolean, refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions)
val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions, extraOptions)
val mode = if (overwrite && table.partitionColumnNames.isEmpty) {
// insert overwrite non-partition table
@@ -165,7 +169,7 @@ object InsertIntoHoodieTableCommand {
Alias(castAttr, f.name)()
})
}
// Remove the hoodie meta fileds from the projects as we do not need these to write
// Remove the hoodie meta fields from the projects as we do not need these to write
val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
Project(alignedProjects, query)
@@ -179,7 +183,8 @@ object InsertIntoHoodieTableCommand {
table: CatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty): Map[String, String] = {
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {
if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) {
@@ -187,7 +192,7 @@ object InsertIntoHoodieTableCommand {
s"[${insertPartitions.keys.mkString(" " )}]" +
s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]")
}
val parameters = HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties)
val parameters = withSparkConf(sparkSession, table.storage.properties)() ++ extraOptions
val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
@@ -209,28 +214,49 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS.defaultValue)
.toBoolean
val operation = if (isOverwrite) {
if (table.partitionColumnNames.nonEmpty) {
INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
} else {
INSERT_OPERATION_OPT_VAL
}
} else {
if (primaryColumns.nonEmpty && !dropDuplicate) {
UPSERT_OPERATION_OPT_VAL
} else {
INSERT_OPERATION_OPT_VAL
}
}
val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val isPartitionedTable = table.partitionColumnNames.nonEmpty
val isPrimaryKeyTable = primaryColumns.nonEmpty
val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val payloadClassName = if (primaryColumns.nonEmpty && !dropDuplicate &&
tableType == COW_TABLE_TYPE_OPT_VAL) {
val operation =
(isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
case (true, true, _, _) if !isNonStrictMode =>
throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
case (_, true, true, _) if isPartitionedTable =>
throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
case (_, true, _, true) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite partition
case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
// insert overwrite table
case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// if it is a pk-table and dropDuplicate is disabled, use the upsert operation for strict and upsert mode.
case (true, false, false, false) if !isNonStrictMode => UPSERT_OPERATION_OPT_VAL
// if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
// if it is a pk-table and bulk insert is enabled, use bulk insert for non-strict mode.
case (true, true, _, _) if isNonStrictMode => BULK_INSERT_OPERATION_OPT_VAL
// for all remaining cases, use the insert operation
case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
}
val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
// Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
// on reading.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
}
logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, options) {
Map(
@@ -243,6 +269,8 @@ object InsertIntoHoodieTableCommand {
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",

View File

@@ -22,9 +22,10 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils}
import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
/**
* A complex key generator for sql command which do some process for the
@@ -62,8 +63,15 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
}
override def getPartitionPath(record: GenericRecord) = {
val partitionPath = super.getPartitionPath(record)
/**
 * Returns the record key for a spark Row. When the configured origin key
 * generator supports row-based key generation, delegate to it; otherwise
 * fall back to the superclass implementation.
 */
override def getRecordKey(row: Row): String = originKeyGen match {
  // Typed pattern match replaces the isDefined/get/isInstanceOf/asInstanceOf chain.
  case Some(keyGen: SparkKeyGeneratorInterface) => keyGen.getRecordKey(row)
  case _ => super.getRecordKey(row)
}
private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
if (partitionSchema.isDefined) {
// we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
// by default for sql.
@@ -82,9 +90,13 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
partitionField.dataType match {
case TimestampType =>
val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
} else {
MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
}
val timestampFormat = PartitionPathEncodeUtils.escapePathName(
SqlKeyGenerator.timestampTimeFormat.print(timeMs))
SqlKeyGenerator.timestampTimeFormat.print(timeMs))
if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
case _ => partitionValue
}
@@ -92,10 +104,21 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
} else partitionPath
}
// Avro-record path: compute the raw partition path, then normalize timestamp
// partition values to their sql representation.
override def getPartitionPath(record: GenericRecord) = {
val partitionPath = super.getPartitionPath(record)
convertPartitionPathToSqlType(partitionPath, false)
}
// Row-based path (row writer): partition timestamp values arrive as formatted
// strings, so convert with rowType = true.
override def getPartitionPath(row: Row): String = {
val partitionPath = super.getPartitionPath(row)
convertPartitionPathToSqlType(partitionPath, true)
}
}
// Companion constants for SqlKeyGenerator.
object SqlKeyGenerator {
// Property key carrying the spark sql partition schema for the key generator.
val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
// Property key carrying the class name of the origin key generator to delegate to.
val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class"
// Output format used when rendering timestamp partition values into the partition path.
private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
// Format used to parse the row-type partition timestamp string (includes fractional seconds).
private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
}

View File

@@ -18,9 +18,9 @@
package org.apache.spark.sql.hudi.command
import java.util.UUID
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
import org.apache.spark.sql.Row
/**
* A KeyGenerator which uses a random uuid as the record key.
@@ -28,4 +28,6 @@ import org.apache.hudi.common.config.TypedProperties
class UuidKeyGenerator(props: TypedProperties) extends SqlKeyGenerator(props) {
// Record keys are random UUIDs, so every written record is treated as a new insert.
override def getRecordKey(record: GenericRecord): String = UUID.randomUUID.toString
// Row-based variant: also generates a fresh random UUID per record.
override def getRecordKey(row: Row): String = UUID.randomUUID.toString
}

View File

@@ -79,12 +79,15 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
}
/**
 * Executes the given sql statement and verifies that it fails with exactly
 * the expected error message. The test fails if the statement succeeds.
 */
protected def checkException(sql: String)(errorMsg: String): Unit = {
  var exceptionThrown = false
  try {
    spark.sql(sql)
  } catch {
    case throwable: Throwable =>
      // The statement failed as expected; the message must match exactly.
      assertResult(errorMsg)(throwable.getMessage)
      exceptionThrown = true
  }
  // Guard against the statement silently succeeding.
  assertResult(true)(exceptionThrown)
}
protected def removeQuotes(value: Any): Any = {

View File

@@ -64,6 +64,7 @@ class TestInsertTable extends TestHoodieSqlBase {
test("Test Insert Into None Partitioned Table") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(s"set hoodie.sql.insert.mode=strict")
// Create none partitioned cow table
spark.sql(
s"""
@@ -80,7 +81,6 @@ class TestInsertTable extends TestHoodieSqlBase {
| preCombineField = 'ts'
| )
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
@@ -127,6 +127,9 @@ class TestInsertTable extends TestHoodieSqlBase {
checkAnswer(s"select id, name, price, ts from $tableName2")(
Seq(1, "a1", 10.0, 1000)
)
// disable this config to avoid affect other test in this class.
spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
spark.sql(s"set hoodie.sql.insert.mode=upsert")
}
}
@@ -146,7 +149,6 @@ class TestInsertTable extends TestHoodieSqlBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
// Insert overwrite dynamic partition
spark.sql(
s"""
@@ -246,7 +248,6 @@ class TestInsertTable extends TestHoodieSqlBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
@@ -303,5 +304,220 @@ class TestInsertTable extends TestHoodieSqlBase {
"assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
" count: 3columns: (1,a1,10)"
)
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql("set hoodie.sql.insert.mode= strict")
val tableName2 = generateTableName
spark.sql(
s"""
|create table $tableName2 (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")(
"Table with primaryKey can not use bulk insert in strict mode."
)
val tableName3 = generateTableName
spark.sql(
s"""
|create table $tableName3 (
| id int,
| name string,
| price double,
| dt string
|) using hudi
| partitioned by (dt)
""".stripMargin)
checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
"Insert Overwrite Partition can not use bulk insert."
)
spark.sql("set hoodie.sql.bulk.insert.enable = false")
spark.sql("set hoodie.sql.insert.mode= upsert")
}
// End-to-end coverage for sql bulk insert, toggled via the
// hoodie.sql.bulk.insert.enable session config, over both table types.
test("Test bulk insert") {
withTempDir { tmp =>
Seq("cow", "mor").foreach {tableType =>
// Test bulk insert for single partition
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| dt string
|) using hudi
| options (
| type = '$tableType'
| )
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
// Enable the bulk insert
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
checkAnswer(s"select id, name, price, dt from $tableName")(
Seq(1, "a1", 10.0, "2021-07-18")
)
// Disable the bulk insert
spark.sql("set hoodie.sql.bulk.insert.enable = false")
spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')")
checkAnswer(s"select id, name, price, dt from $tableName order by id")(
Seq(1, "a1", 10.0, "2021-07-18"),
Seq(2, "a2", 10.0, "2021-07-18")
)
// Test bulk insert for multi-level partition
val tableMultiPartition = generateTableName
spark.sql(
s"""
|create table $tableMultiPartition (
| id int,
| name string,
| price double,
| dt string,
| hh string
|) using hudi
| options (
| type = '$tableType'
| )
| partitioned by (dt, hh)
| location '${tmp.getCanonicalPath}/$tableMultiPartition'
""".stripMargin)
// Enable the bulk insert
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, '2021-07-18', '12')")
checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition")(
Seq(1, "a1", 10.0, "2021-07-18", "12")
)
// Disable the bulk insert
spark.sql("set hoodie.sql.bulk.insert.enable = false")
spark.sql(s"insert into $tableMultiPartition " +
s"values(2, 'a2', 10, '2021-07-18','12')")
checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition order by id")(
Seq(1, "a1", 10.0, "2021-07-18", "12"),
Seq(2, "a2", 10.0, "2021-07-18", "12")
)
// Test bulk insert for non-partitioned table
val nonPartitionedTable = generateTableName
spark.sql(
s"""
|create table $nonPartitionedTable (
| id int,
| name string,
| price double
|) using hudi
| options (
| type = '$tableType'
| )
| location '${tmp.getCanonicalPath}/$nonPartitionedTable'
""".stripMargin)
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)")
checkAnswer(s"select id, name, price from $nonPartitionedTable")(
Seq(1, "a1", 10.0)
)
// insert overwrite of a non-partitioned table is allowed with bulk insert.
spark.sql(s"insert overwrite table $nonPartitionedTable values(2, 'a2', 10)")
checkAnswer(s"select id, name, price from $nonPartitionedTable")(
Seq(2, "a2", 10.0)
)
spark.sql("set hoodie.sql.bulk.insert.enable = false")
// Test CTAS for bulk insert
// NOTE: CTAS enables bulk insert and non-strict mode itself, so no session
// config is needed here.
val tableName2 = generateTableName
spark.sql(
s"""
|create table $tableName2
|using hudi
|options(
| type = '$tableType',
| primaryKey = 'id'
|)
| location '${tmp.getCanonicalPath}/$tableName2'
| as
| select * from $tableName
|""".stripMargin)
checkAnswer(s"select id, name, price, dt from $tableName2 order by id")(
Seq(1, "a1", 10.0, "2021-07-18"),
Seq(2, "a2", 10.0, "2021-07-18")
)
}
}
}
// Verifies that combine-before-insert de-duplicates rows sharing the same
// primary key within a single insert, keeping the row with the larger
// preCombine value (ts).
test("Test combine before insert") {
withTempDir{tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// Both rows have id = 1; only the one with ts = 1001 should survive.
spark.sql(
s"""
|insert overwrite table $tableName
|select * from (
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
| union all
| select 1 as id, 'a1' as name, 11 as price, 1001 as ts
| )
|""".stripMargin
)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 11.0, 1001)
)
}
}
// Verifies the default insert behavior for a pk-table: a second insert with
// the same primary key updates the existing record (upsert semantics).
test("Test insert pk-table") {
withTempDir{tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| options (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
// Same primary key: expect the price to be updated to 11, not duplicated.
spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 11.0, 1000)
)
}
}
}