diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e1095012e..b10a05b02 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -165,7 +165,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
       // Create a HoodieWriteClient & issue the write.
       val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-        tblName, mapAsJavaMap(parameters)
+        tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)
       )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
       if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -205,7 +205,8 @@ private[hudi] object HoodieSparkSqlWriter {
     // Create a HoodieWriteClient & issue the delete.
     val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
       Schema.create(Schema.Type.NULL).toString, path.get, tblName,
-      mapAsJavaMap(parameters))).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+      mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)))
+      .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
     if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
       asyncCompactionTriggerFn.get.apply(client)
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bf71e6868..3db89487f 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -307,4 +307,17 @@ class TestCOWDataSource extends HoodieClientTestBase {
       }
     })
   }
+
+  @Test def testWithAutoCommitOn(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "true")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+  }
 }
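Reviewer note: both hunks subtract HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP from the options handed to DataSourceUtils.createHoodieClient, so a user-supplied hoodie.auto.commit=true no longer reaches the write client and the datasource writer presumably remains the sole committer; the new test then verifies that a write with the flag enabled still lands a commit. Below is a minimal, self-contained sketch of the Map idiom the fix relies on; the object name StripAutoCommitExample, the literal key "hoodie.auto.commit", and the .asJava call (standing in for mapAsJavaMap) are illustrative assumptions, not Hudi code.

import scala.collection.JavaConverters._

object StripAutoCommitExample {
  // Stand-in for HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP; the literal key
  // used here is an assumption for illustration only.
  val AutoCommitProp = "hoodie.auto.commit"

  def main(args: Array[String]): Unit = {
    val parameters = Map(
      AutoCommitProp -> "true",
      "hoodie.table.name" -> "test_table"
    )
    // `-` on an immutable Map returns a new map without the given key; the
    // original `parameters` is untouched, so later uses of it (e.g. the
    // isAsyncCompactionEnabled check in the diff) still see the full options.
    val clientParams = (parameters - AutoCommitProp).asJava
    assert(!clientParams.containsKey(AutoCommitProp))
    println(clientParams) // prints {hoodie.table.name=test_table}
  }
}

Doing the subtraction before the Java conversion matters: the converted java.util.Map is a read-through wrapper of the Scala map, so filtering the immutable Scala map first is the clean place to drop the key.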