1
0

[HUDI-1615] Fixing usage of NULL schema for delete operation in HoodieSparkSqlWriter (#2777)

This commit is contained in:
Sivabalan Narayanan
2021-04-14 03:35:39 -04:00
committed by GitHub
parent ab4a7b0b4a
commit 8d29863c86
3 changed files with 41 additions and 1 deletion

View File

@@ -20,6 +20,7 @@
package org.apache.hudi.common.testutils; package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.MercifulJsonConverter; import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.FileIOUtils;
@@ -94,6 +95,11 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
} }
} }
/**
 * Serializes the given {@link HoodieKey}s into minimal JSON delete payloads.
 * Each entry carries only the record key ({@code _row_key}) and partition path
 * ({@code partition}) fields that the test payload schema expects.
 *
 * @param records keys identifying the records to delete
 * @return one JSON string per input key, in input order
 */
public static List<String> deleteRecordsToStrings(List<HoodieKey> records) {
  return records.stream()
      .map(key -> String.format("{\"_row_key\": \"%s\",\"partition\": \"%s\"}",
          key.getRecordKey(), key.getPartitionPath()))
      .collect(Collectors.toList());
}
public String getPartitionPath() { public String getPartitionPath() {
return partitionPath; return partitionPath;
} }

View File

@@ -204,7 +204,7 @@ private[hudi] object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the delete. // Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName, null, path.get, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP))) mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings
import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen._ import org.apache.hudi.keygen._
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
@@ -52,6 +53,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
val commonOpts = Map( val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
@@ -125,6 +128,37 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(schema, actualSchema) assertEquals(schema, actualSchema)
} }
@Test
def testCopyOnWriteDeletes(): Unit = {
  // Seed the COW table with 100 inserts at instant time "000".
  val insertPayloads = recordsToStrings(dataGen.generateInserts("000", 100)).toList
  val insertDF = spark.read.json(spark.sparkContext.parallelize(insertPayloads, 2))
  insertDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .mode(SaveMode.Overwrite)
    .save(basePath)
  assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))

  // A snapshot read must surface every inserted record.
  val beforeDeleteDF = spark.read.format("org.apache.hudi")
    .load(basePath + "/*/*/*/*")
  assertEquals(100, beforeDeleteDF.count())

  // Issue deletes for 20 unique keys using the DELETE write operation.
  val deletePayloads = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
  val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletePayloads, 2))
  deleteDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
    .mode(SaveMode.Append)
    .save(basePath)

  // After the delete commit the snapshot shrinks by exactly the delete count.
  val afterDeleteDF = spark.read.format("org.apache.hudi")
    .load(basePath + "/*/*/*/*")
  assertEquals(beforeDeleteDF.count() - deleteDF.count(), afterDeleteDF.count())
}
@ParameterizedTest @ParameterizedTest
//TODO(metadata): Needs HUDI-1459 to be fixed //TODO(metadata): Needs HUDI-1459 to be fixed
//@ValueSource(booleans = Array(true, false)) //@ValueSource(booleans = Array(true, false))