diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index 8442aff08..df450a5ff 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.FileIOUtils;
@@ -94,6 +95,11 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayload> {
+  public static List<String> deleteRecordsToStrings(List<HoodieKey> records) {
+    return records.stream().map(record -> "{\"_row_key\": \"" + record.getRecordKey() + "\",\"partition\": \"" + record.getPartitionPath() + "\"}")
+        .collect(Collectors.toList());
+  }
+
   public String getPartitionPath() {
     return partitionPath;
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 829690f2b..340ac1443 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -204,7 +204,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
       // Create a HoodieWriteClient & issue the delete.
       val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-        Schema.create(Schema.Type.NULL).toString, path.get, tblName,
+        null, path.get, tblName,
         mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)))
         .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 88ed65f89..bf730e212 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
@@ -52,6 +53,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
+    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+    "hoodie.delete.shuffle.parallelism" -> "1",
     DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
@@ -125,6 +128,37 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(schema, actualSchema)
   }
 
+  @Test
+  def testCopyOnWriteDeletes(): Unit = {
+    // Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, snapshotDF1.count())
+
+    val records2 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(snapshotDF1.count() - inputDF2.count(), snapshotDF2.count())
+  }
+
   @ParameterizedTest
   //TODO(metadata): Needs HUDI-1459 to be fixed
   //@ValueSource(booleans = Array(true, false))
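Usage note (not part of the patch): below is a minimal Scala sketch of the datasource delete flow that the new testCopyOnWriteDeletes test exercises. The DataFrame only needs the table's record-key and partition-path columns; the option keys come straight from the diff above, while everything else here (the keysToDelete name, the sample key/partition values, the "hoodie_test" table name, and the spark/basePath bindings) is an illustrative assumption.

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    // Assumes an active SparkSession `spark` and an existing Hudi table at `basePath`.
    import spark.implicits._

    // Hypothetical keys to delete: one (record key, partition path) pair per row,
    // with column names matching RECORDKEY_FIELD_OPT_KEY / PARTITIONPATH_FIELD_OPT_KEY.
    val keysToDelete = Seq(("key-001", "2016/03/15"), ("key-002", "2015/03/16"))
      .toDF("_row_key", "partition")

    keysToDelete.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .mode(SaveMode.Append) // deletes append a commit to an existing table
      .save(basePath)

As in the test, rows whose key/partition pair matches an existing record are removed, so a subsequent snapshot read should return the previous count minus keysToDelete.count().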