diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index b6b37c78d..e92beee7f 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -396,6 +396,13 @@
test
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>${scalatest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
@@ -407,7 +414,49 @@
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <version>${scala-maven-plugin.version}</version>
+          <configuration>
+            <args>
+              <arg>-nobootcp</arg>
+            </args>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
org.codehaus.mojo
exec-maven-plugin
@@ -430,6 +479,29 @@
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <configuration>
+          <skipTests>${skipUTs}</skipTests>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>TestSuite.txt</filereports>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
org.jacoco
jacoco-maven-plugin
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 9e29c9b10..d8ea946fe 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -94,6 +94,7 @@ public class DeltaConfig implements Serializable {
private static String VALIDATE_CLEAN = "validate_clean";
private static String SCHEMA_VERSION = "schema_version";
private static String NUM_ROLLBACKS = "num_rollbacks";
+ private static String ENABLE_ROW_WRITING = "enable_row_writing";
   private Map<String, Object> configsMap;
@@ -189,6 +190,10 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString());
}
+ public boolean enableRowWriting() {
+ return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString());
+ }
+
   public Map<String, Object> getOtherConfigs() {
if (configsMap == null) {
return new HashMap<>();
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala
new file mode 100644
index 000000000..af3df6f70
--- /dev/null
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SaveMode
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark datasource based bulk insert node; writes the generated input batch to the target table via the Hudi datasource bulk_insert operation.
+ * @param config1 configuration for this DAG node (e.g. row-writing and disable-generate flags)
+ */
+class SparkBulkInsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
+
+ config = config1
+
+ /**
+ * Execute the {@link DagNode}.
+ *
+ * @param context The context needed for an execution of a node.
+ * @param curItrCount iteration count for executing the node.
+ * @throws Exception Thrown if the execution failed.
+ */
+ override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
+ if (!config.isDisableGenerate) {
+ context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
+ }
+ val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
+ context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
+ context.getWriterContext.getSparkSession)
+ val saveMode = if(curItrCount == 0) SaveMode.Overwrite else SaveMode.Append
+ inputDF.write.format("hudi")
+ .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
+ .option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key(), context.getHoodieTestSuiteWriter.getCfg.tableType)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY.key(), String.valueOf(config.enableRowWriting()))
+ .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY.key(), "deltastreamer.checkpoint.key")
+ .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
+ .option(HoodieWriteConfig.TABLE_NAME.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName)
+ .mode(saveMode)
+ .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
+ }
+}